From 3737ea592bd1e2221119bbabab9324ff9df64584 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Sur=C3=BD?= Date: Fri, 13 Oct 2023 14:41:22 +0200 Subject: [PATCH] Offload AXFR and IXFR processing Instead of processing received data synchronously, store the incoming differences in the list and process them asynchronously when we need to commit the data into the database and/or journal. --- lib/dns/xfrin.c | 370 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 262 insertions(+), 108 deletions(-) diff --git a/lib/dns/xfrin.c b/lib/dns/xfrin.c index 359e1cc6f9..d389f7dc44 100644 --- a/lib/dns/xfrin.c +++ b/lib/dns/xfrin.c @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -129,6 +130,12 @@ struct dns_xfrin { dns_dbversion_t *ver; dns_diff_t diff; /*%< Pending database changes */ + /* Diff queue */ + bool diff_running; + struct __cds_wfcq_head diff_head; + struct cds_wfcq_tail diff_tail; + isc_result_t result; + _Atomic xfrin_state_t state; uint32_t expireopt; bool edns, expireoptset; @@ -136,7 +143,7 @@ struct dns_xfrin { /* * Following variable were made atomic only for loading the values for - * the statistics channelr, thus all accesses can be **relaxed** because + * the statistics channel, thus all accesses can be **relaxed** because * all store and load operations that affect XFR are done on the same * thread and only the statistics channel thread could perform a load * operation from a different thread and it's ok to not be precise in @@ -210,9 +217,7 @@ axfr_makedb(dns_xfrin_t *xfr, dns_db_t **dbp); static isc_result_t axfr_putdata(dns_xfrin_t *xfr, dns_diffop_t op, dns_name_t *name, dns_ttl_t ttl, dns_rdata_t *rdata); -static isc_result_t -axfr_apply(dns_xfrin_t *xfr); -static isc_result_t +static void axfr_commit(dns_xfrin_t *xfr); static isc_result_t axfr_finalize(dns_xfrin_t *xfr); @@ -220,8 +225,6 @@ axfr_finalize(dns_xfrin_t *xfr); static isc_result_t ixfr_init(dns_xfrin_t *xfr); static isc_result_t 
-ixfr_apply(dns_xfrin_t *xfr); -static isc_result_t ixfr_putdata(dns_xfrin_t *xfr, dns_diffop_t op, dns_name_t *name, dns_ttl_t ttl, dns_rdata_t *rdata); static isc_result_t @@ -242,6 +245,9 @@ xfrin_send_done(isc_result_t eresult, isc_region_t *region, void *arg); static void xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg); +static void +xfrin_end(dns_xfrin_t *xfr, isc_result_t result); + static void xfrin_destroy(dns_xfrin_t *xfr); @@ -320,13 +326,18 @@ failure: /* * Store a set of AXFR RRs in the database. */ -static isc_result_t -axfr_apply(dns_xfrin_t *xfr) { - isc_result_t result; +static void +axfr_apply(void *arg) { + dns_xfrin_t *xfr = arg; + isc_result_t result = ISC_R_SUCCESS; uint64_t records; + if (atomic_load(&xfr->shuttingdown)) { + result = ISC_R_SHUTTINGDOWN; + goto failure; + } + CHECK(dns_diff_load(&xfr->diff, xfr->axfr.add, xfr->axfr.add_private)); - dns_diff_clear(&xfr->diff); if (xfr->maxrecords != 0U) { result = dns_db_getsize(xfr->db, xfr->ver, &records, NULL); if (result == ISC_R_SUCCESS && records > xfr->maxrecords) { @@ -334,22 +345,50 @@ axfr_apply(dns_xfrin_t *xfr) { goto failure; } } - result = ISC_R_SUCCESS; + failure: - return (result); + dns_diff_clear(&xfr->diff); + xfr->result = result; } -static isc_result_t -axfr_commit(dns_xfrin_t *xfr) { - isc_result_t result; +static void +axfr_apply_done(void *arg) { + dns_xfrin_t *xfr = arg; + isc_result_t result = xfr->result; - CHECK(axfr_apply(xfr)); - CHECK(dns_db_endload(xfr->db, &xfr->axfr)); - CHECK(dns_zone_verifydb(xfr->zone, xfr->db, NULL)); + if (atomic_load(&xfr->shuttingdown)) { + result = ISC_R_SHUTTINGDOWN; + } + + if (result == ISC_R_SUCCESS) { + CHECK(dns_db_endload(xfr->db, &xfr->axfr)); + CHECK(dns_zone_verifydb(xfr->zone, xfr->db, NULL)); + CHECK(axfr_finalize(xfr)); + } else { + (void)dns_db_endload(xfr->db, &xfr->axfr); + } - result = ISC_R_SUCCESS; failure: - return (result); + xfr->diff_running = false; + + if (result == ISC_R_SUCCESS) { + if 
(atomic_load(&xfr->state) == XFRST_AXFR_END) { + xfrin_end(xfr, result); + } + } else { + xfrin_fail(xfr, result, "failed while processing responses"); + } + + dns_xfrin_detach(&xfr); +} + +static void +axfr_commit(dns_xfrin_t *xfr) { + INSIST(!xfr->diff_running); + xfr->diff_running = true; + dns_xfrin_ref(xfr); + isc_work_enqueue(dns_zone_getloop(xfr->zone), axfr_apply, + axfr_apply_done, xfr); } static isc_result_t @@ -368,6 +407,12 @@ axfr_finalize(dns_xfrin_t *xfr) { * IXFR handling */ +typedef struct ixfr_apply_data { + dns_diff_t diff; /*%< Pending database changes */ + isc_result_t result; + struct cds_wfcq_node wfcq_node; +} ixfr_apply_data_t; + static isc_result_t ixfr_init(dns_xfrin_t *xfr) { isc_result_t result; @@ -414,21 +459,38 @@ failure: return (result); } -/* - * Apply a set of IXFR changes to the database. - */ static isc_result_t -ixfr_apply(dns_xfrin_t *xfr) { - isc_result_t result; +ixfr_begin_transaction(dns_xfrin_t *xfr) { + isc_result_t result = ISC_R_SUCCESS; + + if (xfr->ixfr.journal != NULL) { + CHECK(dns_journal_begin_transaction(xfr->ixfr.journal)); + } +failure: + return (result); +} + +static isc_result_t +ixfr_end_transaction(dns_xfrin_t *xfr) { + isc_result_t result = ISC_R_SUCCESS; + + CHECK(dns_zone_verifydb(xfr->zone, xfr->db, xfr->ver)); + /* XXX enter ready-to-commit state here */ + if (xfr->ixfr.journal != NULL) { + CHECK(dns_journal_commit(xfr->ixfr.journal)); + } +failure: + return (result); +} + +static isc_result_t +ixfr_apply_one(dns_xfrin_t *xfr, ixfr_apply_data_t *data) { + isc_result_t result = ISC_R_SUCCESS; uint64_t records; - if (xfr->ver == NULL) { - CHECK(dns_db_newversion(xfr->db, &xfr->ver)); - if (xfr->ixfr.journal != NULL) { - CHECK(dns_journal_begin_transaction(xfr->ixfr.journal)); - } - } - CHECK(dns_diff_apply(&xfr->diff, xfr->db, xfr->ver)); + CHECK(ixfr_begin_transaction(xfr)); + + CHECK(dns_diff_apply(&data->diff, xfr->db, xfr->ver)); if (xfr->maxrecords != 0U) { result = dns_db_getsize(xfr->db, 
xfr->ver, &records, NULL); if (result == ISC_R_SUCCESS && records > xfr->maxrecords) { @@ -437,29 +499,126 @@ ixfr_apply(dns_xfrin_t *xfr) { } } if (xfr->ixfr.journal != NULL) { - CHECK(dns_journal_writediff(xfr->ixfr.journal, &xfr->diff)); + CHECK(dns_journal_writediff(xfr->ixfr.journal, &data->diff)); } - dns_diff_clear(&xfr->diff); - result = ISC_R_SUCCESS; + + result = ixfr_end_transaction(xfr); + + return (result); failure: + /* We need to end the transaction, but keep the previous error */ + (void)ixfr_end_transaction(xfr); + return (result); } -static isc_result_t -ixfr_commit(dns_xfrin_t *xfr) { - isc_result_t result; +static void +ixfr_apply(void *arg) { + dns_xfrin_t *xfr = arg; + isc_result_t result = ISC_R_SUCCESS; - CHECK(ixfr_apply(xfr)); - if (xfr->ver != NULL) { - CHECK(dns_zone_verifydb(xfr->zone, xfr->db, xfr->ver)); - /* XXX enter ready-to-commit state here */ - if (xfr->ixfr.journal != NULL) { - CHECK(dns_journal_commit(xfr->ixfr.journal)); + struct __cds_wfcq_head diff_head; + struct cds_wfcq_tail diff_tail; + + /* Initialize local wfcqueue */ + __cds_wfcq_init(&diff_head, &diff_tail); + + enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( + &diff_head, &diff_tail, &xfr->diff_head, &xfr->diff_tail); + INSIST(ret == CDS_WFCQ_RET_DEST_EMPTY); + + struct cds_wfcq_node *node, *next; + __cds_wfcq_for_each_blocking_safe(&diff_head, &diff_tail, node, next) { + ixfr_apply_data_t *data = + caa_container_of(node, ixfr_apply_data_t, wfcq_node); + + if (atomic_load(&xfr->shuttingdown)) { + result = ISC_R_SHUTTINGDOWN; } + + /* Apply only until first failure */ + if (result == ISC_R_SUCCESS) { + /* This also checks for shuttingdown condition */ + result = ixfr_apply_one(xfr, data); + } + + /* We need to clear and free all data chunks */ + dns_diff_clear(&data->diff); + isc_mem_put(xfr->mctx, data, sizeof(*data)); + } + + /* FIXME: This might need a barrier or smth */ + xfr->result = result; +} + +static void +ixfr_apply_done(void *arg) { + dns_xfrin_t *xfr 
= arg; + isc_result_t result = xfr->result; + + if (atomic_load(&xfr->shuttingdown)) { + result = ISC_R_SHUTTINGDOWN; + } + + if (result != ISC_R_SUCCESS) { + goto failure; + } + + /* Reschedule */ + if (!cds_wfcq_empty(&xfr->diff_head, &xfr->diff_tail)) { + isc_work_enqueue(dns_zone_getloop(xfr->zone), ixfr_apply, + ixfr_apply_done, xfr); + return; + } + +failure: + xfr->diff_running = false; + + if (result == ISC_R_SUCCESS) { dns_db_closeversion(xfr->db, &xfr->ver, true); dns_zone_markdirty(xfr->zone); + + if (atomic_load(&xfr->state) == XFRST_IXFR_END) { + xfrin_end(xfr, result); + } + } else { + dns_db_closeversion(xfr->db, &xfr->ver, false); + + xfrin_fail(xfr, result, "failed while processing responses"); } - result = ISC_R_SUCCESS; + + dns_xfrin_detach(&xfr); +} + +/* + * Queue the pending IXFR changes and schedule ixfr_apply() if it is idle. + */ +static isc_result_t +ixfr_commit(dns_xfrin_t *xfr) { + isc_result_t result = ISC_R_SUCCESS; + ixfr_apply_data_t *data = NULL; + + if (xfr->ver == NULL) { + CHECK(dns_db_newversion(xfr->db, &xfr->ver)); + } + /* Allocate only after the fallible call so 'failure' cannot leak data */ + data = isc_mem_get(xfr->mctx, sizeof(*data)); + *data = (ixfr_apply_data_t){ 0 }; + cds_wfcq_node_init(&data->wfcq_node); + dns_diff_init(xfr->mctx, &data->diff); + /* FIXME: Should we add dns_diff_move() */ + ISC_LIST_MOVE(data->diff.tuples, xfr->diff.tuples); + + (void)cds_wfcq_enqueue(&xfr->diff_head, &xfr->diff_tail, + &data->wfcq_node); + + if (!xfr->diff_running) { + dns_xfrin_ref(xfr); + xfr->diff_running = true; + isc_work_enqueue(dns_zone_getloop(xfr->zone), ixfr_apply, + ixfr_apply_done, xfr); + } + failure: return (result); } @@ -672,7 +831,7 @@ redo: result = DNS_R_FORMERR; goto failure; } - CHECK(axfr_commit(xfr)); + axfr_commit(xfr); atomic_store(&xfr->state, XFRST_AXFR_END); break; } @@ -764,13 +923,9 @@ xfrin_idledout(void *xfr) { isc_time_t dns_xfrin_getstarttime(dns_xfrin_t *xfr) { - isc_time_t start; - REQUIRE(VALID_XFRIN(xfr)); return (atomic_load_relaxed(&xfr->start)); - - return (start); } void @@ -895,8 +1050,12 @@ 
ISC_REFCOUNT_IMPL(dns_xfrin, xfrin_destroy); static void xfrin_cancelio(dns_xfrin_t *xfr) { - dns_dispatch_done(&xfr->dispentry); - dns_dispatch_detach(&xfr->disp); + if (xfr->dispentry != NULL) { + dns_dispatch_done(&xfr->dispentry); + } + if (xfr->disp != NULL) { + dns_dispatch_detach(&xfr->disp); + } } static void @@ -946,21 +1105,10 @@ xfrin_fail(dns_xfrin_t *xfr, isc_result_t result, const char *msg) { result = DNS_R_BADIXFR; } } + xfrin_cancelio(xfr); - /* - * Close the journal. - */ - if (xfr->ixfr.journal != NULL) { - dns_journal_destroy(&xfr->ixfr.journal); - } - if (xfr->done != NULL) { - (xfr->done)(xfr->zone, - xfr->expireoptset ? &xfr->expireopt : NULL, - result); - xfr->done = NULL; - } - xfr->shutdown_result = result; + xfrin_end(xfr, result); } dns_xfrin_detach(&xfr); @@ -995,6 +1143,8 @@ xfrin_create(isc_mem_t *mctx, dns_zone_t *zone, dns_db_t *db, dns_view_weakattach(dns_zone_getview(zone), &xfr->view); dns_name_init(&xfr->name, NULL); + __cds_wfcq_init(&xfr->diff_head, &xfr->diff_tail); + atomic_init(&xfr->shuttingdown, false); atomic_init(&xfr->is_ixfr, false); @@ -1128,12 +1278,7 @@ xfrin_start(dns_xfrin_t *xfr) { return (ISC_R_SUCCESS); failure: - if (xfr->dispentry != NULL) { - dns_dispatch_done(&xfr->dispentry); - } - if (xfr->disp != NULL) { - dns_dispatch_detach(&xfr->disp); - } + xfrin_cancelio(xfr); dns_xfrin_detach(&xfr); return (result); @@ -1501,6 +1646,31 @@ get_edns_expire(dns_xfrin_t *xfr, dns_message_t *msg) { } } +static void +xfrin_end(dns_xfrin_t *xfr, isc_result_t result) { + /* Close the journal. */ + if (xfr->ixfr.journal != NULL) { + LIBDNS_XFRIN_JOURNAL_DESTROY_BEGIN(xfr, xfr->info, result); + dns_journal_destroy(&xfr->ixfr.journal); + LIBDNS_XFRIN_JOURNAL_DESTROY_END(xfr, xfr->info, result); + } + + /* Inform the caller. */ + if (xfr->done != NULL) { + LIBDNS_XFRIN_DONE_CALLBACK_BEGIN(xfr, xfr->info, result); + (xfr->done)(xfr->zone, + xfr->expireoptset ? 
&xfr->expireopt : NULL, result); + xfr->done = NULL; + LIBDNS_XFRIN_DONE_CALLBACK_END(xfr, xfr->info, result); + } + + atomic_store(&xfr->shuttingdown, true); + isc_timer_stop(xfr->max_time_timer); + if (xfr->shutdown_result == ISC_R_UNSET) { + xfr->shutdown_result = result; + } +} + static void xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) { dns_xfrin_t *xfr = (dns_xfrin_t *)arg; @@ -1715,9 +1885,10 @@ xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) { } } } - if (result != ISC_R_NOMORE) { - goto failure; + if (result == ISC_R_NOMORE) { + result = ISC_R_SUCCESS; } + CHECK(result); if (dns_message_gettsig(msg, &tsigowner) != NULL) { /* @@ -1772,36 +1943,11 @@ xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) { CHECK(xfrin_send_request(xfr)); break; case XFRST_AXFR_END: - CHECK(axfr_finalize(xfr)); - FALLTHROUGH; case XFRST_IXFR_END: - /* - * Close the journal. - */ - if (xfr->ixfr.journal != NULL) { - LIBDNS_XFRIN_JOURNAL_DESTROY_BEGIN(xfr, xfr->info, - result); - dns_journal_destroy(&xfr->ixfr.journal); - LIBDNS_XFRIN_JOURNAL_DESTROY_END(xfr, xfr->info, - result); - } - - /* - * Inform the caller we succeeded. - */ - if (xfr->done != NULL) { - LIBDNS_XFRIN_DONE_CALLBACK_BEGIN(xfr, xfr->info, - result); - (xfr->done)(xfr->zone, - xfr->expireoptset ? 
&xfr->expireopt : NULL, - ISC_R_SUCCESS); - xfr->done = NULL; - LIBDNS_XFRIN_DONE_CALLBACK_END(xfr, xfr->info, result); - } - - atomic_store(&xfr->shuttingdown, true); + /* We are at the end, cancel the timers and IO */ + isc_timer_stop(xfr->max_idle_timer); isc_timer_stop(xfr->max_time_timer); - xfr->shutdown_result = ISC_R_SUCCESS; + xfrin_cancelio(xfr); break; default: /* @@ -1821,6 +1967,7 @@ xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) { failure: if (result != ISC_R_SUCCESS) { + xfr->result = result; xfrin_fail(xfr, result, "failed while receiving responses"); } @@ -1872,13 +2019,22 @@ xfrin_destroy(dns_xfrin_t *xfr) { (unsigned int)(msecs / 1000), (unsigned int)(msecs % 1000), (unsigned int)persec, atomic_load_relaxed(&xfr->end_serial)); - if (xfr->dispentry != NULL) { - dns_dispatch_done(&xfr->dispentry); - } - if (xfr->disp != NULL) { - dns_dispatch_detach(&xfr->disp); + /* Cleanup unprocessed IXFR data */ + struct cds_wfcq_node *node, *next; + __cds_wfcq_for_each_blocking_safe(&xfr->diff_head, &xfr->diff_tail, + node, next) { + ixfr_apply_data_t *data = + caa_container_of(node, ixfr_apply_data_t, wfcq_node); + /* We need to clear and free all data chunks */ + dns_diff_clear(&data->diff); + isc_mem_put(xfr->mctx, data, sizeof(*data)); } + /* Cleanup unprocessed AXFR data */ + dns_diff_clear(&xfr->diff); + + xfrin_cancelio(xfr); + if (xfr->transport != NULL) { dns_transport_detach(&xfr->transport); } @@ -1891,8 +2047,6 @@ xfrin_destroy(dns_xfrin_t *xfr) { isc_buffer_free(&xfr->lasttsig); } - dns_diff_clear(&xfr->diff); - if (xfr->ixfr.journal != NULL) { dns_journal_destroy(&xfr->ixfr.journal); }