Merge branch 'artem-http-write-buffering' into 'main'

HTTP/2 write buffering

See merge request isc-projects/bind9!5121
This commit is contained in:
Artem Boldariev
2021-06-01 18:28:34 +00:00

View File

@@ -21,6 +21,7 @@
#include <isc/print.h>
#include <isc/tls.h>
#include <isc/url.h>
#include <isc/util.h>
#include "netmgr-int.h"
@@ -58,6 +59,19 @@
#define MIN_SUCCESSFUL_HTTP_STATUS (200)
#define MAX_SUCCESSFUL_HTTP_STATUS (299)
/* This definition sets the upper limit of pending write buffer to an
* adequate enough value. That is done mostly to fight a limitation
* for a max TLS record size in flamethrower (2K). In a perfect world
* this constant should not be required, if we ever move closer to
* that state, the constant, and corresponding code, should be
* removed. For now the limit seems adequate enough to fight
* "tinygrams" problem. */
#define FLUSH_HTTP_WRITE_BUFFER_AFTER (1536)
/* This switch is here mostly to test the code interoperability with
* buggy implementations */
#define ENABLE_HTTP_WRITE_BUFFERING 1
#define SUCCESSFUL_HTTP_STATUS(code) \
((code) >= MIN_SUCCESSFUL_HTTP_STATUS && \
(code) <= MAX_SUCCESSFUL_HTTP_STATUS)
@@ -104,6 +118,8 @@ typedef struct http_cstream {
#define HTTP2_SESSION_MAGIC ISC_MAGIC('H', '2', 'S', 'S')
#define VALID_HTTP2_SESSION(t) ISC_MAGIC_VALID(t, HTTP2_SESSION_MAGIC)
typedef ISC_LIST(isc__nm_uvreq_t) isc__nm_http_pending_callbacks_t;
struct isc_nm_http_session {
unsigned int magic;
isc_refcount_t references;
@@ -130,6 +146,9 @@ struct isc_nm_http_session {
size_t bufsize;
isc_tlsctx_t *tlsctx;
isc__nm_http_pending_callbacks_t pending_write_callbacks;
isc_buffer_t *pending_write_data;
};
typedef enum isc_http_error_responses {
@@ -151,6 +170,7 @@ typedef struct isc_http_send_req {
isc_region_t data;
isc_nm_cb_t cb;
void *cbarg;
isc__nm_http_pending_callbacks_t pending_write_callbacks;
} isc_http_send_req_t;
static bool
@@ -187,6 +207,10 @@ finish_http_session(isc_nm_http_session_t *session);
static void
http_transpost_tcp_nodelay(isc_nmhandle_t *transphandle);
static void
call_pending_callbacks(isc__nm_http_pending_callbacks_t pending_callbacks,
isc_result_t result);
static bool
http_session_active(isc_nm_http_session_t *session) {
REQUIRE(VALID_HTTP2_SESSION(session));
@@ -251,6 +275,7 @@ new_session(isc_mem_t *mctx, isc_tlsctx_t *tctx,
isc_mem_attach(mctx, &session->mctx);
ISC_LIST_INIT(session->cstreams);
ISC_LIST_INIT(session->sstreams);
ISC_LIST_INIT(session->pending_write_callbacks);
*sessionp = session;
}
@@ -431,6 +456,15 @@ finish_http_session(isc_nm_http_session_t *session) {
} else {
server_call_failed_read_cb(ISC_R_UNEXPECTED, session);
}
call_pending_callbacks(session->pending_write_callbacks,
ISC_R_UNEXPECTED);
ISC_LIST_INIT(session->pending_write_callbacks);
if (session->pending_write_data != NULL) {
isc_buffer_free(&session->pending_write_data);
}
isc_nmhandle_detach(&session->handle);
}
@@ -890,6 +924,18 @@ http_readcb(isc_nmhandle_t *handle, isc_result_t result, isc_region_t *region,
http_do_bio(session, NULL, NULL, NULL);
}
static void
call_pending_callbacks(isc__nm_http_pending_callbacks_t pending_callbacks,
isc_result_t result) {
isc__nm_uvreq_t *cbreq = ISC_LIST_HEAD(pending_callbacks);
while (cbreq != NULL) {
isc__nm_uvreq_t *next = ISC_LIST_NEXT(cbreq, link);
ISC_LIST_UNLINK(pending_callbacks, cbreq, link);
isc__nm_sendcb(cbreq->handle->sock, cbreq, result, false);
cbreq = next;
}
}
static void
http_writecb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
isc_http_send_req_t *req = (isc_http_send_req_t *)arg;
@@ -903,6 +949,8 @@ http_writecb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
INSIST(session->handle == handle);
}
call_pending_callbacks(req->pending_write_callbacks, result);
if (req->cb != NULL) {
req->cb(req->httphandle, result, req->cbarg);
isc_nmhandle_detach(&req->httphandle);
@@ -911,8 +959,8 @@ http_writecb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
isc_mem_put(session->mctx, req->data.base, req->data.length);
isc_mem_put(session->mctx, req, sizeof(*req));
http_do_bio(session, NULL, NULL, NULL);
session->sending--;
http_do_bio(session, NULL, NULL, NULL);
isc_nmhandle_detach(&transphandle);
if (result != ISC_R_SUCCESS && session->sending == 0) {
finish_http_session(session);
@@ -920,31 +968,187 @@ http_writecb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
isc__nm_httpsession_detach(&session);
}
static void
move_pending_send_callbacks(isc_nm_http_session_t *session,
isc_http_send_req_t *send) {
STATIC_ASSERT(
sizeof(session->pending_write_callbacks) ==
sizeof(send->pending_write_callbacks),
"size of pending writes requests callbacks lists differs");
memmove(&send->pending_write_callbacks,
&session->pending_write_callbacks,
sizeof(session->pending_write_callbacks));
ISC_LIST_INIT(session->pending_write_callbacks);
}
static bool
http_send_outgoing(isc_nm_http_session_t *session, isc_nmhandle_t *httphandle,
isc_nm_cb_t cb, void *cbarg) {
isc_http_send_req_t *send = NULL;
const uint8_t *data = NULL;
size_t pending;
size_t total = 0;
uint8_t tmp_data[8192] = { 0 };
uint8_t *prepared_data = &tmp_data[0];
#ifdef ENABLE_HTTP_WRITE_BUFFERING
size_t max_total_write_size = 0;
#endif /* ENABLE_HTTP_WRITE_BUFFERING */
if (!http_session_active(session) ||
!nghttp2_session_want_write(session->ngsession))
(!nghttp2_session_want_write(session->ngsession) &&
session->pending_write_data == NULL))
{
return (false);
}
pending = nghttp2_session_mem_send(session->ngsession, &data);
if (pending == 0) {
while (nghttp2_session_want_write(session->ngsession)) {
const uint8_t *data = NULL;
const size_t pending =
nghttp2_session_mem_send(session->ngsession, &data);
const size_t new_total = total + pending;
/* reallocate buffer if required */
if (new_total > sizeof(tmp_data)) {
uint8_t *old_prepared_data = prepared_data;
const bool allocated = prepared_data != tmp_data;
prepared_data = isc_mem_get(session->mctx, new_total);
memmove(prepared_data, old_prepared_data, total);
if (allocated) {
isc_mem_put(session->mctx, old_prepared_data,
total);
}
}
memmove(&prepared_data[total], data, pending);
total = new_total;
}
#ifdef ENABLE_HTTP_WRITE_BUFFERING
max_total_write_size = total;
if (session->pending_write_data != NULL) {
max_total_write_size +=
isc_buffer_usedlength(session->pending_write_data);
}
/* Here we are trying to flush the pending writes buffer earlier
* to avoid hitting unnecessary limitations on a TLS record size
* within some tools (e.g. flamethrower). */
if (max_total_write_size >= FLUSH_HTTP_WRITE_BUFFER_AFTER) {
/* Case 1: We have equal or more than
* FLUSH_HTTP_WRITE_BUFFER_AFTER bytes to send. Let's put the
* data which we have just obtained from nghttp2 into the
* pending write buffer and flush it. */
/* Let's allocate a new write buffer if there is none. */
if (session->pending_write_data == NULL) {
isc_buffer_allocate(session->mctx,
&session->pending_write_data,
max_total_write_size);
}
isc_buffer_putmem(session->pending_write_data, prepared_data,
total);
if (prepared_data != &tmp_data[0]) {
isc_mem_put(session->mctx, prepared_data, total);
}
total = max_total_write_size;
prepared_data = isc_buffer_base(session->pending_write_data);
} else if (session->sending > 0 && total > 0) {
/* Case 2: There is one or more write requests in flight and
* we have some new data form nghttp2 to send. Let's put the
* write callback (if any) into the pending write callbacks
* list and add the new data into the pending write
* buffer. Then let's return from the function: as soon as the
* "in-flight" write callback get's called or we have reached
* FLUSH_HTTP_WRITE_BUFFER_AFTER bytes in the write buffer, we
* will flush the buffer. */
if (cb != NULL) {
isc__nm_uvreq_t *newcb = isc__nm_uvreq_get(
httphandle->sock->mgr, httphandle->sock);
INSIST(VALID_NMHANDLE(httphandle));
newcb->cb.send = cb;
newcb->cbarg = cbarg;
isc_nmhandle_attach(httphandle, &newcb->handle);
ISC_LIST_APPEND(session->pending_write_callbacks, newcb,
link);
}
if (session->pending_write_data == NULL) {
isc_buffer_allocate(session->mctx,
&session->pending_write_data,
total);
isc_buffer_setautorealloc(session->pending_write_data,
true);
}
isc_buffer_putmem(session->pending_write_data, prepared_data,
total);
if (prepared_data != &tmp_data[0]) {
isc_mem_put(session->mctx, prepared_data, total);
}
return (false);
} else if (session->sending == 0 && total == 0 &&
session->pending_write_data != NULL)
{
/* Case 3: There is no write in flight and we haven't got
* anything new from nghttp2, but there is some data pending
* in the write buffer. Let's flush the buffer. */
isc_region_t region = { 0 };
total = isc_buffer_usedlength(session->pending_write_data);
INSIST(total > 0);
INSIST(prepared_data == &tmp_data[0]);
isc_buffer_usedregion(session->pending_write_data, &region);
INSIST(total == region.length);
prepared_data = region.base;
} else {
/* The other cases are, uninteresting, fall-through ones. */
/* In the following cases (4-6) we will just bail out. */
/* Case 4: There is nothing new to send, nor anything in the
* write buffer. */
/* Case 5: There is nothing new to send and there is write
* request(s) in flight. */
/* Case 6: There is nothing new to send nor there are any
* write requests in flight. */
/* Case 7: There is some new data to send and there are no any
* write requests in flight: Let's send the data.*/
INSIST((total == 0 && session->pending_write_data == NULL) ||
(total == 0 && session->sending > 0) ||
(total == 0 && session->sending == 0) ||
(total > 0 && session->sending == 0));
}
#else
INSIST(session->pending_write_data == NULL);
INSIST(ISC_LIST_EMPTY(session->pending_write_callbacks));
#endif /* ENABLE_HTTP_WRITE_BUFFERING */
if (total == 0) {
INSIST(prepared_data == &tmp_data[0]);
/* No data returned */
return (false);
}
send = isc_mem_get(session->mctx, sizeof(*send));
*send = (isc_http_send_req_t){
.data.base = isc_mem_get(session->mctx, pending),
.data.length = pending,
};
memmove(send->data.base, data, pending);
if (prepared_data == &tmp_data[0]) {
*send = (isc_http_send_req_t){
.data.base = isc_mem_get(session->mctx, total),
.data.length = total,
};
memmove(send->data.base, tmp_data, total);
} else if (session->pending_write_data != NULL) {
*send = (isc_http_send_req_t){
.data.base = isc_mem_get(session->mctx, total),
.data.length = total,
};
memmove(send->data.base,
isc_buffer_base(session->pending_write_data), total);
isc_buffer_free(&session->pending_write_data);
} else {
*send = (isc_http_send_req_t){
.data.base = prepared_data,
.data.length = total,
};
}
isc_nmhandle_attach(session->handle, &send->transphandle);
isc__nm_httpsession_attach(session, &send->session);
@@ -955,6 +1159,8 @@ http_send_outgoing(isc_nm_http_session_t *session, isc_nmhandle_t *httphandle,
isc_nmhandle_attach(httphandle, &send->httphandle);
}
move_pending_send_callbacks(session, send);
session->sending++;
isc_nm_send(session->handle, &send->data, http_writecb, send);
return (true);
@@ -975,8 +1181,9 @@ http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle,
finish_http_session(session);
}
return;
} else if ((nghttp2_session_want_read(session->ngsession) == 0 &&
nghttp2_session_want_write(session->ngsession) == 0))
} else if (nghttp2_session_want_read(session->ngsession) == 0 &&
nghttp2_session_want_write(session->ngsession) == 0 &&
session->pending_write_data == NULL)
{
session->closing = true;
return;