diff --git a/lib/isc/async.c b/lib/isc/async.c index 326d3a0803..db4388964e 100644 --- a/lib/isc/async.c +++ b/lib/isc/async.c @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -45,42 +44,72 @@ isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job)); *job = (isc_job_t){ - .link = ISC_LINK_INITIALIZER, .cb = cb, .cbarg = cbarg, }; - /* - * Now send the half-initialized job to the loop queue. - */ - ISC_ASTACK_PUSH(loop->async_jobs, job, link); + cds_wfcq_node_init(&job->wfcq_node); - int r = uv_async_send(&loop->async_trigger); - UV_RUNTIME_CHECK(uv_async_send, r); + /* + * cds_wfcq_enqueue() is non-blocking and adds the job to the + * loop's async queue. + * + * It returns 'false' when the queue was empty before this enqueue; + * only in that case do we trigger the async callback via uv_async_send(). + */ + __tsan_release(job); + if (!cds_wfcq_enqueue(&loop->async_jobs.head, &loop->async_jobs.tail, + &job->wfcq_node)) + { + int r = uv_async_send(&loop->async_trigger); + UV_RUNTIME_CHECK(uv_async_send, r); + } } void isc__async_cb(uv_async_t *handle) { isc_loop_t *loop = uv_handle_get_data(handle); + isc_jobqueue_t jobs; REQUIRE(VALID_LOOP(loop)); - ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->async_jobs); - ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER; + /* Initialize the local wfcqueue that receives the spliced jobs */ + __cds_wfcq_init(&jobs.head, &jobs.tail); - isc_job_t *job = ISC_STACK_POP(drain, link); - isc_job_t *next = NULL; - while (job != NULL) { - ISC_LIST_PREPEND(jobs, job, link); - - job = ISC_STACK_POP(drain, link); + /* + * Move all the elements from loop->async_jobs to a local jobs queue. + * + * __cds_wfcq_splice_blocking() assumes that synchronization is + * done externally - there's no internal locking, unlike + * cds_wfcq_splice_blocking(), and we do not need to check whether + * it needs to block, unlike __cds_wfcq_splice_nonblocking(). 
+ * + * The reason we can use __cds_wfcq_splice_blocking() is that the + * only other function we use is cds_wfcq_enqueue() which doesn't + * require any synchronization (see the table in urcu/wfcqueue.h + * for more details). + */ + enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( + &jobs.head, &jobs.tail, &loop->async_jobs.head, + &loop->async_jobs.tail); + INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK); + if (ret == CDS_WFCQ_RET_SRC_EMPTY) { + /* + * Nothing to do, the source queue was empty - most + * probably we were called from isc__async_close() below. + */ + return; } - for (job = ISC_LIST_HEAD(jobs), - next = (job ? ISC_LIST_NEXT(job, link) : NULL); - job != NULL; - job = next, next = (job ? ISC_LIST_NEXT(job, link) : NULL)) - { + /* + * Walk the local queue, which now holds every job spliced out of + * loop->async_jobs; run each callback and then free its isc_job_t. + */ + struct cds_wfcq_node *node, *next; + __cds_wfcq_for_each_blocking_safe(&jobs.head, &jobs.tail, node, next) { + isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node); + __tsan_acquire(job); + job->cb(job->cbarg); isc_mem_put(loop->mctx, job, sizeof(*job)); diff --git a/lib/isc/async_p.h b/lib/isc/async_p.h index 0d854cd0ae..1ce94b99e1 100644 --- a/lib/isc/async_p.h +++ b/lib/isc/async_p.h @@ -17,11 +17,8 @@ #include #include #include -#include #include -typedef ISC_ASTACK(isc_job_t) isc_asyncstack_t; - void isc__async_cb(uv_async_t *handle); diff --git a/lib/isc/include/isc/job.h b/lib/isc/include/isc/job.h index 8eeb60d0f4..9a44974c25 100644 --- a/lib/isc/include/isc/job.h +++ b/lib/isc/include/isc/job.h @@ -28,6 +28,7 @@ #include #include #include +#include typedef void (*isc_job_cb)(void *); typedef struct isc_job isc_job_t; @@ -35,12 +36,15 @@ typedef struct isc_job isc_job_t; struct isc_job { isc_job_cb cb; void *cbarg; - ISC_LINK(isc_job_t) link; + union { + struct cds_wfcq_node wfcq_node; + ISC_LINK(isc_job_t) link; + }; }; -#define ISC_JOB_INITIALIZER \ - { \ - .link = 
ISC_LINK_INITIALIZER \ +#define ISC_JOB_INITIALIZER \ + { \ + .link = ISC_LINK_INITIALIZER, \ } ISC_LANG_BEGINDECLS diff --git a/lib/isc/include/isc/loop.h b/lib/isc/include/isc/loop.h index 05440375a0..93330e60d5 100644 --- a/lib/isc/include/isc/loop.h +++ b/lib/isc/include/isc/loop.h @@ -122,11 +122,6 @@ isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg); * yet been started. */ -void -isc_loop_nosetup(isc_loop_t *loop, isc_job_t *job); -void -isc_loop_noteardown(isc_loop_t *loop, isc_job_t *job); - void isc_loopmgr_setup(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg); void diff --git a/lib/isc/job.c b/lib/isc/job.c index 9f64df73e8..963248df15 100644 --- a/lib/isc/job.c +++ b/lib/isc/job.c @@ -48,6 +48,7 @@ isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg) { job->cb = cb; job->cbarg = cbarg; + ISC_LINK_INIT(job, link); ISC_LIST_APPEND(loop->run_jobs, job, link); } diff --git a/lib/isc/job_p.h b/lib/isc/job_p.h index 89fa078077..06c7bc319e 100644 --- a/lib/isc/job_p.h +++ b/lib/isc/job_p.h @@ -13,10 +13,22 @@ #pragma once +#include #include #include +#include #include +/*% + * NOTE: We use struct __cds_wfcq_head, which has no internal mutex, because + * we only use enqueue and splice, and those don't need any synchronization + * (see urcu/wfcqueue.h for a detailed description). The head and tail members are each aligned to ISC_OS_CACHELINE_SIZE. 
+ */ +typedef struct isc_jobqueue { + alignas(ISC_OS_CACHELINE_SIZE) struct __cds_wfcq_head head; + alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_tail tail; +} isc_jobqueue_t; + typedef ISC_LIST(isc_job_t) isc_joblist_t; void diff --git a/lib/isc/loop.c b/lib/isc/loop.c index 77f3dcc52e..1cef7d35ad 100644 --- a/lib/isc/loop.c +++ b/lib/isc/loop.c @@ -159,7 +159,6 @@ destroy_cb(uv_async_t *handle) { static void shutdown_cb(uv_async_t *handle) { - isc_job_t *job = NULL; isc_loop_t *loop = uv_handle_get_data(handle); isc_loopmgr_t *loopmgr = loop->loopmgr; @@ -178,17 +177,12 @@ shutdown_cb(uv_async_t *handle) { isc_signal_destroy(&loopmgr->sigint); } - job = ISC_LIST_TAIL(loop->teardown_jobs); - while (job != NULL) { - isc_job_t *prev = ISC_LIST_PREV(job, link); - ISC_LIST_UNLINK(loop->teardown_jobs, job, link); - - job->cb(job->cbarg); - - isc_mem_put(loop->mctx, job, sizeof(*job)); - - job = prev; - } + enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( + &loop->async_jobs.head, &loop->async_jobs.tail, + &loop->teardown_jobs.head, &loop->teardown_jobs.tail); + INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK); + int r = uv_async_send(&loop->async_trigger); + UV_RUNTIME_CHECK(uv_async_send, r); } static void @@ -202,12 +196,13 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) { *loop = (isc_loop_t){ .tid = tid, .loopmgr = loopmgr, - .async_jobs = ISC_ASTACK_INITIALIZER, .run_jobs = ISC_LIST_INITIALIZER, - .setup_jobs = ISC_LIST_INITIALIZER, - .teardown_jobs = ISC_LIST_INITIALIZER, }; + __cds_wfcq_init(&loop->async_jobs.head, &loop->async_jobs.tail); + __cds_wfcq_init(&loop->setup_jobs.head, &loop->setup_jobs.tail); + __cds_wfcq_init(&loop->teardown_jobs.head, &loop->teardown_jobs.tail); + int r = uv_loop_init(&loop->loop); UV_RUNTIME_CHECK(uv_loop_init, r); @@ -248,23 +243,6 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) { loop->magic = LOOP_MAGIC; } -static void -setup_jobs_cb(void *arg) { - isc_loop_t *loop = arg; - isc_job_t *job = 
ISC_LIST_HEAD(loop->setup_jobs); - - while (job != NULL) { - isc_job_t *next = ISC_LIST_NEXT(job, link); - ISC_LIST_UNLINK(loop->setup_jobs, job, link); - - job->cb(job->cbarg); - - isc_mem_put(loop->mctx, job, sizeof(*job)); - - job = next; - } -} - static void quiescent_cb(uv_prepare_t *handle) { isc__qsbr_quiescent_cb(handle); @@ -285,7 +263,7 @@ loop_close(isc_loop_t *loop) { int r = uv_loop_close(&loop->loop); UV_RUNTIME_CHECK(uv_loop_close, r); - INSIST(ISC_ASTACK_EMPTY(loop->async_jobs)); + INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail)); INSIST(ISC_LIST_EMPTY(loop->run_jobs)); loop->magic = 0; @@ -306,7 +284,13 @@ loop_thread(void *arg) { isc_barrier_wait(&loop->loopmgr->starting); - isc_async_run(loop, setup_jobs_cb, loop); + enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( + &loop->async_jobs.head, &loop->async_jobs.tail, + &loop->setup_jobs.head, &loop->setup_jobs.tail); + INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK); + + r = uv_async_send(&loop->async_trigger); + UV_RUNTIME_CHECK(uv_async_send, r); r = uv_run(&loop->loop, UV_RUN_DEFAULT); UV_RUNTIME_CHECK(uv_run, r); @@ -316,16 +300,6 @@ loop_thread(void *arg) { return (NULL); } -void -isc_loop_nosetup(isc_loop_t *loop, isc_job_t *job) { - ISC_LIST_DEQUEUE(loop->setup_jobs, job, link); -} - -void -isc_loop_noteardown(isc_loop_t *loop, isc_job_t *job) { - ISC_LIST_DEQUEUE(loop->teardown_jobs, job, link); -} - /** * Public */ @@ -406,13 +380,15 @@ isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { *job = (isc_job_t){ .cb = cb, .cbarg = cbarg, - .link = ISC_LINK_INITIALIZER, }; + cds_wfcq_node_init(&job->wfcq_node); + REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) || atomic_load(&loopmgr->paused)); - ISC_LIST_APPEND(loop->setup_jobs, job, link); + cds_wfcq_enqueue(&loop->setup_jobs.head, &loop->setup_jobs.tail, + &job->wfcq_node); return (job); } @@ -426,13 +402,14 @@ isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { *job = (isc_job_t){ .cb 
= cb, .cbarg = cbarg, - .link = ISC_LINK_INITIALIZER, }; + cds_wfcq_node_init(&job->wfcq_node); REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) || atomic_load(&loopmgr->paused)); - ISC_LIST_APPEND(loop->teardown_jobs, job, link); + cds_wfcq_enqueue(&loop->teardown_jobs.head, &loop->teardown_jobs.tail, + &job->wfcq_node); return (job); } diff --git a/lib/isc/loop_p.h b/lib/isc/loop_p.h index 9594a0f306..b9bef13b19 100644 --- a/lib/isc/loop_p.h +++ b/lib/isc/loop_p.h @@ -25,9 +25,9 @@ #include #include #include -#include #include #include +#include #include #include @@ -58,7 +58,7 @@ struct isc_loop { /* Async queue */ uv_async_t async_trigger; - isc_asyncstack_t async_jobs; + isc_jobqueue_t async_jobs; /* Jobs queue */ uv_idle_t run_trigger; @@ -69,8 +69,8 @@ struct isc_loop { /* Shutdown */ uv_async_t shutdown_trigger; - isc_joblist_t setup_jobs; - isc_joblist_t teardown_jobs; + isc_jobqueue_t setup_jobs; + isc_jobqueue_t teardown_jobs; /* Destroy */ uv_async_t destroy_trigger; diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 583febaa8a..c861751555 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -692,8 +692,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker, .active_handles = ISC_LIST_INITIALIZER, .active_link = ISC_LINK_INITIALIZER, .active = true, - .job = ISC_JOB_INITIALIZER, - .quotacb = ISC_JOB_INITIALIZER, }; if (iface != NULL) { @@ -716,8 +714,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker, isc__networker_attach(worker, &sock->worker); sock->uv_handle.handle.data = sock; - ISC_LINK_INIT(&sock->quotacb, link); - switch (type) { case isc_nm_udpsocket: case isc_nm_udplistener: @@ -805,7 +801,6 @@ alloc_handle(isc_nmsocket_t *sock) { .magic = NMHANDLE_MAGIC, .active_link = ISC_LINK_INITIALIZER, .inactive_link = ISC_LINK_INITIALIZER, - .job = ISC_JOB_INITIALIZER, }; isc_refcount_init(&handle->references, 1); @@ -1522,7 +1517,6 @@ 
isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG) { .connect_tries = 3, .link = ISC_LINK_INITIALIZER, .active_link = ISC_LINK_INITIALIZER, - .job = ISC_JOB_INITIALIZER, .magic = UVREQ_MAGIC, }; uv_handle_set_data(&req->uv_req.handle, req);