Better flow control and buffering for QUIC streams.

This commit is contained in:
Roman Arutyunyan 2020-03-23 15:49:31 +03:00
parent 72b0a1b32a
commit 3fa1dec9c7
2 changed files with 114 additions and 9 deletions

View File

@ -16,6 +16,9 @@ typedef enum {
} ngx_quic_state_t; } ngx_quic_state_t;
#define NGX_QUIC_STREAM_BUFSIZE 16384
typedef struct { typedef struct {
ngx_rbtree_node_t node; ngx_rbtree_node_t node;
ngx_buf_t *b; ngx_buf_t *b;
@ -106,6 +109,8 @@ static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame); ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame);
static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c, static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f); ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f);
static void ngx_quic_queue_frame(ngx_quic_connection_t *qc, static void ngx_quic_queue_frame(ngx_quic_connection_t *qc,
ngx_quic_frame_t *frame); ngx_quic_frame_t *frame);
@ -885,6 +890,18 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
ack_this = 1; ack_this = 1;
break; break;
case NGX_QUIC_FT_STREAM_DATA_BLOCKED:
if (ngx_quic_handle_stream_data_blocked_frame(c, pkt,
&frame.u.stream_data_blocked)
!= NGX_OK)
{
return NGX_ERROR;
}
ack_this = 1;
break;
default: default:
return NGX_ERROR; return NGX_ERROR;
} }
@ -1002,6 +1019,7 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f) ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f)
{ {
ngx_buf_t *b; ngx_buf_t *b;
ngx_event_t *rev;
ngx_quic_connection_t *qc; ngx_quic_connection_t *qc;
ngx_quic_stream_node_t *sn; ngx_quic_stream_node_t *sn;
@ -1013,15 +1031,24 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream"); ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
b = sn->b; b = sn->b;
if ((size_t) (b->end - b->pos) < f->length) { if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
return NGX_ERROR; return NGX_ERROR;
} }
ngx_memcpy(b->pos, f->data, f->length); if ((size_t) (b->end - b->last) < f->length) {
b->pos += f->length; b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
b->pos = b->start;
}
// TODO: notify b->last = ngx_cpymem(b->last, f->data, f->length);
rev = sn->c->read;
rev->ready = 1;
if (rev->active) {
rev->handler(rev);
}
return NGX_OK; return NGX_OK;
} }
@ -1071,6 +1098,48 @@ ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
} }
static ngx_int_t
ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f)
{
size_t n;
ngx_buf_t *b;
ngx_quic_frame_t *frame;
ngx_quic_connection_t *qc;
ngx_quic_stream_node_t *sn;
qc = c->quic;
sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
if (sn == NULL) {
ngx_log_error(NGX_LOG_INFO, c->log, 0, "unknown stream id:%uL", f->id);
return NGX_ERROR;
}
b = sn->b;
n = (b->pos - b->start) + (b->end - b->last);
frame = ngx_pcalloc(c->pool, sizeof(ngx_quic_frame_t));
if (frame == NULL) {
return NGX_ERROR;
}
frame->level = pkt->level;
frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
frame->u.max_stream_data.id = f->id;
frame->u.max_stream_data.limit = n;
ngx_sprintf(frame->info, "MAX_STREAM_DATA id:%d limit:%d level=%d",
(int) frame->u.max_stream_data.id,
(int) frame->u.max_stream_data.limit,
frame->level);
ngx_quic_queue_frame(c->quic, frame);
return NGX_OK;
}
static void static void
ngx_quic_queue_frame(ngx_quic_connection_t *qc, ngx_quic_frame_t *frame) ngx_quic_queue_frame(ngx_quic_connection_t *qc, ngx_quic_frame_t *frame)
{ {
@ -1349,6 +1418,7 @@ ngx_quic_find_stream(ngx_rbtree_t *rbtree, ngx_uint_t key)
static ngx_quic_stream_node_t * static ngx_quic_stream_node_t *
ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id) ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id)
{ {
size_t n;
ngx_log_t *log; ngx_log_t *log;
ngx_pool_t *pool; ngx_pool_t *pool;
ngx_event_t *rev, *wev; ngx_event_t *rev, *wev;
@ -1402,8 +1472,11 @@ ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id)
sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
n = ngx_max(NGX_QUIC_STREAM_BUFSIZE,
qc->tp.initial_max_stream_data_bidi_remote);
sn->node.key =id; sn->node.key =id;
sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone sn->b = ngx_create_temp_buf(pool, n);
if (sn->b == NULL) { if (sn->b == NULL) {
return NULL; return NULL;
} }
@ -1456,11 +1529,10 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
b = sn->b; b = sn->b;
if (b->last - b->pos == 0) { if (b->pos == b->last) {
c->read->ready = 0; c->read->ready = 0;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic recv() not ready");
"quic recv() not ready"); return NGX_AGAIN;
return NGX_AGAIN; // ?
} }
len = ngx_min(b->last - b->pos, (ssize_t) size); len = ngx_min(b->last - b->pos, (ssize_t) size);
@ -1469,6 +1541,11 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
b->pos += len; b->pos += len;
if (b->pos == b->last) {
b->pos = b->start;
b->last = b->start;
}
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic recv: %z of %uz", len, size); "quic recv: %z of %uz", len, size);

View File

@ -69,6 +69,8 @@ static size_t ngx_quic_create_crypto(u_char *p,
static size_t ngx_quic_create_stream(u_char *p, ngx_quic_stream_frame_t *sf); static size_t ngx_quic_create_stream(u_char *p, ngx_quic_stream_frame_t *sf);
static size_t ngx_quic_create_max_streams(u_char *p, static size_t ngx_quic_create_max_streams(u_char *p,
ngx_quic_max_streams_frame_t *ms); ngx_quic_max_streams_frame_t *ms);
static size_t ngx_quic_create_max_stream_data(u_char *p,
ngx_quic_max_stream_data_frame_t *ms);
static size_t ngx_quic_create_close(u_char *p, ngx_quic_close_frame_t *cl); static size_t ngx_quic_create_close(u_char *p, ngx_quic_close_frame_t *cl);
static ngx_int_t ngx_quic_parse_transport_param(u_char *p, u_char *end, static ngx_int_t ngx_quic_parse_transport_param(u_char *p, u_char *end,
@ -1079,6 +1081,9 @@ ngx_quic_create_frame(u_char *p, u_char *end, ngx_quic_frame_t *f)
case NGX_QUIC_FT_MAX_STREAMS: case NGX_QUIC_FT_MAX_STREAMS:
return ngx_quic_create_max_streams(p, &f->u.max_streams); return ngx_quic_create_max_streams(p, &f->u.max_streams);
case NGX_QUIC_FT_MAX_STREAM_DATA:
return ngx_quic_create_max_stream_data(p, &f->u.max_stream_data);
default: default:
/* BUG: unsupported frame type generated */ /* BUG: unsupported frame type generated */
return NGX_ERROR; return NGX_ERROR;
@ -1459,6 +1464,29 @@ ngx_quic_parse_transport_params(u_char *p, u_char *end, ngx_quic_tp_t *tp,
} }
static size_t
ngx_quic_create_max_stream_data(u_char *p, ngx_quic_max_stream_data_frame_t *ms)
{
size_t len;
u_char *start;
if (p == NULL) {
len = ngx_quic_varint_len(NGX_QUIC_FT_MAX_STREAM_DATA);
len += ngx_quic_varint_len(ms->id);
len += ngx_quic_varint_len(ms->limit);
return len;
}
start = p;
ngx_quic_build_int(&p, NGX_QUIC_FT_MAX_STREAM_DATA);
ngx_quic_build_int(&p, ms->id);
ngx_quic_build_int(&p, ms->limit);
return p - start;
}
ssize_t ssize_t
ngx_quic_create_transport_params(u_char *pos, u_char *end, ngx_quic_tp_t *tp) ngx_quic_create_transport_params(u_char *pos, u_char *end, ngx_quic_tp_t *tp)
{ {