|
|
|
|
@@ -135,6 +135,7 @@ struct isc__taskqueue {
|
|
|
|
|
isc_thread_t thread;
|
|
|
|
|
unsigned int threadid;
|
|
|
|
|
unsigned int tasks_waiting;
|
|
|
|
|
bool running;
|
|
|
|
|
isc__taskmgr_t *manager;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
@@ -146,8 +147,6 @@ struct isc__taskmgr {
|
|
|
|
|
isc_mutex_t halt_lock;
|
|
|
|
|
isc_condition_t halt_cond;
|
|
|
|
|
unsigned int workers;
|
|
|
|
|
atomic_uint_fast32_t tasks_running;
|
|
|
|
|
atomic_uint_fast32_t tasks_ready;
|
|
|
|
|
atomic_uint_fast32_t curq;
|
|
|
|
|
isc__taskqueue_t *queues;
|
|
|
|
|
|
|
|
|
|
@@ -176,7 +175,17 @@ void
|
|
|
|
|
isc__taskmgr_resume(isc_taskmgr_t *manager0);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#define DEFAULT_QUANTUM 25
|
|
|
|
|
/*
|
|
|
|
|
* Unless specified otherwise when creating task we normally run
|
|
|
|
|
* DEFAULT_QUANTUM events from a task in a single worker loop.
|
|
|
|
|
* If there are more than CONGESTED_TASK_LIMIT tasks waiting in the same queue
|
|
|
|
|
* we switch to CONGESTED_QUANTUM tasks per loop.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
#define DEFAULT_QUANTUM 50
|
|
|
|
|
#define CONGESTED_QUANTUM 5
|
|
|
|
|
#define CONGESTED_TASK_LIMIT 2
|
|
|
|
|
|
|
|
|
|
#define FINISHED(m) ((m)->exiting && EMPTY((m)->tasks))
|
|
|
|
|
|
|
|
|
|
/*%
|
|
|
|
|
@@ -932,6 +941,7 @@ pop_readyq(isc__taskmgr_t *manager, int c) {
|
|
|
|
|
|
|
|
|
|
if (task != NULL) {
|
|
|
|
|
DEQUEUE(manager->queues[c].ready_tasks, task, ready_link);
|
|
|
|
|
INSIST(manager->queues[c].tasks_waiting > 0);
|
|
|
|
|
manager->queues[c].tasks_waiting--;
|
|
|
|
|
if (ISC_LINK_LINKED(task, ready_priority_link)) {
|
|
|
|
|
DEQUEUE(manager->queues[c].ready_priority_tasks, task,
|
|
|
|
|
@@ -956,8 +966,6 @@ push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
|
|
|
|
|
ENQUEUE(manager->queues[c].ready_priority_tasks, task,
|
|
|
|
|
ready_priority_link);
|
|
|
|
|
}
|
|
|
|
|
atomic_fetch_add_explicit(&manager->tasks_ready, 1,
|
|
|
|
|
memory_order_acquire);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
@@ -1096,8 +1104,17 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
|
|
|
|
|
|
|
|
|
task = pop_readyq(manager, threadid);
|
|
|
|
|
if (task != NULL) {
|
|
|
|
|
int quantum = (task->quantum > 0) ? task->quantum :
|
|
|
|
|
DEFAULT_QUANTUM;
|
|
|
|
|
unsigned int dispatch_count = 0;
|
|
|
|
|
unsigned int quantum = task->quantum;
|
|
|
|
|
if (quantum == 0) {
|
|
|
|
|
if (manager->queues[threadid].tasks_waiting
|
|
|
|
|
< CONGESTED_TASK_LIMIT)
|
|
|
|
|
{
|
|
|
|
|
quantum = DEFAULT_QUANTUM;
|
|
|
|
|
} else {
|
|
|
|
|
quantum = CONGESTED_QUANTUM;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
bool done = false;
|
|
|
|
|
bool requeue = false;
|
|
|
|
|
bool finished = false;
|
|
|
|
|
@@ -1110,12 +1127,8 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
|
|
|
|
* have a task to do. We must reacquire the queue
|
|
|
|
|
* lock before exiting the 'if (task != NULL)' block.
|
|
|
|
|
*/
|
|
|
|
|
manager->queues[threadid].running = true;
|
|
|
|
|
UNLOCK(&manager->queues[threadid].lock);
|
|
|
|
|
RUNTIME_CHECK(
|
|
|
|
|
atomic_fetch_sub_explicit(&manager->tasks_ready,
|
|
|
|
|
1, memory_order_release) > 0);
|
|
|
|
|
atomic_fetch_add_explicit(&manager->tasks_running, 1,
|
|
|
|
|
memory_order_acquire);
|
|
|
|
|
|
|
|
|
|
LOCK(&task->lock);
|
|
|
|
|
INSIST(task->state == task_state_ready);
|
|
|
|
|
@@ -1146,7 +1159,7 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
|
|
|
|
event);
|
|
|
|
|
LOCK(&task->lock);
|
|
|
|
|
}
|
|
|
|
|
quantum--;
|
|
|
|
|
dispatch_count++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (task->references == 0 &&
|
|
|
|
|
@@ -1204,7 +1217,7 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
|
|
|
|
} else
|
|
|
|
|
task->state = task_state_idle;
|
|
|
|
|
done = true;
|
|
|
|
|
} else if (quantum <= 0) {
|
|
|
|
|
} else if (dispatch_count >= quantum) {
|
|
|
|
|
/*
|
|
|
|
|
* Our quantum has expired, but
|
|
|
|
|
* there is more work to be done.
|
|
|
|
|
@@ -1229,10 +1242,8 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
|
|
|
|
if (finished)
|
|
|
|
|
task_finished(task);
|
|
|
|
|
|
|
|
|
|
RUNTIME_CHECK(
|
|
|
|
|
atomic_fetch_sub_explicit(&manager->tasks_running,
|
|
|
|
|
1, memory_order_release) > 0);
|
|
|
|
|
LOCK(&manager->queues[threadid].lock);
|
|
|
|
|
manager->queues[threadid].running = false;
|
|
|
|
|
if (requeue) {
|
|
|
|
|
/*
|
|
|
|
|
* We know we're awake, so we don't have
|
|
|
|
|
@@ -1259,13 +1270,14 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If we are in privileged execution mode and there are no
|
|
|
|
|
* tasks remaining on the current ready queue, then
|
|
|
|
|
* we're stuck. Automatically drop privileges at that
|
|
|
|
|
* point and continue with the regular ready queue.
|
|
|
|
|
* tasks remaining on the current ready queue, we need to check
|
|
|
|
|
* if maybe we need to drop from privileged to normal mode.
|
|
|
|
|
* This might seem too heavy with all the locking but
|
|
|
|
|
* privileged mode is used only during startup and dropped
|
|
|
|
|
* once - after that this code is never executed.
|
|
|
|
|
*/
|
|
|
|
|
if (manager->mode != isc_taskmgrmode_normal &&
|
|
|
|
|
atomic_load_explicit(&manager->tasks_running,
|
|
|
|
|
memory_order_acquire) == 0)
|
|
|
|
|
empty_readyq(manager, threadid))
|
|
|
|
|
{
|
|
|
|
|
UNLOCK(&manager->queues[threadid].lock);
|
|
|
|
|
LOCK(&manager->lock);
|
|
|
|
|
@@ -1276,16 +1288,14 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
|
|
|
|
* we'll end up in a deadlock over queue locks.
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
if (manager->mode != isc_taskmgrmode_normal &&
|
|
|
|
|
atomic_load_explicit(&manager->tasks_running,
|
|
|
|
|
memory_order_acquire) == 0)
|
|
|
|
|
{
|
|
|
|
|
if (manager->mode != isc_taskmgrmode_normal) {
|
|
|
|
|
bool empty = true;
|
|
|
|
|
unsigned int i;
|
|
|
|
|
for (i = 0; i < manager->workers && empty; i++)
|
|
|
|
|
{
|
|
|
|
|
LOCK(&manager->queues[i].lock);
|
|
|
|
|
empty &= empty_readyq(manager, i);
|
|
|
|
|
empty &= empty_readyq(manager, i) &&
|
|
|
|
|
!manager->queues[i].running;
|
|
|
|
|
UNLOCK(&manager->queues[i].lock);
|
|
|
|
|
}
|
|
|
|
|
if (empty) {
|
|
|
|
|
@@ -1376,8 +1386,6 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
|
|
|
|
manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
|
|
|
|
|
RUNTIME_CHECK(manager->queues != NULL);
|
|
|
|
|
|
|
|
|
|
manager->tasks_running = 0;
|
|
|
|
|
manager->tasks_ready = 0;
|
|
|
|
|
manager->curq = 0;
|
|
|
|
|
manager->exiting = false;
|
|
|
|
|
manager->excl = NULL;
|
|
|
|
|
@@ -1400,6 +1408,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
|
|
|
|
manager->queues[i].manager = manager;
|
|
|
|
|
manager->queues[i].threadid = i;
|
|
|
|
|
manager->queues[i].tasks_waiting = 0;
|
|
|
|
|
manager->queues[i].running = false;
|
|
|
|
|
RUNTIME_CHECK(isc_thread_create(run, &manager->queues[i],
|
|
|
|
|
&manager->queues[i].thread)
|
|
|
|
|
== ISC_R_SUCCESS);
|
|
|
|
|
@@ -1708,16 +1717,6 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr0, xmlTextWriterPtr writer) {
|
|
|
|
|
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->workers));
|
|
|
|
|
TRY0(xmlTextWriterEndElement(writer)); /* worker-threads */
|
|
|
|
|
|
|
|
|
|
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-running"));
|
|
|
|
|
TRY0(xmlTextWriterWriteFormatString(writer, "%d",
|
|
|
|
|
(int) mgr->tasks_running));
|
|
|
|
|
TRY0(xmlTextWriterEndElement(writer)); /* tasks-running */
|
|
|
|
|
|
|
|
|
|
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-ready"));
|
|
|
|
|
TRY0(xmlTextWriterWriteFormatString(writer, "%d",
|
|
|
|
|
(int) mgr->tasks_ready));
|
|
|
|
|
TRY0(xmlTextWriterEndElement(writer)); /* tasks-ready */
|
|
|
|
|
|
|
|
|
|
TRY0(xmlTextWriterEndElement(writer)); /* thread-model */
|
|
|
|
|
|
|
|
|
|
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks"));
|
|
|
|
|
@@ -1804,14 +1803,6 @@ isc_taskmgr_renderjson(isc_taskmgr_t *mgr0, json_object *tasks) {
|
|
|
|
|
CHECKMEM(obj);
|
|
|
|
|
json_object_object_add(tasks, "worker-threads", obj);
|
|
|
|
|
|
|
|
|
|
obj = json_object_new_int(mgr->tasks_running);
|
|
|
|
|
CHECKMEM(obj);
|
|
|
|
|
json_object_object_add(tasks, "tasks-running", obj);
|
|
|
|
|
|
|
|
|
|
obj = json_object_new_int(mgr->tasks_ready);
|
|
|
|
|
CHECKMEM(obj);
|
|
|
|
|
json_object_object_add(tasks, "tasks-ready", obj);
|
|
|
|
|
|
|
|
|
|
array = json_object_new_array();
|
|
|
|
|
CHECKMEM(array);
|
|
|
|
|
|
|
|
|
|
|