Compare commits

...

4 Commits

Author SHA1 Message Date
Witold Kręcicki
a28b46d60c WiP 2020-04-08 22:58:18 +02:00
Witold Kręcicki
2524abe0b7 Allow uvreq to not be bound to socket (e.g. for TCP connection initiation) 2020-04-03 12:41:01 +02:00
Witold Kręcicki
59bb3e9b72 Clean quota callback after calling it 2020-04-03 12:39:11 +02:00
Witold Kręcicki
bb991c356f Redesigned TCP accepting: one listen/accept loop, passing the connected socket.
Instead of using bind() and passing the listening socket to the children
threads using uv_export/uv_import use one thread that does the accepting,
and then passes the connected socket using uv_export/uv_import to a random
worker. The previous solution had thundering herd problems (all workers
waking up on one connection and trying to accept()), this one avoids this
and is simpler.
The tcp clients quota is simplified with isc_quota_attach_cb - a callback
is issued when the quota is available.
2020-04-03 12:39:11 +02:00
6 changed files with 341 additions and 347 deletions

View File

@@ -226,6 +226,10 @@ isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
* in 'cb'.
*/
isc_result_t
isc_nm_connecttcp(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *remote, isc_nm_cb_t cb);
isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb,
void *cbarg, size_t extrahandlesize, int backlog,

View File

@@ -22,6 +22,7 @@
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/queue.h>
#include <isc/quota.h>
#include <isc/random.h>
#include <isc/refcount.h>
#include <isc/region.h>
@@ -131,8 +132,8 @@ typedef enum isc__netievent_type {
netievent_tcprecv,
netievent_tcpstartread,
netievent_tcppauseread,
netievent_tcpchildlisten,
netievent_tcpchildstop,
netievent_tcpchildaccept,
netievent_tcpaccept,
netievent_tcpstop,
netievent_tcpclose,
netievent_tcpdnsclose,
@@ -180,6 +181,7 @@ typedef union {
typedef struct isc__nm_uvreq {
int magic;
isc_nm_t *mgr;
isc_nmsocket_t *sock;
isc_nmhandle_t *handle;
uv_buf_t uvbuf; /* translated isc_region_t, to be
@@ -188,8 +190,7 @@ typedef struct isc__nm_uvreq {
isc_sockaddr_t peer; /* peer address */
isc__nm_cb_t cb; /* callback */
void *cbarg; /* callback argument */
uv_pipe_t ipc; /* used for sending socket
* uv_handles to other threads */
union {
uv_req_t req;
uv_getaddrinfo_t getaddrinfo;
@@ -211,7 +212,6 @@ typedef struct isc__netievent__socket {
typedef isc__netievent__socket_t isc__netievent_udplisten_t;
typedef isc__netievent__socket_t isc__netievent_udpstop_t;
typedef isc__netievent__socket_t isc__netievent_tcpstop_t;
typedef isc__netievent__socket_t isc__netievent_tcpchildstop_t;
typedef isc__netievent__socket_t isc__netievent_tcpclose_t;
typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t;
typedef isc__netievent__socket_t isc__netievent_startread_t;
@@ -228,13 +228,15 @@ typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t;
typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t;
typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t;
typedef struct isc__netievent__socket_streaminfo {
typedef struct isc__netievent__socket_streaminfo_quota {
isc__netievent_type type;
isc_nmsocket_t *sock;
isc_uv_stream_info_t streaminfo;
} isc__netievent__socket_streaminfo_t;
isc_quota_t *quota;
} isc__netievent__socket_streaminfo_quota_t;
typedef isc__netievent__socket_streaminfo_t isc__netievent_tcpchildlisten_t;
typedef isc__netievent__socket_streaminfo_quota_t
isc__netievent_tcpchildaccept_t;
typedef struct isc__netievent__socket_handle {
isc__netievent_type type;
@@ -242,6 +244,14 @@ typedef struct isc__netievent__socket_handle {
isc_nmhandle_t *handle;
} isc__netievent__socket_handle_t;
typedef struct isc__netievent__socket_quota {
isc__netievent_type type;
isc_nmsocket_t *sock;
isc_quota_t *quota;
} isc__netievent__socket_quota_t;
typedef isc__netievent__socket_quota_t isc__netievent_tcpaccept_t;
typedef struct isc__netievent_udpsend {
isc__netievent_type type;
isc_nmsocket_t *sock;
@@ -261,7 +271,8 @@ typedef union {
isc__netievent__socket_t nis;
isc__netievent__socket_req_t nisr;
isc__netievent_udpsend_t nius;
isc__netievent__socket_streaminfo_t niss;
isc__netievent__socket_quota_t nisq;
isc__netievent__socket_streaminfo_quota_t nissq;
} isc__netievent_storage_t;
/*
@@ -324,7 +335,6 @@ typedef enum isc_nmsocket_type {
isc_nm_udplistener, /* Aggregate of nm_udpsocks */
isc_nm_tcpsocket,
isc_nm_tcplistener,
isc_nm_tcpchildlistener,
isc_nm_tcpdnslistener,
isc_nm_tcpdnssocket
} isc_nmsocket_type;
@@ -370,16 +380,7 @@ struct isc_nmsocket {
*/
isc_quota_t *quota;
isc_quota_t *pquota;
/*%
* How many connections we have not accepted due to quota?
* When we close a connection we need to accept a new one.
*/
int overquota;
/*%
* How many active connections we have?
*/
int conns;
isc_quota_cb_t quotacb;
/*%
* Socket statistics
@@ -598,11 +599,11 @@ isc__nm_uvreq_get(isc_nm_t *mgr, isc_nmsocket_t *sock);
*/
void
isc__nm_uvreq_put(isc__nm_uvreq_t **req, isc_nmsocket_t *sock);
isc__nm_uvreq_put(isc__nm_uvreq_t **req);
/*%<
* Completes the use of a UV request structure, setting '*req' to NULL.
*
* The UV request is pushed onto the 'sock->inactivereqs' stack or,
* The UV request is pushed onto the 'req->sock->inactivereqs' stack or,
* if that doesn't work, freed.
*/
@@ -704,12 +705,12 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ev0);
isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpchildstop(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ev0);

View File

@@ -603,8 +603,11 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
case netievent_tcplisten:
isc__nm_async_tcplisten(worker, ievent);
break;
case netievent_tcpchildlisten:
isc__nm_async_tcpchildlisten(worker, ievent);
case netievent_tcpchildaccept:
isc__nm_async_tcpchildaccept(worker, ievent);
break;
case netievent_tcpaccept:
isc__nm_async_tcpaccept(worker, ievent);
break;
case netievent_tcpstartread:
isc__nm_async_tcp_startread(worker, ievent);
@@ -618,9 +621,6 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
case netievent_tcpstop:
isc__nm_async_tcpstop(worker, ievent);
break;
case netievent_tcpchildstop:
isc__nm_async_tcpchildstop(worker, ievent);
break;
case netievent_tcpclose:
isc__nm_async_tcpclose(worker, ievent);
break;
@@ -927,6 +927,7 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type,
sock->ah_size * sizeof(size_t));
sock->ah_handles = isc_mem_allocate(
mgr->mctx, sock->ah_size * sizeof(isc_nmhandle_t *));
ISC_LINK_INIT(&sock->quotacb, link);
for (size_t i = 0; i < 32; i++) {
sock->ah_frees[i] = i;
sock->ah_handles[i] = NULL;
@@ -944,7 +945,6 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type,
break;
case isc_nm_tcpsocket:
case isc_nm_tcplistener:
case isc_nm_tcpchildlistener:
if (family == AF_INET) {
sock->statsindex = tcp4statsindex;
} else {
@@ -1246,7 +1246,7 @@ isc__nm_uvreq_get(isc_nm_t *mgr, isc_nmsocket_t *sock) {
isc__nm_uvreq_t *req = NULL;
REQUIRE(VALID_NM(mgr));
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock == NULL || VALID_NMSOCK(sock));
if (sock != NULL && atomic_load(&sock->active)) {
/* Try to reuse one */
@@ -1259,16 +1259,22 @@ isc__nm_uvreq_get(isc_nm_t *mgr, isc_nmsocket_t *sock) {
*req = (isc__nm_uvreq_t){ .magic = 0 };
req->uv_req.req.data = req;
isc_nmsocket_attach(sock, &req->sock);
isc_nm_attach(mgr, &req->mgr);
if (sock != NULL) {
isc_nmsocket_attach(sock, &req->sock);
}
req->magic = UVREQ_MAGIC;
return (req);
}
void
isc__nm_uvreq_put(isc__nm_uvreq_t **req0, isc_nmsocket_t *sock) {
isc__nm_uvreq_put(isc__nm_uvreq_t **req0) {
isc__nm_uvreq_t *req = NULL;
isc_nmhandle_t *handle = NULL;
isc_nmsocket_t *sock;
isc_nm_t *mgr;
bool reuse = false;
REQUIRE(req0 != NULL);
REQUIRE(VALID_UVREQ(*req0));
@@ -1276,8 +1282,6 @@ isc__nm_uvreq_put(isc__nm_uvreq_t **req0, isc_nmsocket_t *sock) {
req = *req0;
*req0 = NULL;
INSIST(sock == req->sock);
req->magic = 0;
/*
@@ -1285,18 +1289,25 @@ isc__nm_uvreq_put(isc__nm_uvreq_t **req0, isc_nmsocket_t *sock) {
* sock, and the netmgr won't all disappear.
*/
handle = req->handle;
req->handle = NULL;
if (!atomic_load(&sock->active) ||
!isc_astack_trypush(sock->inactivereqs, req)) {
isc_mempool_put(sock->mgr->reqpool, req);
sock = req->sock;
mgr = req->mgr;
*req = (isc__nm_uvreq_t){ .magic = 0 };
if (sock != NULL && atomic_load(&sock->active)) {
reuse = isc_astack_trypush(sock->inactivereqs, req);
}
if (!reuse) {
isc_mempool_put(mgr->reqpool, req);
}
if (handle != NULL) {
isc_nmhandle_unref(handle);
}
isc_nmsocket_detach(&sock);
if (sock != NULL) {
isc_nmsocket_detach(&sock);
}
isc_nm_detach(&mgr);
}
isc_result_t

View File

@@ -49,7 +49,7 @@ can_log_tcp_quota() {
}
static int
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);
tcp_connect_direct(isc__nm_uvreq_t *req);
static void
tcp_close_direct(isc_nmsocket_t *sock);
@@ -68,21 +68,27 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);
static void
tcp_close_cb(uv_handle_t *uvhandle);
static void
stoplistening(isc_nmsocket_t *sock);
static void
tcp_listenclose_cb(uv_handle_t *handle);
static isc_result_t
accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota);
static int
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker = NULL;
tcp_connect_direct(isc__nm_uvreq_t *req) {
isc_nmsocket_t *sock;
isc__networker_t *worker;
int r;
REQUIRE(isc__nm_in_netthread());
worker = &req->mgr->workers[isc_nm_tid()];
worker = &sock->mgr->workers[isc_nm_tid()];
sock = isc_mem_get(req->mgr->mctx, sizeof(isc_nmsocket_t));
isc__nmsocket_init(sock, req->mgr, isc_nm_tcpsocket, NULL);
sock->tid = isc_nm_tid();
sock->extrahandlesize = 0;
r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
if (r != 0) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
return (r);
@@ -97,9 +103,13 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
return (r);
}
}
uv_handle_set_data(&sock->uv_handle.handle, sock);
r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp,
&req->peer.type.sa, tcp_connect_cb);
if (r != 0) {
tcp_close_direct(sock);
}
return (r);
}
@@ -107,14 +117,11 @@ void
isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpconnect_t *ievent =
(isc__netievent_tcpconnect_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
isc__nm_uvreq_t *req = ievent->req;
UNUSED(worker);
int r;
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(worker->id == ievent->req->sock->mgr->workers[isc_nm_tid()].id);
r = tcp_connect_direct(sock, req);
r = tcp_connect_direct(req);
if (r != 0) {
/* We need to issue callbacks ourselves */
tcp_connect_cb(&req->uv_req.connect, r);
@@ -153,9 +160,39 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg);
}
isc__nm_uvreq_put(&req, sock);
isc__nm_uvreq_put(&req);
}
isc_result_t
isc_nm_connecttcp(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *remote, isc_nm_cb_t cb) {
isc__nm_uvreq_t *req;
isc__netievent_tcpconnect_t *ievent;
REQUIRE(VALID_NM(mgr));
req = isc__nm_uvreq_get(mgr, NULL);
if (local != NULL) {
req->local = *local;
}
req->peer = *remote;
req->cb.connect = cb;
ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect);
ievent->req = req;
/*
* XXXWPK maybe we should always use random() and call connect
* directly if random value == isc_nm_tid()?
*/
if (isc__nm_in_netthread()) {
isc__nm_async_tcpconnect(&mgr->workers[isc_nm_tid()],
(isc__netievent_t *)ievent);
isc__nm_put_ievent(mgr, ievent);
} else {
int w = isc_random_uniform(mgr->nworkers);
isc__nm_enqueue_ievent(&mgr->workers[w],
(isc__netievent_t *)ievent);
}
return (ISC_R_SUCCESS);
}
isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb,
void *cbarg, size_t extrahandlesize, int backlog,
@@ -167,11 +204,6 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb,
nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener, iface);
nsock->nchildren = mgr->nworkers;
atomic_init(&nsock->rchildren, mgr->nworkers);
nsock->children = isc_mem_get(mgr->mctx,
mgr->nworkers * sizeof(*nsock));
memset(nsock->children, 0, mgr->nworkers * sizeof(*nsock));
nsock->rcb.accept = cb;
nsock->rcbarg = cbarg;
nsock->extrahandlesize = extrahandlesize;
@@ -216,15 +248,10 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb,
}
/*
* For multi-threaded TCP listening, we create a single "parent" socket,
* bind to it, and then pass its uv_handle to a set of child sockets, one
* per worker. For thread safety, the passing of the socket's uv_handle has
* to be done via IPC socket.
*
* This design pattern is ugly but it's what's recommended by the libuv
* documentation. (A prior version of libuv had uv_export() and
* uv_import() functions which would have simplified this greatly, but
* they have been deprecated and removed.)
* For multi-threaded TCP listening, we create a single socket,
* bind to it, and start listening. On an incoming connection we accept
* it and then pass the accepted socket using uv_export/uv_import mechanism
* to child threads.
*/
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
@@ -236,24 +263,6 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
REQUIRE(isc__nm_in_netthread());
REQUIRE(sock->type == isc_nm_tcplistener);
/* Initialize children now to make cleaning up easier */
for (int i = 0; i < sock->nchildren; i++) {
isc_nmsocket_t *csock = &sock->children[i];
isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener,
sock->iface);
csock->parent = sock;
csock->tid = i;
csock->pquota = sock->pquota;
csock->backlog = sock->backlog;
csock->extrahandlesize = sock->extrahandlesize;
INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL);
csock->rcb.accept = sock->rcb.accept;
csock->rcbarg = sock->rcbarg;
csock->fd = -1;
}
r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
if (r != 0) {
/* It was never opened */
@@ -295,39 +304,26 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
goto done;
}
uv_handle_set_data(&sock->uv_handle.handle, sock);
/*
* For each worker, we send a 'tcpchildlisten' event with
* the exported socket.
* We're in the network thread - tcp_connection_cb won't be called
* until we leave, there's no race between this and the callback.
*/
for (int i = 0; i < sock->nchildren; i++) {
isc_nmsocket_t *csock = &sock->children[i];
isc__netievent_tcpchildlisten_t *event = NULL;
r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog,
tcp_connection_cb);
event = isc__nm_get_ievent(csock->mgr,
netievent_tcpchildlisten);
r = isc_uv_export(&sock->uv_handle.stream, &event->streaminfo);
if (r != 0) {
isc_log_write(
isc_lctx, ISC_LOGCATEGORY_GENERAL,
ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
"uv_export failed: %s",
isc_result_totext(isc__nm_uverr2result(r)));
isc__nm_put_ievent(sock->mgr, event);
continue;
}
event->sock = csock;
if (csock->tid == isc_nm_tid()) {
isc__nm_async_tcpchildlisten(&sock->mgr->workers[i],
(isc__netievent_t *)event);
isc__nm_put_ievent(sock->mgr, event);
} else {
isc__nm_enqueue_ievent(&sock->mgr->workers[i],
(isc__netievent_t *)event);
}
if (r != 0) {
isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
"uv_listen failed: %s",
isc_result_totext(isc__nm_uverr2result(r)));
uv_close(&sock->uv_handle.handle, tcp_close_cb);
sock->result = isc__nm_uverr2result(r);
atomic_store(&sock->listen_error, true);
goto done;
}
uv_handle_set_data(&sock->uv_handle.handle, sock);
atomic_store(&sock->listening, true);
done:
@@ -337,43 +333,108 @@ done:
return;
}
/*
* Connect to the parent socket and be ready to receive the uv_handle
* for the socket we'll be listening on.
*/
static void
tcp_connection_cb(uv_stream_t *server, int status) {
isc_nmsocket_t *psock = uv_handle_get_data((uv_handle_t *)server);
isc_result_t result;
UNUSED(status);
result = accept_connection(psock, NULL);
if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) {
if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) ||
can_log_tcp_quota()) {
isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
"TCP connection failed: %s",
isc_result_totext(result));
}
}
}
void
isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpchildlisten_t *ievent =
(isc__netievent_tcpchildlisten_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(worker);
isc__netievent_tcpchildaccept_t *ievent =
(isc__netievent_tcpchildaccept_t *)ev0;
isc_nmsocket_t *ssock = ievent->sock;
isc_nmsocket_t *csock;
isc_nmhandle_t *handle;
isc_result_t result;
struct sockaddr_storage ss;
isc_sockaddr_t local;
int r;
REQUIRE(isc__nm_in_netthread());
REQUIRE(sock->type == isc_nm_tcpchildlistener);
REQUIRE(ssock->type == isc_nm_tcplistener);
worker = &sock->mgr->workers[isc_nm_tid()];
csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t));
isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface);
csock->tid = isc_nm_tid();
csock->extrahandlesize = ssock->extrahandlesize;
csock->quota = ievent->quota;
ievent->quota = NULL;
uv_tcp_init(&worker->loop, (uv_tcp_t *)&sock->uv_handle.tcp);
uv_handle_set_data(&sock->uv_handle.handle, sock);
r = isc_uv_import(&sock->uv_handle.stream, &ievent->streaminfo);
worker = &ssock->mgr->workers[isc_nm_tid()];
uv_tcp_init(&worker->loop, &csock->uv_handle.tcp);
r = isc_uv_import(&csock->uv_handle.stream, &ievent->streaminfo);
if (r != 0) {
isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
"uv_import failed: %s",
isc_result_totext(isc__nm_uverr2result(r)));
return;
result = isc__nm_uverr2result(r);
goto error;
}
r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog,
tcp_connection_cb);
r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
&(int){ sizeof(ss) });
if (r != 0) {
isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
"uv_listen failed: %s",
isc_result_totext(isc__nm_uverr2result(r)));
return;
result = isc__nm_uverr2result(r);
goto error;
}
result = isc_sockaddr_fromsockaddr(&csock->peer,
(struct sockaddr *)&ss);
if (result != ISC_R_SUCCESS) {
goto error;
}
r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
&(int){ sizeof(ss) });
if (r != 0) {
result = isc__nm_uverr2result(r);
goto error;
}
result = isc_sockaddr_fromsockaddr(&local, (struct sockaddr *)&ss);
if (result != ISC_R_SUCCESS) {
goto error;
}
isc_nmsocket_attach(ssock, &csock->server);
handle = isc__nmhandle_get(csock, NULL, &local);
INSIST(ssock->rcb.accept != NULL);
csock->read_timeout = ssock->mgr->init;
ssock->rcb.accept(handle, ISC_R_SUCCESS, ssock->rcbarg);
isc_nmsocket_detach(&csock);
return;
error:
/*
* Detach it early to make room for other connections, otherwise
* it'd be detached later asynchronously clogging the quota.
*/
if (csock->quota != NULL) {
isc_quota_detach(&csock->quota);
}
isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR,
ISC_LOG_ERROR, "Accepting TCP connection failed: %s",
isc_result_totext(result));
/* We need to detach it properly to make sure uv_close is called. */
isc_nmsocket_detach(&csock);
}
void
@@ -411,84 +472,23 @@ isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)event);
} else {
stoplistening(sock);
uv_close((uv_handle_t *)&sock->uv_handle.tcp,
tcp_listenclose_cb);
isc__nm_drop_interlocked(sock->mgr);
}
}
static void
stoplistening(isc_nmsocket_t *sock) {
for (int i = 0; i < sock->nchildren; i++) {
isc__netievent_tcpchildstop_t *event = NULL;
/*
* We can ignore the overhead of event allocation because
* stoplistening is a rare event, and doing it this way
* simplifies sock reference counting.
*/
event = isc__nm_get_ievent(sock->mgr, netievent_tcpchildstop);
isc_nmsocket_attach(&sock->children[i], &event->sock);
if (isc_nm_tid() == sock->children[i].tid) {
isc__nm_async_tcpchildstop(&sock->mgr->workers[i],
(isc__netievent_t *)event);
isc__nm_put_ievent(sock->mgr, event);
} else {
isc__nm_enqueue_ievent(&sock->mgr->workers[i],
(isc__netievent_t *)event);
}
}
LOCK(&sock->lock);
while (atomic_load_relaxed(&sock->rchildren) > 0) {
WAIT(&sock->cond, &sock->lock);
}
UNLOCK(&sock->lock);
uv_close((uv_handle_t *)&sock->uv_handle.tcp, tcp_listenclose_cb);
}
void
isc__nm_async_tcpchildstop(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpchildstop_t *ievent =
(isc__netievent_tcpchildstop_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
UNUSED(worker);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(isc_nm_tid() == sock->tid);
REQUIRE(sock->type == isc_nm_tcpchildlistener);
REQUIRE(sock->parent != NULL);
/*
* rchildren is atomic, but we still need to change it
* under a lock because the parent is waiting on conditional
* and without it we might deadlock.
*/
LOCK(&sock->parent->lock);
atomic_fetch_sub(&sock->parent->rchildren, 1);
UNLOCK(&sock->parent->lock);
uv_close((uv_handle_t *)&sock->uv_handle.tcp, tcp_listenclose_cb);
BROADCAST(&sock->parent->cond);
}
/*
* This callback is used for closing both child and parent listening
* sockets; that's why we need to choose the proper lock.
* This callback is used for closing listening sockets
*/
static void
tcp_listenclose_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
isc_mutex_t *lock = ((sock->parent != NULL) ? &sock->parent->lock
: &sock->lock);
LOCK(lock);
LOCK(&sock->lock);
atomic_store(&sock->closed, true);
atomic_store(&sock->listening, false);
sock->pquota = NULL;
UNLOCK(lock);
UNLOCK(&sock->lock);
isc_nmsocket_detach(&sock);
}
@@ -683,130 +683,29 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
*/
}
static isc_result_t
accept_connection(isc_nmsocket_t *ssock) {
isc_result_t result;
isc_quota_t *quota = NULL;
isc_nmsocket_t *csock = NULL;
isc__networker_t *worker = NULL;
isc_nmhandle_t *handle = NULL;
struct sockaddr_storage ss;
isc_sockaddr_t local;
int r;
bool overquota = false;
REQUIRE(VALID_NMSOCK(ssock));
REQUIRE(ssock->tid == isc_nm_tid());
if (!atomic_load_relaxed(&ssock->active) ||
atomic_load_relaxed(&ssock->mgr->closing))
{
/* We're closing, bail */
return (ISC_R_CANCELED);
}
if (ssock->pquota != NULL) {
result = isc_quota_attach(ssock->pquota, &quota);
/*
* We share the quota between all TCP sockets. Others
* may have used up all the quota slots, in which case
* this socket could starve. So we only fail here if we
* already had at least one active connection on this
* socket. This guarantees that we'll maintain some level
* of service while over quota, and will resume normal
* service when the quota comes back down.
*/
if (result != ISC_R_SUCCESS) {
ssock->overquota++;
overquota = true;
if (ssock->conns > 0) {
isc__nm_incstats(
ssock->mgr,
ssock->statsindex[STATID_ACCEPTFAIL]);
return (result);
}
}
}
isc__nm_incstats(ssock->mgr, ssock->statsindex[STATID_ACCEPT]);
csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t));
isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface);
csock->tid = isc_nm_tid();
csock->extrahandlesize = ssock->extrahandlesize;
csock->quota = quota;
quota = NULL;
worker = &ssock->mgr->workers[isc_nm_tid()];
uv_tcp_init(&worker->loop, &csock->uv_handle.tcp);
r = uv_accept(&ssock->uv_handle.stream, &csock->uv_handle.stream);
if (r != 0) {
result = isc__nm_uverr2result(r);
goto error;
}
r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
&(int){ sizeof(ss) });
if (r != 0) {
result = isc__nm_uverr2result(r);
goto error;
}
result = isc_sockaddr_fromsockaddr(&csock->peer,
(struct sockaddr *)&ss);
if (result != ISC_R_SUCCESS) {
goto error;
}
r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
&(int){ sizeof(ss) });
if (r != 0) {
result = isc__nm_uverr2result(r);
goto error;
}
result = isc_sockaddr_fromsockaddr(&local, (struct sockaddr *)&ss);
if (result != ISC_R_SUCCESS) {
goto error;
}
isc_nmsocket_attach(ssock, &csock->server);
ssock->conns++;
handle = isc__nmhandle_get(csock, NULL, &local);
INSIST(ssock->rcb.accept != NULL);
csock->read_timeout = ssock->mgr->init;
ssock->rcb.accept(handle, ISC_R_SUCCESS, ssock->rcbarg);
isc_nmsocket_detach(&csock);
return (ISC_R_SUCCESS);
error:
static void
quota_accept_cb(isc_quota_t *quota, void *sock_) {
isc_nmsocket_t *sock = (isc_nmsocket_t *)sock_;
INSIST(VALID_NMSOCK(sock));
/*
* Detach it early to make room for other connections, otherwise
* it'd be detached later asynchronously clogging the quota.
* We need to create an event and pass it using async channel
*/
if (csock->quota != NULL) {
isc_quota_detach(&csock->quota);
}
if (overquota) {
ssock->overquota--;
}
/* We need to detach it properly to make sure uv_close is called. */
isc_nmsocket_detach(&csock);
return (result);
isc__netievent_tcpaccept_t *ievent =
isc__nm_get_ievent(sock->mgr, netievent_tcpaccept);
ievent->sock = sock;
ievent->quota = quota;
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
}
static void
tcp_connection_cb(uv_stream_t *server, int status) {
isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t *)server);
/* This is called when we get a quota callback. */
void
isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
isc_result_t result;
isc__netievent_tcpaccept_t *ievent = (isc__netievent_tcpaccept_t *)ev0;
UNUSED(status);
result = accept_connection(ssock);
REQUIRE(worker->id == ievent->sock->tid);
result = accept_connection(ievent->sock, ievent->quota);
if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) {
if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) ||
can_log_tcp_quota()) {
@@ -816,6 +715,104 @@ tcp_connection_cb(uv_stream_t *server, int status) {
isc_result_totext(result));
}
}
/* It was attached when we called isc_quota_attach_cb */
isc_nmsocket_detach(&ievent->sock);
}
/*
* Used to free uv_tcp_t that's used only to accept() and then export the
* socket to a child thread.
*/
static void
free_uvtcpt(uv_handle_t *uvs) {
isc_mem_t *mctx = (isc_mem_t *)uv_handle_get_data(uvs);
isc_mem_putanddetach(&mctx, uvs, sizeof(uv_tcp_t));
}
static isc_result_t
accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
isc_result_t result;
int r;
REQUIRE(VALID_NMSOCK(ssock));
REQUIRE(ssock->tid == isc_nm_tid());
if (!atomic_load_relaxed(&ssock->active) ||
atomic_load_relaxed(&ssock->mgr->closing))
{
/* We're closing, bail */
if (quota != NULL) {
isc_quota_detach(&quota);
}
return (ISC_R_CANCELED);
}
/* We can be called directly or as a callback from quota */
if (ssock->pquota != NULL && quota == NULL) {
/*
* We need to increase the reference on ssock, as might be
* queued in quota. The proper way of doing it would be
* to attach it here - before every call to quota_attach,
* and detach it afterwards but that'd be very suboptimal -
* instead, we just set it here, and attach it only if it'll
* really be used.
* XXXWPK do it 'the right way' for now, see what happens then.
*/
isc_nmsocket_t *tsock = NULL;
isc_nmsocket_attach(ssock, &tsock);
isc_quota_cb_init(&ssock->quotacb, quota_accept_cb, tsock);
result = isc_quota_attach_cb(ssock->pquota, &quota,
&ssock->quotacb);
if (result == ISC_R_QUOTA) {
isc__nm_incstats(ssock->mgr,
ssock->statsindex[STATID_ACCEPTFAIL]);
return (result);
}
/* Deinitialize & detach, just to be safe */
isc_quota_cb_init(&ssock->quotacb, NULL, NULL);
isc_nmsocket_detach(&tsock);
}
isc__nm_incstats(ssock->mgr, ssock->statsindex[STATID_ACCEPT]);
isc__networker_t *worker = &ssock->mgr->workers[isc_nm_tid()];
uv_tcp_t *uvstream = isc_mem_get(ssock->mgr->mctx, sizeof(uv_tcp_t));
isc_mem_t *mctx = NULL;
isc_mem_attach(ssock->mgr->mctx, &mctx);
uv_handle_set_data((uv_handle_t *)uvstream, mctx); /* pass it to free it
*/
mctx = NULL;
uv_tcp_init(&worker->loop, uvstream);
r = uv_accept(&ssock->uv_handle.stream, (uv_stream_t *)uvstream);
if (r != 0) {
result = isc__nm_uverr2result(r);
uv_close((uv_handle_t *)uvstream, free_uvtcpt);
isc_quota_detach(&quota);
return (result);
}
/* We have an accepted TCP socket, pass it to a random worker */
int w = isc_random_uniform(ssock->mgr->nworkers);
isc__netievent_tcpchildaccept_t *event =
isc__nm_get_ievent(ssock->mgr, netievent_tcpchildaccept);
event->sock = ssock;
event->quota = quota;
r = isc_uv_export((uv_stream_t *)uvstream, &event->streaminfo);
INSIST(r == 0);
uv_close((uv_handle_t *)uvstream, free_uvtcpt);
if (w == isc_nm_tid()) {
isc__nm_async_tcpchildaccept(&ssock->mgr->workers[w],
(isc__netievent_t *)event);
isc__nm_put_ievent(ssock->mgr, event);
} else {
isc__nm_enqueue_ievent(&ssock->mgr->workers[w],
(isc__netievent_t *)event);
}
return (ISC_R_SUCCESS);
}
isc_result_t
@@ -872,7 +869,7 @@ tcp_send_cb(uv_write_t *req, int status) {
uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
isc_nmhandle_unref(uvreq->handle);
isc__nm_uvreq_put(&uvreq, uvreq->handle->sock);
isc__nm_uvreq_put(&uvreq);
}
/*
@@ -893,7 +890,7 @@ isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
if (result != ISC_R_SUCCESS) {
ievent->req->cb.send(ievent->req->handle, result,
ievent->req->cbarg);
isc__nm_uvreq_put(&ievent->req, ievent->req->handle->sock);
isc__nm_uvreq_put(&ievent->req);
}
}
@@ -910,7 +907,7 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
if (r < 0) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
req->cb.send(NULL, isc__nm_uverr2result(r), req->cbarg);
isc__nm_uvreq_put(&req, sock);
isc__nm_uvreq_put(&req);
return (isc__nm_uverr2result(r));
}
@@ -943,30 +940,9 @@ tcp_close_direct(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid());
REQUIRE(sock->type == isc_nm_tcpsocket);
isc_nmsocket_t *ssock = sock->server;
if (sock->quota != NULL) {
isc_quota_detach(&sock->quota);
}
if (ssock != NULL) {
ssock->conns--;
while (ssock->conns == 0 && ssock->overquota > 0) {
ssock->overquota--;
isc_result_t result = accept_connection(ssock);
if (result == ISC_R_SUCCESS || result == ISC_R_NOCONN) {
continue;
}
if ((result != ISC_R_QUOTA &&
result != ISC_R_SOFTQUOTA) ||
can_log_tcp_quota()) {
isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
ISC_LOGMODULE_NETMGR,
ISC_LOG_ERROR,
"TCP connection failed: %s",
isc_result_totext(result));
}
}
}
if (sock->timer_initialized) {
sock->timer_initialized = false;
uv_timer_stop(&sock->timer);

View File

@@ -439,7 +439,7 @@ isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
} else {
ievent->req->cb.send(ievent->req->handle, ISC_R_CANCELED,
ievent->req->cbarg);
isc__nm_uvreq_put(&ievent->req, ievent->req->sock);
isc__nm_uvreq_put(&ievent->req);
}
}
@@ -462,7 +462,7 @@ udp_send_cb(uv_udp_send_t *req, int status) {
uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
isc_nmhandle_unref(uvreq->handle);
isc__nm_uvreq_put(&uvreq, uvreq->sock);
isc__nm_uvreq_put(&uvreq);
}
/*

View File

@@ -119,6 +119,8 @@ quota_release(isc_quota_t *quota) {
UNLOCK(&quota->cblock);
if (cb != NULL) {
cb->cb_func(quota, cb->data);
cb->cb_func = NULL;
cb->data = NULL;
return;
}
}