Change the isc_async API to use cds_wfcqueue internally

The isc_async API was using lock-free stack (where enqueue operation was
not wait-free).  Change the isc_async to use cds_wfcqueue internally -
enqueue and splice (move the queue members from one list to another) is
nonblocking and wait-free.
This commit is contained in:
Ondřej Surý
2023-05-08 23:31:54 +02:00
parent c90a9d6a09
commit 7b1d985de2
9 changed files with 100 additions and 91 deletions

View File

@@ -27,7 +27,6 @@
#include <isc/refcount.h> #include <isc/refcount.h>
#include <isc/result.h> #include <isc/result.h>
#include <isc/signal.h> #include <isc/signal.h>
#include <isc/stack.h>
#include <isc/strerr.h> #include <isc/strerr.h>
#include <isc/thread.h> #include <isc/thread.h>
#include <isc/util.h> #include <isc/util.h>
@@ -45,42 +44,72 @@ isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job)); isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
*job = (isc_job_t){ *job = (isc_job_t){
.link = ISC_LINK_INITIALIZER,
.cb = cb, .cb = cb,
.cbarg = cbarg, .cbarg = cbarg,
}; };
/* cds_wfcq_node_init(&job->wfcq_node);
* Now send the half-initialized job to the loop queue.
*/
ISC_ASTACK_PUSH(loop->async_jobs, job, link);
int r = uv_async_send(&loop->async_trigger); /*
UV_RUNTIME_CHECK(uv_async_send, r); * cds_wfcq_enqueue() is non-blocking and enqueues the job to async
* queue.
*
* The function returns 'false' in case the queue was empty - in such
* case we need to trigger the async callback.
*/
__tsan_release(job);
if (!cds_wfcq_enqueue(&loop->async_jobs.head, &loop->async_jobs.tail,
&job->wfcq_node))
{
int r = uv_async_send(&loop->async_trigger);
UV_RUNTIME_CHECK(uv_async_send, r);
}
} }
void void
isc__async_cb(uv_async_t *handle) { isc__async_cb(uv_async_t *handle) {
isc_loop_t *loop = uv_handle_get_data(handle); isc_loop_t *loop = uv_handle_get_data(handle);
isc_jobqueue_t jobs;
REQUIRE(VALID_LOOP(loop)); REQUIRE(VALID_LOOP(loop));
ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->async_jobs); /* Initialize local wfcqueue */
ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER; __cds_wfcq_init(&jobs.head, &jobs.tail);
isc_job_t *job = ISC_STACK_POP(drain, link); /*
isc_job_t *next = NULL; * Move all the elements from loop->async_jobs to a local jobs queue.
while (job != NULL) { *
ISC_LIST_PREPEND(jobs, job, link); * __cds_wfcq_splice_blocking() assumes that synchronization is
* done externally - there's no internal locking, unlike
job = ISC_STACK_POP(drain, link); * cds_wfcq_splice_blocking(), and we do not need to check whether
* it needs to block, unlike __cds_wfcq_splice_nonblocking().
*
* The reason we can use __cds_wfcq_splice_blocking() is that the
* only other function we use is cds_wfcq_enqueue() which doesn't
* require any synchronization (see the table in urcu/wfcqueue.h
* for more details).
*/
enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
&jobs.head, &jobs.tail, &loop->async_jobs.head,
&loop->async_jobs.tail);
INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
if (ret == CDS_WFCQ_RET_SRC_EMPTY) {
/*
* Nothing to do, the source queue was empty - most
* probably we were called from isc__async_close() below.
*/
return;
} }
for (job = ISC_LIST_HEAD(jobs), /*
next = (job ? ISC_LIST_NEXT(job, link) : NULL); * Walk through the local queue which has now all the members copied
job != NULL; * locally, and call the callbacks and free all the isc_job_t(s).
job = next, next = (job ? ISC_LIST_NEXT(job, link) : NULL)) */
{ struct cds_wfcq_node *node, *next;
__cds_wfcq_for_each_blocking_safe(&jobs.head, &jobs.tail, node, next) {
isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node);
__tsan_acquire(job);
job->cb(job->cbarg); job->cb(job->cbarg);
isc_mem_put(loop->mctx, job, sizeof(*job)); isc_mem_put(loop->mctx, job, sizeof(*job));

View File

@@ -17,11 +17,8 @@
#include <isc/job.h> #include <isc/job.h>
#include <isc/loop.h> #include <isc/loop.h>
#include <isc/mem.h> #include <isc/mem.h>
#include <isc/stack.h>
#include <isc/uv.h> #include <isc/uv.h>
typedef ISC_ASTACK(isc_job_t) isc_asyncstack_t;
void void
isc__async_cb(uv_async_t *handle); isc__async_cb(uv_async_t *handle);

View File

@@ -28,6 +28,7 @@
#include <isc/mem.h> #include <isc/mem.h>
#include <isc/refcount.h> #include <isc/refcount.h>
#include <isc/types.h> #include <isc/types.h>
#include <isc/urcu.h>
typedef void (*isc_job_cb)(void *); typedef void (*isc_job_cb)(void *);
typedef struct isc_job isc_job_t; typedef struct isc_job isc_job_t;
@@ -35,12 +36,15 @@ typedef struct isc_job isc_job_t;
struct isc_job { struct isc_job {
isc_job_cb cb; isc_job_cb cb;
void *cbarg; void *cbarg;
ISC_LINK(isc_job_t) link; union {
struct cds_wfcq_node wfcq_node;
ISC_LINK(isc_job_t) link;
};
}; };
#define ISC_JOB_INITIALIZER \ #define ISC_JOB_INITIALIZER \
{ \ { \
.link = ISC_LINK_INITIALIZER \ .link = ISC_LINK_INITIALIZER, \
} }
ISC_LANG_BEGINDECLS ISC_LANG_BEGINDECLS

View File

@@ -122,11 +122,6 @@ isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg);
* yet been started. * yet been started.
*/ */
void
isc_loop_nosetup(isc_loop_t *loop, isc_job_t *job);
void
isc_loop_noteardown(isc_loop_t *loop, isc_job_t *job);
void void
isc_loopmgr_setup(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg); isc_loopmgr_setup(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg);
void void

View File

@@ -48,6 +48,7 @@ isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg) {
job->cb = cb; job->cb = cb;
job->cbarg = cbarg; job->cbarg = cbarg;
ISC_LINK_INIT(job, link);
ISC_LIST_APPEND(loop->run_jobs, job, link); ISC_LIST_APPEND(loop->run_jobs, job, link);
} }

View File

@@ -13,10 +13,22 @@
#pragma once #pragma once
#include <isc/align.h>
#include <isc/job.h> #include <isc/job.h>
#include <isc/loop.h> #include <isc/loop.h>
#include <isc/os.h>
#include <isc/uv.h> #include <isc/uv.h>
/*%
* NOTE: We are using struct __cds_wfcq_head that doesn't have an internal
* mutex, because we are only using enqueue and splice, and those don't need
* any synchronization (see urcu/wfcqueue.h for detailed description).
*/
typedef struct isc_jobqueue {
alignas(ISC_OS_CACHELINE_SIZE) struct __cds_wfcq_head head;
alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_tail tail;
} isc_jobqueue_t;
typedef ISC_LIST(isc_job_t) isc_joblist_t; typedef ISC_LIST(isc_job_t) isc_joblist_t;
void void

View File

@@ -159,7 +159,6 @@ destroy_cb(uv_async_t *handle) {
static void static void
shutdown_cb(uv_async_t *handle) { shutdown_cb(uv_async_t *handle) {
isc_job_t *job = NULL;
isc_loop_t *loop = uv_handle_get_data(handle); isc_loop_t *loop = uv_handle_get_data(handle);
isc_loopmgr_t *loopmgr = loop->loopmgr; isc_loopmgr_t *loopmgr = loop->loopmgr;
@@ -178,17 +177,12 @@ shutdown_cb(uv_async_t *handle) {
isc_signal_destroy(&loopmgr->sigint); isc_signal_destroy(&loopmgr->sigint);
} }
job = ISC_LIST_TAIL(loop->teardown_jobs); enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
while (job != NULL) { &loop->async_jobs.head, &loop->async_jobs.tail,
isc_job_t *prev = ISC_LIST_PREV(job, link); &loop->teardown_jobs.head, &loop->teardown_jobs.tail);
ISC_LIST_UNLINK(loop->teardown_jobs, job, link); INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
int r = uv_async_send(&loop->async_trigger);
job->cb(job->cbarg); UV_RUNTIME_CHECK(uv_async_send, r);
isc_mem_put(loop->mctx, job, sizeof(*job));
job = prev;
}
} }
static void static void
@@ -202,12 +196,13 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
*loop = (isc_loop_t){ *loop = (isc_loop_t){
.tid = tid, .tid = tid,
.loopmgr = loopmgr, .loopmgr = loopmgr,
.async_jobs = ISC_ASTACK_INITIALIZER,
.run_jobs = ISC_LIST_INITIALIZER, .run_jobs = ISC_LIST_INITIALIZER,
.setup_jobs = ISC_LIST_INITIALIZER,
.teardown_jobs = ISC_LIST_INITIALIZER,
}; };
__cds_wfcq_init(&loop->async_jobs.head, &loop->async_jobs.tail);
__cds_wfcq_init(&loop->setup_jobs.head, &loop->setup_jobs.tail);
__cds_wfcq_init(&loop->teardown_jobs.head, &loop->teardown_jobs.tail);
int r = uv_loop_init(&loop->loop); int r = uv_loop_init(&loop->loop);
UV_RUNTIME_CHECK(uv_loop_init, r); UV_RUNTIME_CHECK(uv_loop_init, r);
@@ -248,23 +243,6 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
loop->magic = LOOP_MAGIC; loop->magic = LOOP_MAGIC;
} }
static void
setup_jobs_cb(void *arg) {
isc_loop_t *loop = arg;
isc_job_t *job = ISC_LIST_HEAD(loop->setup_jobs);
while (job != NULL) {
isc_job_t *next = ISC_LIST_NEXT(job, link);
ISC_LIST_UNLINK(loop->setup_jobs, job, link);
job->cb(job->cbarg);
isc_mem_put(loop->mctx, job, sizeof(*job));
job = next;
}
}
static void static void
quiescent_cb(uv_prepare_t *handle) { quiescent_cb(uv_prepare_t *handle) {
isc__qsbr_quiescent_cb(handle); isc__qsbr_quiescent_cb(handle);
@@ -285,7 +263,7 @@ loop_close(isc_loop_t *loop) {
int r = uv_loop_close(&loop->loop); int r = uv_loop_close(&loop->loop);
UV_RUNTIME_CHECK(uv_loop_close, r); UV_RUNTIME_CHECK(uv_loop_close, r);
INSIST(ISC_ASTACK_EMPTY(loop->async_jobs)); INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail));
INSIST(ISC_LIST_EMPTY(loop->run_jobs)); INSIST(ISC_LIST_EMPTY(loop->run_jobs));
loop->magic = 0; loop->magic = 0;
@@ -306,7 +284,13 @@ loop_thread(void *arg) {
isc_barrier_wait(&loop->loopmgr->starting); isc_barrier_wait(&loop->loopmgr->starting);
isc_async_run(loop, setup_jobs_cb, loop); enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
&loop->async_jobs.head, &loop->async_jobs.tail,
&loop->setup_jobs.head, &loop->setup_jobs.tail);
INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
r = uv_async_send(&loop->async_trigger);
UV_RUNTIME_CHECK(uv_async_send, r);
r = uv_run(&loop->loop, UV_RUN_DEFAULT); r = uv_run(&loop->loop, UV_RUN_DEFAULT);
UV_RUNTIME_CHECK(uv_run, r); UV_RUNTIME_CHECK(uv_run, r);
@@ -316,16 +300,6 @@ loop_thread(void *arg) {
return (NULL); return (NULL);
} }
void
isc_loop_nosetup(isc_loop_t *loop, isc_job_t *job) {
ISC_LIST_DEQUEUE(loop->setup_jobs, job, link);
}
void
isc_loop_noteardown(isc_loop_t *loop, isc_job_t *job) {
ISC_LIST_DEQUEUE(loop->teardown_jobs, job, link);
}
/** /**
* Public * Public
*/ */
@@ -406,13 +380,15 @@ isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
*job = (isc_job_t){ *job = (isc_job_t){
.cb = cb, .cb = cb,
.cbarg = cbarg, .cbarg = cbarg,
.link = ISC_LINK_INITIALIZER,
}; };
cds_wfcq_node_init(&job->wfcq_node);
REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) || REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
atomic_load(&loopmgr->paused)); atomic_load(&loopmgr->paused));
ISC_LIST_APPEND(loop->setup_jobs, job, link); cds_wfcq_enqueue(&loop->setup_jobs.head, &loop->setup_jobs.tail,
&job->wfcq_node);
return (job); return (job);
} }
@@ -426,13 +402,14 @@ isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
*job = (isc_job_t){ *job = (isc_job_t){
.cb = cb, .cb = cb,
.cbarg = cbarg, .cbarg = cbarg,
.link = ISC_LINK_INITIALIZER,
}; };
cds_wfcq_node_init(&job->wfcq_node);
REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) || REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
atomic_load(&loopmgr->paused)); atomic_load(&loopmgr->paused));
ISC_LIST_APPEND(loop->teardown_jobs, job, link); cds_wfcq_enqueue(&loop->teardown_jobs.head, &loop->teardown_jobs.tail,
&job->wfcq_node);
return (job); return (job);
} }

View File

@@ -25,9 +25,9 @@
#include <isc/refcount.h> #include <isc/refcount.h>
#include <isc/result.h> #include <isc/result.h>
#include <isc/signal.h> #include <isc/signal.h>
#include <isc/stack.h>
#include <isc/thread.h> #include <isc/thread.h>
#include <isc/types.h> #include <isc/types.h>
#include <isc/urcu.h>
#include <isc/uv.h> #include <isc/uv.h>
#include <isc/work.h> #include <isc/work.h>
@@ -58,7 +58,7 @@ struct isc_loop {
/* Async queue */ /* Async queue */
uv_async_t async_trigger; uv_async_t async_trigger;
isc_asyncstack_t async_jobs; isc_jobqueue_t async_jobs;
/* Jobs queue */ /* Jobs queue */
uv_idle_t run_trigger; uv_idle_t run_trigger;
@@ -69,8 +69,8 @@ struct isc_loop {
/* Shutdown */ /* Shutdown */
uv_async_t shutdown_trigger; uv_async_t shutdown_trigger;
isc_joblist_t setup_jobs; isc_jobqueue_t setup_jobs;
isc_joblist_t teardown_jobs; isc_jobqueue_t teardown_jobs;
/* Destroy */ /* Destroy */
uv_async_t destroy_trigger; uv_async_t destroy_trigger;

View File

@@ -692,8 +692,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker,
.active_handles = ISC_LIST_INITIALIZER, .active_handles = ISC_LIST_INITIALIZER,
.active_link = ISC_LINK_INITIALIZER, .active_link = ISC_LINK_INITIALIZER,
.active = true, .active = true,
.job = ISC_JOB_INITIALIZER,
.quotacb = ISC_JOB_INITIALIZER,
}; };
if (iface != NULL) { if (iface != NULL) {
@@ -716,8 +714,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker,
isc__networker_attach(worker, &sock->worker); isc__networker_attach(worker, &sock->worker);
sock->uv_handle.handle.data = sock; sock->uv_handle.handle.data = sock;
ISC_LINK_INIT(&sock->quotacb, link);
switch (type) { switch (type) {
case isc_nm_udpsocket: case isc_nm_udpsocket:
case isc_nm_udplistener: case isc_nm_udplistener:
@@ -805,7 +801,6 @@ alloc_handle(isc_nmsocket_t *sock) {
.magic = NMHANDLE_MAGIC, .magic = NMHANDLE_MAGIC,
.active_link = ISC_LINK_INITIALIZER, .active_link = ISC_LINK_INITIALIZER,
.inactive_link = ISC_LINK_INITIALIZER, .inactive_link = ISC_LINK_INITIALIZER,
.job = ISC_JOB_INITIALIZER,
}; };
isc_refcount_init(&handle->references, 1); isc_refcount_init(&handle->references, 1);
@@ -1522,7 +1517,6 @@ isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG) {
.connect_tries = 3, .connect_tries = 3,
.link = ISC_LINK_INITIALIZER, .link = ISC_LINK_INITIALIZER,
.active_link = ISC_LINK_INITIALIZER, .active_link = ISC_LINK_INITIALIZER,
.job = ISC_JOB_INITIALIZER,
.magic = UVREQ_MAGIC, .magic = UVREQ_MAGIC,
}; };
uv_handle_set_data(&req->uv_req.handle, req); uv_handle_set_data(&req->uv_req.handle, req);