DoH: process data chunk by chunk instead of all at once
Initially, our DNS-over-HTTP(S) implementation would try to process as
much incoming data from the network as possible. However, that might
be undesirable as we might create too many streams (each effectively
backed by a ns_client_t object). That is too forgiving as it might
overwhelm the server and trash its memory allocator, causing high CPU
and memory usage.
Instead of doing that, we resort to processing incoming data using a
chunk-by-chunk processing strategy. That is, we split data into small
chunks (currently 256 bytes) and process each of them
asynchronously. However, we can process more than one chunk at
once (up to 4 currently), given that the number of HTTP/2 streams has
not increased while processing a chunk.
That alone is not enough, though. In addition to the above, we should
limit the number of active streams: these streams for which we have
received a request and started processing it (the ones for which a
read callback was called), as it is perfectly fine to have more opened
streams than active ones. In the case we have reached or surpassed the
limit of active streams, we stop reading AND processing the data from
the remote peer. The number of active streams is effectively decreased
only when responses associated with the active streams are sent to the
remote peer.
Overall, this strategy is very similar to the one used for other
stream-based DNS transports like TCP and TLS.
(cherry picked from commit 9846f395ad)
This commit is contained in:
committed by
Andoni Duarte Pintado
parent
125bfd71d3
commit
11a2956dce
@@ -85,6 +85,21 @@
|
||||
|
||||
#define INITIAL_DNS_MESSAGE_BUFFER_SIZE (512)
|
||||
|
||||
/*
|
||||
* The value should be small enough to not allow a server to open too
|
||||
* many streams at once. It should not be too small either because
|
||||
* the incoming data will be split into too many chunks with each of
|
||||
* them processed asynchronously.
|
||||
*/
|
||||
#define INCOMING_DATA_CHUNK_SIZE (256)
|
||||
|
||||
/*
|
||||
* Often processing a chunk does not change the number of streams. In
|
||||
* that case we can process more than once, but we still should have a
|
||||
* hard limit on that.
|
||||
*/
|
||||
#define INCOMING_DATA_MAX_CHUNKS_AT_ONCE (4)
|
||||
|
||||
typedef struct isc_nm_http_response_status {
|
||||
size_t code;
|
||||
size_t content_length;
|
||||
@@ -155,6 +170,15 @@ struct isc_nm_http_session {
|
||||
|
||||
isc__nm_http_pending_callbacks_t pending_write_callbacks;
|
||||
isc_buffer_t *pending_write_data;
|
||||
|
||||
/*
|
||||
* The statistical values below are for usage on server-side
|
||||
* only. They are meant to detect clients that are taking too many
|
||||
* resources from the server.
|
||||
*/
|
||||
uint64_t received; /* How many requests have been received. */
|
||||
uint64_t submitted; /* How many responses were submitted to send */
|
||||
uint64_t processed; /* How many responses were processed. */
|
||||
};
|
||||
|
||||
typedef enum isc_http_error_responses {
|
||||
@@ -177,6 +201,7 @@ typedef struct isc_http_send_req {
|
||||
void *cbarg;
|
||||
isc_buffer_t *pending_write_data;
|
||||
isc__nm_http_pending_callbacks_t pending_write_callbacks;
|
||||
uint64_t submitted;
|
||||
} isc_http_send_req_t;
|
||||
|
||||
#define HTTP_ENDPOINTS_MAGIC ISC_MAGIC('H', 'T', 'E', 'P')
|
||||
@@ -189,10 +214,20 @@ static bool
|
||||
http_send_outgoing(isc_nm_http_session_t *session, isc_nmhandle_t *httphandle,
|
||||
isc_nm_cb_t cb, void *cbarg);
|
||||
|
||||
static ssize_t
|
||||
http_process_input_data(isc_nm_http_session_t *session,
|
||||
isc_buffer_t *input_data);
|
||||
|
||||
static inline bool
|
||||
http_too_many_active_streams(isc_nm_http_session_t *session);
|
||||
|
||||
static void
|
||||
http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle,
|
||||
isc_nm_cb_t send_cb, void *send_cbarg);
|
||||
|
||||
static void
|
||||
http_do_bio_async(isc_nm_http_session_t *session);
|
||||
|
||||
static void
|
||||
failed_httpstream_read_cb(isc_nmsocket_t *sock, isc_result_t result,
|
||||
isc_nm_http_session_t *session);
|
||||
@@ -496,6 +531,15 @@ finish_http_session(isc_nm_http_session_t *session) {
|
||||
isc_nm_cancelread(session->handle);
|
||||
}
|
||||
|
||||
/*
|
||||
* Free any unprocessed incoming data in order to not process
|
||||
* it during indirect calls to http_do_bio() that might happen
|
||||
* when calling the failed callbacks.
|
||||
*/
|
||||
if (session->buf != NULL) {
|
||||
isc_buffer_free(&session->buf);
|
||||
}
|
||||
|
||||
if (session->client) {
|
||||
client_call_failed_read_cb(ISC_R_UNEXPECTED, session);
|
||||
} else {
|
||||
@@ -656,6 +700,9 @@ on_server_stream_close_callback(int32_t stream_id,
|
||||
|
||||
ISC_LIST_UNLINK(session->sstreams, &sock->h2, link);
|
||||
session->nsstreams--;
|
||||
if (sock->h2.request_received) {
|
||||
session->submitted++;
|
||||
}
|
||||
|
||||
/*
|
||||
* By making a call to isc__nmsocket_prep_destroy(), we ensure that
|
||||
@@ -967,6 +1014,103 @@ client_submit_request(isc_nm_http_session_t *session, http_cstream_t *stream) {
|
||||
return ISC_R_SUCCESS;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
http_process_input_data(isc_nm_http_session_t *session,
|
||||
isc_buffer_t *input_data) {
|
||||
ssize_t readlen = 0;
|
||||
ssize_t processed = 0;
|
||||
isc_region_t chunk = { 0 };
|
||||
size_t before, after;
|
||||
size_t i;
|
||||
|
||||
REQUIRE(VALID_HTTP2_SESSION(session));
|
||||
REQUIRE(input_data != NULL);
|
||||
|
||||
if (!http_session_active(session)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* For clients that initiate request themselves just process
|
||||
* everything.
|
||||
*/
|
||||
if (session->client) {
|
||||
isc_buffer_remainingregion(input_data, &chunk);
|
||||
if (chunk.length == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
readlen = nghttp2_session_mem_recv(session->ngsession,
|
||||
chunk.base, chunk.length);
|
||||
|
||||
if (readlen >= 0) {
|
||||
isc_buffer_forward(input_data, readlen);
|
||||
}
|
||||
|
||||
return readlen;
|
||||
}
|
||||
|
||||
/*
|
||||
* If no streams are created during processing, we might process
|
||||
* more than one chunk at a time. Still we should not overdo that
|
||||
* to avoid processing too much data at once as such behaviour is
|
||||
* known for trashing the memory allocator at times.
|
||||
*/
|
||||
for (before = after = session->nsstreams, i = 0;
|
||||
after <= before && i < INCOMING_DATA_MAX_CHUNKS_AT_ONCE;
|
||||
after = session->nsstreams, i++)
|
||||
{
|
||||
const uint64_t active_streams =
|
||||
(session->received - session->processed);
|
||||
|
||||
/*
|
||||
* If there are non completed send requests in flight -let's
|
||||
* not process any incoming data, as it could lead to piling
|
||||
* up too much send data in send buffers. With many clients
|
||||
* connected it can lead to excessive memory consumption on
|
||||
* the server instance.
|
||||
*/
|
||||
if (session->sending > 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* If we have reached the maximum number of streams used, we
|
||||
* might stop processing for now, as nghttp2 will happily
|
||||
* consume as much data as possible.
|
||||
*/
|
||||
if (session->nsstreams >= session->max_concurrent_streams &&
|
||||
active_streams > 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if (http_too_many_active_streams(session)) {
|
||||
break;
|
||||
}
|
||||
|
||||
isc_buffer_remainingregion(input_data, &chunk);
|
||||
if (chunk.length == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
chunk.length = ISC_MIN(chunk.length, INCOMING_DATA_CHUNK_SIZE);
|
||||
|
||||
readlen = nghttp2_session_mem_recv(session->ngsession,
|
||||
chunk.base, chunk.length);
|
||||
|
||||
if (readlen >= 0) {
|
||||
isc_buffer_forward(input_data, readlen);
|
||||
processed += readlen;
|
||||
} else {
|
||||
isc_buffer_clear(input_data);
|
||||
return readlen;
|
||||
}
|
||||
}
|
||||
|
||||
return processed;
|
||||
}
|
||||
|
||||
/*
|
||||
* Read callback from TLS socket.
|
||||
*/
|
||||
@@ -976,6 +1120,7 @@ http_readcb(isc_nmhandle_t *handle, isc_result_t result, isc_region_t *region,
|
||||
isc_nm_http_session_t *session = (isc_nm_http_session_t *)data;
|
||||
isc_nm_http_session_t *tmpsess = NULL;
|
||||
ssize_t readlen;
|
||||
isc_buffer_t input;
|
||||
|
||||
REQUIRE(VALID_HTTP2_SESSION(session));
|
||||
|
||||
@@ -994,8 +1139,10 @@ http_readcb(isc_nmhandle_t *handle, isc_result_t result, isc_region_t *region,
|
||||
goto done;
|
||||
}
|
||||
|
||||
readlen = nghttp2_session_mem_recv(session->ngsession, region->base,
|
||||
region->length);
|
||||
isc_buffer_init(&input, region->base, region->length);
|
||||
isc_buffer_add(&input, region->length);
|
||||
|
||||
readlen = http_process_input_data(session, &input);
|
||||
if (readlen < 0) {
|
||||
failed_read_cb(ISC_R_UNEXPECTED, session);
|
||||
goto done;
|
||||
@@ -1011,11 +1158,12 @@ http_readcb(isc_nmhandle_t *handle, isc_result_t result, isc_region_t *region,
|
||||
isc_buffer_putmem(session->buf, region->base + readlen,
|
||||
unread_size);
|
||||
isc_nm_pauseread(session->handle);
|
||||
http_do_bio_async(session);
|
||||
} else {
|
||||
/* We might have something to receive or send, do IO */
|
||||
http_do_bio(session, NULL, NULL, NULL);
|
||||
}
|
||||
|
||||
/* We might have something to receive or send, do IO */
|
||||
http_do_bio(session, NULL, NULL, NULL);
|
||||
|
||||
done:
|
||||
isc__nm_httpsession_detach(&tmpsess);
|
||||
}
|
||||
@@ -1053,14 +1201,18 @@ http_writecb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
|
||||
}
|
||||
|
||||
isc_buffer_free(&req->pending_write_data);
|
||||
session->processed += req->submitted;
|
||||
isc_mem_put(session->mctx, req, sizeof(*req));
|
||||
|
||||
session->sending--;
|
||||
http_do_bio(session, NULL, NULL, NULL);
|
||||
isc_nmhandle_detach(&transphandle);
|
||||
if (result != ISC_R_SUCCESS && session->sending == 0) {
|
||||
|
||||
if (result == ISC_R_SUCCESS) {
|
||||
http_do_bio(session, NULL, NULL, NULL);
|
||||
} else {
|
||||
finish_http_session(session);
|
||||
}
|
||||
isc_nmhandle_detach(&transphandle);
|
||||
|
||||
isc__nm_httpsession_detach(&session);
|
||||
}
|
||||
|
||||
@@ -1206,7 +1358,9 @@ http_send_outgoing(isc_nm_http_session_t *session, isc_nmhandle_t *httphandle,
|
||||
*send = (isc_http_send_req_t){ .pending_write_data =
|
||||
session->pending_write_data,
|
||||
.cb = cb,
|
||||
.cbarg = cbarg };
|
||||
.cbarg = cbarg,
|
||||
.submitted = session->submitted };
|
||||
session->submitted = 0;
|
||||
session->pending_write_data = NULL;
|
||||
move_pending_send_callbacks(session, send);
|
||||
|
||||
@@ -1227,6 +1381,27 @@ nothing_to_send:
|
||||
return false;
|
||||
}
|
||||
|
||||
static inline bool
|
||||
http_too_many_active_streams(isc_nm_http_session_t *session) {
|
||||
const uint64_t active_streams = session->received - session->processed;
|
||||
const uint64_t max_active_streams = ISC_MIN(
|
||||
STREAM_CLIENTS_PER_CONN, session->max_concurrent_streams);
|
||||
|
||||
if (session->client) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Do not process incoming data if there are too many active DNS
|
||||
* clients (streams) per connection.
|
||||
*/
|
||||
if (active_streams >= max_active_streams) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static void
|
||||
http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle,
|
||||
isc_nm_cb_t send_cb, void *send_cbarg) {
|
||||
@@ -1242,11 +1417,21 @@ http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle,
|
||||
finish_http_session(session);
|
||||
}
|
||||
return;
|
||||
} else if (nghttp2_session_want_read(session->ngsession) == 0 &&
|
||||
nghttp2_session_want_write(session->ngsession) == 0 &&
|
||||
session->pending_write_data == NULL)
|
||||
{
|
||||
session->closing = true;
|
||||
}
|
||||
|
||||
if (send_cb != NULL) {
|
||||
INSIST(VALID_NMHANDLE(send_httphandle));
|
||||
(void)http_send_outgoing(session, send_httphandle, send_cb,
|
||||
send_cbarg);
|
||||
return;
|
||||
}
|
||||
|
||||
INSIST(send_httphandle == NULL);
|
||||
INSIST(send_cb == NULL);
|
||||
INSIST(send_cbarg == NULL);
|
||||
|
||||
if (session->pending_write_data != NULL && session->sending == 0) {
|
||||
(void)http_send_outgoing(session, NULL, NULL, NULL);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1259,18 +1444,49 @@ http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle,
|
||||
size_t remaining =
|
||||
isc_buffer_remaininglength(session->buf);
|
||||
/* Leftover data in the buffer, use it */
|
||||
size_t readlen = nghttp2_session_mem_recv(
|
||||
session->ngsession,
|
||||
isc_buffer_current(session->buf), remaining);
|
||||
size_t remaining_after = 0;
|
||||
ssize_t readlen = 0;
|
||||
isc_nm_http_session_t *tmpsess = NULL;
|
||||
|
||||
if (readlen == remaining) {
|
||||
/*
|
||||
* Let's ensure that HTTP/2 session and its associated
|
||||
* data will not go "out of scope" too early.
|
||||
*/
|
||||
isc__nm_httpsession_attach(session, &tmpsess);
|
||||
|
||||
readlen = http_process_input_data(session,
|
||||
session->buf);
|
||||
|
||||
remaining_after =
|
||||
isc_buffer_remaininglength(session->buf);
|
||||
|
||||
if (readlen < 0) {
|
||||
failed_read_cb(ISC_R_UNEXPECTED, session);
|
||||
} else if ((size_t)readlen == remaining) {
|
||||
isc_buffer_free(&session->buf);
|
||||
http_do_bio(session, NULL, NULL, NULL);
|
||||
} else if (remaining_after > 0 &&
|
||||
remaining_after < remaining)
|
||||
{
|
||||
/*
|
||||
* We have processed a part of the data, now
|
||||
* let's delay processing of whatever is left
|
||||
* here. We want it to be an async operation so
|
||||
* that we will:
|
||||
*
|
||||
* a) let other things run;
|
||||
* b) have finer grained control over how much
|
||||
* data is processed at once, because nghttp2
|
||||
* would happily consume as much data we pass to
|
||||
* it and that could overwhelm the server.
|
||||
*/
|
||||
http_do_bio_async(session);
|
||||
} else {
|
||||
isc_buffer_forward(session->buf, readlen);
|
||||
(void)http_send_outgoing(session, NULL, NULL,
|
||||
NULL);
|
||||
}
|
||||
|
||||
http_do_bio(session, send_httphandle, send_cb,
|
||||
send_cbarg);
|
||||
isc__nm_httpsession_detach(&tmpsess);
|
||||
return;
|
||||
} else {
|
||||
/* Resume reading, it's idempotent, wait for more */
|
||||
@@ -1281,20 +1497,55 @@ http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle,
|
||||
isc_nm_pauseread(session->handle);
|
||||
}
|
||||
|
||||
if (send_cb != NULL) {
|
||||
INSIST(VALID_NMHANDLE(send_httphandle));
|
||||
(void)http_send_outgoing(session, send_httphandle, send_cb,
|
||||
send_cbarg);
|
||||
} else {
|
||||
INSIST(send_httphandle == NULL);
|
||||
INSIST(send_cb == NULL);
|
||||
INSIST(send_cbarg == NULL);
|
||||
(void)http_send_outgoing(session, NULL, NULL, NULL);
|
||||
/* we might have some data to send after processing */
|
||||
(void)http_send_outgoing(session, NULL, NULL, NULL);
|
||||
|
||||
if (nghttp2_session_want_read(session->ngsession) == 0 &&
|
||||
nghttp2_session_want_write(session->ngsession) == 0 &&
|
||||
session->pending_write_data == NULL)
|
||||
{
|
||||
session->closing = true;
|
||||
isc_nm_pauseread(session->handle);
|
||||
if (session->sending == 0) {
|
||||
finish_http_session(session);
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
static void
|
||||
http_do_bio_async_cb(void *arg) {
|
||||
isc_nm_http_session_t *session = arg;
|
||||
|
||||
REQUIRE(VALID_HTTP2_SESSION(session));
|
||||
|
||||
if (session->handle != NULL &&
|
||||
!isc__nmsocket_closing(session->handle->sock))
|
||||
{
|
||||
http_do_bio(session, NULL, NULL, NULL);
|
||||
}
|
||||
|
||||
isc__nm_httpsession_detach(&session);
|
||||
}
|
||||
|
||||
static void
|
||||
http_do_bio_async(isc_nm_http_session_t *session) {
|
||||
isc_nm_http_session_t *tmpsess = NULL;
|
||||
|
||||
REQUIRE(VALID_HTTP2_SESSION(session));
|
||||
|
||||
if (session->handle == NULL ||
|
||||
isc__nmsocket_closing(session->handle->sock))
|
||||
{
|
||||
return;
|
||||
}
|
||||
isc__nm_httpsession_attach(session, &tmpsess);
|
||||
isc__nm_async_run(
|
||||
&session->handle->sock->mgr->workers[session->handle->sock->tid],
|
||||
http_do_bio_async_cb, tmpsess);
|
||||
}
|
||||
|
||||
static isc_result_t
|
||||
get_http_cstream(isc_nmsocket_t *sock, http_cstream_t **streamp) {
|
||||
http_cstream_t *cstream = sock->h2.connect.cstream;
|
||||
@@ -2039,6 +2290,10 @@ server_call_cb(isc_nmsocket_t *socket, const isc_result_t result,
|
||||
if (result != ISC_R_SUCCESS) {
|
||||
data = NULL;
|
||||
}
|
||||
if (result == ISC_R_SUCCESS) {
|
||||
socket->h2.request_received = true;
|
||||
socket->h2.session->received++;
|
||||
}
|
||||
socket->h2.cb(handle, result, data, socket->h2.cbarg);
|
||||
isc_nmhandle_detach(&handle);
|
||||
}
|
||||
|
||||
@@ -983,6 +983,7 @@ typedef struct isc_nmsocket_h2 {
|
||||
|
||||
isc_nm_http_endpoints_t *peer_endpoints;
|
||||
|
||||
bool request_received;
|
||||
bool response_submitted;
|
||||
struct {
|
||||
char *uri;
|
||||
|
||||
Reference in New Issue
Block a user