diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index ceeb5cbb27..4d6ef88193 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -149,6 +149,7 @@ typedef enum isc__netievent_type { netievent_tcpdnssend, netievent_tcpdnsclose, + netievent_tcpdnsstop, netievent_closecb, netievent_shutdown, @@ -217,6 +218,7 @@ typedef isc__netievent__socket_t isc__netievent_startread_t; typedef isc__netievent__socket_t isc__netievent_pauseread_t; typedef isc__netievent__socket_t isc__netievent_closecb_t; typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t; +typedef isc__netievent__socket_t isc__netievent_tcpdnsstop_t; typedef struct isc__netievent__socket_req { isc__netievent_type type; @@ -787,9 +789,10 @@ isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock); void isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0); - void isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0); +void +isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0); #define isc__nm_uverr2result(x) \ isc___nm_uverr2result(x, true, __FILE__, __LINE__) diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index a8f3310b20..13c04099f8 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -648,6 +648,9 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_tcpdnsclose: isc__nm_async_tcpdnsclose(worker, ievent); break; + case netievent_tcpdnsstop: + isc__nm_async_tcpdnsstop(worker, ievent); + break; case netievent_closecb: isc__nm_async_closecb(worker, ievent); @@ -1037,6 +1040,7 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type, void isc__nmsocket_clearcb(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(!isc__nm_in_netthread() || sock->tid == isc_nm_tid()); sock->recv_cb = NULL; sock->recv_cbarg = NULL; diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 0421e1b57f..9c1471217c 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -431,8 +431,7 @@ void isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpchildaccept_t *ievent = (isc__netievent_tcpchildaccept_t *)ev0; - isc_nmsocket_t *ssock = ievent->sock; - isc_nmsocket_t *csock = NULL; + isc_nmsocket_t *sock = ievent->sock; isc_nmhandle_t *handle; isc_result_t result; struct sockaddr_storage ss; @@ -442,20 +441,15 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { void *accept_cbarg; REQUIRE(isc__nm_in_netthread()); - REQUIRE(ssock->type == isc_nm_tcplistener); + REQUIRE(sock->tid == isc_nm_tid()); - csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t)); - isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface); - csock->tid = isc_nm_tid(); - csock->extrahandlesize = ssock->extrahandlesize; - - csock->quota = ievent->quota; + sock->quota = ievent->quota; ievent->quota = NULL; - worker = &ssock->mgr->workers[isc_nm_tid()]; - uv_tcp_init(&worker->loop, &csock->uv_handle.tcp); + worker = &sock->mgr->workers[isc_nm_tid()]; + uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); - r = isc_uv_import(&csock->uv_handle.stream, &ievent->streaminfo); + r = isc_uv_import(&sock->uv_handle.stream, &ievent->streaminfo); if (r != 0) { isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, @@ -465,20 +459,19 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { goto error; } - r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss, + r = uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss, &(int){ sizeof(ss) }); if (r != 0) { result = isc__nm_uverr2result(r); goto error; } - result = isc_sockaddr_fromsockaddr(&csock->peer, - (struct sockaddr *)&ss); + result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss); if (result != ISC_R_SUCCESS) { goto error; } - r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss, + r = uv_tcp_getsockname(&sock->uv_handle.tcp, (struct sockaddr *)&ss, &(int){ sizeof(ss) }); if (r != 0) { result = isc__nm_uverr2result(r); @@ -490,23 +483,19 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { goto error; } - isc__nmsocket_attach(ssock, &csock->server); + handle = isc__nmhandle_get(sock, NULL, &local); - handle = isc__nmhandle_get(csock, NULL, &local); + INSIST(sock->accept_cb != NULL); + accept_cb = sock->accept_cb; + accept_cbarg = sock->accept_cbarg; - LOCK(&ssock->lock); - INSIST(ssock->accept_cb != NULL); - accept_cb = ssock->accept_cb; - accept_cbarg = ssock->accept_cbarg; - UNLOCK(&ssock->lock); - - csock->read_timeout = ssock->mgr->init; + sock->read_timeout = sock->mgr->init; accept_cb(handle, ISC_R_SUCCESS, accept_cbarg); /* - * csock is now attached to the handle. + * sock is now attached to the handle. */ - isc__nmsocket_detach(&csock); + isc__nmsocket_detach(&sock); /* * The accept callback should have attached to the handle. @@ -521,8 +510,8 @@ error: * otherwise it'd be detached later asynchronously, and clog * the quota unnecessarily. */ - if (csock->quota != NULL) { - isc_quota_detach(&csock->quota); + if (sock->quota != NULL) { + isc_quota_detach(&sock->quota); } isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, "Accepting TCP connection failed: %s", @@ -531,17 +520,16 @@ error: /* * Detach the socket properly to make sure uv_close() is called. */ - isc__nmsocket_detach(&csock); + isc__nmsocket_detach(&sock); } void isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) { - isc__netievent_tcpstop_t *ievent = NULL; - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(!isc__nm_in_netthread()); + REQUIRE(sock->type == isc_nm_tcplistener); - ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstop); + isc__netievent_tcpstop_t *ievent = + isc__nm_get_ievent(sock->mgr, netievent_tcpstop); isc__nmsocket_attach(sock, &ievent->sock); isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *)ievent); @@ -617,11 +605,9 @@ readtimeout_cb(uv_timer_t *handle) { isc_quota_detach(&sock->quota); } - LOCK(&sock->lock); cb = sock->recv_cb; cbarg = sock->recv_cbarg; isc__nmsocket_clearcb(sock); - UNLOCK(&sock->lock); if (cb != NULL) { cb(sock->statichandle, ISC_R_TIMEDOUT, NULL, cbarg); @@ -638,10 +624,9 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { sock = handle->sock; - LOCK(&sock->lock); + REQUIRE(sock->tid == isc_nm_tid()); sock->recv_cb = cb; sock->recv_cbarg = cbarg; - UNLOCK(&sock->lock); ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread); ievent->sock = sock; @@ -750,12 +735,11 @@ isc__nm_tcp_resumeread(isc_nmsocket_t *sock) { isc__netievent_startread_t *ievent = NULL; REQUIRE(VALID_NMSOCK(sock)); - LOCK(&sock->lock); + REQUIRE(sock->tid == isc_nm_tid()); + if (sock->recv_cb == NULL) { - UNLOCK(&sock->lock); return (ISC_R_CANCELED); } - UNLOCK(&sock->lock); if (!atomic_load(&sock->readpaused)) { return (ISC_R_SUCCESS); @@ -785,12 +769,11 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { void *cbarg; REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(buf != NULL); - LOCK(&sock->lock); cb = sock->recv_cb; cbarg = sock->recv_cbarg; - UNLOCK(&sock->lock); if (nread >= 0) { isc_region_t region = { .base = (unsigned char *)buf->base, @@ -896,7 +879,6 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { int r, w; REQUIRE(VALID_NMSOCK(ssock)); - REQUIRE(ssock->tid == isc_nm_tid()); if (!atomic_load_relaxed(&ssock->active) || atomic_load_relaxed(&ssock->mgr->closing)) @@ -956,7 +938,18 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { /* We have an accepted TCP socket, pass it to a random worker */ w = isc_random_uniform(ssock->mgr->nworkers); event = isc__nm_get_ievent(ssock->mgr, netievent_tcpchildaccept); - event->sock = ssock; + + /* Duplicate the server socket */ + isc_nmsocket_t *csock = isc_mem_get(ssock->mgr->mctx, + sizeof(isc_nmsocket_t)); + isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface); + csock->tid = w; + csock->extrahandlesize = ssock->extrahandlesize; + isc__nmsocket_attach(ssock, &csock->server); + csock->accept_cb = ssock->accept_cb; + csock->accept_cbarg = ssock->accept_cbarg; + + event->sock = csock; event->quota = quota; r = isc_uv_export((uv_stream_t *)uvstream, &event->streaminfo); @@ -1150,16 +1143,15 @@ isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) { void isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_nm_tid()); if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL) { isc_nm_recv_cb_t cb; void *cbarg; - LOCK(&sock->lock); cb = sock->recv_cb; cbarg = sock->recv_cbarg; isc__nmsocket_clearcb(sock); - UNLOCK(&sock->lock); if (cb != NULL) { cb(sock->statichandle, ISC_R_CANCELED, NULL, cbarg); @@ -1177,16 +1169,15 @@ isc__nm_tcp_cancelread(isc_nmhandle_t *handle) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcpsocket); + REQUIRE(sock->tid == isc_nm_tid()); if (atomic_load(&sock->client)) { isc_nm_recv_cb_t cb; void *cbarg; - LOCK(&sock->lock); cb = sock->recv_cb; cbarg = sock->recv_cbarg; isc__nmsocket_clearcb(sock); - UNLOCK(&sock->lock); cb(handle, ISC_R_EOF, NULL, cbarg); } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 4e20be080f..283f77665e 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -119,10 +119,8 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { return (result); } - LOCK(&dnslistensock->lock); accept_cb = dnslistensock->accept_cb; accept_cbarg = dnslistensock->accept_cbarg; - UNLOCK(&dnslistensock->lock); if (accept_cb != NULL) { result = accept_cb(handle, ISC_R_SUCCESS, accept_cbarg); @@ -184,6 +182,7 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) { size_t len; REQUIRE(VALID_NMSOCK(dnssock)); + REQUIRE(dnssock->tid == isc_nm_tid()); REQUIRE(handlep != NULL && *handlep == NULL); /* @@ -214,12 +213,9 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) { listener = dnssock->listener; if (listener != NULL) { - LOCK(&listener->lock); cb = listener->recv_cb; cbarg = listener->recv_cbarg; - UNLOCK(&listener->lock); } else if (dnssock->recv_cb != NULL) { - LOCK(&dnssock->lock); cb = dnssock->recv_cb; cbarg = dnssock->recv_cbarg; /* @@ -228,7 +224,6 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) { * call to isc_nm_read() and set up a new callback. */ isc__nmsocket_clearcb(dnssock); - UNLOCK(&dnssock->lock); } if (cb != NULL) { @@ -265,6 +260,7 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, size_t len; REQUIRE(VALID_NMSOCK(dnssock)); + REQUIRE(dnssock->tid == isc_nm_tid()); REQUIRE(VALID_NMHANDLE(handle)); if (region == NULL || eresult != ISC_R_SUCCESS) { @@ -314,10 +310,8 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, uv_timer_stop(&dnssock->timer); } - LOCK(&dnssock->lock); if (atomic_load(&dnssock->sequential) || dnssock->recv_cb == NULL) { - UNLOCK(&dnssock->lock); /* * There are two reasons we might want to pause here: * - We're in sequential mode and we've received @@ -328,7 +322,6 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, isc_nm_pauseread(dnssock->outerhandle); done = true; } else { - UNLOCK(&dnssock->lock); /* * We're pipelining, so we now resume processing * packets until the clients-per-connection limit @@ -364,12 +357,10 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, REQUIRE(VALID_NM(mgr)); isc__nmsocket_init(dnslistensock, mgr, isc_nm_tcpdnslistener, iface); - LOCK(&dnslistensock->lock); dnslistensock->recv_cb = cb; dnslistensock->recv_cbarg = cbarg; dnslistensock->accept_cb = accept_cb; dnslistensock->accept_cbarg = accept_cbarg; - UNLOCK(&dnslistensock->lock); dnslistensock->extrahandlesize = extrahandlesize; /* @@ -392,18 +383,40 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, } void -isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock) { +isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpdnsstop_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + + UNUSED(worker); + + REQUIRE(isc__nm_in_netthread()); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcpdnslistener); + REQUIRE(sock->tid == isc_nm_tid()); atomic_store(&sock->listening, false); atomic_store(&sock->closed, true); + isc__nmsocket_clearcb(sock); if (sock->outer != NULL) { isc__nm_tcp_stoplistening(sock->outer); isc__nmsocket_detach(&sock->outer); } + + isc__nmsocket_detach(&sock); +} + +void +isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcpdnslistener); + + isc__netievent_tcpdnsstop_t *ievent = + isc__nm_get_ievent(sock->mgr, netievent_tcpdnsstop); + isc__nmsocket_attach(sock, &ievent->sock); + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); } void @@ -536,6 +549,7 @@ isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) { isc_nmsocket_t *sock = ievent->sock; REQUIRE(worker->id == sock->tid); + REQUIRE(sock->tid == isc_nm_tid()); result = ISC_R_NOTCONNECTED; if (atomic_load(&sock->active) && sock->outerhandle != NULL) { diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 7056a29bf2..fd017fde9b 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -377,12 +377,7 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, region.base = (unsigned char *)buf->base; region.length = nrecv; - /* - * In tcp.c and tcpdns.c, this would need to be locked - * by sock->lock because callbacks may be set to NULL - * unexpectedly when the connection drops, but that isn't - * a factor in the UDP case. - */ + INSIST(sock->tid == isc_nm_tid()); INSIST(sock->recv_cb != NULL); cb = sock->recv_cb; cbarg = sock->recv_cbarg;