checkpoint: implement isc_nm_read() for UDP

isc_nm_read() on a UDP socket will now read a single datagram and
then stop reading until isc_nm_read() is called again.
This commit is contained in:
Evan Hunt
2020-07-13 21:22:01 -07:00
parent 03009e7159
commit db354895e7
4 changed files with 135 additions and 6 deletions

View File

@@ -208,12 +208,17 @@ isc_nm_pause(isc_nm_t *mgr);
void
isc_nm_resume(isc_nm_t *mgr);
/*%<
* Resume paused processing. It will return immediately
* after signalling workers to resume.
* Resume paused processing. It will return immediately after signalling
* workers to resume.
*/
isc_result_t
isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
/*
* Begin (or continue) reading on the socket associated with 'handle', and
* update its recv callback to 'cb', which will be called as soon as there
* is data to process.
*/
isc_result_t
isc_nm_pauseread(isc_nmhandle_t *handle);

View File

@@ -118,12 +118,12 @@ struct isc_nmiface {
typedef enum isc__netievent_type {
netievent_udpconnect,
netievent_udpsend,
netievent_udprecv,
netievent_udpread,
netievent_udpstop,
netievent_udpclose,
netievent_tcpconnect,
netievent_tcpsend,
netievent_tcprecv,
netievent_tcpstartread,
netievent_tcppauseread,
netievent_tcpchildaccept,
@@ -200,7 +200,9 @@ typedef struct isc__netievent__socket {
} isc__netievent__socket_t;
typedef isc__netievent__socket_t isc__netievent_udplisten_t;
typedef isc__netievent__socket_t isc__netievent_udpread_t;
typedef isc__netievent__socket_t isc__netievent_udpstop_t;
typedef isc__netievent__socket_t isc__netievent_udpclose_t;
typedef isc__netievent__socket_t isc__netievent_tcpstop_t;
typedef isc__netievent__socket_t isc__netievent_tcpclose_t;
typedef isc__netievent__socket_t isc__netievent_startread_t;
@@ -669,6 +671,18 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
* Back-end implementation of isc_nm_send() for UDP handles.
*/
isc_result_t
isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
/*
* Back-end implementation of isc_nm_read() for UDP handles.
*/
void
isc__nm_udp_close(isc_nmsocket_t *sock);
/*%<
* Close a UDP socket.
*/
void
isc__nm_udp_stoplistening(isc_nmsocket_t *sock);
@@ -681,6 +695,10 @@ void
isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_udpclose(isc__networker_t *worker, isc__netievent_t *ev0);
/*%<
 * Callback handlers for asynchronous UDP events (listen, stoplisten,
 * send, read, close).
*/
@@ -694,6 +712,9 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
isc_result_t
isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
/*
* Back-end implementation of isc_nm_read() for TCP handles.
*/
void
isc__nm_tcp_close(isc_nmsocket_t *sock);

View File

@@ -603,6 +603,12 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
case netievent_udpsend:
isc__nm_async_udpsend(worker, ievent);
break;
case netievent_udpread:
isc__nm_async_udpread(worker, ievent);
break;
case netievent_udpclose:
isc__nm_async_udpclose(worker, ievent);
break;
case netievent_tcpconnect:
isc__nm_async_tcpconnect(worker, ievent);
break;
@@ -881,6 +887,9 @@ isc__nmsocket_prep_destroy(isc_nmsocket_t *sock) {
*/
if (!atomic_load(&sock->closed)) {
switch (sock->type) {
case isc_nm_udpsocket:
isc__nm_udp_close(sock);
return;
case isc_nm_tcpsocket:
isc__nm_tcp_close(sock);
return;
@@ -1346,6 +1355,8 @@ isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
REQUIRE(VALID_NMHANDLE(handle));
switch (handle->sock->type) {
case isc_nm_udpsocket:
return (isc__nm_udp_read(handle, cb, cbarg));
case isc_nm_tcpsocket:
return (isc__nm_tcp_read(handle, cb, cbarg));
case isc_nm_tcpdnssocket:

View File

@@ -226,7 +226,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
}
static void
udp_close_cb(uv_handle_t *handle) {
udp_stop_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
atomic_store(&sock->closed, true);
@@ -239,7 +239,7 @@ stop_udp_child(isc_nmsocket_t *sock) {
REQUIRE(sock->tid == isc_nm_tid());
uv_udp_recv_stop(&sock->uv_handle.udp);
uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_close_cb);
uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_stop_cb);
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
@@ -743,3 +743,95 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmiface_t *peer,
return (result);
}
/*
 * libuv receive callback: hand the incoming datagram to the socket's
 * generic UDP recv handler, then stop reading so that only a single
 * datagram is delivered per isc_nm_read() call.
 */
static void
udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
	    const struct sockaddr *addr, unsigned flags) {
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);

	/*
	 * NOTE(review): libuv may also invoke this callback with nrecv == 0
	 * and addr == NULL to signal "nothing more to read"; stopping
	 * unconditionally here would then consume the read request without
	 * delivering any data — confirm udp_recv_cb and its callers tolerate
	 * that case.
	 */
	udp_recv_cb(handle, nrecv, buf, addr, flags);
	uv_udp_recv_stop(&sock->uv_handle.udp);
}
/*
 * Async 'udpread' event handler: begin (or resume) reading on the
 * socket's libuv handle.  The read callback (udp_read_cb) stops the
 * reading again after each datagram, so at most one datagram is
 * delivered per event.
 */
void
isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpread_t *event = (isc__netievent_udpread_t *)ev0;

	UNUSED(worker);

	uv_udp_recv_start(&event->sock->uv_handle.udp, isc__nm_alloc_cb,
			  udp_read_cb);
}
/*
 * Back-end implementation of isc_nm_read() for UDP handles.
 *
 * Stores the caller's recv callback and argument on the socket, then
 * issues a 'udpread' event to start reading a single datagram: the
 * event is run directly when we are already on the socket's worker
 * thread, otherwise it is enqueued for that worker.
 *
 * Always returns ISC_R_SUCCESS.
 */
isc_result_t
isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
	isc_nmsocket_t *sock = NULL;
	/*
	 * Use the udpread typedef to match the netievent_udpread event
	 * allocated below (the original declared isc__netievent_startread_t,
	 * which aliases the same struct but is the TCP event's name).
	 */
	isc__netievent_udpread_t *ievent = NULL;

	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));
	REQUIRE(handle->sock->type == isc_nm_udpsocket);

	sock = handle->sock;
	sock->rcb.recv = cb;
	sock->rcbarg = cbarg;

	ievent = isc__nm_get_ievent(sock->mgr, netievent_udpread);
	ievent->sock = sock;

	if (sock->tid == isc_nm_tid()) {
		/* Already on the socket's thread: run the event inline. */
		isc__nm_async_udpread(&sock->mgr->workers[sock->tid],
				      (isc__netievent_t *)ievent);
		isc__nm_put_ievent(sock->mgr, ievent);
	} else {
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}

	return (ISC_R_SUCCESS);
}
/*
 * libuv close callback: record the close in the stats counters, flag
 * the socket as closed, and allow its destruction to proceed.
 */
static void
udp_close_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);

	REQUIRE(VALID_NMSOCK(sock));

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
	atomic_store(&sock->closed, true);
	isc__nmsocket_prep_destroy(sock);
}
/*
 * Async 'udpclose' event handler: close the socket's libuv handle;
 * udp_close_cb finishes the teardown once libuv is done with it.
 * Must run on the socket's own worker thread.
 */
void
isc__nm_async_udpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpclose_t *event = (isc__netievent_udpclose_t *)ev0;
	isc_nmsocket_t *sock = event->sock;

	REQUIRE(worker->id == sock->tid);

	uv_close(&sock->uv_handle.handle, udp_close_cb);
}
/*
 * Back-end implementation of isc_nm_close() for UDP sockets: issue a
 * 'udpclose' event, running it directly when already on the socket's
 * worker thread and enqueueing it for that worker otherwise.
 */
void
isc__nm_udp_close(isc_nmsocket_t *sock) {
	isc__netievent_udpclose_t *event = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udpsocket);

	event = isc__nm_get_ievent(sock->mgr, netievent_udpclose);
	event->sock = sock;

	if (sock->tid != isc_nm_tid()) {
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)event);
		return;
	}

	/* Already on the socket's thread: run the event inline. */
	isc__nm_async_udpclose(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)event);
	isc__nm_put_ievent(sock->mgr, event);
}