Compare commits

...

8 Commits

Author SHA1 Message Date
Witold Kręcicki
552baec50e bigger quota 2018-11-24 18:52:46 +00:00
Witold Kręcicki
e0175d6207 pseudosend 2018-11-24 18:07:06 +00:00
Witold Kręcicki
78813037ad More WiP 2018-11-24 18:06:50 +00:00
Witold Kręcicki
0b685d61e6 WiP4 2018-11-24 18:06:50 +00:00
Witold Kręcicki
baa2fe97a0 WiP 2018-11-24 18:06:50 +00:00
Witold Kręcicki
e67418b47f always go through event loops 2018-11-24 18:06:50 +00:00
Witold Kręcicki
d04bb1d044 Use send_list 2018-11-24 18:06:50 +00:00
Witold Kręcicki
0979230096 Get rid of recv/send/accept/connect_lists 2018-11-24 18:06:50 +00:00
8 changed files with 426 additions and 210 deletions

View File

@@ -89,6 +89,12 @@ isc_quota_release(isc_quota_t *quota);
* Release one unit of quota.
*/
isc_result_t
isc_quota_release_verbose(isc_quota_t *quota);
/*%<
* Release one unit of quota and return the result.
*/
isc_result_t
isc_quota_attach(isc_quota_t *quota, isc_quota_t **p);
/*%<
@@ -103,6 +109,9 @@ isc_quota_detach(isc_quota_t **p);
* quota.
*/
isc_result_t
isc_quota_detach_verbose(isc_quota_t **p);
ISC_LANG_ENDDECLS
#endif /* ISC_QUOTA_H */

View File

@@ -1038,6 +1038,15 @@ isc_socketmgr_renderjson(isc_socketmgr_t *mgr, json_object *stats);
typedef isc_result_t
(*isc_socketmgrcreatefunc_t)(isc_mem_t *mctx, isc_socketmgr_t **managerp);
typedef isc_result_t
(*isc_socketevent_factory_t)(void* arg, isc_socketevent_t **ret);
isc_result_t
isc_socket_udpsubscribe(isc_socket_t *sock, isc_socketevent_factory_t evf, void* arg);
void
isc_socket_udpsubscription_toggle(isc_socket_t *usock, bool on);
ISC_LANG_ENDDECLS
#endif /* ISC_SOCKET_H */

View File

@@ -55,13 +55,15 @@ isc_quota_reserve(isc_quota_t *quota) {
isc_result_t result;
LOCK(&quota->lock);
if (quota->max == 0 || quota->used < quota->max) {
if (quota->soft == 0 || quota->used < quota->soft)
if (quota->soft == 0 || quota->used < quota->soft) {
result = ISC_R_SUCCESS;
else
} else {
result = ISC_R_SOFTQUOTA;
}
quota->used++;
} else
} else {
result = ISC_R_QUOTA;
}
UNLOCK(&quota->lock);
return (result);
}
@@ -74,14 +76,34 @@ isc_quota_release(isc_quota_t *quota) {
UNLOCK(&quota->lock);
}
isc_result_t
isc_quota_release_verbose(isc_quota_t *quota) {
isc_result_t result;
LOCK(&quota->lock);
INSIST(quota->used > 0);
quota->used--;
if (quota->max == 0 || quota->used <= quota->max) {
if (quota->soft == 0 || quota->used <= quota->soft) {
result = ISC_R_SUCCESS;
} else {
result = ISC_R_SOFTQUOTA;
}
} else {
result = ISC_R_QUOTA;
}
UNLOCK(&quota->lock);
return (result);
}
isc_result_t
isc_quota_attach(isc_quota_t *quota, isc_quota_t **p)
{
isc_result_t result;
INSIST(p != NULL && *p == NULL);
result = isc_quota_reserve(quota);
if (result == ISC_R_SUCCESS || result == ISC_R_SOFTQUOTA)
if (result == ISC_R_SUCCESS || result == ISC_R_SOFTQUOTA) {
*p = quota;
}
return (result);
}
@@ -92,3 +114,13 @@ isc_quota_detach(isc_quota_t **p)
isc_quota_release(*p);
*p = NULL;
}
isc_result_t
isc_quota_detach_verbose(isc_quota_t **p)
{
isc_result_t result;
INSIST(p != NULL && *p != NULL);
result = isc_quota_release_verbose(*p);
*p = NULL;
return (result);
}

View File

@@ -345,13 +345,16 @@ struct isc__socket {
int fd;
int pf;
int threadid;
char name[16];
void * tag;
char name[16];
void * tag;
bool recv_subscribed;
isc_socketevent_factory_t recv_subscriber;
void *recv_subscriber_arg;
ISC_LIST(isc_socketevent_t) send_list;
ISC_LIST(isc_socketevent_t) recv_list;
ISC_LIST(isc_socket_newconnev_t) accept_list;
ISC_LIST(isc_socket_connev_t) connect_list;
isc_socketevent_t* recv_event;
isc_socket_newconnev_t* accept_event;
isc_socket_connev_t* connect_event;
isc_sockaddr_t peer_address; /* remote address */
@@ -685,6 +688,7 @@ dec_stats(isc_stats_t *stats, isc_statscounter_t counterid) {
static inline isc_result_t
watch_fd(isc__socketthread_t *thread, int fd, int msg) {
isc_result_t result = ISC_R_SUCCESS;
//printf("WATCH %d\n", fd);
#ifdef USE_KQUEUE
struct kevent evchange;
@@ -771,7 +775,7 @@ watch_fd(isc__socketthread_t *thread, int fd, int msg) {
static inline isc_result_t
unwatch_fd(isc__socketthread_t *thread, int fd, int msg) {
isc_result_t result = ISC_R_SUCCESS;
//printf("UNWATCH %d\n", fd);
#ifdef USE_KQUEUE
struct kevent evchange;
@@ -1663,8 +1667,10 @@ doio_recv(isc__socket_t *sock, isc_socketevent_t *dev) {
* If we read less than we expected, update counters,
* and let the upper layer poke the descriptor.
*/
if (((size_t)cc != read_count) && (dev->n < dev->minimum))
if (((size_t)cc != read_count) && (dev->n < dev->minimum)) {
//printf("Incomplete read\n");
return (DOIO_SOFT);
}
/*
* Full reads are posted, or partials if partials are ok.
@@ -1866,10 +1872,10 @@ destroy(isc__socket_t **sockp) {
socket_log(sock, NULL, CREATION, isc_msgcat, ISC_MSGSET_SOCKET,
ISC_MSG_DESTROYING, "destroying");
INSIST(ISC_LIST_EMPTY(sock->connect_list));
INSIST(ISC_LIST_EMPTY(sock->accept_list));
INSIST(ISC_LIST_EMPTY(sock->recv_list));
INSIST(ISC_LIST_EMPTY(sock->send_list));
INSIST(sock->connect_event == NULL);
INSIST(sock->accept_event == NULL);
INSIST(sock->recv_event == NULL);
INSIST(sock->recv_event == NULL);
INSIST(sock->fd >= -1 && sock->fd < (int)manager->maxsocks);
if (sock->fd >= 0) {
@@ -1920,12 +1926,14 @@ allocate_socket(isc__socketmgr_t *manager, isc_sockettype_t type,
sock->tag = NULL;
/*
* Set up list of readers and writers to be initially empty.
* Set up readers and writers to be initially empty.
*/
ISC_LIST_INIT(sock->recv_list);
ISC_LIST_INIT(sock->send_list);
ISC_LIST_INIT(sock->accept_list);
ISC_LIST_INIT(sock->connect_list);
sock->recv_event = NULL;
sock->accept_event = NULL;
sock->connect_event = NULL;
sock->recv_subscribed = false;
sock->recv_subscriber = NULL;
sock->listener = 0;
sock->connected = 0;
sock->connecting = 0;
@@ -1958,10 +1966,10 @@ free_socket(isc__socket_t **socketp) {
INSIST(VALID_SOCKET(sock));
INSIST(isc_refcount_current(&sock->references) == 0);
INSIST(!sock->connecting);
INSIST(ISC_LIST_EMPTY(sock->recv_list));
INSIST(ISC_LIST_EMPTY(sock->send_list));
INSIST(ISC_LIST_EMPTY(sock->accept_list));
INSIST(ISC_LIST_EMPTY(sock->connect_list));
INSIST(sock->connect_event == NULL);
INSIST(sock->accept_event == NULL);
INSIST(sock->recv_event == NULL);
INSIST(sock->recv_event == NULL);
INSIST(!ISC_LINK_LINKED(sock, link));
sock->common.magic = 0;
@@ -2761,10 +2769,10 @@ isc_socket_close(isc_socket_t *sock0) {
REQUIRE(sock->fd >= 0 && sock->fd < (int)sock->manager->maxsocks);
INSIST(!sock->connecting);
INSIST(ISC_LIST_EMPTY(sock->recv_list));
INSIST(ISC_LIST_EMPTY(sock->send_list));
INSIST(ISC_LIST_EMPTY(sock->accept_list));
INSIST(ISC_LIST_EMPTY(sock->connect_list));
INSIST(sock->connect_event == NULL);
INSIST(sock->accept_event == NULL);
INSIST(sock->recv_event == NULL);
INSIST(sock->recv_event == NULL);
manager = sock->manager;
thread = &manager->threads[sock->threadid];
@@ -2806,8 +2814,8 @@ send_recvdone_event(isc__socket_t *sock, isc_socketevent_t **dev) {
(*dev)->ev_sender = sock;
if (ISC_LINK_LINKED(*dev, ev_link)) {
ISC_LIST_DEQUEUE(sock->recv_list, *dev, ev_link);
if (sock->recv_event == *dev) {
sock->recv_event = NULL;
}
if (((*dev)->attributes & ISC_SOCKEVENTATTR_ATTACHED) != 0) {
@@ -2816,6 +2824,7 @@ send_recvdone_event(isc__socket_t *sock, isc_socketevent_t **dev) {
} else {
isc_task_sendto(task, (isc_event_t **)dev, sock->threadid);
}
*dev = NULL;
}
/*
@@ -2857,8 +2866,8 @@ send_connectdone_event(isc__socket_t *sock, isc_socket_connev_t **dev) {
task = (*dev)->ev_sender;
(*dev)->ev_sender = sock;
if (ISC_LINK_LINKED(*dev, ev_link)) {
ISC_LIST_DEQUEUE(sock->connect_list, *dev, ev_link);
if (sock->connect_event == *dev) {
sock->connect_event = NULL;
}
isc_task_sendtoanddetach(&task, (isc_event_t **)dev, sock->threadid);
@@ -2904,7 +2913,7 @@ internal_accept(isc__socket_t *sock) {
* Get the first item off the accept list.
* If it is empty, unlock the socket and return.
*/
dev = ISC_LIST_HEAD(sock->accept_list);
dev = sock->accept_event;
if (dev == NULL) {
unwatch_fd(thread, sock->fd, SELECT_POKE_ACCEPT);
UNLOCK(&sock->lock);
@@ -3027,14 +3036,13 @@ internal_accept(isc__socket_t *sock) {
/*
* Pull off the done event.
*/
ISC_LIST_UNLINK(sock->accept_list, dev, ev_link);
sock->accept_event = NULL;
/*
* Poke watcher if there are more pending accepts.
*/
if (ISC_LIST_EMPTY(sock->accept_list))
unwatch_fd(thread, sock->fd,
SELECT_POKE_ACCEPT);
unwatch_fd(thread, sock->fd,
SELECT_POKE_ACCEPT);
UNLOCK(&sock->lock);
@@ -3132,11 +3140,23 @@ internal_accept(isc__socket_t *sock) {
static void
internal_recv(isc__socket_t *sock) {
isc_socketevent_t *dev;
isc_result_t result;
INSIST(VALID_SOCKET(sock));
LOCK(&sock->lock);
dev = ISC_LIST_HEAD(sock->recv_list);
dev = sock->recv_event;
//printf("sock %p dev %p subscribed %d\n", sock, dev, sock->recv_subscribed);
if (dev == NULL && sock->recv_subscribed) {
result = sock->recv_subscriber(sock->recv_subscriber_arg,
&sock->recv_event);
if (result == ISC_R_QUOTA) {
// Unsubscribe
//printf("Unsubscribing %p because of %s\n", sock, isc_result_totext(result));
sock->recv_subscribed = false;
} /* XXXWPK TODO log outher failures? */
//printf("new %p\n", sock->recv_event);
}
dev = sock->recv_event;
if (dev == NULL) {
goto finish;
}
@@ -3144,40 +3164,36 @@ internal_recv(isc__socket_t *sock) {
socket_log(sock, NULL, IOEVENT,
isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_INTERNALRECV,
"internal_recv: event %p -> task %p", dev, dev->ev_sender);
/*
* Try to do as much I/O as possible on this socket. There are no
* limits here, currently.
*/
while (dev != NULL) {
switch (doio_recv(sock, dev)) {
case DOIO_SOFT:
goto finish;
case DOIO_EOF:
/*
* read of 0 means the remote end was closed.
* Run through the event queue and dispatch all
* the events with an EOF result code.
*/
do {
dev->result = ISC_R_EOF;
send_recvdone_event(sock, &dev);
dev = ISC_LIST_HEAD(sock->recv_list);
} while (dev != NULL);
goto finish;
case DOIO_SUCCESS:
case DOIO_HARD:
send_recvdone_event(sock, &dev);
break;
//printf("Receiving...\n");
result = doio_recv(sock, dev);
if (result == DOIO_EOF) {
dev->result = ISC_R_EOF;
}
dev = ISC_LIST_HEAD(sock->recv_list);
if (result != DOIO_SOFT) {
//printf("recv succ %p %s\n", sock, isc_result_totext(dev->result));
send_recvdone_event(sock, &dev);
} else {
//printf("Soft err\n");
goto finish;
}
if (dev == NULL && sock->recv_subscribed) {
//printf("RETRY RETRY\n");
result = sock->recv_subscriber(sock->recv_subscriber_arg,
&sock->recv_event);
if (result == ISC_R_QUOTA) {
// Unsubscribe
//printf("Unsubscribing %p because of %s\n", sock, isc_result_totext(result));
sock->recv_subscribed = false;
} /* XXXWPK TODO log outher failures? */
//printf("new %p %d\n", sock->recv_event, result);
}
dev = sock->recv_event;
}
finish:
if (ISC_LIST_EMPTY(sock->recv_list)) {
if (sock->recv_event == NULL && !sock->recv_subscribed) {
//printf("Unwatching\n");
unwatch_fd(&sock->manager->threads[sock->threadid], sock->fd,
SELECT_POKE_READ);
}
@@ -3195,6 +3211,7 @@ internal_send(isc__socket_t *sock) {
if (dev == NULL) {
goto finish;
}
//printf("internal send %p\n", sock);
socket_log(sock, NULL, EVENT, NULL, 0, 0,
"internal_send: event %p -> task %p",
dev, dev->ev_sender);
@@ -3203,6 +3220,7 @@ internal_send(isc__socket_t *sock) {
* Try to do as much I/O as possible on this socket. There are no
* limits here, currently.
*/
while (dev != NULL) {
switch (doio_send(sock, dev)) {
case DOIO_SOFT:
@@ -3233,6 +3251,7 @@ static void
process_fd(isc__socketthread_t *thread, int fd, bool readable,
bool writeable)
{
//printf("Process fd %d\n", fd);
isc__socket_t *sock;
int lockid = FDLOCK_ID(fd);
@@ -4057,84 +4076,6 @@ isc_socketmgr_destroy(isc_socketmgr_t **managerp) {
}
static isc_result_t
socket_recv(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task,
unsigned int flags)
{
int io_state;
bool have_lock = false;
isc_task_t *ntask = NULL;
isc_result_t result = ISC_R_SUCCESS;
dev->ev_sender = task;
if (sock->type == isc_sockettype_udp) {
io_state = doio_recv(sock, dev);
} else {
LOCK(&sock->lock);
have_lock = true;
if (ISC_LIST_EMPTY(sock->recv_list)) {
io_state = doio_recv(sock, dev);
} else {
io_state = DOIO_SOFT;
}
}
switch (io_state) {
case DOIO_SOFT:
/*
* We couldn't read all or part of the request right now, so
* queue it.
*
* Attach to socket and to task
*/
isc_task_attach(task, &ntask);
dev->attributes |= ISC_SOCKEVENTATTR_ATTACHED;
if (!have_lock) {
LOCK(&sock->lock);
have_lock = true;
}
/*
* Enqueue the request. If the socket was previously not being
* watched, poke the watcher to start paying attention to it.
*/
bool do_poke = ISC_LIST_EMPTY(sock->recv_list);
ISC_LIST_ENQUEUE(sock->recv_list, dev, ev_link);
if (do_poke) {
select_poke(sock->manager, sock->threadid, sock->fd,
SELECT_POKE_READ);
}
socket_log(sock, NULL, EVENT, NULL, 0, 0,
"socket_recv: event %p -> task %p",
dev, ntask);
if ((flags & ISC_SOCKFLAG_IMMEDIATE) != 0) {
result = ISC_R_INPROGRESS;
}
break;
case DOIO_EOF:
dev->result = ISC_R_EOF;
/* fallthrough */
case DOIO_HARD:
case DOIO_SUCCESS:
if ((flags & ISC_SOCKFLAG_IMMEDIATE) == 0) {
send_recvdone_event(sock, &dev);
}
break;
}
if (have_lock) {
UNLOCK(&sock->lock);
}
return (result);
}
isc_result_t
isc_socket_recv(isc_socket_t *sock0, isc_region_t *region,
@@ -4187,7 +4128,31 @@ isc_socket_recv2(isc_socket_t *sock0, isc_region_t *region,
event->minimum = minimum;
}
return (socket_recv(sock, event, task, flags));
isc_task_t *ntask = NULL;
isc_result_t result = ISC_R_SUCCESS;
event->ev_sender = task;
LOCK(&sock->lock);
INSIST(sock->recv_event == NULL);
sock->recv_event = event;
UNLOCK(&sock->lock);
isc_task_attach(task, &ntask);
event->attributes |= ISC_SOCKEVENTATTR_ATTACHED;
select_poke(sock->manager, sock->threadid, sock->fd,
SELECT_POKE_READ);
socket_log(sock, NULL, EVENT, NULL, 0, 0,
"socket_recv: event %p -> task %p",
event, ntask);
if ((flags & ISC_SOCKFLAG_IMMEDIATE) != 0) {
result = ISC_R_INPROGRESS;
}
return (result);
}
static isc_result_t
@@ -4227,7 +4192,6 @@ socket_send(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task,
} else {
LOCK(&sock->lock);
have_lock = true;
if (ISC_LIST_EMPTY(sock->send_list)) {
io_state = doio_send(sock, dev);
} else {
@@ -4276,6 +4240,7 @@ socket_send(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task,
case DOIO_HARD:
case DOIO_SUCCESS:
//printf("send immediate %p\n", sock);
if ((flags & ISC_SOCKFLAG_IMMEDIATE) == 0) {
send_senddone_event(sock, &dev);
}
@@ -4776,13 +4741,13 @@ isc_socket_accept(isc_socket_t *sock0,
isc_task_t *ntask = NULL;
isc__socket_t *nsock;
isc_result_t result;
bool do_poke = false;
REQUIRE(VALID_SOCKET(sock));
manager = sock->manager;
REQUIRE(VALID_MANAGER(manager));
LOCK(&sock->lock);
INSIST(sock->connect_event == NULL);
REQUIRE(sock->listener);
@@ -4829,12 +4794,9 @@ isc_socket_accept(isc_socket_t *sock0,
* is no race condition. We will keep the lock for such a short
* bit of time waking it up now or later won't matter all that much.
*/
do_poke = ISC_LIST_EMPTY(sock->accept_list);
ISC_LIST_ENQUEUE(sock->accept_list, dev, ev_link);
if (do_poke) {
select_poke(manager, sock->threadid, sock->fd,
SELECT_POKE_ACCEPT);
}
sock->accept_event = dev;
select_poke(manager, sock->threadid, sock->fd,
SELECT_POKE_ACCEPT);
UNLOCK(&sock->lock);
return (ISC_R_SUCCESS);
}
@@ -4864,6 +4826,7 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr,
return (ISC_R_MULTICAST);
LOCK(&sock->lock);
INSIST(sock->connect_event == NULL);
dev = (isc_socket_connev_t *)isc_event_allocate(manager->mctx, sock,
ISC_SOCKEVENT_CONNECT,
@@ -4987,9 +4950,8 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr,
* is no race condition. We will keep the lock for such a short
* bit of time waking it up now or later won't matter all that much.
*/
bool do_poke = ISC_LIST_EMPTY(sock->connect_list);
ISC_LIST_ENQUEUE(sock->connect_list, dev, ev_link);
if (do_poke && !sock->connecting) {
sock->connect_event = dev;
if (!sock->connecting) {
sock->connecting = 1;
select_poke(manager, sock->threadid, sock->fd,
SELECT_POKE_CONNECT);
@@ -5019,7 +4981,7 @@ internal_connect(isc__socket_t *sock) {
* Get the first item off the connect list.
* If it is empty, unlock the socket and return.
*/
dev = ISC_LIST_HEAD(sock->connect_list);
dev = sock->connect_event;
if (dev == NULL) {
INSIST(!sock->connecting);
goto finish;
@@ -5089,11 +5051,8 @@ internal_connect(isc__socket_t *sock) {
sock->bound = 1;
}
do {
dev->result = result;
send_connectdone_event(sock, &dev);
dev = ISC_LIST_HEAD(sock->connect_list);
} while (dev != NULL);
dev->result = result;
send_connectdone_event(sock, &dev);
finish:
unwatch_fd(&sock->manager->threads[sock->threadid], sock->fd,
@@ -5189,22 +5148,16 @@ isc_socket_cancel(isc_socket_t *sock0, isc_task_t *task, unsigned int how) {
* o Reset any state needed.
*/
if (((how & ISC_SOCKCANCEL_RECV) != 0)
&& !ISC_LIST_EMPTY(sock->recv_list)) {
&& sock->recv_event != NULL) {
isc_socketevent_t *dev;
isc_socketevent_t *next;
isc_task_t *current_task;
dev = ISC_LIST_HEAD(sock->recv_list);
dev = sock->recv_event;
while (dev != NULL) {
current_task = dev->ev_sender;
next = ISC_LIST_NEXT(dev, ev_link);
if ((task == NULL) || (task == current_task)) {
dev->result = ISC_R_CANCELED;
send_recvdone_event(sock, &dev);
}
dev = next;
current_task = dev->ev_sender;
if ((task == NULL) || (task == current_task)) {
dev->result = ISC_R_CANCELED;
send_recvdone_event(sock, &dev);
}
}
@@ -5219,7 +5172,6 @@ isc_socket_cancel(isc_socket_t *sock0, isc_task_t *task, unsigned int how) {
while (dev != NULL) {
current_task = dev->ev_sender;
next = ISC_LIST_NEXT(dev, ev_link);
if ((task == NULL) || (task == current_task)) {
dev->result = ISC_R_CANCELED;
send_senddone_event(sock, &dev);
@@ -5229,54 +5181,41 @@ isc_socket_cancel(isc_socket_t *sock0, isc_task_t *task, unsigned int how) {
}
if (((how & ISC_SOCKCANCEL_ACCEPT) != 0)
&& !ISC_LIST_EMPTY(sock->accept_list)) {
&& sock->accept_event != NULL) {
isc_socket_newconnev_t *dev;
isc_socket_newconnev_t *next;
isc_task_t *current_task;
dev = ISC_LIST_HEAD(sock->accept_list);
while (dev != NULL) {
current_task = dev->ev_sender;
next = ISC_LIST_NEXT(dev, ev_link);
dev = sock->accept_event;
current_task = dev->ev_sender;
if ((task == NULL) || (task == current_task)) {
if ((task == NULL) || (task == current_task)) {
sock->accept_event = NULL;
ISC_LIST_UNLINK(sock->accept_list, dev,
ev_link);
NEWCONNSOCK(dev)->references--;
free_socket((isc__socket_t **)&dev->newsocket);
NEWCONNSOCK(dev)->references--;
free_socket((isc__socket_t **)&dev->newsocket);
dev->result = ISC_R_CANCELED;
dev->ev_sender = sock;
isc_task_sendtoanddetach(&current_task,
ISC_EVENT_PTR(&dev), sock->threadid);
}
dev = next;
dev->result = ISC_R_CANCELED;
dev->ev_sender = sock;
isc_task_sendtoanddetach(&current_task,
ISC_EVENT_PTR(&dev), sock->threadid);
}
}
if (((how & ISC_SOCKCANCEL_CONNECT) != 0)
&& !ISC_LIST_EMPTY(sock->connect_list)) {
&& sock->connect_event != NULL) {
isc_socket_connev_t *dev;
isc_socket_connev_t *next;
isc_task_t *current_task;
INSIST(sock->connecting);
sock->connecting = 0;
dev = ISC_LIST_HEAD(sock->connect_list);
dev = sock->connect_event;
while (dev != NULL) {
current_task = dev->ev_sender;
next = ISC_LIST_NEXT(dev, ev_link);
current_task = dev->ev_sender;
if ((task == NULL) || (task == current_task)) {
dev->result = ISC_R_CANCELED;
send_connectdone_event(sock, &dev);
}
dev = next;
if ((task == NULL) || (task == current_task)) {
dev->result = ISC_R_CANCELED;
send_connectdone_event(sock, &dev);
}
}
@@ -5722,3 +5661,34 @@ isc_socketmgr_createinctx(isc_mem_t *mctx, isc_appctx_t *actx,
return (result);
}
isc_result_t
isc_socket_udpsubscribe(isc_socket_t *usock, isc_socketevent_factory_t evf, void* arg) {
isc__socket_t *sock = (isc__socket_t*) usock;
REQUIRE(sock->recv_subscriber == NULL);
REQUIRE(sock->recv_event == NULL);
LOCK(&sock->lock);
sock->recv_subscriber = evf;
sock->recv_subscriber_arg = arg;
sock->recv_subscribed = true;
UNLOCK(&sock->lock);
select_poke(sock->manager, sock->threadid, sock->fd,
SELECT_POKE_READ);
return (ISC_R_SUCCESS);
}
void
isc_socket_udpsubscription_toggle(isc_socket_t *usock, bool on) {
isc__socket_t *sock = (isc__socket_t*) usock;
bool old;
LOCK(&sock->lock);
REQUIRE(sock->recv_subscriber != NULL);
old = sock->recv_subscribed;
sock->recv_subscribed = on;
UNLOCK(&sock->lock);
if (!old && on) {
//printf("Re-subscribing %p\n", sock);
select_poke(sock->manager, sock->threadid, sock->fd,
SELECT_POKE_READ);
}
}

View File

@@ -156,6 +156,13 @@ struct ns_clientmgr {
#define MANAGER_MAGIC ISC_MAGIC('N', 'S', 'C', 'm')
#define VALID_MANAGER(m) ISC_MAGIC_VALID(m, MANAGER_MAGIC)
typedef struct create_udp_socketevent_arg {
ns_clientmgr_t *manager;
isc_socket_t *socket;
ns_interface_t *interface;
int disp;
} create_udp_socketevent_arg_t;
/*!
* Client object states. Ordering is significant: higher-numbered
* states are generally "more active", meaning that the client can
@@ -247,6 +254,8 @@ static isc_result_t get_worker(ns_clientmgr_t *manager, ns_interface_t *ifp,
static void compute_cookie(ns_client_t *client, uint32_t when,
uint32_t nonce, const unsigned char *secret,
isc_buffer_t *buf);
static isc_result_t
ns__create_udp_socketevent(void* argp, isc_socketevent_t **sockevp);
void
ns_client_recursing(ns_client_t *client) {
@@ -524,6 +533,14 @@ exit_check(ns_client_t *client) {
if (client->nctls > 0)
return (true);
/* We need to do it before detaching interface */
if (client->udpinflightquota != NULL) {
if (isc_quota_detach_verbose(&client->udpinflightquota) == ISC_R_SUCCESS) {
/* We are back 'in quota', can reenable subscribtion to socket */
isc_socket_udpsubscription_toggle(client->udpsocket, true);
}
}
/* Deactivate the client. */
if (client->interface)
ns_interface_detach(&client->interface);
@@ -532,7 +549,8 @@ exit_check(ns_client_t *client) {
INSIST(client->recursionquota == NULL);
if (client->tcplistener != NULL)
isc_socket_detach(&client->tcplistener);
if (client->udpsocket != NULL)
isc_socket_detach(&client->udpsocket);
@@ -772,7 +790,7 @@ ns_client_endrequest(ns_client_t *client) {
ns_stats_decrement(client->sctx->nsstats,
ns_statscounter_recursclients);
}
/*
* Clear all client attributes that are specific to
* the request; that's all except the TCP flag.
@@ -1049,6 +1067,51 @@ ns_client_sendraw(ns_client_t *client, dns_message_t *message) {
ns_client_next(client, result);
}
#ifdef PSEUDOSEND
static void
client_pseudosend(ns_client_t *client, uint16_t id) {
isc_result_t result;
unsigned char *data;
isc_buffer_t buffer;
isc_buffer_t tcpbuffer;
isc_region_t r;
unsigned char sendbuf[SEND_BUFFER_SIZE];
result = client_allocsendbuf(client, &buffer, &tcpbuffer, 0,
sendbuf, &data);
if (result != ISC_R_SUCCESS) {
goto done;
}
isc_buffer_putuint16(&buffer, id);
isc_buffer_putuint8(&buffer, 0x81);
isc_buffer_putuint8(&buffer, 0x80);
isc_buffer_putuint16(&buffer, 0);
isc_buffer_putuint16(&buffer, 0);
isc_buffer_putuint16(&buffer, 0);
isc_buffer_putuint16(&buffer, 0);
if (client->sendcb != NULL) {
client->sendcb(&buffer);
} else if (TCP_CLIENT(client)) {
isc_buffer_usedregion(&buffer, &r);
isc_buffer_putuint16(&tcpbuffer, (uint16_t) r.length);
isc_buffer_add(&tcpbuffer, r.length);
result = client_sendpkg(client, &tcpbuffer);
} else {
result = client_sendpkg(client, &buffer);
}
if (result == ISC_R_SUCCESS) {
return;
}
done:
if (client->tcpbuf != NULL) {
isc_mem_put(client->mctx, client->tcpbuf, TCP_BUFFER_SIZE);
client->tcpbuf = NULL;
}
ns_client_next(client, result);
}
#endif
static void
client_send(ns_client_t *client) {
isc_result_t result;
@@ -2335,6 +2398,20 @@ ns__client_request(isc_task_t *task, isc_event_t *event) {
return;
}
#ifdef PSEUDOSEND
result = dns_message_peekheader(buffer, &id, &flags);
if (result != ISC_R_SUCCESS) {
/*
* There isn't enough header to determine whether
* this was a request or a response. Drop it.
*/
ns_client_next(client, result);
return;
}
client_pseudosend(client, id);
return;
#endif
isc_netaddr_fromsockaddr(&netaddr, &client->peeraddr);
#if NS_CLIENT_DROPPORT
@@ -3045,6 +3122,7 @@ client_create(ns_clientmgr_t *manager, ns_client_t **clientp) {
client->recursionquota = NULL;
client->interface = NULL;
client->peeraddr_valid = false;
client->udpinflightquota = NULL;
dns_ecs_init(&client->ecs);
client->filter_aaaa = dns_aaaa_ok;
client->needshutdown = (client->sctx->options & NS_SERVER_CLIENTTEST);
@@ -3710,6 +3788,26 @@ ns_clientmgr_createclients(ns_clientmgr_t *manager, unsigned int n,
return (result);
}
isc_result_t
ns_clientmgr_subscribe_clients(ns_clientmgr_t *manager, unsigned int n, ns_interface_t *ifp) {
isc_result_t result = ISC_R_SUCCESS;
unsigned int disp;
create_udp_socketevent_arg_t *arg;
for (disp = 0; disp < n; disp++) {
arg = malloc(sizeof(*arg));
arg->manager = manager;
arg->socket = dns_dispatch_getsocket(ifp->udpdispatch[disp]);
arg->interface = ifp;
arg->disp = disp;
result = isc_socket_udpsubscribe(arg->socket, &ns__create_udp_socketevent, arg);
if (result != ISC_R_SUCCESS) {
break;
}
}
return (result);
}
isc_sockaddr_t *
ns_client_getsockaddr(ns_client_t *client) {
return (&client->peeraddr);
@@ -3997,3 +4095,87 @@ ns_client_sourceip(dns_clientinfo_t *ci, isc_sockaddr_t **addrp) {
*addrp = &client->peeraddr;
return (ISC_R_SUCCESS);
}
static isc_result_t
ns__create_udp_socketevent(void* argp, isc_socketevent_t **sockevp) {
create_udp_socketevent_arg_t *arg = (create_udp_socketevent_arg_t*)argp;
ns_clientmgr_t *manager = arg->manager;
isc_socket_t *sock = arg->socket;
isc_socketevent_t *sev;
ns_client_t *client;
isc_result_t result;
isc_quota_t* inflightquota = NULL;
MTRACE("create_udp_socketevent");
REQUIRE(manager != NULL);
if (manager->exiting) {
return (ISC_R_FAILURE);
}
result = isc_quota_attach(&arg->interface->udpinflightquota[arg->disp], &inflightquota);
if (result == ISC_R_QUOTA) {
return (ISC_R_QUOTA);
}
/*
* Allocate a client. First try to get a recycled one;
* if that fails, make a new one.
*/
client = NULL;
if ((manager->sctx->options & NS_SERVER_CLIENTTEST) == 0)
ISC_QUEUE_POP(manager->inactive, ilink, client);
if (client != NULL) {
MTRACE("recycle");
} else {
MTRACE("create new");
LOCK(&manager->lock);
result = client_create(manager, &client);
UNLOCK(&manager->lock);
if (result != ISC_R_SUCCESS) {
printf("Cant create client %s\n", isc_result_totext(result));
return (result);
}
LOCK(&manager->listlock);
ISC_LIST_APPEND(manager->clients, client, link);
UNLOCK(&manager->listlock);
}
ns_interface_attach(arg->interface, &client->interface);
client->manager = manager;
client->mortal = true;
client->state = NS_CLIENTSTATE_READY;
client->sctx = manager->sctx;
INSIST(client->udpinflightquota == NULL);
INSIST(client->recursionquota == NULL);
client->dscp = arg->interface->dscp;
client->mortal = true;
client->udpinflightquota = inflightquota;
isc_socket_attach(sock, &client->udpsocket);
INSIST(client->nctls == 0);
if (exit_check(client)) {
printf("exit _check\n");
return (ISC_R_FAILURE);
}
sev = client->recvevent;
sev->ev_sender = client->task;
sev->ev_arg = client;
sev->result = ISC_R_UNSET;
sev->n = 0;
sev->offset = 0;
sev->attributes = 0;
sev->region.base = client->recvbuf;
sev->region.length = RECV_BUFFER_SIZE;
sev->minimum = 1;
client->nrecvs++;
*sockevp = sev;
return (result);
}

View File

@@ -136,6 +136,7 @@ struct ns_client {
bool pipelined; /*%< TCP queries not in sequence */
isc_quota_t *tcpquota;
isc_quota_t *recursionquota;
isc_quota_t *udpinflightquota;
ns_interface_t *interface;
isc_sockaddr_t peeraddr;
@@ -421,6 +422,9 @@ ns__clientmgr_getclient(ns_clientmgr_t *manager, ns_interface_t *ifp,
* (Not intended for use outside this module and associated tests.)
*/
isc_result_t
ns_clientmgr_subscribe_clients(ns_clientmgr_t*, unsigned int, ns_interface_t*);
void
ns__client_request(isc_task_t *task, isc_event_t *event);
/*

View File

@@ -44,6 +44,7 @@
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/quota.h>
#include <isc/socket.h>
#include <dns/geoip.h>
@@ -82,6 +83,7 @@ struct ns_interface {
int nudpdispatch; /*%< Number of UDP dispatches */
ns_clientmgr_t * clientmgr; /*%< Client manager. */
ISC_LINK(ns_interface_t) link;
isc_quota_t udpinflightquota[100]; /*%< Number of UDP clients in flight. */
};
/***

View File

@@ -220,6 +220,7 @@ ns_interfacemgr_create(isc_mem_t *mctx,
ISC_LIST_INIT(mgr->interfaces);
ISC_LIST_INIT(mgr->listenon);
/*
* The listen-on lists are initially empty.
@@ -429,6 +430,12 @@ ns_interface_create(ns_interfacemgr_t *mgr, isc_sockaddr_t *addr,
ifp->ntcpcurrent = 0;
ifp->nudpdispatch = 0;
/* XXXWPK TODO */
for (int i=0; i<100; i++) {
isc_quota_init(&ifp->udpinflightquota[i], 150);
isc_quota_soft(&ifp->udpinflightquota[i], 120);
}
ifp->dscp = -1;
ISC_LINK_INIT(ifp, link);
@@ -491,8 +498,9 @@ ns_interface_listenudp(ns_interface_t *ifp) {
}
result = ns_clientmgr_createclients(ifp->clientmgr, ifp->nudpdispatch,
ifp, false);
result = ns_clientmgr_subscribe_clients(ifp->clientmgr,
ifp->nudpdispatch,
ifp);
if (result != ISC_R_SUCCESS) {
UNEXPECTED_ERROR(__FILE__, __LINE__,
"UDP ns_clientmgr_createclients(): %s",