Compare commits

..

1 Commits

Author SHA1 Message Date
Witold Kręcicki
4fb9199fb3 PoC: a separate worker for greedy tasks in taskmgr 2018-11-26 13:23:36 +00:00
5 changed files with 50 additions and 91 deletions

View File

@@ -87,6 +87,8 @@
#define ISC_TASKEVENT_TEST (ISC_EVENTCLASS_TASK + 1)
#define ISC_TASKEVENT_LASTEVENT (ISC_EVENTCLASS_TASK + 65535)
#define ISC_TASKCPU_NONE -1
#define ISC_TASKCPU_GREEDY -2
/*****
***** Tasks.
*****/

View File

@@ -44,11 +44,9 @@
#if defined(_MSC_VER)
# include <intrin.h>
# define isc_rwlock_pause() YieldProcessor()
#elif defined(__x86_64__)
#elif defined(__x86_64__) || defined(__i386__)
# include <immintrin.h>
# define isc_rwlock_pause() _mm_pause()
#elif defined(__i386__)
# define isc_rwlock_pause() __asm__ __volatile__ ("rep; nop")
#elif defined(__ia64__)
# define isc_rwlock_pause() __asm__ __volatile__ ("hint @pause")
#elif defined(__arm__)

View File

@@ -145,6 +145,7 @@ struct isc__taskmgr {
isc_mutex_t halt_lock;
isc_condition_t halt_cond;
unsigned int workers;
unsigned int greedy_worker;
atomic_uint_fast32_t tasks_running;
atomic_uint_fast32_t tasks_ready;
atomic_uint_fast32_t curq;
@@ -209,7 +210,7 @@ wake_all_queues(isc__taskmgr_t *manager);
static inline void
wake_all_queues(isc__taskmgr_t *manager) {
for (unsigned int i = 0; i < manager->workers; i++) {
for (unsigned int i = 0; i < manager->workers+1; i++) {
LOCK(&manager->queues[i].lock);
BROADCAST(&manager->queues[i].work_available);
UNLOCK(&manager->queues[i].lock);
@@ -249,7 +250,7 @@ isc_result_t
isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum,
isc_task_t **taskp)
{
return (isc_task_create_bound(manager0, quantum, taskp, -1));
return (isc_task_create_bound(manager0, quantum, taskp, ISC_TASKCPU_NONE));
}
isc_result_t
@@ -269,7 +270,7 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
XTRACE("isc_task_create");
task->manager = manager;
if (threadid == -1) {
if (threadid == ISC_TASKCPU_NONE) {
/*
* Task is not pinned to a queue, it's threadid will be
* choosen when first task will be sent to it - either
@@ -277,6 +278,13 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
*/
task->bound = false;
task->threadid = 0;
} else if (threadid == ISC_TASKCPU_GREEDY) {
/*
* Task is greedy - it will be pinned to a special
* queue for greedy tasks
*/
task->bound = true;
task->threadid = manager->greedy_worker;
} else {
/*
* Task is pinned to a queue, it'll always be run
@@ -494,12 +502,12 @@ task_send(isc__task_t *task, isc_event_t **eventp, int c) {
void
isc_task_send(isc_task_t *task0, isc_event_t **eventp) {
isc_task_sendto(task0, eventp, -1);
isc_task_sendto(task0, eventp, ISC_TASKCPU_NONE);
}
void
isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
isc_task_sendtoanddetach(taskp, eventp, -1);
isc_task_sendtoanddetach(taskp, eventp, ISC_TASKCPU_NONE);
}
void
@@ -517,11 +525,14 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
/* If task is bound ignore provided cpu. */
if (task->bound) {
c = task->threadid;
} else if (c < 0) {
c %= task->manager->workers;
} else if (c == ISC_TASKCPU_GREEDY) {
c = task->manager->greedy_worker;
} else if (c == ISC_TASKCPU_NONE) {
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
memory_order_relaxed);
c %= task->manager->workers;
}
c %= task->manager->workers;
/*
* We're trying hard to hold locks for as short a time as possible.
@@ -569,11 +580,14 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
if (task->bound) {
c = task->threadid;
} else if (c < 0) {
c %= task->manager->workers;
} else if (c == ISC_TASKCPU_GREEDY) {
c = task->manager->greedy_worker;
} else if (c == ISC_TASKCPU_NONE) {
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
memory_order_relaxed);
c %= task->manager->workers;
}
c %= task->manager->workers;
LOCK(&task->lock);
idle1 = task_send(task, eventp, c);
@@ -1279,7 +1293,7 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
{
bool empty = true;
unsigned int i;
for (i = 0; i < manager->workers && empty; i++)
for (i = 0; i < manager->workers+1 && empty; i++)
{
LOCK(&manager->queues[i].lock);
empty &= empty_readyq(manager, i);
@@ -1309,8 +1323,11 @@ WINAPI
run(void *queuep) {
isc__taskqueue_t *tq = queuep;
isc__taskmgr_t *manager = tq->manager;
int threadid = tq->threadid;
isc_thread_setaffinity(threadid);
unsigned int threadid = tq->threadid;
/* greedy worker is not pinned */
if (threadid != manager->greedy_worker) {
isc_thread_setaffinity(threadid);
}
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
ISC_MSG_STARTING, "starting"));
@@ -1329,13 +1346,13 @@ run(void *queuep) {
static void
manager_free(isc__taskmgr_t *manager) {
for (unsigned int i = 0; i < manager->workers; i++) {
for (unsigned int i = 0; i < manager->workers+1; i++) {
isc_mutex_destroy(&manager->queues[i].lock);
}
isc_mutex_destroy(&manager->lock);
isc_mutex_destroy(&manager->halt_lock);
isc_mem_put(manager->mctx, manager->queues,
manager->workers * sizeof(isc__taskqueue_t));
(manager->workers+1) * sizeof(isc__taskqueue_t));
manager->common.impmagic = 0;
manager->common.magic = 0;
isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
@@ -1374,7 +1391,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
}
manager->default_quantum = default_quantum;
INIT_LIST(manager->tasks);
manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
manager->queues = isc_mem_get(mctx, (workers+1) * sizeof(isc__taskqueue_t));
RUNTIME_CHECK(manager->queues != NULL);
manager->tasks_running = 0;
@@ -1392,24 +1409,31 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
/*
* Start workers.
*/
for (i = 0; i < workers; i++) {
for (i = 0; i < workers+1; i++) {
char name[16];
INIT_LIST(manager->queues[i].ready_tasks);
INIT_LIST(manager->queues[i].ready_priority_tasks);
isc_mutex_init(&manager->queues[i].lock);
isc_condition_init(&manager->queues[i].work_available);
if (i == workers) {
snprintf(name, sizeof(name), "isc-greedywrkr");
manager->greedy_worker = i;
} else {
snprintf(name, sizeof(name), "isc-worker%04u", i);
}
manager->queues[i].manager = manager;
manager->queues[i].threadid = i;
RUNTIME_CHECK(isc_thread_create(run, &manager->queues[i],
&manager->queues[i].thread)
== ISC_R_SUCCESS);
char name[16];
snprintf(name, sizeof(name), "isc-worker%04u", i);
isc_thread_setname(manager->queues[i].thread, name);
}
UNLOCK(&manager->lock);
isc_thread_setconcurrency(workers);
isc_thread_setconcurrency(workers+1);
*managerp = (isc_taskmgr_t *)manager;
@@ -1497,7 +1521,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
/*
* Wait for all the worker threads to exit.
*/
for (i = 0; i < manager->workers; i++)
for (i = 0; i < manager->workers+1; i++)
(void)isc_thread_join(manager->queues[i].thread, NULL);
manager_free(manager);
@@ -1537,7 +1561,7 @@ isc__taskmgr_pause(isc_taskmgr_t *manager0) {
}
manager->pause_requested = true;
while (manager->halted < manager->workers) {
while (manager->halted < manager->workers+1) {
wake_all_queues(manager);
WAIT(&manager->halt_cond, &manager->halt_lock);
}
@@ -1611,7 +1635,7 @@ isc_task_beginexclusive(isc_task_t *task0) {
LOCK(&manager->halt_lock);
INSIST(!manager->exclusive_requested && !manager->pause_requested);
manager->exclusive_requested = true;
while (manager->halted + 1 < manager->workers) {
while (manager->halted + 1 < manager->workers+1) {
wake_all_queues(manager);
WAIT(&manager->halt_cond, &manager->halt_lock);
}

View File

@@ -40,7 +40,7 @@ isc_condition_init(isc_condition_t *cond) {
char strbuf[ISC_STRERRORSIZE];
DWORD err = GetLastError();
strerror_r(err, strbuf, sizeof(strbuf));
isc_error_fatal(__FILE__, __LINE__,
isc_error_fatal(__FILE__, __LINE,
"CreateEvent failed: %s", strbuf);
}
cond->events[LSIGNAL] = h;

View File

@@ -1049,51 +1049,6 @@ ns_client_sendraw(ns_client_t *client, dns_message_t *message) {
ns_client_next(client, result);
}
#ifdef NS_CLIENT_MOCKREPLY
static isc_result_t
client_mockreply(ns_client_t *client, uint16_t id) {
isc_result_t result;
unsigned char *data;
isc_buffer_t buffer;
isc_buffer_t tcpbuffer;
isc_region_t r;
unsigned char sendbuf[SEND_BUFFER_SIZE];
result = client_allocsendbuf(client, &buffer, &tcpbuffer, 0,
sendbuf, &data);
if (result != ISC_R_SUCCESS) {
goto done;
}
isc_buffer_putuint16(&buffer, id);
isc_buffer_putuint8(&buffer, 0x81);
isc_buffer_putuint8(&buffer, 0x80);
isc_buffer_putuint16(&buffer, 0);
isc_buffer_putuint16(&buffer, 0);
isc_buffer_putuint16(&buffer, 0);
isc_buffer_putuint16(&buffer, 0);
if (client->sendcb != NULL) {
client->sendcb(&buffer);
} else if (TCP_CLIENT(client)) {
isc_buffer_usedregion(&buffer, &r);
isc_buffer_putuint16(&tcpbuffer, (uint16_t) r.length);
isc_buffer_add(&tcpbuffer, r.length);
result = client_sendpkg(client, &tcpbuffer);
} else {
result = client_sendpkg(client, &buffer);
}
if (result == ISC_R_SUCCESS) {
return (ISC_R_SUCCESS);
}
done:
if (client->tcpbuf != NULL) {
isc_mem_put(client->mctx, client->tcpbuf, TCP_BUFFER_SIZE);
client->tcpbuf = NULL;
}
ns_client_next(client, result);
return (ISC_R_SUCCESS);
}
#endif
static void
client_send(ns_client_t *client) {
isc_result_t result;
@@ -2380,26 +2335,6 @@ ns__client_request(isc_task_t *task, isc_event_t *event) {
return;
}
#ifdef NS_CLIENT_MOCKREPLY
result = dns_message_peekheader(buffer, &id, &flags);
if (result != ISC_R_SUCCESS) {
/*
* There isn't enough header to determine whether
* this was a request or a response. Drop it.
*/
ns_client_next(client, result);
return;
}
/*
* client_mockreply always return success, but we want to
* make the return conditional to fool compilers that the
* code below is reachable.
*/
if (client_mockreply(client, id) == ISC_R_SUCCESS) {
return;
}
#endif
isc_netaddr_fromsockaddr(&netaddr, &client->peeraddr);
#if NS_CLIENT_DROPPORT