|
|
|
|
@@ -137,28 +137,25 @@ static void
|
|
|
|
|
async_cb(uv_async_t *handle);
|
|
|
|
|
static bool
|
|
|
|
|
process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
|
|
|
|
|
static bool
|
|
|
|
|
static isc_result_t
|
|
|
|
|
process_queue(isc__networker_t *worker, isc_queue_t *queue,
|
|
|
|
|
unsigned int *quantump);
|
|
|
|
|
atomic_uint_fast32_t *nqueue);
|
|
|
|
|
static void
|
|
|
|
|
wait_for_priority_queue(isc__networker_t *worker);
|
|
|
|
|
static bool
|
|
|
|
|
process_priority_queue(isc__networker_t *worker, unsigned int *quantump);
|
|
|
|
|
static bool
|
|
|
|
|
process_privilege_queue(isc__networker_t *worker, unsigned int *quantump);
|
|
|
|
|
static bool
|
|
|
|
|
process_task_queue(isc__networker_t *worker, unsigned int *quantump);
|
|
|
|
|
static bool
|
|
|
|
|
process_normal_queue(isc__networker_t *worker, unsigned int *quantump);
|
|
|
|
|
|
|
|
|
|
#define drain_priority_queue(worker) \
|
|
|
|
|
(void)process_priority_queue(worker, &(unsigned int){ UINT_MAX })
|
|
|
|
|
#define drain_privilege_queue(worker) \
|
|
|
|
|
(void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX })
|
|
|
|
|
#define drain_task_queue(worker) \
|
|
|
|
|
(void)process_task_queue(worker, &(unsigned int){ UINT_MAX })
|
|
|
|
|
#define drain_normal_queue(worker) \
|
|
|
|
|
(void)process_normal_queue(worker, &(unsigned int){ UINT_MAX })
|
|
|
|
|
static void
|
|
|
|
|
drain_priority_queue(isc__networker_t *worker);
|
|
|
|
|
static void
|
|
|
|
|
drain_privilege_queue(isc__networker_t *worker);
|
|
|
|
|
static void
|
|
|
|
|
drain_task_queue(isc__networker_t *worker);
|
|
|
|
|
static isc_result_t
|
|
|
|
|
process_priority_queue(isc__networker_t *worker);
|
|
|
|
|
static isc_result_t
|
|
|
|
|
process_privilege_queue(isc__networker_t *worker);
|
|
|
|
|
static isc_result_t
|
|
|
|
|
process_task_queue(isc__networker_t *worker);
|
|
|
|
|
static isc_result_t
|
|
|
|
|
process_normal_queue(isc__networker_t *worker);
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
|
|
|
|
|
@@ -283,7 +280,6 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
|
|
|
|
|
*worker = (isc__networker_t){
|
|
|
|
|
.mgr = mgr,
|
|
|
|
|
.id = i,
|
|
|
|
|
.quantum = ISC_NETMGR_QUANTUM_DEFAULT,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
r = uv_loop_init(&worker->loop);
|
|
|
|
|
@@ -735,20 +731,72 @@ nm_thread(isc_threadarg_t worker0) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool
|
|
|
|
|
process_all_queues(isc__networker_t *worker, unsigned int quantum) {
|
|
|
|
|
process_all_queues(isc__networker_t *worker) {
|
|
|
|
|
isc_result_t result = ISC_R_SUCCESS;
|
|
|
|
|
bool reschedule = false;
|
|
|
|
|
/*
|
|
|
|
|
* The queue processing functions will return false when the
|
|
|
|
|
* system is pausing or stopping, or if we have completed
|
|
|
|
|
* 'quantum' events.
|
|
|
|
|
*
|
|
|
|
|
* We don't want to proceed to a new queue until the previous one
|
|
|
|
|
* has been fully drained, so whenever one queue is interrupted,
|
|
|
|
|
* we skip all the later ones.
|
|
|
|
|
* system is pausing or stopping and we don't want to process
|
|
|
|
|
* the other queues in such case, but we need the async event
|
|
|
|
|
* to be rescheduled in the next uv_run().
|
|
|
|
|
*/
|
|
|
|
|
return (process_priority_queue(worker, &quantum) &&
|
|
|
|
|
process_privilege_queue(worker, &quantum) &&
|
|
|
|
|
process_task_queue(worker, &quantum) &&
|
|
|
|
|
process_normal_queue(worker, &quantum));
|
|
|
|
|
result = process_priority_queue(worker);
|
|
|
|
|
switch (result) {
|
|
|
|
|
case ISC_R_SUSPEND:
|
|
|
|
|
return (true);
|
|
|
|
|
case ISC_R_EMPTY:
|
|
|
|
|
/* empty queue */
|
|
|
|
|
break;
|
|
|
|
|
case ISC_R_SUCCESS:
|
|
|
|
|
reschedule = true;
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
INSIST(0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result = process_privilege_queue(worker);
|
|
|
|
|
switch (result) {
|
|
|
|
|
case ISC_R_SUSPEND:
|
|
|
|
|
return (true);
|
|
|
|
|
case ISC_R_EMPTY:
|
|
|
|
|
/* empty queue */
|
|
|
|
|
break;
|
|
|
|
|
case ISC_R_SUCCESS:
|
|
|
|
|
reschedule = true;
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
INSIST(0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result = process_task_queue(worker);
|
|
|
|
|
switch (result) {
|
|
|
|
|
case ISC_R_SUSPEND:
|
|
|
|
|
return (true);
|
|
|
|
|
case ISC_R_EMPTY:
|
|
|
|
|
/* empty queue */
|
|
|
|
|
break;
|
|
|
|
|
case ISC_R_SUCCESS:
|
|
|
|
|
reschedule = true;
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
INSIST(0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result = process_normal_queue(worker);
|
|
|
|
|
switch (result) {
|
|
|
|
|
case ISC_R_SUSPEND:
|
|
|
|
|
return (true);
|
|
|
|
|
case ISC_R_EMPTY:
|
|
|
|
|
/* empty queue */
|
|
|
|
|
break;
|
|
|
|
|
case ISC_R_SUCCESS:
|
|
|
|
|
reschedule = true;
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
INSIST(0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return (reschedule);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@@ -760,9 +808,8 @@ process_all_queues(isc__networker_t *worker, unsigned int quantum) {
|
|
|
|
|
static void
|
|
|
|
|
async_cb(uv_async_t *handle) {
|
|
|
|
|
isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
|
|
|
|
|
unsigned int quantum = worker->quantum;
|
|
|
|
|
|
|
|
|
|
if (!process_all_queues(worker, quantum)) {
|
|
|
|
|
if (process_all_queues(worker)) {
|
|
|
|
|
/* If we didn't process all the events, we need to enqueue
|
|
|
|
|
* async_cb to be run in the next iteration of the uv_loop
|
|
|
|
|
*/
|
|
|
|
|
@@ -836,43 +883,65 @@ wait_for_priority_queue(isc__networker_t *worker) {
|
|
|
|
|
while (true) {
|
|
|
|
|
isc__netievent_t *ievent;
|
|
|
|
|
LOCK(&worker->lock);
|
|
|
|
|
ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
|
|
|
|
|
if (wait_for_work) {
|
|
|
|
|
while (ievent == NULL) {
|
|
|
|
|
while (true) {
|
|
|
|
|
ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
|
|
|
|
|
if (wait_for_work && ievent == NULL) {
|
|
|
|
|
WAIT(cond, &worker->lock);
|
|
|
|
|
ievent = (isc__netievent_t *)isc_queue_dequeue(
|
|
|
|
|
queue);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
UNLOCK(&worker->lock);
|
|
|
|
|
wait_for_work = false;
|
|
|
|
|
UNLOCK(&worker->lock);
|
|
|
|
|
|
|
|
|
|
if (ievent == NULL) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
atomic_fetch_sub_release(&worker->nievents_prio, 1);
|
|
|
|
|
|
|
|
|
|
(void)process_netievent(worker, ievent);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool
|
|
|
|
|
process_priority_queue(isc__networker_t *worker, unsigned int *quantump) {
|
|
|
|
|
return (process_queue(worker, worker->ievents_prio, quantump));
|
|
|
|
|
static void
|
|
|
|
|
drain_priority_queue(isc__networker_t *worker) {
|
|
|
|
|
while (process_priority_queue(worker) != ISC_R_EMPTY)
|
|
|
|
|
;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool
|
|
|
|
|
process_privilege_queue(isc__networker_t *worker, unsigned int *quantump) {
|
|
|
|
|
return (process_queue(worker, worker->ievents_priv, quantump));
|
|
|
|
|
static void
|
|
|
|
|
drain_privilege_queue(isc__networker_t *worker) {
|
|
|
|
|
while (process_privilege_queue(worker) != ISC_R_EMPTY)
|
|
|
|
|
;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool
|
|
|
|
|
process_task_queue(isc__networker_t *worker, unsigned int *quantump) {
|
|
|
|
|
return (process_queue(worker, worker->ievents_task, quantump));
|
|
|
|
|
static void
|
|
|
|
|
drain_task_queue(isc__networker_t *worker) {
|
|
|
|
|
while (process_task_queue(worker) != ISC_R_EMPTY)
|
|
|
|
|
;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool
|
|
|
|
|
process_normal_queue(isc__networker_t *worker, unsigned int *quantump) {
|
|
|
|
|
return (process_queue(worker, worker->ievents, quantump));
|
|
|
|
|
static isc_result_t
|
|
|
|
|
process_priority_queue(isc__networker_t *worker) {
|
|
|
|
|
return (process_queue(worker, worker->ievents_prio,
|
|
|
|
|
&worker->nievents_prio));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static isc_result_t
|
|
|
|
|
process_privilege_queue(isc__networker_t *worker) {
|
|
|
|
|
return (process_queue(worker, worker->ievents_priv,
|
|
|
|
|
&worker->nievents_priv));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static isc_result_t
|
|
|
|
|
process_task_queue(isc__networker_t *worker) {
|
|
|
|
|
return (process_queue(worker, worker->ievents_task,
|
|
|
|
|
&worker->nievents_task));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static isc_result_t
|
|
|
|
|
process_normal_queue(isc__networker_t *worker) {
|
|
|
|
|
return (process_queue(worker, worker->ievents, &worker->nievents));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@@ -973,28 +1042,51 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) {
|
|
|
|
|
return (true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool
|
|
|
|
|
/*
|
|
|
|
|
* The queue and nqueue is only loosely synchronized, it's possible that there's
|
|
|
|
|
* item
|
|
|
|
|
*/
|
|
|
|
|
static isc_result_t
|
|
|
|
|
process_queue(isc__networker_t *worker, isc_queue_t *queue,
|
|
|
|
|
unsigned int *quantump) {
|
|
|
|
|
while (*quantump > 0) {
|
|
|
|
|
isc__netievent_t *ievent =
|
|
|
|
|
(isc__netievent_t *)isc_queue_dequeue(queue);
|
|
|
|
|
atomic_uint_fast32_t *nqueue) {
|
|
|
|
|
/*
|
|
|
|
|
* The number of items on the queue is only loosely synchronized with
|
|
|
|
|
* the items on the queue. But there's a guarantee that if there's an
|
|
|
|
|
* item on the queue, it will be accounted for. However there's a
|
|
|
|
|
* possibility that the counter might be higher than the items on the
|
|
|
|
|
* queue stored.
|
|
|
|
|
*/
|
|
|
|
|
uint_fast32_t waiting = atomic_load_acquire(nqueue);
|
|
|
|
|
isc__netievent_t *ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
|
|
|
|
|
|
|
|
|
|
if (ievent == NULL) {
|
|
|
|
|
/* We fully drained this queue */
|
|
|
|
|
return (true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
(*quantump)--;
|
|
|
|
|
|
|
|
|
|
if (!process_netievent(worker, ievent)) {
|
|
|
|
|
/* Netievent told us to stop */
|
|
|
|
|
return (false);
|
|
|
|
|
}
|
|
|
|
|
if (ievent == NULL && waiting == 0) {
|
|
|
|
|
/* There's nothing scheduled */
|
|
|
|
|
return (ISC_R_EMPTY);
|
|
|
|
|
} else if (ievent == NULL) {
|
|
|
|
|
/* There's at least one item scheduled, but not on the queue yet
|
|
|
|
|
*/
|
|
|
|
|
return (ISC_R_SUCCESS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* No more quantum */
|
|
|
|
|
return (false);
|
|
|
|
|
while (ievent != NULL) {
|
|
|
|
|
atomic_fetch_sub_release(nqueue, 1);
|
|
|
|
|
bool stop = !process_netievent(worker, ievent);
|
|
|
|
|
|
|
|
|
|
if (stop) {
|
|
|
|
|
/* Netievent told us to stop */
|
|
|
|
|
return (ISC_R_SUSPEND);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (waiting-- == 0) {
|
|
|
|
|
/* We reached this round "quota" */
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* We processed at least one */
|
|
|
|
|
return (ISC_R_SUCCESS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void *
|
|
|
|
|
@@ -1096,14 +1188,18 @@ isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
|
|
|
|
|
* the queue will be processed.
|
|
|
|
|
*/
|
|
|
|
|
LOCK(&worker->lock);
|
|
|
|
|
atomic_fetch_add_release(&worker->nievents_prio, 1);
|
|
|
|
|
isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event);
|
|
|
|
|
SIGNAL(&worker->cond_prio);
|
|
|
|
|
UNLOCK(&worker->lock);
|
|
|
|
|
} else if (event->type == netievent_privilegedtask) {
|
|
|
|
|
atomic_fetch_add_release(&worker->nievents_priv, 1);
|
|
|
|
|
isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event);
|
|
|
|
|
} else if (event->type == netievent_task) {
|
|
|
|
|
atomic_fetch_add_release(&worker->nievents_task, 1);
|
|
|
|
|
isc_queue_enqueue(worker->ievents_task, (uintptr_t)event);
|
|
|
|
|
} else {
|
|
|
|
|
atomic_fetch_add_release(&worker->nievents, 1);
|
|
|
|
|
isc_queue_enqueue(worker->ievents, (uintptr_t)event);
|
|
|
|
|
}
|
|
|
|
|
uv_async_send(&worker->async);
|
|
|
|
|
|