QUIC: coalesce neighbouring stream send buffers.

Previously a single STREAM frame was created for each buffer in stream output
chain which is wasteful with respect to memory.  The following changes were
made in the stream send code:

- ngx_quic_stream_send_chain() no longer calls ngx_quic_stream_send() and got
  a separate implementation that coalesces neighbouring buffers into a single
  frame
- the new ngx_quic_stream_send_chain() respects the limit argument, which fixes
  sendfile_max_chunk and limit_rate
- ngx_quic_stream_send() is reimplemented to call ngx_quic_stream_send_chain()
- stream frame size limit is moved out to a separate function
  ngx_quic_max_stream_frame()
- flow control is moved out to a separate function ngx_quic_max_stream_flow()
- ngx_quic_stream_send_chain() is relocated next to ngx_quic_stream_send()
This commit is contained in:
Roman Arutyunyan 2020-08-18 12:28:33 +03:00
parent 6e17937db4
commit ff1941d6dd

View File

@ -280,9 +280,11 @@ static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
size_t size);
static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
size_t size);
static void ngx_quic_stream_cleanup_handler(void *data);
static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
ngx_chain_t *in, off_t limit);
static size_t ngx_quic_max_stream_frame(ngx_quic_connection_t *qc);
static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
static void ngx_quic_stream_cleanup_handler(void *data);
static ngx_quic_frame_t *ngx_quic_alloc_frame(ngx_connection_t *c, size_t size);
static void ngx_quic_free_frame(ngx_connection_t *c, ngx_quic_frame_t *frame);
@ -4228,10 +4230,44 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
static ssize_t
ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
{
u_char *p, *end;
size_t fsize, limit, n, len;
uint64_t sent, unacked;
ngx_buf_t b;
ngx_chain_t cl;
ngx_memzero(&b, sizeof(ngx_buf_t));
b.memory = 1;
b.pos = buf;
b.last = buf + size;
cl.buf = &b;
cl.next = NULL;
if (ngx_quic_stream_send_chain(c, &cl, 0) == NGX_CHAIN_ERROR) {
return NGX_ERROR;
}
if (b.pos == buf) {
return NGX_AGAIN;
}
return b.pos - buf;
}
static ngx_chain_t *
ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
{
u_char *p;
size_t n, max, max_frame, max_flow, max_limit, len;
#if (NGX_DEBUG)
size_t sent;
#endif
ngx_buf_t *b;
#if (NGX_DEBUG)
ngx_uint_t nframes;
#endif
ngx_event_t *wev;
ngx_chain_t *cl;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
@ -4243,68 +4279,48 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
wev = c->write;
if (wev->error) {
return NGX_ERROR;
return NGX_CHAIN_ERROR;
}
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id 0x%xL send: %uz", qs->id, size);
max_frame = ngx_quic_max_stream_frame(qc);
max_flow = ngx_quic_max_stream_flow(c);
max_limit = limit;
/*
* we need to fit at least 1 frame into a packet, thus account head/tail;
* 25 = 1 + 8x3 is max header for STREAM frame, with 1 byte for frame type
*/
limit = qc->ctp.max_udp_payload_size - NGX_QUIC_MAX_SHORT_HEADER - 25
- EVP_GCM_TLS_TAG_LEN;
#if (NGX_DEBUG)
sent = 0;
nframes = 0;
#endif
len = size;
sent = c->sent;
unacked = sent - qs->acked;
for ( ;; ) {
max = ngx_min(max_frame, max_flow);
if (qc->streams.send_max_data == 0) {
qc->streams.send_max_data = qc->ctp.initial_max_data;
}
if (unacked >= NGX_QUIC_STREAM_BUFSIZE) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send hit buffer size");
len = 0;
} else if (unacked + len > NGX_QUIC_STREAM_BUFSIZE) {
len = NGX_QUIC_STREAM_BUFSIZE - unacked;
}
if (qc->streams.sent >= qc->streams.send_max_data) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send hit MAX_DATA");
len = 0;
} else if (qc->streams.sent + len > qc->streams.send_max_data) {
len = qc->streams.send_max_data - qc->streams.sent;
}
if (sent >= qs->send_max_data) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send hit MAX_STREAM_DATA");
len = 0;
} else if (sent + len > qs->send_max_data) {
len = qs->send_max_data - sent;
}
p = (u_char *) buf;
end = (u_char *) buf + len;
n = 0;
while (p < end) {
fsize = ngx_min(limit, (size_t) (end - p));
frame = ngx_quic_alloc_frame(pc, fsize);
if (frame == NULL) {
return 0;
if (limit) {
max = ngx_min(max, max_limit);
}
ngx_memcpy(frame->data, p, fsize);
for (cl = in, n = 0; in; in = in->next) {
if (!ngx_buf_in_memory(in->buf)) {
continue;
}
n += ngx_buf_size(in->buf);
if (n > max) {
n = max;
break;
}
}
if (n == 0) {
wev->ready = (max_flow ? 1 : 0);
break;
}
frame = ngx_quic_alloc_frame(pc, n);
if (frame == NULL) {
return NGX_CHAIN_ERROR;
}
frame->level = ssl_encryption_application;
frame->type = NGX_QUIC_FT_STREAM6; /* OFF=1 LEN=1 FIN=0 */
@ -4315,33 +4331,115 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
frame->u.stream.type = frame->type;
frame->u.stream.stream_id = qs->id;
frame->u.stream.offset = c->sent;
frame->u.stream.length = fsize;
frame->u.stream.length = n;
frame->u.stream.data = frame->data;
c->sent += fsize;
qc->streams.sent += fsize;
p += fsize;
n += fsize;
ngx_sprintf(frame->info, "STREAM id:0x%xL len:%uz level:%d",
qs->id, n, frame->level);
ngx_sprintf(frame->info, "stream 0x%xL len=%ui level=%d",
qs->id, fsize, frame->level);
c->sent += n;
qc->streams.sent += n;
max_flow -= n;
if (limit) {
max_limit -= n;
}
#if (NGX_DEBUG)
sent += n;
nframes++;
#endif
for (p = frame->data; n > 0; cl = cl->next) {
b = cl->buf;
if (!ngx_buf_in_memory(b)) {
continue;
}
len = ngx_min(n, (size_t) (b->last - b->pos));
p = ngx_cpymem(p, b->pos, len);
b->pos += len;
n -= len;
}
ngx_quic_queue_frame(qc, frame);
}
ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send %uz of %uz, sent:%O, unacked:%uL",
n, size, c->sent, (uint64_t) c->sent - qs->acked);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send_chain sent:%uz, frames:%ui", sent, nframes);
if (n != size) {
c->write->ready = 0;
return in;
}
static size_t
ngx_quic_max_stream_frame(ngx_quic_connection_t *qc)
{
/*
* we need to fit at least 1 frame into a packet, thus account head/tail;
* 25 = 1 + 8x3 is max header for STREAM frame, with 1 byte for frame type
*/
return qc->ctp.max_udp_payload_size - NGX_QUIC_MAX_SHORT_HEADER - 25
- EVP_GCM_TLS_TAG_LEN;
}
static size_t
ngx_quic_max_stream_flow(ngx_connection_t *c)
{
size_t size;
uint64_t sent, unacked;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
qs = c->qs;
qc = qs->parent->quic;
size = NGX_QUIC_STREAM_BUFSIZE;
sent = c->sent;
unacked = sent - qs->acked;
if (qc->streams.send_max_data == 0) {
qc->streams.send_max_data = qc->ctp.initial_max_data;
}
if (n == 0) {
return NGX_AGAIN;
if (unacked >= NGX_QUIC_STREAM_BUFSIZE) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send flow hit buffer size");
return 0;
}
return n;
if (unacked + size > NGX_QUIC_STREAM_BUFSIZE) {
size = NGX_QUIC_STREAM_BUFSIZE - unacked;
}
if (qc->streams.sent >= qc->streams.send_max_data) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send flow hit MAX_DATA");
return 0;
}
if (qc->streams.sent + size > qc->streams.send_max_data) {
size = qc->streams.send_max_data - qc->streams.sent;
}
if (sent >= qs->send_max_data) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send flow hit MAX_STREAM_DATA");
return 0;
}
if (sent + size > qs->send_max_data) {
size = qs->send_max_data - sent;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send flow: %uz", size);
return size;
}
@ -4431,48 +4529,6 @@ ngx_quic_stream_cleanup_handler(void *data)
}
static ngx_chain_t *
ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in,
off_t limit)
{
size_t len;
ssize_t n;
ngx_buf_t *b;
for ( /* void */; in; in = in->next) {
b = in->buf;
if (!ngx_buf_in_memory(b)) {
continue;
}
if (ngx_buf_size(b) == 0) {
continue;
}
len = b->last - b->pos;
n = ngx_quic_stream_send(c, b->pos, len);
if (n == NGX_ERROR) {
return NGX_CHAIN_ERROR;
}
if (n == NGX_AGAIN) {
return in;
}
b->pos += n;
if (n != (ssize_t) len) {
return in;
}
}
return NULL;
}
static ngx_quic_frame_t *
ngx_quic_alloc_frame(ngx_connection_t *c, size_t size)
{