diff --git a/lib/isc/include/isc/task.h b/lib/isc/include/isc/task.h index f8a790403e..45d062ee32 100644 --- a/lib/isc/include/isc/task.h +++ b/lib/isc/include/isc/task.h @@ -136,6 +136,12 @@ isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum, *\li #ISC_R_SHUTTINGDOWN */ +void +isc_task_ready(isc_task_t *task); +/*%< + * Enqueue the task onto netmgr queue. + */ + isc_result_t isc_task_run(isc_task_t *task); /*%< diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 4384932dca..6db3f93d05 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -39,6 +39,8 @@ #include "uv-compat.h" +#define ISC_NETMGR_QUANTUM_DEFAULT 128 + #define ISC_NETMGR_TID_UNKNOWN -1 /* Must be different from ISC_NETMGR_TID_UNKNOWN */ @@ -188,6 +190,7 @@ typedef struct isc__networker { char *recvbuf; char *sendbuf; bool recvbuf_inuse; + unsigned int quantum; } isc__networker_t; /* diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 85a1f1714a..f080e8f52c 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -135,15 +135,16 @@ nm_thread(isc_threadarg_t worker0); static void async_cb(uv_async_t *handle); static bool -process_queue(isc__networker_t *worker, isc_queue_t *queue); +process_queue(isc__networker_t *worker, isc_queue_t *queue, + unsigned int *quantump); static bool -process_priority_queue(isc__networker_t *worker); -static void -process_privilege_queue(isc__networker_t *worker); -static void -process_tasks_queue(isc__networker_t *worker); -static void -process_normal_queue(isc__networker_t *worker); +process_priority_queue(isc__networker_t *worker, unsigned int *quantump); +static bool +process_privilege_queue(isc__networker_t *worker, unsigned int *quantump); +static bool +process_task_queue(isc__networker_t *worker, unsigned int *quantump); +static bool +process_normal_queue(isc__networker_t *worker, unsigned int *quantump); static void isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0); @@ -264,6 +265,7 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) { *worker = (isc__networker_t){ .mgr = mgr, .id = i, + .quantum = ISC_NETMGR_QUANTUM_DEFAULT, }; r = uv_loop_init(&worker->loop); @@ -278,8 +280,8 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) { isc_condition_init(&worker->cond); worker->ievents = isc_queue_new(mgr->mctx, 128); - worker->ievents_priv = isc_queue_new(mgr->mctx, 128); worker->ievents_task = isc_queue_new(mgr->mctx, 128); + worker->ievents_priv = isc_queue_new(mgr->mctx, 128); worker->ievents_prio = isc_queue_new(mgr->mctx, 128); worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE); worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE); @@ -631,7 +633,8 @@ nm_thread(isc_threadarg_t worker0) { while (worker->paused) { WAIT(&worker->cond, &worker->lock); UNLOCK(&worker->lock); - (void)process_priority_queue(worker); + (void)process_priority_queue( + worker, &(unsigned int){ UINT_MAX }); LOCK(&worker->lock); } @@ -642,10 +645,11 @@ nm_thread(isc_threadarg_t worker0) { UNLOCK(&worker->lock); /* - * All workers must run the privileged event + * All workers must drain the privileged event * queue before we resume from pause. */ - process_privilege_queue(worker); + (void)process_privilege_queue( + worker, &(unsigned int){ UINT_MAX }); LOCK(&mgr->lock); while (atomic_load(&mgr->paused)) { @@ -660,16 +664,6 @@ nm_thread(isc_threadarg_t worker0) { } INSIST(!worker->finished); - - /* - * We've fully resumed from pause. Drain the normal - * asynchronous event queues before resuming the uv_run() - * loop. (This is not strictly necessary, it just ensures - * that all pending events are processed before another - * pause can slip in.) - */ - process_tasks_queue(worker); - process_normal_queue(worker); } /* @@ -677,8 +671,8 @@ nm_thread(isc_threadarg_t worker0) { * (they may include shutdown events) but do not process * the netmgr event queue. */ - process_privilege_queue(worker); - process_tasks_queue(worker); + (void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX }); + (void)process_task_queue(worker, &(unsigned int){ UINT_MAX }); LOCK(&mgr->lock); mgr->workers_running--; @@ -688,6 +682,23 @@ nm_thread(isc_threadarg_t worker0) { return ((isc_threadresult_t)0); } +static bool +process_all_queues(isc__networker_t *worker, unsigned int quantum) { + /* + * The queue processing functions will return false when the + * system is pausing or stopping, or if we have completed + * 'quantum' events. + * + * We don't want to proceed to a new queue until the previous one + * has been fully drained, so whenever one queue is interrupted, + * we skip all the later ones. + */ + return (process_priority_queue(worker, &quantum) && + process_privilege_queue(worker, &quantum) && + process_task_queue(worker, &quantum) && + process_normal_queue(worker, &quantum)); +} + /* * async_cb() is a universal callback for 'async' events sent to event loop. * It's the only way to safely pass data to the libuv event loop. We use a @@ -697,18 +708,14 @@ nm_thread(isc_threadarg_t worker0) { static void async_cb(uv_async_t *handle) { isc__networker_t *worker = (isc__networker_t *)handle->loop->data; + unsigned int quantum = worker->quantum; - /* - * process_priority_queue() returns false when pausing or stopping, - * so we don't want to process the other queues in that case. - */ - if (!process_priority_queue(worker)) { - return; + if (!process_all_queues(worker, quantum)) { + /* If we didn't process all the events, we need to enqueue + * async_cb to be run in the next iteration of the uv_loop + */ + uv_async_send(handle); } - - process_privilege_queue(worker); - process_tasks_queue(worker); - process_normal_queue(worker); } static void @@ -775,8 +782,7 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) { switch (result) { case ISC_R_QUOTA: - isc_nm_task_enqueue(worker->mgr, (isc_task_t *)ievent->task, - isc_nm_tid()); + isc_task_ready(ievent->task); return; case ISC_R_SUCCESS: return; @@ -787,23 +793,23 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) { } static bool -process_priority_queue(isc__networker_t *worker) { - return (process_queue(worker, worker->ievents_prio)); +process_priority_queue(isc__networker_t *worker, unsigned int *quantump) { + return (process_queue(worker, worker->ievents_prio, quantump)); } -static void -process_privilege_queue(isc__networker_t *worker) { - (void)process_queue(worker, worker->ievents_priv); +static bool +process_privilege_queue(isc__networker_t *worker, unsigned int *quantump) { + return (process_queue(worker, worker->ievents_priv, quantump)); } -static void -process_tasks_queue(isc__networker_t *worker) { - (void)process_queue(worker, worker->ievents_task); +static bool +process_task_queue(isc__networker_t *worker, unsigned int *quantump) { + return (process_queue(worker, worker->ievents_task, quantump)); } -static void -process_normal_queue(isc__networker_t *worker) { - (void)process_queue(worker, worker->ievents); +static bool +process_normal_queue(isc__networker_t *worker, unsigned int *quantump) { + return (process_queue(worker, worker->ievents, quantump)); } /* @@ -905,16 +911,27 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) { } static bool -process_queue(isc__networker_t *worker, isc_queue_t *queue) { - isc__netievent_t *ievent = NULL; +process_queue(isc__networker_t *worker, isc_queue_t *queue, + unsigned int *quantump) { + while (*quantump > 0) { + isc__netievent_t *ievent = + (isc__netievent_t *)isc_queue_dequeue(queue); + + (*quantump)--; + + if (ievent == NULL) { + /* We fully drained this queue */ + return (true); + } - while ((ievent = (isc__netievent_t *)isc_queue_dequeue(queue)) != NULL) - { if (!process_netievent(worker, ievent)) { + /* Netievent told us to stop */ return (false); } } - return (true); + + /* No more quantum */ + return (false); } void * diff --git a/lib/isc/task.c b/lib/isc/task.c index 49112f22e7..27073afdb1 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -17,6 +17,7 @@ */ #include +#include #include #include @@ -102,6 +103,7 @@ struct isc_task { task_state_t state; int pause_cnt; isc_refcount_t references; + isc_refcount_t running; isc_eventlist_t events; isc_eventlist_t on_shutdown; unsigned int nevents; @@ -112,21 +114,14 @@ struct isc_task { void *tag; bool bound; /* Protected by atomics */ - atomic_uint_fast32_t flags; + atomic_bool shuttingdown; + atomic_bool privileged; /* Locked by task manager lock. */ LINK(isc_task_t) link; }; -#define TASK_F_SHUTTINGDOWN 0x01 -#define TASK_F_PRIVILEGED 0x02 - -#define TASK_SHUTTINGDOWN(t) \ - ((atomic_load_acquire(&(t)->flags) & TASK_F_SHUTTINGDOWN) != 0) -#define TASK_PRIVILEGED(t) \ - ((atomic_load_acquire(&(t)->flags) & TASK_F_PRIVILEGED) != 0) - -#define TASK_FLAG_SET(t, f) atomic_fetch_or_release(&(t)->flags, (f)) -#define TASK_FLAG_CLR(t, f) atomic_fetch_and_release(&(t)->flags, ~(f)) +#define TASK_SHUTTINGDOWN(t) (atomic_load_acquire(&(t)->shuttingdown)) +#define TASK_PRIVILEGED(t) (atomic_load_acquire(&(t)->privileged)) #define TASK_MANAGER_MAGIC ISC_MAGIC('T', 'S', 'K', 'M') #define VALID_MANAGER(m) ISC_MAGIC_VALID(m, TASK_MANAGER_MAGIC) @@ -137,10 +132,8 @@ struct isc_taskmgr { isc_refcount_t references; isc_mem_t *mctx; isc_mutex_t lock; - atomic_uint_fast32_t tasks_running; - atomic_uint_fast32_t tasks_ready; atomic_uint_fast32_t tasks_count; - isc_nm_t *nm; + isc_nm_t *netmgr; /* Locked by task manager lock. */ unsigned int default_quantum; @@ -148,9 +141,6 @@ struct isc_taskmgr { atomic_bool exclusive_req; atomic_bool exiting; - /* Locked by halt_lock */ - unsigned int halted; - /* * Multiple threads can read/write 'excl' at the same time, so we need * to protect the access. We can't use 'lock' since isc_task_detach() @@ -161,9 +151,6 @@ struct isc_taskmgr { }; #define DEFAULT_DEFAULT_QUANTUM 25 -#define FINISHED(m) \ - (atomic_load_relaxed(&((m)->exiting)) && \ - atomic_load(&(m)->tasks_count) == 0) /*% * The following are intended for internal use (indicated by "isc__" @@ -193,6 +180,7 @@ task_finished(isc_task_t *task) { XTRACE("task_finished"); + isc_refcount_destroy(&task->running); isc_refcount_destroy(&task->references); LOCK(&manager->lock); @@ -251,11 +239,13 @@ isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum, task->pause_cnt = 0; isc_refcount_init(&task->references, 1); + isc_refcount_init(&task->running, 0); INIT_LIST(task->events); INIT_LIST(task->on_shutdown); task->nevents = 0; task->quantum = (quantum > 0) ? quantum : manager->default_quantum; - atomic_init(&task->flags, 0); + atomic_init(&task->shuttingdown, false); + atomic_init(&task->privileged, false); task->now = 0; isc_time_settoepoch(&task->tnow); memset(task->name, 0, sizeof(task->name)); @@ -311,9 +301,9 @@ task_shutdown(isc_task_t *task) { XTRACE("task_shutdown"); - if (!TASK_SHUTTINGDOWN(task)) { + if (atomic_compare_exchange_strong(&task->shuttingdown, + &(bool){ false }, true)) { XTRACE("shutting down"); - TASK_FLAG_SET(task, TASK_F_SHUTTINGDOWN); if (task->state == task_state_idle) { INSIST(EMPTY(task->events)); task->state = task_state_ready; @@ -350,7 +340,14 @@ task_ready(isc_task_t *task) { REQUIRE(VALID_MANAGER(manager)); XTRACE("task_ready"); - isc_nm_task_enqueue(manager->nm, task, task->threadid); + + isc_refcount_increment0(&task->running); + isc_nm_task_enqueue(manager->netmgr, task, task->threadid); +} + +void +isc_task_ready(isc_task_t *task) { + task_ready(task); } static inline bool @@ -820,8 +817,7 @@ task_run(isc_task_t *task) { * and task lock to avoid deadlocks, just bail then. */ if (task->state != task_state_ready) { - UNLOCK(&task->lock); - return (ISC_R_SUCCESS); + goto done; } INSIST(task->state == task_state_ready); @@ -886,7 +882,6 @@ task_run(isc_task_t *task) { * The task is done. */ XTRACE("done"); - finished = true; task->state = task_state_done; } else { if (task->state == task_state_running) { @@ -920,6 +915,13 @@ task_run(isc_task_t *task) { break; } } + +done: + if (isc_refcount_decrement(&task->running) == 1 && + task->state == task_state_done) + { + finished = true; + } UNLOCK(&task->lock); if (finished) { @@ -937,7 +939,7 @@ isc_task_run(isc_task_t *task) { static void manager_free(isc_taskmgr_t *manager) { isc_refcount_destroy(&manager->references); - isc_nm_detach(&manager->nm); + isc_nm_detach(&manager->netmgr); isc_mutex_destroy(&manager->lock); isc_mutex_destroy(&manager->excl_lock); @@ -988,13 +990,10 @@ isc__taskmgr_create(isc_mem_t *mctx, unsigned int default_quantum, isc_nm_t *nm, manager->default_quantum = default_quantum; if (nm != NULL) { - isc_nm_attach(nm, &manager->nm); + isc_nm_attach(nm, &manager->netmgr); } INIT_LIST(manager->tasks); - atomic_init(&manager->tasks_count, 0); - atomic_init(&manager->tasks_running, 0); - atomic_init(&manager->tasks_ready, 0); atomic_init(&manager->exiting, false); atomic_store_relaxed(&manager->exclusive_req, false); @@ -1074,6 +1073,18 @@ isc__taskmgr_destroy(isc_taskmgr_t **managerp) { XTHREADTRACE("isc_taskmgr_destroy"); +#ifdef ISC_TASK_TRACE + int counter = 0; + while (isc_refcount_current(&manager->references) > 1 && + counter++ < 1000) { + usleep(10 * 1000); + } +#else + while (isc_refcount_current(&manager->references) > 1) { + usleep(10 * 1000); + } +#endif + REQUIRE(isc_refcount_decrement(&manager->references) == 1); manager_free(manager); } @@ -1130,7 +1141,7 @@ isc_task_beginexclusive(isc_task_t *task) { return (ISC_R_LOCKBUSY); } - isc_nm_pause(manager->nm); + isc_nm_pause(manager->netmgr); return (ISC_R_SUCCESS); } @@ -1143,7 +1154,7 @@ isc_task_endexclusive(isc_task_t *task) { REQUIRE(task->state == task_state_running); manager = task->manager; - isc_nm_resume(manager->nm); + isc_nm_resume(manager->netmgr); REQUIRE(atomic_compare_exchange_strong(&manager->exclusive_req, &(bool){ true }, false)); } @@ -1210,20 +1221,8 @@ isc_task_unpause(isc_task_t *task) { void isc_task_setprivilege(isc_task_t *task, bool priv) { REQUIRE(VALID_TASK(task)); - uint_fast32_t oldflags, newflags; - oldflags = atomic_load_acquire(&task->flags); - do { - if (priv) { - newflags = oldflags | TASK_F_PRIVILEGED; - } else { - newflags = oldflags & ~TASK_F_PRIVILEGED; - } - if (newflags == oldflags) { - return; - } - } while (!atomic_compare_exchange_weak_acq_rel(&task->flags, &oldflags, - newflags)); + atomic_store_release(&task->privileged, priv); } bool @@ -1269,21 +1268,6 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr, void *writer0) { mgr->default_quantum)); TRY0(xmlTextWriterEndElement(writer)); /* default-quantum */ - TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-count")); - TRY0(xmlTextWriterWriteFormatString( - writer, "%d", (int)atomic_load_relaxed(&mgr->tasks_count))); - TRY0(xmlTextWriterEndElement(writer)); /* tasks-count */ - - TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-running")); - TRY0(xmlTextWriterWriteFormatString( - writer, "%d", (int)atomic_load_relaxed(&mgr->tasks_running))); - TRY0(xmlTextWriterEndElement(writer)); /* tasks-running */ - - TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-ready")); - TRY0(xmlTextWriterWriteFormatString( - writer, "%d", (int)atomic_load_relaxed(&mgr->tasks_ready))); - TRY0(xmlTextWriterEndElement(writer)); /* tasks-ready */ - TRY0(xmlTextWriterEndElement(writer)); /* thread-model */ TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks")); @@ -1373,18 +1357,6 @@ isc_taskmgr_renderjson(isc_taskmgr_t *mgr, void *tasks0) { CHECKMEM(obj); json_object_object_add(tasks, "default-quantum", obj); - obj = json_object_new_int(atomic_load_relaxed(&mgr->tasks_count)); - CHECKMEM(obj); - json_object_object_add(tasks, "tasks-count", obj); - - obj = json_object_new_int(atomic_load_relaxed(&mgr->tasks_running)); - CHECKMEM(obj); - json_object_object_add(tasks, "tasks-running", obj); - - obj = json_object_new_int(atomic_load_relaxed(&mgr->tasks_ready)); - CHECKMEM(obj); - json_object_object_add(tasks, "tasks-ready", obj); - array = json_object_new_array(); CHECKMEM(array); diff --git a/lib/isc/win32/libisc.def.in b/lib/isc/win32/libisc.def.in index 3c4dd02f71..e6f7787291 100644 --- a/lib/isc/win32/libisc.def.in +++ b/lib/isc/win32/libisc.def.in @@ -101,6 +101,7 @@ isc_socketmgr_setreserved isc_socketmgr_setstats isc_task_getname isc_task_gettag +isc_task_ready isc_task_run isc_task_unsendrange isc_taskmgr_attach