diff --git a/lib/isc/include/isc/netmgr.h b/lib/isc/include/isc/netmgr.h index ca40fc6bcd..b8f19bf15a 100644 --- a/lib/isc/include/isc/netmgr.h +++ b/lib/isc/include/isc/netmgr.h @@ -68,6 +68,12 @@ typedef void (*isc_nm_opaquecb_t)(void *arg); * callbacks. */ +typedef void (*isc_nm_workcb_t)(void *arg); +typedef void (*isc_nm_after_workcb_t)(void *arg, isc_result_t result); +/*%< + * Callback functions for libuv threadpool work (see uv_work_t) + */ + void isc_nm_attach(isc_nm_t *mgr, isc_nm_t **dst); void @@ -529,6 +535,19 @@ isc_nm_task_enqueue(isc_nm_t *mgr, isc_task_t *task, int threadid); * maximum number of 'workers' as specifed in isc_nm_start() */ +void +isc_nm_work_offload(isc_nm_t *mgr, isc_nm_workcb_t work_cb, + isc_nm_after_workcb_t after_work_cb, void *data); +/*%< + * Schedules a job to be handled by the libuv thread pool (see uv_work_t). + * The function specified in `work_cb` will be run by a thread in the + * thread pool; when complete, the `after_work_cb` function will run. + * + * Requires: + * \li 'mgr' is a valid netmgr object. + * \li We are currently running in a network manager thread. + */ + void isc__nm_force_tid(int tid); /*%< diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index dece53bfec..9e5445664b 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -646,6 +646,17 @@ typedef union { isc__netievent_tlsconnect_t nitc; } isc__netievent_storage_t; +/* + * Work item for a uv_work threadpool. + */ +typedef struct isc__nm_work { + isc_nm_t *netmgr; + uv_work_t req; + isc_nm_workcb_t cb; + isc_nm_after_workcb_t after_cb; + void *data; +} isc__nm_work_t; + /* * Network manager */ diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 9692be6a4b..0b708b57d5 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -40,6 +40,7 @@ #include "netmgr-int.h" #include "netmgr_p.h" #include "openssl_shim.h" +#include "trampoline_p.h" #include "uv-compat.h" /*% @@ -199,6 +200,14 @@ static void isc__nm_async_detach(isc__networker_t *worker, isc__netievent_t *ev0); static void isc__nm_async_close(isc__networker_t *worker, isc__netievent_t *ev0); + +static void +isc__nm_threadpool_initialize(uint32_t workers); +static void +isc__nm_work_cb(uv_work_t *req); +static void +isc__nm_after_work_cb(uv_work_t *req, int status); + /*%< * Issue a 'handle closed' callback on the socket. */ @@ -256,6 +265,17 @@ isc__nm_winsock_destroy(void) { } #endif /* WIN32 */ +static void +isc__nm_threadpool_initialize(uint32_t workers) { + char buf[11]; + int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf, + &(size_t){ sizeof(buf) }); + if (r == UV_ENOENT) { + snprintf(buf, sizeof(buf), "%" PRIu32, workers); + uv_os_setenv("UV_THREADPOOL_SIZE", buf); + } +} + void isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) { isc_nm_t *mgr = NULL; @@ -267,6 +287,8 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) { isc__nm_winsock_initialize(); #endif /* WIN32 */ + isc__nm_threadpool_initialize(workers); + mgr = isc_mem_get(mctx, sizeof(*mgr)); *mgr = (isc_nm_t){ .nworkers = workers }; @@ -280,8 +302,6 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) { atomic_init(&mgr->workers_paused, 0); atomic_init(&mgr->paused, false); atomic_init(&mgr->closing, false); - atomic_init(&mgr->idle, false); - atomic_init(&mgr->keepalive, false); atomic_init(&mgr->recv_tcp_buffer_size, 0); atomic_init(&mgr->send_tcp_buffer_size, 0); atomic_init(&mgr->recv_udp_buffer_size, 0); @@ -818,7 +838,8 @@ async_cb(uv_async_t *handle) { isc__networker_t *worker = (isc__networker_t *)handle->loop->data; if (process_all_queues(worker)) { - /* If we didn't process all the events, we need to enqueue + /* + * If we didn't process all the events, we need to enqueue * async_cb to be run in the next iteration of the uv_loop */ uv_async_send(handle); @@ -3242,6 +3263,73 @@ isc__nm_set_network_buffers(isc_nm_t *nm, uv_handle_t *handle) { } } +static isc_threadresult_t +isc__nm_work_run(isc_threadarg_t arg) { + isc__nm_work_t *work = (isc__nm_work_t *)arg; + + work->cb(work->data); + + return ((isc_threadresult_t)0); +} + +static void +isc__nm_work_cb(uv_work_t *req) { + isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req); + + if (isc_tid_v == SIZE_MAX) { + isc__trampoline_t *trampoline_arg = + isc__trampoline_get(isc__nm_work_run, work); + (void)isc__trampoline_run(trampoline_arg); + } else { + (void)isc__nm_work_run((isc_threadarg_t)work); + } +} + +static void +isc__nm_after_work_cb(uv_work_t *req, int status) { + isc_result_t result = ISC_R_SUCCESS; + isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req); + isc_nm_t *netmgr = work->netmgr; + + if (status != 0) { + result = isc__nm_uverr2result(status); + } + + work->after_cb(work->data, result); + + isc_mem_put(netmgr->mctx, work, sizeof(*work)); + + isc_nm_detach(&netmgr); +} + +void +isc_nm_work_offload(isc_nm_t *netmgr, isc_nm_workcb_t work_cb, + isc_nm_after_workcb_t after_work_cb, void *data) { + isc__networker_t *worker = NULL; + isc__nm_work_t *work = NULL; + int r; + + REQUIRE(isc__nm_in_netthread()); + REQUIRE(VALID_NM(netmgr)); + + worker = &netmgr->workers[isc_nm_tid()]; + + work = isc_mem_get(netmgr->mctx, sizeof(*work)); + *work = (isc__nm_work_t){ + .cb = work_cb, + .after_cb = after_work_cb, + .data = data, + }; + + isc_nm_attach(netmgr, &work->netmgr); + + uv_req_set_data((uv_req_t *)&work->req, work); + + r = uv_queue_work(&worker->loop, &work->req, isc__nm_work_cb, + isc__nm_after_work_cb); + RUNTIME_CHECK(r == 0); +} + #ifdef NETMGR_TRACE /* * Dump all active sockets in netmgr. We output to stderr @@ -3302,7 +3390,8 @@ nmsocket_dump(isc_nmsocket_t *sock) { nmsocket_type_totext(sock->type), isc_refcount_current(&sock->references)); fprintf(stderr, - "Parent %p, listener %p, server %p, statichandle = %p\n", + "Parent %p, listener %p, server %p, statichandle = " + "%p\n", sock->parent, sock->listener, sock->server, sock->statichandle); fprintf(stderr, "Flags:%s%s%s%s%s\n", atomic_load(&sock->active) ? " active" : "", diff --git a/lib/isc/win32/libisc.def.in b/lib/isc/win32/libisc.def.in index fdf402a572..664ff6c065 100644 --- a/lib/isc/win32/libisc.def.in +++ b/lib/isc/win32/libisc.def.in @@ -474,11 +474,13 @@ isc_nm_tcpdnsconnect isc_nm_gettimeouts isc_nm_setnetbuffers isc_nm_settimeouts +isc_nm_task_enqueue isc_nm_tcpdns_keepalive isc_nm_tcpdns_sequential isc_nm_tid isc_nm_tlsdnsconnect isc_nm_udpconnect +isc_nm_work_offload isc_nmsocket_close isc__nm_acquire_interlocked isc__nm_drop_interlocked