From 4fb9199fb3a01edfc6830b2f42f6946014efd6c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Mon, 26 Nov 2018 13:11:48 +0000 Subject: [PATCH] PoC: a separate worker for greedy tasks in taskmgr --- lib/isc/include/isc/task.h | 2 ++ lib/isc/task.c | 68 ++++++++++++++++++++++++++------------ 2 files changed, 48 insertions(+), 22 deletions(-) diff --git a/lib/isc/include/isc/task.h b/lib/isc/include/isc/task.h index b2d9c48384..1434c59d4d 100644 --- a/lib/isc/include/isc/task.h +++ b/lib/isc/include/isc/task.h @@ -87,6 +87,8 @@ #define ISC_TASKEVENT_TEST (ISC_EVENTCLASS_TASK + 1) #define ISC_TASKEVENT_LASTEVENT (ISC_EVENTCLASS_TASK + 65535) +#define ISC_TASKCPU_NONE -1 +#define ISC_TASKCPU_GREEDY -2 /***** ***** Tasks. *****/ diff --git a/lib/isc/task.c b/lib/isc/task.c index a51d3359ab..dd88712c46 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -145,6 +145,7 @@ struct isc__taskmgr { isc_mutex_t halt_lock; isc_condition_t halt_cond; unsigned int workers; + unsigned int greedy_worker; atomic_uint_fast32_t tasks_running; atomic_uint_fast32_t tasks_ready; atomic_uint_fast32_t curq; @@ -209,7 +210,7 @@ wake_all_queues(isc__taskmgr_t *manager); static inline void wake_all_queues(isc__taskmgr_t *manager) { - for (unsigned int i = 0; i < manager->workers; i++) { + for (unsigned int i = 0; i < manager->workers+1; i++) { LOCK(&manager->queues[i].lock); BROADCAST(&manager->queues[i].work_available); UNLOCK(&manager->queues[i].lock); @@ -249,7 +250,7 @@ isc_result_t isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum, isc_task_t **taskp) { - return (isc_task_create_bound(manager0, quantum, taskp, -1)); + return (isc_task_create_bound(manager0, quantum, taskp, ISC_TASKCPU_NONE)); } isc_result_t @@ -269,7 +270,7 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum, XTRACE("isc_task_create"); task->manager = manager; - if (threadid == -1) { + if (threadid == ISC_TASKCPU_NONE) { /* * Task is not pinned to a queue, it's threadid will be * choosen when first task will be sent to it - either @@ -277,6 +278,13 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum, */ task->bound = false; task->threadid = 0; + } else if (threadid == ISC_TASKCPU_GREEDY) { + /* + * Task is greedy - it will be pinned to a special + * queue for greedy tasks + */ + task->bound = true; + task->threadid = manager->greedy_worker; } else { /* * Task is pinned to a queue, it'll always be run @@ -494,12 +502,12 @@ task_send(isc__task_t *task, isc_event_t **eventp, int c) { void isc_task_send(isc_task_t *task0, isc_event_t **eventp) { - isc_task_sendto(task0, eventp, -1); + isc_task_sendto(task0, eventp, ISC_TASKCPU_NONE); } void isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) { - isc_task_sendtoanddetach(taskp, eventp, -1); + isc_task_sendtoanddetach(taskp, eventp, ISC_TASKCPU_NONE); } void @@ -517,11 +525,14 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) { /* If task is bound ignore provided cpu. */ if (task->bound) { c = task->threadid; - } else if (c < 0) { + c %= task->manager->workers; + } else if (c == ISC_TASKCPU_GREEDY) { + c = task->manager->greedy_worker; + } else if (c == ISC_TASKCPU_NONE) { c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed); + c %= task->manager->workers; } - c %= task->manager->workers; /* * We're trying hard to hold locks for as short a time as possible. @@ -569,11 +580,14 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) { if (task->bound) { c = task->threadid; - } else if (c < 0) { + c %= task->manager->workers; + } else if (c == ISC_TASKCPU_GREEDY) { + c = task->manager->greedy_worker; + } else if (c == ISC_TASKCPU_NONE) { c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed); + c %= task->manager->workers; } - c %= task->manager->workers; LOCK(&task->lock); idle1 = task_send(task, eventp, c); @@ -1279,7 +1293,7 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { { bool empty = true; unsigned int i; - for (i = 0; i < manager->workers && empty; i++) + for (i = 0; i < manager->workers+1 && empty; i++) { LOCK(&manager->queues[i].lock); empty &= empty_readyq(manager, i); @@ -1309,8 +1323,11 @@ WINAPI run(void *queuep) { isc__taskqueue_t *tq = queuep; isc__taskmgr_t *manager = tq->manager; - int threadid = tq->threadid; - isc_thread_setaffinity(threadid); + unsigned int threadid = tq->threadid; + /* greedy worker is not pinned */ + if (threadid != manager->greedy_worker) { + isc_thread_setaffinity(threadid); + } XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_STARTING, "starting")); @@ -1329,13 +1346,13 @@ run(void *queuep) { static void manager_free(isc__taskmgr_t *manager) { - for (unsigned int i = 0; i < manager->workers; i++) { + for (unsigned int i = 0; i < manager->workers+1; i++) { isc_mutex_destroy(&manager->queues[i].lock); } isc_mutex_destroy(&manager->lock); isc_mutex_destroy(&manager->halt_lock); isc_mem_put(manager->mctx, manager->queues, - manager->workers * sizeof(isc__taskqueue_t)); + (manager->workers+1) * sizeof(isc__taskqueue_t)); manager->common.impmagic = 0; manager->common.magic = 0; isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager)); @@ -1374,7 +1391,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, } manager->default_quantum = default_quantum; INIT_LIST(manager->tasks); - manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t)); + manager->queues = isc_mem_get(mctx, (workers+1) * sizeof(isc__taskqueue_t)); RUNTIME_CHECK(manager->queues != NULL); manager->tasks_running = 0; @@ -1392,24 +1409,31 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, /* * Start workers. */ - for (i = 0; i < workers; i++) { + for (i = 0; i < workers+1; i++) { + char name[16]; INIT_LIST(manager->queues[i].ready_tasks); INIT_LIST(manager->queues[i].ready_priority_tasks); isc_mutex_init(&manager->queues[i].lock); isc_condition_init(&manager->queues[i].work_available); + if (i == workers) { + snprintf(name, sizeof(name), "isc-greedywrkr"); + manager->greedy_worker = i; + } else { + snprintf(name, sizeof(name), "isc-worker%04u", i); + } manager->queues[i].manager = manager; manager->queues[i].threadid = i; RUNTIME_CHECK(isc_thread_create(run, &manager->queues[i], &manager->queues[i].thread) == ISC_R_SUCCESS); - char name[16]; + snprintf(name, sizeof(name), "isc-worker%04u", i); - isc_thread_setname(manager->queues[i].thread, name); } + UNLOCK(&manager->lock); - isc_thread_setconcurrency(workers); + isc_thread_setconcurrency(workers+1); *managerp = (isc_taskmgr_t *)manager; @@ -1497,7 +1521,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { /* * Wait for all the worker threads to exit. */ - for (i = 0; i < manager->workers; i++) + for (i = 0; i < manager->workers+1; i++) (void)isc_thread_join(manager->queues[i].thread, NULL); manager_free(manager); @@ -1537,7 +1561,7 @@ isc__taskmgr_pause(isc_taskmgr_t *manager0) { } manager->pause_requested = true; - while (manager->halted < manager->workers) { + while (manager->halted < manager->workers+1) { wake_all_queues(manager); WAIT(&manager->halt_cond, &manager->halt_lock); } @@ -1611,7 +1635,7 @@ isc_task_beginexclusive(isc_task_t *task0) { LOCK(&manager->halt_lock); INSIST(!manager->exclusive_requested && !manager->pause_requested); manager->exclusive_requested = true; - while (manager->halted + 1 < manager->workers) { + while (manager->halted + 1 < manager->workers+1) { wake_all_queues(manager); WAIT(&manager->halt_cond, &manager->halt_lock); }