From 9e3cb396b23db668c2585606d4284a1b89649f88 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ond=C5=99ej=20Sur=C3=BD?=
Date: Fri, 7 May 2021 12:58:40 +0200
Subject: [PATCH] Replace netmgr quantum with loop-preventing barrier

Instead of using a fixed quantum, this commit adds an atomic counter for
the number of items on each queue and uses the number of netievents
already scheduled to run as the upper bound on how many netievents a
single process_queue() run may process.

This prevents endless loops in which a netievent keeps scheduling more
netievents onto the same loop, without having to pick a "magic" number
for the quantum.
---
 lib/isc/include/isc/result.h |   2 +-
 lib/isc/netmgr/netmgr-int.h  |  23 ++--
 lib/isc/netmgr/netmgr.c      | 244 ++++++++++++++++++++---
 lib/isc/result.c             |   4 +-
 4 files changed, 157 insertions(+), 116 deletions(-)

diff --git a/lib/isc/include/isc/result.h b/lib/isc/include/isc/result.h
index 22969775df..21071c7dad 100644
--- a/lib/isc/include/isc/result.h
+++ b/lib/isc/include/isc/result.h
@@ -64,7 +64,7 @@
 #define ISC_R_MULTICAST       43 /*%< invalid use of multicast */
 #define ISC_R_NOTFILE         44 /*%< not a file */
 #define ISC_R_NOTDIRECTORY    45 /*%< not a directory */
-#define ISC_R_QUEUEFULL       46 /*%< queue is full */
+#define ISC_R_EMPTY           46 /*%< queue is empty */
 #define ISC_R_FAMILYMISMATCH  47 /*%< address family mismatch */
 #define ISC_R_FAMILYNOSUPPORT 48 /*%< AF not supported */
 #define ISC_R_BADHEX          49 /*%< bad hex encoding */
diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h
index f131400152..0ad9bd42c8 100644
--- a/lib/isc/netmgr/netmgr-int.h
+++ b/lib/isc/netmgr/netmgr-int.h
@@ -40,8 +40,6 @@
 
 #include "uv-compat.h"
 
-#define ISC_NETMGR_QUANTUM_DEFAULT 1024
-
 #define ISC_NETMGR_TID_UNKNOWN -1
 
 /* Must be different from ISC_NETMGR_TID_UNKNOWN */
@@ -165,6 +163,17 @@ isc__nm_dump_active(isc_nm_t *nm);
 #define isc__nmsocket_prep_destroy(sock) isc___nmsocket_prep_destroy(sock)
 #endif
 
+/*
+ * Queue types in the order of processing priority.
+ */
+typedef enum {
+	NETIEVENT_PRIORITY = 0,
+	NETIEVENT_PRIVILEGED = 1,
+	NETIEVENT_TASK = 2,
+	NETIEVENT_NORMAL = 3,
+	NETIEVENT_MAX = 4,
+} netievent_type_t;
+
 /*
  * Single network event loop worker.
  */
@@ -178,13 +187,8 @@ typedef struct isc__networker {
 	bool paused;
 	bool finished;
 	isc_thread_t thread;
-	isc_queue_t *ievents;	   /* incoming async events */
-	isc_queue_t *ievents_priv; /* privileged async tasks */
-	isc_queue_t *ievents_task; /* async tasks */
-	isc_queue_t *ievents_prio; /* priority async events
-				    * used for listening etc.
- * can be processed while - * worker is paused */ + isc_queue_t *ievents[NETIEVENT_MAX]; + atomic_uint_fast32_t nievents[NETIEVENT_MAX]; isc_condition_t cond_prio; isc_refcount_t references; @@ -192,7 +196,6 @@ typedef struct isc__networker { char *recvbuf; char *sendbuf; bool recvbuf_inuse; - unsigned int quantum; } isc__networker_t; /* diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index ac2f38992a..f9affb2c04 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -137,28 +137,57 @@ static void async_cb(uv_async_t *handle); static bool process_netievent(isc__networker_t *worker, isc__netievent_t *ievent); -static bool -process_queue(isc__networker_t *worker, isc_queue_t *queue, - unsigned int *quantump); +static isc_result_t +process_queue(isc__networker_t *worker, netievent_type_t type); static void wait_for_priority_queue(isc__networker_t *worker); -static bool -process_priority_queue(isc__networker_t *worker, unsigned int *quantump); -static bool -process_privilege_queue(isc__networker_t *worker, unsigned int *quantump); -static bool -process_task_queue(isc__networker_t *worker, unsigned int *quantump); -static bool -process_normal_queue(isc__networker_t *worker, unsigned int *quantump); +static void +drain_queue(isc__networker_t *worker, netievent_type_t type); -#define drain_priority_queue(worker) \ - (void)process_priority_queue(worker, &(unsigned int){ UINT_MAX }) -#define drain_privilege_queue(worker) \ - (void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX }) -#define drain_task_queue(worker) \ - (void)process_task_queue(worker, &(unsigned int){ UINT_MAX }) -#define drain_normal_queue(worker) \ - (void)process_normal_queue(worker, &(unsigned int){ UINT_MAX }) +#define ENQUEUE_NETIEVENT(worker, queue, event) \ + isc_queue_enqueue(worker->ievents[queue], (uintptr_t)event) +#define DEQUEUE_NETIEVENT(worker, queue) \ + (isc__netievent_t *)isc_queue_dequeue(worker->ievents[queue]) + +#define ENQUEUE_PRIORITY_NETIEVENT(worker, event) \ + ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY, event) +#define ENQUEUE_PRIVILEGED_NETIEVENT(worker, event) \ + ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED, event) +#define ENQUEUE_TASK_NETIEVENT(worker, event) \ + ENQUEUE_NETIEVENT(worker, NETIEVENT_TASK, event) +#define ENQUEUE_NORMAL_NETIEVENT(worker, event) \ + ENQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL, event) + +#define DEQUEUE_PRIORITY_NETIEVENT(worker) \ + DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY) +#define DEQUEUE_PRIVILEGED_NETIEVENT(worker) \ + DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED) +#define DEQUEUE_TASK_NETIEVENT(worker) DEQUEUE_NETIEVENT(worker, NETIEVENT_TASK) +#define DEQUEUE_NORMAL_NETIEVENT(worker) \ + DEQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL) + +#define INCREMENT_NETIEVENT(worker, queue) \ + atomic_fetch_add_release(&worker->nievents[queue], 1) +#define DECREMENT_NETIEVENT(worker, queue) \ + atomic_fetch_sub_release(&worker->nievents[queue], 1) + +#define INCREMENT_PRIORITY_NETIEVENT(worker) \ + INCREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY) +#define INCREMENT_PRIVILEGED_NETIEVENT(worker) \ + INCREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED) +#define INCREMENT_TASK_NETIEVENT(worker) \ + INCREMENT_NETIEVENT(worker, NETIEVENT_TASK) +#define INCREMENT_NORMAL_NETIEVENT(worker) \ + INCREMENT_NETIEVENT(worker, NETIEVENT_NORMAL) + +#define DECREMENT_PRIORITY_NETIEVENT(worker) \ + DECREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY) +#define DECREMENT_PRIVILEGED_NETIEVENT(worker) \ + DECREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED) 
+#define DECREMENT_TASK_NETIEVENT(worker) \
+	DECREMENT_NETIEVENT(worker, NETIEVENT_TASK)
+#define DECREMENT_NORMAL_NETIEVENT(worker) \
+	DECREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
 
 static void
 isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
@@ -283,7 +312,6 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
 		*worker = (isc__networker_t){
 			.mgr = mgr,
 			.id = i,
-			.quantum = ISC_NETMGR_QUANTUM_DEFAULT,
 		};
 
 		r = uv_loop_init(&worker->loop);
@@ -296,11 +324,11 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
 
 		isc_mutex_init(&worker->lock);
 
-		worker->ievents = isc_queue_new(mgr->mctx, 128);
-		worker->ievents_task = isc_queue_new(mgr->mctx, 128);
-		worker->ievents_priv = isc_queue_new(mgr->mctx, 128);
-		worker->ievents_prio = isc_queue_new(mgr->mctx, 128);
+		for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+			worker->ievents[type] = isc_queue_new(mgr->mctx, 128);
+			atomic_init(&worker->nievents[type], 0);
+		}
 		isc_condition_init(&worker->cond_prio);
 
 		worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
 		worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE);
@@ -354,20 +381,14 @@ nm_destroy(isc_nm_t **mgr0) {
 		int r;
 
 		/* Empty the async event queues */
-		while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
-				worker->ievents)) != NULL)
-		{
+		while ((ievent = DEQUEUE_NORMAL_NETIEVENT(worker)) != NULL) {
 			isc_mempool_put(mgr->evpool, ievent);
 		}
 
-		INSIST(isc_queue_dequeue(worker->ievents_priv) ==
-		       (uintptr_t)NULL);
-		INSIST(isc_queue_dequeue(worker->ievents_task) ==
-		       (uintptr_t)NULL);
+		INSIST(DEQUEUE_PRIVILEGED_NETIEVENT(worker) == NULL);
+		INSIST(DEQUEUE_TASK_NETIEVENT(worker) == NULL);
 
-		while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
-				worker->ievents_prio)) != NULL)
-		{
+		while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {
 			isc_mempool_put(mgr->evpool, ievent);
 		}
 		isc_condition_destroy(&worker->cond_prio);
@@ -375,11 +396,10 @@ nm_destroy(isc_nm_t **mgr0) {
 		r = uv_loop_close(&worker->loop);
 		INSIST(r == 0);
 
-		isc_queue_destroy(worker->ievents);
-		isc_queue_destroy(worker->ievents_priv);
-		isc_queue_destroy(worker->ievents_task);
-		isc_queue_destroy(worker->ievents_prio);
+		for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+			isc_queue_destroy(worker->ievents[type]);
+		}
 		isc_mutex_destroy(&worker->lock);
 
 		isc_mem_put(mgr->mctx, worker->sendbuf,
 			    ISC_NETMGR_SENDBUF_SIZE);
@@ -487,7 +506,7 @@ isc_nm_resume(isc_nm_t *mgr) {
 	if (isc__nm_in_netthread()) {
 		REQUIRE(isc_nm_tid() == 0);
 
-		drain_priority_queue(&mgr->workers[isc_nm_tid()]);
+		drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIORITY);
 	}
 
 	for (int i = 0; i < mgr->nworkers; i++) {
@@ -500,7 +519,7 @@ isc_nm_resume(isc_nm_t *mgr) {
 	}
 
 	if (isc__nm_in_netthread()) {
-		drain_privilege_queue(&mgr->workers[isc_nm_tid()]);
+		drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIVILEGED);
 
 		atomic_fetch_sub(&mgr->workers_paused, 1);
 		isc_barrier_wait(&mgr->resuming);
@@ -711,7 +730,7 @@ nm_thread(isc_threadarg_t worker0) {
 			 * All workers must drain the privileged event
 			 * queue before we resume from pause.
 			 */
-			drain_privilege_queue(worker);
+			drain_queue(worker, NETIEVENT_PRIVILEGED);
 
 			atomic_fetch_sub(&mgr->workers_paused, 1);
 			if (isc_barrier_wait(&mgr->resuming) != 0) {
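The resume path above is one half of a two-barrier rendezvous: every worker parks on a pausing barrier, services only priority events (listening, stoplistening and the like) while paused, drains its privileged queue, and only then checks in at the mgr->resuming barrier, so no worker re-enters uv_run() until every worker has finished its privileged drain. A minimal standalone model of that rendezvous, using POSIX barriers in place of isc_barrier and invented names throughout (an illustrative sketch, not netmgr code; Linux, link with -lpthread):

#include <pthread.h>
#include <stdio.h>

#define NWORKERS 4

static pthread_barrier_t pausing;
static pthread_barrier_t resuming;

static void *
worker_thread(void *arg) {
	int id = *(int *)arg;

	/* Check in: no worker proceeds until all have stopped.  The
	 * netmgr equivalent is isc_barrier_wait() on the manager's
	 * pausing barrier. */
	pthread_barrier_wait(&pausing);

	/* All workers are paused here; each drains its own privileged
	 * work, as drain_queue(worker, NETIEVENT_PRIVILEGED) does above. */
	printf("worker %d: privileged queue drained\n", id);

	/* Nobody resumes the loop until every drain above has finished;
	 * this mirrors isc_barrier_wait(&mgr->resuming). */
	pthread_barrier_wait(&resuming);
	return (NULL);
}

int
main(void) {
	pthread_t threads[NWORKERS];
	int ids[NWORKERS];

	pthread_barrier_init(&pausing, NULL, NWORKERS);
	pthread_barrier_init(&resuming, NULL, NWORKERS);

	for (int i = 0; i < NWORKERS; i++) {
		ids[i] = i;
		pthread_create(&threads[i], NULL, worker_thread, &ids[i]);
	}
	for (int i = 0; i < NWORKERS; i++) {
		pthread_join(threads[i], NULL);
	}

	pthread_barrier_destroy(&pausing);
	pthread_barrier_destroy(&resuming);
	return (0);
}

The two-barrier shape is what makes the drain safe: the first barrier guarantees everyone has stopped producing normal work, the second guarantees everyone has drained before anyone resumes.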
@@ -734,8 +753,8 @@ nm_thread(isc_threadarg_t worker0) {
 	 * (they may include shutdown events) but do not process
	 * the netmgr event queue.
	 */
-	drain_privilege_queue(worker);
-	drain_task_queue(worker);
+	drain_queue(worker, NETIEVENT_PRIVILEGED);
+	drain_queue(worker, NETIEVENT_TASK);
 
 	LOCK(&mgr->lock);
	mgr->workers_running--;
@@ -746,20 +765,32 @@ nm_thread(isc_threadarg_t worker0) {
 }
 
 static bool
-process_all_queues(isc__networker_t *worker, unsigned int quantum) {
+process_all_queues(isc__networker_t *worker) {
+	bool reschedule = false;
 	/*
-	 * The queue processing functions will return false when the
-	 * system is pausing or stopping, or if we have completed
-	 * 'quantum' events.
-	 *
-	 * We don't want to proceed to a new queue until the previous one
-	 * has been fully drained, so whenever one queue is interrupted,
-	 * we skip all the later ones.
+	 * process_queue() returns ISC_R_SUSPEND when the system is
+	 * pausing or stopping; in that case we must not process the
+	 * remaining queues, but the async event still needs to be
+	 * rescheduled in the next uv_run().
	 */
-	return (process_priority_queue(worker, &quantum) &&
-		process_privilege_queue(worker, &quantum) &&
-		process_task_queue(worker, &quantum) &&
-		process_normal_queue(worker, &quantum));
+	for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+		isc_result_t result = process_queue(worker, type);
+		switch (result) {
+		case ISC_R_SUSPEND:
+			return (true);
+		case ISC_R_EMPTY:
+			/* empty queue */
+			break;
+		case ISC_R_SUCCESS:
+			reschedule = true;
+			break;
+		default:
+			INSIST(0);
+			ISC_UNREACHABLE();
+		}
+	}
+
+	return (reschedule);
 }
 
 /*
@@ -771,9 +802,8 @@
 */
 static void
 async_cb(uv_async_t *handle) {
 	isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
-	unsigned int quantum = worker->quantum;
 
-	if (!process_all_queues(worker, quantum)) {
+	if (process_all_queues(worker)) {
 		/* If we didn't process all the events, we need to enqueue
		 * async_cb to be run in the next iteration of the uv_loop */
@@ -840,19 +870,17 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
 
 static void
 wait_for_priority_queue(isc__networker_t *worker) {
-	isc_queue_t *queue = worker->ievents_prio;
 	isc_condition_t *cond = &worker->cond_prio;
	bool wait_for_work = true;
 
 	while (true) {
		isc__netievent_t *ievent;
		LOCK(&worker->lock);
-		ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
+		ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
 		if (wait_for_work) {
			while (ievent == NULL) {
				WAIT(cond, &worker->lock);
-				ievent = (isc__netievent_t *)isc_queue_dequeue(
-					queue);
+				ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
			}
		}
		UNLOCK(&worker->lock);
@@ -861,29 +889,17 @@ wait_for_priority_queue(isc__networker_t *worker) {
 		if (ievent == NULL) {
			return;
		}
+		DECREMENT_PRIORITY_NETIEVENT(worker);
 
 		(void)process_netievent(worker, ievent);
	}
 }
 
-static bool
-process_priority_queue(isc__networker_t *worker, unsigned int *quantump) {
-	return (process_queue(worker, worker->ievents_prio, quantump));
-}
-
-static bool
-process_privilege_queue(isc__networker_t *worker, unsigned int *quantump) {
-	return (process_queue(worker, worker->ievents_priv, quantump));
-}
-
-static bool
-process_task_queue(isc__networker_t *worker, unsigned int *quantump) {
-	return (process_queue(worker, worker->ievents_task, quantump));
-}
-
-static bool
-process_normal_queue(isc__networker_t *worker, unsigned int *quantump) {
-	return (process_queue(worker, worker->ievents, quantump));
+static void
+drain_queue(isc__networker_t *worker, netievent_type_t type) {
+	while (process_queue(worker, type) != ISC_R_EMPTY) {
+		;
+	}
 }
 
 /*
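The hunk below is the core of the change: process_queue() loads the per-queue counter once and uses that snapshot as the quota for the pass, so a netievent whose handler schedules further netievents onto the same queue can no longer keep a single pass spinning; the extra work is simply left for the next uv_loop iteration. A standalone toy model of that bound (single-threaded, invented toy_* names, not the netmgr API; compile with cc -std=c11):

#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

#define QSIZE 1024

static int queue[QSIZE];
static size_t qhead, qtail;	     /* toy ring buffer, single-threaded */
static atomic_uint_fast32_t nevents; /* events accounted for */

static void
toy_enqueue(int ev) {
	atomic_fetch_add(&nevents, 1); /* account first, then publish */
	queue[qtail++ % QSIZE] = ev;
}

static int
toy_dequeue(void) {
	if (qhead == qtail) {
		return (-1); /* empty */
	}
	return (queue[qhead++ % QSIZE]);
}

/*
 * One pass: bounded by the counter snapshot, even though every handled
 * event immediately schedules a successor onto the same queue.
 */
static void
toy_process_pass(void) {
	uint_fast32_t waiting = atomic_load(&nevents);
	unsigned int handled = 0;
	int ev;

	while ((ev = toy_dequeue()) != -1) {
		atomic_fetch_sub(&nevents, 1);
		toy_enqueue(ev + 1); /* the "endless loop" ingredient */
		handled++;
		if (waiting-- == 0) {
			break; /* this pass's quota is spent */
		}
	}
	printf("pass handled %u events, %ju still queued\n", handled,
	       (uintmax_t)(qtail - qhead));
}

int
main(void) {
	for (int i = 0; i < 10; i++) {
		toy_enqueue(i);
	}
	toy_process_pass(); /* bounded by the ~10 counted events */
	toy_process_pass(); /* picks up the successors scheduled above */
	return (0);
}

Each pass handles roughly the number of events that were outstanding when it started (the post-decrement check lets one extra through), which is the same protection the old quantum gave, except that the bound now adapts to the actual backlog instead of being a guessed constant.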
@@ -984,28 +1000,46 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) {
 	return (true);
 }
 
-static bool
-process_queue(isc__networker_t *worker, isc_queue_t *queue,
-	      unsigned int *quantump) {
-	while (*quantump > 0) {
-		isc__netievent_t *ievent =
-			(isc__netievent_t *)isc_queue_dequeue(queue);
+static isc_result_t
+process_queue(isc__networker_t *worker, netievent_type_t type) {
+	/*
+	 * The counter of scheduled netievents is only loosely synchronized
+	 * with the actual netievents on the queue: an item that is on the
+	 * queue is guaranteed to have been accounted for, but the counter
+	 * may transiently be higher than the number of items currently
+	 * stored on the queue.
+	 */
+	uint_fast32_t waiting = atomic_load_acquire(&worker->nievents[type]);
+	isc__netievent_t *ievent = DEQUEUE_NETIEVENT(worker, type);
 
-		if (ievent == NULL) {
-			/* We fully drained this queue */
-			return (true);
-		}
-
-		(*quantump)--;
-
-		if (!process_netievent(worker, ievent)) {
-			/* Netievent told us to stop */
-			return (false);
-		}
+	if (ievent == NULL && waiting == 0) {
+		/* There's nothing scheduled */
+		return (ISC_R_EMPTY);
+	} else if (ievent == NULL) {
+		/* There's at least one item scheduled, but not on the queue
+		 * yet */
+		return (ISC_R_SUCCESS);
 	}
 
-	/* No more quantum */
-	return (false);
+	while (ievent != NULL) {
+		DECREMENT_NETIEVENT(worker, type);
+		bool stop = !process_netievent(worker, ievent);
+
+		if (stop) {
+			/* Netievent told us to stop */
+			return (ISC_R_SUSPEND);
+		}
+
+		if (waiting-- == 0) {
+			/* We reached this round's "quota" */
+			break;
+		}
+
+		ievent = DEQUEUE_NETIEVENT(worker, type);
+	}
+
+	/* We processed at least one netievent */
+	return (ISC_R_SUCCESS);
 }
 
 void *
@@ -1107,15 +1141,19 @@ isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
 		 * the queue will be processed.
		 */
		LOCK(&worker->lock);
-		isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event);
+		INCREMENT_PRIORITY_NETIEVENT(worker);
+		ENQUEUE_PRIORITY_NETIEVENT(worker, event);
		SIGNAL(&worker->cond_prio);
		UNLOCK(&worker->lock);
	} else if (event->type == netievent_privilegedtask) {
-		isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event);
+		INCREMENT_PRIVILEGED_NETIEVENT(worker);
+		ENQUEUE_PRIVILEGED_NETIEVENT(worker, event);
	} else if (event->type == netievent_task) {
-		isc_queue_enqueue(worker->ievents_task, (uintptr_t)event);
+		INCREMENT_TASK_NETIEVENT(worker);
+		ENQUEUE_TASK_NETIEVENT(worker, event);
	} else {
-		isc_queue_enqueue(worker->ievents, (uintptr_t)event);
+		INCREMENT_NORMAL_NETIEVENT(worker);
+		ENQUEUE_NORMAL_NETIEVENT(worker, event);
	}
	uv_async_send(&worker->async);
 }
diff --git a/lib/isc/result.c b/lib/isc/result.c
index 565e8406d3..72e7a3c28e 100644
--- a/lib/isc/result.c
+++ b/lib/isc/result.c
@@ -77,7 +77,7 @@ static const char *description[ISC_R_NRESULTS] = {
 	"invalid use of multicast address", /*%< 43 */
	"not a file",			    /*%< 44 */
	"not a directory",		    /*%< 45 */
-	"queue is full",		    /*%< 46 */
+	"queue is empty",		    /*%< 46 */
	"address family mismatch",	    /*%< 47 */
	"address family not supported",	    /*%< 48 */
	"bad hex encoding",		    /*%< 49 */
@@ -151,7 +151,7 @@ static const char *identifier[ISC_R_NRESULTS] = {
 	"ISC_R_SUCCESS",	"ISC_R_MULTICAST",
	"ISC_R_NOTFILE",	"ISC_R_NOTDIRECTORY",
-	"ISC_R_QUEUEFULL",
+	"ISC_R_EMPTY",
	"ISC_R_FAMILYMISMATCH",	"ISC_R_FAMILYNOSUPPORT",
	"ISC_R_BADHEX",
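One last property worth demonstrating: isc__nm_enqueue_ievent() above increments the counter before the event goes onto the queue, and process_queue() decrements it only after taking an event off, so an observer can see a counter that is transiently larger than the queue but never smaller. That one-sided slack is exactly why process_queue() answers ISC_R_SUCCESS rather than ISC_R_EMPTY when the counter is nonzero but the dequeue came back empty. A small standalone check of the invariant, with two plain atomics standing in for the counter and the queue length (invented names, not netmgr code; link with -lpthread):

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define NITEMS 1000000

static atomic_int accounted; /* stands in for worker->nievents[type] */
static atomic_int queued;    /* stands in for the queue's length */

static void *
producer(void *arg) {
	(void)arg;
	for (int i = 0; i < NITEMS; i++) {
		atomic_fetch_add(&accounted, 1); /* account first... */
		atomic_fetch_add(&queued, 1);	 /* ...then publish */
	}
	return (NULL);
}

int
main(void) {
	pthread_t t;
	int transient = 0;

	pthread_create(&t, NULL, producer, NULL);
	for (int i = 0; i < NITEMS; i++) {
		/*
		 * Load the queue length first: because both values only
		 * grow and the counter is always written first, the
		 * counter read here can never be the smaller one.
		 */
		int q = atomic_load(&queued);
		int a = atomic_load(&accounted);
		if (a < q) {
			printf("undercount: accounted=%d queued=%d\n", a, q);
			return (1);
		}
		if (a > q) {
			transient++; /* the harmless ISC_R_SUCCESS window */
		}
	}
	pthread_join(t, NULL);
	printf("no undercount; %d transient overcounts observed\n", transient);
	return (0);
}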