Merge branch 'ondrej/remove-netmgr-quantum' into 'main'

Replace netmgr quantum with loop-preventing barrier

See merge request isc-projects/bind9!5028
This commit is contained in:
Ondřej Surý
2021-05-17 10:05:11 +00:00
4 changed files with 157 additions and 116 deletions

View File

@@ -64,7 +64,7 @@
#define ISC_R_MULTICAST 43 /*%< invalid use of multicast */
#define ISC_R_NOTFILE 44 /*%< not a file */
#define ISC_R_NOTDIRECTORY 45 /*%< not a directory */
#define ISC_R_QUEUEFULL 46 /*%< queue is full */
#define ISC_R_EMPTY 46 /*%< queue is empty */
#define ISC_R_FAMILYMISMATCH 47 /*%< address family mismatch */
#define ISC_R_FAMILYNOSUPPORT 48 /*%< AF not supported */
#define ISC_R_BADHEX 49 /*%< bad hex encoding */

View File

@@ -40,8 +40,6 @@
#include "uv-compat.h"
#define ISC_NETMGR_QUANTUM_DEFAULT 1024
#define ISC_NETMGR_TID_UNKNOWN -1
/* Must be different from ISC_NETMGR_TID_UNKNOWN */
@@ -165,6 +163,17 @@ isc__nm_dump_active(isc_nm_t *nm);
#define isc__nmsocket_prep_destroy(sock) isc___nmsocket_prep_destroy(sock)
#endif
/*
* Queue types in the order of processing priority.
*/
typedef enum {
NETIEVENT_PRIORITY = 0, /* e.g. listening; drained even while the worker is paused */
NETIEVENT_PRIVILEGED = 1, /* privileged tasks; drained before resuming from pause */
NETIEVENT_TASK = 2, /* events of type netievent_task */
NETIEVENT_NORMAL = 3, /* all other netmgr events */
NETIEVENT_MAX = 4, /* count sentinel; sizes the per-worker queue arrays */
} netievent_type_t;
/*
* Single network event loop worker.
*/
@@ -178,13 +187,8 @@ typedef struct isc__networker {
bool paused;
bool finished;
isc_thread_t thread;
isc_queue_t *ievents; /* incoming async events */
isc_queue_t *ievents_priv; /* privileged async tasks */
isc_queue_t *ievents_task; /* async tasks */
isc_queue_t *ievents_prio; /* priority async events
* used for listening etc.
* can be processed while
* worker is paused */
isc_queue_t *ievents[NETIEVENT_MAX];
atomic_uint_fast32_t nievents[NETIEVENT_MAX];
isc_condition_t cond_prio;
isc_refcount_t references;
@@ -192,7 +196,6 @@ typedef struct isc__networker {
char *recvbuf;
char *sendbuf;
bool recvbuf_inuse;
unsigned int quantum;
} isc__networker_t;
/*

View File

@@ -137,28 +137,57 @@ static void
async_cb(uv_async_t *handle);
static bool
process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue,
unsigned int *quantump);
static isc_result_t
process_queue(isc__networker_t *worker, netievent_type_t type);
static void
wait_for_priority_queue(isc__networker_t *worker);
static bool
process_priority_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_privilege_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_task_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_normal_queue(isc__networker_t *worker, unsigned int *quantump);
static void
drain_queue(isc__networker_t *worker, netievent_type_t type);
#define drain_priority_queue(worker) \
(void)process_priority_queue(worker, &(unsigned int){ UINT_MAX })
#define drain_privilege_queue(worker) \
(void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX })
#define drain_task_queue(worker) \
(void)process_task_queue(worker, &(unsigned int){ UINT_MAX })
#define drain_normal_queue(worker) \
(void)process_normal_queue(worker, &(unsigned int){ UINT_MAX })
#define ENQUEUE_NETIEVENT(worker, queue, event) \
isc_queue_enqueue(worker->ievents[queue], (uintptr_t)event)
#define DEQUEUE_NETIEVENT(worker, queue) \
(isc__netievent_t *)isc_queue_dequeue(worker->ievents[queue])
#define ENQUEUE_PRIORITY_NETIEVENT(worker, event) \
ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY, event)
#define ENQUEUE_PRIVILEGED_NETIEVENT(worker, event) \
ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED, event)
#define ENQUEUE_TASK_NETIEVENT(worker, event) \
ENQUEUE_NETIEVENT(worker, NETIEVENT_TASK, event)
#define ENQUEUE_NORMAL_NETIEVENT(worker, event) \
ENQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL, event)
#define DEQUEUE_PRIORITY_NETIEVENT(worker) \
DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY)
#define DEQUEUE_PRIVILEGED_NETIEVENT(worker) \
DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
#define DEQUEUE_TASK_NETIEVENT(worker) DEQUEUE_NETIEVENT(worker, NETIEVENT_TASK)
#define DEQUEUE_NORMAL_NETIEVENT(worker) \
DEQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL)
#define INCREMENT_NETIEVENT(worker, queue) \
atomic_fetch_add_release(&worker->nievents[queue], 1)
#define DECREMENT_NETIEVENT(worker, queue) \
atomic_fetch_sub_release(&worker->nievents[queue], 1)
#define INCREMENT_PRIORITY_NETIEVENT(worker) \
INCREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
#define INCREMENT_PRIVILEGED_NETIEVENT(worker) \
INCREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
#define INCREMENT_TASK_NETIEVENT(worker) \
INCREMENT_NETIEVENT(worker, NETIEVENT_TASK)
#define INCREMENT_NORMAL_NETIEVENT(worker) \
INCREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
#define DECREMENT_PRIORITY_NETIEVENT(worker) \
DECREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
#define DECREMENT_PRIVILEGED_NETIEVENT(worker) \
DECREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
#define DECREMENT_TASK_NETIEVENT(worker) \
DECREMENT_NETIEVENT(worker, NETIEVENT_TASK)
#define DECREMENT_NORMAL_NETIEVENT(worker) \
DECREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
@@ -283,7 +312,6 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
*worker = (isc__networker_t){
.mgr = mgr,
.id = i,
.quantum = ISC_NETMGR_QUANTUM_DEFAULT,
};
r = uv_loop_init(&worker->loop);
@@ -296,11 +324,10 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
isc_mutex_init(&worker->lock);
worker->ievents = isc_queue_new(mgr->mctx, 128);
worker->ievents_task = isc_queue_new(mgr->mctx, 128);
worker->ievents_priv = isc_queue_new(mgr->mctx, 128);
worker->ievents_prio = isc_queue_new(mgr->mctx, 128);
isc_condition_init(&worker->cond_prio);
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
worker->ievents[type] = isc_queue_new(mgr->mctx, 128);
atomic_init(&worker->nievents[type], 0);
}
worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE);
@@ -354,20 +381,14 @@ nm_destroy(isc_nm_t **mgr0) {
int r;
/* Empty the async event queues */
while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
worker->ievents)) != NULL)
{
while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {
isc_mempool_put(mgr->evpool, ievent);
}
INSIST(isc_queue_dequeue(worker->ievents_priv) ==
(uintptr_t)NULL);
INSIST(isc_queue_dequeue(worker->ievents_task) ==
(uintptr_t)NULL);
INSIST(DEQUEUE_PRIVILEGED_NETIEVENT(worker) == NULL);
INSIST(DEQUEUE_TASK_NETIEVENT(worker) == NULL);
while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
worker->ievents_prio)) != NULL)
{
while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {
isc_mempool_put(mgr->evpool, ievent);
}
isc_condition_destroy(&worker->cond_prio);
@@ -375,11 +396,9 @@ nm_destroy(isc_nm_t **mgr0) {
r = uv_loop_close(&worker->loop);
INSIST(r == 0);
isc_queue_destroy(worker->ievents);
isc_queue_destroy(worker->ievents_priv);
isc_queue_destroy(worker->ievents_task);
isc_queue_destroy(worker->ievents_prio);
isc_mutex_destroy(&worker->lock);
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
isc_queue_destroy(worker->ievents[type]);
}
isc_mem_put(mgr->mctx, worker->sendbuf,
ISC_NETMGR_SENDBUF_SIZE);
@@ -487,7 +506,7 @@ isc_nm_resume(isc_nm_t *mgr) {
if (isc__nm_in_netthread()) {
REQUIRE(isc_nm_tid() == 0);
drain_priority_queue(&mgr->workers[isc_nm_tid()]);
drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIORITY);
}
for (int i = 0; i < mgr->nworkers; i++) {
@@ -500,7 +519,7 @@ isc_nm_resume(isc_nm_t *mgr) {
}
if (isc__nm_in_netthread()) {
drain_privilege_queue(&mgr->workers[isc_nm_tid()]);
drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIVILEGED);
atomic_fetch_sub(&mgr->workers_paused, 1);
isc_barrier_wait(&mgr->resuming);
@@ -711,7 +730,7 @@ nm_thread(isc_threadarg_t worker0) {
* All workers must drain the privileged event
* queue before we resume from pause.
*/
drain_privilege_queue(worker);
drain_queue(worker, NETIEVENT_PRIVILEGED);
atomic_fetch_sub(&mgr->workers_paused, 1);
if (isc_barrier_wait(&mgr->resuming) != 0) {
@@ -734,8 +753,8 @@ nm_thread(isc_threadarg_t worker0) {
* (they may include shutdown events) but do not process
* the netmgr event queue.
*/
drain_privilege_queue(worker);
drain_task_queue(worker);
drain_queue(worker, NETIEVENT_PRIVILEGED);
drain_queue(worker, NETIEVENT_TASK);
LOCK(&mgr->lock);
mgr->workers_running--;
@@ -746,20 +765,32 @@ nm_thread(isc_threadarg_t worker0) {
}
static bool
process_all_queues(isc__networker_t *worker, unsigned int quantum) {
process_all_queues(isc__networker_t *worker) {
bool reschedule = false;
/*
* The queue processing functions will return false when the
* system is pausing or stopping, or if we have completed
* 'quantum' events.
*
* We don't want to proceed to a new queue until the previous one
* has been fully drained, so whenever one queue is interrupted,
* we skip all the later ones.
* system is pausing or stopping and we don't want to process
* the other queues in such case, but we need the async event
* to be rescheduled in the next uv_run().
*/
return (process_priority_queue(worker, &quantum) &&
process_privilege_queue(worker, &quantum) &&
process_task_queue(worker, &quantum) &&
process_normal_queue(worker, &quantum));
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
isc_result_t result = process_queue(worker, type);
switch (result) {
case ISC_R_SUSPEND:
return (true);
case ISC_R_EMPTY:
/* empty queue */
break;
case ISC_R_SUCCESS:
reschedule = true;
break;
default:
INSIST(0);
ISC_UNREACHABLE();
}
}
return (reschedule);
}
/*
@@ -771,9 +802,8 @@ process_all_queues(isc__networker_t *worker, unsigned int quantum) {
static void
async_cb(uv_async_t *handle) {
isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
unsigned int quantum = worker->quantum;
if (!process_all_queues(worker, quantum)) {
if (process_all_queues(worker)) {
/*
 * Events were processed or are still pending, so reschedule
 * async_cb to be run in the next iteration of the uv_loop.
 */
@@ -840,19 +870,17 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
static void
wait_for_priority_queue(isc__networker_t *worker) {
isc_queue_t *queue = worker->ievents_prio;
isc_condition_t *cond = &worker->cond_prio;
bool wait_for_work = true;
while (true) {
isc__netievent_t *ievent;
LOCK(&worker->lock);
ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
if (wait_for_work) {
while (ievent == NULL) {
WAIT(cond, &worker->lock);
ievent = (isc__netievent_t *)isc_queue_dequeue(
queue);
ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
}
}
UNLOCK(&worker->lock);
@@ -861,29 +889,17 @@ wait_for_priority_queue(isc__networker_t *worker) {
if (ievent == NULL) {
return;
}
DECREMENT_PRIORITY_NETIEVENT(worker);
(void)process_netievent(worker, ievent);
}
}
static bool
process_priority_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_prio, quantump));
}
static bool
process_privilege_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_priv, quantump));
}
static bool
process_task_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_task, quantump));
}
static bool
process_normal_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents, quantump));
static void
drain_queue(isc__networker_t *worker, netievent_type_t type) {
	/*
	 * Keep processing the given queue until it reports that it is
	 * empty; ISC_R_SUCCESS and ISC_R_SUSPEND both mean there may
	 * still be work left to do.
	 */
	isc_result_t result;
	do {
		result = process_queue(worker, type);
	} while (result != ISC_R_EMPTY);
}
/*
@@ -984,28 +1000,46 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) {
return (true);
}
static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue,
unsigned int *quantump) {
while (*quantump > 0) {
isc__netievent_t *ievent =
(isc__netievent_t *)isc_queue_dequeue(queue);
static isc_result_t
process_queue(isc__networker_t *worker, netievent_type_t type) {
/*
* The counter of items in the queue is only loosely synchronized
* with the queue itself. There is a guarantee that any item put on
* the queue will be accounted for in the counter, but the counter
* might be higher than the number of items actually stored in the
* queue at a given moment.
*/
uint_fast32_t waiting = atomic_load_acquire(&worker->nievents[type]);
isc__netievent_t *ievent = DEQUEUE_NETIEVENT(worker, type);
if (ievent == NULL) {
/* We fully drained this queue */
return (true);
}
(*quantump)--;
if (!process_netievent(worker, ievent)) {
/* Netievent told us to stop */
return (false);
}
if (ievent == NULL && waiting == 0) {
/* There's nothing scheduled */
return (ISC_R_EMPTY);
} else if (ievent == NULL) {
/* There's at least one item scheduled, but not on the queue yet
*/
return (ISC_R_SUCCESS);
}
/* No more quantum */
return (false);
while (ievent != NULL) {
DECREMENT_NETIEVENT(worker, type);
bool stop = !process_netievent(worker, ievent);
if (stop) {
/* Netievent told us to stop */
return (ISC_R_SUSPEND);
}
if (waiting-- == 0) {
/* We reached this round "quota" */
break;
}
ievent = DEQUEUE_NETIEVENT(worker, type);
}
/* We processed at least one */
return (ISC_R_SUCCESS);
}
void *
@@ -1107,15 +1141,19 @@ isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
* the queue will be processed.
*/
LOCK(&worker->lock);
isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event);
INCREMENT_PRIORITY_NETIEVENT(worker);
ENQUEUE_PRIORITY_NETIEVENT(worker, event);
SIGNAL(&worker->cond_prio);
UNLOCK(&worker->lock);
} else if (event->type == netievent_privilegedtask) {
isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event);
INCREMENT_PRIVILEGED_NETIEVENT(worker);
ENQUEUE_PRIVILEGED_NETIEVENT(worker, event);
} else if (event->type == netievent_task) {
isc_queue_enqueue(worker->ievents_task, (uintptr_t)event);
INCREMENT_TASK_NETIEVENT(worker);
ENQUEUE_TASK_NETIEVENT(worker, event);
} else {
isc_queue_enqueue(worker->ievents, (uintptr_t)event);
INCREMENT_NORMAL_NETIEVENT(worker);
ENQUEUE_NORMAL_NETIEVENT(worker, event);
}
uv_async_send(&worker->async);
}

View File

@@ -77,7 +77,7 @@ static const char *description[ISC_R_NRESULTS] = {
"invalid use of multicast address", /*%< 43 */
"not a file", /*%< 44 */
"not a directory", /*%< 45 */
"queue is full", /*%< 46 */
"queue is empty", /*%< 46 */
"address family mismatch", /*%< 47 */
"address family not supported", /*%< 48 */
"bad hex encoding", /*%< 49 */
@@ -151,7 +151,7 @@ static const char *identifier[ISC_R_NRESULTS] = { "ISC_R_SUCCESS",
"ISC_R_MULTICAST",
"ISC_R_NOTFILE",
"ISC_R_NOTDIRECTORY",
"ISC_R_QUEUEFULL",
"ISC_R_EMPTY",
"ISC_R_FAMILYMISMATCH",
"ISC_R_FAMILYNOSUPPORT",
"ISC_R_BADHEX",