Compare commits

...

3 Commits

Author SHA1 Message Date
Artem Boldariev
b6e826242d WIP: optimise memory consumption and allocations on server-side 2021-07-21 16:30:09 +03:00
Artem Boldariev
6ee9e27f01 Replace ad-hoc DNS message buffer in client code with isc_buffer_t
The commit replaces an ad-hoc incoming DNS-message buffer in the
client-side DoH code with isc_buffer_t.

The commit also fixes a timing issue in the unit tests revealed by the
change.
2021-07-20 18:41:57 +03:00
Artem Boldariev
429efe1c28 Replace the HTTP/2 session's ad-hoc buffer with isc_buffer_t
This commit replaces a static ad-hoc HTTP/2 session's temporary buffer
with a realloc-able isc_buffer_t object, which is being allocated on
as needed basis, lowering the memory consumption somewhat. The buffer
is needed in very rare cases, so allocating it prematurely is not
wise.

Also, it fixes a bug in http_readcb() where the ad-hoc buffer appeared
to be improperly used, leading to a situation when the processed data
from the receiving regions can be processed twice, while unprocessed
data will never be processed.
2021-07-19 15:26:53 +03:00
3 changed files with 85 additions and 56 deletions

View File

@@ -74,6 +74,8 @@
((code) >= MIN_SUCCESSFUL_HTTP_STATUS && \
(code) <= MAX_SUCCESSFUL_HTTP_STATUS)
#define INITIAL_DNS_MESSAGE_BUFFER_SIZE (512)
typedef struct isc_nm_http_response_status {
size_t code;
size_t content_length;
@@ -96,8 +98,7 @@ typedef struct http_cstream {
size_t authoritylen;
char *path;
uint8_t rbuf[MAX_DNS_MESSAGE_SIZE];
size_t rbufsize;
isc_buffer_t *rbuf;
size_t pathlen;
int32_t stream_id;
@@ -139,8 +140,7 @@ struct isc_nm_http_session {
isc_nmhandle_t *client_httphandle;
isc_nmsocket_t *serversocket;
uint8_t buf[MAX_DNS_MESSAGE_SIZE];
size_t bufsize;
isc_buffer_t *buf;
isc_tlsctx_t *tlsctx;
uint32_t max_concurrent_streams;
@@ -318,6 +318,10 @@ isc__nm_httpsession_detach(isc_nm_http_session_t **sessionp) {
session->ngsession = NULL;
}
if (session->buf != NULL) {
isc_buffer_free(&session->buf);
}
/* We need an acquire memory barrier here */
(void)isc_refcount_current(&session->references);
@@ -419,6 +423,10 @@ new_http_cstream(isc_nmsocket_t *sock, http_cstream_t **streamp) {
stream->up.field_data[ISC_UF_QUERY].len);
}
isc_buffer_allocate(mctx, &stream->rbuf,
INITIAL_DNS_MESSAGE_BUFFER_SIZE);
isc_buffer_setautorealloc(stream->rbuf, true);
ISC_LIST_PREPEND(sock->h2.session->cstreams, stream, link);
*streamp = stream;
@@ -448,6 +456,8 @@ put_http_cstream(isc_mem_t *mctx, http_cstream_t *stream) {
link);
}
isc__nmsocket_detach(&stream->httpsock);
isc_buffer_free(&stream->rbuf);
isc_mem_put(mctx, stream, sizeof(http_cstream_t));
}
@@ -499,12 +509,14 @@ on_client_data_chunk_recv_callback(int32_t stream_id, const uint8_t *data,
http_cstream_t *cstream = find_http_cstream(stream_id, session);
if (cstream != NULL) {
size_t new_rbufsize = cstream->rbufsize + len;
size_t new_rbufsize = len;
if (cstream->rbuf != NULL) {
new_rbufsize += isc_buffer_usedlength(cstream->rbuf);
}
if (new_rbufsize <= MAX_DNS_MESSAGE_SIZE &&
new_rbufsize <= cstream->response_status.content_length)
{
memmove(cstream->rbuf + cstream->rbufsize, data, len);
cstream->rbufsize = new_rbufsize;
isc_buffer_putmem(cstream->rbuf, data, len);
} else {
return (NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE);
}
@@ -521,11 +533,17 @@ on_server_data_chunk_recv_callback(int32_t stream_id, const uint8_t *data,
isc_nmsocket_h2_t *h2 = ISC_LIST_HEAD(session->sstreams);
while (h2 != NULL) {
if (stream_id == h2->stream_id) {
size_t new_bufsize = h2->bufsize + len;
size_t new_bufsize;
isc_buffer_reinit(&h2->rbuf,
isc_mem_allocate(session->mctx,
h2->content_length),
h2->content_length);
new_bufsize = isc_buffer_usedlength(&h2->rbuf) + len;
if (new_bufsize <= MAX_DNS_MESSAGE_SIZE &&
new_bufsize <= h2->content_length) {
memmove(h2->buf + h2->bufsize, data, len);
h2->bufsize = new_bufsize;
isc_buffer_putmem(&h2->rbuf, data, len);
break;
}
@@ -565,12 +583,13 @@ static void
call_unlink_cstream_readcb(http_cstream_t *cstream,
isc_nm_http_session_t *session,
isc_result_t result) {
isc_region_t read_data;
REQUIRE(VALID_HTTP2_SESSION(session));
REQUIRE(cstream != NULL);
ISC_LIST_UNLINK(session->cstreams, cstream, link);
INSIST(VALID_NMHANDLE(session->client_httphandle));
cstream->read_cb(session->client_httphandle, result,
&(isc_region_t){ cstream->rbuf, cstream->rbufsize },
isc_buffer_usedregion(cstream->rbuf, &read_data);
cstream->read_cb(session->client_httphandle, result, &read_data,
cstream->read_cbarg);
put_http_cstream(session->mctx, cstream);
}
@@ -952,10 +971,14 @@ http_readcb(isc_nmhandle_t *handle, isc_result_t result, isc_region_t *region,
}
if ((size_t)readlen < region->length) {
INSIST(session->bufsize == 0);
INSIST(region->length - readlen < MAX_DNS_MESSAGE_SIZE);
memmove(session->buf, region->base, region->length - readlen);
session->bufsize = region->length - readlen;
size_t unread_size = region->length - readlen;
if (session->buf == NULL) {
isc_buffer_allocate(session->mctx, &session->buf,
unread_size);
isc_buffer_setautorealloc(session->buf, true);
}
isc__buffer_putmem(session->buf, region->base + readlen,
unread_size);
isc_nm_pauseread(session->handle);
}
@@ -1240,18 +1263,18 @@ http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle,
/* We have not yet started reading from this handle */
isc_nm_read(session->handle, http_readcb, session);
session->reading = true;
} else if (session->bufsize > 0) {
} else if (session->buf != NULL) {
size_t remaining =
isc_buffer_remaininglength(session->buf);
/* Leftover data in the buffer, use it */
size_t readlen = nghttp2_session_mem_recv(
session->ngsession, session->buf,
session->bufsize);
session->ngsession,
isc_buffer_current(session->buf), remaining);
if (readlen == session->bufsize) {
session->bufsize = 0;
if (readlen == remaining) {
isc_buffer_free(&session->buf);
} else {
memmove(session->buf, session->buf + readlen,
session->bufsize - readlen);
session->bufsize -= readlen;
isc_buffer_add(session->buf, readlen);
}
http_do_bio(session, send_httphandle, send_cb,
@@ -1634,12 +1657,11 @@ server_on_begin_headers_callback(nghttp2_session *ngsession,
isc_nm_httpsocket,
(isc_sockaddr_t *)&session->handle->sock->iface);
socket->peer = session->handle->sock->peer;
socket->h2 = (isc_nmsocket_h2_t){
.buf = isc_mem_allocate(session->mctx, MAX_DNS_MESSAGE_SIZE),
.psock = socket,
.stream_id = frame->hd.stream_id,
.headers_error_code = ISC_HTTP_ERROR_SUCCESS
};
socket->h2 = (isc_nmsocket_h2_t){ .psock = socket,
.stream_id = frame->hd.stream_id,
.headers_error_code =
ISC_HTTP_ERROR_SUCCESS };
isc_buffer_init(&socket->h2.rbuf, NULL, 0);
session->nsstreams++;
isc__nm_httpsession_attach(session, &socket->h2.session);
socket->tid = session->handle->sock->tid;
@@ -1912,14 +1934,14 @@ server_read_callback(nghttp2_session *ngsession, int32_t stream_id,
UNUSED(ngsession);
UNUSED(session);
buflen = socket->h2.bufsize - socket->h2.bufpos;
buflen = isc_buffer_remaininglength(&socket->h2.wbuf);
if (buflen > length) {
buflen = length;
}
memmove(buf, socket->h2.buf + socket->h2.bufpos, buflen);
socket->h2.bufpos += buflen;
if (socket->h2.bufpos == socket->h2.bufsize) {
memmove(buf, isc_buffer_current(&socket->h2.wbuf), buflen);
isc_buffer_add(&socket->h2.wbuf, buflen);
if (isc_buffer_remaininglength(&socket->h2.wbuf) == 0) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
}
@@ -1982,9 +2004,6 @@ static struct http_error_responses {
static isc_result_t
server_send_error_response(const isc_http_error_responses_t error,
nghttp2_session *ngsession, isc_nmsocket_t *socket) {
socket->h2.bufsize = 0;
socket->h2.bufpos = 0;
for (size_t i = 0;
i < sizeof(error_responses) / sizeof(error_responses[0]); i++)
{
@@ -2036,6 +2055,7 @@ server_on_request_recv(nghttp2_session *ngsession,
isc_result_t result;
isc_http_error_responses_t code = ISC_HTTP_ERROR_SUCCESS;
isc_region_t data;
uint8_t tmpbuf[MAX_DNS_MESSAGE_SIZE];
code = socket->h2.headers_error_code;
if (code != ISC_HTTP_ERROR_SUCCESS) {
@@ -2045,11 +2065,13 @@ server_on_request_recv(nghttp2_session *ngsession,
if (socket->h2.request_path == NULL || socket->h2.cb == NULL) {
code = ISC_HTTP_ERROR_NOT_FOUND;
} else if (socket->h2.request_type == ISC_HTTP_REQ_POST &&
socket->h2.bufsize > socket->h2.content_length)
isc_buffer_usedlength(&socket->h2.rbuf) >
socket->h2.content_length)
{
code = ISC_HTTP_ERROR_PAYLOAD_TOO_LARGE;
} else if (socket->h2.request_type == ISC_HTTP_REQ_POST &&
socket->h2.bufsize != socket->h2.content_length)
isc_buffer_usedlength(&socket->h2.rbuf) !=
socket->h2.content_length)
{
code = ISC_HTTP_ERROR_BAD_REQUEST;
}
@@ -2060,18 +2082,17 @@ server_on_request_recv(nghttp2_session *ngsession,
if (socket->h2.request_type == ISC_HTTP_REQ_GET) {
isc_buffer_t decoded_buf;
isc__buffer_init(&decoded_buf, socket->h2.buf,
MAX_DNS_MESSAGE_SIZE);
isc_buffer_init(&decoded_buf, tmpbuf, sizeof(tmpbuf));
if (isc_base64_decodestring(socket->h2.query_data,
&decoded_buf) != ISC_R_SUCCESS)
{
code = ISC_HTTP_ERROR_BAD_REQUEST;
goto error;
}
isc__buffer_usedregion(&decoded_buf, &data);
isc_buffer_usedregion(&decoded_buf, &data);
} else if (socket->h2.request_type == ISC_HTTP_REQ_POST) {
INSIST(socket->h2.content_length > 0);
data = (isc_region_t){ socket->h2.buf, socket->h2.bufsize };
isc_buffer_usedregion(&socket->h2.rbuf, &data);
} else {
INSIST(0);
ISC_UNREACHABLE();
@@ -2163,8 +2184,7 @@ server_httpsend(isc_nmhandle_t *handle, isc_nmsocket_t *sock,
INSIST(VALID_NMHANDLE(handle->httpsession->handle));
INSIST(VALID_NMSOCK(handle->httpsession->handle->sock));
memmove(sock->h2.buf, req->uvbuf.base, req->uvbuf.len);
sock->h2.bufsize = req->uvbuf.len;
isc_buffer_init(&sock->h2.wbuf, req->uvbuf.base, req->uvbuf.len);
len = snprintf(sock->h2.clenbuf, sizeof(sock->h2.clenbuf), "%lu",
(unsigned long)req->uvbuf.len);
@@ -2658,6 +2678,7 @@ isc__nm_async_httpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
static void
failed_httpstream_read_cb(isc_nmsocket_t *sock, isc_result_t result,
isc_nm_http_session_t *session) {
isc_region_t data = { 0, 0 };
REQUIRE(VALID_NMSOCK(sock));
INSIST(sock->type == isc_nm_httpsocket);
@@ -2670,8 +2691,9 @@ failed_httpstream_read_cb(isc_nmsocket_t *sock, isc_result_t result,
(void)nghttp2_submit_rst_stream(
session->ngsession, NGHTTP2_FLAG_END_STREAM, sock->h2.stream_id,
NGHTTP2_REFUSED_STREAM);
server_call_cb(sock, session, result,
&(isc_region_t){ sock->h2.buf, sock->h2.bufsize });
isc_buffer_usedregion(&sock->h2.rbuf, &data);
server_call_cb(sock, session, result, &data);
}
static void
@@ -2693,10 +2715,10 @@ client_call_failed_read_cb(isc_result_t result,
* in such a case.
*/
if (cstream->read_cb != NULL) {
isc_region_t read_data;
isc_buffer_usedregion(cstream->rbuf, &read_data);
cstream->read_cb(session->client_httphandle, result,
&(isc_region_t){ cstream->rbuf,
cstream->rbufsize },
cstream->read_cbarg);
&read_data, cstream->read_cbarg);
}
if (result != ISC_R_TIMEDOUT || cstream->read_cb == NULL ||
@@ -2939,9 +2961,9 @@ isc__nm_http_cleanup_data(isc_nmsocket_t *sock) {
INSIST(sock->h2.connect.cstream == NULL);
if (sock->h2.buf != NULL) {
isc_mem_free(sock->mgr->mctx, sock->h2.buf);
sock->h2.buf = NULL;
if (isc_buffer_base(&sock->h2.rbuf) != NULL) {
void *base = isc_buffer_base(&sock->h2.rbuf);
isc_mem_free(sock->mgr->mctx, base);
}
}

View File

@@ -791,9 +791,8 @@ typedef struct isc_nmsocket_h2 {
bool query_too_large;
isc_nm_httphandler_t *handler;
uint8_t *buf;
size_t bufsize;
size_t bufpos;
isc_buffer_t rbuf;
isc_buffer_t wbuf;
int32_t stream_id;
isc_nm_http_session_t *session;

View File

@@ -1060,6 +1060,14 @@ doh_recv_send(void **state) {
isc_thread_create(doh_connect_thread, connect_nm, &threads[i]);
}
/* wait for the all responses from the server */
while (atomic_load(&ssends) < atomic_load(&total_sends)) {
if (atomic_load(&was_error)) {
break;
}
isc_test_nap(100);
}
for (size_t i = 0; i < nthreads; i++) {
isc_thread_join(threads[i], NULL);
}