From cbc1b123118c2f9dd27e0b8ac64adee3a1b7a803 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Tue, 21 Aug 2018 14:04:06 +0200 Subject: [PATCH] Do IO after event directly in the network thread, don't queue an event in a separate task. --- lib/isc/unix/socket.c | 307 ++++++------------------------------------ 1 file changed, 38 insertions(+), 269 deletions(-) diff --git a/lib/isc/unix/socket.c b/lib/isc/unix/socket.c index 2fe09ce0c3..248a2dd7f7 100644 --- a/lib/isc/unix/socket.c +++ b/lib/isc/unix/socket.c @@ -348,20 +348,9 @@ struct isc__socket { ISC_LIST(isc_socket_newconnev_t) accept_list; ISC_LIST(isc_socket_connev_t) connect_list; - /* - * Internal events. Posted when a descriptor is readable or - * writable. These are statically allocated and never freed. - * They will be set to non-purgable before use. - */ - intev_t readable_ev; - intev_t writable_ev; - isc_sockaddr_t peer_address; /* remote address */ - unsigned int pending_recv : 1, - pending_send : 1, - pending_accept : 1, - listener : 1, /* listener socket */ + unsigned int listener : 1, /* listener socket */ connected : 1, connecting : 1, /* connect pending */ bound : 1, /* bound to local addr */ @@ -459,10 +448,10 @@ static void free_socket(isc__socket_t **); static isc_result_t allocate_socket(isc__socketmgr_t *, isc_sockettype_t, isc__socket_t **); static void destroy(isc__socket_t **); -static void internal_accept(isc_task_t *, isc_event_t *); -static void internal_connect(isc_task_t *, isc_event_t *); -static void internal_recv(isc_task_t *, isc_event_t *); -static void internal_send(isc_task_t *, isc_event_t *); +static void internal_accept(isc__socket_t *); +static void internal_connect(isc__socket_t *); +static void internal_recv(isc__socket_t *); +static void internal_send(isc__socket_t *); static void process_cmsg(isc__socket_t *, struct msghdr *, isc_socketevent_t *); static void build_msghdr_send(isc__socket_t *, char *, isc_socketevent_t *, struct msghdr *, struct iovec *, size_t *); @@ -1883,9 +1872,6 @@ allocate_socket(isc__socketmgr_t *manager, isc_sockettype_t type, ISC_LIST_INIT(sock->send_list); ISC_LIST_INIT(sock->accept_list); ISC_LIST_INIT(sock->connect_list); - sock->pending_recv = 0; - sock->pending_send = 0; - sock->pending_accept = 0; sock->listener = 0; sock->connected = 0; sock->connecting = 0; @@ -1902,16 +1888,6 @@ allocate_socket(isc__socketmgr_t *manager, isc_sockettype_t type, goto error; } - /* - * Initialize readable and writable events. - */ - ISC_EVENT_INIT(&sock->readable_ev, sizeof(intev_t), - ISC_EVENTATTR_NOPURGE, NULL, ISC_SOCKEVENT_INTR, - NULL, sock, sock, NULL, NULL); - ISC_EVENT_INIT(&sock->writable_ev, sizeof(intev_t), - ISC_EVENTATTR_NOPURGE, NULL, ISC_SOCKEVENT_INTW, - NULL, sock, sock, NULL, NULL); - sock->common.magic = ISCAPI_SOCKET_MAGIC; sock->common.impmagic = SOCKET_MAGIC; *socketp = sock; @@ -1938,9 +1914,6 @@ free_socket(isc__socket_t **socketp) { INSIST(VALID_SOCKET(sock)); INSIST(sock->references == 0); INSIST(!sock->connecting); - INSIST(!sock->pending_recv); - INSIST(!sock->pending_send); - INSIST(!sock->pending_accept); INSIST(ISC_LIST_EMPTY(sock->recv_list)); INSIST(ISC_LIST_EMPTY(sock->send_list)); INSIST(ISC_LIST_EMPTY(sock->accept_list)); @@ -2745,9 +2718,6 @@ isc_socket_close(isc_socket_t *sock0) { REQUIRE(sock->fd >= 0 && sock->fd < (int)sock->manager->maxsocks); INSIST(!sock->connecting); - INSIST(!sock->pending_recv); - INSIST(!sock->pending_send); - INSIST(!sock->pending_accept); INSIST(ISC_LIST_EMPTY(sock->recv_list)); INSIST(ISC_LIST_EMPTY(sock->send_list)); INSIST(ISC_LIST_EMPTY(sock->accept_list)); @@ -2772,117 +2742,6 @@ isc_socket_close(isc_socket_t *sock0) { return (ISC_R_SUCCESS); } -/* - * I/O is possible on a given socket. Schedule an event to this task that - * will call an internal function to do the I/O. This will charge the - * task with the I/O operation and let our select loop handler get back - * to doing something real as fast as possible. - * - * The socket and manager must be locked before calling this function. - */ -static void -dispatch_recv(isc__socket_t *sock) { - intev_t *iev; - isc_socketevent_t *ev; - isc_task_t *sender; - - INSIST(!sock->pending_recv); - - ev = ISC_LIST_HEAD(sock->recv_list); - if (ev == NULL) - return; - socket_log(sock, NULL, EVENT, NULL, 0, 0, - "dispatch_recv: event %p -> task %p", - ev, ev->ev_sender); - sender = ev->ev_sender; - - sock->pending_recv = 1; - iev = &sock->readable_ev; - - sock->references++; - iev->ev_sender = sock; - iev->ev_action = internal_recv; - iev->ev_arg = sock; - - isc_task_send(sender, (isc_event_t **)&iev); -} - -static void -dispatch_send(isc__socket_t *sock) { - intev_t *iev; - isc_socketevent_t *ev; - isc_task_t *sender; - - INSIST(!sock->pending_send); - - ev = ISC_LIST_HEAD(sock->send_list); - if (ev == NULL) - return; - socket_log(sock, NULL, EVENT, NULL, 0, 0, - "dispatch_send: event %p -> task %p", - ev, ev->ev_sender); - sender = ev->ev_sender; - - sock->pending_send = 1; - iev = &sock->writable_ev; - - sock->references++; - iev->ev_sender = sock; - iev->ev_action = internal_send; - iev->ev_arg = sock; - - isc_task_send(sender, (isc_event_t **)&iev); -} - -/* - * Dispatch an internal accept event. - */ -static void -dispatch_accept(isc__socket_t *sock) { - intev_t *iev; - isc_socket_newconnev_t *ev; - - INSIST(!sock->pending_accept); - - /* - * Are there any done events left, or were they all canceled - * before the manager got the socket lock? - */ - ev = ISC_LIST_HEAD(sock->accept_list); - if (ev == NULL) - return; - - sock->pending_accept = 1; - iev = &sock->readable_ev; - - sock->references++; /* keep socket around for this internal event */ - iev->ev_sender = sock; - iev->ev_action = internal_accept; - iev->ev_arg = sock; - - isc_task_send(ev->ev_sender, (isc_event_t **)&iev); -} - -static void -dispatch_connect(isc__socket_t *sock) { - intev_t *iev; - isc_socket_connev_t *ev; - - iev = &sock->writable_ev; - - ev = ISC_LIST_HEAD(sock->connect_list); - INSIST(ev != NULL); /* XXX */ - - INSIST(sock->connecting); - - sock->references++; /* keep socket around for this internal event */ - iev->ev_sender = sock; - iev->ev_action = internal_connect; - iev->ev_arg = sock; - - isc_task_send(ev->ev_sender, (isc_event_t **)&iev); -} - /* * Dequeue an item off the given socket's read queue, set the result code * in the done event to the one provided, and send it to the task it was @@ -2967,8 +2826,7 @@ send_connectdone_event(isc__socket_t *sock, isc_socket_connev_t **dev) { * so just unlock and return. */ static void -internal_accept(isc_task_t *me, isc_event_t *ev) { - isc__socket_t *sock; +internal_accept(isc__socket_t *sock) { isc__socketmgr_t *manager; isc_socket_newconnev_t *dev; isc_task_t *task; @@ -2978,9 +2836,6 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { char strbuf[ISC_STRERRORSIZE]; const char *err = "accept"; - UNUSED(me); - - sock = ev->ev_sender; INSIST(VALID_SOCKET(sock)); LOCK(&sock->lock); @@ -2992,16 +2847,6 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { INSIST(VALID_MANAGER(manager)); INSIST(sock->listener); - INSIST(sock->pending_accept == 1); - sock->pending_accept = 0; - - INSIST(sock->references > 0); - sock->references--; /* the internal event is done with this socket */ - if (sock->references == 0) { - UNLOCK(&sock->lock); - destroy(&sock); - return; - } /* * Get the first item off the accept list. @@ -3135,7 +2980,7 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { * Poke watcher if there are more pending accepts. */ if (!ISC_LIST_EMPTY(sock->accept_list)) - select_poke(sock->manager, sock->fd, SELECT_POKE_ACCEPT); + watch_fd(sock->manager, sock->fd, SELECT_POKE_ACCEPT); UNLOCK(&sock->lock); @@ -3221,7 +3066,7 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { return; soft_error: - select_poke(sock->manager, sock->fd, SELECT_POKE_ACCEPT); + watch_fd(sock->manager, sock->fd, SELECT_POKE_ACCEPT); UNLOCK(&sock->lock); inc_stats(manager->stats, sock->statsindex[STATID_ACCEPTFAIL]); @@ -3229,36 +3074,26 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { } static void -internal_recv(isc_task_t *me, isc_event_t *ev) { +internal_recv(isc__socket_t *sock) { isc_socketevent_t *dev; - isc__socket_t *sock; - INSIST(ev->ev_type == ISC_SOCKEVENT_INTR); - - sock = ev->ev_sender; INSIST(VALID_SOCKET(sock)); LOCK(&sock->lock); - socket_log(sock, NULL, IOEVENT, - isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_INTERNALRECV, - "internal_recv: task %p got event %p", me, ev); - - INSIST(sock->pending_recv == 1); - sock->pending_recv = 0; - - INSIST(sock->references > 0); - sock->references--; /* the internal event is done with this socket */ - if (sock->references == 0) { + dev = ISC_LIST_HEAD(sock->recv_list); + if (dev == NULL) { UNLOCK(&sock->lock); - destroy(&sock); return; } + socket_log(sock, NULL, IOEVENT, + isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_INTERNALRECV, + "internal_recv: event %p -> task %p", dev, dev->ev_sender); + /* * Try to do as much I/O as possible on this socket. There are no * limits here, currently. */ - dev = ISC_LIST_HEAD(sock->recv_list); while (dev != NULL) { switch (doio_recv(sock, dev)) { case DOIO_SOFT: @@ -3288,45 +3123,31 @@ internal_recv(isc_task_t *me, isc_event_t *ev) { poke: if (!ISC_LIST_EMPTY(sock->recv_list)) - select_poke(sock->manager, sock->fd, SELECT_POKE_READ); + watch_fd(sock->manager, sock->fd, SELECT_POKE_READ); UNLOCK(&sock->lock); } static void -internal_send(isc_task_t *me, isc_event_t *ev) { +internal_send(isc__socket_t *sock) { isc_socketevent_t *dev; - isc__socket_t *sock; - INSIST(ev->ev_type == ISC_SOCKEVENT_INTW); - - /* - * Find out what socket this is and lock it. - */ - sock = (isc__socket_t *)ev->ev_sender; INSIST(VALID_SOCKET(sock)); LOCK(&sock->lock); - socket_log(sock, NULL, IOEVENT, - isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_INTERNALSEND, - "internal_send: task %p got event %p", me, ev); - - INSIST(sock->pending_send == 1); - sock->pending_send = 0; - - INSIST(sock->references > 0); - sock->references--; /* the internal event is done with this socket */ - if (sock->references == 0) { + dev = ISC_LIST_HEAD(sock->send_list); + if (dev == NULL) { UNLOCK(&sock->lock); - destroy(&sock); return; } + socket_log(sock, NULL, EVENT, NULL, 0, 0, + "internal_send: event %p -> task %p", + dev, dev->ev_sender); /* * Try to do as much I/O as possible on this socket. There are no * limits here, currently. */ - dev = ISC_LIST_HEAD(sock->send_list); while (dev != NULL) { switch (doio_send(sock, dev)) { case DOIO_SOFT: @@ -3343,7 +3164,7 @@ internal_send(isc_task_t *me, isc_event_t *ev) { poke: if (!ISC_LIST_EMPTY(sock->send_list)) - select_poke(sock->manager, sock->fd, SELECT_POKE_WRITE); + watch_fd(sock->manager, sock->fd, SELECT_POKE_WRITE); UNLOCK(&sock->lock); } @@ -3357,7 +3178,6 @@ process_fd(isc__socketmgr_t *manager, int fd, bool readable, bool writeable) { isc__socket_t *sock; - bool unlock_sock; bool unwatch_read = false, unwatch_write = false; int lockid = FDLOCK_ID(fd); @@ -3374,19 +3194,20 @@ process_fd(isc__socketmgr_t *manager, int fd, bool readable, } sock = manager->fds[fd]; - unlock_sock = false; + LOCK(&sock->lock); + sock->references++; + UNLOCK(&sock->lock); + if (readable) { if (sock == NULL) { unwatch_read = true; goto check_write; } - unlock_sock = true; - LOCK(&sock->lock); if (!SOCK_DEAD(sock)) { if (sock->listener) - dispatch_accept(sock); + internal_accept(sock); else - dispatch_recv(sock); + internal_recv(sock); } unwatch_read = true; } @@ -3396,20 +3217,14 @@ check_write: unwatch_write = true; goto unlock_fd; } - if (!unlock_sock) { - unlock_sock = true; - LOCK(&sock->lock); - } if (!SOCK_DEAD(sock)) { if (sock->connecting) - dispatch_connect(sock); + internal_connect(sock); else - dispatch_send(sock); + internal_send(sock); } unwatch_write = true; } - if (unlock_sock) - UNLOCK(&sock->lock); unlock_fd: UNLOCK(&manager->fdlock[lockid]); @@ -3417,7 +3232,9 @@ check_write: (void)unwatch_fd(manager, fd, SELECT_POKE_READ); if (unwatch_write) (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE); - + LOCK(&sock->lock); + sock->references--; + UNLOCK(&sock->lock); } #ifdef USE_KQUEUE @@ -4288,7 +4105,7 @@ socket_recv(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, * Enqueue the request. If the socket was previously not being * watched, poke the watcher to start paying attention to it. */ - if (ISC_LIST_EMPTY(sock->recv_list) && !sock->pending_recv) + if (ISC_LIST_EMPTY(sock->recv_list)) select_poke(sock->manager, sock->fd, SELECT_POKE_READ); ISC_LIST_ENQUEUE(sock->recv_list, dev, ev_link); @@ -4435,8 +4252,7 @@ socket_send(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, * not being watched, poke the watcher to start * paying attention to it. */ - if (ISC_LIST_EMPTY(sock->send_list) && - !sock->pending_send) + if (ISC_LIST_EMPTY(sock->send_list)) select_poke(sock->manager, sock->fd, SELECT_POKE_WRITE); ISC_LIST_ENQUEUE(sock->send_list, dev, ev_link); @@ -5168,8 +4984,7 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr, * Called when a socket with a pending connect() finishes. */ static void -internal_connect(isc_task_t *me, isc_event_t *ev) { - isc__socket_t *sock; +internal_connect(isc__socket_t *sock) { isc_socket_connev_t *dev; int cc; isc_result_t result; @@ -5177,26 +4992,10 @@ internal_connect(isc_task_t *me, isc_event_t *ev) { char strbuf[ISC_STRERRORSIZE]; char peerbuf[ISC_SOCKADDR_FORMATSIZE]; - UNUSED(me); - INSIST(ev->ev_type == ISC_SOCKEVENT_INTW); - - sock = ev->ev_sender; INSIST(VALID_SOCKET(sock)); LOCK(&sock->lock); - /* - * When the internal event was sent the reference count was bumped - * to keep the socket around for us. Decrement the count here. - */ - INSIST(sock->references > 0); - sock->references--; - if (sock->references == 0) { - UNLOCK(&sock->lock); - destroy(&sock); - return; - } - /* * Get the first item off the connect list. * If it is empty, unlock the socket and return. @@ -5228,7 +5027,7 @@ internal_connect(isc_task_t *me, isc_event_t *ev) { */ if (SOFT_ERROR(errno) || errno == EINPROGRESS) { sock->connecting = 1; - select_poke(sock->manager, sock->fd, + watch_fd(sock->manager, sock->fd, SELECT_POKE_CONNECT); UNLOCK(&sock->lock); @@ -5693,18 +5492,6 @@ isc_socketmgr_renderxml(isc_socketmgr_t *mgr0, xmlTextWriterPtr writer) { } TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "states")); - if (sock->pending_recv) - TRY0(xmlTextWriterWriteElement(writer, - ISC_XMLCHAR "state", - ISC_XMLCHAR "pending-receive")); - if (sock->pending_send) - TRY0(xmlTextWriterWriteElement(writer, - ISC_XMLCHAR "state", - ISC_XMLCHAR "pending-send")); - if (sock->pending_accept) - TRY0(xmlTextWriterWriteElement(writer, - ISC_XMLCHAR "state", - ISC_XMLCHAR "pending_accept")); if (sock->listener) TRY0(xmlTextWriterWriteElement(writer, ISC_XMLCHAR "state", @@ -5812,24 +5599,6 @@ isc_socketmgr_renderjson(isc_socketmgr_t *mgr0, json_object *stats) { CHECKMEM(states); json_object_object_add(entry, "states", states); - if (sock->pending_recv) { - obj = json_object_new_string("pending-receive"); - CHECKMEM(obj); - json_object_array_add(states, obj); - } - - if (sock->pending_send) { - obj = json_object_new_string("pending-send"); - CHECKMEM(obj); - json_object_array_add(states, obj); - } - - if (sock->pending_accept) { - obj = json_object_new_string("pending-accept"); - CHECKMEM(obj); - json_object_array_add(states, obj); - } - if (sock->listener) { obj = json_object_new_string("listener"); CHECKMEM(obj);