Refactor the core qp-trie code to use liburcu

A `dns_qpmulti_t` no longer needs to know about its loopmgr. We no
longer keep a linked list of `dns_qpmulti_t` that have reclamation
work, and we no longer mark chunks with the phase in which they are to
be reclaimed. Instead, empty chunks are listed in an array in a
`qp_rcuctx_t`, which is passed to call_rcu().
Tony Finch
2023-03-08 14:28:06 +00:00
parent 05ca11e122
commit 6217e434b5
4 changed files with 153 additions and 165 deletions
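
The shape of the new scheme, as a minimal standalone sketch (illustrative names only, not the code in this commit): the writer packs the numbers of the newly empty chunks into a heap-allocated structure that ends in a flexible array member, and liburcu's call_rcu() runs the callback once a grace period has elapsed, i.e. once no reader registered with liburcu can still hold a pointer into those chunks.

#include <stdlib.h>
#include <string.h>
#include <urcu.h> /* liburcu: call_rcu(), caa_container_of() */

/* analogous to qp_rcuctx_t: an rcu_head plus the doomed chunk numbers */
struct reclaim_ctx {
	struct rcu_head rcu_head;
	unsigned int count;
	unsigned int chunk[]; /* flexible array member */
};

static void
reclaim_cb(struct rcu_head *head) {
	/* by now, no RCU reader can reach the listed chunks */
	struct reclaim_ctx *ctx =
		caa_container_of(head, struct reclaim_ctx, rcu_head);
	for (unsigned int i = 0; i < ctx->count; i++) {
		/* chunk_free(ctx->chunk[i]); */
	}
	free(ctx);
}

/* writer side: list the empty chunks, then hand them to liburcu */
static void
schedule_reclaim(const unsigned int *chunks, unsigned int count) {
	/* allocation failure handling elided */
	struct reclaim_ctx *ctx =
		malloc(sizeof(*ctx) + count * sizeof(ctx->chunk[0]));
	ctx->count = count;
	memcpy(ctx->chunk, chunks, count * sizeof(ctx->chunk[0]));
	call_rcu(&ctx->rcu_head, reclaim_cb); /* fires after a grace period */
}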

lib/dns/Makefile.am

@@ -263,6 +263,7 @@ libdns_la_CPPFLAGS = \
 	$(AM_CPPFLAGS) \
 	$(LIBDNS_CFLAGS) \
 	$(LIBISC_CFLAGS) \
+	$(LIBURCU_CFLAGS) \
 	$(LIBUV_CFLAGS) \
 	$(OPENSSL_CFLAGS)
@@ -272,6 +273,7 @@ libdns_la_LDFLAGS = \
 libdns_la_LIBADD = \
 	$(LIBISC_LIBS) \
+	$(LIBURCU_LIBS) \
 	$(LIBUV_LIBS) \
 	$(OPENSSL_LIBS)

lib/dns/include/dns/qp.h

@@ -47,17 +47,11 @@
  * version of the trie.
  *
  * For fast concurrent reads, call `dns_qpmulti_query()` to fill in a
- * `dns_qpread_t`, which must be allocated on the stack. Readers can
- * access a single version of the trie within the scope of an isc_loop
- * thread (NOT an isc_work thread). We rely on the loop to bound the
- * lifetime of a `dns_qpread_t`, instead of using locks. Readers are
- * not blocked by any write activity, and vice versa.
- *
- * For read-only access outside the scope of a loop, such as from an
- * isc_work callback, `dns_qpmulti_lockedread()`. This looks like a
- * query transaction; the difference is that a locked read transaction
- * takes the `dns_qpmulti_t` mutex. When you have finished with a
- * `dns_qpread_t`, call `dns_qpread_destroy()` to release the mutex.
+ * `dns_qpread_t`, which must be allocated on the stack. This gives
+ * the reader access to a single version of the trie. The reader's
+ * thread must be registered with `liburcu`, which is normally taken
+ * care of by `libisc`. Readers are not blocked by any write activity,
+ * and vice versa.
  *
  * For reads that need a stable view of the trie for multiple cycles
  * of an isc_loop, or which can be used from any thread, call
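
As the new comment describes, a read now spans an RCU read-side critical section bounded by `dns_qpmulti_query()` and `dns_qpread_destroy()`. A hypothetical caller might look like this (a sketch only: `dns_qp_getname()` is assumed here as the lookup entry point, and error handling is elided):

void
lookup_example(dns_qpmulti_t *multi, const dns_name_t *name) {
	dns_qpread_t qpr; /* must be on the stack, as required above */
	void *pval = NULL;

	dns_qpmulti_query(multi, &qpr); /* enters rcu_read_lock() */
	if (dns_qp_getname(&qpr, name, &pval, NULL) == ISC_R_SUCCESS) {
		/* use pval while the read transaction is open */
	}
	dns_qpread_destroy(multi, &qpr); /* leaves the critical section */
}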
@@ -314,17 +308,15 @@ dns_qp_destroy(dns_qp_t **qptp);
  */
 void
-dns_qpmulti_create(isc_mem_t *mctx, isc_loopmgr_t *loopmgr,
-		   const dns_qpmethods_t *methods, void *uctx,
+dns_qpmulti_create(isc_mem_t *mctx, const dns_qpmethods_t *methods, void *uctx,
 		   dns_qpmulti_t **qpmp);
 /*%<
  * Create a multi-threaded qp-trie.
  *
  * Requires:
  * \li	`mctx` is a pointer to a valid memory context.
- * \li	'loopmgr' is a valid loop manager.
- * \li	`qpmp != NULL && *qpmp == NULL`
  * \li	all the methods are non-NULL
+ * \li	`qpmp != NULL && *qpmp == NULL`
 *
 * Ensures:
 * \li	`*qpmp` is a pointer to a valid multi-threaded qp-trie
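
With the loopmgr parameter gone, creating a trie needs nothing beyond the memory context and the methods; a call now looks like this (sketch, with `my_methods` and `my_uctx` standing in for the caller's own):

dns_qpmulti_t *multi = NULL;
dns_qpmulti_create(mctx, &my_methods, my_uctx, &multi);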

lib/dns/qp.c

@@ -28,17 +28,16 @@
 #include <isc/atomic.h>
 #include <isc/buffer.h>
-#include <isc/loop.h>
 #include <isc/magic.h>
 #include <isc/mem.h>
 #include <isc/mutex.h>
-#include <isc/qsbr.h>
 #include <isc/refcount.h>
 #include <isc/result.h>
 #include <isc/rwlock.h>
-#include <isc/tid.h>
 #include <isc/time.h>
 #include <isc/types.h>
+#include <isc/urcu.h>
 #include <isc/util.h>
 
 #include <dns/fixedname.h>
@@ -437,6 +436,7 @@ realloc_chunk_arrays(dns_qp_t *qp, qp_chunk_t newmax) {
 	}
 	memset(&qp->base->ptr[qp->chunk_max], 0, newptrs - oldptrs);
 	isc_refcount_init(&qp->base->refcount, 1);
+	qp->base->magic = QPBASE_MAGIC;
 
 	/* usage array is exclusive to the writer */
 	size_t oldusage = sizeof(qp->usage[0]) * qp->chunk_max;
@@ -555,12 +555,14 @@ chunk_usage(dns_qp_t *qp, qp_chunk_t chunk) {
  */
 static void
 chunk_discount(dns_qp_t *qp, qp_chunk_t chunk) {
-	if (qp->usage[chunk].phase == 0) {
-		INSIST(qp->used_count >= qp->usage[chunk].used);
-		INSIST(qp->free_count >= qp->usage[chunk].free);
-		qp->used_count -= qp->usage[chunk].used;
-		qp->free_count -= qp->usage[chunk].free;
+	if (qp->usage[chunk].discounted) {
+		return;
 	}
+	INSIST(qp->used_count >= qp->usage[chunk].used);
+	INSIST(qp->free_count >= qp->usage[chunk].free);
+	qp->used_count -= qp->usage[chunk].used;
+	qp->free_count -= qp->usage[chunk].free;
+	qp->usage[chunk].discounted = true;
 }
 
 /*
@@ -622,109 +624,97 @@ recycle(dns_qp_t *qp) {
 }
 
 /*
- * At the end of a transaction, mark empty but immutable chunks for
- * reclamation later. Returns true when chunks need reclaiming later.
+ * asynchronous cleanup
  */
-static bool
-defer_chunk_reclamation(dns_qp_t *qp, isc_qsbr_phase_t phase) {
-	unsigned int reclaim = 0;
-
-	for (qp_chunk_t chunk = 0; chunk < qp->chunk_max; chunk++) {
-		if (chunk != qp->bump && chunk_usage(qp, chunk) == 0 &&
-		    qp->usage[chunk].exists && qp->usage[chunk].immutable &&
-		    qp->usage[chunk].phase == 0)
-		{
-			chunk_discount(qp, chunk);
-			qp->usage[chunk].phase = phase;
-			reclaim++;
-		}
-	}
-	if (reclaim > 0) {
-		LOG_STATS("qp will reclaim %u chunks in phase %u", reclaim,
-			  phase);
-	}
-	return (reclaim > 0);
-}
-
-static bool
-reclaim_chunks(dns_qp_t *qp, isc_qsbr_phase_t phase) {
+static void
+reclaim_chunks_cb(struct rcu_head *rcu_head) {
+	qp_rcuctx_t *rcuctx = caa_container_of(rcu_head, qp_rcuctx_t, rcu_head);
+	__tsan_acquire(rcuctx);
+	REQUIRE(QPRCU_VALID(rcuctx));
+	dns_qpmulti_t *multi = rcuctx->multi;
+	REQUIRE(QPMULTI_VALID(multi));
+
+	LOCK(&multi->mutex);
+
+	dns_qp_t *qp = &multi->writer;
+	REQUIRE(QP_VALID(qp));
+
 	unsigned int free = 0;
-	bool more = false;
 	isc_nanosecs_t start = isc_time_monotonic();
 
-	for (qp_chunk_t chunk = 0; chunk < qp->chunk_max; chunk++) {
-		if (qp->usage[chunk].phase == phase) {
-			if (qp->usage[chunk].snapshot) {
-				/* cleanup when snapshot is destroyed */
-				qp->usage[chunk].snapfree = true;
-			} else {
-				chunk_free(qp, chunk);
-				free++;
-			}
-		} else if (qp->usage[chunk].phase != 0) {
-			/*
-			 * We need to reclaim more of this trie's memory
-			 * on a later qsbr callback.
-			 */
-			more = true;
+	for (unsigned int i = 0; i < rcuctx->count; i++) {
+		qp_chunk_t chunk = rcuctx->chunk[i];
+		if (qp->usage[chunk].snapshot) {
+			/* cleanup when snapshot is destroyed */
+			qp->usage[chunk].snapfree = true;
+		} else {
+			chunk_free(qp, chunk);
+			free++;
 		}
 	}
+	isc_mem_putanddetach(&rcuctx->mctx, rcuctx,
+			     STRUCT_FLEX_SIZE(rcuctx, chunk, rcuctx->count));
 
 	isc_nanosecs_t time = isc_time_monotonic() - start;
 	recycle_time += time;
 
 	if (free > 0) {
-		LOG_STATS("qp reclaim" PRItime "phase %u free %u chunks", time,
-			  phase, free);
+		LOG_STATS("qp reclaim" PRItime "free %u chunks", time, free);
 		LOG_STATS("qp reclaim leaf %u live %u used %u free %u hold %u",
 			  qp->leaf_count, qp->used_count - qp->free_count,
 			  qp->used_count, qp->free_count, qp->hold_count);
 	}
-	return (more);
+
+	UNLOCK(&multi->mutex);
 }
 
 /*
- * List of `dns_qpmulti_t`s that have chunks to be reclaimed.
- */
-static ISC_ASTACK(dns_qpmulti_t) qsbr_work;
-
-/*
- * When a grace period has passed, this function reclaims any unused memory.
+ * At the end of a transaction, schedule empty but immutable chunks
+ * for reclamation later.
  */
 static void
-qp_qsbr_reclaimer(isc_qsbr_phase_t phase) {
-	ISC_STACK(dns_qpmulti_t) drain = ISC_ASTACK_TO_STACK(qsbr_work);
-
-	while (!ISC_STACK_EMPTY(drain)) {
-		/* lock before pop */
-		dns_qpmulti_t *multi = ISC_STACK_TOP(drain);
-		INSIST(QPMULTI_VALID(multi));
-		LOCK(&multi->mutex);
-		ISC_STACK_POP(drain, cleanup);
-		if (multi->writer.destroy) {
-			UNLOCK(&multi->mutex);
-			dns_qpmulti_destroy(&multi);
-		} else {
-			if (reclaim_chunks(&multi->writer, phase)) {
-				/* more to do next time */
-				ISC_ASTACK_PUSH(qsbr_work, multi, cleanup);
-			}
-			UNLOCK(&multi->mutex);
-		}
-	}
-}
-
-/*
- * Register `qp_qsbr_reclaimer()` with QSBR at startup.
- */
-static void
-qp_qsbr_register(void) ISC_CONSTRUCTOR;
-static void
-qp_qsbr_register(void) {
-	isc_qsbr_register(qp_qsbr_reclaimer);
+reclaim_chunks(dns_qpmulti_t *multi) {
+	dns_qp_t *qp = &multi->writer;
+	unsigned int count = 0;
+
+	for (qp_chunk_t chunk = 0; chunk < qp->chunk_max; chunk++) {
+		if (chunk != qp->bump && chunk_usage(qp, chunk) == 0 &&
+		    qp->usage[chunk].exists && qp->usage[chunk].immutable &&
+		    !qp->usage[chunk].discounted)
+		{
+			count++;
+		}
+	}
+	if (count == 0) {
+		return;
+	}
+
+	qp_rcuctx_t *rcuctx =
+		isc_mem_get(qp->mctx, STRUCT_FLEX_SIZE(rcuctx, chunk, count));
+	*rcuctx = (qp_rcuctx_t){
+		.magic = QPRCU_MAGIC,
+		.multi = multi,
+		.count = count,
+	};
+	isc_mem_attach(qp->mctx, &rcuctx->mctx);
+
+	unsigned int i = 0;
+	for (qp_chunk_t chunk = 0; chunk < qp->chunk_max; chunk++) {
+		if (chunk != qp->bump && chunk_usage(qp, chunk) == 0 &&
+		    qp->usage[chunk].exists && qp->usage[chunk].immutable &&
+		    !qp->usage[chunk].discounted)
+		{
+			rcuctx->chunk[i++] = chunk;
+			chunk_discount(qp, chunk);
+		}
+	}
+
+	__tsan_release(rcuctx);
+	call_rcu(&rcuctx->rcu_head, reclaim_chunks_cb);
+	LOG_STATS("qp will reclaim %u chunks", count);
 }
 
 /*
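Two details above are worth spelling out. The scan runs twice because the flexible array in the `qp_rcuctx_t` has to be sized and allocated before the chunk numbers can be stored into it, so the first pass only counts. And `STRUCT_FLEX_SIZE()` is a libisc helper for sizing such structures; its effect is roughly the following (an approximation for illustration, not the actual libisc definition):

/* approximate effect: the header plus `count` flexible-array elements */
#define STRUCT_FLEX_SIZE(ptr, member, count) \
	(sizeof(*(ptr)) + (count) * sizeof((ptr)->member[0]))

The `__tsan_release()`/`__tsan_acquire()` pair tells ThreadSanitizer that handing the structure to `call_rcu()` orders the writer's stores before the callback's loads.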
@@ -1121,6 +1111,8 @@ dns_qpmulti_update(dns_qpmulti_t *multi, dns_qp_t **qptp) {
 	memmove(rollback, qp, sizeof(*rollback));
 	/* can be uninitialized on the first transaction */
 	if (rollback->base != NULL) {
+		INSIST(QPBASE_VALID(rollback->base));
+		INSIST(qp->usage != NULL && qp->chunk_max > 0);
 		/* paired with either _commit() or _rollback() */
 		isc_refcount_increment(&rollback->base->refcount);
 		size_t usage_bytes = sizeof(qp->usage[0]) * qp->chunk_max;
@@ -1187,12 +1179,8 @@ dns_qpmulti_commit(dns_qpmulti_t *multi, dns_qp_t **qptp) {
 		recycle(qp);
 	}
 
-	/* the reclamation phase must be sampled after the commit */
-	isc_qsbr_phase_t phase = isc_qsbr_phase(multi->loopmgr);
-	if (defer_chunk_reclamation(qp, phase)) {
-		ISC_ASTACK_ADD(qsbr_work, multi, cleanup);
-		isc_qsbr_activate(multi->loopmgr, phase);
-	}
+	/* schedule the rest for later */
+	reclaim_chunks(multi);
 
 	*qptp = NULL;
 	UNLOCK(&multi->mutex);
@@ -1282,31 +1270,11 @@ dns_qpmulti_query(dns_qpmulti_t *multi, dns_qpread_t *qp) {
 	REQUIRE(QPMULTI_VALID(multi));
 	REQUIRE(qp != NULL);
 
-	/* we MUST be in an isc_loop thread */
-	qp->tid = isc_tid();
-	REQUIRE(qp->tid != ISC_TID_UNKNOWN);
-
+	rcu_read_lock();
 	dns_qpmulti_t *whence = reader_open(multi, qp);
 	INSIST(whence == multi);
 }
 
-/*
- * a locked read takes the mutex
- */
-void
-dns_qpmulti_lockedread(dns_qpmulti_t *multi, dns_qpread_t *qp) {
-	REQUIRE(QPMULTI_VALID(multi));
-	REQUIRE(qp != NULL);
-
-	/* we MUST NOT be in an isc_loop thread */
-	qp->tid = isc_tid();
-	REQUIRE(qp->tid == ISC_TID_UNKNOWN);
-
-	LOCK(&multi->mutex);
-	dns_qpmulti_t *whence = reader_open(multi, qp);
-	INSIST(whence == multi);
-}
-
 void
@@ -1314,10 +1282,8 @@ dns_qpread_destroy(dns_qpmulti_t *multi, dns_qpread_t *qp) {
 	REQUIRE(QPMULTI_VALID(multi));
 	REQUIRE(QP_VALID(qp));
-	REQUIRE(qp->tid == isc_tid());
-
-	if (qp->tid == ISC_TID_UNKNOWN) {
-		UNLOCK(&multi->mutex);
-	}
 
 	*qp = (dns_qpread_t){};
+	rcu_read_unlock();
 }
 
 /*
@@ -1406,8 +1372,7 @@ dns_qp_create(isc_mem_t *mctx, const dns_qpmethods_t *methods, void *uctx,
 }
 
 void
-dns_qpmulti_create(isc_mem_t *mctx, isc_loopmgr_t *loopmgr,
-		   const dns_qpmethods_t *methods, void *uctx,
+dns_qpmulti_create(isc_mem_t *mctx, const dns_qpmethods_t *methods, void *uctx,
 		   dns_qpmulti_t **qpmp) {
 	REQUIRE(qpmp != NULL && *qpmp == NULL);
@@ -1415,8 +1380,6 @@ dns_qpmulti_create(isc_mem_t *mctx, isc_loopmgr_t *loopmgr,
 	*multi = (dns_qpmulti_t){
 		.magic = QPMULTI_MAGIC,
 		.reader_ref = INVALID_REF,
-		.loopmgr = loopmgr,
-		.cleanup = ISC_SLINK_INITIALIZER,
 	};
 	isc_mutex_init(&multi->mutex);
 	ISC_LIST_INIT(multi->snapshots);
@@ -1468,29 +1431,50 @@ dns_qp_destroy(dns_qp_t **qptp) {
 	isc_mem_putanddetach(&qp->mctx, qp, sizeof(*qp));
 }
 
+static void
+qpmulti_destroy_cb(struct rcu_head *rcu_head) {
+	qp_rcuctx_t *rcuctx = caa_container_of(rcu_head, qp_rcuctx_t, rcu_head);
+	__tsan_acquire(rcuctx);
+	REQUIRE(QPRCU_VALID(rcuctx));
+	dns_qpmulti_t *multi = rcuctx->multi;
+	REQUIRE(QPMULTI_VALID(multi));
+	dns_qp_t *qp = &multi->writer;
+	REQUIRE(QP_VALID(qp));
+	REQUIRE(rcuctx->count == 0);
+
+	destroy_guts(qp);
+	isc_mutex_destroy(&multi->mutex);
+	isc_mem_putanddetach(&rcuctx->mctx, rcuctx,
+			     STRUCT_FLEX_SIZE(rcuctx, chunk, rcuctx->count));
+	isc_mem_putanddetach(&qp->mctx, multi, sizeof(*multi));
+}
+
 void
 dns_qpmulti_destroy(dns_qpmulti_t **qpmp) {
+	dns_qp_t *qp = NULL;
+	dns_qpmulti_t *multi = NULL;
+	qp_rcuctx_t *rcuctx = NULL;
+
 	REQUIRE(qpmp != NULL);
 	REQUIRE(QPMULTI_VALID(*qpmp));
 
-	dns_qpmulti_t *multi = *qpmp;
-	dns_qp_t *qp = &multi->writer;
+	multi = *qpmp;
+	qp = &multi->writer;
 	*qpmp = NULL;
 
 	REQUIRE(QP_VALID(qp));
 	REQUIRE(multi->rollback == NULL);
 	REQUIRE(ISC_LIST_EMPTY(multi->snapshots));
 
-	LOCK(&multi->mutex);
-	if (ISC_SLINK_LINKED(multi, cleanup)) {
-		qp->destroy = true;
-		UNLOCK(&multi->mutex);
-	} else {
-		destroy_guts(qp);
-		UNLOCK(&multi->mutex);
-		isc_mutex_destroy(&multi->mutex);
-		isc_mem_putanddetach(&qp->mctx, multi, sizeof(*multi));
-	}
+	rcuctx = isc_mem_get(qp->mctx, STRUCT_FLEX_SIZE(rcuctx, chunk, 0));
+	*rcuctx = (qp_rcuctx_t){
+		.magic = QPRCU_MAGIC,
+		.multi = multi,
+	};
+	isc_mem_attach(qp->mctx, &rcuctx->mctx);
+	__tsan_release(rcuctx);
+	call_rcu(&rcuctx->rcu_head, qpmulti_destroy_cb);
 }
 
 /***********************************************************************
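
Destruction is deferred the same way: `dns_qpmulti_destroy()` queues a zero-length `qp_rcuctx_t` and `qpmulti_destroy_cb()` frees everything once a grace period has passed. A practical consequence (an assumption about the surrounding teardown, not something shown in this commit) is that pending callbacks must drain before the memory context can be checked for leaks:

/* hypothetical shutdown sequence */
dns_qpmulti_destroy(&multi); /* returns at once; memory is freed later */
rcu_barrier();		     /* liburcu: wait for queued call_rcu() callbacks */
isc_mem_destroy(&mctx);	     /* safe only after the callbacks have run */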

lib/dns/qp_p.h

@@ -301,11 +301,9 @@ ref_cell(qp_ref_t ref) {
  * transactions, but it must become immutable when an update
  * transaction is opened.)
  *
- * When a chunk becomes empty (wrt the latest version of the trie), we
- * note the QSBR phase after which no old versions of the trie will
- * need the chunk and it will be safe to free(). There are a few flags
- * used to mark which chunks are still needed by snapshots after the
- * chunks have passed their normal reclamation phase.
+ * There are a few flags used to mark which chunks are still needed by
+ * snapshots after the chunks have passed their normal reclamation
+ * phase.
  */
 
 typedef struct qp_usage {
 	/*% the allocation point, increases monotonically */
@@ -316,14 +314,14 @@ typedef struct qp_usage {
 	bool exists : 1;
 	/*% is this chunk shared? [MT] */
 	bool immutable : 1;
+	/*% already subtracted from multi->*_count [MT] */
+	bool discounted : 1;
 	/*% is a snapshot using this chunk? [MT] */
 	bool snapshot : 1;
 	/*% tried to free it but a snapshot needs it [MT] */
 	bool snapfree : 1;
 	/*% for mark/sweep snapshot flag updates [MT] */
 	bool snapmark : 1;
-	/*% in which phase did this chunk become unused? [MT] */
-	isc_qsbr_phase_t phase : ISC_QSBR_PHASE_BITS;
 } qp_usage_t;
 
 /*
@@ -335,7 +333,7 @@ typedef struct qp_usage {
  * A `dns_qpbase_t` counts references from `dns_qp_t` objects and
  * from packed readers, but not from `dns_qpread_t` nor from
  * `dns_qpsnap_t` objects. Refcount adjustments for `dns_qpread_t`
- * would wreck multicore scalability; instead we rely on QSBR.
+ * would wreck multicore scalability; instead we rely on RCU.
  *
  * The `usage` array determines when a chunk is no longer needed: old
 * chunk pointers in old `base` arrays are ignored. (They can become
@@ -349,10 +347,25 @@ typedef struct qp_usage {
  * busy slots that are in use by readers.
  */
 struct dns_qpbase {
+	unsigned int magic;
 	isc_refcount_t refcount;
 	qp_node_t *ptr[];
 };
 
+/*
+ * Chunks that may be in use by readers are reclaimed asynchronously.
+ * When a transaction commits, immutable chunks that are now empty are
+ * listed in a `qp_rcuctx_t` structure and passed to `call_rcu()`.
+ */
+typedef struct qp_rcuctx {
+	unsigned int magic;
+	struct rcu_head rcu_head;
+	isc_mem_t *mctx;
+	dns_qpmulti_t *multi;
+	qp_chunk_t count;
+	qp_chunk_t chunk[];
+} qp_rcuctx_t;
+
 /*
  * Returns true when the base array can be free()d.
  */
@@ -382,10 +395,14 @@ ref_ptr(dns_qpreadable_t qpr, qp_ref_t ref) {
 #define QPITER_MAGIC   ISC_MAGIC('q', 'p', 'i', 't')
 #define QPMULTI_MAGIC  ISC_MAGIC('q', 'p', 'm', 'v')
 #define QPREADER_MAGIC ISC_MAGIC('q', 'p', 'r', 'x')
+#define QPBASE_MAGIC   ISC_MAGIC('q', 'p', 'b', 'p')
+#define QPRCU_MAGIC    ISC_MAGIC('q', 'p', 'c', 'b')
 
-#define QP_VALID(p)	 ISC_MAGIC_VALID(p, QP_MAGIC)
-#define QPITER_VALID(p)	 ISC_MAGIC_VALID(p, QPITER_MAGIC)
-#define QPMULTI_VALID(p) ISC_MAGIC_VALID(p, QPMULTI_MAGIC)
+#define QP_VALID(qp)	  ISC_MAGIC_VALID(qp, QP_MAGIC)
+#define QPITER_VALID(qp)  ISC_MAGIC_VALID(qp, QPITER_MAGIC)
+#define QPMULTI_VALID(qp) ISC_MAGIC_VALID(qp, QPMULTI_MAGIC)
+#define QPBASE_VALID(qp)  ISC_MAGIC_VALID(qp, QPBASE_MAGIC)
+#define QPRCU_VALID(qp)	  ISC_MAGIC_VALID(qp, QPRCU_MAGIC)
 
 /*
  * Polymorphic initialization of the `dns_qpreader_t` prefix.
@@ -504,8 +521,6 @@ struct dns_qp {
 	enum { QP_NONE, QP_WRITE, QP_UPDATE } transaction_mode : 2;
 	/*% compact the entire trie [MT] */
 	bool compact_all : 1;
-	/*% destroy the trie asynchronously [MT] */
-	bool destroy : 1;
 	/*% optionally when compiled with fuzzing support [MT] */
 	bool write_protect : 1;
 };
@@ -517,9 +532,6 @@ struct dns_qp {
  * of the trie. See the "packed reader nodes" section below for a
  * description of what it points to.
  *
- * We need access to the loopmgr to hook into QSBR safe memory reclamation.
- * It is constant after initialization.
- *
  * The main object under the protection of the mutex is the `writer`
  * containing all the allocator state. There can be a backup copy when
  * we want to be able to rollback an update transaction.
@@ -537,8 +549,6 @@ struct dns_qp {
  */
 struct dns_qpmulti {
 	uint32_t magic;
-	/*% safe memory reclamation context (const) */
-	isc_loopmgr_t *loopmgr;
 	/*% pointer to current packed reader */
 	atomic_ptr(qp_node_t) reader;
 	/*% the mutex protects the rest of this structure */
@@ -551,8 +561,6 @@ struct dns_qpmulti {
 	dns_qp_t *rollback;
 	/*% all snapshots of this trie */
 	ISC_LIST(dns_qpsnap_t) snapshots;
-	/*% safe memory reclamation work list */
-	ISC_SLINK(dns_qpmulti_t) cleanup;
 };
 
 /***********************************************************************
@@ -878,13 +886,15 @@ static inline dns_qpmulti_t *
 unpack_reader(dns_qpreader_t *qp, qp_node_t *reader) {
 	INSIST(reader_valid(reader));
 	dns_qpmulti_t *multi = node_pointer(&reader[0]);
+	dns_qpbase_t *base = node_pointer(&reader[1]);
 	INSIST(QPMULTI_VALID(multi));
+	INSIST(QPBASE_VALID(base));
 	*qp = (dns_qpreader_t){
 		.magic = QP_MAGIC,
 		.uctx = multi->writer.uctx,
 		.methods = multi->writer.methods,
 		.root_ref = node32(&reader[1]),
-		.base = node_pointer(&reader[1]),
+		.base = base,
 	};
 	return (multi);
 }