Compare commits

...

5 Commits

Author SHA1 Message Date
Ondřej Surý
1bd60000c8 fixup! fixup! fixup! fixup! Use uv_udp_try_send2 when available 2025-01-20 21:43:55 +01:00
Ondřej Surý
c0c867ab80 fixup! fixup! fixup! Use uv_udp_try_send2 when available 2025-01-20 21:17:44 +01:00
Ondřej Surý
b0257107ec fixup! fixup! Use uv_udp_try_send2 when available 2025-01-20 19:58:09 +01:00
Ondřej Surý
8ffe297b17 fixup! Use uv_udp_try_send2 when available 2025-01-20 19:36:32 +01:00
Ondřej Surý
19fe94b2af Use uv_udp_try_send2 when available 2025-01-20 11:58:38 +01:00
3 changed files with 171 additions and 53 deletions

View File

@@ -684,6 +684,21 @@ struct isc_nmsocket {
size_t active_handles_cur;
size_t active_handles_max;
/*%
* 'pending' UDP sends
*/
struct {
size_t count;
isc_nmhandle_t *handles[20];
uv_buf_t *bufs[20];
uv_buf_t bufs_s[20];
unsigned int nbufs[20];
struct sockaddr *addrs[20];
isc_nm_cb_t cbs[20];
void *cbargs[20];
uv_check_t flush;
} sends;
/*%
* Used to pass a result back from listen or connect events.
*/

View File

@@ -689,10 +689,16 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker,
.result = ISC_R_UNSET,
.active_handles = ISC_LIST_INITIALIZER,
.active_handles_max = ISC_NETMGR_MAX_STREAM_CLIENTS_PER_CONN,
.active_uvreqs = ISC_LIST_INITIALIZER,
.active_link = ISC_LINK_INITIALIZER,
.active = true,
};
for (size_t i = 0; i < ARRAY_SIZE(sock->sends.bufs); i++) {
sock->sends.bufs[i] = &sock->sends.bufs_s[i];
sock->sends.nbufs[i] = 1;
}
if (iface != NULL) {
family = iface->type.sa.sa_family;
sock->iface = *iface;

View File

@@ -110,6 +110,9 @@ start_udp_child_job(void *arg) {
(void)isc__nm_socket_min_mtu(sock->fd, sa_family);
(void)uv_check_init(&loop->loop, &sock->sends.flush);
uv_handle_set_data(&sock->sends.flush, sock);
#if HAVE_DECL_UV_UDP_RECVMMSG
uv_init_flags |= UV_UDP_RECVMMSG;
#endif
@@ -412,6 +415,9 @@ isc_nm_routeconnect(isc_nm_t *mgr, isc_nm_cb_t cb, void *cbarg) {
#endif /* USE_ROUTE_SOCKET */
}
static void
udp_flush_pending(uv_check_t *handle);
/*
* Asynchronous 'udpstop' call handler: stop listening on a UDP socket.
*/
@@ -422,10 +428,19 @@ stop_udp_child_job(void *arg) {
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->parent != NULL);
if (sock->sends.count > 0) {
udp_flush_pending(&sock->sends.flush);
}
sock->active = false;
/* See isc__nm_udp_close() for correct ordering */
isc__nm_udp_close(sock);
/* 0. close the check callback */
(void)uv_check_stop(&sock->sends.flush);
uv_close((uv_handle_t *)&sock->sends.flush, NULL);
REQUIRE(!sock->worker->loop->paused);
isc_barrier_wait(&sock->parent->stop_barrier);
}
@@ -654,6 +669,100 @@ can_log_udp_sends(void) {
return false;
}
static void
send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq) {
REQUIRE(sock->connected);
isc__networker_t *worker = sock->worker;
isc_result_t result;
if (uv_udp_get_send_queue_size(&sock->uv_handle.udp) >
ISC_NETMGR_UDP_SENDBUF_SIZE)
{
/*
* The kernel UDP send queue is full, try sending the
* UDP response synchronously instead of just failing.
*/
int r = uv_udp_try_send(&sock->uv_handle.udp, &uvreq->uvbuf, 1,
NULL);
if (r < 0) {
result = isc_uverr2result(r);
if (can_log_udp_sends()) {
isc__netmgr_log(
worker->netmgr, ISC_LOG_ERROR,
"Sending UDP messages failed: %s",
isc_result_totext(result));
}
isc__nm_incstats(sock, STATID_SENDFAIL);
goto fail;
}
isc__nm_sendcb(sock, uvreq, ISC_R_SUCCESS, true);
} else {
/* Send the message asynchronously */
int r = uv_udp_send(&uvreq->uv_req.udp_send,
&sock->uv_handle.udp, &uvreq->uvbuf, 1,
NULL, udp_send_cb);
if (r < 0) {
isc__nm_incstats(sock, STATID_SENDFAIL);
result = isc_uverr2result(r);
goto fail;
}
}
return;
fail:
isc__nm_failed_send_cb(sock, uvreq, result, true);
}
static void
udp_flush_pending(uv_check_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
size_t sent = 0;
isc_result_t result = ISC_R_FAILURE;
if (sock->sends.count == 0) {
return;
}
INSIST(sock->sends.bufs[0] == &sock->sends.bufs_s[0]);
int r = uv_udp_try_send2(&sock->uv_handle.udp, sock->sends.count,
sock->sends.bufs, sock->sends.nbufs,
sock->sends.addrs, 0);
if (r < 0) {
result = isc_uverr2result(r);
if (can_log_udp_sends()) {
isc__netmgr_log(sock->worker->netmgr, ISC_LOG_ERROR,
"Sending UDP messages failed: %s",
isc_result_totext(result));
}
} else {
sent = r;
for (size_t i = 0; i < sent; i++) {
sock->sends.cbs[i](sock->sends.handles[i],
ISC_R_SUCCESS,
sock->sends.cbargs[i]);
isc_nmhandle_detach(&sock->sends.handles[i]);
}
}
for (size_t i = sent; i < sock->sends.count; i++) {
sock->sends.cbs[i](sock->sends.handles[i], result,
sock->sends.cbargs[i]);
isc_nmhandle_detach(&sock->sends.handles[i]);
isc__nm_incstats(sock, STATID_SENDFAIL);
}
(void)uv_check_stop(&sock->sends.flush);
sock->sends.count = 0;
return;
}
/*
* Send the data in 'region' to a peer via a UDP socket. We try to find
* a proper sibling/child socket so that we won't have to jump to
@@ -663,13 +772,9 @@ void
isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region,
isc_nm_cb_t cb, void *cbarg) {
isc_nmsocket_t *sock = handle->sock;
const isc_sockaddr_t *peer = &handle->peer;
const struct sockaddr *sa = NULL;
isc__nm_uvreq_t *uvreq = NULL;
isc__networker_t *worker = NULL;
uint32_t maxudp;
int r;
isc_result_t result;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_udpsocket);
@@ -677,7 +782,6 @@ isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region,
worker = sock->worker;
maxudp = atomic_load(&worker->netmgr->maxudp);
sa = sock->connected ? NULL : &peer->type.sa;
/*
* We're simulating a firewall blocking UDP packets bigger than
@@ -692,62 +796,55 @@ isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region,
return;
}
uvreq = isc__nm_uvreq_get(sock);
uvreq->uvbuf.base = (char *)region->base;
uvreq->uvbuf.len = region->length;
if (sock->connected) {
uvreq = isc__nm_uvreq_get(sock);
uvreq->uvbuf.base = (char *)region->base;
uvreq->uvbuf.len = region->length;
isc_nmhandle_attach(handle, &uvreq->handle);
isc_nmhandle_attach(handle, &uvreq->handle);
uvreq->cb.send = cb;
uvreq->cbarg = cbarg;
uvreq->cb.send = cb;
uvreq->cbarg = cbarg;
if (isc__nm_closing(worker)) {
result = ISC_R_SHUTTINGDOWN;
goto fail;
}
if (isc__nmsocket_closing(sock)) {
result = ISC_R_CANCELED;
goto fail;
}
if (uv_udp_get_send_queue_size(&sock->uv_handle.udp) >
ISC_NETMGR_UDP_SENDBUF_SIZE)
{
/*
* The kernel UDP send queue is full, try sending the UDP
* response synchronously instead of just failing.
*/
r = uv_udp_try_send(&sock->uv_handle.udp, &uvreq->uvbuf, 1, sa);
if (r < 0) {
if (can_log_udp_sends()) {
isc__netmgr_log(
worker->netmgr, ISC_LOG_ERROR,
"Sending UDP messages failed: %s",
isc_result_totext(isc_uverr2result(r)));
}
isc__nm_incstats(sock, STATID_SENDFAIL);
result = isc_uverr2result(r);
goto fail;
if (isc__nm_closing(worker)) {
isc__nm_failed_send_cb(sock, uvreq, ISC_R_SHUTTINGDOWN,
true);
return;
}
RUNTIME_CHECK(r == (int)region->length);
isc__nm_sendcb(sock, uvreq, ISC_R_SUCCESS, true);
} else {
/* Send the message asynchronously */
r = uv_udp_send(&uvreq->uv_req.udp_send, &sock->uv_handle.udp,
&uvreq->uvbuf, 1, sa, udp_send_cb);
if (r < 0) {
isc__nm_incstats(sock, STATID_SENDFAIL);
result = isc_uverr2result(r);
goto fail;
if (isc__nmsocket_closing(sock)) {
isc__nm_failed_send_cb(sock, uvreq, ISC_R_CANCELED,
true);
return;
}
send_direct(sock, uvreq);
return;
}
const isc_sockaddr_t *peer = &handle->peer;
size_t i = sock->sends.count;
INSIST(i < ARRAY_SIZE(sock->sends.cbs));
isc_nmhandle_attach(handle, &sock->sends.handles[i]);
sock->sends.cbs[i] = cb;
sock->sends.cbargs[i] = cbarg;
sock->sends.bufs_s[i].base = (char *)region->base;
sock->sends.bufs_s[i].len = region->length;
sock->sends.addrs[i] = UNCONST(&peer->type.sa);
sock->sends.count++;
if (sock->sends.count == 20) {
udp_flush_pending(&sock->sends.flush);
}
if (sock->sends.count > 0) {
/* FIXME: We need to flush the udp when shutting down */
uv_check_start(&sock->sends.flush, udp_flush_pending);
}
return;
fail:
isc__nm_failed_send_cb(sock, uvreq, result, true);
}
static isc_result_t