Offload AXFR and IXFR processing

Instead of processing received data synchronously, store the incoming
differences in the list and process them asynchronously when we need to
commit the data into the database and/or journal.
This commit is contained in:
Ondřej Surý
2023-10-13 14:41:22 +02:00
parent e5c79261c0
commit 3737ea592b

View File

@@ -22,6 +22,7 @@
#include <isc/result.h>
#include <isc/string.h>
#include <isc/util.h>
#include <isc/work.h>
#include <dns/callbacks.h>
#include <dns/catz.h>
@@ -129,6 +130,12 @@ struct dns_xfrin {
dns_dbversion_t *ver;
dns_diff_t diff; /*%< Pending database changes */
/* Diff queue */
bool diff_running;
struct __cds_wfcq_head diff_head;
struct cds_wfcq_tail diff_tail;
isc_result_t result;
_Atomic xfrin_state_t state;
uint32_t expireopt;
bool edns, expireoptset;
@@ -136,7 +143,7 @@ struct dns_xfrin {
/*
* Following variable were made atomic only for loading the values for
* the statistics channelr, thus all accesses can be **relaxed** because
* the statistics channel, thus all accesses can be **relaxed** because
* all store and load operations that affect XFR are done on the same
* thread and only the statistics channel thread could perform a load
* operation from a different thread and it's ok to not be precise in
@@ -210,9 +217,7 @@ axfr_makedb(dns_xfrin_t *xfr, dns_db_t **dbp);
static isc_result_t
axfr_putdata(dns_xfrin_t *xfr, dns_diffop_t op, dns_name_t *name, dns_ttl_t ttl,
dns_rdata_t *rdata);
static isc_result_t
axfr_apply(dns_xfrin_t *xfr);
static isc_result_t
static void
axfr_commit(dns_xfrin_t *xfr);
static isc_result_t
axfr_finalize(dns_xfrin_t *xfr);
@@ -220,8 +225,6 @@ axfr_finalize(dns_xfrin_t *xfr);
static isc_result_t
ixfr_init(dns_xfrin_t *xfr);
static isc_result_t
ixfr_apply(dns_xfrin_t *xfr);
static isc_result_t
ixfr_putdata(dns_xfrin_t *xfr, dns_diffop_t op, dns_name_t *name, dns_ttl_t ttl,
dns_rdata_t *rdata);
static isc_result_t
@@ -242,6 +245,9 @@ xfrin_send_done(isc_result_t eresult, isc_region_t *region, void *arg);
static void
xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg);
static void
xfrin_end(dns_xfrin_t *xfr, isc_result_t result);
static void
xfrin_destroy(dns_xfrin_t *xfr);
@@ -320,13 +326,18 @@ failure:
/*
* Store a set of AXFR RRs in the database.
*/
static isc_result_t
axfr_apply(dns_xfrin_t *xfr) {
isc_result_t result;
static void
axfr_apply(void *arg) {
dns_xfrin_t *xfr = arg;
isc_result_t result = ISC_R_SUCCESS;
uint64_t records;
if (atomic_load(&xfr->shuttingdown)) {
result = ISC_R_SHUTTINGDOWN;
goto failure;
}
CHECK(dns_diff_load(&xfr->diff, xfr->axfr.add, xfr->axfr.add_private));
dns_diff_clear(&xfr->diff);
if (xfr->maxrecords != 0U) {
result = dns_db_getsize(xfr->db, xfr->ver, &records, NULL);
if (result == ISC_R_SUCCESS && records > xfr->maxrecords) {
@@ -334,22 +345,50 @@ axfr_apply(dns_xfrin_t *xfr) {
goto failure;
}
}
result = ISC_R_SUCCESS;
failure:
return (result);
dns_diff_clear(&xfr->diff);
xfr->result = result;
}
static isc_result_t
axfr_commit(dns_xfrin_t *xfr) {
isc_result_t result;
static void
axfr_apply_done(void *arg) {
dns_xfrin_t *xfr = arg;
isc_result_t result = xfr->result;
CHECK(axfr_apply(xfr));
CHECK(dns_db_endload(xfr->db, &xfr->axfr));
CHECK(dns_zone_verifydb(xfr->zone, xfr->db, NULL));
if (atomic_load(&xfr->shuttingdown)) {
result = ISC_R_SHUTTINGDOWN;
}
if (result == ISC_R_SUCCESS) {
CHECK(dns_db_endload(xfr->db, &xfr->axfr));
CHECK(dns_zone_verifydb(xfr->zone, xfr->db, NULL));
CHECK(axfr_finalize(xfr));
} else {
(void)dns_db_endload(xfr->db, &xfr->axfr);
}
result = ISC_R_SUCCESS;
failure:
return (result);
xfr->diff_running = false;
if (result == ISC_R_SUCCESS) {
if (atomic_load(&xfr->state) == XFRST_AXFR_END) {
xfrin_end(xfr, result);
}
} else {
xfrin_fail(xfr, result, "failed while processing responses");
}
dns_xfrin_detach(&xfr);
}
static void
axfr_commit(dns_xfrin_t *xfr) {
INSIST(!xfr->diff_running);
xfr->diff_running = true;
dns_xfrin_ref(xfr);
isc_work_enqueue(dns_zone_getloop(xfr->zone), axfr_apply,
axfr_apply_done, xfr);
}
static isc_result_t
@@ -368,6 +407,12 @@ axfr_finalize(dns_xfrin_t *xfr) {
* IXFR handling
*/
typedef struct ixfr_apply_data {
dns_diff_t diff; /*%< Pending database changes */
isc_result_t result;
struct cds_wfcq_node wfcq_node;
} ixfr_apply_data_t;
static isc_result_t
ixfr_init(dns_xfrin_t *xfr) {
isc_result_t result;
@@ -414,21 +459,38 @@ failure:
return (result);
}
/*
* Apply a set of IXFR changes to the database.
*/
static isc_result_t
ixfr_apply(dns_xfrin_t *xfr) {
isc_result_t result;
ixfr_begin_transaction(dns_xfrin_t *xfr) {
isc_result_t result = ISC_R_SUCCESS;
if (xfr->ixfr.journal != NULL) {
CHECK(dns_journal_begin_transaction(xfr->ixfr.journal));
}
failure:
return (result);
}
static isc_result_t
ixfr_end_transaction(dns_xfrin_t *xfr) {
isc_result_t result = ISC_R_SUCCESS;
CHECK(dns_zone_verifydb(xfr->zone, xfr->db, xfr->ver));
/* XXX enter ready-to-commit state here */
if (xfr->ixfr.journal != NULL) {
CHECK(dns_journal_commit(xfr->ixfr.journal));
}
failure:
return (result);
}
static isc_result_t
ixfr_apply_one(dns_xfrin_t *xfr, ixfr_apply_data_t *data) {
isc_result_t result = ISC_R_SUCCESS;
uint64_t records;
if (xfr->ver == NULL) {
CHECK(dns_db_newversion(xfr->db, &xfr->ver));
if (xfr->ixfr.journal != NULL) {
CHECK(dns_journal_begin_transaction(xfr->ixfr.journal));
}
}
CHECK(dns_diff_apply(&xfr->diff, xfr->db, xfr->ver));
CHECK(ixfr_begin_transaction(xfr));
CHECK(dns_diff_apply(&data->diff, xfr->db, xfr->ver));
if (xfr->maxrecords != 0U) {
result = dns_db_getsize(xfr->db, xfr->ver, &records, NULL);
if (result == ISC_R_SUCCESS && records > xfr->maxrecords) {
@@ -437,29 +499,126 @@ ixfr_apply(dns_xfrin_t *xfr) {
}
}
if (xfr->ixfr.journal != NULL) {
CHECK(dns_journal_writediff(xfr->ixfr.journal, &xfr->diff));
CHECK(dns_journal_writediff(xfr->ixfr.journal, &data->diff));
}
dns_diff_clear(&xfr->diff);
result = ISC_R_SUCCESS;
result = ixfr_end_transaction(xfr);
return (result);
failure:
/* We need to end the transaction, but keep the previous error */
(void)ixfr_end_transaction(xfr);
return (result);
}
static isc_result_t
ixfr_commit(dns_xfrin_t *xfr) {
isc_result_t result;
static void
ixfr_apply(void *arg) {
dns_xfrin_t *xfr = arg;
isc_result_t result = ISC_R_SUCCESS;
CHECK(ixfr_apply(xfr));
if (xfr->ver != NULL) {
CHECK(dns_zone_verifydb(xfr->zone, xfr->db, xfr->ver));
/* XXX enter ready-to-commit state here */
if (xfr->ixfr.journal != NULL) {
CHECK(dns_journal_commit(xfr->ixfr.journal));
struct __cds_wfcq_head diff_head;
struct cds_wfcq_tail diff_tail;
/* Initialize local wfcqueue */
__cds_wfcq_init(&diff_head, &diff_tail);
enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
&diff_head, &diff_tail, &xfr->diff_head, &xfr->diff_tail);
INSIST(ret == CDS_WFCQ_RET_DEST_EMPTY);
struct cds_wfcq_node *node, *next;
__cds_wfcq_for_each_blocking_safe(&diff_head, &diff_tail, node, next) {
ixfr_apply_data_t *data =
caa_container_of(node, ixfr_apply_data_t, wfcq_node);
if (atomic_load(&xfr->shuttingdown)) {
result = ISC_R_SHUTTINGDOWN;
}
/* Apply only until first failure */
if (result == ISC_R_SUCCESS) {
/* This also checks for shuttingdown condition */
result = ixfr_apply_one(xfr, data);
}
/* We need to clear and free all data chunks */
dns_diff_clear(&data->diff);
isc_mem_put(xfr->mctx, data, sizeof(*data));
}
/* FIXME: This might need a barrier or smth */
xfr->result = result;
}
static void
ixfr_apply_done(void *arg) {
dns_xfrin_t *xfr = arg;
isc_result_t result = xfr->result;
if (atomic_load(&xfr->shuttingdown)) {
result = ISC_R_SHUTTINGDOWN;
}
if (result != ISC_R_SUCCESS) {
goto failure;
}
/* Reschedule */
if (!cds_wfcq_empty(&xfr->diff_head, &xfr->diff_tail)) {
isc_work_enqueue(dns_zone_getloop(xfr->zone), ixfr_apply,
ixfr_apply_done, xfr);
return;
}
failure:
xfr->diff_running = false;
if (result == ISC_R_SUCCESS) {
dns_db_closeversion(xfr->db, &xfr->ver, true);
dns_zone_markdirty(xfr->zone);
if (atomic_load(&xfr->state) == XFRST_IXFR_END) {
xfrin_end(xfr, result);
}
} else {
dns_db_closeversion(xfr->db, &xfr->ver, false);
xfrin_fail(xfr, result, "failed while processing responses");
}
result = ISC_R_SUCCESS;
dns_xfrin_detach(&xfr);
}
/*
* Apply a set of IXFR changes to the database.
*/
static isc_result_t
ixfr_commit(dns_xfrin_t *xfr) {
isc_result_t result = ISC_R_SUCCESS;
ixfr_apply_data_t *data = isc_mem_get(xfr->mctx, sizeof(*data));
*data = (ixfr_apply_data_t){ 0 };
cds_wfcq_node_init(&data->wfcq_node);
if (xfr->ver == NULL) {
CHECK(dns_db_newversion(xfr->db, &xfr->ver));
}
dns_diff_init(xfr->mctx, &data->diff);
/* FIXME: Should we add dns_diff_move() */
ISC_LIST_MOVE(data->diff.tuples, xfr->diff.tuples);
(void)cds_wfcq_enqueue(&xfr->diff_head, &xfr->diff_tail,
&data->wfcq_node);
if (!xfr->diff_running) {
dns_xfrin_ref(xfr);
xfr->diff_running = true;
isc_work_enqueue(dns_zone_getloop(xfr->zone), ixfr_apply,
ixfr_apply_done, xfr);
}
failure:
return (result);
}
@@ -672,7 +831,7 @@ redo:
result = DNS_R_FORMERR;
goto failure;
}
CHECK(axfr_commit(xfr));
axfr_commit(xfr);
atomic_store(&xfr->state, XFRST_AXFR_END);
break;
}
@@ -764,13 +923,9 @@ xfrin_idledout(void *xfr) {
isc_time_t
dns_xfrin_getstarttime(dns_xfrin_t *xfr) {
isc_time_t start;
REQUIRE(VALID_XFRIN(xfr));
return (atomic_load_relaxed(&xfr->start));
return (start);
}
void
@@ -895,8 +1050,12 @@ ISC_REFCOUNT_IMPL(dns_xfrin, xfrin_destroy);
static void
xfrin_cancelio(dns_xfrin_t *xfr) {
dns_dispatch_done(&xfr->dispentry);
dns_dispatch_detach(&xfr->disp);
if (xfr->dispentry != NULL) {
dns_dispatch_done(&xfr->dispentry);
}
if (xfr->disp != NULL) {
dns_dispatch_detach(&xfr->disp);
}
}
static void
@@ -946,21 +1105,10 @@ xfrin_fail(dns_xfrin_t *xfr, isc_result_t result, const char *msg) {
result = DNS_R_BADIXFR;
}
}
xfrin_cancelio(xfr);
/*
* Close the journal.
*/
if (xfr->ixfr.journal != NULL) {
dns_journal_destroy(&xfr->ixfr.journal);
}
if (xfr->done != NULL) {
(xfr->done)(xfr->zone,
xfr->expireoptset ? &xfr->expireopt : NULL,
result);
xfr->done = NULL;
}
xfr->shutdown_result = result;
xfrin_end(xfr, result);
}
dns_xfrin_detach(&xfr);
@@ -995,6 +1143,8 @@ xfrin_create(isc_mem_t *mctx, dns_zone_t *zone, dns_db_t *db,
dns_view_weakattach(dns_zone_getview(zone), &xfr->view);
dns_name_init(&xfr->name, NULL);
__cds_wfcq_init(&xfr->diff_head, &xfr->diff_tail);
atomic_init(&xfr->shuttingdown, false);
atomic_init(&xfr->is_ixfr, false);
@@ -1128,12 +1278,7 @@ xfrin_start(dns_xfrin_t *xfr) {
return (ISC_R_SUCCESS);
failure:
if (xfr->dispentry != NULL) {
dns_dispatch_done(&xfr->dispentry);
}
if (xfr->disp != NULL) {
dns_dispatch_detach(&xfr->disp);
}
xfrin_cancelio(xfr);
dns_xfrin_detach(&xfr);
return (result);
@@ -1501,6 +1646,31 @@ get_edns_expire(dns_xfrin_t *xfr, dns_message_t *msg) {
}
}
static void
xfrin_end(dns_xfrin_t *xfr, isc_result_t result) {
/* Close the journal. */
if (xfr->ixfr.journal != NULL) {
LIBDNS_XFRIN_JOURNAL_DESTROY_BEGIN(xfr, xfr->info, result);
dns_journal_destroy(&xfr->ixfr.journal);
LIBDNS_XFRIN_JOURNAL_DESTROY_END(xfr, xfr->info, result);
}
/* Inform the caller. */
if (xfr->done != NULL) {
LIBDNS_XFRIN_DONE_CALLBACK_BEGIN(xfr, xfr->info, result);
(xfr->done)(xfr->zone,
xfr->expireoptset ? &xfr->expireopt : NULL, result);
xfr->done = NULL;
LIBDNS_XFRIN_DONE_CALLBACK_END(xfr, xfr->info, result);
}
atomic_store(&xfr->shuttingdown, true);
isc_timer_stop(xfr->max_time_timer);
if (xfr->shutdown_result == ISC_R_UNSET) {
xfr->shutdown_result = result;
}
}
static void
xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) {
dns_xfrin_t *xfr = (dns_xfrin_t *)arg;
@@ -1715,9 +1885,10 @@ xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) {
}
}
}
if (result != ISC_R_NOMORE) {
goto failure;
if (result == ISC_R_NOMORE) {
result = ISC_R_SUCCESS;
}
CHECK(result);
if (dns_message_gettsig(msg, &tsigowner) != NULL) {
/*
@@ -1772,36 +1943,11 @@ xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) {
CHECK(xfrin_send_request(xfr));
break;
case XFRST_AXFR_END:
CHECK(axfr_finalize(xfr));
FALLTHROUGH;
case XFRST_IXFR_END:
/*
* Close the journal.
*/
if (xfr->ixfr.journal != NULL) {
LIBDNS_XFRIN_JOURNAL_DESTROY_BEGIN(xfr, xfr->info,
result);
dns_journal_destroy(&xfr->ixfr.journal);
LIBDNS_XFRIN_JOURNAL_DESTROY_END(xfr, xfr->info,
result);
}
/*
* Inform the caller we succeeded.
*/
if (xfr->done != NULL) {
LIBDNS_XFRIN_DONE_CALLBACK_BEGIN(xfr, xfr->info,
result);
(xfr->done)(xfr->zone,
xfr->expireoptset ? &xfr->expireopt : NULL,
ISC_R_SUCCESS);
xfr->done = NULL;
LIBDNS_XFRIN_DONE_CALLBACK_END(xfr, xfr->info, result);
}
atomic_store(&xfr->shuttingdown, true);
/* We are at the end, cancel the timers and IO */
isc_timer_stop(xfr->max_idle_timer);
isc_timer_stop(xfr->max_time_timer);
xfr->shutdown_result = ISC_R_SUCCESS;
xfrin_cancelio(xfr);
break;
default:
/*
@@ -1821,6 +1967,7 @@ xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) {
failure:
if (result != ISC_R_SUCCESS) {
xfr->result = result;
xfrin_fail(xfr, result, "failed while receiving responses");
}
@@ -1872,13 +2019,22 @@ xfrin_destroy(dns_xfrin_t *xfr) {
(unsigned int)(msecs / 1000), (unsigned int)(msecs % 1000),
(unsigned int)persec, atomic_load_relaxed(&xfr->end_serial));
if (xfr->dispentry != NULL) {
dns_dispatch_done(&xfr->dispentry);
}
if (xfr->disp != NULL) {
dns_dispatch_detach(&xfr->disp);
/* Cleanup unprocessed IXFR data */
struct cds_wfcq_node *node, *next;
__cds_wfcq_for_each_blocking_safe(&xfr->diff_head, &xfr->diff_tail,
node, next) {
ixfr_apply_data_t *data =
caa_container_of(node, ixfr_apply_data_t, wfcq_node);
/* We need to clear and free all data chunks */
dns_diff_clear(&data->diff);
isc_mem_put(xfr->mctx, data, sizeof(*data));
}
/* Cleanup unprocessed AXFR data */
dns_diff_clear(&xfr->diff);
xfrin_cancelio(xfr);
if (xfr->transport != NULL) {
dns_transport_detach(&xfr->transport);
}
@@ -1891,8 +2047,6 @@ xfrin_destroy(dns_xfrin_t *xfr) {
isc_buffer_free(&xfr->lasttsig);
}
dns_diff_clear(&xfr->diff);
if (xfr->ixfr.journal != NULL) {
dns_journal_destroy(&xfr->ixfr.journal);
}