Compare commits

...

1 Commits

Author SHA1 Message Date
Witold Kręcicki
04d46e0693 test tn patches 2020-06-19 16:20:08 +02:00
7 changed files with 172 additions and 98 deletions

View File

@@ -1486,7 +1486,7 @@ dns_rpz_new_zones(dns_rpz_zones_t **rpzsp, char *rps_cstr, size_t rps_cstr_size,
goto cleanup_rbt;
}
result = isc_task_create(taskmgr, 0, &zones->updater);
result = isc_task_create(taskmgr, 5, &zones->updater);
if (result != ISC_R_SUCCESS) {
goto cleanup_task;
}

View File

@@ -136,7 +136,9 @@ typedef enum isc__netievent_type {
netievent_tcpaccept,
netievent_tcpstop,
netievent_tcpclose,
netievent_tcpdnsclose,
netievent_tcpdnssend,
netievent_closecb,
netievent_shutdown,
@@ -227,6 +229,7 @@ typedef struct isc__netievent__socket_req {
typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t;
typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t;
typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t;
typedef isc__netievent__socket_req_t isc__netievent_tcpdnssend_t;
typedef struct isc__netievent__socket_streaminfo_quota {
isc__netievent_type type;
@@ -746,6 +749,9 @@ isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock);
void
isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0);
#define isc__nm_uverr2result(x) \
isc___nm_uverr2result(x, true, __FILE__, __LINE__)
isc_result_t

View File

@@ -621,6 +621,9 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
case netievent_tcpsend:
isc__nm_async_tcpsend(worker, ievent);
break;
case netievent_tcpdnssend:
isc__nm_async_tcpdnssend(worker, ievent);
break;
case netievent_tcpstop:
isc__nm_async_tcpstop(worker, ievent);
break;
@@ -827,10 +830,13 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) {
if (active_handles == 0 || sock->tcphandle != NULL) {
destroy = true;
}
UNLOCK(&sock->lock);
if (destroy) {
atomic_store(&sock->destroying, true);
UNLOCK(&sock->lock);
nmsocket_cleanup(sock, true);
} else {
UNLOCK(&sock->lock);
}
}

View File

@@ -365,15 +365,6 @@ isc_nm_tcpdns_keepalive(isc_nmhandle_t *handle) {
atomic_store(&handle->sock->outer->keepalive, true);
}
typedef struct tcpsend {
isc_mem_t *mctx;
isc_nmhandle_t *handle;
isc_region_t region;
isc_nmhandle_t *orighandle;
isc_nm_cb_t cb;
void *cbarg;
} tcpsend_t;
static void
resume_processing(void *arg) {
isc_nmsocket_t *sock = (isc_nmsocket_t *)arg;
@@ -445,15 +436,40 @@ resume_processing(void *arg) {
static void
tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
tcpsend_t *ts = (tcpsend_t *)cbarg;
isc__nm_uvreq_t *req = (isc__nm_uvreq_t *)cbarg;
UNUSED(handle);
ts->cb(ts->orighandle, result, ts->cbarg);
isc_mem_put(ts->mctx, ts->region.base, ts->region.length);
req->cb.send(req->handle, result, req->cbarg);
isc_mem_put(req->sock->mgr->mctx, req->uvbuf.base, req->uvbuf.len);
isc__nm_uvreq_put(&req, req->handle->sock);
}
isc_nmhandle_unref(ts->orighandle);
isc_mem_putanddetach(&ts->mctx, ts, sizeof(*ts));
void
isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) {
isc_result_t result;
isc__netievent_tcpdnssend_t *ievent =
(isc__netievent_tcpdnssend_t *)ev0;
isc__nm_uvreq_t *req = ievent->req;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(worker->id == sock->tid);
result = ISC_R_NOTCONNECTED;
if (atomic_load(&sock->active)) {
isc_region_t r;
r.base = (unsigned char *)req->uvbuf.base;
r.length = req->uvbuf.len;
result = isc__nm_tcp_send(sock->outer->tcphandle, &r,
tcpdnssend_cb, req);
}
if (result != ISC_R_SUCCESS) {
req->cb.send(req->handle, result, req->cbarg);
isc_mem_put(sock->mgr->mctx, req->uvbuf.base, req->uvbuf.len);
isc__nm_uvreq_put(&req, sock);
}
}
/*
@@ -462,7 +478,7 @@ tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_result_t
isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
isc_nm_cb_t cb, void *cbarg) {
tcpsend_t *t = NULL;
isc__nm_uvreq_t *uvreq = NULL;
REQUIRE(VALID_NMHANDLE(handle));
@@ -471,36 +487,47 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpdnssocket);
if (sock->outer == NULL) {
/* The socket is closed */
return (ISC_R_NOTCONNECTED);
uvreq = isc__nm_uvreq_get(sock->mgr, sock);
uvreq->handle = handle;
isc_nmhandle_ref(uvreq->handle);
uvreq->cb.send = cb;
uvreq->cbarg = cbarg;
uvreq->uvbuf.base = isc_mem_get(sock->mgr->mctx, region->length + 2);
uvreq->uvbuf.len = region->length + 2;
*(uint16_t *)uvreq->uvbuf.base = htons(region->length);
memmove(uvreq->uvbuf.base + 2, region->base, region->length);
if (sock->tid == isc_nm_tid()) {
isc_region_t r;
r.base = (unsigned char *)uvreq->uvbuf.base;
r.length = uvreq->uvbuf.len;
return (isc__nm_tcp_send(sock->outer->tcphandle, &r,
tcpdnssend_cb, uvreq));
} else {
isc__netievent_tcpdnssend_t *ievent = NULL;
ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpdnssend);
ievent->req = uvreq;
ievent->sock = sock;
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
return (ISC_R_SUCCESS);
}
t = isc_mem_get(sock->mgr->mctx, sizeof(*t));
*t = (tcpsend_t){
.cb = cb,
.cbarg = cbarg,
.handle = handle->sock->outer->tcphandle,
};
isc_mem_attach(sock->mgr->mctx, &t->mctx);
t->orighandle = handle;
isc_nmhandle_ref(t->orighandle);
t->region = (isc_region_t){ .base = isc_mem_get(t->mctx,
region->length + 2),
.length = region->length + 2 };
*(uint16_t *)t->region.base = htons(region->length);
memmove(t->region.base + 2, region->base, region->length);
return (isc_nm_send(t->handle, &t->region, tcpdnssend_cb, t));
return (ISC_R_UNEXPECTED);
}
static void
tcpdns_close_direct(isc_nmsocket_t *sock) {
REQUIRE(sock->tid == isc_nm_tid());
/* We don't need atomics here, it's all in single network thread */
if (sock->timer_initialized) {
/*
* We need to fire the timer callback to clean it up,

View File

@@ -209,19 +209,6 @@ static void
stoplistening(isc_nmsocket_t *sock) {
REQUIRE(sock->type == isc_nm_udplistener);
/*
* Socket is already closing; there's nothing to do.
*/
if (!isc__nmsocket_active(sock)) {
return;
}
/*
* Mark it inactive now so that all sends will be ignored
* and we won't try to stop listening again.
*/
atomic_store(&sock->active, false);
for (int i = 0; i < sock->nchildren; i++) {
isc__netievent_udpstop_t *event = NULL;
@@ -255,6 +242,18 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_udplistener);
/*
* Socket is already closing; there's nothing to do.
*/
if (!isc__nmsocket_active(sock)) {
return;
}
/*
* Mark it inactive now so that all sends will be ignored
* and we won't try to stop listening again.
*/
atomic_store(&sock->active, false);
/*
* If the manager is interlocked, re-enqueue this as an asynchronous
* event. Otherwise, go ahead and stop listening right away.
@@ -330,24 +329,23 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
#endif
/*
* If addr == NULL that's the end of stream - we can
* free the buffer and bail.
* Three reasons to return now without processing
* - If addr == NULL that's the end of stream - we can
* free the buffer and bail.
* - Simulate a firewall blocking UDP packets bigger than
* 'maxudp' bytes.
* - Socket is no longer active.
*/
if (addr == NULL) {
maxudp = atomic_load(&sock->mgr->maxudp);
if ((addr == NULL) ||
(maxudp != 0 && (uint32_t)nrecv > maxudp) ||
(!isc__nmsocket_active(sock))) {
if (free_buf) {
isc__nm_free_uvbuf(sock, buf);
}
return;
}
/*
* Simulate a firewall blocking UDP packets bigger than
* 'maxudp' bytes.
*/
maxudp = atomic_load(&sock->mgr->maxudp);
if (maxudp != 0 && (uint32_t)nrecv > maxudp) {
return;
}
result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
@@ -509,6 +507,9 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
REQUIRE(sock->tid == isc_nm_tid());
REQUIRE(sock->type == isc_nm_udpsocket);
if (!isc__nmsocket_active(sock)) {
return (ISC_R_CANCELED);
}
isc_nmhandle_ref(req->handle);
rv = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp,
&req->uvbuf, 1, &peer->type.sa, udp_send_cb);

View File

@@ -17,6 +17,7 @@
*/
#include <stdbool.h>
#include <unistd.h>
#include <isc/app.h>
#include <isc/atomic.h>
@@ -152,6 +153,7 @@ struct isc__taskqueue {
isc_thread_t thread;
unsigned int threadid;
isc__taskmgr_t *manager;
bool bound;
};
struct isc__taskmgr {
@@ -161,7 +163,8 @@ struct isc__taskmgr {
isc_mutex_t lock;
isc_mutex_t halt_lock;
isc_condition_t halt_cond;
unsigned int workers;
unsigned int bound_workers;
unsigned int loose_workers;
atomic_uint_fast32_t tasks_running;
atomic_uint_fast32_t tasks_ready;
atomic_uint_fast32_t curq;
@@ -229,7 +232,9 @@ wake_all_queues(isc__taskmgr_t *manager);
static inline void
wake_all_queues(isc__taskmgr_t *manager) {
for (unsigned int i = 0; i < manager->workers; i++) {
for (unsigned int i = 0;
i < manager->bound_workers + manager->loose_workers; i++)
{
LOCK(&manager->queues[i].lock);
BROADCAST(&manager->queues[i].work_available);
UNLOCK(&manager->queues[i].lock);
@@ -301,7 +306,7 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
* by a specific thread.
*/
task->bound = true;
task->threadid = threadid % manager->workers;
task->threadid = threadid % manager->bound_workers;
}
isc_mutex_init(&task->lock);
@@ -543,11 +548,13 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
/* If task is bound ignore provided cpu. */
if (task->bound) {
c = task->threadid;
c %= task->manager->bound_workers;
} else if (c < 0) {
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
memory_order_relaxed);
c %= task->manager->loose_workers;
c += task->manager->bound_workers;
}
c %= task->manager->workers;
was_idle = task_send(task, eventp, c);
UNLOCK(&task->lock);
@@ -589,11 +596,13 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
LOCK(&task->lock);
if (task->bound) {
c = task->threadid;
c %= task->manager->bound_workers;
} else if (c < 0) {
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
memory_order_relaxed);
c %= task->manager->loose_workers;
c += task->manager->bound_workers;
}
c %= task->manager->workers;
idle1 = task_send(task, eventp, c);
idle2 = task_detach(task);
UNLOCK(&task->lock);
@@ -972,6 +981,7 @@ push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
static void
dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
isc__task_t *task;
isc__task_t *last_task = NULL;
REQUIRE(VALID_MANAGER(manager));
@@ -1117,7 +1127,14 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
memory_order_release) > 0);
atomic_fetch_add_explicit(&manager->tasks_running, 1,
memory_order_acquire);
if (!manager->queues[threadid].bound && task == last_task) {
/*
* XXXWPK don't let an unbound worker run one
* task continuously. This is just an experiment...
*/
usleep(100);
}
last_task = task;
LOCK(&task->lock);
/*
* It is possible because that we have a paused task
@@ -1305,8 +1322,11 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
{
bool empty = true;
unsigned int i;
for (i = 0; i < manager->workers && empty; i++)
{
for (i = 0;
i < manager->bound_workers +
manager->loose_workers &&
empty;
i++) {
LOCK(&manager->queues[i].lock);
empty &= empty_readyq(manager, i);
UNLOCK(&manager->queues[i].lock);
@@ -1337,7 +1357,9 @@ static isc_threadresult_t
isc__taskqueue_t *tq = queuep;
isc__taskmgr_t *manager = tq->manager;
int threadid = tq->threadid;
isc_thread_setaffinity(threadid);
if (tq->bound) {
isc_thread_setaffinity(threadid);
}
XTHREADTRACE("starting");
@@ -1354,7 +1376,9 @@ static isc_threadresult_t
static void
manager_free(isc__taskmgr_t *manager) {
for (unsigned int i = 0; i < manager->workers; i++) {
for (unsigned int i = 0;
i < manager->bound_workers + manager->loose_workers; i++)
{
isc_mutex_destroy(&manager->queues[i].lock);
isc_condition_destroy(&manager->queues[i].work_available);
}
@@ -1363,7 +1387,8 @@ manager_free(isc__taskmgr_t *manager) {
isc_mutex_destroy(&manager->halt_lock);
isc_condition_destroy(&manager->halt_cond);
isc_mem_put(manager->mctx, manager->queues,
manager->workers * sizeof(isc__taskqueue_t));
(manager->bound_workers + manager->loose_workers) *
sizeof(isc__taskqueue_t));
manager->common.impmagic = 0;
manager->common.magic = 0;
isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
@@ -1394,7 +1419,8 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
isc_mutex_init(&manager->halt_lock);
isc_condition_init(&manager->halt_cond);
manager->workers = workers;
manager->bound_workers = workers;
manager->loose_workers = 2; /* XXXWPK chosen by a random dice roll */
if (default_quantum == 0) {
default_quantum = DEFAULT_DEFAULT_QUANTUM;
@@ -1407,7 +1433,9 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
INIT_LIST(manager->tasks);
atomic_store(&manager->tasks_count, 0);
manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
manager->queues = isc_mem_get(
mctx, (manager->bound_workers + manager->loose_workers) *
sizeof(isc__taskqueue_t));
RUNTIME_CHECK(manager->queues != NULL);
atomic_init(&manager->tasks_running, 0);
@@ -1423,7 +1451,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
/*
* Start workers.
*/
for (i = 0; i < workers; i++) {
for (i = 0; i < manager->bound_workers + manager->loose_workers; i++) {
INIT_LIST(manager->queues[i].ready_tasks);
INIT_LIST(manager->queues[i].ready_priority_tasks);
isc_mutex_init(&manager->queues[i].lock);
@@ -1431,15 +1459,21 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
manager->queues[i].manager = manager;
manager->queues[i].threadid = i;
manager->queues[i].bound = (i < manager->bound_workers);
isc_thread_create(run, &manager->queues[i],
&manager->queues[i].thread);
char name[21];
snprintf(name, sizeof(name), "isc-worker%04u", i);
if (manager->queues[i].bound) {
snprintf(name, sizeof(name), "isc-b-wrkr-%03u", i);
} else {
snprintf(name, sizeof(name), "isc-l-wrkr-%03u",
i - manager->bound_workers);
}
isc_thread_setname(manager->queues[i].thread, name);
}
UNLOCK(&manager->lock);
isc_thread_setconcurrency(workers);
isc_thread_setconcurrency(3 * workers);
*managerp = (isc_taskmgr_t *)manager;
@@ -1530,7 +1564,8 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
/*
* Wait for all the worker threads to exit.
*/
for (i = 0; i < manager->workers; i++) {
for (i = 0; i < (manager->bound_workers + manager->loose_workers); i++)
{
isc_thread_join(manager->queues[i].thread, NULL);
}
@@ -1574,7 +1609,8 @@ isc__taskmgr_pause(isc_taskmgr_t *manager0) {
}
atomic_store_relaxed(&manager->pause_req, true);
while (manager->halted < manager->workers) {
while (manager->halted <
(manager->bound_workers + manager->loose_workers)) {
wake_all_queues(manager);
WAIT(&manager->halt_cond, &manager->halt_lock);
}
@@ -1656,7 +1692,8 @@ isc_task_beginexclusive(isc_task_t *task0) {
INSIST(!atomic_load_relaxed(&manager->exclusive_req) &&
!atomic_load_relaxed(&manager->pause_req));
atomic_store_relaxed(&manager->exclusive_req, true);
while (manager->halted + 1 < manager->workers) {
while (manager->halted + 1 <
(manager->bound_workers + manager->loose_workers)) {
wake_all_queues(manager);
WAIT(&manager->halt_cond, &manager->halt_lock);
}
@@ -1823,7 +1860,7 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr0, void *writer0) {
TRY0(xmlTextWriterEndElement(writer)); /* type */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "worker-threads"));
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->workers));
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->bound_workers));
TRY0(xmlTextWriterEndElement(writer)); /* worker-threads */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "default-quantum"));
@@ -1932,7 +1969,7 @@ isc_taskmgr_renderjson(isc_taskmgr_t *mgr0, void *tasks0) {
CHECKMEM(obj);
json_object_object_add(tasks, "thread-model", obj);
obj = json_object_new_int(mgr->workers);
obj = json_object_new_int(mgr->bound_workers);
CHECKMEM(obj);
json_object_object_add(tasks, "worker-threads", obj);

View File

@@ -2698,10 +2698,9 @@ send_recvdone_event(isc__socket_t *sock, isc_socketevent_t **dev) {
}
if (((*dev)->attributes & ISC_SOCKEVENTATTR_ATTACHED) != 0) {
isc_task_sendtoanddetach(&task, (isc_event_t **)dev,
sock->threadid);
isc_task_sendanddetach(&task, (isc_event_t **)dev);
} else {
isc_task_sendto(task, (isc_event_t **)dev, sock->threadid);
isc_task_send(task, (isc_event_t **)dev);
}
}
@@ -2724,10 +2723,9 @@ send_senddone_event(isc__socket_t *sock, isc_socketevent_t **dev) {
}
if (((*dev)->attributes & ISC_SOCKEVENTATTR_ATTACHED) != 0) {
isc_task_sendtoanddetach(&task, (isc_event_t **)dev,
sock->threadid);
isc_task_sendanddetach(&task, (isc_event_t **)dev);
} else {
isc_task_sendto(task, (isc_event_t **)dev, sock->threadid);
isc_task_send(task, (isc_event_t **)dev);
}
}
@@ -2749,7 +2747,7 @@ send_connectdone_event(isc__socket_t *sock, isc_socket_connev_t **dev) {
ISC_LIST_DEQUEUE(sock->connect_list, *dev, ev_link);
}
isc_task_sendtoanddetach(&task, (isc_event_t **)dev, sock->threadid);
isc_task_sendanddetach(&task, (isc_event_t **)dev);
}
/*
@@ -3012,7 +3010,7 @@ internal_accept(isc__socket_t *sock) {
task = dev->ev_sender;
dev->ev_sender = sock;
isc_task_sendtoanddetach(&task, ISC_EVENT_PTR(&dev), sock->threadid);
isc_task_sendanddetach(&task, ISC_EVENT_PTR(&dev));
return;
soft_error:
@@ -4729,7 +4727,7 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr,
if (sock->connected) {
INSIST(isc_sockaddr_equal(&sock->peer_address, addr));
dev->result = ISC_R_SUCCESS;
isc_task_sendto(task, ISC_EVENT_PTR(&dev), sock->threadid);
isc_task_send(task, ISC_EVENT_PTR(&dev));
UNLOCK(&sock->lock);
@@ -4798,7 +4796,7 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr,
err_exit:
sock->connected = 0;
isc_task_sendto(task, ISC_EVENT_PTR(&dev), sock->threadid);
isc_task_send(task, ISC_EVENT_PTR(&dev));
UNLOCK(&sock->lock);
inc_stats(sock->manager->stats,
@@ -4814,7 +4812,7 @@ success:
sock->connected = 1;
sock->bound = 1;
dev->result = ISC_R_SUCCESS;
isc_task_sendto(task, ISC_EVENT_PTR(&dev), sock->threadid);
isc_task_send(task, ISC_EVENT_PTR(&dev));
UNLOCK(&sock->lock);
@@ -5102,9 +5100,8 @@ isc_socket_cancel(isc_socket_t *sock0, isc_task_t *task, unsigned int how) {
dev->result = ISC_R_CANCELED;
dev->ev_sender = sock;
isc_task_sendtoanddetach(&current_task,
ISC_EVENT_PTR(&dev),
sock->threadid);
isc_task_sendanddetach(&current_task,
ISC_EVENT_PTR(&dev));
}
dev = next;