Make the netmgr queue processing quantized

There was a theoretical possibility of clogging up the queue processing
with an endless loop in which the netievent currently being processed
would schedule a new netievent that would then get processed
immediately.  This wasn't much of a problem when only netmgr netievents
were processed, but with the addition of tasks there are at least two
situations where this could happen:

 1. In lib/dns/zone.c:setnsec3param() the task would get re-enqueued
    when the zone was not yet fully loaded.

 2. Tasks have an internal quantum for the maximum number of
    isc_events to be processed; when the task quantum is reached, the
    task would get rescheduled and then immediately processed by the
    netmgr queue processing.

As isc_queue doesn't have a mechanism to atomically move the whole
queue aside, this commit instead quantizes the queue processing, so
that enqueueing new netievents can never stop the processing of other
uv_loop_t events.  The default quantum size is 128.
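
For illustration only (editor's sketch, not part of the commit): the
pattern implemented by the netmgr.c hunk below, reduced to a
self-contained C program.  The queue_t, enqueue() and dequeue() names
are toy stand-ins for isc_queue_t, and resetting the quantum in main()
stands in for re-arming async_cb() via uv_async_send().

    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* Toy stand-ins for isc_queue_t; names are illustrative only. */
    typedef struct event {
        int id;
        struct event *next;
    } event_t;

    typedef struct queue {
        event_t *head;
        event_t *tail;
    } queue_t;

    static void
    enqueue(queue_t *q, int id) {
        event_t *ev = calloc(1, sizeof(*ev));
        ev->id = id;
        if (q->tail != NULL) {
            q->tail->next = ev;
        } else {
            q->head = ev;
        }
        q->tail = ev;
    }

    static event_t *
    dequeue(queue_t *q) {
        event_t *ev = q->head;
        if (ev != NULL) {
            q->head = ev->next;
            if (q->head == NULL) {
                q->tail = NULL;
            }
        }
        return (ev);
    }

    /*
     * Quantized drain: process at most *quantump events; several
     * queues can share a single budget.  Returns true only when the
     * queue was fully drained, false when the budget ran out and the
     * caller has to reschedule itself.
     */
    static bool
    process_queue(queue_t *q, unsigned int *quantump) {
        while (*quantump > 0) {
            event_t *ev = dequeue(q);
            (*quantump)--;
            if (ev == NULL) {
                return (true); /* fully drained */
            }
            printf("event %d\n", ev->id);
            /*
             * A handler that enqueues a new event here can no longer
             * loop forever: the new event is charged against the same
             * quantum instead of being processed unconditionally.
             */
            free(ev);
        }
        return (false);
    }

    int
    main(void) {
        queue_t q = { 0 };
        unsigned int quantum = 128;

        for (int i = 0; i < 300; i++) {
            enqueue(&q, i);
        }
        while (!process_queue(&q, &quantum)) {
            /*
             * Stands in for uv_async_send(): the backlog is finished
             * on the next loop iteration instead of spinning here.
             */
            quantum = 128;
        }
        return (0);
    }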

Since the queue used in the network manager allows items to be enqueued
more than once, tasks are now reference-counted around task_ready()
and task_run(). task_ready() now has a public API wrapper,
isc_task_ready(), that the netmgr can use to reschedule processing
of a task if the quantum has been reached.
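
Again an editor's sketch under assumed names (task_t, task_ready() and
task_run() here are simplified, single-threaded stand-ins; the commit
itself uses isc_refcount_increment0()/isc_refcount_decrement() on
task->running and holds the task lock): every enqueue takes a
'running' reference and every run drops one, so a task that sits on
the queue twice is only torn down by the last run to complete.

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    /* Simplified stand-in for the real task object. */
    typedef struct task {
        atomic_uint running; /* pending and active task_run() calls */
        bool done;           /* reached task_state_done */
    } task_t;

    /* One increment per enqueue; paired with the decrement below. */
    static void
    task_ready(task_t *task) {
        atomic_fetch_add(&task->running, 1);
        /* ...the real code enqueues onto the netmgr queue here... */
    }

    static void
    task_run(task_t *task) {
        /* ...the real code runs up to the task's quantum of events... */

        /*
         * atomic_fetch_sub() returns the previous value, so 1 means
         * this was the last in-flight run; only then may a finished
         * task be destroyed, even if it sat on the queue twice.
         */
        if (atomic_fetch_sub(&task->running, 1) == 1 && task->done) {
            printf("last run finished, task can be freed\n");
        }
    }

    int
    main(void) {
        task_t t = { .running = 0, .done = false };

        task_ready(&t); /* first enqueue */
        task_ready(&t); /* duplicate enqueue, e.g. quantum reached */
        t.done = true;
        task_run(&t);   /* running: 2 -> 1, no teardown yet */
        task_run(&t);   /* running: 1 -> 0, teardown happens now */
        return (0);
    }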

Incidental changes: Cleaned up some unused fields left in isc_task_t
and isc_taskmgr_t after the last refactoring, and changed atomic
flags to atomic_bools for easier manipulation.
Author:    Ondřej Surý, 2021-04-27 12:03:34 +02:00
Committer: Evan Hunt
Parent:    b5bf58b419
Commit:    dacf586e18

5 changed files with 125 additions and 126 deletions

lib/isc/include/isc/task.h

@@ -136,6 +136,12 @@ isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum,
*\li #ISC_R_SHUTTINGDOWN
*/
void
isc_task_ready(isc_task_t *task);
/*%<
* Enqueue the task onto netmgr queue.
*/
isc_result_t
isc_task_run(isc_task_t *task);
/*%<

lib/isc/netmgr/netmgr-int.h

@@ -39,6 +39,8 @@
#include "uv-compat.h"
#define ISC_NETMGR_QUANTUM_DEFAULT 128
#define ISC_NETMGR_TID_UNKNOWN -1
/* Must be different from ISC_NETMGR_TID_UNKNOWN */
@@ -188,6 +190,7 @@ typedef struct isc__networker {
char *recvbuf;
char *sendbuf;
bool recvbuf_inuse;
unsigned int quantum;
} isc__networker_t;
/*

lib/isc/netmgr/netmgr.c

@@ -135,15 +135,16 @@ nm_thread(isc_threadarg_t worker0);
static void
async_cb(uv_async_t *handle);
static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue);
process_queue(isc__networker_t *worker, isc_queue_t *queue,
unsigned int *quantump);
static bool
process_priority_queue(isc__networker_t *worker);
static void
process_privilege_queue(isc__networker_t *worker);
static void
process_tasks_queue(isc__networker_t *worker);
static void
process_normal_queue(isc__networker_t *worker);
process_priority_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_privilege_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_task_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_normal_queue(isc__networker_t *worker, unsigned int *quantump);
static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
@@ -264,6 +265,7 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
*worker = (isc__networker_t){
.mgr = mgr,
.id = i,
.quantum = ISC_NETMGR_QUANTUM_DEFAULT,
};
r = uv_loop_init(&worker->loop);
@@ -278,8 +280,8 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
isc_condition_init(&worker->cond);
worker->ievents = isc_queue_new(mgr->mctx, 128);
worker->ievents_priv = isc_queue_new(mgr->mctx, 128);
worker->ievents_task = isc_queue_new(mgr->mctx, 128);
worker->ievents_priv = isc_queue_new(mgr->mctx, 128);
worker->ievents_prio = isc_queue_new(mgr->mctx, 128);
worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE);
@@ -631,7 +633,8 @@ nm_thread(isc_threadarg_t worker0) {
while (worker->paused) {
WAIT(&worker->cond, &worker->lock);
UNLOCK(&worker->lock);
(void)process_priority_queue(worker);
(void)process_priority_queue(
worker, &(unsigned int){ UINT_MAX });
LOCK(&worker->lock);
}
@@ -642,10 +645,11 @@ nm_thread(isc_threadarg_t worker0) {
UNLOCK(&worker->lock);
/*
* All workers must run the privileged event
* All workers must drain the privileged event
* queue before we resume from pause.
*/
process_privilege_queue(worker);
(void)process_privilege_queue(
worker, &(unsigned int){ UINT_MAX });
LOCK(&mgr->lock);
while (atomic_load(&mgr->paused)) {
@@ -660,16 +664,6 @@ nm_thread(isc_threadarg_t worker0) {
}
INSIST(!worker->finished);
/*
* We've fully resumed from pause. Drain the normal
* asynchronous event queues before resuming the uv_run()
* loop. (This is not strictly necessary, it just ensures
* that all pending events are processed before another
* pause can slip in.)
*/
process_tasks_queue(worker);
process_normal_queue(worker);
}
/*
@@ -677,8 +671,8 @@ nm_thread(isc_threadarg_t worker0) {
* (they may include shutdown events) but do not process
* the netmgr event queue.
*/
process_privilege_queue(worker);
process_tasks_queue(worker);
(void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX });
(void)process_task_queue(worker, &(unsigned int){ UINT_MAX });
LOCK(&mgr->lock);
mgr->workers_running--;
@@ -688,6 +682,23 @@ nm_thread(isc_threadarg_t worker0) {
return ((isc_threadresult_t)0);
}
static bool
process_all_queues(isc__networker_t *worker, unsigned int quantum) {
/*
* The queue processing functions will return false when the
* system is pausing or stopping, or if we have completed
* 'quantum' events.
*
* We don't want to proceed to a new queue until the previous one
* has been fully drained, so whenever one queue is interrupted,
* we skip all the later ones.
*/
return (process_priority_queue(worker, &quantum) &&
process_privilege_queue(worker, &quantum) &&
process_task_queue(worker, &quantum) &&
process_normal_queue(worker, &quantum));
}
/*
* async_cb() is a universal callback for 'async' events sent to event loop.
* It's the only way to safely pass data to the libuv event loop. We use a
@@ -697,18 +708,14 @@ nm_thread(isc_threadarg_t worker0) {
static void
async_cb(uv_async_t *handle) {
isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
unsigned int quantum = worker->quantum;
/*
* process_priority_queue() returns false when pausing or stopping,
* so we don't want to process the other queues in that case.
*/
if (!process_priority_queue(worker)) {
return;
if (!process_all_queues(worker, quantum)) {
/* If we didn't process all the events, we need to enqueue
* async_cb to be run in the next iteration of the uv_loop
*/
uv_async_send(handle);
}
process_privilege_queue(worker);
process_tasks_queue(worker);
process_normal_queue(worker);
}
static void
@@ -775,8 +782,7 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
switch (result) {
case ISC_R_QUOTA:
isc_nm_task_enqueue(worker->mgr, (isc_task_t *)ievent->task,
isc_nm_tid());
isc_task_ready(ievent->task);
return;
case ISC_R_SUCCESS:
return;
@@ -787,23 +793,23 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
}
static bool
process_priority_queue(isc__networker_t *worker) {
return (process_queue(worker, worker->ievents_prio));
process_priority_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_prio, quantump));
}
static void
process_privilege_queue(isc__networker_t *worker) {
(void)process_queue(worker, worker->ievents_priv);
static bool
process_privilege_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_priv, quantump));
}
static void
process_tasks_queue(isc__networker_t *worker) {
(void)process_queue(worker, worker->ievents_task);
static bool
process_task_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_task, quantump));
}
static void
process_normal_queue(isc__networker_t *worker) {
(void)process_queue(worker, worker->ievents);
static bool
process_normal_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents, quantump));
}
/*
@@ -905,16 +911,27 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) {
}
static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue) {
isc__netievent_t *ievent = NULL;
process_queue(isc__networker_t *worker, isc_queue_t *queue,
unsigned int *quantump) {
while (*quantump > 0) {
isc__netievent_t *ievent =
(isc__netievent_t *)isc_queue_dequeue(queue);
(*quantump)--;
if (ievent == NULL) {
/* We fully drained this queue */
return (true);
}
while ((ievent = (isc__netievent_t *)isc_queue_dequeue(queue)) != NULL)
{
if (!process_netievent(worker, ievent)) {
/* Netievent told us to stop */
return (false);
}
}
return (true);
/* No more quantum */
return (false);
}
void *

lib/isc/task.c

@@ -17,6 +17,7 @@
*/
#include <stdbool.h>
#include <unistd.h>
#include <isc/app.h>
#include <isc/atomic.h>
@@ -102,6 +103,7 @@ struct isc_task {
task_state_t state;
int pause_cnt;
isc_refcount_t references;
isc_refcount_t running;
isc_eventlist_t events;
isc_eventlist_t on_shutdown;
unsigned int nevents;
@@ -112,21 +114,14 @@ struct isc_task {
void *tag;
bool bound;
/* Protected by atomics */
atomic_uint_fast32_t flags;
atomic_bool shuttingdown;
atomic_bool privileged;
/* Locked by task manager lock. */
LINK(isc_task_t) link;
};
#define TASK_F_SHUTTINGDOWN 0x01
#define TASK_F_PRIVILEGED 0x02
#define TASK_SHUTTINGDOWN(t) \
((atomic_load_acquire(&(t)->flags) & TASK_F_SHUTTINGDOWN) != 0)
#define TASK_PRIVILEGED(t) \
((atomic_load_acquire(&(t)->flags) & TASK_F_PRIVILEGED) != 0)
#define TASK_FLAG_SET(t, f) atomic_fetch_or_release(&(t)->flags, (f))
#define TASK_FLAG_CLR(t, f) atomic_fetch_and_release(&(t)->flags, ~(f))
#define TASK_SHUTTINGDOWN(t) (atomic_load_acquire(&(t)->shuttingdown))
#define TASK_PRIVILEGED(t) (atomic_load_acquire(&(t)->privileged))
#define TASK_MANAGER_MAGIC ISC_MAGIC('T', 'S', 'K', 'M')
#define VALID_MANAGER(m) ISC_MAGIC_VALID(m, TASK_MANAGER_MAGIC)
@@ -137,10 +132,8 @@ struct isc_taskmgr {
isc_refcount_t references;
isc_mem_t *mctx;
isc_mutex_t lock;
atomic_uint_fast32_t tasks_running;
atomic_uint_fast32_t tasks_ready;
atomic_uint_fast32_t tasks_count;
isc_nm_t *nm;
isc_nm_t *netmgr;
/* Locked by task manager lock. */
unsigned int default_quantum;
@@ -148,9 +141,6 @@ struct isc_taskmgr {
atomic_bool exclusive_req;
atomic_bool exiting;
/* Locked by halt_lock */
unsigned int halted;
/*
* Multiple threads can read/write 'excl' at the same time, so we need
* to protect the access. We can't use 'lock' since isc_task_detach()
@@ -161,9 +151,6 @@ struct isc_taskmgr {
};
#define DEFAULT_DEFAULT_QUANTUM 25
#define FINISHED(m) \
(atomic_load_relaxed(&((m)->exiting)) && \
atomic_load(&(m)->tasks_count) == 0)
/*%
* The following are intended for internal use (indicated by "isc__"
@@ -193,6 +180,7 @@ task_finished(isc_task_t *task) {
XTRACE("task_finished");
isc_refcount_destroy(&task->running);
isc_refcount_destroy(&task->references);
LOCK(&manager->lock);
@@ -251,11 +239,13 @@ isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum,
task->pause_cnt = 0;
isc_refcount_init(&task->references, 1);
isc_refcount_init(&task->running, 0);
INIT_LIST(task->events);
INIT_LIST(task->on_shutdown);
task->nevents = 0;
task->quantum = (quantum > 0) ? quantum : manager->default_quantum;
atomic_init(&task->flags, 0);
atomic_init(&task->shuttingdown, false);
atomic_init(&task->privileged, false);
task->now = 0;
isc_time_settoepoch(&task->tnow);
memset(task->name, 0, sizeof(task->name));
@@ -311,9 +301,9 @@ task_shutdown(isc_task_t *task) {
XTRACE("task_shutdown");
if (!TASK_SHUTTINGDOWN(task)) {
if (atomic_compare_exchange_strong(&task->shuttingdown,
&(bool){ false }, true)) {
XTRACE("shutting down");
TASK_FLAG_SET(task, TASK_F_SHUTTINGDOWN);
if (task->state == task_state_idle) {
INSIST(EMPTY(task->events));
task->state = task_state_ready;
@@ -350,7 +340,14 @@ task_ready(isc_task_t *task) {
REQUIRE(VALID_MANAGER(manager));
XTRACE("task_ready");
isc_nm_task_enqueue(manager->nm, task, task->threadid);
isc_refcount_increment0(&task->running);
isc_nm_task_enqueue(manager->netmgr, task, task->threadid);
}
void
isc_task_ready(isc_task_t *task) {
task_ready(task);
}
static inline bool
@@ -820,8 +817,7 @@ task_run(isc_task_t *task) {
* and task lock to avoid deadlocks, just bail then.
*/
if (task->state != task_state_ready) {
UNLOCK(&task->lock);
return (ISC_R_SUCCESS);
goto done;
}
INSIST(task->state == task_state_ready);
@@ -886,7 +882,6 @@ task_run(isc_task_t *task) {
* The task is done.
*/
XTRACE("done");
finished = true;
task->state = task_state_done;
} else {
if (task->state == task_state_running) {
@@ -920,6 +915,13 @@ task_run(isc_task_t *task) {
break;
}
}
done:
if (isc_refcount_decrement(&task->running) == 1 &&
task->state == task_state_done)
{
finished = true;
}
UNLOCK(&task->lock);
if (finished) {
@@ -937,7 +939,7 @@ isc_task_run(isc_task_t *task) {
static void
manager_free(isc_taskmgr_t *manager) {
isc_refcount_destroy(&manager->references);
isc_nm_detach(&manager->nm);
isc_nm_detach(&manager->netmgr);
isc_mutex_destroy(&manager->lock);
isc_mutex_destroy(&manager->excl_lock);
@@ -988,13 +990,10 @@ isc__taskmgr_create(isc_mem_t *mctx, unsigned int default_quantum, isc_nm_t *nm,
manager->default_quantum = default_quantum;
if (nm != NULL) {
isc_nm_attach(nm, &manager->nm);
isc_nm_attach(nm, &manager->netmgr);
}
INIT_LIST(manager->tasks);
atomic_init(&manager->tasks_count, 0);
atomic_init(&manager->tasks_running, 0);
atomic_init(&manager->tasks_ready, 0);
atomic_init(&manager->exiting, false);
atomic_store_relaxed(&manager->exclusive_req, false);
@@ -1074,6 +1073,18 @@ isc__taskmgr_destroy(isc_taskmgr_t **managerp) {
XTHREADTRACE("isc_taskmgr_destroy");
#ifdef ISC_TASK_TRACE
int counter = 0;
while (isc_refcount_current(&manager->references) > 1 &&
counter++ < 1000) {
usleep(10 * 1000);
}
#else
while (isc_refcount_current(&manager->references) > 1) {
usleep(10 * 1000);
}
#endif
REQUIRE(isc_refcount_decrement(&manager->references) == 1);
manager_free(manager);
}
@@ -1130,7 +1141,7 @@ isc_task_beginexclusive(isc_task_t *task) {
return (ISC_R_LOCKBUSY);
}
isc_nm_pause(manager->nm);
isc_nm_pause(manager->netmgr);
return (ISC_R_SUCCESS);
}
@@ -1143,7 +1154,7 @@ isc_task_endexclusive(isc_task_t *task) {
REQUIRE(task->state == task_state_running);
manager = task->manager;
isc_nm_resume(manager->nm);
isc_nm_resume(manager->netmgr);
REQUIRE(atomic_compare_exchange_strong(&manager->exclusive_req,
&(bool){ true }, false));
}
@@ -1210,20 +1221,8 @@ isc_task_unpause(isc_task_t *task) {
void
isc_task_setprivilege(isc_task_t *task, bool priv) {
REQUIRE(VALID_TASK(task));
uint_fast32_t oldflags, newflags;
oldflags = atomic_load_acquire(&task->flags);
do {
if (priv) {
newflags = oldflags | TASK_F_PRIVILEGED;
} else {
newflags = oldflags & ~TASK_F_PRIVILEGED;
}
if (newflags == oldflags) {
return;
}
} while (!atomic_compare_exchange_weak_acq_rel(&task->flags, &oldflags,
newflags));
atomic_store_release(&task->privileged, priv);
}
bool
@@ -1269,21 +1268,6 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr, void *writer0) {
mgr->default_quantum));
TRY0(xmlTextWriterEndElement(writer)); /* default-quantum */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-count"));
TRY0(xmlTextWriterWriteFormatString(
writer, "%d", (int)atomic_load_relaxed(&mgr->tasks_count)));
TRY0(xmlTextWriterEndElement(writer)); /* tasks-count */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-running"));
TRY0(xmlTextWriterWriteFormatString(
writer, "%d", (int)atomic_load_relaxed(&mgr->tasks_running)));
TRY0(xmlTextWriterEndElement(writer)); /* tasks-running */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-ready"));
TRY0(xmlTextWriterWriteFormatString(
writer, "%d", (int)atomic_load_relaxed(&mgr->tasks_ready)));
TRY0(xmlTextWriterEndElement(writer)); /* tasks-ready */
TRY0(xmlTextWriterEndElement(writer)); /* thread-model */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks"));
@@ -1373,18 +1357,6 @@ isc_taskmgr_renderjson(isc_taskmgr_t *mgr, void *tasks0) {
CHECKMEM(obj);
json_object_object_add(tasks, "default-quantum", obj);
obj = json_object_new_int(atomic_load_relaxed(&mgr->tasks_count));
CHECKMEM(obj);
json_object_object_add(tasks, "tasks-count", obj);
obj = json_object_new_int(atomic_load_relaxed(&mgr->tasks_running));
CHECKMEM(obj);
json_object_object_add(tasks, "tasks-running", obj);
obj = json_object_new_int(atomic_load_relaxed(&mgr->tasks_ready));
CHECKMEM(obj);
json_object_object_add(tasks, "tasks-ready", obj);
array = json_object_new_array();
CHECKMEM(array);

lib/isc/win32/libisc.def.in

@@ -101,6 +101,7 @@ isc_socketmgr_setreserved
isc_socketmgr_setstats
isc_task_getname
isc_task_gettag
isc_task_ready
isc_task_run
isc_task_unsendrange
isc_taskmgr_attach