Fix the streaming read callback shutdown logic

When shutting down TCP sockets, the read callback calling logic was
flawed, it would call either one less callback or one extra.  Fix the
logic in the way:

1. When isc_nm_read() has been called but isc_nm_read_stop() hasn't on
   the handle, the read callback will be called with ISC_R_CANCELED to
   cancel active reading from the socket/handle.

2. When isc_nm_read() has been called and isc_nm_read_stop() has been
   called on the on the handle, the read callback will be called with
   ISC_R_SHUTTINGDOWN to signal that the dormant (not-reading) socket
   is being shut down.

3. The .reading and .recv_read flags are little bit tricky.  The
   .reading flag indicates if the outer layer is reading the data (that
   would be uv_tcp_t for TCP and isc_nmsocket_t (TCP) for TLSStream),
   the .recv_read flag indicates whether somebody is interested in the
   data read from the socket.

   Usually, you would expect that the .reading should be false when
   .recv_read is false, but it gets even more tricky with TLSStream as
   the TLS protocol might need to read from the socket even when sending
   data.

   Fix the usage of the .recv_read and .reading flags in the TLSStream
   to their true meaning - which mostly consist of using .recv_read
   everywhere and then wrapping isc_nm_read() and isc_nm_read_stop()
   with the .reading flag.

4. The TLS failed read helper has been modified to resemble the TCP code
   as much as possible, clearing and re-setting the .recv_read flag in
   the TCP timeout code has been fixed and .recv_read is now cleared
   when isc_nm_read_stop() has been called on the streaming socket.

5. The use of Network Manager in the named_controlconf, isccc_ccmsg, and
   isc_httpd units have been greatly simplified due to the improved design.

6. More unit tests for TCP and TLS testing the shutdown conditions have
   been added.

Co-authored-by: Ondřej Surý <ondrej@isc.org>
Co-authored-by: Artem Boldariev <artem@isc.org>
This commit is contained in:
Ondřej Surý
2023-04-13 17:27:50 +02:00
parent 4fcbb078c1
commit 3b10814569
26 changed files with 977 additions and 806 deletions

View File

@@ -113,6 +113,7 @@ tcp_test_SOURCES = \
tcp_test.c \
netmgr_common.h \
netmgr_common.c \
stream_shutdown.c \
uv_wrap.h
tcpdns_test_CPPFLAGS = \
@@ -141,6 +142,7 @@ tls_test_SOURCES = \
tls_test.c \
netmgr_common.h \
netmgr_common.c \
stream_shutdown.c \
uv_wrap.h
tlsdns_test_CPPFLAGS = \

View File

@@ -108,24 +108,13 @@ isc_nm_recv_cb_t connect_readcb = NULL;
int
setup_netmgr_test(void **state) {
char *env_workers = getenv("ISC_TASK_WORKERS");
size_t nworkers;
tcp_connect_addr = (isc_sockaddr_t){ .length = 0 };
isc_sockaddr_fromin6(&tcp_connect_addr, &in6addr_loopback, 0);
tcp_listen_addr = (isc_sockaddr_t){ .length = 0 };
isc_sockaddr_fromin6(&tcp_listen_addr, &in6addr_loopback, stream_port);
if (env_workers != NULL) {
workers = atoi(env_workers);
} else {
workers = isc_os_ncpus();
}
INSIST(workers > 0);
nworkers = ISC_MAX(ISC_MIN(workers, 32), 1);
esends = NSENDS * nworkers;
esends = NSENDS * workers;
atomic_store(&nsends, esends);
@@ -227,7 +216,7 @@ teardown_netmgr_test(void **state ISC_ATTR_UNUSED) {
return (0);
}
static void
void
stop_listening(void *arg ISC_ATTR_UNUSED) {
isc_nm_stoplistening(listen_sock);
isc_nmsocket_close(&listen_sock);
@@ -237,24 +226,23 @@ stop_listening(void *arg ISC_ATTR_UNUSED) {
/* Callbacks */
void
noop_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult, isc_region_t *region,
void *cbarg) {
UNUSED(handle);
UNUSED(eresult);
UNUSED(region);
UNUSED(cbarg);
noop_recv_cb(isc_nmhandle_t *handle ISC_ATTR_UNUSED,
isc_result_t eresult ISC_ATTR_UNUSED,
isc_region_t *region ISC_ATTR_UNUSED,
void *cbarg ISC_ATTR_UNUSED) {
F();
}
unsigned int
noop_accept_cb(isc_nmhandle_t *handle, unsigned int result, void *cbarg) {
UNUSED(handle);
UNUSED(cbarg);
isc_result_t
noop_accept_cb(isc_nmhandle_t *handle ISC_ATTR_UNUSED, unsigned int eresult,
void *cbarg ISC_ATTR_UNUSED) {
F();
if (result == ISC_R_SUCCESS) {
if (eresult == ISC_R_SUCCESS) {
(void)atomic_fetch_add(&saccepts, 1);
}
return (0);
return (ISC_R_SUCCESS);
}
void
@@ -278,10 +266,8 @@ connect_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
case ISC_R_SHUTTINGDOWN:
case ISC_R_CANCELED:
case ISC_R_CONNECTIONRESET:
/* Send failed, we need to stop reading too */
if (stream) {
isc_nm_read_stop(handle);
} else {
/* Abort */
if (!stream) {
isc_nm_cancelread(handle);
}
break;
@@ -337,6 +323,10 @@ connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
return;
}
/* This will initiate one more read callback */
if (stream) {
isc_nmhandle_close(handle);
}
break;
case ISC_R_TIMEDOUT:
case ISC_R_EOF:
@@ -352,10 +342,6 @@ connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
}
isc_refcount_decrement(&active_creads);
if (stream) {
isc_nm_read_stop(handle);
}
isc_nmhandle_detach(&handle);
}
@@ -399,8 +385,6 @@ listen_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
F();
isc_refcount_decrement(&active_ssends);
switch (eresult) {
case ISC_R_CANCELED:
case ISC_R_CONNECTIONRESET:
@@ -418,6 +402,7 @@ listen_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
assert_int_equal(eresult, ISC_R_SUCCESS);
}
isc_refcount_decrement(&active_ssends);
isc_nmhandle_detach(&sendhandle);
}
@@ -431,11 +416,6 @@ listen_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
F();
switch (eresult) {
case ISC_R_CANCELED:
case ISC_R_CONNECTIONRESET:
case ISC_R_EOF:
case ISC_R_SHUTTINGDOWN:
break;
case ISC_R_SUCCESS:
memmove(&magic, region->base, sizeof(magic));
assert_true(magic == send_magic);
@@ -449,18 +429,21 @@ listen_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
memmove(&magic, region->base, sizeof(magic));
assert_true(magic == send_magic);
if (magic == send_magic) {
if (!noanswer) {
isc_nmhandle_t *sendhandle = NULL;
isc_nmhandle_attach(handle, &sendhandle);
isc_refcount_increment0(&active_ssends);
isc_nmhandle_setwritetimeout(sendhandle,
T_IDLE);
isc_nm_send(sendhandle, &send_msg,
listen_send_cb, cbarg);
}
return;
if (!noanswer) {
/* Answer and continue to listen */
isc_nmhandle_t *sendhandle = NULL;
isc_nmhandle_attach(handle, &sendhandle);
isc_refcount_increment0(&active_ssends);
isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
isc_nm_send(sendhandle, &send_msg, listen_send_cb,
cbarg);
}
/* Continue to listen */
return;
case ISC_R_CANCELED:
case ISC_R_CONNECTIONRESET:
case ISC_R_EOF:
case ISC_R_SHUTTINGDOWN:
break;
default:
fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
@@ -469,7 +452,6 @@ listen_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
}
isc_refcount_decrement(&active_sreads);
isc_nmhandle_detach(&handle);
}
@@ -488,6 +470,9 @@ listen_accept_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
do_saccepts_shutdown(loopmgr);
}
isc_nmhandle_attach(handle, &(isc_nmhandle_t *){ NULL });
isc_refcount_increment0(&active_sreads);
return (eresult);
}
@@ -508,6 +493,7 @@ stream_accept_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
}
isc_refcount_increment0(&active_sreads);
isc_nmhandle_attach(handle, &readhandle);
isc_nm_read(handle, listen_read_cb, readhandle);
@@ -670,11 +656,12 @@ noresponse_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
UNUSED(region);
UNUSED(cbarg);
F();
assert_true(eresult == ISC_R_CANCELED ||
eresult == ISC_R_CONNECTIONRESET || eresult == ISC_R_EOF);
isc_refcount_decrement(&active_creads);
isc_nmhandle_detach(&handle);
isc_loopmgr_shutdown(loopmgr);
@@ -685,6 +672,8 @@ noresponse_sendcb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
UNUSED(cbarg);
UNUSED(eresult);
F();
assert_non_null(handle);
atomic_fetch_add(&csends, 1);
isc_nmhandle_detach(&handle);
@@ -697,8 +686,6 @@ noresponse_connectcb(isc_nmhandle_t *handle, isc_result_t eresult,
isc_nmhandle_t *readhandle = NULL;
isc_nmhandle_t *sendhandle = NULL;
UNUSED(handle);
F();
isc_refcount_decrement(&active_cconnects);

View File

@@ -201,15 +201,15 @@ extern isc_nm_recv_cb_t connect_readcb;
fprintf(stderr, "%s:%s:%d:%s = %" PRId64 "\n", __func__, __FILE__, \
__LINE__, #v, atomic_load(&v))
#define P(v) fprintf(stderr, #v " = %" PRId64 "\n", v)
#define F() \
fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle, \
#define F() \
fprintf(stderr, "%u:%s(%p, %s, %p)\n", isc_tid(), __func__, handle, \
isc_result_totext(eresult), cbarg)
#define isc_loopmgr_shutdown(loopmgr) \
{ \
fprintf(stderr, "%s:%s:%d:isc_loopmgr_shutdown(%p)\n", \
__func__, __FILE__, __LINE__, loopmgr); \
isc_loopmgr_shutdown(loopmgr); \
#define isc_loopmgr_shutdown(loopmgr) \
{ \
fprintf(stderr, "%u:%s:%s:%d:isc_loopmgr_shutdown(%p)\n", \
isc_tid(), __func__, __FILE__, __LINE__, loopmgr); \
isc_loopmgr_shutdown(loopmgr); \
}
#else
#define X(v)
@@ -234,8 +234,9 @@ void
noop_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult, isc_region_t *region,
void *cbarg);
unsigned int
noop_accept_cb(isc_nmhandle_t *handle, unsigned int result, void *cbarg);
isc_result_t
noop_accept_cb(isc_nmhandle_t *handle ISC_ATTR_UNUSED, unsigned int result,
void *cbarg ISC_ATTR_UNUSED);
void
connect_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg);
@@ -325,3 +326,20 @@ int
stream_recv_send_teardown(void **state ISC_ATTR_UNUSED);
void
stream_recv_send_connect(void *arg);
int
stream_shutdownconnect_setup(void **state ISC_ATTR_UNUSED);
void
stream_shutdownconnect(void **state ISC_ATTR_UNUSED);
int
stream_shutdownconnect_teardown(void **state ISC_ATTR_UNUSED);
int
stream_shutdownread_setup(void **state ISC_ATTR_UNUSED);
void
stream_shutdownread(void **state ISC_ATTR_UNUSED);
int
stream_shutdownread_teardown(void **state ISC_ATTR_UNUSED);
void
stop_listening(void *arg ISC_ATTR_UNUSED);

171
tests/isc/stream_shutdown.c Normal file
View File

@@ -0,0 +1,171 @@
/*
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
*
* SPDX-License-Identifier: MPL-2.0
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
*
* See the COPYRIGHT file distributed with this work for additional
* information regarding copyright ownership.
*/
#include <sched.h> /* IWYU pragma: keep */
#include <setjmp.h>
#include <signal.h>
#include <stdarg.h>
#include <stdlib.h>
#include <unistd.h>
/*
* As a workaround, include an OpenSSL header file before including cmocka.h,
* because OpenSSL 3.1.0 uses __attribute__(malloc), conflicting with a
* redefined malloc in cmocka.h.
*/
#include <openssl/err.h>
#define UNIT_TESTING
#include <cmocka.h>
#include "netmgr_common.h"
#include <tests/isc.h>
/*
* FIXME: This really needs two network managers, so there's predictable result
* when shuttingdown the netmgr - right now there's a race whether the listening
* or connecting sockets gets shutdown first
*/
static void
shutdownconnect_connectcb(isc_nmhandle_t *handle, isc_result_t eresult,
void *cbarg) {
F();
assert_non_null(handle);
assert_int_equal(eresult, ISC_R_SHUTTINGDOWN);
assert_null(cbarg);
isc_refcount_decrement(&active_cconnects);
atomic_fetch_add(&cconnects, 1);
}
int
stream_shutdownconnect_setup(void **state ISC_ATTR_UNUSED) {
int r = setup_netmgr_test(state);
return (r);
}
void
stream_shutdownconnect(void **state ISC_ATTR_UNUSED) {
isc_result_t result = stream_listen(stream_accept_cb, NULL, 128, NULL,
&listen_sock);
assert_int_equal(result, ISC_R_SUCCESS);
isc_loop_teardown(mainloop, stop_listening, listen_sock);
/* Schedule the shutdown before the connect */
isc_loopmgr_shutdown(loopmgr);
stream_connect(shutdownconnect_connectcb, NULL, T_CONNECT);
}
int
stream_shutdownconnect_teardown(void **state ISC_ATTR_UNUSED) {
X(cconnects);
X(csends);
X(creads);
atomic_assert_int_eq(cconnects, 1);
atomic_assert_int_eq(csends, 0);
atomic_assert_int_eq(creads, 0);
return (teardown_netmgr_test(state));
}
/* Issue the shutdown before reading */
static void
shutdownread_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
isc_region_t *region, void *cbarg) {
F();
assert_non_null(handle);
assert_true(eresult == ISC_R_SHUTTINGDOWN ||
eresult == ISC_R_CONNECTIONRESET || eresult == ISC_R_EOF);
assert_non_null(region);
assert_null(cbarg);
atomic_fetch_add(&creads, 1);
isc_nmhandle_detach(&handle);
isc_refcount_decrement(&active_creads);
}
static void
shutdownread_sendcb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
F();
assert_non_null(handle);
assert_true(eresult == ISC_R_SUCCESS || eresult == ISC_R_SHUTTINGDOWN ||
eresult == ISC_R_CONNECTIONRESET || eresult == ISC_R_EOF);
assert_null(cbarg);
atomic_fetch_add(&csends, 1);
isc_nmhandle_detach(&handle);
isc_refcount_decrement(&active_csends);
}
static void
shutdownread_connectcb(isc_nmhandle_t *handle, isc_result_t eresult,
void *cbarg) {
F();
assert_non_null(handle);
assert_int_equal(eresult, ISC_R_SUCCESS);
assert_null(cbarg);
isc_refcount_decrement(&active_cconnects);
atomic_fetch_add(&cconnects, 1);
/* Schedule the shutdown before read and send */
isc_loopmgr_shutdown(loopmgr);
isc_refcount_increment0(&active_creads);
isc_nmhandle_ref(handle);
isc_nm_read(handle, shutdownread_readcb, cbarg);
isc_refcount_increment0(&active_csends);
isc_nmhandle_ref(handle);
isc_nm_send(handle, (isc_region_t *)&send_msg, shutdownread_sendcb,
cbarg);
}
int
stream_shutdownread_setup(void **state ISC_ATTR_UNUSED) {
int r = setup_netmgr_test(state);
return (r);
}
void
stream_shutdownread(void **state ISC_ATTR_UNUSED) {
isc_result_t result = stream_listen(stream_accept_cb, NULL, 128, NULL,
&listen_sock);
assert_int_equal(result, ISC_R_SUCCESS);
isc_loop_teardown(mainloop, stop_listening, listen_sock);
stream_connect(shutdownread_connectcb, NULL, T_CONNECT);
}
int
stream_shutdownread_teardown(void **state ISC_ATTR_UNUSED) {
X(cconnects);
X(csends);
X(creads);
atomic_assert_int_eq(cconnects, 1);
atomic_assert_int_eq(csends, 1);
atomic_assert_int_eq(creads, 1);
return (teardown_netmgr_test(state));
}

View File

@@ -57,6 +57,16 @@ ISC_LOOP_TEST_IMPL(tcp_noresponse) {
return;
}
ISC_LOOP_TEST_IMPL(tcp_shutdownconnect) {
stream_shutdownconnect(arg);
return;
}
ISC_LOOP_TEST_IMPL(tcp_shutdownread) {
stream_shutdownread(arg);
return;
}
ISC_LOOP_TEST_IMPL(tcp_timeout_recovery) {
stream_timeout_recovery(arg);
return;
@@ -114,6 +124,10 @@ ISC_TEST_LIST_START
ISC_TEST_ENTRY_CUSTOM(tcp_noop, stream_noop_setup, stream_noop_teardown)
ISC_TEST_ENTRY_CUSTOM(tcp_noresponse, stream_noresponse_setup,
stream_noresponse_teardown)
ISC_TEST_ENTRY_CUSTOM(tcp_shutdownconnect, stream_shutdownconnect_setup,
stream_shutdownconnect_teardown)
ISC_TEST_ENTRY_CUSTOM(tcp_shutdownread, stream_shutdownread_setup,
stream_shutdownread_teardown)
ISC_TEST_ENTRY_CUSTOM(tcp_timeout_recovery, stream_timeout_recovery_setup,
stream_timeout_recovery_teardown)
ISC_TEST_ENTRY_CUSTOM(tcp_recv_one, stream_recv_one_setup,

View File

@@ -47,13 +47,6 @@
/* TCPDNS */
static void
stop_listening(void *arg ISC_ATTR_UNUSED) {
isc_nm_stoplistening(listen_sock);
isc_nmsocket_close(&listen_sock);
assert_null(listen_sock);
}
static void
start_listening(uint32_t nworkers, isc_nm_accept_cb_t accept_cb,
isc_nm_recv_cb_t recv_cb) {

View File

@@ -55,6 +55,16 @@ ISC_LOOP_TEST_IMPL(tls_noresponse) {
return;
}
ISC_LOOP_TEST_IMPL(tls_shutdownconnect) {
stream_shutdownconnect(arg);
return;
}
ISC_LOOP_TEST_IMPL(tls_shutdownread) {
stream_shutdownread(arg);
return;
}
ISC_LOOP_TEST_IMPL(tls_timeout_recovery) {
stream_timeout_recovery(arg);
return;
@@ -109,6 +119,10 @@ ISC_TEST_LIST_START
ISC_TEST_ENTRY_CUSTOM(tls_noop, stream_noop_setup, stream_noop_teardown)
ISC_TEST_ENTRY_CUSTOM(tls_noresponse, stream_noresponse_setup,
stream_noresponse_teardown)
ISC_TEST_ENTRY_CUSTOM(tls_shutdownconnect, stream_shutdownconnect_setup,
stream_shutdownconnect_teardown)
ISC_TEST_ENTRY_CUSTOM(tls_shutdownread, stream_shutdownread_setup,
stream_shutdownread_teardown)
ISC_TEST_ENTRY_CUSTOM(tls_timeout_recovery, stream_timeout_recovery_setup,
stream_timeout_recovery_teardown)
ISC_TEST_ENTRY_CUSTOM(tls_recv_one, stream_recv_one_setup,

View File

@@ -46,13 +46,6 @@
#include <tests/isc.h>
static void
stop_listening(void *arg ISC_ATTR_UNUSED) {
isc_nm_stoplistening(listen_sock);
isc_nmsocket_close(&listen_sock);
assert_null(listen_sock);
}
static void
start_listening(uint32_t nworkers, isc_nm_accept_cb_t accept_cb,
isc_nm_recv_cb_t recv_cb) {

View File

@@ -134,6 +134,15 @@ mock_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult, isc_region_t *region,
UNUSED(cbarg);
}
static void
udp_listen_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
isc_region_t *region, void *cbarg) {
if (eresult != ISC_R_SUCCESS) {
isc_refcount_increment0(&active_sreads);
}
listen_read_cb(handle, eresult, region, cbarg);
}
static void
connect_nomemory_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
UNUSED(handle);
@@ -145,13 +154,6 @@ connect_nomemory_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
isc_loopmgr_shutdown(loopmgr);
}
static void
stop_listening(void *arg ISC_ATTR_UNUSED) {
isc_nm_stoplistening(listen_sock);
isc_nmsocket_close(&listen_sock);
assert_null(listen_sock);
}
static void
start_listening(uint32_t nworkers, isc_nm_recv_cb_t cb) {
isc_result_t result = isc_nm_listenudp(
@@ -616,6 +618,7 @@ udp_shutdown_read_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
isc_refcount_increment0(&active_creads);
isc_nmhandle_attach(handle, &readhandle);
isc_nm_read(handle, udp_shutdown_read_read_cb, cbarg);
assert_true(handle->sock->reading);
/* Send */
isc_refcount_increment0(&active_csends);
@@ -712,7 +715,7 @@ udp_cancel_read_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
udp_cancel_read_send_cb, cbarg);
}
break;
case ISC_R_EOF:
case ISC_R_CANCELED:
/* The read has been canceled */
atomic_fetch_add(&creads, 1);
isc_loopmgr_shutdown(loopmgr);
@@ -936,7 +939,7 @@ ISC_TEARDOWN_TEST_IMPL(udp_recv_one) {
}
ISC_LOOP_TEST_IMPL(udp_recv_one) {
start_listening(ISC_NM_LISTEN_ONE, listen_read_cb);
start_listening(ISC_NM_LISTEN_ONE, udp_listen_read_cb);
udp__connect(NULL);
}
@@ -976,7 +979,7 @@ ISC_TEARDOWN_TEST_IMPL(udp_recv_two) {
}
ISC_LOOP_TEST_IMPL(udp_recv_two) {
start_listening(ISC_NM_LISTEN_ONE, listen_read_cb);
start_listening(ISC_NM_LISTEN_ONE, udp_listen_read_cb);
udp__connect(NULL);
udp__connect(NULL);
@@ -1007,7 +1010,7 @@ ISC_TEARDOWN_TEST_IMPL(udp_recv_send) {
}
ISC_LOOP_TEST_IMPL(udp_recv_send) {
start_listening(ISC_NM_LISTEN_ALL, listen_read_cb);
start_listening(ISC_NM_LISTEN_ALL, udp_listen_read_cb);
for (size_t i = 0; i < workers; i++) {
isc_async_run(isc_loop_get(loopmgr, i), udp__connect, NULL);