Compare commits

..

4 Commits

Author SHA1 Message Date
Witold Kręcicki
bbcb9d6669 Experiment: different task quantums 2018-11-23 15:24:55 +00:00
Witold Kręcicki
f0f73d8ad4 Experiment: 24 resolver tasks 2018-11-23 15:18:28 +00:00
Witold Kręcicki
4dd7e8c2fe Get rid of tasks-ready and tasks-running - tasks-ready was for statistics only,
we can work around not having tasks-running. This way there are virtually no
conflicting memory accesses between task manager threads.
2018-11-23 12:54:24 +00:00
Witold Kręcicki
91788ada31 Make task quantum dynamic: smaller if there are more tasks waiting. 2018-11-23 12:53:46 +00:00
3 changed files with 41 additions and 50 deletions

View File

@@ -64,9 +64,9 @@
#define MAX_RESTARTS 16
#ifdef TUNE_LARGE
#define RESOLVER_NTASKS 523
#define RESOLVER_NTASKS 24
#else
#define RESOLVER_NTASKS 31
#define RESOLVER_NTASKS 48
#endif /* TUNE_LARGE */
/*%

View File

@@ -2468,7 +2468,7 @@ dns_dispatch_createtcp(dns_dispatchmgr_t *mgr, isc_socket_t *sock,
disp->ntasks = 1;
disp->task[0] = NULL;
result = isc_task_create(taskmgr, 50, &disp->task[0]);
result = isc_task_create(taskmgr, 0, &disp->task[0]);
if (result != ISC_R_SUCCESS)
goto kill_socket;

View File

@@ -135,6 +135,7 @@ struct isc__taskqueue {
isc_thread_t thread;
unsigned int threadid;
unsigned int tasks_waiting;
bool running;
isc__taskmgr_t *manager;
};
@@ -146,8 +147,6 @@ struct isc__taskmgr {
isc_mutex_t halt_lock;
isc_condition_t halt_cond;
unsigned int workers;
atomic_uint_fast32_t tasks_running;
atomic_uint_fast32_t tasks_ready;
atomic_uint_fast32_t curq;
isc__taskqueue_t *queues;
@@ -176,7 +175,17 @@ void
isc__taskmgr_resume(isc_taskmgr_t *manager0);
#define DEFAULT_QUANTUM 25
/*
 * Unless specified otherwise when creating a task, we normally run
 * DEFAULT_QUANTUM events from a task in a single worker loop.
 * If there are more than CONGESTED_TASK_LIMIT tasks waiting in the same queue,
 * we switch to CONGESTED_QUANTUM tasks per loop.
 */
#define DEFAULT_QUANTUM 50
#define CONGESTED_QUANTUM 5
#define CONGESTED_TASK_LIMIT 2
#define FINISHED(m) ((m)->exiting && EMPTY((m)->tasks))
/*%
@@ -932,6 +941,7 @@ pop_readyq(isc__taskmgr_t *manager, int c) {
if (task != NULL) {
DEQUEUE(manager->queues[c].ready_tasks, task, ready_link);
INSIST(manager->queues[c].tasks_waiting > 0);
manager->queues[c].tasks_waiting--;
if (ISC_LINK_LINKED(task, ready_priority_link)) {
DEQUEUE(manager->queues[c].ready_priority_tasks, task,
@@ -956,8 +966,6 @@ push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
ENQUEUE(manager->queues[c].ready_priority_tasks, task,
ready_priority_link);
}
atomic_fetch_add_explicit(&manager->tasks_ready, 1,
memory_order_acquire);
}
static void
@@ -1096,8 +1104,17 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
task = pop_readyq(manager, threadid);
if (task != NULL) {
int quantum = (task->quantum > 0) ? task->quantum :
DEFAULT_QUANTUM;
unsigned int dispatch_count = 0;
unsigned int quantum = task->quantum;
if (quantum == 0) {
if (manager->queues[threadid].tasks_waiting
< CONGESTED_TASK_LIMIT)
{
quantum = DEFAULT_QUANTUM;
} else {
quantum = CONGESTED_QUANTUM;
}
}
bool done = false;
bool requeue = false;
bool finished = false;
@@ -1110,12 +1127,8 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
* have a task to do. We must reacquire the queue
* lock before exiting the 'if (task != NULL)' block.
*/
manager->queues[threadid].running = true;
UNLOCK(&manager->queues[threadid].lock);
RUNTIME_CHECK(
atomic_fetch_sub_explicit(&manager->tasks_ready,
1, memory_order_release) > 0);
atomic_fetch_add_explicit(&manager->tasks_running, 1,
memory_order_acquire);
LOCK(&task->lock);
INSIST(task->state == task_state_ready);
@@ -1146,7 +1159,7 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
event);
LOCK(&task->lock);
}
quantum--;
dispatch_count++;
}
if (task->references == 0 &&
@@ -1204,7 +1217,7 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
} else
task->state = task_state_idle;
done = true;
} else if (quantum <= 0) {
} else if (dispatch_count >= quantum) {
/*
* Our quantum has expired, but
* there is more work to be done.
@@ -1229,10 +1242,8 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
if (finished)
task_finished(task);
RUNTIME_CHECK(
atomic_fetch_sub_explicit(&manager->tasks_running,
1, memory_order_release) > 0);
LOCK(&manager->queues[threadid].lock);
manager->queues[threadid].running = false;
if (requeue) {
/*
* We know we're awake, so we don't have
@@ -1259,13 +1270,14 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
/*
 * If we are in privileged execution mode and there are no
 * tasks remaining on the current ready queue, we need to check
 * whether we should drop from privileged to normal mode.
 * This might seem heavy with all the locking, but
 * privileged mode is used only during startup and is dropped
 * exactly once - after that this code is never executed.
 */
if (manager->mode != isc_taskmgrmode_normal &&
atomic_load_explicit(&manager->tasks_running,
memory_order_acquire) == 0)
empty_readyq(manager, threadid))
{
UNLOCK(&manager->queues[threadid].lock);
LOCK(&manager->lock);
@@ -1276,16 +1288,14 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
* we'll end up in a deadlock over queue locks.
*
*/
if (manager->mode != isc_taskmgrmode_normal &&
atomic_load_explicit(&manager->tasks_running,
memory_order_acquire) == 0)
{
if (manager->mode != isc_taskmgrmode_normal) {
bool empty = true;
unsigned int i;
for (i = 0; i < manager->workers && empty; i++)
{
LOCK(&manager->queues[i].lock);
empty &= empty_readyq(manager, i);
empty &= empty_readyq(manager, i) &&
!manager->queues[i].running;
UNLOCK(&manager->queues[i].lock);
}
if (empty) {
@@ -1376,8 +1386,6 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
RUNTIME_CHECK(manager->queues != NULL);
manager->tasks_running = 0;
manager->tasks_ready = 0;
manager->curq = 0;
manager->exiting = false;
manager->excl = NULL;
@@ -1400,6 +1408,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
manager->queues[i].manager = manager;
manager->queues[i].threadid = i;
manager->queues[i].tasks_waiting = 0;
manager->queues[i].running = false;
RUNTIME_CHECK(isc_thread_create(run, &manager->queues[i],
&manager->queues[i].thread)
== ISC_R_SUCCESS);
@@ -1708,16 +1717,6 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr0, xmlTextWriterPtr writer) {
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->workers));
TRY0(xmlTextWriterEndElement(writer)); /* worker-threads */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-running"));
TRY0(xmlTextWriterWriteFormatString(writer, "%d",
(int) mgr->tasks_running));
TRY0(xmlTextWriterEndElement(writer)); /* tasks-running */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-ready"));
TRY0(xmlTextWriterWriteFormatString(writer, "%d",
(int) mgr->tasks_ready));
TRY0(xmlTextWriterEndElement(writer)); /* tasks-ready */
TRY0(xmlTextWriterEndElement(writer)); /* thread-model */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks"));
@@ -1804,14 +1803,6 @@ isc_taskmgr_renderjson(isc_taskmgr_t *mgr0, json_object *tasks) {
CHECKMEM(obj);
json_object_object_add(tasks, "worker-threads", obj);
obj = json_object_new_int(mgr->tasks_running);
CHECKMEM(obj);
json_object_object_add(tasks, "tasks-running", obj);
obj = json_object_new_int(mgr->tasks_ready);
CHECKMEM(obj);
json_object_object_add(tasks, "tasks-ready", obj);
array = json_object_new_array();
CHECKMEM(array);