Merge branch 'ondrej/clean-netmgr-callback-locks-v9_16' into 'v9_16'

Clean netmgr callback locks (v9.16)

See merge request isc-projects/bind9!4252
This commit is contained in:
Ondřej Surý
2020-10-08 06:50:23 +00:00
5 changed files with 75 additions and 68 deletions

View File

@@ -149,6 +149,7 @@ typedef enum isc__netievent_type {
netievent_tcpdnssend,
netievent_tcpdnsclose,
netievent_tcpdnsstop,
netievent_closecb,
netievent_shutdown,
@@ -217,6 +218,7 @@ typedef isc__netievent__socket_t isc__netievent_startread_t;
typedef isc__netievent__socket_t isc__netievent_pauseread_t;
typedef isc__netievent__socket_t isc__netievent_closecb_t;
typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t;
typedef isc__netievent__socket_t isc__netievent_tcpdnsstop_t;
typedef struct isc__netievent__socket_req {
isc__netievent_type type;
@@ -787,9 +789,10 @@ isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock);
void
isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0);
#define isc__nm_uverr2result(x) \
isc___nm_uverr2result(x, true, __FILE__, __LINE__)

View File

@@ -648,6 +648,9 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
case netievent_tcpdnsclose:
isc__nm_async_tcpdnsclose(worker, ievent);
break;
case netievent_tcpdnsstop:
isc__nm_async_tcpdnsstop(worker, ievent);
break;
case netievent_closecb:
isc__nm_async_closecb(worker, ievent);
@@ -1037,6 +1040,7 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type,
void
isc__nmsocket_clearcb(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(!isc__nm_in_netthread() || sock->tid == isc_nm_tid());
sock->recv_cb = NULL;
sock->recv_cbarg = NULL;

View File

@@ -431,8 +431,7 @@ void
isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpchildaccept_t *ievent =
(isc__netievent_tcpchildaccept_t *)ev0;
isc_nmsocket_t *ssock = ievent->sock;
isc_nmsocket_t *csock = NULL;
isc_nmsocket_t *sock = ievent->sock;
isc_nmhandle_t *handle;
isc_result_t result;
struct sockaddr_storage ss;
@@ -442,20 +441,15 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
void *accept_cbarg;
REQUIRE(isc__nm_in_netthread());
REQUIRE(ssock->type == isc_nm_tcplistener);
REQUIRE(sock->tid == isc_nm_tid());
csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t));
isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface);
csock->tid = isc_nm_tid();
csock->extrahandlesize = ssock->extrahandlesize;
csock->quota = ievent->quota;
sock->quota = ievent->quota;
ievent->quota = NULL;
worker = &ssock->mgr->workers[isc_nm_tid()];
uv_tcp_init(&worker->loop, &csock->uv_handle.tcp);
worker = &sock->mgr->workers[isc_nm_tid()];
uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
r = isc_uv_import(&csock->uv_handle.stream, &ievent->streaminfo);
r = isc_uv_import(&sock->uv_handle.stream, &ievent->streaminfo);
if (r != 0) {
isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
@@ -465,20 +459,19 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
goto error;
}
r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
r = uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
&(int){ sizeof(ss) });
if (r != 0) {
result = isc__nm_uverr2result(r);
goto error;
}
result = isc_sockaddr_fromsockaddr(&csock->peer,
(struct sockaddr *)&ss);
result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss);
if (result != ISC_R_SUCCESS) {
goto error;
}
r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
r = uv_tcp_getsockname(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
&(int){ sizeof(ss) });
if (r != 0) {
result = isc__nm_uverr2result(r);
@@ -490,23 +483,19 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
goto error;
}
isc__nmsocket_attach(ssock, &csock->server);
handle = isc__nmhandle_get(sock, NULL, &local);
handle = isc__nmhandle_get(csock, NULL, &local);
INSIST(sock->accept_cb != NULL);
accept_cb = sock->accept_cb;
accept_cbarg = sock->accept_cbarg;
LOCK(&ssock->lock);
INSIST(ssock->accept_cb != NULL);
accept_cb = ssock->accept_cb;
accept_cbarg = ssock->accept_cbarg;
UNLOCK(&ssock->lock);
csock->read_timeout = ssock->mgr->init;
sock->read_timeout = sock->mgr->init;
accept_cb(handle, ISC_R_SUCCESS, accept_cbarg);
/*
* csock is now attached to the handle.
* sock is now attached to the handle.
*/
isc__nmsocket_detach(&csock);
isc__nmsocket_detach(&sock);
/*
* The accept callback should have attached to the handle.
@@ -521,8 +510,8 @@ error:
* otherwise it'd be detached later asynchronously, and clog
* the quota unnecessarily.
*/
if (csock->quota != NULL) {
isc_quota_detach(&csock->quota);
if (sock->quota != NULL) {
isc_quota_detach(&sock->quota);
}
isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR,
ISC_LOG_ERROR, "Accepting TCP connection failed: %s",
@@ -531,17 +520,16 @@ error:
/*
* Detach the socket properly to make sure uv_close() is called.
*/
isc__nmsocket_detach(&csock);
isc__nmsocket_detach(&sock);
}
void
isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) {
isc__netievent_tcpstop_t *ievent = NULL;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(!isc__nm_in_netthread());
REQUIRE(sock->type == isc_nm_tcplistener);
ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstop);
isc__netievent_tcpstop_t *ievent =
isc__nm_get_ievent(sock->mgr, netievent_tcpstop);
isc__nmsocket_attach(sock, &ievent->sock);
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
@@ -617,11 +605,9 @@ readtimeout_cb(uv_timer_t *handle) {
isc_quota_detach(&sock->quota);
}
LOCK(&sock->lock);
cb = sock->recv_cb;
cbarg = sock->recv_cbarg;
isc__nmsocket_clearcb(sock);
UNLOCK(&sock->lock);
if (cb != NULL) {
cb(sock->statichandle, ISC_R_TIMEDOUT, NULL, cbarg);
@@ -638,10 +624,9 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
sock = handle->sock;
LOCK(&sock->lock);
REQUIRE(sock->tid == isc_nm_tid());
sock->recv_cb = cb;
sock->recv_cbarg = cbarg;
UNLOCK(&sock->lock);
ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
ievent->sock = sock;
@@ -750,12 +735,11 @@ isc__nm_tcp_resumeread(isc_nmsocket_t *sock) {
isc__netievent_startread_t *ievent = NULL;
REQUIRE(VALID_NMSOCK(sock));
LOCK(&sock->lock);
REQUIRE(sock->tid == isc_nm_tid());
if (sock->recv_cb == NULL) {
UNLOCK(&sock->lock);
return (ISC_R_CANCELED);
}
UNLOCK(&sock->lock);
if (!atomic_load(&sock->readpaused)) {
return (ISC_R_SUCCESS);
@@ -785,12 +769,11 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
void *cbarg;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid());
REQUIRE(buf != NULL);
LOCK(&sock->lock);
cb = sock->recv_cb;
cbarg = sock->recv_cbarg;
UNLOCK(&sock->lock);
if (nread >= 0) {
isc_region_t region = { .base = (unsigned char *)buf->base,
@@ -896,7 +879,6 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
int r, w;
REQUIRE(VALID_NMSOCK(ssock));
REQUIRE(ssock->tid == isc_nm_tid());
if (!atomic_load_relaxed(&ssock->active) ||
atomic_load_relaxed(&ssock->mgr->closing))
@@ -956,7 +938,18 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
/* We have an accepted TCP socket, pass it to a random worker */
w = isc_random_uniform(ssock->mgr->nworkers);
event = isc__nm_get_ievent(ssock->mgr, netievent_tcpchildaccept);
event->sock = ssock;
/* Duplicate the server socket */
isc_nmsocket_t *csock = isc_mem_get(ssock->mgr->mctx,
sizeof(isc_nmsocket_t));
isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface);
csock->tid = w;
csock->extrahandlesize = ssock->extrahandlesize;
isc__nmsocket_attach(ssock, &csock->server);
csock->accept_cb = ssock->accept_cb;
csock->accept_cbarg = ssock->accept_cbarg;
event->sock = csock;
event->quota = quota;
r = isc_uv_export((uv_stream_t *)uvstream, &event->streaminfo);
@@ -1150,16 +1143,15 @@ isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
void
isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid());
if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL) {
isc_nm_recv_cb_t cb;
void *cbarg;
LOCK(&sock->lock);
cb = sock->recv_cb;
cbarg = sock->recv_cbarg;
isc__nmsocket_clearcb(sock);
UNLOCK(&sock->lock);
if (cb != NULL) {
cb(sock->statichandle, ISC_R_CANCELED, NULL, cbarg);
@@ -1177,16 +1169,15 @@ isc__nm_tcp_cancelread(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->tid == isc_nm_tid());
if (atomic_load(&sock->client)) {
isc_nm_recv_cb_t cb;
void *cbarg;
LOCK(&sock->lock);
cb = sock->recv_cb;
cbarg = sock->recv_cbarg;
isc__nmsocket_clearcb(sock);
UNLOCK(&sock->lock);
cb(handle, ISC_R_EOF, NULL, cbarg);
}

View File

@@ -119,10 +119,8 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
return (result);
}
LOCK(&dnslistensock->lock);
accept_cb = dnslistensock->accept_cb;
accept_cbarg = dnslistensock->accept_cbarg;
UNLOCK(&dnslistensock->lock);
if (accept_cb != NULL) {
result = accept_cb(handle, ISC_R_SUCCESS, accept_cbarg);
@@ -184,6 +182,7 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) {
size_t len;
REQUIRE(VALID_NMSOCK(dnssock));
REQUIRE(dnssock->tid == isc_nm_tid());
REQUIRE(handlep != NULL && *handlep == NULL);
/*
@@ -214,12 +213,9 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) {
listener = dnssock->listener;
if (listener != NULL) {
LOCK(&listener->lock);
cb = listener->recv_cb;
cbarg = listener->recv_cbarg;
UNLOCK(&listener->lock);
} else if (dnssock->recv_cb != NULL) {
LOCK(&dnssock->lock);
cb = dnssock->recv_cb;
cbarg = dnssock->recv_cbarg;
/*
@@ -228,7 +224,6 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) {
* call to isc_nm_read() and set up a new callback.
*/
isc__nmsocket_clearcb(dnssock);
UNLOCK(&dnssock->lock);
}
if (cb != NULL) {
@@ -265,6 +260,7 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
size_t len;
REQUIRE(VALID_NMSOCK(dnssock));
REQUIRE(dnssock->tid == isc_nm_tid());
REQUIRE(VALID_NMHANDLE(handle));
if (region == NULL || eresult != ISC_R_SUCCESS) {
@@ -314,10 +310,8 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
uv_timer_stop(&dnssock->timer);
}
LOCK(&dnssock->lock);
if (atomic_load(&dnssock->sequential) ||
dnssock->recv_cb == NULL) {
UNLOCK(&dnssock->lock);
/*
* There are two reasons we might want to pause here:
* - We're in sequential mode and we've received
@@ -328,7 +322,6 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
isc_nm_pauseread(dnssock->outerhandle);
done = true;
} else {
UNLOCK(&dnssock->lock);
/*
* We're pipelining, so we now resume processing
* packets until the clients-per-connection limit
@@ -364,12 +357,10 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
REQUIRE(VALID_NM(mgr));
isc__nmsocket_init(dnslistensock, mgr, isc_nm_tcpdnslistener, iface);
LOCK(&dnslistensock->lock);
dnslistensock->recv_cb = cb;
dnslistensock->recv_cbarg = cbarg;
dnslistensock->accept_cb = accept_cb;
dnslistensock->accept_cbarg = accept_cbarg;
UNLOCK(&dnslistensock->lock);
dnslistensock->extrahandlesize = extrahandlesize;
/*
@@ -392,18 +383,40 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
}
void
isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock) {
isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpdnsstop_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
UNUSED(worker);
REQUIRE(isc__nm_in_netthread());
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpdnslistener);
REQUIRE(sock->tid == isc_nm_tid());
atomic_store(&sock->listening, false);
atomic_store(&sock->closed, true);
isc__nmsocket_clearcb(sock);
if (sock->outer != NULL) {
isc__nm_tcp_stoplistening(sock->outer);
isc__nmsocket_detach(&sock->outer);
}
isc__nmsocket_detach(&sock);
}
void
isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpdnslistener);
isc__netievent_tcpdnsstop_t *ievent =
isc__nm_get_ievent(sock->mgr, netievent_tcpdnsstop);
isc__nmsocket_attach(sock, &ievent->sock);
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
}
void
@@ -536,6 +549,7 @@ isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) {
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(worker->id == sock->tid);
REQUIRE(sock->tid == isc_nm_tid());
result = ISC_R_NOTCONNECTED;
if (atomic_load(&sock->active) && sock->outerhandle != NULL) {

View File

@@ -377,12 +377,7 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
region.base = (unsigned char *)buf->base;
region.length = nrecv;
/*
* In tcp.c and tcpdns.c, this would need to be locked
* by sock->lock because callbacks may be set to NULL
* unexpectedly when the connection drops, but that isn't
* a factor in the UDP case.
*/
INSIST(sock->tid == isc_nm_tid());
INSIST(sock->recv_cb != NULL);
cb = sock->recv_cb;
cbarg = sock->recv_cbarg;