From 69e6c8467c3995e8620570415b699efc33928f08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Sur=C3=BD?= Date: Fri, 2 Oct 2020 09:28:29 +0200 Subject: [PATCH 1/2] Change the isc__nm_tcpdns_stoplistening() to be asynchronous event The isc__nm_tcpdns_stoplistening() would call isc__nmsocket_clearcb() that would clear the .accept_cb from non-netmgr thread. Change the tcpdns_stoplistening to enqueue ievent that would get processed in the right netmgr thread to avoid locking. (cherry picked from commit d86a74d8a4c1c530baa714bbbad78fba9a4b29ab) --- lib/isc/netmgr/netmgr-int.h | 5 ++++- lib/isc/netmgr/netmgr.c | 3 +++ lib/isc/netmgr/tcp.c | 7 +++---- lib/isc/netmgr/tcpdns.c | 24 +++++++++++++++++++++++- 4 files changed, 33 insertions(+), 6 deletions(-) diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index ceeb5cbb27..4d6ef88193 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -149,6 +149,7 @@ typedef enum isc__netievent_type { netievent_tcpdnssend, netievent_tcpdnsclose, + netievent_tcpdnsstop, netievent_closecb, netievent_shutdown, @@ -217,6 +218,7 @@ typedef isc__netievent__socket_t isc__netievent_startread_t; typedef isc__netievent__socket_t isc__netievent_pauseread_t; typedef isc__netievent__socket_t isc__netievent_closecb_t; typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t; +typedef isc__netievent__socket_t isc__netievent_tcpdnsstop_t; typedef struct isc__netievent__socket_req { isc__netievent_type type; @@ -787,9 +789,10 @@ isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock); void isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0); - void isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0); +void +isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0); #define isc__nm_uverr2result(x) \ isc___nm_uverr2result(x, true, __FILE__, __LINE__) diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index a8f3310b20..c9284cdc6c 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -648,6 +648,9 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_tcpdnsclose: isc__nm_async_tcpdnsclose(worker, ievent); break; + case netievent_tcpdnsstop: + isc__nm_async_tcpdnsstop(worker, ievent); + break; case netievent_closecb: isc__nm_async_closecb(worker, ievent); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 0421e1b57f..2be6c2f1dc 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -536,12 +536,11 @@ error: void isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) { - isc__netievent_tcpstop_t *ievent = NULL; - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(!isc__nm_in_netthread()); + REQUIRE(sock->type == isc_nm_tcplistener); - ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstop); + isc__netievent_tcpstop_t *ievent = + isc__nm_get_ievent(sock->mgr, netievent_tcpstop); isc__nmsocket_attach(sock, &ievent->sock); isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *)ievent); diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 4e20be080f..8060bbfe9a 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -392,18 +392,40 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, } void -isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock) { +isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpdnsstop_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + + UNUSED(worker); + + REQUIRE(isc__nm_in_netthread()); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcpdnslistener); + REQUIRE(sock->tid == isc_nm_tid()); atomic_store(&sock->listening, false); atomic_store(&sock->closed, true); + isc__nmsocket_clearcb(sock); if (sock->outer != NULL) { isc__nm_tcp_stoplistening(sock->outer); isc__nmsocket_detach(&sock->outer); } + + isc__nmsocket_detach(&sock); +} + +void +isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcpdnslistener); + + isc__netievent_tcpdnsstop_t *ievent = + isc__nm_get_ievent(sock->mgr, netievent_tcpdnsstop); + isc__nmsocket_attach(sock, &ievent->sock); + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); } void From 53d6a11a0e8ac21583a94b283e308c4f94680844 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Sur=C3=BD?= Date: Fri, 18 Sep 2020 12:27:40 +0200 Subject: [PATCH 2/2] Clone the csock in accept_connection(), not in callback If we clone the csock (children socket) in TCP accept_connection() instead of passing the ssock (server socket) to the call back and cloning it there we unbreak the assumption that every socket is handled inside it's own worker thread and therefore we can get rid of (at least) callback locking. (cherry picked from commit e8b56acb49f65b5143cd3f0d6f5e863df625a4c7) --- lib/isc/netmgr/netmgr.c | 1 + lib/isc/netmgr/tcp.c | 82 +++++++++++++++++++---------------------- lib/isc/netmgr/tcpdns.c | 14 ++----- lib/isc/netmgr/udp.c | 7 +--- 4 files changed, 42 insertions(+), 62 deletions(-) diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index c9284cdc6c..13c04099f8 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -1040,6 +1040,7 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type, void isc__nmsocket_clearcb(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(!isc__nm_in_netthread() || sock->tid == isc_nm_tid()); sock->recv_cb = NULL; sock->recv_cbarg = NULL; diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 2be6c2f1dc..9c1471217c 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -431,8 +431,7 @@ void isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpchildaccept_t *ievent = (isc__netievent_tcpchildaccept_t *)ev0; - isc_nmsocket_t *ssock = ievent->sock; - isc_nmsocket_t *csock = NULL; + isc_nmsocket_t *sock = ievent->sock; isc_nmhandle_t *handle; isc_result_t result; struct sockaddr_storage ss; @@ -442,20 +441,15 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { void *accept_cbarg; REQUIRE(isc__nm_in_netthread()); - REQUIRE(ssock->type == isc_nm_tcplistener); + REQUIRE(sock->tid == isc_nm_tid()); - csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t)); - isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface); - csock->tid = isc_nm_tid(); - csock->extrahandlesize = ssock->extrahandlesize; - - csock->quota = ievent->quota; + sock->quota = ievent->quota; ievent->quota = NULL; - worker = &ssock->mgr->workers[isc_nm_tid()]; - uv_tcp_init(&worker->loop, &csock->uv_handle.tcp); + worker = &sock->mgr->workers[isc_nm_tid()]; + uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); - r = isc_uv_import(&csock->uv_handle.stream, &ievent->streaminfo); + r = isc_uv_import(&sock->uv_handle.stream, &ievent->streaminfo); if (r != 0) { isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, @@ -465,20 +459,19 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { goto error; } - r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss, + r = uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss, &(int){ sizeof(ss) }); if (r != 0) { result = isc__nm_uverr2result(r); goto error; } - result = isc_sockaddr_fromsockaddr(&csock->peer, - (struct sockaddr *)&ss); + result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss); if (result != ISC_R_SUCCESS) { goto error; } - r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss, + r = uv_tcp_getsockname(&sock->uv_handle.tcp, (struct sockaddr *)&ss, &(int){ sizeof(ss) }); if (r != 0) { result = isc__nm_uverr2result(r); @@ -490,23 +483,19 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { goto error; } - isc__nmsocket_attach(ssock, &csock->server); + handle = isc__nmhandle_get(sock, NULL, &local); - handle = isc__nmhandle_get(csock, NULL, &local); + INSIST(sock->accept_cb != NULL); + accept_cb = sock->accept_cb; + accept_cbarg = sock->accept_cbarg; - LOCK(&ssock->lock); - INSIST(ssock->accept_cb != NULL); - accept_cb = ssock->accept_cb; - accept_cbarg = ssock->accept_cbarg; - UNLOCK(&ssock->lock); - - csock->read_timeout = ssock->mgr->init; + sock->read_timeout = sock->mgr->init; accept_cb(handle, ISC_R_SUCCESS, accept_cbarg); /* - * csock is now attached to the handle. + * sock is now attached to the handle. */ - isc__nmsocket_detach(&csock); + isc__nmsocket_detach(&sock); /* * The accept callback should have attached to the handle. @@ -521,8 +510,8 @@ error: * otherwise it'd be detached later asynchronously, and clog * the quota unnecessarily. */ - if (csock->quota != NULL) { - isc_quota_detach(&csock->quota); + if (sock->quota != NULL) { + isc_quota_detach(&sock->quota); } isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, "Accepting TCP connection failed: %s", @@ -531,7 +520,7 @@ error: /* * Detach the socket properly to make sure uv_close() is called. */ - isc__nmsocket_detach(&csock); + isc__nmsocket_detach(&sock); } void @@ -616,11 +605,9 @@ readtimeout_cb(uv_timer_t *handle) { isc_quota_detach(&sock->quota); } - LOCK(&sock->lock); cb = sock->recv_cb; cbarg = sock->recv_cbarg; isc__nmsocket_clearcb(sock); - UNLOCK(&sock->lock); if (cb != NULL) { cb(sock->statichandle, ISC_R_TIMEDOUT, NULL, cbarg); @@ -637,10 +624,9 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { sock = handle->sock; - LOCK(&sock->lock); + REQUIRE(sock->tid == isc_nm_tid()); sock->recv_cb = cb; sock->recv_cbarg = cbarg; - UNLOCK(&sock->lock); ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread); ievent->sock = sock; @@ -749,12 +735,11 @@ isc__nm_tcp_resumeread(isc_nmsocket_t *sock) { isc__netievent_startread_t *ievent = NULL; REQUIRE(VALID_NMSOCK(sock)); - LOCK(&sock->lock); + REQUIRE(sock->tid == isc_nm_tid()); + if (sock->recv_cb == NULL) { - UNLOCK(&sock->lock); return (ISC_R_CANCELED); } - UNLOCK(&sock->lock); if (!atomic_load(&sock->readpaused)) { return (ISC_R_SUCCESS); @@ -784,12 +769,11 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { void *cbarg; REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(buf != NULL); - LOCK(&sock->lock); cb = sock->recv_cb; cbarg = sock->recv_cbarg; - UNLOCK(&sock->lock); if (nread >= 0) { isc_region_t region = { .base = (unsigned char *)buf->base, @@ -895,7 +879,6 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { int r, w; REQUIRE(VALID_NMSOCK(ssock)); - REQUIRE(ssock->tid == isc_nm_tid()); if (!atomic_load_relaxed(&ssock->active) || atomic_load_relaxed(&ssock->mgr->closing)) @@ -955,7 +938,18 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { /* We have an accepted TCP socket, pass it to a random worker */ w = isc_random_uniform(ssock->mgr->nworkers); event = isc__nm_get_ievent(ssock->mgr, netievent_tcpchildaccept); - event->sock = ssock; + + /* Duplicate the server socket */ + isc_nmsocket_t *csock = isc_mem_get(ssock->mgr->mctx, + sizeof(isc_nmsocket_t)); + isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface); + csock->tid = w; + csock->extrahandlesize = ssock->extrahandlesize; + isc__nmsocket_attach(ssock, &csock->server); + csock->accept_cb = ssock->accept_cb; + csock->accept_cbarg = ssock->accept_cbarg; + + event->sock = csock; event->quota = quota; r = isc_uv_export((uv_stream_t *)uvstream, &event->streaminfo); @@ -1149,16 +1143,15 @@ isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) { void isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_nm_tid()); if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL) { isc_nm_recv_cb_t cb; void *cbarg; - LOCK(&sock->lock); cb = sock->recv_cb; cbarg = sock->recv_cbarg; isc__nmsocket_clearcb(sock); - UNLOCK(&sock->lock); if (cb != NULL) { cb(sock->statichandle, ISC_R_CANCELED, NULL, cbarg); @@ -1176,16 +1169,15 @@ isc__nm_tcp_cancelread(isc_nmhandle_t *handle) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcpsocket); + REQUIRE(sock->tid == isc_nm_tid()); if (atomic_load(&sock->client)) { isc_nm_recv_cb_t cb; void *cbarg; - LOCK(&sock->lock); cb = sock->recv_cb; cbarg = sock->recv_cbarg; isc__nmsocket_clearcb(sock); - UNLOCK(&sock->lock); cb(handle, ISC_R_EOF, NULL, cbarg); } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 8060bbfe9a..283f77665e 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -119,10 +119,8 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { return (result); } - LOCK(&dnslistensock->lock); accept_cb = dnslistensock->accept_cb; accept_cbarg = dnslistensock->accept_cbarg; - UNLOCK(&dnslistensock->lock); if (accept_cb != NULL) { result = accept_cb(handle, ISC_R_SUCCESS, accept_cbarg); @@ -184,6 +182,7 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) { size_t len; REQUIRE(VALID_NMSOCK(dnssock)); + REQUIRE(dnssock->tid == isc_nm_tid()); REQUIRE(handlep != NULL && *handlep == NULL); /* @@ -214,12 +213,9 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) { listener = dnssock->listener; if (listener != NULL) { - LOCK(&listener->lock); cb = listener->recv_cb; cbarg = listener->recv_cbarg; - UNLOCK(&listener->lock); } else if (dnssock->recv_cb != NULL) { - LOCK(&dnssock->lock); cb = dnssock->recv_cb; cbarg = dnssock->recv_cbarg; /* @@ -228,7 +224,6 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) { * call to isc_nm_read() and set up a new callback. */ isc__nmsocket_clearcb(dnssock); - UNLOCK(&dnssock->lock); } if (cb != NULL) { @@ -265,6 +260,7 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, size_t len; REQUIRE(VALID_NMSOCK(dnssock)); + REQUIRE(dnssock->tid == isc_nm_tid()); REQUIRE(VALID_NMHANDLE(handle)); if (region == NULL || eresult != ISC_R_SUCCESS) { @@ -314,10 +310,8 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, uv_timer_stop(&dnssock->timer); } - LOCK(&dnssock->lock); if (atomic_load(&dnssock->sequential) || dnssock->recv_cb == NULL) { - UNLOCK(&dnssock->lock); /* * There are two reasons we might want to pause here: * - We're in sequential mode and we've received @@ -328,7 +322,6 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, isc_nm_pauseread(dnssock->outerhandle); done = true; } else { - UNLOCK(&dnssock->lock); /* * We're pipelining, so we now resume processing * packets until the clients-per-connection limit @@ -364,12 +357,10 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, REQUIRE(VALID_NM(mgr)); isc__nmsocket_init(dnslistensock, mgr, isc_nm_tcpdnslistener, iface); - LOCK(&dnslistensock->lock); dnslistensock->recv_cb = cb; dnslistensock->recv_cbarg = cbarg; dnslistensock->accept_cb = accept_cb; dnslistensock->accept_cbarg = accept_cbarg; - UNLOCK(&dnslistensock->lock); dnslistensock->extrahandlesize = extrahandlesize; /* @@ -558,6 +549,7 @@ isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) { isc_nmsocket_t *sock = ievent->sock; REQUIRE(worker->id == sock->tid); + REQUIRE(sock->tid == isc_nm_tid()); result = ISC_R_NOTCONNECTED; if (atomic_load(&sock->active) && sock->outerhandle != NULL) { diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 7056a29bf2..fd017fde9b 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -377,12 +377,7 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, region.base = (unsigned char *)buf->base; region.length = nrecv; - /* - * In tcp.c and tcpdns.c, this would need to be locked - * by sock->lock because callbacks may be set to NULL - * unexpectedly when the connection drops, but that isn't - * a factor in the UDP case. - */ + INSIST(sock->tid == isc_nm_tid()); INSIST(sock->recv_cb != NULL); cb = sock->recv_cb; cbarg = sock->recv_cbarg;