From f395cd4b3e2ac770e6b9cc087d19733e71afd361 Mon Sep 17 00:00:00 2001 From: Artem Boldariev Date: Mon, 20 Jun 2022 20:30:12 +0300 Subject: [PATCH] Add isc_nm_streamdnssocket (aka Stream DNS) This commit adds an initial implementation of isc_nm_streamdnssocket transport: a unified transport for DNS over stream protocols messages, which is capable of replacing both TCP DNS and TLS DNS transports. Currently, the interface it provides is a unified set of interfaces provided by both of the transports it attempts to replace. The transport is built around "isc_dnsbuffer_t" and "isc_dnsstream_assembler_t" objects and attempts to minimise both the number of memory allocations during network transfers as well as memory usage. --- lib/isc/Makefile.am | 1 + lib/isc/include/isc/netmgr.h | 12 + lib/isc/include/isc/types.h | 4 +- lib/isc/netmgr/netmgr-int.h | 100 +++ lib/isc/netmgr/netmgr.c | 76 ++- lib/isc/netmgr/streamdns.c | 1197 ++++++++++++++++++++++++++++++++++ 6 files changed, 1388 insertions(+), 2 deletions(-) create mode 100644 lib/isc/netmgr/streamdns.c diff --git a/lib/isc/Makefile.am b/lib/isc/Makefile.am index 50d94e40d8..fb09d10145 100644 --- a/lib/isc/Makefile.am +++ b/lib/isc/Makefile.am @@ -115,6 +115,7 @@ libisc_la_SOURCES = \ netmgr/netmgr-int.h \ netmgr/netmgr.c \ netmgr/socket.c \ + netmgr/streamdns.c \ netmgr/tcp.c \ netmgr/tcpdns.c \ netmgr/timer.c \ diff --git a/lib/isc/include/isc/netmgr.h b/lib/isc/include/isc/netmgr.h index c9ce9907fe..729012ac63 100644 --- a/lib/isc/include/isc/netmgr.h +++ b/lib/isc/include/isc/netmgr.h @@ -410,6 +410,13 @@ isc_nm_listentlsdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, * Same as isc_nm_listentcpdns but for an SSL (DoT) socket. */ +isc_result_t +isc_nm_listenstreamdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, + isc_nm_recv_cb_t recv_cb, void *recv_cbarg, + isc_nm_accept_cb_t accept_cb, void *accept_cbarg, + int backlog, isc_quota_t *quota, isc_tlsctx_t *sslctx, + isc_nmsocket_t **sockp); + void isc_nm_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle, uint32_t keepalive, uint32_t advertised); @@ -496,6 +503,11 @@ isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer, isc_nm_cb_t cb, void *cbarg, unsigned int timeout, isc_tlsctx_t *sslctx, isc_tlsctx_client_session_cache_t *client_sess_cache); +void +isc_nm_streamdnsconnect(isc_nm_t *mgr, isc_sockaddr_t *local, + isc_sockaddr_t *peer, isc_nm_cb_t cb, void *cbarg, + unsigned int timeout, isc_tlsctx_t *sslctx, + isc_tlsctx_client_session_cache_t *client_sess_cache); /*%< * Establish a DNS client connection via a TCP or TLS connection, bound to * the address 'local' and connected to the address 'peer'. diff --git a/lib/isc/include/isc/types.h b/lib/isc/include/isc/types.h index 51a246dfab..353a37f5c3 100644 --- a/lib/isc/include/isc/types.h +++ b/lib/isc/include/isc/types.h @@ -110,6 +110,7 @@ typedef enum isc_nmsocket_type { isc_nm_tlssocket = 1 << 4, isc_nm_tlsdnssocket = 1 << 5, isc_nm_httpsocket = 1 << 6, + isc_nm_streamdnssocket = 1 << 7, isc_nm_maxsocket, isc_nm_udplistener, /* Aggregate of nm_udpsocks */ @@ -117,7 +118,8 @@ typedef enum isc_nmsocket_type { isc_nm_tlslistener, isc_nm_tcpdnslistener, isc_nm_tlsdnslistener, - isc_nm_httplistener + isc_nm_httplistener, + isc_nm_streamdnslistener } isc_nmsocket_type; typedef isc_nmsocket_type isc_nmsocket_type_t; diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index fe99e3a125..807bd151a8 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -281,6 +282,11 @@ typedef enum isc__netievent_type { netievent_httpsend, netievent_httpendpoints, + netievent_streamdnsclose, + netievent_streamdnssend, + netievent_streamdnsread, + netievent_streamdnscancel, + netievent_connectcb, netievent_readcb, netievent_sendcb, @@ -922,6 +928,17 @@ struct isc_nmsocket { isc_nmsocket_h2_t h2; #endif /* HAVE_LIBNGHTTP2 */ + + struct { + isc_dnsstream_assembler_t *input; + bool reading; + isc_nmsocket_t *listener; + isc_nmsocket_t *sock; + size_t nsending; + void *send_req; + bool dot_alpn_negotiated; + const char *tls_verify_error; + } streamdns; /*% * quota is the TCP client, attached when a TCP connection * is established. pquota is a non-attached pointer to the @@ -1702,6 +1719,79 @@ isc__nm_http_set_max_streams(isc_nmsocket_t *listener, #endif +void +isc__nm_async_streamdnsread(isc__networker_t *worker, isc__netievent_t *ev0); + +void +isc__nm_streamdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, + void *cbarg); + +void +isc__nm_async_streamdnssend(isc__networker_t *worker, isc__netievent_t *ev0); + +void +isc__nm_streamdns_send(isc_nmhandle_t *handle, const isc_region_t *region, + isc_nm_cb_t cb, void *cbarg); + +void +isc__nm_async_streamdnsclose(isc__networker_t *worker, isc__netievent_t *ev0); + +void +isc__nm_streamdns_close(isc_nmsocket_t *sock); + +void +isc__nm_streamdns_stoplistening(isc_nmsocket_t *sock); + +void +isc__nm_streamdns_cleanup_data(isc_nmsocket_t *sock); + +void +isc__nm_async_streamdnscancel(isc__networker_t *worker, isc__netievent_t *ev0); + +void +isc__nm_streamdns_cancelread(isc_nmhandle_t *handle); + +void +isc__nmhandle_streamdns_cleartimeout(isc_nmhandle_t *handle); + +void +isc__nmhandle_streamdns_settimeout(isc_nmhandle_t *handle, uint32_t timeout); + +void +isc__nmhandle_streamdns_keepalive(isc_nmhandle_t *handle, bool value); + +void +isc__nmhandle_streamdns_setwritetimeout(isc_nmhandle_t *handle, + uint32_t timeout); + +bool +isc__nm_streamdns_has_encryption(const isc_nmhandle_t *handle); + +const char * +isc__nm_streamdns_verify_tls_peer_result_string(const isc_nmhandle_t *handle); + +void +isc__nm_streamdns_set_tlsctx(isc_nmsocket_t *listener, isc_tlsctx_t *tlsctx); + +bool +isc__nm_streamdns_xfr_allowed(isc_nmsocket_t *sock); + +void +isc__nmsocket_streamdns_reset(isc_nmsocket_t *sock); + +bool +isc__nmsocket_streamdns_timer_running(isc_nmsocket_t *sock); + +void +isc__nmsocket_streamdns_timer_stop(isc_nmsocket_t *sock); + +void +isc__nmsocket_streamdns_timer_restart(isc_nmsocket_t *sock); + +void +isc__nm_streamdns_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result, + bool async); + void isc__nm_async_settlsctx(isc__networker_t *worker, isc__netievent_t *ev0); @@ -1865,6 +1955,11 @@ NETIEVENT_SOCKET_HANDLE_TYPE(udpcancel); NETIEVENT_SOCKET_QUOTA_TYPE(tcpaccept); +NETIEVENT_SOCKET_TYPE(streamdnsclose); +NETIEVENT_SOCKET_REQ_TYPE(streamdnssend); +NETIEVENT_SOCKET_TYPE(streamdnsread); +NETIEVENT_SOCKET_HANDLE_TYPE(streamdnscancel); + NETIEVENT_SOCKET_TLSCTX_TYPE(settlsctx); NETIEVENT_SOCKET_TYPE(sockstop); @@ -1915,6 +2010,11 @@ NETIEVENT_SOCKET_DECL(detach); NETIEVENT_SOCKET_QUOTA_DECL(tcpaccept); +NETIEVENT_SOCKET_DECL(streamdnsclose); +NETIEVENT_SOCKET_REQ_DECL(streamdnssend); +NETIEVENT_SOCKET_DECL(streamdnsread); +NETIEVENT_SOCKET_HANDLE_DECL(streamdnscancel); + NETIEVENT_SOCKET_TLSCTX_DECL(settlsctx); NETIEVENT_SOCKET_DECL(sockstop); diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index bca6f3c76b..9fdf77a96a 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -354,6 +354,9 @@ isc_nmhandle_setwritetimeout(isc_nmhandle_t *handle, uint64_t write_timeout) { isc__nmhandle_tls_setwritetimeout(handle, write_timeout); break; #endif /* HAVE_LIBNGHTTP2 */ + case isc_nm_streamdnssocket: + isc__nmhandle_streamdns_setwritetimeout(handle, write_timeout); + break; default: UNREACHABLE(); break; @@ -480,6 +483,11 @@ process_netievent(void *arg) { NETIEVENT_CASE(httpclose); NETIEVENT_CASE(httpendpoints); #endif + NETIEVENT_CASE(streamdnsread); + NETIEVENT_CASE(streamdnssend); + NETIEVENT_CASE(streamdnsclose); + NETIEVENT_CASE(streamdnscancel); + NETIEVENT_CASE(settlsctx); NETIEVENT_CASE(sockstop); @@ -556,6 +564,11 @@ NETIEVENT_SOCKET_DEF(detach); NETIEVENT_SOCKET_QUOTA_DEF(tcpaccept); +NETIEVENT_SOCKET_DEF(streamdnsclose); +NETIEVENT_SOCKET_REQ_DEF(streamdnssend); +NETIEVENT_SOCKET_DEF(streamdnsread); +NETIEVENT_SOCKET_HANDLE_DEF(streamdnscancel); + NETIEVENT_SOCKET_TLSCTX_DEF(settlsctx); NETIEVENT_SOCKET_DEF(sockstop); @@ -715,6 +728,7 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree FLARG) { isc__nm_tls_cleanup_data(sock); isc__nm_http_cleanup_data(sock); #endif + isc__nm_streamdns_cleanup_data(sock); if (sock->barrier_initialised) { isc_barrier_destroy(&sock->barrier); @@ -844,6 +858,9 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) { case isc_nm_tlsdnssocket: isc__nm_tlsdns_close(sock); return; + case isc_nm_streamdnssocket: + isc__nm_streamdns_close(sock); + return; #if HAVE_LIBNGHTTP2 case isc_nm_tlssocket: isc__nm_tls_close(sock); @@ -896,6 +913,7 @@ isc_nmsocket_close(isc_nmsocket_t **sockp) { (*sockp)->type == isc_nm_tcplistener || (*sockp)->type == isc_nm_tcpdnslistener || (*sockp)->type == isc_nm_tlsdnslistener || + (*sockp)->type == isc_nm_streamdnslistener || (*sockp)->type == isc_nm_tlslistener || (*sockp)->type == isc_nm_httplistener); @@ -1159,7 +1177,8 @@ isc_nmhandle_is_stream(isc_nmhandle_t *handle) { handle->sock->type == isc_nm_tcpdnssocket || handle->sock->type == isc_nm_tlssocket || handle->sock->type == isc_nm_tlsdnssocket || - handle->sock->type == isc_nm_httpsocket); + handle->sock->type == isc_nm_httpsocket || + handle->sock->type == isc_nm_streamdnssocket); } static void @@ -1406,6 +1425,9 @@ isc__nm_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result, bool async) { isc__nm_tls_failed_read_cb(sock, result, async); return; #endif + case isc_nm_streamdnssocket: + isc__nm_streamdns_failed_read_cb(sock, result, async); + return; default: UNREACHABLE(); } @@ -1517,6 +1539,9 @@ isc__nmsocket_timer_restart(isc_nmsocket_t *sock) { isc__nmsocket_tls_timer_restart(sock); return; #endif /* HAVE_LIBNGHTTP2 */ + case isc_nm_streamdnssocket: + isc__nmsocket_streamdns_timer_restart(sock); + return; default: break; } @@ -1560,6 +1585,8 @@ isc__nmsocket_timer_running(isc_nmsocket_t *sock) { case isc_nm_tlssocket: return (isc__nmsocket_tls_timer_running(sock)); #endif /* HAVE_LIBNGHTTP2 */ + case isc_nm_streamdnssocket: + return (isc__nmsocket_streamdns_timer_running(sock)); default: break; } @@ -1590,6 +1617,9 @@ isc__nmsocket_timer_stop(isc_nmsocket_t *sock) { isc__nmsocket_tls_timer_stop(sock); return; #endif /* HAVE_LIBNGHTTP2 */ + case isc_nm_streamdnssocket: + isc__nmsocket_streamdns_timer_stop(sock); + return; default: break; } @@ -1613,6 +1643,9 @@ isc__nm_get_read_req(isc_nmsocket_t *sock, isc_sockaddr_t *sockaddr) { case isc_nm_tlssocket: isc_nmhandle_attach(sock->statichandle, &req->handle); break; + case isc_nm_streamdnssocket: + isc_nmhandle_attach(sock->recv_handle, &req->handle); + break; default: if (atomic_load(&sock->client) && sock->statichandle != NULL) { isc_nmhandle_attach(sock->statichandle, &req->handle); @@ -1842,6 +1875,9 @@ isc_nmhandle_cleartimeout(isc_nmhandle_t *handle) { isc__nm_tls_cleartimeout(handle); return; #endif + case isc_nm_streamdnssocket: + isc__nmhandle_streamdns_cleartimeout(handle); + return; default: handle->sock->read_timeout = 0; @@ -1865,6 +1901,9 @@ isc_nmhandle_settimeout(isc_nmhandle_t *handle, uint32_t timeout) { isc__nm_tls_settimeout(handle, timeout); return; #endif + case isc_nm_streamdnssocket: + isc__nmhandle_streamdns_settimeout(handle, timeout); + return; default: handle->sock->read_timeout = timeout; isc__nmsocket_timer_restart(handle->sock); @@ -1892,6 +1931,9 @@ isc_nmhandle_keepalive(isc_nmhandle_t *handle, bool value) { sock->write_timeout = value ? atomic_load(&netmgr->keepalive) : atomic_load(&netmgr->idle); break; + case isc_nm_streamdnssocket: + isc__nmhandle_streamdns_keepalive(handle, value); + break; #if HAVE_LIBNGHTTP2 case isc_nm_tlssocket: isc__nmhandle_tls_keepalive(handle, value); @@ -2025,6 +2067,9 @@ isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, case isc_nm_tlsdnssocket: isc__nm_tlsdns_send(handle, region, cb, cbarg); break; + case isc_nm_streamdnssocket: + isc__nm_streamdns_send(handle, region, cb, cbarg); + break; #if HAVE_LIBNGHTTP2 case isc_nm_tlssocket: isc__nm_tls_send(handle, region, cb, cbarg); @@ -2055,6 +2100,9 @@ isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { case isc_nm_tlsdnssocket: isc__nm_tlsdns_read(handle, cb, cbarg); break; + case isc_nm_streamdnssocket: + isc__nm_streamdns_read(handle, cb, cbarg); + break; #if HAVE_LIBNGHTTP2 case isc_nm_tlssocket: isc__nm_tls_read(handle, cb, cbarg); @@ -2082,6 +2130,9 @@ isc_nm_cancelread(isc_nmhandle_t *handle) { case isc_nm_tlsdnssocket: isc__nm_tlsdns_cancelread(handle); break; + case isc_nm_streamdnssocket: + isc__nm_streamdns_cancelread(handle); + break; default: UNREACHABLE(); } @@ -2124,6 +2175,9 @@ isc_nm_stoplistening(isc_nmsocket_t *sock) { case isc_nm_tlsdnslistener: isc__nm_tlsdns_stoplistening(sock); break; + case isc_nm_streamdnslistener: + isc__nm_streamdns_stoplistening(sock); + break; #if HAVE_LIBNGHTTP2 case isc_nm_tlslistener: isc__nm_tls_stoplistening(sock); @@ -2364,6 +2418,9 @@ isc__nmsocket_reset(isc_nmsocket_t *sock) { isc__nmsocket_tls_reset(sock); return; #endif /* HAVE_LIBNGHTTP2 */ + case isc_nm_streamdnssocket: + isc__nmsocket_streamdns_reset(sock); + return; default: UNREACHABLE(); break; @@ -2583,6 +2640,7 @@ isc_nm_bad_request(isc_nmhandle_t *handle) { case isc_nm_tcpdnssocket: case isc_nm_tlsdnssocket: case isc_nm_tcpsocket: + case isc_nm_streamdnssocket: #if HAVE_LIBNGHTTP2 case isc_nm_tlssocket: #endif /* HAVE_LIBNGHTTP2 */ @@ -2614,6 +2672,8 @@ isc_nm_xfr_allowed(isc_nmhandle_t *handle) { return (true); case isc_nm_tlsdnssocket: return (isc__nm_tlsdns_xfr_allowed(sock)); + case isc_nm_streamdnssocket: + return (isc__nm_streamdns_xfr_allowed(sock)); default: return (false); } @@ -2653,6 +2713,7 @@ isc_nm_set_maxage(isc_nmhandle_t *handle, const uint32_t ttl) { case isc_nm_udpsocket: case isc_nm_tcpdnssocket: case isc_nm_tlsdnssocket: + case isc_nm_streamdnssocket: return; break; @@ -2689,6 +2750,8 @@ isc_nm_has_encryption(const isc_nmhandle_t *handle) { case isc_nm_httpsocket: return (isc__nm_http_has_encryption(handle)); #endif /* HAVE_LIBNGHTTP2 */ + case isc_nm_streamdnssocket: + return (isc__nm_streamdns_has_encryption(handle)); default: return (false); }; @@ -2716,6 +2779,10 @@ isc_nm_verify_tls_peer_result_string(const isc_nmhandle_t *handle) { return (isc__nm_http_verify_tls_peer_result_string(handle)); break; #endif /* HAVE_LIBNGHTTP2 */ + case isc_nm_streamdnssocket: + return (isc__nm_streamdns_verify_tls_peer_result_string( + handle)); + break; default: break; } @@ -2785,6 +2852,9 @@ isc_nmsocket_set_tlsctx(isc_nmsocket_t *listener, isc_tlsctx_t *tlsctx) { case isc_nm_tlsdnslistener: set_tlsctx_workers(listener, tlsctx); break; + case isc_nm_streamdnslistener: + isc__nm_streamdns_set_tlsctx(listener, tlsctx); + break; default: UNREACHABLE(); break; @@ -2976,6 +3046,10 @@ nmsocket_type_totext(isc_nmsocket_type type) { return ("isc_nm_httplistener"); case isc_nm_httpsocket: return ("isc_nm_httpsocket"); + case isc_nm_streamdnslistener: + return ("isc_nm_streamdnslistener"); + case isc_nm_streamdnssocket: + return ("isc_nm_streamdnssocket"); default: UNREACHABLE(); } diff --git a/lib/isc/netmgr/streamdns.c b/lib/isc/netmgr/streamdns.c new file mode 100644 index 0000000000..6e3a5bfe5b --- /dev/null +++ b/lib/isc/netmgr/streamdns.c @@ -0,0 +1,1197 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * SPDX-License-Identifier: MPL-2.0 + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, you can obtain one at https://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#include +#include + +#include +#include +#include +#include + +#include "netmgr-int.h" + +/* + * Stream DNS is a unified transport capable of serving both DNS over + * TCP and DNS over TLS. It is built on top of + * 'isc_dnsstream_assembler_t' and 'isc_dnsbuffer_t'. The first one + * is used for assembling DNS messages in the format used for DNS over + * TCP out of incoming data and is built on top of + * 'isc_dnsbuffer_t'. The 'isc_dnbuffer_t' is a thin wrapper on top of + * 'isc_buffer_t' and is optimised for small (>= 512 bytes) DNS + * messages. For small messages it uses a small static memory buffer, + * but it can automatically switch to a dynamically allocated memory + * buffer for larger ones. This way we avoid unnecessary memory + * allocation requests in most cases, as most DNS messages are small. + * + * The use of 'isc_dnsstream_assembler_t' allows decoupling DNS + * message assembling code from networking code itself, making it + * easier to test. + * + * To understand how the part responsible for reading of data works, + * start by looking at 'streamdns_on_dnsmessage_data_cb()' (the DNS + * message data processing callback) and + * 'streamdns_handle_incoming_data()' which passes incoming data to + * the 'isc_dnsstream_assembler_t' object within the socket. + * + * The writing is done in a simpler manner due to the fact that we + * have full control over the data. For each write request we attempt + * to allocate a 'streamdns_send_req_t' structure, whose main purpose + * is to keep the data required for the send request processing. An + * 'isc_dnsbuffer_t' object is used as a data storage for the reasons + * described above. + * + * When processing write requests there is an important optimisation: + * we attempt to reuse 'streamdns_send_req_t' objects again, in order + * to avoid memory allocations when: + * + * a) requesting memory for the new 'streamdns_send_req_t' object; + * + * b) resizing the 'isc_dnsbuffer_t' to fit large messages, should it + * be required. + * + * The last characteristic is important as it allows gradually growing + * the reused send buffer in a lazy manner when transmitting multiple + * DNS messages (e.g. during zone transfers). + * + * To understand how sending is done, start by looking at + * 'isc__nm_async_streamdnssend()'. Additionally also take a look at + * 'streamdns_get_send_req()' and 'streamdns_put_send_req()' which are + * responsible for send requests allocation/reuse and initialisation. + * + * The rest of the code is mostly wrapping code to expose the + * functionality of the underlying transport, which at the moment + * could be either TCP or TLS. + */ + +typedef struct streamdns_send_req { + isc_nm_cb_t cb; /* send callback */ + void *cbarg; /* send callback argument */ + isc_nmhandle_t *dnshandle; /* Stream DNS socket handle */ + isc_dnsbuffer_t data; /* buffer that contains the DNS message to send */ +} streamdns_send_req_t; + +static streamdns_send_req_t * +streamdns_get_send_req(isc_nmsocket_t *sock, isc_mem_t *mctx, + isc__nm_uvreq_t *req, isc_region_t *data); + +static void +streamdns_put_send_req(isc_mem_t *mctx, streamdns_send_req_t *send_req, + const bool force_destroy); + +static void +streamdns_readcb(isc_nmhandle_t *handle, isc_result_t result, + isc_region_t *region, void *cbarg); + +static void +streamdns_failed_read_cb(isc_nmsocket_t *sock, const isc_result_t result, + const bool async); + +static void +streamdns_try_close_unused(isc_nmsocket_t *sock); + +static bool +streamdns_closing(isc_nmsocket_t *sock); + +static void +streamdns_resumeread(isc_nmsocket_t *sock, isc_nmhandle_t *transphandle) { + if (!sock->streamdns.reading) { + sock->streamdns.reading = true; + isc_nm_read(transphandle, streamdns_readcb, (void *)sock); + } +} + +static void +streamdns_readmore(isc_nmsocket_t *sock, isc_nmhandle_t *transphandle) { + streamdns_resumeread(sock, transphandle); + if (sock->streamdns.reading && atomic_load(&sock->ah) == 1) { + isc__nmsocket_timer_start(sock); + } +} + +static void +streamdns_pauseread(isc_nmsocket_t *sock, isc_nmhandle_t *transphandle) { + if (sock->streamdns.reading) { + sock->streamdns.reading = false; + isc_nm_read_stop(transphandle); + } +} + +static bool +streamdns_on_complete_dnsmessage(isc_dnsstream_assembler_t *dnsasm, + isc_region_t *restrict region, + isc_nmsocket_t *sock, + isc_nmhandle_t *transphandle) { + const bool client = atomic_load(&sock->client); + const bool last_datum = isc_dnsstream_assembler_remaininglength( + dnsasm) == region->length; + /* + * Stop after one message if a client + * connection. + */ + bool stop = client; + + sock->recv_read = false; + if (sock->recv_cb != NULL) { + if (!client) { + /* + * We must allocate a new handle object, as we + * need to ensure that after processing of this + * message has been completed and the handle + * gets destroyed, 'nsock->closehandle_cb' + * (streamdns_resume_processing()) is invoked. + * That is required for pipelining support. + */ + isc_nmhandle_t *handle = isc__nmhandle_get( + sock, &sock->peer, &sock->iface); + sock->recv_cb(handle, ISC_R_SUCCESS, region, + sock->recv_cbarg); + isc_nmhandle_detach(&handle); + } else { + /* + * As on the client side we are supposed to stop + * reading/processing after receiving one + * message, we can use the 'sock->recv_handle' + * from which we would need to detach before + * calling the read callback anyway. + */ + isc_nmhandle_t *recv_handle = sock->recv_handle; + sock->recv_handle = NULL; + sock->recv_cb(recv_handle, ISC_R_SUCCESS, region, + sock->recv_cbarg); + isc_nmhandle_detach(&recv_handle); + } + + if (streamdns_closing(sock)) { + stop = true; + } + } else { + stop = true; + } + + isc__nmsocket_timer_stop(sock); + if (!stop && last_datum) { + /* + * We have processed all data, need to read more. + * The call also restarts the timer. + */ + streamdns_readmore(sock, transphandle); + } else if (stop) { + streamdns_pauseread(sock, transphandle); + } + + return (!stop); +} + +/* + * This function, alongside 'streamdns_handle_incoming_data()', + * connects networking code to the 'isc_dnsstream_assembler_t'. It is + * responsible for making decisions regarding reading from the + * underlying transport socket as well as controlling the read timer. + */ +static bool +streamdns_on_dnsmessage_data_cb(isc_dnsstream_assembler_t *dnsasm, + const isc_result_t result, + isc_region_t *restrict region, void *cbarg, + void *userarg) { + isc_nmsocket_t *sock = (isc_nmsocket_t *)cbarg; + isc_nmhandle_t *transphandle = (isc_nmhandle_t *)userarg; + + switch (result) { + case ISC_R_SUCCESS: + /* + * A complete DNS message has been assembled from the incoming + * data. Let's process it. + */ + return (streamdns_on_complete_dnsmessage(dnsasm, region, sock, + transphandle)); + case ISC_R_RANGE: + /* + * It seems that someone attempts to send us some binary junk + * over the socket, as the beginning of the next message tells + * us the there is an empty (0-sized) DNS message to receive. + * We should treat it as a hard error. + */ + streamdns_failed_read_cb(sock, result, false); + return (false); + case ISC_R_NOMORE: + /* + * We do not have enough data to process the next message and + * thus we need to resume reading from the socket. + */ + if (sock->recv_handle != NULL) { + streamdns_readmore(sock, transphandle); + } + return (false); + default: + UNREACHABLE(); + }; +} + +static void +streamdns_handle_incoming_data(isc_nmsocket_t *sock, + isc_nmhandle_t *transphandle, + void *restrict data, size_t len) { + isc_dnsstream_assembler_t *dnsasm = sock->streamdns.input; + + /* + * Try to process the received data or, when 'data == NULL' and + * 'len == 0', try to resume processing of the data within the + * internal buffers or resume reading, if there is no any. + */ + isc_dnsstream_assembler_incoming(dnsasm, transphandle, data, len); + streamdns_try_close_unused(sock); +} + +static isc_nmsocket_t * +streamdns_sock_new(isc__networker_t *worker, const isc_nmsocket_type_t type, + isc_sockaddr_t *addr, const bool is_server) { + isc_nmsocket_t *sock; + INSIST(type == isc_nm_streamdnssocket || + type == isc_nm_streamdnslistener); + + sock = isc_mem_get(worker->mctx, sizeof(*sock)); + isc__nmsocket_init(sock, worker, type, addr); + sock->result = ISC_R_UNSET; + if (type == isc_nm_streamdnssocket) { + uint32_t initial = 0; + isc_nm_gettimeouts(worker->netmgr, &initial, NULL, NULL, NULL); + sock->read_timeout = initial; + atomic_init(&sock->client, !is_server); + atomic_init(&sock->connecting, !is_server); + sock->streamdns.input = isc_dnsstream_assembler_new( + sock->worker->mctx, streamdns_on_dnsmessage_data_cb, + sock); + } + + return (sock); +} + +static void +streamdns_call_connect_cb(isc_nmsocket_t *sock, isc_nmhandle_t *handle, + const isc_result_t result) { + atomic_store(&sock->connecting, false); + if (sock->connect_cb == NULL) { + return; + } + sock->connect_cb(handle, result, sock->connect_cbarg); + if (result != ISC_R_SUCCESS) { + isc__nmsocket_clearcb(handle->sock); + } else { + atomic_store(&sock->connected, true); + } + streamdns_try_close_unused(sock); +} + +static void +streamdns_save_alpn_status(isc_nmsocket_t *dnssock, + isc_nmhandle_t *transp_handle) { + const unsigned char *alpn = NULL; + unsigned int alpnlen = 0; + + isc__nmhandle_get_selected_alpn(transp_handle, &alpn, &alpnlen); + if (alpn != NULL && alpnlen == ISC_TLS_DOT_PROTO_ALPN_ID_LEN && + memcmp(ISC_TLS_DOT_PROTO_ALPN_ID, alpn, + ISC_TLS_DOT_PROTO_ALPN_ID_LEN) == 0) + { + dnssock->streamdns.dot_alpn_negotiated = true; + } +} + +static void +streamdns_transport_connected(isc_nmhandle_t *handle, isc_result_t result, + void *cbarg) { + isc_nmsocket_t *sock = (isc_nmsocket_t *)cbarg; + isc_nmhandle_t *streamhandle = NULL; + + REQUIRE(VALID_NMSOCK(sock)); + + sock->tid = isc_tid(); + if (result == ISC_R_EOF) { + /* + * The transport layer (probably TLS) has returned EOF during + * connection establishment. That means that connection has + * been "cancelled" (for compatibility with old transport + * behaviour). + */ + result = ISC_R_CANCELED; + goto error; + } else if (result == ISC_R_TLSERROR) { + /* + * In some of the cases when the old code would return + * ISC_R_CANCELLED, the new code could return generic + * ISC_R_TLSERROR code. However, the old code does not expect + * that. + */ + result = ISC_R_CANCELED; + goto error; + } else if (result != ISC_R_SUCCESS) { + goto error; + } + + INSIST(VALID_NMHANDLE(handle)); + + sock->iface = isc_nmhandle_localaddr(handle); + sock->peer = isc_nmhandle_peeraddr(handle); + if (isc__nmsocket_closing(handle->sock)) { + result = ISC_R_SHUTTINGDOWN; + goto error; + } + + isc_nmhandle_attach(handle, &sock->outerhandle); + atomic_store(&sock->active, true); + + handle->sock->streamdns.sock = sock; + + streamdns_save_alpn_status(sock, handle); + isc__nmhandle_set_manual_timer(sock->outerhandle, true); + streamhandle = isc__nmhandle_get(sock, &sock->peer, &sock->iface); + streamdns_call_connect_cb(sock, streamhandle, result); + isc_nmhandle_detach(&streamhandle); + + return; +error: + if (handle != NULL) { + /* + * Let's save the error description (if any) so that + * e.g. 'dig' could produce a usable error message. + */ + INSIST(VALID_NMHANDLE(handle)); + sock->streamdns.tls_verify_error = + isc_nm_verify_tls_peer_result_string(handle); + } + streamhandle = isc__nmhandle_get(sock, NULL, NULL); + atomic_store(&sock->closed, true); + streamdns_call_connect_cb(sock, streamhandle, result); + isc_nmhandle_detach(&streamhandle); + isc__nmsocket_detach(&sock); +} + +void +isc_nm_streamdnsconnect(isc_nm_t *mgr, isc_sockaddr_t *local, + isc_sockaddr_t *peer, isc_nm_cb_t cb, void *cbarg, + unsigned int timeout, isc_tlsctx_t *ctx, + isc_tlsctx_client_session_cache_t *client_sess_cache) { + isc_nmsocket_t *nsock = NULL; + isc__networker_t *worker = &mgr->workers[isc_tid()]; + + REQUIRE(VALID_NM(mgr)); + + if (isc__nm_closing(worker)) { + cb(NULL, ISC_R_SHUTTINGDOWN, cbarg); + return; + } + + nsock = streamdns_sock_new(worker, isc_nm_streamdnssocket, local, + false); + nsock->connect_cb = cb; + nsock->connect_cbarg = cbarg; + nsock->connect_timeout = timeout; + + if (ctx == NULL) { + INSIST(client_sess_cache == NULL); + isc_nm_tcpconnect(mgr, local, peer, + streamdns_transport_connected, nsock, + nsock->connect_timeout); + } else { + isc_nm_tlsconnect(mgr, local, peer, + streamdns_transport_connected, nsock, ctx, + client_sess_cache, nsock->connect_timeout); + } +} + +static bool +streamdns_waiting_for_msg(isc_nmsocket_t *sock) { + /* There is an unsatisfied read operation pending */ + return (sock->recv_read); +} + +bool +isc__nmsocket_streamdns_timer_running(isc_nmsocket_t *sock) { + isc_nmsocket_t *transp_sock; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_streamdnssocket); + + if (sock->outerhandle == NULL) { + return (false); + } + + INSIST(VALID_NMHANDLE(sock->outerhandle)); + transp_sock = sock->outerhandle->sock; + INSIST(VALID_NMSOCK(transp_sock)); + + return (isc__nmsocket_timer_running(transp_sock)); +} + +void +isc__nmsocket_streamdns_timer_stop(isc_nmsocket_t *sock) { + isc_nmsocket_t *transp_sock; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_streamdnssocket); + + if (sock->outerhandle == NULL) { + return; + } + + INSIST(VALID_NMHANDLE(sock->outerhandle)); + transp_sock = sock->outerhandle->sock; + INSIST(VALID_NMSOCK(transp_sock)); + + isc__nmsocket_timer_stop(transp_sock); +} + +void +isc__nmsocket_streamdns_timer_restart(isc_nmsocket_t *sock) { + isc_nmsocket_t *transp_sock; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_streamdnssocket); + + if (sock->outerhandle == NULL) { + return; + } + + INSIST(VALID_NMHANDLE(sock->outerhandle)); + transp_sock = sock->outerhandle->sock; + INSIST(VALID_NMSOCK(transp_sock)); + + isc__nmsocket_timer_restart(transp_sock); +} + +static void +streamdns_failed_read_cb(isc_nmsocket_t *sock, const isc_result_t result, + const bool async) { + bool destroy = true; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(result != ISC_R_SUCCESS); + + if (sock->recv_cb != NULL && sock->recv_handle != NULL && + (streamdns_waiting_for_msg(sock) || result == ISC_R_TIMEDOUT)) + { + isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL); + + INSIST(VALID_NMHANDLE(sock->recv_handle)); + + if (result != ISC_R_TIMEDOUT) { + sock->recv_read = false; + isc_dnsstream_assembler_clear(sock->streamdns.input); + isc__nmsocket_clearcb(sock); + } else if (atomic_load(&sock->client)) { + sock->recv_read = false; + } + isc__nm_readcb(sock, req, result, async); + if (result == ISC_R_TIMEDOUT && + isc__nmsocket_streamdns_timer_running(sock)) + { + destroy = false; + } + } + + if (destroy) { + isc__nmsocket_prep_destroy(sock); + } +} + +void +isc__nm_streamdns_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result, + const bool async) { + REQUIRE(result != ISC_R_SUCCESS); + REQUIRE(sock->type == isc_nm_streamdnssocket); + sock->streamdns.reading = false; + streamdns_failed_read_cb(sock, result, async); +} + +static void +streamdns_readcb(isc_nmhandle_t *handle, isc_result_t result, + isc_region_t *region, void *cbarg) { + isc_nmsocket_t *sock = (isc_nmsocket_t *)cbarg; + + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_tid()); + + if (result != ISC_R_SUCCESS) { + streamdns_failed_read_cb(sock, result, false); + return; + } else if (streamdns_closing(sock)) { + streamdns_failed_read_cb(sock, ISC_R_CANCELED, false); + return; + } + + streamdns_handle_incoming_data(sock, handle, region->base, + region->length); +} + +static void +streamdns_try_close_unused(isc_nmsocket_t *sock) { + if (sock->recv_handle == NULL && sock->streamdns.nsending == 0) { + /* + * The socket is unused after calling the callback. Let's close + * the underlying connection. + */ + isc__nmsocket_prep_destroy(sock); + } +} + +static streamdns_send_req_t * +streamdns_get_send_req(isc_nmsocket_t *sock, isc_mem_t *mctx, + isc__nm_uvreq_t *req, isc_region_t *data) { + streamdns_send_req_t *send_req; + + if (sock->streamdns.send_req != NULL) { + /* + * We have a previously allocated object - let's use that. + * That should help reducing stress on the memory allocator. + */ + send_req = (streamdns_send_req_t *)sock->streamdns.send_req; + sock->streamdns.send_req = NULL; + } else { + /* Allocate a new object. */ + send_req = isc_mem_get(mctx, sizeof(*send_req)); + *send_req = (streamdns_send_req_t){ 0 }; + isc_dnsbuffer_init(&send_req->data, mctx); + } + + /* Initialise the send request object */ + send_req->cb = req->cb.send; + send_req->cbarg = req->cbarg; + isc_nmhandle_attach(req->handle, &send_req->dnshandle); + + /* Prepare the message */ + /* 1. Add the message length at the very beginning of the message */ + isc_dnsbuffer_putmem_uint16be(&send_req->data, + (uint16_t)req->uvbuf.len); + /* 2. Append the data itself */ + isc_dnsbuffer_putmem(&send_req->data, req->uvbuf.base, req->uvbuf.len); + isc_dnsbuffer_remainingregion(&send_req->data, data); + + sock->streamdns.nsending++; + + return (send_req); +} + +static void +streamdns_put_send_req(isc_mem_t *mctx, streamdns_send_req_t *send_req, + const bool force_destroy) { + /* + * Attempt to put the object for reuse later if we are not + * wrapping up. + */ + if (!force_destroy) { + isc_nmsocket_t *sock = send_req->dnshandle->sock; + sock->streamdns.nsending--; + isc_nmhandle_detach(&send_req->dnshandle); + if (sock->streamdns.send_req == NULL) { + isc_dnsbuffer_clear(&send_req->data); + sock->streamdns.send_req = send_req; + /* + * An object has been recycled, + * if not - we are going to destroy it. + */ + return; + } + } + + isc_dnsbuffer_uninit(&send_req->data); + isc_mem_put(mctx, send_req, sizeof(*send_req)); +} + +static void +streamdns_writecb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { + streamdns_send_req_t *send_req = (streamdns_send_req_t *)cbarg; + isc_mem_t *mctx; + isc_nm_cb_t cb; + void *send_cbarg; + isc_nmhandle_t *dnshandle = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMHANDLE(send_req->dnshandle)); + REQUIRE(VALID_NMSOCK(send_req->dnshandle->sock)); + REQUIRE(send_req->dnshandle->sock->tid == isc_tid()); + + mctx = send_req->dnshandle->sock->worker->mctx; + cb = send_req->cb; + send_cbarg = send_req->cbarg; + + isc_nmhandle_attach(send_req->dnshandle, &dnshandle); + /* try to keep the send request object for reuse */ + streamdns_put_send_req(mctx, send_req, false); + cb(dnshandle, result, send_cbarg); + streamdns_try_close_unused(dnshandle->sock); + isc_nmhandle_detach(&dnshandle); +} + +static bool +streamdns_closing(isc_nmsocket_t *sock) { + return (isc__nmsocket_closing(sock) || isc__nm_closing(sock->worker) || + sock->outerhandle == NULL || + (sock->outerhandle != NULL && + isc__nmsocket_closing(sock->outerhandle->sock))); +} + +static void +streamdns_resume_processing(void *arg) { + isc_nmsocket_t *sock = (isc_nmsocket_t *)arg; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_tid()); + REQUIRE(!atomic_load(&sock->client)); + + if (streamdns_closing(sock)) { + return; + } + + streamdns_handle_incoming_data(sock, sock->outerhandle, NULL, 0); +} + +static isc_result_t +streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { + isc_nmsocket_t *listensock = (isc_nmsocket_t *)cbarg; + isc_nmsocket_t *nsock; + isc_sockaddr_t iface; + int tid; + uint32_t initial = 0; + + if (result != ISC_R_SUCCESS) { + return (result); + } + + INSIST(VALID_NMHANDLE(handle)); + INSIST(VALID_NMSOCK(handle->sock)); + INSIST(VALID_NMSOCK(listensock)); + INSIST(listensock->type == isc_nm_streamdnslistener); + + if (isc__nm_closing(handle->sock->worker)) { + return (ISC_R_SHUTTINGDOWN); + } else if (isc__nmsocket_closing(handle->sock) || + atomic_load(&listensock->closing)) + { + return (ISC_R_CANCELED); + } + + tid = isc_tid(); + iface = isc_nmhandle_localaddr(handle); + nsock = streamdns_sock_new(handle->sock->worker, isc_nm_streamdnssocket, + &iface, true); + nsock->recv_cb = listensock->recv_cb; + nsock->recv_cbarg = listensock->recv_cbarg; + + nsock->peer = isc_nmhandle_peeraddr(handle); + nsock->tid = tid; + isc_nm_gettimeouts(handle->sock->worker->netmgr, &initial, NULL, NULL, + NULL); + nsock->read_timeout = initial; + atomic_init(&nsock->accepting, true); + atomic_store(&nsock->active, true); + + isc__nmsocket_attach(listensock, &nsock->listener); + isc_nmhandle_attach(handle, &nsock->outerhandle); + handle->sock->streamdns.sock = nsock; + + streamdns_save_alpn_status(nsock, handle); + + nsock->recv_handle = isc__nmhandle_get(nsock, NULL, &iface); + INSIST(listensock->accept_cb != NULL); + result = listensock->accept_cb(nsock->recv_handle, result, + listensock->accept_cbarg); + if (result != ISC_R_SUCCESS) { + isc_nmhandle_detach(&nsock->recv_handle); + isc__nmsocket_detach(&nsock->listener); + isc_nmhandle_detach(&nsock->outerhandle); + atomic_store(&nsock->closed, true); + goto exit; + } + + nsock->closehandle_cb = streamdns_resume_processing; + isc__nmhandle_set_manual_timer(nsock->outerhandle, true); + isc_nm_gettimeouts(nsock->worker->netmgr, &initial, NULL, NULL, NULL); + /* settimeout restarts the timer */ + isc_nmhandle_settimeout(nsock->outerhandle, initial); + streamdns_handle_incoming_data(nsock, nsock->outerhandle, NULL, 0); + +exit: + atomic_store(&nsock->accepting, false); + + return (result); +} + +isc_result_t +isc_nm_listenstreamdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, + isc_nm_recv_cb_t recv_cb, void *recv_cbarg, + isc_nm_accept_cb_t accept_cb, void *accept_cbarg, + int backlog, isc_quota_t *quota, isc_tlsctx_t *tlsctx, + isc_nmsocket_t **sockp) { + isc_result_t result; + isc_nmsocket_t *listener = NULL; + isc__networker_t *worker = &mgr->workers[isc_tid()]; + + REQUIRE(VALID_NM(mgr)); + REQUIRE(isc_tid() == 0); + + if (isc__nm_closing(worker)) { + return (ISC_R_SHUTTINGDOWN); + } + + listener = streamdns_sock_new(worker, isc_nm_streamdnslistener, iface, + true); + listener->accept_cb = accept_cb; + listener->accept_cbarg = accept_cbarg; + listener->recv_cb = recv_cb; + listener->recv_cbarg = recv_cbarg; + + if (tlsctx == NULL) { + result = isc_nm_listentcp(mgr, workers, iface, + streamdns_accept_cb, listener, + backlog, quota, &listener->outer); + } else { + result = isc_nm_listentls( + mgr, workers, iface, streamdns_accept_cb, listener, + backlog, quota, tlsctx, &listener->outer); + } + if (result != ISC_R_SUCCESS) { + atomic_store(&listener->closed, true); + isc__nmsocket_detach(&listener); + return (result); + } + + listener->result = result; + atomic_store(&listener->active, true); + atomic_store(&listener->listening, true); + INSIST(listener->outer->streamdns.listener == NULL); + listener->nchildren = listener->outer->nchildren; + isc__nmsocket_barrier_init(listener); + atomic_init(&listener->rchildren, listener->outer->nchildren); + isc__nmsocket_attach(listener, &listener->outer->streamdns.listener); + + *sockp = listener; + + return (result); +} + +void +isc__nm_streamdns_cleanup_data(isc_nmsocket_t *sock) { + switch (sock->type) { + case isc_nm_streamdnssocket: + isc_dnsstream_assembler_free(&sock->streamdns.input); + INSIST(sock->streamdns.nsending == 0); + if (sock->streamdns.send_req != NULL) { + isc_mem_t *mctx = sock->worker->mctx; + streamdns_put_send_req(mctx, + (streamdns_send_req_t *) + sock->streamdns.send_req, + true); + } + break; + case isc_nm_streamdnslistener: + if (sock->outer) { + isc__nmsocket_detach(&sock->outer); + } + break; + case isc_nm_tlslistener: + case isc_nm_tcplistener: + if (sock->streamdns.listener != NULL) { + isc__nmsocket_detach(&sock->streamdns.listener); + } + break; + case isc_nm_tlssocket: + case isc_nm_tcpsocket: + if (sock->streamdns.sock != NULL) { + isc__nmsocket_detach(&sock->streamdns.sock); + } + break; + default: + return; + } +} + +void +isc__nm_async_streamdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_streamdnsread_t *ievent = + (isc__netievent_streamdnsread_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + + REQUIRE(sock->tid == isc_tid()); + UNUSED(worker); + + if (streamdns_closing(sock)) { + streamdns_failed_read_cb(sock, ISC_R_CANCELED, false); + return; + } + + if (sock->streamdns.reading) { + return; + } + + INSIST(VALID_NMHANDLE(sock->outerhandle)); + streamdns_handle_incoming_data(sock, sock->outerhandle, NULL, 0); +} + +void +isc__nm_streamdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, + void *cbarg) { + isc_nmsocket_t *sock = NULL; + bool closing = false; + bool worker_thread; + + REQUIRE(VALID_NMHANDLE(handle)); + sock = handle->sock; + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_streamdnssocket); + REQUIRE(sock->recv_handle == NULL); + + closing = streamdns_closing(sock); + worker_thread = sock->tid == isc_tid(); + + sock->recv_cb = cb; + sock->recv_cbarg = cbarg; + sock->recv_read = true; + isc_nmhandle_attach(handle, &sock->recv_handle); + + /* + * In some cases there is little sense in making the operation + * asynchronous as we just want to start reading from the + * underlying transport. + */ + if (worker_thread && !closing && + isc_dnsstream_assembler_result(sock->streamdns.input) == + ISC_R_UNSET) + { + isc__netievent_streamdnsread_t event = { .sock = sock }; + isc__nm_async_streamdnsread(sock->worker, + (isc__netievent_t *)&event); + } else { + isc__netievent_streamdnsread_t *ievent = NULL; + /* + * We want the read operation to be asynchronous in most cases + * because: + * + * 1. A read operation might be initiated from within the read + * callback itself. + * + * 2. Due to the above, we need to make the operation + * asynchronous to keep the socket state consistent. + */ + ievent = isc__nm_get_netievent_streamdnsread(sock->worker, + sock); + isc__nm_enqueue_ievent(sock->worker, + (isc__netievent_t *)ievent); + } +} + +void +isc__nm_async_streamdnssend(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_streamdnssend_t *ievent = + (isc__netievent_streamdnssend_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *req = ievent->req; + streamdns_send_req_t *send_req; + isc_mem_t *mctx; + isc_region_t data = { 0 }; + + REQUIRE(VALID_UVREQ(req)); + REQUIRE(sock->tid == isc_tid()); + + UNUSED(worker); + + ievent->req = NULL; + + if (streamdns_closing(sock)) { + isc__nm_failed_send_cb(sock, req, ISC_R_CANCELED, true); + return; + } + + mctx = sock->worker->mctx; + + send_req = streamdns_get_send_req(sock, mctx, req, &data); + isc_nm_send(sock->outerhandle, &data, streamdns_writecb, + (void *)send_req); + + isc__nm_uvreq_put(&req, sock); + return; +} + +void +isc__nm_streamdns_send(isc_nmhandle_t *handle, const isc_region_t *region, + isc_nm_cb_t cb, void *cbarg) { + isc__nm_uvreq_t *uvreq = NULL; + isc_nmsocket_t *sock = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(handle->sock)); + REQUIRE(region->length <= UINT16_MAX); + + sock = handle->sock; + + REQUIRE(sock->type == isc_nm_streamdnssocket); + + uvreq = isc__nm_uvreq_get(sock->worker, sock); + isc_nmhandle_attach(handle, &uvreq->handle); + uvreq->cb.send = cb; + uvreq->cbarg = cbarg; + uvreq->uvbuf.base = (char *)region->base; + uvreq->uvbuf.len = region->length; + + /* + * As when sending, we, basically, handing data to the underlying + * transport, we can treat the operation synchronously, as the + * transport code will take care of the asynchronicity if required. + */ + if (sock->tid == isc_tid()) { + isc__netievent_streamdnssend_t event = { .sock = sock, + .req = uvreq }; + isc__nm_async_streamdnssend(sock->worker, + (isc__netievent_t *)&event); + } else { + isc__netievent_streamdnssend_t *ievent = + isc__nm_get_netievent_streamdnssend(sock->worker, sock, + uvreq); + isc__nm_enqueue_ievent(sock->worker, + (isc__netievent_t *)ievent); + } +} + +static void +streamdns_close_direct(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_tid()); + + if (sock->outerhandle != NULL) { + sock->streamdns.reading = false; + isc_nm_read_stop(sock->outerhandle); + isc_nmhandle_close(sock->outerhandle); + isc_nmhandle_detach(&sock->outerhandle); + } + + if (sock->listener != NULL) { + isc__nmsocket_detach(&sock->listener); + } + + if (sock->recv_handle != NULL) { + isc_nmhandle_detach(&sock->recv_handle); + } + + /* Further cleanup performed in isc__nm_streamdns_cleanup_data() */ + isc_dnsstream_assembler_clear(sock->streamdns.input); + atomic_store(&sock->closed, true); + atomic_store(&sock->active, false); +} + +void +isc__nm_async_streamdnsclose(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_streamdnsclose_t *ievent = + (isc__netievent_streamdnsclose_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + + UNUSED(worker); + + streamdns_close_direct(sock); +} + +void +isc__nm_streamdns_close(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_streamdnssocket); + + if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, + true)) + { + return; + } + + if (sock->tid == isc_tid()) { + streamdns_close_direct(sock); + } else { + isc__netievent_streamdnsclose_t *ievent = + isc__nm_get_netievent_streamdnsclose(sock->worker, + sock); + isc__nm_enqueue_ievent(sock->worker, + (isc__netievent_t *)ievent); + } +} + +void +isc__nm_streamdns_stoplistening(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_streamdnslistener); + + isc__nmsocket_stop(sock); +} + +void +isc__nm_streamdns_cancelread(isc_nmhandle_t *handle) { + isc_nmsocket_t *sock = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(handle->sock)); + REQUIRE(handle->sock->type == isc_nm_streamdnssocket); + + sock = handle->sock; + + isc__netievent_streamdnscancel_t *ievent = + isc__nm_get_netievent_streamdnscancel(sock->worker, + handle->sock, handle); + isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent); +} + +void +isc__nm_async_streamdnscancel(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_streamdnscancel_t *ievent = + (isc__netievent_streamdnscancel_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + + UNUSED(worker); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_tid()); + + streamdns_failed_read_cb(sock, ISC_R_EOF, false); +} + +void +isc__nmhandle_streamdns_cleartimeout(isc_nmhandle_t *handle) { + isc_nmsocket_t *sock = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(handle->sock)); + REQUIRE(handle->sock->type == isc_nm_streamdnssocket); + + sock = handle->sock; + if (sock->outerhandle != NULL) { + INSIST(VALID_NMHANDLE(sock->outerhandle)); + isc_nmhandle_cleartimeout(sock->outerhandle); + } +} + +void +isc__nmhandle_streamdns_settimeout(isc_nmhandle_t *handle, uint32_t timeout) { + isc_nmsocket_t *sock = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(handle->sock)); + REQUIRE(handle->sock->type == isc_nm_streamdnssocket); + + sock = handle->sock; + if (sock->outerhandle != NULL) { + INSIST(VALID_NMHANDLE(sock->outerhandle)); + isc_nmhandle_settimeout(sock->outerhandle, timeout); + } +} + +void +isc__nmhandle_streamdns_keepalive(isc_nmhandle_t *handle, bool value) { + isc_nmsocket_t *sock = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(handle->sock)); + REQUIRE(handle->sock->type == isc_nm_streamdnssocket); + + sock = handle->sock; + if (sock->outerhandle != NULL) { + INSIST(VALID_NMHANDLE(sock->outerhandle)); + isc_nmhandle_keepalive(sock->outerhandle, value); + } +} + +void +isc__nmhandle_streamdns_setwritetimeout(isc_nmhandle_t *handle, + uint32_t timeout) { + isc_nmsocket_t *sock = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(handle->sock)); + REQUIRE(handle->sock->type == isc_nm_streamdnssocket); + + sock = handle->sock; + if (sock->outerhandle != NULL) { + INSIST(VALID_NMHANDLE(sock->outerhandle)); + isc_nmhandle_setwritetimeout(sock->outerhandle, timeout); + } +} + +bool +isc__nm_streamdns_has_encryption(const isc_nmhandle_t *handle) { + isc_nmsocket_t *sock = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(handle->sock)); + REQUIRE(handle->sock->type == isc_nm_streamdnssocket); + + sock = handle->sock; + if (sock->outerhandle != NULL) { + INSIST(VALID_NMHANDLE(sock->outerhandle)); + return (isc_nm_has_encryption(sock->outerhandle)); + } + + return (false); +} + +const char * +isc__nm_streamdns_verify_tls_peer_result_string(const isc_nmhandle_t *handle) { + isc_nmsocket_t *sock = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(handle->sock)); + REQUIRE(handle->sock->type == isc_nm_streamdnssocket); + + sock = handle->sock; + if (sock->outerhandle != NULL) { + INSIST(VALID_NMHANDLE(sock->outerhandle)); + return (isc_nm_verify_tls_peer_result_string( + sock->outerhandle)); + } else if (sock->streamdns.tls_verify_error != NULL) { + return (sock->streamdns.tls_verify_error); + } + + return (false); +} + +void +isc__nm_streamdns_set_tlsctx(isc_nmsocket_t *listener, isc_tlsctx_t *tlsctx) { + REQUIRE(VALID_NMSOCK(listener)); + REQUIRE(listener->type == isc_nm_streamdnslistener); + + if (listener->outer != NULL) { + INSIST(VALID_NMSOCK(listener->outer)); + isc_nmsocket_set_tlsctx(listener->outer, tlsctx); + } +} + +bool +isc__nm_streamdns_xfr_allowed(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_streamdnssocket); + + if (sock->outerhandle == NULL) { + return (false); + } else if (!isc_nm_has_encryption(sock->outerhandle)) { + return (true); + } + + return (sock->streamdns.dot_alpn_negotiated); +} + +void +isc__nmsocket_streamdns_reset(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_streamdnssocket); + + if (sock->outerhandle == NULL) { + return; + } + + INSIST(VALID_NMHANDLE(sock->outerhandle)); + isc__nmsocket_reset(sock->outerhandle->sock); +}