Compare commits

...

1 Commits

Author SHA1 Message Date
Ondřej Surý
5335ed2524 Replace netmgr quantum with loop-preventing barrier
Instead of using a fixed quantum, this commit adds an atomic counter for
the number of items on each queue and uses the number of netievents
already scheduled to run as the limit on the maximum number of netievents
processed in a single process_queue() run.

This prevents endless loops when a netievent schedules more netievents
onto the same loop, without requiring us to pick a "magic" number for
the quantum.
2021-05-16 21:24:56 +02:00
4 changed files with 172 additions and 75 deletions

View File

@@ -64,7 +64,7 @@
#define ISC_R_MULTICAST 43 /*%< invalid use of multicast */
#define ISC_R_NOTFILE 44 /*%< not a file */
#define ISC_R_NOTDIRECTORY 45 /*%< not a directory */
#define ISC_R_QUEUEFULL 46 /*%< queue is full */
#define ISC_R_EMPTY 46 /*%< queue is empty */
#define ISC_R_FAMILYMISMATCH 47 /*%< address family mismatch */
#define ISC_R_FAMILYNOSUPPORT 48 /*%< AF not supported */
#define ISC_R_BADHEX 49 /*%< bad hex encoding */

View File

@@ -40,8 +40,6 @@
#include "uv-compat.h"
#define ISC_NETMGR_QUANTUM_DEFAULT 1024
#define ISC_NETMGR_TID_UNKNOWN -1
/* Must be different from ISC_NETMGR_TID_UNKNOWN */
@@ -179,12 +177,16 @@ typedef struct isc__networker {
bool finished;
isc_thread_t thread;
isc_queue_t *ievents; /* incoming async events */
atomic_uint_fast32_t nievents;
isc_queue_t *ievents_priv; /* privileged async tasks */
atomic_uint_fast32_t nievents_priv;
isc_queue_t *ievents_task; /* async tasks */
atomic_uint_fast32_t nievents_task;
isc_queue_t *ievents_prio; /* priority async events
* used for listening etc.
* can be processed while
* worker is paused */
atomic_uint_fast32_t nievents_prio;
isc_condition_t cond_prio;
isc_refcount_t references;
@@ -192,7 +194,6 @@ typedef struct isc__networker {
char *recvbuf;
char *sendbuf;
bool recvbuf_inuse;
unsigned int quantum;
} isc__networker_t;
/*

View File

@@ -137,28 +137,25 @@ static void
async_cb(uv_async_t *handle);
static bool
process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
static bool
static isc_result_t
process_queue(isc__networker_t *worker, isc_queue_t *queue,
unsigned int *quantump);
atomic_uint_fast32_t *nqueue);
static void
wait_for_priority_queue(isc__networker_t *worker);
static bool
process_priority_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_privilege_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_task_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_normal_queue(isc__networker_t *worker, unsigned int *quantump);
#define drain_priority_queue(worker) \
(void)process_priority_queue(worker, &(unsigned int){ UINT_MAX })
#define drain_privilege_queue(worker) \
(void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX })
#define drain_task_queue(worker) \
(void)process_task_queue(worker, &(unsigned int){ UINT_MAX })
#define drain_normal_queue(worker) \
(void)process_normal_queue(worker, &(unsigned int){ UINT_MAX })
static void
drain_priority_queue(isc__networker_t *worker);
static void
drain_privilege_queue(isc__networker_t *worker);
static void
drain_task_queue(isc__networker_t *worker);
static isc_result_t
process_priority_queue(isc__networker_t *worker);
static isc_result_t
process_privilege_queue(isc__networker_t *worker);
static isc_result_t
process_task_queue(isc__networker_t *worker);
static isc_result_t
process_normal_queue(isc__networker_t *worker);
static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
@@ -283,7 +280,6 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
*worker = (isc__networker_t){
.mgr = mgr,
.id = i,
.quantum = ISC_NETMGR_QUANTUM_DEFAULT,
};
r = uv_loop_init(&worker->loop);
@@ -735,20 +731,72 @@ nm_thread(isc_threadarg_t worker0) {
}
static bool
process_all_queues(isc__networker_t *worker, unsigned int quantum) {
process_all_queues(isc__networker_t *worker) {
isc_result_t result = ISC_R_SUCCESS;
bool reschedule = false;
/*
* The queue processing functions will return false when the
* system is pausing or stopping, or if we have completed
* 'quantum' events.
*
* We don't want to proceed to a new queue until the previous one
* has been fully drained, so whenever one queue is interrupted,
* we skip all the later ones.
* system is pausing or stopping and we don't want to process
* the other queues in such case, but we need the async event
* to be rescheduled in the next uv_run().
*/
return (process_priority_queue(worker, &quantum) &&
process_privilege_queue(worker, &quantum) &&
process_task_queue(worker, &quantum) &&
process_normal_queue(worker, &quantum));
result = process_priority_queue(worker);
switch (result) {
case ISC_R_SUSPEND:
return (true);
case ISC_R_EMPTY:
/* empty queue */
break;
case ISC_R_SUCCESS:
reschedule = true;
break;
default:
INSIST(0);
}
result = process_privilege_queue(worker);
switch (result) {
case ISC_R_SUSPEND:
return (true);
case ISC_R_EMPTY:
/* empty queue */
break;
case ISC_R_SUCCESS:
reschedule = true;
break;
default:
INSIST(0);
}
result = process_task_queue(worker);
switch (result) {
case ISC_R_SUSPEND:
return (true);
case ISC_R_EMPTY:
/* empty queue */
break;
case ISC_R_SUCCESS:
reschedule = true;
break;
default:
INSIST(0);
}
result = process_normal_queue(worker);
switch (result) {
case ISC_R_SUSPEND:
return (true);
case ISC_R_EMPTY:
/* empty queue */
break;
case ISC_R_SUCCESS:
reschedule = true;
break;
default:
INSIST(0);
}
return (reschedule);
}
/*
@@ -760,9 +808,8 @@ process_all_queues(isc__networker_t *worker, unsigned int quantum) {
static void
async_cb(uv_async_t *handle) {
isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
unsigned int quantum = worker->quantum;
if (!process_all_queues(worker, quantum)) {
if (process_all_queues(worker)) {
/* Either some netievents were processed (more may have been
 * scheduled meanwhile) or processing was suspended, so re-send
 * the async event to run async_cb again in the next iteration
 * of the uv_loop
 */
@@ -836,43 +883,65 @@ wait_for_priority_queue(isc__networker_t *worker) {
while (true) {
isc__netievent_t *ievent;
LOCK(&worker->lock);
ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
if (wait_for_work) {
while (ievent == NULL) {
while (true) {
ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
if (wait_for_work && ievent == NULL) {
WAIT(cond, &worker->lock);
ievent = (isc__netievent_t *)isc_queue_dequeue(
queue);
continue;
}
break;
}
UNLOCK(&worker->lock);
wait_for_work = false;
UNLOCK(&worker->lock);
if (ievent == NULL) {
return;
}
atomic_fetch_sub_release(&worker->nievents_prio, 1);
(void)process_netievent(worker, ievent);
}
}
static bool
process_priority_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_prio, quantump));
static void
drain_priority_queue(isc__networker_t *worker) {
while (process_priority_queue(worker) != ISC_R_EMPTY)
;
}
static bool
process_privilege_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_priv, quantump));
static void
drain_privilege_queue(isc__networker_t *worker) {
while (process_privilege_queue(worker) != ISC_R_EMPTY)
;
}
static bool
process_task_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_task, quantump));
static void
drain_task_queue(isc__networker_t *worker) {
while (process_task_queue(worker) != ISC_R_EMPTY)
;
}
static bool
process_normal_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents, quantump));
static isc_result_t
process_priority_queue(isc__networker_t *worker) {
return (process_queue(worker, worker->ievents_prio,
&worker->nievents_prio));
}
static isc_result_t
process_privilege_queue(isc__networker_t *worker) {
return (process_queue(worker, worker->ievents_priv,
&worker->nievents_priv));
}
static isc_result_t
process_task_queue(isc__networker_t *worker) {
return (process_queue(worker, worker->ievents_task,
&worker->nievents_task));
}
static isc_result_t
process_normal_queue(isc__networker_t *worker) {
return (process_queue(worker, worker->ievents, &worker->nievents));
}
/*
@@ -973,28 +1042,51 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) {
return (true);
}
static bool
/*
 * The queue and the nqueue counter are only loosely synchronized; the
 * counter may momentarily be higher than the number of items actually
 * on the queue.
 */
static isc_result_t
process_queue(isc__networker_t *worker, isc_queue_t *queue,
unsigned int *quantump) {
while (*quantump > 0) {
isc__netievent_t *ievent =
(isc__netievent_t *)isc_queue_dequeue(queue);
atomic_uint_fast32_t *nqueue) {
/*
 * The item counter (nqueue) is only loosely synchronized with the
 * queue itself: an item on the queue is guaranteed to have been
 * accounted for in the counter, but the counter may momentarily be
 * higher than the number of items actually stored on the queue.
 */
uint_fast32_t waiting = atomic_load_acquire(nqueue);
isc__netievent_t *ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
if (ievent == NULL) {
/* We fully drained this queue */
return (true);
}
(*quantump)--;
if (!process_netievent(worker, ievent)) {
/* Netievent told us to stop */
return (false);
}
if (ievent == NULL && waiting == 0) {
/* There's nothing scheduled */
return (ISC_R_EMPTY);
} else if (ievent == NULL) {
/* There's at least one item scheduled, but not on the queue yet
*/
return (ISC_R_SUCCESS);
}
/* No more quantum */
return (false);
while (ievent != NULL) {
atomic_fetch_sub_release(nqueue, 1);
bool stop = !process_netievent(worker, ievent);
if (stop) {
/* Netievent told us to stop */
return (ISC_R_SUSPEND);
}
if (waiting-- == 0) {
/* We reached this round "quota" */
break;
}
ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
}
/* We processed at least one */
return (ISC_R_SUCCESS);
}
void *
@@ -1096,14 +1188,18 @@ isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
* the queue will be processed.
*/
LOCK(&worker->lock);
atomic_fetch_add_release(&worker->nievents_prio, 1);
isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event);
SIGNAL(&worker->cond_prio);
UNLOCK(&worker->lock);
} else if (event->type == netievent_privilegedtask) {
atomic_fetch_add_release(&worker->nievents_priv, 1);
isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event);
} else if (event->type == netievent_task) {
atomic_fetch_add_release(&worker->nievents_task, 1);
isc_queue_enqueue(worker->ievents_task, (uintptr_t)event);
} else {
atomic_fetch_add_release(&worker->nievents, 1);
isc_queue_enqueue(worker->ievents, (uintptr_t)event);
}
uv_async_send(&worker->async);

View File

@@ -77,7 +77,7 @@ static const char *description[ISC_R_NRESULTS] = {
"invalid use of multicast address", /*%< 43 */
"not a file", /*%< 44 */
"not a directory", /*%< 45 */
"queue is full", /*%< 46 */
"queue is empty", /*%< 46 */
"address family mismatch", /*%< 47 */
"address family not supported", /*%< 48 */
"bad hex encoding", /*%< 49 */
@@ -151,7 +151,7 @@ static const char *identifier[ISC_R_NRESULTS] = { "ISC_R_SUCCESS",
"ISC_R_MULTICAST",
"ISC_R_NOTFILE",
"ISC_R_NOTDIRECTORY",
"ISC_R_QUEUEFULL",
"ISC_R_EMPTY",
"ISC_R_FAMILYMISMATCH",
"ISC_R_FAMILYNOSUPPORT",
"ISC_R_BADHEX",