Added reordering support for STREAM frames.

Each stream node now includes incoming frames queue and sent/received counters
for tracking offset. The sent counter is not used, c->sent is used, not like
in crypto buffers, which have no connections.
This commit is contained in:
Vladimir Homutov 2020-04-15 11:11:54 +03:00
parent 081682cd3c
commit 9542c975b7
2 changed files with 159 additions and 72 deletions

View File

@ -70,15 +70,6 @@ typedef struct {
} ngx_quic_send_ctx_t; } ngx_quic_send_ctx_t;
/* ordered frames stream context */
typedef struct {
uint64_t sent;
uint64_t received;
ngx_queue_t frames;
size_t total; /* size of buffered data */
} ngx_quic_frames_stream_t;
struct ngx_quic_connection_s { struct ngx_quic_connection_s {
ngx_str_t scid; ngx_str_t scid;
ngx_str_t dcid; ngx_str_t dcid;
@ -177,7 +168,12 @@ static ngx_int_t ngx_quic_handle_ordered_frame(ngx_connection_t *c,
static ngx_int_t ngx_quic_crypto_input(ngx_connection_t *c, static ngx_int_t ngx_quic_crypto_input(ngx_connection_t *c,
ngx_quic_frame_t *frame); ngx_quic_frame_t *frame);
static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c, static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame); ngx_quic_header_t *pkt, ngx_quic_frame_t *frame);
static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c,
ngx_quic_frame_t *frame);
static ngx_quic_stream_t *ngx_quic_add_stream(ngx_connection_t *c,
ngx_quic_stream_frame_t *f);
static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c); static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c);
static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c, static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f); ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
@ -739,6 +735,7 @@ ngx_quic_close_connection(ngx_connection_t *c)
#if (NGX_DEBUG) #if (NGX_DEBUG)
ngx_uint_t ns; ngx_uint_t ns;
#endif #endif
ngx_uint_t i;
ngx_pool_t *pool; ngx_pool_t *pool;
ngx_event_t *rev; ngx_event_t *rev;
ngx_rbtree_t *tree; ngx_rbtree_t *tree;
@ -748,11 +745,14 @@ ngx_quic_close_connection(ngx_connection_t *c)
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "close quic connection"); ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "close quic connection");
// TODO: free frames from reorder queue if any
qc = c->quic; qc = c->quic;
if (qc) { if (qc) {
for (i = 0; i < NGX_QUIC_ENCRYPTION_LAST; i++) {
ngx_quic_free_frames(c, &qc->crypto[i].frames);
}
qc->closing = 1; qc->closing = 1;
tree = &qc->streams.tree; tree = &qc->streams.tree;
@ -1201,9 +1201,7 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
case NGX_QUIC_FT_STREAM6: case NGX_QUIC_FT_STREAM6:
case NGX_QUIC_FT_STREAM7: case NGX_QUIC_FT_STREAM7:
if (ngx_quic_handle_stream_frame(c, pkt, &frame.u.stream) if (ngx_quic_handle_stream_frame(c, pkt, &frame) != NGX_OK) {
!= NGX_OK)
{
return NGX_ERROR; return NGX_ERROR;
} }
@ -1441,6 +1439,7 @@ ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs,
ngx_quic_frame_t *frame, ngx_quic_frame_handler_pt handler) ngx_quic_frame_t *frame, ngx_quic_frame_handler_pt handler)
{ {
size_t full_len; size_t full_len;
ngx_int_t rc;
ngx_queue_t *q; ngx_queue_t *q;
ngx_quic_ordered_frame_t *f; ngx_quic_ordered_frame_t *f;
@ -1468,10 +1467,17 @@ ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs,
/* f->offset == fs->received */ /* f->offset == fs->received */
if (handler(c, frame) != NGX_OK) { rc = handler(c, frame);
if (rc == NGX_ERROR) {
return NGX_ERROR; return NGX_ERROR;
} else if (rc == NGX_DONE) {
/* handler destroyed stream, queue no longer exists */
return NGX_OK;
} }
/* rc == NGX_OK */
fs->received += f->length; fs->received += f->length;
/* now check the queue if we can continue with buffered frames */ /* now check the queue if we can continue with buffered frames */
@ -1512,8 +1518,14 @@ ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs,
/* f->offset == fs->received */ /* f->offset == fs->received */
if (handler(c, frame) != NGX_OK) { rc = handler(c, frame);
if (rc == NGX_ERROR) {
return NGX_ERROR; return NGX_ERROR;
} else if (rc == NGX_DONE) {
/* handler destroyed stream, queue no longer exists */
return NGX_OK;
} }
fs->received += f->length; fs->received += f->length;
@ -1721,20 +1733,54 @@ ngx_quic_crypto_input(ngx_connection_t *c, ngx_quic_frame_t *frame)
static ngx_int_t static ngx_int_t
ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f) ngx_quic_frame_t *frame)
{ {
size_t n; ngx_quic_stream_t *sn;
ngx_buf_t *b; ngx_quic_connection_t *qc;
ngx_event_t *rev; ngx_quic_stream_frame_t *f;
ngx_quic_stream_t *sn; ngx_quic_frames_stream_t *fs;
ngx_quic_connection_t *qc;
qc = c->quic; qc = c->quic;
f = &frame->u.stream;
sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
if (sn) { if (sn == NULL) {
sn = ngx_quic_add_stream(c, f);
if (sn == NULL) {
return NGX_ERROR;
}
}
fs = &sn->fs;
return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input);
}
static ngx_int_t
ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame)
{
ngx_buf_t *b;
ngx_event_t *rev;
ngx_quic_stream_t *sn;
ngx_quic_connection_t *qc;
ngx_quic_stream_frame_t *f;
qc = c->quic;
f = &frame->u.stream;
sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
if (sn == NULL) {
// TODO: possible?
// deleted while stream is in reordering queue?
return NGX_ERROR;
}
if (sn->fs.received != 0) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream"); ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
b = sn->b; b = sn->b;
@ -1761,29 +1807,14 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
rev->handler(rev); rev->handler(rev);
} }
/* check if stream was destroyed */
if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
return NGX_DONE;
}
return NGX_OK; return NGX_OK;
} }
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
? qc->tp.initial_max_stream_data_uni
: qc->tp.initial_max_stream_data_bidi_remote;
if (n < NGX_QUIC_STREAM_BUFSIZE) {
n = NGX_QUIC_STREAM_BUFSIZE;
}
if (n < f->length) {
ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
return NGX_ERROR;
}
sn = ngx_quic_create_stream(c, f->stream_id, n);
if (sn == NULL) {
return NGX_ERROR;
}
b = sn->b; b = sn->b;
b->last = ngx_cpymem(b->last, f->data, f->length); b->last = ngx_cpymem(b->last, f->data, f->length);
@ -1800,10 +1831,50 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
qc->streams.handler(sn->c); qc->streams.handler(sn->c);
/* check if stream was destroyed */
if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
return NGX_DONE;
}
return NGX_OK; return NGX_OK;
} }
static ngx_quic_stream_t *
ngx_quic_add_stream(ngx_connection_t *c, ngx_quic_stream_frame_t *f)
{
size_t n;
ngx_quic_stream_t *sn;
ngx_quic_connection_t *qc;
qc = c->quic;
// TODO: check increasing IDs
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
? qc->tp.initial_max_stream_data_uni
: qc->tp.initial_max_stream_data_bidi_remote;
if (n < NGX_QUIC_STREAM_BUFSIZE) {
n = NGX_QUIC_STREAM_BUFSIZE;
}
if (n < f->length) {
ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
return NULL;
}
sn = ngx_quic_create_stream(c, f->stream_id, n);
if (sn == NULL) {
return NULL;
}
return sn;
}
static ngx_int_t static ngx_int_t
ngx_quic_handle_max_streams(ngx_connection_t *c) ngx_quic_handle_max_streams(ngx_connection_t *c)
{ {
@ -2024,7 +2095,6 @@ ngx_quic_output_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
return NGX_ERROR; return NGX_ERROR;
} }
} while (q != ngx_queue_sentinel(&ctx->frames)); } while (q != ngx_queue_sentinel(&ctx->frames));
return NGX_OK; return NGX_OK;
@ -2037,15 +2107,19 @@ ngx_quic_free_frames(ngx_connection_t *c, ngx_queue_t *frames)
ngx_queue_t *q; ngx_queue_t *q;
ngx_quic_frame_t *f; ngx_quic_frame_t *f;
q = ngx_queue_head(frames);
do { do {
q = ngx_queue_head(frames);
if (q == ngx_queue_sentinel(frames)) {
break;
}
ngx_queue_remove(q);
f = ngx_queue_data(q, ngx_quic_frame_t, queue); f = ngx_queue_data(q, ngx_quic_frame_t, queue);
q = ngx_queue_next(q);
ngx_quic_free_frame(c, f); ngx_quic_free_frame(c, f);
} while (1);
} while (q != ngx_queue_sentinel(frames));
} }
@ -2237,7 +2311,7 @@ ngx_quic_retransmit_handler(ngx_event_t *ev)
static void static void
ngx_quic_push_handler(ngx_event_t *ev) ngx_quic_push_handler(ngx_event_t *ev)
{ {
ngx_connection_t *c; ngx_connection_t *c;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "push timer"); ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "push timer");
@ -2430,6 +2504,8 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
return NULL; return NULL;
} }
ngx_queue_init(&sn->fs.frames);
log = ngx_palloc(pool, sizeof(ngx_log_t)); log = ngx_palloc(pool, sizeof(ngx_log_t));
if (log == NULL) { if (log == NULL) {
ngx_destroy_pool(pool); ngx_destroy_pool(pool);
@ -2595,6 +2671,8 @@ ngx_quic_stream_cleanup_handler(void *data)
return; return;
} }
ngx_quic_free_frames(pc, &qs->fs.frames);
if ((qs->id & 0x03) == NGX_QUIC_STREAM_UNIDIRECTIONAL) { if ((qs->id & 0x03) == NGX_QUIC_STREAM_UNIDIRECTIONAL) {
/* do not send fin for client unidirectional streams */ /* do not send fin for client unidirectional streams */
return; return;

View File

@ -29,33 +29,42 @@
typedef struct { typedef struct {
/* configurable */ /* configurable */
ngx_msec_t max_idle_timeout; ngx_msec_t max_idle_timeout;
ngx_msec_t max_ack_delay; ngx_msec_t max_ack_delay;
ngx_uint_t max_packet_size; ngx_uint_t max_packet_size;
ngx_uint_t initial_max_data; ngx_uint_t initial_max_data;
ngx_uint_t initial_max_stream_data_bidi_local; ngx_uint_t initial_max_stream_data_bidi_local;
ngx_uint_t initial_max_stream_data_bidi_remote; ngx_uint_t initial_max_stream_data_bidi_remote;
ngx_uint_t initial_max_stream_data_uni; ngx_uint_t initial_max_stream_data_uni;
ngx_uint_t initial_max_streams_bidi; ngx_uint_t initial_max_streams_bidi;
ngx_uint_t initial_max_streams_uni; ngx_uint_t initial_max_streams_uni;
ngx_uint_t ack_delay_exponent; ngx_uint_t ack_delay_exponent;
ngx_uint_t disable_active_migration; ngx_uint_t disable_active_migration;
ngx_uint_t active_connection_id_limit; ngx_uint_t active_connection_id_limit;
/* TODO */ /* TODO */
ngx_uint_t original_connection_id; ngx_uint_t original_connection_id;
u_char stateless_reset_token[16]; u_char stateless_reset_token[16];
void *preferred_address; void *preferred_address;
} ngx_quic_tp_t; } ngx_quic_tp_t;
typedef struct {
uint64_t sent;
uint64_t received;
ngx_queue_t frames; /* reorder queue */
size_t total; /* size of buffered data */
} ngx_quic_frames_stream_t;
struct ngx_quic_stream_s { struct ngx_quic_stream_s {
ngx_rbtree_node_t node; ngx_rbtree_node_t node;
ngx_connection_t *parent; ngx_connection_t *parent;
ngx_connection_t *c; ngx_connection_t *c;
uint64_t id; uint64_t id;
ngx_buf_t *b; ngx_buf_t *b;
ngx_quic_frames_stream_t fs;
}; };