Compare commits
3 Commits
v9.16.42
...
wpk/separa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5a5f22579e | ||
|
|
cf875cc194 | ||
|
|
7fdeddf1da |
@@ -152,6 +152,7 @@ struct isc__taskqueue {
|
||||
isc_thread_t thread;
|
||||
unsigned int threadid;
|
||||
isc__taskmgr_t *manager;
|
||||
bool bound;
|
||||
};
|
||||
|
||||
struct isc__taskmgr {
|
||||
@@ -161,7 +162,8 @@ struct isc__taskmgr {
|
||||
isc_mutex_t lock;
|
||||
isc_mutex_t halt_lock;
|
||||
isc_condition_t halt_cond;
|
||||
unsigned int workers;
|
||||
unsigned int bound_workers;
|
||||
unsigned int loose_workers;
|
||||
atomic_uint_fast32_t tasks_running;
|
||||
atomic_uint_fast32_t tasks_ready;
|
||||
atomic_uint_fast32_t curq;
|
||||
@@ -229,7 +231,9 @@ wake_all_queues(isc__taskmgr_t *manager);
|
||||
|
||||
static inline void
|
||||
wake_all_queues(isc__taskmgr_t *manager) {
|
||||
for (unsigned int i = 0; i < manager->workers; i++) {
|
||||
for (unsigned int i = 0;
|
||||
i < manager->bound_workers + manager->loose_workers; i++)
|
||||
{
|
||||
LOCK(&manager->queues[i].lock);
|
||||
BROADCAST(&manager->queues[i].work_available);
|
||||
UNLOCK(&manager->queues[i].lock);
|
||||
@@ -301,7 +305,7 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
|
||||
* by a specific thread.
|
||||
*/
|
||||
task->bound = true;
|
||||
task->threadid = threadid % manager->workers;
|
||||
task->threadid = threadid % manager->bound_workers;
|
||||
}
|
||||
|
||||
isc_mutex_init(&task->lock);
|
||||
@@ -543,11 +547,13 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
|
||||
/* If task is bound ignore provided cpu. */
|
||||
if (task->bound) {
|
||||
c = task->threadid;
|
||||
c %= task->manager->bound_workers;
|
||||
} else if (c < 0) {
|
||||
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
|
||||
memory_order_relaxed);
|
||||
c %= task->manager->loose_workers;
|
||||
c += task->manager->bound_workers;
|
||||
}
|
||||
c %= task->manager->workers;
|
||||
was_idle = task_send(task, eventp, c);
|
||||
UNLOCK(&task->lock);
|
||||
|
||||
@@ -589,11 +595,13 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
|
||||
LOCK(&task->lock);
|
||||
if (task->bound) {
|
||||
c = task->threadid;
|
||||
c %= task->manager->bound_workers;
|
||||
} else if (c < 0) {
|
||||
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
|
||||
memory_order_relaxed);
|
||||
c %= task->manager->loose_workers;
|
||||
c += task->manager->bound_workers;
|
||||
}
|
||||
c %= task->manager->workers;
|
||||
idle1 = task_send(task, eventp, c);
|
||||
idle2 = task_detach(task);
|
||||
UNLOCK(&task->lock);
|
||||
@@ -972,6 +980,7 @@ push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
|
||||
static void
|
||||
dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
||||
isc__task_t *task;
|
||||
isc__task_t *last_task = NULL;
|
||||
|
||||
REQUIRE(VALID_MANAGER(manager));
|
||||
|
||||
@@ -1117,7 +1126,14 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
||||
memory_order_release) > 0);
|
||||
atomic_fetch_add_explicit(&manager->tasks_running, 1,
|
||||
memory_order_acquire);
|
||||
|
||||
if (!manager->queues[threadid].bound && task == last_task) {
|
||||
/*
|
||||
* XXXWPK don't let an unbound worker run one
|
||||
 * task continuously. This is so, SO wrong...
|
||||
*/
|
||||
usleep(10000);
|
||||
}
|
||||
last_task = task;
|
||||
LOCK(&task->lock);
|
||||
/*
|
||||
* It is possible because that we have a paused task
|
||||
@@ -1305,8 +1321,11 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
||||
{
|
||||
bool empty = true;
|
||||
unsigned int i;
|
||||
for (i = 0; i < manager->workers && empty; i++)
|
||||
{
|
||||
for (i = 0;
|
||||
i < manager->bound_workers +
|
||||
manager->loose_workers &&
|
||||
empty;
|
||||
i++) {
|
||||
LOCK(&manager->queues[i].lock);
|
||||
empty &= empty_readyq(manager, i);
|
||||
UNLOCK(&manager->queues[i].lock);
|
||||
@@ -1337,7 +1356,9 @@ static isc_threadresult_t
|
||||
isc__taskqueue_t *tq = queuep;
|
||||
isc__taskmgr_t *manager = tq->manager;
|
||||
int threadid = tq->threadid;
|
||||
isc_thread_setaffinity(threadid);
|
||||
if (tq->bound) {
|
||||
isc_thread_setaffinity(threadid);
|
||||
}
|
||||
|
||||
XTHREADTRACE("starting");
|
||||
|
||||
@@ -1354,13 +1375,16 @@ static isc_threadresult_t
|
||||
|
||||
static void
|
||||
manager_free(isc__taskmgr_t *manager) {
|
||||
for (unsigned int i = 0; i < manager->workers; i++) {
|
||||
for (unsigned int i = 0;
|
||||
i < manager->bound_workers + manager->loose_workers; i++)
|
||||
{
|
||||
isc_mutex_destroy(&manager->queues[i].lock);
|
||||
}
|
||||
isc_mutex_destroy(&manager->lock);
|
||||
isc_mutex_destroy(&manager->halt_lock);
|
||||
isc_mem_put(manager->mctx, manager->queues,
|
||||
manager->workers * sizeof(isc__taskqueue_t));
|
||||
(manager->bound_workers + manager->loose_workers) *
|
||||
sizeof(isc__taskqueue_t));
|
||||
manager->common.impmagic = 0;
|
||||
manager->common.magic = 0;
|
||||
isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
|
||||
@@ -1391,7 +1415,8 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
isc_mutex_init(&manager->halt_lock);
|
||||
isc_condition_init(&manager->halt_cond);
|
||||
|
||||
manager->workers = workers;
|
||||
manager->bound_workers = workers;
|
||||
manager->loose_workers = 2; /* XXXWPK chosen by a random dice roll */
|
||||
|
||||
if (default_quantum == 0) {
|
||||
default_quantum = DEFAULT_DEFAULT_QUANTUM;
|
||||
@@ -1404,7 +1429,9 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
|
||||
INIT_LIST(manager->tasks);
|
||||
atomic_store(&manager->tasks_count, 0);
|
||||
manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
|
||||
manager->queues = isc_mem_get(
|
||||
mctx, (manager->bound_workers + manager->loose_workers) *
|
||||
sizeof(isc__taskqueue_t));
|
||||
RUNTIME_CHECK(manager->queues != NULL);
|
||||
|
||||
atomic_init(&manager->tasks_running, 0);
|
||||
@@ -1420,7 +1447,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
/*
|
||||
* Start workers.
|
||||
*/
|
||||
for (i = 0; i < workers; i++) {
|
||||
for (i = 0; i < manager->bound_workers + manager->loose_workers; i++) {
|
||||
INIT_LIST(manager->queues[i].ready_tasks);
|
||||
INIT_LIST(manager->queues[i].ready_priority_tasks);
|
||||
isc_mutex_init(&manager->queues[i].lock);
|
||||
@@ -1428,15 +1455,21 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
|
||||
manager->queues[i].manager = manager;
|
||||
manager->queues[i].threadid = i;
|
||||
manager->queues[i].bound = (i < manager->bound_workers);
|
||||
isc_thread_create(run, &manager->queues[i],
|
||||
&manager->queues[i].thread);
|
||||
char name[21];
|
||||
snprintf(name, sizeof(name), "isc-worker%04u", i);
|
||||
if (manager->queues[i].bound) {
|
||||
snprintf(name, sizeof(name), "isc-b-wrkr-%03u", i);
|
||||
} else {
|
||||
snprintf(name, sizeof(name), "isc-l-wrkr-%03u",
|
||||
i - manager->bound_workers);
|
||||
}
|
||||
isc_thread_setname(manager->queues[i].thread, name);
|
||||
}
|
||||
UNLOCK(&manager->lock);
|
||||
|
||||
isc_thread_setconcurrency(workers);
|
||||
isc_thread_setconcurrency(3 * workers);
|
||||
|
||||
*managerp = (isc_taskmgr_t *)manager;
|
||||
|
||||
@@ -1527,7 +1560,8 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
|
||||
/*
|
||||
* Wait for all the worker threads to exit.
|
||||
*/
|
||||
for (i = 0; i < manager->workers; i++) {
|
||||
for (i = 0; i < (manager->bound_workers + manager->loose_workers); i++)
|
||||
{
|
||||
isc_thread_join(manager->queues[i].thread, NULL);
|
||||
}
|
||||
|
||||
@@ -1571,7 +1605,8 @@ isc__taskmgr_pause(isc_taskmgr_t *manager0) {
|
||||
}
|
||||
|
||||
atomic_store_relaxed(&manager->pause_req, true);
|
||||
while (manager->halted < manager->workers) {
|
||||
while (manager->halted <
|
||||
(manager->bound_workers + manager->loose_workers)) {
|
||||
wake_all_queues(manager);
|
||||
WAIT(&manager->halt_cond, &manager->halt_lock);
|
||||
}
|
||||
@@ -1653,7 +1688,8 @@ isc_task_beginexclusive(isc_task_t *task0) {
|
||||
INSIST(!atomic_load_relaxed(&manager->exclusive_req) &&
|
||||
!atomic_load_relaxed(&manager->pause_req));
|
||||
atomic_store_relaxed(&manager->exclusive_req, true);
|
||||
while (manager->halted + 1 < manager->workers) {
|
||||
while (manager->halted + 1 <
|
||||
(manager->bound_workers + manager->loose_workers)) {
|
||||
wake_all_queues(manager);
|
||||
WAIT(&manager->halt_cond, &manager->halt_lock);
|
||||
}
|
||||
@@ -1820,7 +1856,7 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr0, void *writer0) {
|
||||
TRY0(xmlTextWriterEndElement(writer)); /* type */
|
||||
|
||||
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "worker-threads"));
|
||||
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->workers));
|
||||
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->bound_workers));
|
||||
TRY0(xmlTextWriterEndElement(writer)); /* worker-threads */
|
||||
|
||||
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "default-quantum"));
|
||||
@@ -1929,7 +1965,7 @@ isc_taskmgr_renderjson(isc_taskmgr_t *mgr0, void *tasks0) {
|
||||
CHECKMEM(obj);
|
||||
json_object_object_add(tasks, "thread-model", obj);
|
||||
|
||||
obj = json_object_new_int(mgr->workers);
|
||||
obj = json_object_new_int(mgr->bound_workers);
|
||||
CHECKMEM(obj);
|
||||
json_object_object_add(tasks, "worker-threads", obj);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user