Compare commits

...

18 Commits

Author SHA1 Message Date
Witold Kręcicki
f136e0c45e netmgr: add isc_nm_shutdown function which shuts down all active TCP connection but does not destroy netmgr 2019-11-22 23:26:42 +01:00
Evan Hunt
2b05411645 add functions from netmgr-int.h to libisc.def.in 2019-11-22 11:18:00 -08:00
Evan Hunt
140a24c50e fixup! fixup! WiP: shutdown active sockets when shutting down netmgr 2019-11-22 11:17:35 -08:00
Evan Hunt
7c91b58d40 fixup! finish moving TCP timeout values from ns_server to netmgr 2019-11-22 10:56:14 -08:00
Witold Kręcicki
21a7338b98 fixup! fixup! WiP: shutdown active sockets when shutting down netmgr 2019-11-22 15:04:49 +01:00
Witold Kręcicki
df31bdf239 fixup! WiP: shutdown active sockets when shutting down netmgr 2019-11-22 14:41:22 +01:00
Witold Kręcicki
ea4e56b89d WiP: shutdown active sockets when shutting down netmgr 2019-11-22 14:13:19 +01:00
Witold Kręcicki
7ff5f95865 netmgr:
- add support for TCP backlog, use the value provided by config
 - don't attach to quota for listening sockets, just assign the value
2019-11-22 13:25:36 +01:00
Witold Kręcicki
223e93d215 Test: increase prefetch test tolerance 2019-11-22 09:43:17 +01:00
Evan Hunt
4975f43e7d finish moving TCP timeout values from ns_server to netmgr 2019-11-21 23:38:45 -08:00
Evan Hunt
83f9097612 log connection error 2019-11-21 18:38:20 -08:00
Evan Hunt
7ee7bdaec6 use memory pools for ievent and uvreq objects 2019-11-21 18:21:16 -08:00
Evan Hunt
a6a35373c1 remove unused worker memory pool 2019-11-21 17:08:06 -08:00
Witold Kręcicki
655cda9922 Revert "Disable system test retry"
This reverts commit 29ab9c6ae6.
2019-11-21 20:26:44 +01:00
Witold Kręcicki
29ab9c6ae6 Disable system test retry 2019-11-21 20:26:30 +01:00
Witold Kręcicki
a499b77759 Make all atomic reads use atomic_ functions 2019-11-21 20:26:30 +01:00
Evan Hunt
e29980400c Make TCP timeouts configurable. 2019-11-21 20:26:30 +01:00
Witold Kręcicki
3a102fa4df Netmgr:
- Add timeout support for TCP and TCPDNS connections
  (protection against slowloris)
- Rework/simplify tcpdns state machine.
2019-11-21 20:26:30 +01:00
19 changed files with 737 additions and 402 deletions

View File

@@ -938,6 +938,12 @@ create_managers(void) {
static void
destroy_managers(void) {
/*
* isc_nm_shutdown() closes all active connections, freeing attached
* clients and all other resources, but does not shut down the
* processing yet.
*/
isc_nm_shutdown(named_g_nm);
/*
* isc_taskmgr_destroy() will block until all tasks have exited,
*/

View File

@@ -8470,8 +8470,8 @@ load_configuration(const char *filename, named_server_t *server,
advertised = MAX_TCP_TIMEOUT;
}
ns_server_settimeouts(named_g_server->sctx,
initial, idle, keepalive, advertised);
isc_nm_tcp_settimeouts(named_g_nm, initial, idle,
keepalive, advertised);
/*
* Configure sets of UDP query source ports.
@@ -15405,8 +15405,8 @@ named_server_tcptimeouts(isc_lex_t *lex, isc_buffer_t **text) {
if (ptr == NULL)
return (ISC_R_UNEXPECTEDEND);
ns_server_gettimeouts(named_g_server->sctx,
&initial, &idle, &keepalive, &advertised);
isc_nm_tcp_gettimeouts(named_g_nm, &initial, &idle,
&keepalive, &advertised);
/* Look for optional arguments. */
ptr = next_token(lex, NULL);
@@ -15445,7 +15445,7 @@ named_server_tcptimeouts(isc_lex_t *lex, isc_buffer_t **text) {
result = isc_task_beginexclusive(named_g_server->task);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
ns_server_settimeouts(named_g_server->sctx, initial, idle,
isc_nm_tcp_settimeouts(named_g_nm, initial, idle,
keepalive, advertised);
isc_task_endexclusive(named_g_server->task);

View File

@@ -22,6 +22,7 @@ options {
recursion yes;
dnssec-validation yes;
querylog yes;
prefetch 3 9;
};
server 10.53.0.7 {

View File

@@ -452,7 +452,7 @@ n=`expr $n + 1`
echo_i "check prefetch (${n})"
ret=0
$DIG $DIGOPTS @10.53.0.5 fetch.tld txt > dig.out.1.${n} || ret=1
ttl1=`awk '/"A" "short" "ttl"/ { print $2 - 2 }' dig.out.1.${n}`
ttl1=`awk '/"A" "short" "ttl"/ { print $2 - 3 }' dig.out.1.${n}`
# sleep so we are in prefetch range
sleep ${ttl1:-0}
# trigger prefetch
@@ -470,7 +470,7 @@ n=`expr $n + 1`
echo_i "check prefetch of validated DS's RRSIG TTL is updated (${n})"
ret=0
$DIG $DIGOPTS +dnssec @10.53.0.5 ds.example.net ds > dig.out.1.${n} || ret=1
dsttl1=`awk '$4 == "DS" && $7 == "2" { print $2 - 2 }' dig.out.1.${n}`
dsttl1=`awk '$4 == "DS" && $7 == "2" { print $2 - 3 }' dig.out.1.${n}`
# sleep so we are in prefetch range
sleep ${dsttl1:-0}
# trigger prefetch
@@ -517,7 +517,7 @@ n=`expr $n + 1`
echo_i "check prefetch qtype * (${n})"
ret=0
$DIG $DIGOPTS @10.53.0.5 fetchall.tld any > dig.out.1.${n} || ret=1
ttl1=`awk '/"A" "short" "ttl"/ { print $2 - 2 }' dig.out.1.${n}`
ttl1=`awk '/"A" "short" "ttl"/ { print $2 - 3 }' dig.out.1.${n}`
# sleep so we are in prefetch range
sleep ${ttl1:-0}
# trigger prefetch

View File

@@ -109,7 +109,7 @@ foreach my $name(@ans) {
stop_signal($name, "TERM", 1);
}
@ans = wait_for_servers(60, @ans);
@ans = wait_for_servers(1200, @ans);
# Pass 3: SIGABRT
foreach my $name (@ns) {

View File

@@ -115,7 +115,7 @@ n=$((n + 1))
echo_i "TCP high-water: check initial statistics ($n)"
ret=0
refresh_tcp_stats
assert_int_equal "${TCP_CUR}" 1 "current TCP clients count" || ret=1
assert_int_equal "${TCP_CUR}" 0 "current TCP clients count" || ret=1
if [ $ret != 0 ]; then echo_i "failed"; fi
status=$((status + ret))
@@ -166,12 +166,9 @@ check_stats_limit() {
assert_int_equal "${TCP_HIGH}" "${TCP_LIMIT}" "TCP high-water value" || return 1
}
retry 2 check_stats_limit || ret=1
close_connections $((TCP_LIMIT + 1)) || :
#close_connections $((TCP_LIMIT + 1)) || :
if [ $ret != 0 ]; then echo_i "failed"; fi
status=$((status + ret))
# wait for connections to close
sleep 5
echo_i "exit status: $status"
[ $status -eq 0 ] || exit 1

View File

@@ -177,7 +177,8 @@ LIBISC_EXTERNAL_DATA extern isc_logmodule_t isc_modules[];
#define ISC_LOGMODULE_INTERFACE (&isc_modules[2])
#define ISC_LOGMODULE_TIMER (&isc_modules[3])
#define ISC_LOGMODULE_FILE (&isc_modules[4])
#define ISC_LOGMODULE_OTHER (&isc_modules[5])
#define ISC_LOGMODULE_NETMGR (&isc_modules[5])
#define ISC_LOGMODULE_OTHER (&isc_modules[6])
ISC_LANG_BEGINDECLS

View File

@@ -46,6 +46,13 @@ isc_nm_destroy(isc_nm_t **mgr0);
* for all other references to be gone.
*/
void
isc_nm_shutdown(isc_nm_t *mgr);
/*%<
* Shuts down all active connections, freeing all associated
* resources.
*/
/* Return thread id of current thread, or ISC_NETMGR_TID_UNKNOWN */
int
isc_nm_tid(void);
@@ -113,8 +120,20 @@ isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
isc_sockaddr_t
isc_nmhandle_peeraddr(isc_nmhandle_t *handle);
/*%<
* Return the peer address for the given handle.
*/
isc_sockaddr_t
isc_nmhandle_localaddr(isc_nmhandle_t *handle);
/*%<
* Return the local address for the given handle.
*/
isc_nm_t *
isc_nmhandle_netmgr(isc_nmhandle_t *handle);
/*%<
* Return a pointer to the netmgr object for the given handle.
*/
typedef void (*isc_nm_recv_cb_t)(isc_nmhandle_t *handle, isc_region_t *region,
void *cbarg);
@@ -211,8 +230,9 @@ isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region,
isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nm_cb_t cb, void *cbarg,
size_t extrahandlesize, isc_quota_t *quota,
isc_nmsocket_t **rv);
size_t extrahandlesize, int backlog,
isc_quota_t *quota,
isc_nmsocket_t **sockp);
/*%<
* Start listening for raw messages over the TCP interface 'iface', using
* net manager 'mgr'.
@@ -230,8 +250,8 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
* quota. This allows us to enforce TCP client quota limits.
*
* NOTE: This is currently only called inside isc_nm_listentcpdns(), which
* creates a 'wrapper' socket that sends and receives DNS messages -
* prepended with a two-byte length field - and handles buffering.
* creates a 'wrapper' socket that sends and receives DNS messages
* prepended with a two-byte length field, and handles buffering.
*/
void
@@ -243,15 +263,18 @@ isc_nm_tcp_stoplistening(isc_nmsocket_t *sock);
isc_result_t
isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nm_recv_cb_t cb, void *arg,
size_t extrahandlesize, isc_quota_t *quota,
size_t extrahandlesize, int backlog,
isc_quota_t *quota,
isc_nmsocket_t **sockp);
/*%<
* Start listening for DNS messages over the TCP interface 'iface', using
* net manager 'mgr'.
*
* On success, 'sockp' will be updated to contain a new listening TCPDNS
* socket. This is a wrapper around a TCP socket, and handles DNS length
* processing.
* socket. This is a wrapper around a raw TCP socket, which sends and
* receives DNS messages via that socket. It handles message buffering
* and pipelining, and automatically prepends messages with a two-byte
* length field.
*
* When a complete DNS message is received on the socket, 'cb' will be
* called with 'arg' as its argument.
@@ -259,6 +282,8 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface,
* When handles are allocated for the socket, 'extrahandlesize' additional
* bytes will be allocated along with the handle for an associated object
* (typically ns_client).
*
* 'quota' is passed to isc_nm_listentcp() when opening the raw TCP socket.
*/
void
@@ -270,25 +295,60 @@ isc_nm_tcpdns_stoplistening(isc_nmsocket_t *sock);
void
isc_nm_tcpdns_sequential(isc_nmhandle_t *handle);
/*%<
* Disable pipelining on this connection. Each DNS packet
* will be only processed after the previous completes.
* Disable pipelining on this connection. Each DNS packet will be only
* processed after the previous completes.
*
* The socket must be unpaused after the query is processed.
* This is done the response is sent, or if we're dropping the
* query, it will be done when a handle is fully dereferenced
* by calling the socket's closehandle_cb callback.
* The socket must be unpaused after the query is processed. This is done
* when the response is sent or, if we're dropping the query, when the
* handle is fully dereferenced, by calling the socket's closehandle_cb
* callback.
*
* Note: This can only be run while a message is being processed;
* if it is run before any messages are read, no messages will
* be read.
* Note: This can only be run while a message is being processed; if it is
* run before any messages are read, no messages will be read.
*
* Also note: once this has been set, it cannot be reversed for a
* given connection.
* Also note: once this has been set, it cannot be reversed for a given
* connection.
*/
void
isc_nm_tcpdns_keepalive(isc_nmhandle_t *handle);
/*%<
* Enable keepalive on this connection.
*
* When keepalive is active, we switch to using the keepalive timeout
* to determine when to close a connection, rather than the idle timeout.
*/
void
isc_nm_tcp_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle,
uint32_t keepalive, uint32_t advertised);
/*%<
* Sets the initial, idle, and keepalive timeout values to use for
* TCP connections, and the timeout value to advertise in responses using
* the EDNS TCP Keepalive option (which should ordinarily be the same
* as 'keepalive'), in tenths of seconds.
*
* Requires:
* \li 'mgr' is a valid netmgr.
*/
void
isc_nm_tcp_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,
uint32_t *keepalive, uint32_t *advertised);
/*%<
* Gets the initial, idle, keepalive, or advertised timeout values,
* in tenths of seconds.
*
* Any integer pointer parameter not set to NULL will be updated to
* contain the corresponding timeout value.
*
* Requires:
* \li 'mgr' is a valid netmgr.
*/
void
isc_nm_maxudp(isc_nm_t *mgr, uint32_t maxudp);
/*%<
* Simulate a broken firewall that blocks UDP messages larger
* than a given size.
* Simulate a broken firewall that blocks UDP messages larger than a given
* size.
*/

View File

@@ -192,6 +192,7 @@ LIBISC_EXTERNAL_DATA isc_logmodule_t isc_modules[] = {
{ "interface", 0 },
{ "timer", 0 },
{ "file", 0 },
{ "netmgr", 0 },
{ "other", 0 },
{ NULL, 0 }
};

View File

@@ -41,7 +41,6 @@ typedef struct isc__networker {
uv_async_t async; /* async channel to send
* data to this networker */
isc_mutex_t lock;
isc_mempool_t *mpool_bufs;
isc_condition_t cond;
bool paused;
bool finished;
@@ -116,12 +115,10 @@ typedef enum isc__netievent_type {
netievent_tcplisten,
netievent_tcpstoplisten,
netievent_tcpclose,
netievent_closecb,
netievent_shutdown,
} isc__netievent_type;
typedef struct isc__netievent_stop {
isc__netievent_type type;
} isc__netievent_stop_t;
/*
* We have to split it because we can read and write on a socket
* simultaneously.
@@ -186,6 +183,7 @@ typedef isc__netievent__socket_t isc__netievent_tcpclose_t;
typedef isc__netievent__socket_t isc__netievent_startread_t;
typedef isc__netievent__socket_t isc__netievent_pauseread_t;
typedef isc__netievent__socket_t isc__netievent_resumeread_t;
typedef isc__netievent__socket_t isc__netievent_closecb_t;
typedef struct isc__netievent__socket_req {
isc__netievent_type type;
@@ -208,10 +206,13 @@ typedef struct isc__netievent {
isc__netievent_type type;
} isc__netievent_t;
typedef isc__netievent_t isc__netievent_shutdown_t;
typedef isc__netievent_t isc__netievent_stop_t;
typedef union {
isc__netievent_t ni;
isc__netievent_stop_t nis;
isc__netievent_udplisten_t niul;
isc__netievent__socket_t nis;
isc__netievent__socket_req_t nisr;
isc__netievent_udpsend_t nius;
} isc__netievent_storage_t;
@@ -229,11 +230,20 @@ struct isc_nm {
isc_mutex_t lock;
isc_condition_t wkstatecond;
isc__networker_t *workers;
isc_mempool_t *reqpool;
isc_mutex_t reqlock;
isc_mempool_t *evpool;
isc_mutex_t evlock;
atomic_uint_fast32_t workers_running;
atomic_uint_fast32_t workers_paused;
atomic_uint_fast32_t maxudp;
atomic_bool paused;
atomic_bool shutdown;
/*
* A worker is actively waiting for other workers, for example to
* stop listening; that means no other thread can do the same thing
@@ -241,6 +251,18 @@ struct isc_nm {
* event or wait for the other one to finish if we want to pause.
*/
atomic_bool interlocked;
/*
* Timeout values for TCP connections, corresponding to
* tcp-initial-timeout, tcp-idle-timeout, tcp-keepalive-timeout,
* and tcp-advertised-timeout. Note that these are stored in
* milliseconds so they can be used directly with the libuv timer,
* but they are configured in tenths of seconds.
*/
uint32_t init;
uint32_t idle;
uint32_t keepalive;
uint32_t advertised;
};
typedef enum isc_nmsocket_type {
@@ -267,7 +289,11 @@ struct isc_nmsocket {
isc_nm_t *mgr;
isc_nmsocket_t *parent;
isc_quota_t *quota;
isc_quota_t *gquota; /* non-attached quota for listening */
bool overquota;
uv_timer_t timer;
bool timer_initialized;
uint64_t read_timeout;
/*% outer socket is for 'wrapped' sockets - e.g. tcpdns in tcp */
isc_nmsocket_t *outer;
@@ -284,6 +310,9 @@ struct isc_nmsocket {
/*% extra data allocated at the end of each isc_nmhandle_t */
size_t extrahandlesize;
/*% TCP backlog */
int backlog;
/*% libuv data */
uv_os_sock_t fd;
union uv_any_handle uv_handle;
@@ -334,6 +363,12 @@ struct isc_nmsocket {
*/
atomic_bool readpaused;
/*%
* A TCP or TCPDNS socket has been set to use the keepalive
* timeout instead of the default idle timeout.
*/
atomic_bool keepalive;
/*%
* 'spare' handles for that can be reused to avoid allocations,
* for UDP.
@@ -366,7 +401,7 @@ struct isc_nmsocket {
* might want to change it to something lockless in the
* future.
*/
size_t ah;
atomic_int_fast32_t ah;
size_t ah_size;
size_t *ah_frees;
isc_nmhandle_t **ah_handles;
@@ -398,6 +433,8 @@ isc__nm_get_ievent(isc_nm_t *mgr, isc__netievent_type type);
/*%<
* Allocate an ievent and set the type.
*/
void
isc__nm_put_ievent(isc_nm_t *mgr, void *ievent);
void
isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event);
@@ -471,6 +508,19 @@ isc__nmsocket_prep_destroy(isc_nmsocket_t *sock);
* if there are no remaining references or active handles.
*/
void
isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0);
/*%<
* Issue a 'handle closed' callback on the socket.
*/
void
isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ievent0);
/*%<
* Walk through all uv handles, get the underlying sockets and issue
* close on them.
*/
isc_result_t
isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region,
isc_nm_cb_t cb, void *cbarg);
@@ -503,6 +553,12 @@ isc__nm_tcp_close(isc_nmsocket_t *sock);
* Close a TCP socket.
*/
void
isc__nm_tcp_shutdown(isc_nmsocket_t *sock);
/*%<
* Called on shutdown to close and clean up a listening TCP socket.
*/
void
isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0);
void

View File

@@ -94,6 +94,32 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) {
atomic_init(&mgr->paused, false);
atomic_init(&mgr->interlocked, false);
/*
* Default TCP timeout values.
* May be updated by isc_nm_tcp_settimeouts().
*/
mgr->init = 30000;
mgr->idle = 30000;
mgr->keepalive = 30000;
mgr->advertised = 30000;
isc_mutex_init(&mgr->reqlock);
isc_mempool_create(mgr->mctx, sizeof(isc__nm_uvreq_t), &mgr->reqpool);
isc_mempool_setname(mgr->reqpool, "nm_reqpool");
isc_mempool_setmaxalloc(mgr->reqpool, 32768);
isc_mempool_setfreemax(mgr->reqpool, 32768);
isc_mempool_associatelock(mgr->reqpool, &mgr->reqlock);
isc_mempool_setfillcount(mgr->reqpool, 32);
isc_mutex_init(&mgr->evlock);
isc_mempool_create(mgr->mctx, sizeof(isc__netievent_storage_t),
&mgr->evpool);
isc_mempool_setname(mgr->evpool, "nm_evpool");
isc_mempool_setmaxalloc(mgr->evpool, 32768);
isc_mempool_setfreemax(mgr->evpool, 32768);
isc_mempool_associatelock(mgr->evpool, &mgr->evlock);
isc_mempool_setfillcount(mgr->evpool, 32);
mgr->workers = isc_mem_get(mctx, workers * sizeof(isc__networker_t));
for (size_t i = 0; i < workers; i++) {
int r;
@@ -114,7 +140,6 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) {
isc_mutex_init(&worker->lock);
isc_condition_init(&worker->cond);
isc_mempool_create(mgr->mctx, 65536, &worker->mpool_bufs);
worker->ievents = isc_queue_new(mgr->mctx, 128);
/*
@@ -168,17 +193,22 @@ nm_destroy(isc_nm_t **mgr0) {
while ((ievent = (isc__netievent_t *)
isc_queue_dequeue(mgr->workers[i].ievents)) != NULL)
{
isc_mem_put(mgr->mctx, ievent,
sizeof(isc__netievent_storage_t));
isc_mempool_put(mgr->evpool, ievent);
}
int r = uv_loop_close(&mgr->workers[i].loop);
INSIST(r == 0);
isc_queue_destroy(mgr->workers[i].ievents);
isc_mempool_destroy(&mgr->workers[i].mpool_bufs);
}
isc_condition_destroy(&mgr->wkstatecond);
isc_mutex_destroy(&mgr->lock);
isc_mempool_destroy(&mgr->evpool);
isc_mutex_destroy(&mgr->evlock);
isc_mempool_destroy(&mgr->reqpool);
isc_mutex_destroy(&mgr->reqlock);
isc_mem_put(mgr->mctx, mgr->workers,
mgr->nworkers * sizeof(isc__networker_t));
isc_mem_putanddetach(&mgr->mctx, mgr, sizeof(*mgr));
@@ -267,6 +297,18 @@ isc_nm_detach(isc_nm_t **mgr0) {
}
}
/*
 * Flag the manager as shutting down and queue one 'shutdown' event on
 * every worker.  The manager itself is not destroyed here.
 */
void
isc_nm_shutdown(isc_nm_t *mgr) {
	size_t n = 0;

	REQUIRE(VALID_NM(mgr));

	atomic_store(&mgr->shutdown, true);

	/* One event per worker, enqueued in worker-index order. */
	while (n < mgr->nworkers) {
		isc__netievent_t *ev =
			isc__nm_get_ievent(mgr, netievent_shutdown);

		isc__nm_enqueue_ievent(&mgr->workers[n], ev);
		n++;
	}
}
void
isc_nm_destroy(isc_nm_t **mgr0) {
@@ -279,6 +321,8 @@ isc_nm_destroy(isc_nm_t **mgr0) {
mgr = *mgr0;
*mgr0 = NULL;
isc_nm_shutdown(mgr);
/*
* Wait for the manager to be dereferenced elsewhere.
*/
@@ -303,6 +347,41 @@ isc_nm_maxudp(isc_nm_t *mgr, uint32_t maxudp) {
atomic_store(&mgr->maxudp, maxudp);
}
/*
 * Record the four TCP timeout values on the manager.  Each argument is
 * multiplied by 100 before it is stored (the struct comment describes
 * the stored values as milliseconds and the configured ones as tenths
 * of seconds).
 */
void
isc_nm_tcp_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle,
		       uint32_t keepalive, uint32_t advertised)
{
	const uint32_t scale = 100;

	REQUIRE(VALID_NM(mgr));

	mgr->init = init * scale;
	mgr->idle = idle * scale;
	mgr->keepalive = keepalive * scale;
	mgr->advertised = advertised * scale;
}
/*
 * Store 'stored / 100' through 'dst'; a NULL 'dst' means the caller is
 * not interested in this value.
 */
static void
tcp_timeout_report(uint32_t *dst, uint32_t stored) {
	if (dst == NULL) {
		return;
	}
	*dst = stored / 100;
}

/*
 * Report the manager's TCP timeout values, each divided by 100, through
 * whichever of the output pointers are non-NULL.
 */
void
isc_nm_tcp_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,
		       uint32_t *keepalive, uint32_t *advertised)
{
	REQUIRE(VALID_NM(mgr));

	tcp_timeout_report(initial, mgr->init);
	tcp_timeout_report(idle, mgr->idle);
	tcp_timeout_report(keepalive, mgr->keepalive);
	tcp_timeout_report(advertised, mgr->advertised);
}
/*
* nm_thread is a single worker thread, that runs uv_run event loop
* until asked to stop.
@@ -369,7 +448,7 @@ nm_thread(void *worker0) {
* XXX: uv_run() in UV_RUN_DEFAULT mode returns
* zero if there are still active uv_handles.
* This shouldn't happen, but if it does, we just
* to keep checking until they're done. We nap for a
* keep checking until they're done. We nap for a
* tenth of a second on each loop so as not to burn
* CPU. (We do a conditional wait instead, but it
* seems like overkill for this case.)
@@ -396,29 +475,23 @@ nm_thread(void *worker0) {
}
/*
* async_cb is an universal callback for 'async' events sent to event loop.
* It's the only way to safely pass data to libuv event loop. We use a single
* async event and a lockless queue of 'isc__netievent_t' structures passed
* from other threads.
* async_cb is a universal callback for 'async' events sent to event loop.
* It's the only way to safely pass data to the libuv event loop. We use a
* single async event and a lockless queue of 'isc__netievent_t' structures
* passed from other threads.
*/
static void
async_cb(uv_async_t *handle) {
isc__networker_t *worker = (isc__networker_t *) handle->loop->data;
isc__netievent_t *ievent;
/*
* We only try dequeue to not waste time, libuv guarantees
* that if someone calls uv_async_send -after- async_cb was called
* then async_cb will be called again, we won't loose any signals.
*/
while ((ievent = (isc__netievent_t *)
isc_queue_dequeue(worker->ievents)) != NULL)
{
switch (ievent->type) {
case netievent_stop:
uv_stop(handle->loop);
isc_mem_put(worker->mgr->mctx, ievent,
sizeof(isc__netievent_storage_t));
isc_mempool_put(worker->mgr->evpool, ievent);
return;
case netievent_udplisten:
isc__nm_async_udplisten(worker, ievent);
@@ -450,27 +523,36 @@ async_cb(uv_async_t *handle) {
case netievent_tcpclose:
isc__nm_async_tcpclose(worker, ievent);
break;
case netievent_closecb:
isc__nm_async_closecb(worker, ievent);
break;
case netievent_shutdown:
isc__nm_async_shutdown(worker, ievent);
break;
default:
INSIST(0);
ISC_UNREACHABLE();
}
isc_mem_put(worker->mgr->mctx, ievent,
sizeof(isc__netievent_storage_t));
isc__nm_put_ievent(worker->mgr, ievent);
}
}
void *
isc__nm_get_ievent(isc_nm_t *mgr, isc__netievent_type type) {
isc__netievent_storage_t *event =
isc_mem_get(mgr->mctx, sizeof(isc__netievent_storage_t));
isc__netievent_storage_t *event = isc_mempool_get(mgr->evpool);
/* XXX: Use a memory pool? */
*event = (isc__netievent_storage_t) {
.ni.type = type
};
return (event);
}
/*
 * Release an ievent by returning it to the manager's 'evpool' memory
 * pool.
 */
void
isc__nm_put_ievent(isc_nm_t *mgr, void *ievent) {
isc_mempool_put(mgr->evpool, ievent);
}
void
isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
isc_queue_enqueue(worker->ievents, (uintptr_t)event);
@@ -552,10 +634,17 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) {
isc_quota_detach(&sock->quota);
}
sock->gquota = NULL;
if (sock->timer_initialized) {
uv_close((uv_handle_t *)&sock->timer, NULL);
sock->timer_initialized = false;
}
isc_astack_destroy(sock->inactivehandles);
while ((uvreq = isc_astack_pop(sock->inactivereqs)) != NULL) {
isc_mem_put(sock->mgr->mctx, uvreq, sizeof(*uvreq));
isc_mempool_put(sock->mgr->reqpool, uvreq);
}
isc_astack_destroy(sock->inactivereqs);
@@ -570,7 +659,6 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) {
} else {
isc_nm_detach(&sock->mgr);
}
}
static void
@@ -596,11 +684,11 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) {
* accept destruction.
*/
LOCK(&sock->lock);
active_handles += sock->ah;
active_handles += atomic_load(&sock->ah);
if (sock->children != NULL) {
for (int i = 0; i < sock->nchildren; i++) {
LOCK(&sock->children[i].lock);
active_handles += sock->children[i].ah;
active_handles += atomic_load(&sock->children[i].ah);
UNLOCK(&sock->children[i].lock);
}
}
@@ -700,7 +788,6 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr,
.inactivehandles = isc_astack_new(mgr->mctx, 60),
.inactivereqs = isc_astack_new(mgr->mctx, 60)
};
isc_nm_attach(mgr, &sock->mgr);
sock->uv_handle.handle.data = sock;
@@ -780,6 +867,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
isc_sockaddr_t *local)
{
isc_nmhandle_t *handle = NULL;
size_t handlenum;
int pos;
REQUIRE(VALID_NMSOCK(sock));
@@ -812,7 +900,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
LOCK(&sock->lock);
/* We need to add this handle to the list of active handles */
if (sock->ah == sock->ah_size) {
if ((size_t) atomic_load(&sock->ah) == sock->ah_size) {
sock->ah_frees =
isc_mem_reallocate(sock->mgr->mctx, sock->ah_frees,
sock->ah_size * 2 *
@@ -831,7 +919,9 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
sock->ah_size *= 2;
}
pos = sock->ah_frees[sock->ah++];
handlenum = atomic_fetch_add(&sock->ah, 1);
pos = sock->ah_frees[handlenum];
INSIST(sock->ah_handles[pos] == NULL);
sock->ah_handles[pos] = handle;
handle->ah_pos = pos;
@@ -875,63 +965,86 @@ nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
*handle = (isc_nmhandle_t) {
.magic = 0
};
isc_mem_put(sock->mgr->mctx, handle, sizeof(isc_nmhandle_t) + extra);
}
void
isc_nmhandle_unref(isc_nmhandle_t *handle) {
isc_nmsocket_t *sock = NULL;
size_t handlenum;
bool reuse = false;
int refs;
REQUIRE(VALID_NMHANDLE(handle));
refs = isc_refcount_decrement(&handle->references);
INSIST(refs > 0);
if (refs == 1) {
isc_nmsocket_t *sock = handle->sock;
bool reuse = false;
if (refs > 1) {
return;
}
handle->sock = NULL;
if (handle->doreset != NULL) {
handle->doreset(handle->opaque);
}
sock = handle->sock;
handle->sock = NULL;
/*
* We do it all under lock to avoid races with socket
* destruction.
*/
LOCK(&sock->lock);
INSIST(sock->ah_handles[handle->ah_pos] == handle);
INSIST(sock->ah_size > handle->ah_pos);
INSIST(sock->ah > 0);
sock->ah_handles[handle->ah_pos] = NULL;
sock->ah_frees[--sock->ah] = handle->ah_pos;
handle->ah_pos = 0;
if (handle->doreset != NULL) {
handle->doreset(handle->opaque);
}
if (atomic_load(&sock->active)) {
reuse = isc_astack_trypush(sock->inactivehandles,
handle);
}
UNLOCK(&sock->lock);
/*
* We do all of this under lock to avoid races with socket
* destruction.
*/
LOCK(&sock->lock);
/*
* Handle is closed. If the socket has a callback
* configured for that (e.g., to perform cleanup after
* request processing), call it now.
*/
if (sock->closehandle_cb != NULL) {
INSIST(sock->ah_handles[handle->ah_pos] == handle);
INSIST(sock->ah_size > handle->ah_pos);
INSIST(atomic_load(&sock->ah) > 0);
sock->ah_handles[handle->ah_pos] = NULL;
handlenum = atomic_fetch_sub(&sock->ah, 1) - 1;
sock->ah_frees[handlenum] = handle->ah_pos;
handle->ah_pos = 0;
if (atomic_load(&sock->active)) {
reuse = isc_astack_trypush(sock->inactivehandles,
handle);
}
UNLOCK(&sock->lock);
if (!reuse) {
nmhandle_free(sock, handle);
}
/*
* The handle is closed. If the socket has a callback configured
* for that (e.g., to perform cleanup after request processing),
* call it now.
*/
if (sock->closehandle_cb != NULL) {
if (sock->tid == isc_nm_tid()) {
sock->closehandle_cb(sock);
} else {
isc__netievent_closecb_t * event =
isc__nm_get_ievent(sock->mgr,
netievent_closecb);
isc_nmsocket_attach(sock, &event->sock);
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *) event);
/*
* If we do this asynchronously then the async event
* will clean the socket, so just exit.
*/
return;
}
}
if (!reuse) {
nmhandle_free(sock, handle);
}
if (sock->ah == 0 &&
!atomic_load(&sock->active) &&
!atomic_load(&sock->destroying))
{
nmsocket_maybe_destroy(sock);
}
if (atomic_load(&sock->ah) == 0 &&
!atomic_load(&sock->active) &&
!atomic_load(&sock->destroying))
{
nmsocket_maybe_destroy(sock);
}
}
@@ -974,6 +1087,14 @@ isc_nmhandle_localaddr(isc_nmhandle_t *handle) {
return (handle->local);
}
/*
 * Return the netmgr that owns the socket behind 'handle'.  Both the
 * handle and its socket must be valid.
 */
isc_nm_t *
isc_nmhandle_netmgr(isc_nmhandle_t *handle) {
	isc_nm_t *owner = NULL;

	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	owner = handle->sock->mgr;
	return (owner);
}
isc__nm_uvreq_t *
isc__nm_uvreq_get(isc_nm_t *mgr, isc_nmsocket_t *sock) {
isc__nm_uvreq_t *req = NULL;
@@ -987,7 +1108,7 @@ isc__nm_uvreq_get(isc_nm_t *mgr, isc_nmsocket_t *sock) {
}
if (req == NULL) {
req = isc_mem_get(mgr->mctx, sizeof(isc__nm_uvreq_t));
req = isc_mempool_get(mgr->reqpool);
}
*req = (isc__nm_uvreq_t) {
@@ -1025,7 +1146,7 @@ isc__nm_uvreq_put(isc__nm_uvreq_t **req0, isc_nmsocket_t *sock) {
if (!atomic_load(&sock->active) ||
!isc_astack_trypush(sock->inactivereqs, req))
{
isc_mem_put(sock->mgr->mctx, req, sizeof(isc__nm_uvreq_t));
isc_mempool_put(sock->mgr->reqpool, req);
}
if (handle != NULL) {
@@ -1055,6 +1176,40 @@ isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region,
}
}
/*
 * Handler for a queued 'closecb' event: invoke the socket's
 * closehandle_cb, then detach the socket reference stored in the event.
 */
void
isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0) {
isc__netievent_closecb_t *ievent =
(isc__netievent_closecb_t *) ievent0;
/*
 * The event must carry a valid socket whose tid matches the current
 * netmgr thread id, and the socket must have a closehandle_cb set.
 */
REQUIRE(VALID_NMSOCK(ievent->sock));
REQUIRE(ievent->sock->tid == isc_nm_tid());
REQUIRE(ievent->sock->closehandle_cb != NULL);
UNUSED(worker);
ievent->sock->closehandle_cb(ievent->sock);
/* Detach the socket reference held in the event's 'sock' field. */
isc_nmsocket_detach(&ievent->sock);
}
/*
 * uv_walk() callback used during shutdown.  For a TCP handle, the socket
 * stored in 'handle->data' is passed to isc__nm_tcp_shutdown(); handles
 * of any other type are left alone.
 */
static void
shutdown_walk_cb(uv_handle_t *handle, void *arg) {
	UNUSED(arg);

	if (handle->type != UV_TCP) {
		return;
	}

	isc__nm_tcp_shutdown((isc_nmsocket_t *) handle->data);
}
/*
 * Handler for a 'shutdown' event: visit every handle registered on this
 * worker's loop with shutdown_walk_cb().  The event carries no payload.
 */
void
isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ievent0) {
	uv_loop_t *loop = &worker->loop;

	UNUSED(ievent0);

	uv_walk(loop, shutdown_walk_cb, NULL);
}
bool
isc__nm_acquire_interlocked(isc_nm_t *mgr) {
LOCK(&mgr->lock);

View File

@@ -15,6 +15,7 @@
#include <isc/atomic.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/log.h>
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
@@ -129,8 +130,9 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nm_cb_t cb, void *cbarg,
size_t extrahandlesize, isc_quota_t *quota,
isc_nmsocket_t **rv)
size_t extrahandlesize, int backlog,
isc_quota_t *quota,
isc_nmsocket_t **sockp)
{
isc__netievent_tcplisten_t *ievent = NULL;
isc_nmsocket_t *nsock = NULL;
@@ -143,15 +145,13 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
nsock->rcb.accept = cb;
nsock->rcbarg = cbarg;
nsock->extrahandlesize = extrahandlesize;
nsock->backlog = backlog;
if (quota != NULL) {
/*
* We need to force it to make sure we get it attached.
* An example failure mode would be server under attack
* reconfiguring interfaces - that might cause weak attach
* to fail and leave this listening socket without limits.
* We can ignore the result.
* We don't attach to the quota, just assign it, to avoid
* increasing the quota unnecessarily.
*/
isc_quota_force(quota, &nsock->quota);
nsock->gquota = quota;
}
nsock->tid = isc_random_uniform(mgr->nworkers);
@@ -163,7 +163,7 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
ievent->sock = nsock;
isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
(isc__netievent_t *) ievent);
*rv = nsock;
*sockp = nsock;
return (ISC_R_SUCCESS);
}
@@ -184,7 +184,7 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
}
uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0);
r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, 10,
r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog,
tcp_connection_cb);
if (r != 0) {
return;
@@ -218,9 +218,7 @@ stoplistening_cb(uv_handle_t *handle) {
SIGNAL(&sock->cond);
UNLOCK(&sock->lock);
if (sock->quota != NULL) {
isc_quota_detach(&sock->quota);
}
sock->gquota = NULL;
isc_nmsocket_detach(&sock);
}
@@ -242,25 +240,52 @@ isc__nm_async_tcpstoplisten(isc__networker_t *worker,
uv_close(&sock->uv_handle.handle, stoplistening_cb);
}
/*
 * libuv timer callback for a socket's read timer.  'handle->data' holds
 * the isc_nmsocket_t the timer belongs to; the callback must run with the
 * current netmgr thread id equal to the socket's tid.
 */
static void
readtimeout_cb(uv_timer_t *handle) {
isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid());
/*
 * Socket is actively processing something, so restart the timer
 * (same read_timeout, repeat value 0) and return.
 */
if (atomic_load(&sock->processing)) {
uv_timer_start(handle, readtimeout_cb, sock->read_timeout, 0);
return;
}
/*
 * Timeout: stop reading on the stream, detach the quota if one is
 * attached, then invoke the receive callback with a NULL region.
 * NOTE(review): the NULL region presumably tells the callback that
 * the read timed out; confirm against the recv callbacks.
 */
uv_read_stop(&sock->uv_handle.stream);
if (sock->quota) {
isc_quota_detach(&sock->quota);
}
sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg);
}
isc_result_t
isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
isc_nmsocket_t *sock = NULL;
isc__netievent_startread_t *ievent = NULL;
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
sock = handle->sock;
sock->rcb.recv = cb;
sock->rcbarg = cbarg; /* That's obviously broken... */
sock->rcbarg = cbarg;
ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
ievent->sock = sock;
if (sock->tid == isc_nm_tid()) {
int r = uv_read_start(&sock->uv_handle.stream,
isc__nm_alloc_cb, read_cb);
INSIST(r == 0);
isc__nm_async_startread(&sock->mgr->workers[sock->tid],
(isc__netievent_t *) ievent);
isc__nm_put_ievent(sock->mgr, ievent);
} else {
isc__netievent_startread_t *ievent =
isc__nm_get_ievent(sock->mgr,
netievent_tcpstartread);
ievent->sock = sock;
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *) ievent);
}
@@ -275,12 +300,23 @@ isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0) {
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(worker->id == isc_nm_tid());
if (sock->read_timeout != 0) {
if (!sock->timer_initialized) {
uv_timer_init(&worker->loop, &sock->timer);
sock->timer.data = sock;
sock->timer_initialized = true;
}
uv_timer_start(&sock->timer, readtimeout_cb,
sock->read_timeout, 0);
}
uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb, read_cb);
}
isc_result_t
isc_nm_pauseread(isc_nmsocket_t *sock) {
isc__netievent_pauseread_t *ievent = NULL;
REQUIRE(VALID_NMSOCK(sock));
if (atomic_load(&sock->readpaused)) {
@@ -288,15 +324,14 @@ isc_nm_pauseread(isc_nmsocket_t *sock) {
}
atomic_store(&sock->readpaused, true);
ievent = isc__nm_get_ievent(sock->mgr, netievent_tcppauseread);
ievent->sock = sock;
if (sock->tid == isc_nm_tid()) {
int r = uv_read_stop(&sock->uv_handle.stream);
INSIST(r == 0);
isc__nm_async_pauseread(&sock->mgr->workers[sock->tid],
(isc__netievent_t *) ievent);
isc__nm_put_ievent(sock->mgr, ievent);
} else {
isc__netievent_pauseread_t *ievent =
isc__nm_get_ievent(sock->mgr,
netievent_tcppauseread);
ievent->sock = sock;
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *) ievent);
}
@@ -309,15 +344,20 @@ isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ievent0) {
isc__netievent_pauseread_t *ievent =
(isc__netievent_pauseread_t *) ievent0;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(worker->id == isc_nm_tid());
if (sock->timer_initialized) {
uv_timer_stop(&sock->timer);
}
uv_read_stop(&sock->uv_handle.stream);
}
isc_result_t
isc_nm_resumeread(isc_nmsocket_t *sock) {
isc__netievent_startread_t *ievent = NULL;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->rcb.recv != NULL);
@@ -327,16 +367,14 @@ isc_nm_resumeread(isc_nmsocket_t *sock) {
atomic_store(&sock->readpaused, false);
ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
ievent->sock = sock;
if (sock->tid == isc_nm_tid()) {
int r = uv_read_start(&sock->uv_handle.stream,
isc__nm_alloc_cb, read_cb);
INSIST(r == 0);
isc__nm_async_startread(&sock->mgr->workers[sock->tid],
(isc__netievent_t *) ievent);
isc__nm_put_ievent(sock->mgr, ievent);
} else {
/* It's the same as startread */
isc__netievent_startread_t *ievent =
isc__nm_get_ievent(sock->mgr,
netievent_tcpstartread);
ievent->sock = sock;
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *) ievent);
}
@@ -359,6 +397,16 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
INSIST(sock->rcb.recv != NULL);
sock->rcb.recv(sock->tcphandle, &region, sock->rcbarg);
sock->read_timeout = (atomic_load(&sock->keepalive)
? sock->mgr->keepalive
: sock->mgr->idle);
if (sock->timer_initialized && sock->read_timeout != 0) {
/* The timer will be updated */
uv_timer_start(&sock->timer, readtimeout_cb,
sock->read_timeout, 0);
}
isc__nm_free_uvbuf(sock, buf);
return;
}
@@ -390,13 +438,14 @@ accept_connection(isc_nmsocket_t *ssock) {
REQUIRE(VALID_NMSOCK(ssock));
REQUIRE(ssock->tid == isc_nm_tid());
if (!atomic_load_relaxed(&ssock->active)) {
if (!atomic_load_relaxed(&ssock->active) ||
atomic_load_relaxed(&ssock->mgr->shutdown)) {
/* We're closing, bail */
return (ISC_R_CANCELED);
}
if (ssock->quota != NULL) {
result = isc_quota_attach(ssock->quota, &quota);
if (ssock->gquota != NULL) {
result = isc_quota_attach(ssock->gquota, &quota);
if (result != ISC_R_SUCCESS) {
return (result);
}
@@ -440,6 +489,7 @@ accept_connection(isc_nmsocket_t *ssock) {
handle = isc__nmhandle_get(csock, NULL, &local);
INSIST(ssock->rcb.accept != NULL);
csock->read_timeout = ssock->mgr->init;
ssock->rcb.accept(handle, ISC_R_SUCCESS, ssock->rcbarg);
isc_nmsocket_detach(&csock);
@@ -449,15 +499,19 @@ accept_connection(isc_nmsocket_t *ssock) {
static void
tcp_connection_cb(uv_stream_t *server, int status) {
isc_nmsocket_t *ssock = server->data;
isc_result_t result = accept_connection(ssock);
isc_result_t result;
UNUSED(status);
result = accept_connection(ssock);
if (result != ISC_R_SUCCESS) {
if (result == ISC_R_QUOTA || result == ISC_R_SOFTQUOTA) {
ssock->overquota = true;
}
/* TODO: Log the error. */
isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
"TCP connection failed: %s",
isc_result_totext(result));
}
}
@@ -568,6 +622,16 @@ tcp_close_cb(uv_handle_t *uvhandle) {
isc__nmsocket_prep_destroy(sock);
}
/*
 * Close callback for a TCP socket's read-timeout timer.
 *
 * tcp_close_direct() closes the timer first when one was initialized;
 * once libuv reports the timer closed, this callback performs the rest
 * of the teardown: it detaches 'sock->server' and starts closing the
 * underlying uv handle, with tcp_close_cb as its completion callback.
 */
static void
timer_close_cb(uv_handle_t *uvhandle) {
	isc_nmsocket_t *nmsock = uvhandle->data;

	REQUIRE(VALID_NMSOCK(nmsock));

	isc_nmsocket_detach(&nmsock->server);

	uv_close(&nmsock->uv_handle.handle, tcp_close_cb);
}
static void
tcp_close_direct(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
@@ -587,9 +651,13 @@ tcp_close_direct(isc_nmsocket_t *sock) {
}
}
}
isc_nmsocket_detach(&sock->server);
uv_close(&sock->uv_handle.handle, tcp_close_cb);
if (sock->timer_initialized) {
uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
sock->timer_initialized = false;
} else {
isc_nmsocket_detach(&sock->server);
uv_close(&sock->uv_handle.handle, tcp_close_cb);
}
}
void
@@ -621,3 +689,12 @@ isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ievent0) {
tcp_close_direct(ievent->sock);
}
/*
 * Tell the reader of a TCP socket that the socket is being shut down.
 *
 * Only sockets of type isc_nm_tcpsocket that currently have a handle
 * ('sock->tcphandle') are affected; every other socket is left untouched.
 * The reader is notified by invoking its receive callback with a NULL
 * region.
 *
 * The receive callback is only installed by isc_nm_read(), so a socket
 * that has a handle but has never been read from has no callback to
 * call; such sockets are skipped rather than calling through a NULL
 * function pointer.
 */
void
isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	if (sock->type != isc_nm_tcpsocket || sock->tcphandle == NULL) {
		return;
	}

	/* No read was ever requested on this socket: nobody to notify. */
	if (sock->rcb.recv == NULL) {
		return;
	}

	sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg);
}

View File

@@ -47,8 +47,16 @@ dnslen(unsigned char* base) {
return ((base[0] << 8) + (base[1]));
}
/*
* Regular TCP buffer, should suffice in most cases.
*/
#define NM_REG_BUF 4096
#define NM_BIG_BUF 65536
/*
 * Two full DNS packets with lengths.
 * netmgr receives 64k at most so there's no risk
 * of overrun.
 *
 * The replacement list is fully parenthesized so that the macro expands
 * to a single value regardless of the operators surrounding it (for
 * example division, modulo or a cast applied to NM_BIG_BUF).
 */
#define NM_BIG_BUF ((65535 + 2) * 2)
static inline void
alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
REQUIRE(len <= NM_BIG_BUF);
@@ -66,6 +74,25 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
}
}
/*
 * Close callback for the read-timeout timer of a TCP-DNS socket.
 *
 * Runs once libuv has finished closing the timer handle.  It records
 * that the timer no longer exists, marks the socket as closed, and
 * detaches the local socket pointer obtained from 'handle->data'.
 */
static void
timer_close_cb(uv_handle_t *handle) {
	isc_nmsocket_t *dnssock = handle->data;

	INSIST(VALID_NMSOCK(dnssock));

	/* The timer handle is gone; it must be re-initialized before reuse. */
	dnssock->timer_initialized = false;
	atomic_store(&dnssock->closed, true);

	isc_nmsocket_detach(&dnssock);
}
/*
 * libuv timer callback fired when a TCP-DNS socket's read timeout
 * expires.
 *
 * 'timer->data' carries the TCP-DNS socket.  The callback detaches the
 * socket's 'outer' socket and then closes the timer itself;
 * timer_close_cb runs when that close completes.
 */
static void
dnstcp_readtimeout(uv_timer_t *timer) {
	isc_nmsocket_t *dnssock = timer->data;
	uv_handle_t *timerhandle = NULL;

	REQUIRE(VALID_NMSOCK(dnssock));

	isc_nmsocket_detach(&dnssock->outer);

	timerhandle = (uv_handle_t *) &dnssock->timer;
	uv_close(timerhandle, timer_close_cb);
}
/*
* Accept callback for TCP-DNS connection
*/
@@ -94,77 +121,71 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_nmsocket_attach(handle->sock, &dnssock->outer);
dnssock->peer = handle->sock->peer;
dnssock->iface = handle->sock->iface;
dnssock->read_timeout = handle->sock->mgr->init;
dnssock->tid = isc_nm_tid();
dnssock->closehandle_cb = resume_processing;
uv_timer_init(&dnssock->mgr->workers[isc_nm_tid()].loop,
&dnssock->timer);
dnssock->timer.data = dnssock;
dnssock->timer_initialized = true;
uv_timer_start(&dnssock->timer, dnstcp_readtimeout,
dnssock->read_timeout, 0);
isc_nm_read(handle, dnslisten_readcb, dnssock);
}
static bool
connection_limit(isc_nmsocket_t *sock) {
int ah;
/*
* Process a single packet from the incoming buffer.
*
* Return ISC_R_SUCCESS and attach 'handlep' to a handle if something
* was processed; return ISC_R_NOMORE if there isn't a full message
* to be processed.
*
* The caller will need to unreference the handle.
*/
static isc_result_t
processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) {
size_t len;
REQUIRE(sock->type == isc_nm_tcpdnssocket && sock->outer != NULL);
if (atomic_load(&sock->sequential)) {
/*
* We're already non-pipelining, so there's
* no need to check per-connection limits.
*/
return (false);
}
LOCK(&sock->lock);
ah = sock->ah;
UNLOCK(&sock->lock);
if (ah >= TCPDNS_CLIENTS_PER_CONN) {
atomic_store(&sock->overlimit, true);
isc_nm_pauseread(sock->outer);
return (true);
}
return (false);
}
/* Process all complete packets out of incoming buffer */
static void
processbuffer(isc_nmsocket_t *dnssock) {
REQUIRE(VALID_NMSOCK(dnssock));
REQUIRE(handlep != NULL && *handlep == NULL);
/* While we have a complete packet in the buffer */
while (dnssock->buf_len > 2 &&
dnslen(dnssock->buf) <= dnssock->buf_len - 2 &&
!connection_limit(dnssock))
{
/*
* If we don't even have the length yet, we can't do
* anything.
*/
if (dnssock->buf_len < 2) {
return (ISC_R_NOMORE);
}
/*
* Process the first packet from the buffer, leaving
* the rest (if any) for later.
*/
len = dnslen(dnssock->buf);
if (len <= dnssock->buf_len - 2) {
isc_nmhandle_t *dnshandle = NULL;
isc_region_t r2 = {
.base = dnssock->buf + 2,
.length = dnslen(dnssock->buf)
.length = len
};
size_t len;
dnshandle = isc__nmhandle_get(dnssock, NULL, NULL);
atomic_store(&dnssock->processing, true);
dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
/*
* If the recv callback wants to hold on to the
* handle, it needs to attach to it.
*/
isc_nmhandle_unref(dnshandle);
len = dnslen(dnssock->buf) + 2;
len += 2;
dnssock->buf_len -= len;
if (len > 0) {
memmove(dnssock->buf, dnssock->buf + len,
dnssock->buf_len);
}
/* Check here to make sure we do the processing at least once */
if (atomic_load(&dnssock->processing)) {
return;
}
*handlep = dnshandle;
return (ISC_R_SUCCESS);
}
return (ISC_R_NOMORE);
}
/*
@@ -174,8 +195,8 @@ processbuffer(isc_nmsocket_t *dnssock) {
static void
dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
isc_nmsocket_t *dnssock = (isc_nmsocket_t *) arg;
isc_sockaddr_t local;
unsigned char *base = NULL;
bool done = false;
size_t len;
REQUIRE(VALID_NMSOCK(dnssock));
@@ -183,133 +204,67 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
if (region == NULL) {
/* Connection closed */
atomic_store(&dnssock->closed, true);
isc_nmsocket_detach(&dnssock->outer);
isc_nmsocket_detach(&dnssock);
isc__nm_tcpdns_close(dnssock);
return;
}
local = isc_nmhandle_localaddr(handle);
base = region->base;
len = region->length;
/*
* We have something in the buffer, we need to glue it.
*/
if (dnssock->buf_len > 0) {
if (dnssock->buf_len == 1) {
/* Make sure we have the length */
dnssock->buf[1] = base[0];
dnssock->buf_len = 2;
base++;
len--;
}
processbuffer(dnssock);
if (dnssock->buf_len + len > dnssock->buf_size) {
alloc_dnsbuf(dnssock, dnssock->buf_len + len);
}
memmove(dnssock->buf + dnssock->buf_len, base, len);
dnssock->buf_len += len;
if (dnssock->buf_len > 0) {
size_t plen;
dnssock->read_timeout = (atomic_load(&dnssock->keepalive)
? dnssock->mgr->keepalive
: dnssock->mgr->idle);
if (dnssock->buf_len == 1) {
/* Make sure we have the length */
dnssock->buf[1] = base[0];
dnssock->buf_len = 2;
base++;
len--;
}
do {
isc_result_t result;
isc_nmhandle_t *dnshandle = NULL;
/* At this point we definitely have 2 bytes there. */
plen = ISC_MIN(len, (dnslen(dnssock->buf) + 2 -
dnssock->buf_len));
if (dnssock->buf_len + plen > NM_BIG_BUF) {
result = processbuffer(dnssock, &dnshandle);
if (result != ISC_R_SUCCESS) {
/*
* XXX: continuing to read will overrun the
* socket buffer. We may need to force the
* connection to close so the client will have
* to open a new one.
* There wasn't anything in the buffer to process.
*/
return;
}
if (dnssock->buf_len + plen > dnssock->buf_size) {
alloc_dnsbuf(dnssock, dnssock->buf_len + plen);
}
memmove(dnssock->buf + dnssock->buf_len, base, plen);
dnssock->buf_len += plen;
base += plen;
len -= plen;
/* Do we have a complete packet in the buffer? */
if (dnslen(dnssock->buf) >= dnssock->buf_len - 2 &&
!connection_limit(dnssock))
{
isc_nmhandle_t *dnshandle = NULL;
isc_region_t r2 = {
.base = dnssock->buf + 2,
.length = dnslen(dnssock->buf)
};
dnshandle = isc__nmhandle_get(dnssock, NULL, &local);
atomic_store(&dnssock->processing, true);
dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
dnssock->buf_len = 0;
/*
* If the recv callback wants to hold on to the
* handle, it needs to attach to it.
*/
isc_nmhandle_unref(dnshandle);
}
}
/*
* At this point we've processed whatever was previously in the
* socket buffer. If there are more messages to be found in what
* we've read, and if we're either pipelining or not processing
* anything else currently, then we can process those messages now.
*/
while (len >= 2 && dnslen(base) <= len - 2 &&
(!atomic_load(&dnssock->sequential) ||
!atomic_load(&dnssock->processing)) &&
!connection_limit(dnssock))
{
isc_nmhandle_t *dnshandle = NULL;
isc_region_t r2 = {
.base = base + 2,
.length = dnslen(base)
};
len -= dnslen(base) + 2;
base += dnslen(base) + 2;
dnshandle = isc__nmhandle_get(dnssock, NULL, &local);
atomic_store(&dnssock->processing, true);
dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
/*
* If the recv callback wants to hold on to the
* handle, it needs to attach to it.
* We have a packet: stop timeout timers
*/
isc_nmhandle_unref(dnshandle);
}
atomic_store(&dnssock->outer->processing, true);
uv_timer_stop(&dnssock->timer);
/*
* We have less than a full message remaining; it can be
* stored in the socket buffer for next time.
*/
if (len > 0) {
if (len > dnssock->buf_size) {
alloc_dnsbuf(dnssock, len);
if (atomic_load(&dnssock->sequential)) {
/*
* We're in sequential mode and we processed
* one packet, so we're done until the next read
* completes.
*/
isc_nm_pauseread(dnssock->outer);
done = true;
} else {
/*
* We're pipelining, so we now resume processing
* packets until the clients-per-connection limit
* is reached (as determined by the number of
* active handles on the socket). When the limit
* is reached, pause reading.
*/
if (atomic_load(&dnssock->ah) >=
TCPDNS_CLIENTS_PER_CONN)
{
isc_nm_pauseread(dnssock->outer);
done = true;
}
}
INSIST(len <= dnssock->buf_size);
memmove(dnssock->buf, base, len);
dnssock->buf_len = len;
}
isc_nmhandle_unref(dnshandle);
} while (!done);
}
/*
@@ -320,8 +275,9 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
isc_result_t
isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nm_recv_cb_t cb, void *cbarg,
size_t extrahandlesize, isc_quota_t *quota,
isc_nmsocket_t **rv)
size_t extrahandlesize, int backlog,
isc_quota_t *quota,
isc_nmsocket_t **sockp)
{
/* A 'wrapper' socket object with outer set to true TCP socket */
isc_nmsocket_t *dnslistensock =
@@ -338,11 +294,12 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface,
/* We set dnslistensock->outer to a true listening socket */
result = isc_nm_listentcp(mgr, iface, dnslisten_acceptcb,
dnslistensock, extrahandlesize,
dnslistensock, extrahandlesize, backlog,
quota, &dnslistensock->outer);
atomic_store(&dnslistensock->listening, true);
*rv = dnslistensock;
*sockp = dnslistensock;
return (result);
}
@@ -382,6 +339,20 @@ isc_nm_tcpdns_sequential(isc_nmhandle_t *handle) {
atomic_store(&handle->sock->sequential, true);
}
/*
 * Flag the connection behind 'handle' as keepalive.
 *
 * This only has an effect when the handle's socket is a TCP-DNS socket
 * that still has an 'outer' socket; in that case the 'keepalive' flag is
 * set on both the TCP-DNS socket and its outer socket.  For any other
 * socket the call is a no-op.
 */
void
isc_nm_tcpdns_keepalive(isc_nmhandle_t *handle) {
	isc_nmsocket_t *sock = NULL;

	REQUIRE(VALID_NMHANDLE(handle));

	sock = handle->sock;
	if (sock->type == isc_nm_tcpdnssocket && sock->outer != NULL) {
		atomic_store(&sock->keepalive, true);
		atomic_store(&sock->outer->keepalive, true);
	}
}
typedef struct tcpsend {
isc_mem_t *mctx;
isc_nmhandle_t *handle;
@@ -394,23 +365,64 @@ typedef struct tcpsend {
static void
resume_processing(void *arg) {
isc_nmsocket_t *sock = (isc_nmsocket_t *) arg;
isc_result_t result;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid());
if (sock->type != isc_nm_tcpdnssocket || sock->outer == NULL) {
return;
}
/*
* If we're in sequential mode or over the
* clients-per-connection limit, the sock can
* resume reading now.
*/
if (atomic_load(&sock->overlimit) || atomic_load(&sock->sequential)) {
atomic_store(&sock->overlimit, false);
atomic_store(&sock->processing, false);
isc_nm_resumeread(sock->outer);
if (atomic_load(&sock->ah) == 0) {
/* Nothing is active; sockets can timeout now */
atomic_store(&sock->outer->processing, false);
uv_timer_start(&sock->timer, dnstcp_readtimeout,
sock->read_timeout, 0);
}
/*
* For sequential sockets: Process what's in the buffer, or
* if there aren't any messages buffered, resume reading.
*/
if (atomic_load(&sock->sequential)) {
isc_nmhandle_t *handle = NULL;
result = processbuffer(sock, &handle);
if (result == ISC_R_SUCCESS) {
atomic_store(&sock->outer->processing, true);
uv_timer_stop(&sock->timer);
isc_nmhandle_unref(handle);
} else if (sock->outer != NULL) {
isc_nm_resumeread(sock->outer);
}
return;
}
/*
* For pipelined sockets: If we're under the clients-per-connection
* limit, resume processing until we reach the limit again.
*/
do {
isc_nmhandle_t *dnshandle = NULL;
result = processbuffer(sock, &dnshandle);
if (result != ISC_R_SUCCESS) {
/*
* Nothing in the buffer; resume reading.
*/
if (sock->outer != NULL) {
isc_nm_resumeread(sock->outer);
}
break;
}
uv_timer_stop(&sock->timer);
atomic_store(&sock->outer->processing, true);
isc_nmhandle_unref(dnshandle);
} while (atomic_load(&sock->ah) < TCPDNS_CLIENTS_PER_CONN);
}
static void
@@ -422,19 +434,6 @@ tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
ts->cb(ts->orighandle, result, ts->cbarg);
isc_mem_put(ts->mctx, ts->region.base, ts->region.length);
/*
* The response was sent; if we're in sequential or overlimit
* mode, resume processing now.
*/
if (atomic_load(&ts->orighandle->sock->sequential) ||
atomic_load(&ts->orighandle->sock->overlimit))
{
atomic_store(&ts->orighandle->sock->processing, false);
atomic_store(&ts->orighandle->sock->overlimit, false);
processbuffer(ts->orighandle->sock);
isc_nm_resumeread(handle->sock);
}
isc_nmhandle_unref(ts->orighandle);
isc_mem_putanddetach(&ts->mctx, ts, sizeof(*ts));
}
@@ -483,12 +482,11 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
return (isc__nm_tcp_send(t->handle, &t->region, tcpdnssend_cb, t));
}
void
isc__nm_tcpdns_close(isc_nmsocket_t *sock) {
if (sock->outer != NULL) {
isc_nmsocket_detach(&sock->outer);
}
atomic_store(&sock->closed, true);
isc__nmsocket_prep_destroy(sock);
uv_close((uv_handle_t*) &sock->timer, timer_close_cb);
}

View File

@@ -438,6 +438,7 @@ isc_netscope_pton
isc_nmhandle_getdata
isc_nmhandle_getextra
isc_nmhandle_is_stream
isc_nmhandle_netmgr
isc_nmhandle_localaddr
isc_nmhandle_peeraddr
isc_nmhandle_ref
@@ -449,15 +450,50 @@ isc_nm_listentcpdns
isc_nm_listenudp
isc_nm_maxudp
isc_nm_send
isc_nm_shutdown
isc_nm_start
isc_nm_tcp_gettimeouts
isc_nm_tcp_settimeouts
isc_nmsocket_detach
isc_nm_tcpdns_keepalive
isc_nm_tcpdns_sequential
isc_nm_tcpdns_stoplistening
isc_nm_tid
isc_nm_udp_stoplistening
isc__nm_acquire_interlocked
isc__nm_drop_interlocked
isc__nm_acquire_interlocked_force
isc__nm_alloc_cb
isc__nm_async_closecb
isc__nm_async_pauseread
isc__nm_async_resumeread
isc__nm_async_shutdown
isc__nm_async_startread
isc__nm_async_tcpclose
isc__nm_async_tcpconnect
isc__nm_async_tcplisten
isc__nm_async_tcpsend
isc__nm_async_tcpstoplisten
isc__nm_async_udplisten
isc__nm_async_udpsend
isc__nm_async_udpstoplisten
isc__nm_drop_interlocked
isc__nm_enqueue_ievent
isc__nm_free_uvbuf
isc__nm_get_ievent
isc__nmhandle_get
isc__nm_in_netthread
isc__nm_put_ievent
isc__nmsocket_init
isc__nmsocket_prep_destroy
isc__nm_tcp_close
isc__nm_tcpdns_close
isc__nm_tcpdns_send
isc__nm_tcp_send
isc__nm_tcp_shutdown
isc__nm_udp_send
isc___nm_uverr2result
isc__nm_uvreq_get
isc__nm_uvreq_put
isc_nonce_buf
isc_ntpaths_get
isc_ntpaths_init

View File

@@ -1023,12 +1023,14 @@ ns_client_addopt(ns_client_t *client, dns_message_t *message,
}
if (TCP_CLIENT(client) && USEKEEPALIVE(client)) {
isc_buffer_t buf;
uint32_t adv;
INSIST(count < DNS_EDNSOPTIONS);
isc_nm_tcp_gettimeouts(isc_nmhandle_netmgr(client->handle),
NULL, NULL, NULL, &adv);
isc_buffer_init(&buf, advtimo, sizeof(advtimo));
isc_buffer_putuint16(&buf,
(uint16_t) client->sctx->advertisedtimo);
isc_buffer_putuint16(&buf, (uint16_t) adv);
ednsopts[count].code = DNS_OPT_TCP_KEEPALIVE;
ednsopts[count].length = 2;
ednsopts[count].value = advtimo;
@@ -2191,7 +2193,7 @@ get_clientmctx(ns_clientmgr_t *manager, isc_mem_t **mctxp) {
#if CLIENT_NMCTXS > 0
LOCK(&manager->lock);
if (isc_nm_tid()>=0) {
if (isc_nm_tid() >= 0) {
nextmctx = isc_nm_tid();
} else {
nextmctx = manager->nextmctx++;

View File

@@ -92,11 +92,6 @@ struct ns_server {
uint32_t options;
unsigned int delay;
unsigned int initialtimo;
unsigned int idletimo;
unsigned int keepalivetimo;
unsigned int advertisedtimo;
dns_acl_t *blackholeacl;
dns_acl_t *keepresporder;
uint16_t udpsize;
@@ -174,21 +169,6 @@ ns_server_setserverid(ns_server_t *sctx, const char *serverid);
*\li 'sctx' is valid.
*/
void
ns_server_settimeouts(ns_server_t *sctx, unsigned int initial,
unsigned int idle, unsigned int keepalive,
unsigned int advertised);
void
ns_server_gettimeouts(ns_server_t *sctx, unsigned int *initial,
unsigned int *idle, unsigned int *keepalive,
unsigned int *advertised);
/*%<
* Set/get tcp-timeout values.
*
* Requires:
*\li 'sctx' is valid.
*/
void
ns_server_setoption(ns_server_t *sctx, unsigned int option,
bool value);

View File

@@ -457,11 +457,11 @@ static isc_result_t
ns_interface_listentcp(ns_interface_t *ifp) {
isc_result_t result;
/* Reserve space for an ns_client_t with the netmgr handle */
result = isc_nm_listentcpdns(ifp->mgr->nm,
(isc_nmiface_t *) &ifp->addr,
ns__client_request, ifp,
sizeof(ns_client_t),
ifp->mgr->backlog,
&ifp->mgr->sctx->tcpquota,
&ifp->tcplistensocket);
if (result != ISC_R_SUCCESS) {

View File

@@ -87,11 +87,6 @@ ns_server_create(isc_mem_t *mctx, ns_matchview_t matchingview,
CHECKFATAL(isc_stats_create(mctx, &sctx->tcpoutstats6,
dns_sizecounter_out_max));
sctx->initialtimo = 300;
sctx->idletimo = 300;
sctx->keepalivetimo = 300;
sctx->advertisedtimo = 300;
sctx->udpsize = 4096;
sctx->transfer_tcp_message_size = 20480;
@@ -216,34 +211,6 @@ ns_server_setserverid(ns_server_t *sctx, const char *serverid) {
return (ISC_R_SUCCESS);
}
void
ns_server_settimeouts(ns_server_t *sctx, unsigned int initial,
unsigned int idle, unsigned int keepalive,
unsigned int advertised)
{
REQUIRE(SCTX_VALID(sctx));
sctx->initialtimo = initial;
sctx->idletimo = idle;
sctx->keepalivetimo = keepalive;
sctx->advertisedtimo = advertised;
}
void
ns_server_gettimeouts(ns_server_t *sctx, unsigned int *initial,
unsigned int *idle, unsigned int *keepalive,
unsigned int *advertised)
{
REQUIRE(SCTX_VALID(sctx));
REQUIRE(initial != NULL && idle != NULL &&
keepalive != NULL && advertised != NULL);
*initial = sctx->initialtimo;
*idle = sctx->idletimo;
*keepalive = sctx->keepalivetimo;
*advertised = sctx->advertisedtimo;
}
void
ns_server_setoption(ns_server_t *sctx, unsigned int option,
bool value)

View File

@@ -87,10 +87,8 @@ ns_server_attach
ns_server_create
ns_server_detach
ns_server_getoption
ns_server_gettimeouts
ns_server_setoption
ns_server_setserverid
ns_server_settimeouts
ns_sortlist_addrorder1
ns_sortlist_addrorder2
ns_sortlist_byaddrsetup