Compare commits

...

3 Commits

View File

@@ -152,6 +152,7 @@ struct isc__taskqueue {
isc_thread_t thread;
unsigned int threadid;
isc__taskmgr_t *manager;
bool bound;
};
struct isc__taskmgr {
@@ -161,7 +162,8 @@ struct isc__taskmgr {
isc_mutex_t lock;
isc_mutex_t halt_lock;
isc_condition_t halt_cond;
unsigned int workers;
unsigned int bound_workers;
unsigned int loose_workers;
atomic_uint_fast32_t tasks_running;
atomic_uint_fast32_t tasks_ready;
atomic_uint_fast32_t curq;
@@ -229,7 +231,9 @@ wake_all_queues(isc__taskmgr_t *manager);
static inline void
wake_all_queues(isc__taskmgr_t *manager) {
for (unsigned int i = 0; i < manager->workers; i++) {
for (unsigned int i = 0;
i < manager->bound_workers + manager->loose_workers; i++)
{
LOCK(&manager->queues[i].lock);
BROADCAST(&manager->queues[i].work_available);
UNLOCK(&manager->queues[i].lock);
@@ -301,7 +305,7 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
* by a specific thread.
*/
task->bound = true;
task->threadid = threadid % manager->workers;
task->threadid = threadid % manager->bound_workers;
}
isc_mutex_init(&task->lock);
@@ -543,11 +547,13 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
/* If task is bound ignore provided cpu. */
if (task->bound) {
c = task->threadid;
c %= task->manager->bound_workers;
} else if (c < 0) {
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
memory_order_relaxed);
c %= task->manager->loose_workers;
c += task->manager->bound_workers;
}
c %= task->manager->workers;
was_idle = task_send(task, eventp, c);
UNLOCK(&task->lock);
@@ -589,11 +595,13 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
LOCK(&task->lock);
if (task->bound) {
c = task->threadid;
c %= task->manager->bound_workers;
} else if (c < 0) {
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
memory_order_relaxed);
c %= task->manager->loose_workers;
c += task->manager->bound_workers;
}
c %= task->manager->workers;
idle1 = task_send(task, eventp, c);
idle2 = task_detach(task);
UNLOCK(&task->lock);
@@ -972,6 +980,7 @@ push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
static void
dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
isc__task_t *task;
isc__task_t *last_task = NULL;
REQUIRE(VALID_MANAGER(manager));
@@ -1117,7 +1126,14 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
memory_order_release) > 0);
atomic_fetch_add_explicit(&manager->tasks_running, 1,
memory_order_acquire);
if (!manager->queues[threadid].bound && task == last_task) {
/*
* XXXWPK don't let an unbound worker run one
* task continously. This is so, SO wrong...
*/
usleep(10000);
}
last_task = task;
LOCK(&task->lock);
/*
* It is possible because that we have a paused task
@@ -1305,8 +1321,11 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
{
bool empty = true;
unsigned int i;
for (i = 0; i < manager->workers && empty; i++)
{
for (i = 0;
i < manager->bound_workers +
manager->loose_workers &&
empty;
i++) {
LOCK(&manager->queues[i].lock);
empty &= empty_readyq(manager, i);
UNLOCK(&manager->queues[i].lock);
@@ -1337,7 +1356,9 @@ static isc_threadresult_t
isc__taskqueue_t *tq = queuep;
isc__taskmgr_t *manager = tq->manager;
int threadid = tq->threadid;
isc_thread_setaffinity(threadid);
if (tq->bound) {
isc_thread_setaffinity(threadid);
}
XTHREADTRACE("starting");
@@ -1354,13 +1375,16 @@ static isc_threadresult_t
static void
manager_free(isc__taskmgr_t *manager) {
for (unsigned int i = 0; i < manager->workers; i++) {
for (unsigned int i = 0;
i < manager->bound_workers + manager->loose_workers; i++)
{
isc_mutex_destroy(&manager->queues[i].lock);
}
isc_mutex_destroy(&manager->lock);
isc_mutex_destroy(&manager->halt_lock);
isc_mem_put(manager->mctx, manager->queues,
manager->workers * sizeof(isc__taskqueue_t));
(manager->bound_workers + manager->loose_workers) *
sizeof(isc__taskqueue_t));
manager->common.impmagic = 0;
manager->common.magic = 0;
isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
@@ -1391,7 +1415,8 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
isc_mutex_init(&manager->halt_lock);
isc_condition_init(&manager->halt_cond);
manager->workers = workers;
manager->bound_workers = workers;
manager->loose_workers = 2; /* XXXWPK choosen by a random dice roll */
if (default_quantum == 0) {
default_quantum = DEFAULT_DEFAULT_QUANTUM;
@@ -1404,7 +1429,9 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
INIT_LIST(manager->tasks);
atomic_store(&manager->tasks_count, 0);
manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
manager->queues = isc_mem_get(
mctx, (manager->bound_workers + manager->loose_workers) *
sizeof(isc__taskqueue_t));
RUNTIME_CHECK(manager->queues != NULL);
atomic_init(&manager->tasks_running, 0);
@@ -1420,7 +1447,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
/*
* Start workers.
*/
for (i = 0; i < workers; i++) {
for (i = 0; i < manager->bound_workers + manager->loose_workers; i++) {
INIT_LIST(manager->queues[i].ready_tasks);
INIT_LIST(manager->queues[i].ready_priority_tasks);
isc_mutex_init(&manager->queues[i].lock);
@@ -1428,15 +1455,21 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
manager->queues[i].manager = manager;
manager->queues[i].threadid = i;
manager->queues[i].bound = (i < manager->bound_workers);
isc_thread_create(run, &manager->queues[i],
&manager->queues[i].thread);
char name[21];
snprintf(name, sizeof(name), "isc-worker%04u", i);
if (manager->queues[i].bound) {
snprintf(name, sizeof(name), "isc-b-wrkr-%03u", i);
} else {
snprintf(name, sizeof(name), "isc-l-wrkr-%03u",
i - manager->bound_workers);
}
isc_thread_setname(manager->queues[i].thread, name);
}
UNLOCK(&manager->lock);
isc_thread_setconcurrency(workers);
isc_thread_setconcurrency(3 * workers);
*managerp = (isc_taskmgr_t *)manager;
@@ -1527,7 +1560,8 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
/*
* Wait for all the worker threads to exit.
*/
for (i = 0; i < manager->workers; i++) {
for (i = 0; i < (manager->bound_workers + manager->loose_workers); i++)
{
isc_thread_join(manager->queues[i].thread, NULL);
}
@@ -1571,7 +1605,8 @@ isc__taskmgr_pause(isc_taskmgr_t *manager0) {
}
atomic_store_relaxed(&manager->pause_req, true);
while (manager->halted < manager->workers) {
while (manager->halted <
(manager->bound_workers + manager->loose_workers)) {
wake_all_queues(manager);
WAIT(&manager->halt_cond, &manager->halt_lock);
}
@@ -1653,7 +1688,8 @@ isc_task_beginexclusive(isc_task_t *task0) {
INSIST(!atomic_load_relaxed(&manager->exclusive_req) &&
!atomic_load_relaxed(&manager->pause_req));
atomic_store_relaxed(&manager->exclusive_req, true);
while (manager->halted + 1 < manager->workers) {
while (manager->halted + 1 <
(manager->bound_workers + manager->loose_workers)) {
wake_all_queues(manager);
WAIT(&manager->halt_cond, &manager->halt_lock);
}
@@ -1820,7 +1856,7 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr0, void *writer0) {
TRY0(xmlTextWriterEndElement(writer)); /* type */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "worker-threads"));
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->workers));
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->bound_workers));
TRY0(xmlTextWriterEndElement(writer)); /* worker-threads */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "default-quantum"));
@@ -1929,7 +1965,7 @@ isc_taskmgr_renderjson(isc_taskmgr_t *mgr0, void *tasks0) {
CHECKMEM(obj);
json_object_object_add(tasks, "thread-model", obj);
obj = json_object_new_int(mgr->workers);
obj = json_object_new_int(mgr->bound_workers);
CHECKMEM(obj);
json_object_object_add(tasks, "worker-threads", obj);