Compare commits
1 Commits
v9.16.17
...
wpk/test-t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
04d46e0693 |
@@ -1486,7 +1486,7 @@ dns_rpz_new_zones(dns_rpz_zones_t **rpzsp, char *rps_cstr, size_t rps_cstr_size,
|
||||
goto cleanup_rbt;
|
||||
}
|
||||
|
||||
result = isc_task_create(taskmgr, 0, &zones->updater);
|
||||
result = isc_task_create(taskmgr, 5, &zones->updater);
|
||||
if (result != ISC_R_SUCCESS) {
|
||||
goto cleanup_task;
|
||||
}
|
||||
|
||||
@@ -136,7 +136,9 @@ typedef enum isc__netievent_type {
|
||||
netievent_tcpaccept,
|
||||
netievent_tcpstop,
|
||||
netievent_tcpclose,
|
||||
|
||||
netievent_tcpdnsclose,
|
||||
netievent_tcpdnssend,
|
||||
|
||||
netievent_closecb,
|
||||
netievent_shutdown,
|
||||
@@ -227,6 +229,7 @@ typedef struct isc__netievent__socket_req {
|
||||
typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t;
|
||||
typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t;
|
||||
typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t;
|
||||
typedef isc__netievent__socket_req_t isc__netievent_tcpdnssend_t;
|
||||
|
||||
typedef struct isc__netievent__socket_streaminfo_quota {
|
||||
isc__netievent_type type;
|
||||
@@ -746,6 +749,9 @@ isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock);
|
||||
void
|
||||
isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0);
|
||||
|
||||
void
|
||||
isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0);
|
||||
|
||||
#define isc__nm_uverr2result(x) \
|
||||
isc___nm_uverr2result(x, true, __FILE__, __LINE__)
|
||||
isc_result_t
|
||||
|
||||
@@ -621,6 +621,9 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
|
||||
case netievent_tcpsend:
|
||||
isc__nm_async_tcpsend(worker, ievent);
|
||||
break;
|
||||
case netievent_tcpdnssend:
|
||||
isc__nm_async_tcpdnssend(worker, ievent);
|
||||
break;
|
||||
case netievent_tcpstop:
|
||||
isc__nm_async_tcpstop(worker, ievent);
|
||||
break;
|
||||
@@ -827,10 +830,13 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) {
|
||||
if (active_handles == 0 || sock->tcphandle != NULL) {
|
||||
destroy = true;
|
||||
}
|
||||
UNLOCK(&sock->lock);
|
||||
|
||||
if (destroy) {
|
||||
atomic_store(&sock->destroying, true);
|
||||
UNLOCK(&sock->lock);
|
||||
nmsocket_cleanup(sock, true);
|
||||
} else {
|
||||
UNLOCK(&sock->lock);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -365,15 +365,6 @@ isc_nm_tcpdns_keepalive(isc_nmhandle_t *handle) {
|
||||
atomic_store(&handle->sock->outer->keepalive, true);
|
||||
}
|
||||
|
||||
typedef struct tcpsend {
|
||||
isc_mem_t *mctx;
|
||||
isc_nmhandle_t *handle;
|
||||
isc_region_t region;
|
||||
isc_nmhandle_t *orighandle;
|
||||
isc_nm_cb_t cb;
|
||||
void *cbarg;
|
||||
} tcpsend_t;
|
||||
|
||||
static void
|
||||
resume_processing(void *arg) {
|
||||
isc_nmsocket_t *sock = (isc_nmsocket_t *)arg;
|
||||
@@ -445,15 +436,40 @@ resume_processing(void *arg) {
|
||||
|
||||
static void
|
||||
tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
|
||||
tcpsend_t *ts = (tcpsend_t *)cbarg;
|
||||
isc__nm_uvreq_t *req = (isc__nm_uvreq_t *)cbarg;
|
||||
|
||||
UNUSED(handle);
|
||||
|
||||
ts->cb(ts->orighandle, result, ts->cbarg);
|
||||
isc_mem_put(ts->mctx, ts->region.base, ts->region.length);
|
||||
req->cb.send(req->handle, result, req->cbarg);
|
||||
isc_mem_put(req->sock->mgr->mctx, req->uvbuf.base, req->uvbuf.len);
|
||||
isc__nm_uvreq_put(&req, req->handle->sock);
|
||||
}
|
||||
|
||||
isc_nmhandle_unref(ts->orighandle);
|
||||
isc_mem_putanddetach(&ts->mctx, ts, sizeof(*ts));
|
||||
void
|
||||
isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) {
|
||||
isc_result_t result;
|
||||
isc__netievent_tcpdnssend_t *ievent =
|
||||
(isc__netievent_tcpdnssend_t *)ev0;
|
||||
isc__nm_uvreq_t *req = ievent->req;
|
||||
isc_nmsocket_t *sock = ievent->sock;
|
||||
|
||||
REQUIRE(worker->id == sock->tid);
|
||||
|
||||
result = ISC_R_NOTCONNECTED;
|
||||
if (atomic_load(&sock->active)) {
|
||||
isc_region_t r;
|
||||
|
||||
r.base = (unsigned char *)req->uvbuf.base;
|
||||
r.length = req->uvbuf.len;
|
||||
result = isc__nm_tcp_send(sock->outer->tcphandle, &r,
|
||||
tcpdnssend_cb, req);
|
||||
}
|
||||
|
||||
if (result != ISC_R_SUCCESS) {
|
||||
req->cb.send(req->handle, result, req->cbarg);
|
||||
isc_mem_put(sock->mgr->mctx, req->uvbuf.base, req->uvbuf.len);
|
||||
isc__nm_uvreq_put(&req, sock);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -462,7 +478,7 @@ tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
|
||||
isc_result_t
|
||||
isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
|
||||
isc_nm_cb_t cb, void *cbarg) {
|
||||
tcpsend_t *t = NULL;
|
||||
isc__nm_uvreq_t *uvreq = NULL;
|
||||
|
||||
REQUIRE(VALID_NMHANDLE(handle));
|
||||
|
||||
@@ -471,36 +487,47 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
|
||||
REQUIRE(VALID_NMSOCK(sock));
|
||||
REQUIRE(sock->type == isc_nm_tcpdnssocket);
|
||||
|
||||
if (sock->outer == NULL) {
|
||||
/* The socket is closed */
|
||||
return (ISC_R_NOTCONNECTED);
|
||||
uvreq = isc__nm_uvreq_get(sock->mgr, sock);
|
||||
uvreq->handle = handle;
|
||||
isc_nmhandle_ref(uvreq->handle);
|
||||
uvreq->cb.send = cb;
|
||||
uvreq->cbarg = cbarg;
|
||||
|
||||
uvreq->uvbuf.base = isc_mem_get(sock->mgr->mctx, region->length + 2);
|
||||
uvreq->uvbuf.len = region->length + 2;
|
||||
*(uint16_t *)uvreq->uvbuf.base = htons(region->length);
|
||||
memmove(uvreq->uvbuf.base + 2, region->base, region->length);
|
||||
|
||||
if (sock->tid == isc_nm_tid()) {
|
||||
isc_region_t r;
|
||||
|
||||
r.base = (unsigned char *)uvreq->uvbuf.base;
|
||||
r.length = uvreq->uvbuf.len;
|
||||
|
||||
return (isc__nm_tcp_send(sock->outer->tcphandle, &r,
|
||||
tcpdnssend_cb, uvreq));
|
||||
} else {
|
||||
isc__netievent_tcpdnssend_t *ievent = NULL;
|
||||
|
||||
ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpdnssend);
|
||||
ievent->req = uvreq;
|
||||
ievent->sock = sock;
|
||||
|
||||
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
|
||||
(isc__netievent_t *)ievent);
|
||||
|
||||
return (ISC_R_SUCCESS);
|
||||
}
|
||||
|
||||
t = isc_mem_get(sock->mgr->mctx, sizeof(*t));
|
||||
*t = (tcpsend_t){
|
||||
.cb = cb,
|
||||
.cbarg = cbarg,
|
||||
.handle = handle->sock->outer->tcphandle,
|
||||
};
|
||||
|
||||
isc_mem_attach(sock->mgr->mctx, &t->mctx);
|
||||
t->orighandle = handle;
|
||||
isc_nmhandle_ref(t->orighandle);
|
||||
|
||||
t->region = (isc_region_t){ .base = isc_mem_get(t->mctx,
|
||||
region->length + 2),
|
||||
.length = region->length + 2 };
|
||||
|
||||
*(uint16_t *)t->region.base = htons(region->length);
|
||||
memmove(t->region.base + 2, region->base, region->length);
|
||||
|
||||
return (isc_nm_send(t->handle, &t->region, tcpdnssend_cb, t));
|
||||
return (ISC_R_UNEXPECTED);
|
||||
}
|
||||
|
||||
static void
|
||||
tcpdns_close_direct(isc_nmsocket_t *sock) {
|
||||
REQUIRE(sock->tid == isc_nm_tid());
|
||||
|
||||
/* We don't need atomics here, it's all in single network thread */
|
||||
|
||||
if (sock->timer_initialized) {
|
||||
/*
|
||||
* We need to fire the timer callback to clean it up,
|
||||
|
||||
@@ -209,19 +209,6 @@ static void
|
||||
stoplistening(isc_nmsocket_t *sock) {
|
||||
REQUIRE(sock->type == isc_nm_udplistener);
|
||||
|
||||
/*
|
||||
* Socket is already closing; there's nothing to do.
|
||||
*/
|
||||
if (!isc__nmsocket_active(sock)) {
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Mark it inactive now so that all sends will be ignored
|
||||
* and we won't try to stop listening again.
|
||||
*/
|
||||
atomic_store(&sock->active, false);
|
||||
|
||||
for (int i = 0; i < sock->nchildren; i++) {
|
||||
isc__netievent_udpstop_t *event = NULL;
|
||||
|
||||
@@ -255,6 +242,18 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) {
|
||||
REQUIRE(VALID_NMSOCK(sock));
|
||||
REQUIRE(sock->type == isc_nm_udplistener);
|
||||
|
||||
/*
|
||||
* Socket is already closing; there's nothing to do.
|
||||
*/
|
||||
if (!isc__nmsocket_active(sock)) {
|
||||
return;
|
||||
}
|
||||
/*
|
||||
* Mark it inactive now so that all sends will be ignored
|
||||
* and we won't try to stop listening again.
|
||||
*/
|
||||
atomic_store(&sock->active, false);
|
||||
|
||||
/*
|
||||
* If the manager is interlocked, re-enqueue this as an asynchronous
|
||||
* event. Otherwise, go ahead and stop listening right away.
|
||||
@@ -330,24 +329,23 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
|
||||
#endif
|
||||
|
||||
/*
|
||||
* If addr == NULL that's the end of stream - we can
|
||||
* free the buffer and bail.
|
||||
* Three reasons to return now without processing
|
||||
* - If addr == NULL that's the end of stream - we can
|
||||
* free the buffer and bail.
|
||||
* - Simulate a firewall blocking UDP packets bigger than
|
||||
* 'maxudp' bytes.
|
||||
* - Socket is no longer active.
|
||||
*/
|
||||
if (addr == NULL) {
|
||||
maxudp = atomic_load(&sock->mgr->maxudp);
|
||||
if ((addr == NULL) ||
|
||||
(maxudp != 0 && (uint32_t)nrecv > maxudp) ||
|
||||
(!isc__nmsocket_active(sock))) {
|
||||
if (free_buf) {
|
||||
isc__nm_free_uvbuf(sock, buf);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Simulate a firewall blocking UDP packets bigger than
|
||||
* 'maxudp' bytes.
|
||||
*/
|
||||
maxudp = atomic_load(&sock->mgr->maxudp);
|
||||
if (maxudp != 0 && (uint32_t)nrecv > maxudp) {
|
||||
return;
|
||||
}
|
||||
|
||||
result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
|
||||
RUNTIME_CHECK(result == ISC_R_SUCCESS);
|
||||
@@ -509,6 +507,9 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
|
||||
REQUIRE(sock->tid == isc_nm_tid());
|
||||
REQUIRE(sock->type == isc_nm_udpsocket);
|
||||
|
||||
if (!isc__nmsocket_active(sock)) {
|
||||
return (ISC_R_CANCELED);
|
||||
}
|
||||
isc_nmhandle_ref(req->handle);
|
||||
rv = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp,
|
||||
&req->uvbuf, 1, &peer->type.sa, udp_send_cb);
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
*/
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <isc/app.h>
|
||||
#include <isc/atomic.h>
|
||||
@@ -152,6 +153,7 @@ struct isc__taskqueue {
|
||||
isc_thread_t thread;
|
||||
unsigned int threadid;
|
||||
isc__taskmgr_t *manager;
|
||||
bool bound;
|
||||
};
|
||||
|
||||
struct isc__taskmgr {
|
||||
@@ -161,7 +163,8 @@ struct isc__taskmgr {
|
||||
isc_mutex_t lock;
|
||||
isc_mutex_t halt_lock;
|
||||
isc_condition_t halt_cond;
|
||||
unsigned int workers;
|
||||
unsigned int bound_workers;
|
||||
unsigned int loose_workers;
|
||||
atomic_uint_fast32_t tasks_running;
|
||||
atomic_uint_fast32_t tasks_ready;
|
||||
atomic_uint_fast32_t curq;
|
||||
@@ -229,7 +232,9 @@ wake_all_queues(isc__taskmgr_t *manager);
|
||||
|
||||
static inline void
|
||||
wake_all_queues(isc__taskmgr_t *manager) {
|
||||
for (unsigned int i = 0; i < manager->workers; i++) {
|
||||
for (unsigned int i = 0;
|
||||
i < manager->bound_workers + manager->loose_workers; i++)
|
||||
{
|
||||
LOCK(&manager->queues[i].lock);
|
||||
BROADCAST(&manager->queues[i].work_available);
|
||||
UNLOCK(&manager->queues[i].lock);
|
||||
@@ -301,7 +306,7 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
|
||||
* by a specific thread.
|
||||
*/
|
||||
task->bound = true;
|
||||
task->threadid = threadid % manager->workers;
|
||||
task->threadid = threadid % manager->bound_workers;
|
||||
}
|
||||
|
||||
isc_mutex_init(&task->lock);
|
||||
@@ -543,11 +548,13 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
|
||||
/* If task is bound ignore provided cpu. */
|
||||
if (task->bound) {
|
||||
c = task->threadid;
|
||||
c %= task->manager->bound_workers;
|
||||
} else if (c < 0) {
|
||||
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
|
||||
memory_order_relaxed);
|
||||
c %= task->manager->loose_workers;
|
||||
c += task->manager->bound_workers;
|
||||
}
|
||||
c %= task->manager->workers;
|
||||
was_idle = task_send(task, eventp, c);
|
||||
UNLOCK(&task->lock);
|
||||
|
||||
@@ -589,11 +596,13 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
|
||||
LOCK(&task->lock);
|
||||
if (task->bound) {
|
||||
c = task->threadid;
|
||||
c %= task->manager->bound_workers;
|
||||
} else if (c < 0) {
|
||||
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
|
||||
memory_order_relaxed);
|
||||
c %= task->manager->loose_workers;
|
||||
c += task->manager->bound_workers;
|
||||
}
|
||||
c %= task->manager->workers;
|
||||
idle1 = task_send(task, eventp, c);
|
||||
idle2 = task_detach(task);
|
||||
UNLOCK(&task->lock);
|
||||
@@ -972,6 +981,7 @@ push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
|
||||
static void
|
||||
dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
||||
isc__task_t *task;
|
||||
isc__task_t *last_task = NULL;
|
||||
|
||||
REQUIRE(VALID_MANAGER(manager));
|
||||
|
||||
@@ -1117,7 +1127,14 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
||||
memory_order_release) > 0);
|
||||
atomic_fetch_add_explicit(&manager->tasks_running, 1,
|
||||
memory_order_acquire);
|
||||
|
||||
if (!manager->queues[threadid].bound && task == last_task) {
|
||||
/*
|
||||
* XXXWPK don't let an unbound worker run one
|
||||
* task continously. This is just an experiment...
|
||||
*/
|
||||
usleep(100);
|
||||
}
|
||||
last_task = task;
|
||||
LOCK(&task->lock);
|
||||
/*
|
||||
* It is possible because that we have a paused task
|
||||
@@ -1305,8 +1322,11 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
||||
{
|
||||
bool empty = true;
|
||||
unsigned int i;
|
||||
for (i = 0; i < manager->workers && empty; i++)
|
||||
{
|
||||
for (i = 0;
|
||||
i < manager->bound_workers +
|
||||
manager->loose_workers &&
|
||||
empty;
|
||||
i++) {
|
||||
LOCK(&manager->queues[i].lock);
|
||||
empty &= empty_readyq(manager, i);
|
||||
UNLOCK(&manager->queues[i].lock);
|
||||
@@ -1337,7 +1357,9 @@ static isc_threadresult_t
|
||||
isc__taskqueue_t *tq = queuep;
|
||||
isc__taskmgr_t *manager = tq->manager;
|
||||
int threadid = tq->threadid;
|
||||
isc_thread_setaffinity(threadid);
|
||||
if (tq->bound) {
|
||||
isc_thread_setaffinity(threadid);
|
||||
}
|
||||
|
||||
XTHREADTRACE("starting");
|
||||
|
||||
@@ -1354,7 +1376,9 @@ static isc_threadresult_t
|
||||
|
||||
static void
|
||||
manager_free(isc__taskmgr_t *manager) {
|
||||
for (unsigned int i = 0; i < manager->workers; i++) {
|
||||
for (unsigned int i = 0;
|
||||
i < manager->bound_workers + manager->loose_workers; i++)
|
||||
{
|
||||
isc_mutex_destroy(&manager->queues[i].lock);
|
||||
isc_condition_destroy(&manager->queues[i].work_available);
|
||||
}
|
||||
@@ -1363,7 +1387,8 @@ manager_free(isc__taskmgr_t *manager) {
|
||||
isc_mutex_destroy(&manager->halt_lock);
|
||||
isc_condition_destroy(&manager->halt_cond);
|
||||
isc_mem_put(manager->mctx, manager->queues,
|
||||
manager->workers * sizeof(isc__taskqueue_t));
|
||||
(manager->bound_workers + manager->loose_workers) *
|
||||
sizeof(isc__taskqueue_t));
|
||||
manager->common.impmagic = 0;
|
||||
manager->common.magic = 0;
|
||||
isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
|
||||
@@ -1394,7 +1419,8 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
isc_mutex_init(&manager->halt_lock);
|
||||
isc_condition_init(&manager->halt_cond);
|
||||
|
||||
manager->workers = workers;
|
||||
manager->bound_workers = workers;
|
||||
manager->loose_workers = 2; /* XXXWPK choosen by a random dice roll */
|
||||
|
||||
if (default_quantum == 0) {
|
||||
default_quantum = DEFAULT_DEFAULT_QUANTUM;
|
||||
@@ -1407,7 +1433,9 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
|
||||
INIT_LIST(manager->tasks);
|
||||
atomic_store(&manager->tasks_count, 0);
|
||||
manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
|
||||
manager->queues = isc_mem_get(
|
||||
mctx, (manager->bound_workers + manager->loose_workers) *
|
||||
sizeof(isc__taskqueue_t));
|
||||
RUNTIME_CHECK(manager->queues != NULL);
|
||||
|
||||
atomic_init(&manager->tasks_running, 0);
|
||||
@@ -1423,7 +1451,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
/*
|
||||
* Start workers.
|
||||
*/
|
||||
for (i = 0; i < workers; i++) {
|
||||
for (i = 0; i < manager->bound_workers + manager->loose_workers; i++) {
|
||||
INIT_LIST(manager->queues[i].ready_tasks);
|
||||
INIT_LIST(manager->queues[i].ready_priority_tasks);
|
||||
isc_mutex_init(&manager->queues[i].lock);
|
||||
@@ -1431,15 +1459,21 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
|
||||
manager->queues[i].manager = manager;
|
||||
manager->queues[i].threadid = i;
|
||||
manager->queues[i].bound = (i < manager->bound_workers);
|
||||
isc_thread_create(run, &manager->queues[i],
|
||||
&manager->queues[i].thread);
|
||||
char name[21];
|
||||
snprintf(name, sizeof(name), "isc-worker%04u", i);
|
||||
if (manager->queues[i].bound) {
|
||||
snprintf(name, sizeof(name), "isc-b-wrkr-%03u", i);
|
||||
} else {
|
||||
snprintf(name, sizeof(name), "isc-l-wrkr-%03u",
|
||||
i - manager->bound_workers);
|
||||
}
|
||||
isc_thread_setname(manager->queues[i].thread, name);
|
||||
}
|
||||
UNLOCK(&manager->lock);
|
||||
|
||||
isc_thread_setconcurrency(workers);
|
||||
isc_thread_setconcurrency(3 * workers);
|
||||
|
||||
*managerp = (isc_taskmgr_t *)manager;
|
||||
|
||||
@@ -1530,7 +1564,8 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
|
||||
/*
|
||||
* Wait for all the worker threads to exit.
|
||||
*/
|
||||
for (i = 0; i < manager->workers; i++) {
|
||||
for (i = 0; i < (manager->bound_workers + manager->loose_workers); i++)
|
||||
{
|
||||
isc_thread_join(manager->queues[i].thread, NULL);
|
||||
}
|
||||
|
||||
@@ -1574,7 +1609,8 @@ isc__taskmgr_pause(isc_taskmgr_t *manager0) {
|
||||
}
|
||||
|
||||
atomic_store_relaxed(&manager->pause_req, true);
|
||||
while (manager->halted < manager->workers) {
|
||||
while (manager->halted <
|
||||
(manager->bound_workers + manager->loose_workers)) {
|
||||
wake_all_queues(manager);
|
||||
WAIT(&manager->halt_cond, &manager->halt_lock);
|
||||
}
|
||||
@@ -1656,7 +1692,8 @@ isc_task_beginexclusive(isc_task_t *task0) {
|
||||
INSIST(!atomic_load_relaxed(&manager->exclusive_req) &&
|
||||
!atomic_load_relaxed(&manager->pause_req));
|
||||
atomic_store_relaxed(&manager->exclusive_req, true);
|
||||
while (manager->halted + 1 < manager->workers) {
|
||||
while (manager->halted + 1 <
|
||||
(manager->bound_workers + manager->loose_workers)) {
|
||||
wake_all_queues(manager);
|
||||
WAIT(&manager->halt_cond, &manager->halt_lock);
|
||||
}
|
||||
@@ -1823,7 +1860,7 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr0, void *writer0) {
|
||||
TRY0(xmlTextWriterEndElement(writer)); /* type */
|
||||
|
||||
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "worker-threads"));
|
||||
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->workers));
|
||||
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->bound_workers));
|
||||
TRY0(xmlTextWriterEndElement(writer)); /* worker-threads */
|
||||
|
||||
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "default-quantum"));
|
||||
@@ -1932,7 +1969,7 @@ isc_taskmgr_renderjson(isc_taskmgr_t *mgr0, void *tasks0) {
|
||||
CHECKMEM(obj);
|
||||
json_object_object_add(tasks, "thread-model", obj);
|
||||
|
||||
obj = json_object_new_int(mgr->workers);
|
||||
obj = json_object_new_int(mgr->bound_workers);
|
||||
CHECKMEM(obj);
|
||||
json_object_object_add(tasks, "worker-threads", obj);
|
||||
|
||||
|
||||
@@ -2698,10 +2698,9 @@ send_recvdone_event(isc__socket_t *sock, isc_socketevent_t **dev) {
|
||||
}
|
||||
|
||||
if (((*dev)->attributes & ISC_SOCKEVENTATTR_ATTACHED) != 0) {
|
||||
isc_task_sendtoanddetach(&task, (isc_event_t **)dev,
|
||||
sock->threadid);
|
||||
isc_task_sendanddetach(&task, (isc_event_t **)dev);
|
||||
} else {
|
||||
isc_task_sendto(task, (isc_event_t **)dev, sock->threadid);
|
||||
isc_task_send(task, (isc_event_t **)dev);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2724,10 +2723,9 @@ send_senddone_event(isc__socket_t *sock, isc_socketevent_t **dev) {
|
||||
}
|
||||
|
||||
if (((*dev)->attributes & ISC_SOCKEVENTATTR_ATTACHED) != 0) {
|
||||
isc_task_sendtoanddetach(&task, (isc_event_t **)dev,
|
||||
sock->threadid);
|
||||
isc_task_sendanddetach(&task, (isc_event_t **)dev);
|
||||
} else {
|
||||
isc_task_sendto(task, (isc_event_t **)dev, sock->threadid);
|
||||
isc_task_send(task, (isc_event_t **)dev);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2749,7 +2747,7 @@ send_connectdone_event(isc__socket_t *sock, isc_socket_connev_t **dev) {
|
||||
ISC_LIST_DEQUEUE(sock->connect_list, *dev, ev_link);
|
||||
}
|
||||
|
||||
isc_task_sendtoanddetach(&task, (isc_event_t **)dev, sock->threadid);
|
||||
isc_task_sendanddetach(&task, (isc_event_t **)dev);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -3012,7 +3010,7 @@ internal_accept(isc__socket_t *sock) {
|
||||
task = dev->ev_sender;
|
||||
dev->ev_sender = sock;
|
||||
|
||||
isc_task_sendtoanddetach(&task, ISC_EVENT_PTR(&dev), sock->threadid);
|
||||
isc_task_sendanddetach(&task, ISC_EVENT_PTR(&dev));
|
||||
return;
|
||||
|
||||
soft_error:
|
||||
@@ -4729,7 +4727,7 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr,
|
||||
if (sock->connected) {
|
||||
INSIST(isc_sockaddr_equal(&sock->peer_address, addr));
|
||||
dev->result = ISC_R_SUCCESS;
|
||||
isc_task_sendto(task, ISC_EVENT_PTR(&dev), sock->threadid);
|
||||
isc_task_send(task, ISC_EVENT_PTR(&dev));
|
||||
|
||||
UNLOCK(&sock->lock);
|
||||
|
||||
@@ -4798,7 +4796,7 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr,
|
||||
|
||||
err_exit:
|
||||
sock->connected = 0;
|
||||
isc_task_sendto(task, ISC_EVENT_PTR(&dev), sock->threadid);
|
||||
isc_task_send(task, ISC_EVENT_PTR(&dev));
|
||||
|
||||
UNLOCK(&sock->lock);
|
||||
inc_stats(sock->manager->stats,
|
||||
@@ -4814,7 +4812,7 @@ success:
|
||||
sock->connected = 1;
|
||||
sock->bound = 1;
|
||||
dev->result = ISC_R_SUCCESS;
|
||||
isc_task_sendto(task, ISC_EVENT_PTR(&dev), sock->threadid);
|
||||
isc_task_send(task, ISC_EVENT_PTR(&dev));
|
||||
|
||||
UNLOCK(&sock->lock);
|
||||
|
||||
@@ -5102,9 +5100,8 @@ isc_socket_cancel(isc_socket_t *sock0, isc_task_t *task, unsigned int how) {
|
||||
|
||||
dev->result = ISC_R_CANCELED;
|
||||
dev->ev_sender = sock;
|
||||
isc_task_sendtoanddetach(¤t_task,
|
||||
ISC_EVENT_PTR(&dev),
|
||||
sock->threadid);
|
||||
isc_task_sendanddetach(¤t_task,
|
||||
ISC_EVENT_PTR(&dev));
|
||||
}
|
||||
|
||||
dev = next;
|
||||
|
||||
Reference in New Issue
Block a user