Implemented retransmission and retransmit queue.

All frames collected to packet are moved into a per-namespace send queue.
QUIC connection has a timer which fires on the closest max_ack_delay time.
The frame is deleted from the queue when a corresponding packet is acknowledged.

The NGX_QUIC_MAX_RETRANSMISSION is a timeout that defines maximum length
of retransmission of a frame.
This commit is contained in:
Vladimir Homutov 2020-04-01 17:06:26 +03:00
parent d7eeb2e30b
commit 7b1a3df37c
3 changed files with 376 additions and 94 deletions

View File

@ -44,7 +44,11 @@ typedef struct {
ngx_quic_secret_t client_secret;
ngx_quic_secret_t server_secret;
ngx_uint_t pnum;
uint64_t pnum;
uint64_t largest;
ngx_queue_t frames;
ngx_queue_t sent;
} ngx_quic_namespace_t;
@ -64,8 +68,9 @@ struct ngx_quic_connection_s {
uint64_t crypto_offset[NGX_QUIC_ENCRYPTION_LAST];
ngx_ssl_t *ssl;
ngx_quic_frame_t *frames;
ngx_quic_frame_t *free_frames;
ngx_event_t retry;
ngx_queue_t free_frames;
#if (NGX_DEBUG)
ngx_uint_t nframes;
@ -133,8 +138,13 @@ static void ngx_quic_queue_frame(ngx_quic_connection_t *qc,
ngx_quic_frame_t *frame);
static ngx_int_t ngx_quic_output(ngx_connection_t *c);
ngx_int_t ngx_quic_frames_send(ngx_connection_t *c, ngx_quic_frame_t *start,
ngx_quic_frame_t *end, size_t total);
static ngx_int_t ngx_quic_output_ns(ngx_connection_t *c,
ngx_quic_namespace_t *ns, ngx_uint_t nsi);
static void ngx_quic_free_frames(ngx_connection_t *c, ngx_queue_t *frames);
static ngx_int_t ngx_quic_send_frames(ngx_connection_t *c, ngx_queue_t *frames);
static void ngx_quic_retransmit_handler(ngx_event_t *ev);
static ngx_int_t ngx_quic_retransmit_ns(ngx_connection_t *c,
ngx_quic_namespace_t *ns, ngx_msec_t *waitp);
static void ngx_quic_rbtree_insert_stream(ngx_rbtree_node_t *temp,
ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
@ -405,6 +415,7 @@ static ngx_int_t
ngx_quic_new_connection(ngx_connection_t *c, ngx_ssl_t *ssl, ngx_quic_tp_t *tp,
ngx_quic_header_t *pkt, ngx_connection_handler_pt handler)
{
ngx_uint_t i;
ngx_quic_tp_t *ctp;
ngx_quic_secrets_t *keys;
ngx_quic_connection_t *qc;
@ -441,6 +452,18 @@ ngx_quic_new_connection(ngx_connection_t *c, ngx_ssl_t *ssl, ngx_quic_tp_t *tp,
ngx_rbtree_init(&qc->streams.tree, &qc->streams.sentinel,
ngx_quic_rbtree_insert_stream);
for (i = 0; i < 3; i++) {
ngx_queue_init(&qc->ns[i].frames);
ngx_queue_init(&qc->ns[i].sent);
}
ngx_queue_init(&qc->free_frames);
qc->retry.log = c->log;
qc->retry.data = c;
qc->retry.handler = ngx_quic_retransmit_handler;
qc->retry.cancelable = 1;
c->quic = qc;
qc->ssl = ssl;
qc->tp = *tp;
@ -689,6 +712,10 @@ ngx_quic_close_connection(ngx_connection_t *c)
qc->closing = 1;
return;
}
if (qc->retry.timer_set) {
ngx_del_timer(&qc->retry);
}
}
if (c->ssl) {
@ -1129,7 +1156,9 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
: pkt->level;
ack_frame->type = NGX_QUIC_FT_ACK;
ack_frame->u.ack.pn = pkt->pn;
ack_frame->u.ack.largest = pkt->pn;
/* only ack immediate packet ]*/
ack_frame->u.ack.first_range = 0;
ngx_sprintf(ack_frame->info, "ACK for PN=%d from frame handler level=%d", pkt->pn, ack_frame->level);
ngx_quic_queue_frame(qc, ack_frame);
@ -1140,9 +1169,66 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
static ngx_int_t
ngx_quic_handle_ack_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
ngx_quic_ack_frame_t *f)
ngx_quic_ack_frame_t *ack)
{
/* TODO: handle ACK here */
ngx_uint_t found, min;
ngx_queue_t *q, range;
ngx_quic_frame_t *f;
ngx_quic_namespace_t *ns;
ns = &c->quic->ns[ngx_quic_ns(pkt->level)];
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"ngx_quic_handle_ack_frame in namespace %d",
ngx_quic_ns(pkt->level));
if (ack->first_range > ack->largest) {
ngx_log_error(NGX_LOG_INFO, c->log, 0,
"invalid first range in ack frame");
return NGX_ERROR;
}
min = ack->largest - ack->first_range;
found = 0;
ngx_queue_init(&range);
q = ngx_queue_head(&ns->sent);
while (q != ngx_queue_sentinel(&ns->sent)) {
f = ngx_queue_data(q, ngx_quic_frame_t, queue);
if (f->pnum >= min && f->pnum <= ack->largest) {
q = ngx_queue_next(q);
ngx_queue_remove(&f->queue);
ngx_quic_free_frame(c, f);
found = 1;
} else {
q = ngx_queue_next(q);
}
}
if (!found) {
if (ack->largest <= ns->pnum) {
/* duplicate ACK or ACK for non-ack-eliciting frame */
return NGX_OK;
}
ngx_log_error(NGX_LOG_INFO, c->log, 0,
"ACK for the packet not in sent queue ");
// TODO: handle error properly: PROTOCOL VIOLATION?
return NGX_ERROR;
}
/* 13.2.3. Receiver Tracking of ACK Frames */
if (ns->largest < ack->largest) {
ack->largest = ns->largest;
}
return NGX_OK;
}
@ -1380,99 +1466,146 @@ ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
static void
ngx_quic_queue_frame(ngx_quic_connection_t *qc, ngx_quic_frame_t *frame)
{
ngx_quic_frame_t **f;
ngx_quic_namespace_t *ns;
for (f = &qc->frames; *f; f = &(*f)->next) {
if ((*f)->level > frame->level) {
break;
}
}
ns = &qc->ns[ngx_quic_ns(frame->level)];
frame->next = *f;
*f = frame;
ngx_queue_insert_tail(&ns->frames, &frame->queue);
}
static ngx_int_t
ngx_quic_output(ngx_connection_t *c)
{
size_t len, hlen, n;
ngx_uint_t lvl;
ngx_quic_frame_t *f, *start, *next;
ngx_uint_t i;
ngx_quic_namespace_t *ns;
ngx_quic_connection_t *qc;
qc = c->quic;
if (qc->frames == NULL) {
return NGX_OK;
}
c->log->action = "sending frames";
lvl = qc->frames->level;
start = qc->frames;
f = start;
qc = c->quic;
do {
len = 0;
hlen = (lvl == ssl_encryption_application) ? NGX_QUIC_MAX_SHORT_HEADER
: NGX_QUIC_MAX_LONG_HEADER;
hlen += EVP_GCM_TLS_TAG_LEN;
do {
/* process same-level group of frames */
n = ngx_quic_create_frame(NULL, NULL, f);
if (len && hlen + len + n > qc->ctp.max_packet_size) {
break;
}
len += n;
f = f->next;
} while (f && f->level == lvl);
if (ngx_quic_frames_send(c, start, f, len) != NGX_OK) {
for (i = 0; i < 3; i++) {
ns = &qc->ns[i];
if (ngx_quic_output_ns(c, ns, i) != NGX_OK) {
return NGX_ERROR;
}
while (start != f) {
next = start->next;
ngx_quic_free_frame(c, start);
start = next;
}
if (f == NULL) {
break;
}
lvl = f->level; // TODO: must not decrease (ever, also between calls)
} while (1);
qc->frames = NULL;
}
if (!qc->send_timer_set) {
qc->send_timer_set = 1;
ngx_add_timer(c->read, qc->tp.max_idle_timeout);
}
if (!qc->retry.timer_set && !qc->closing) {
ngx_add_timer(&qc->retry, qc->tp.max_ack_delay * 1000);
}
return NGX_OK;
}
static ngx_int_t
ngx_quic_output_ns(ngx_connection_t *c, ngx_quic_namespace_t *ns,
ngx_uint_t nsi)
{
size_t len, hlen, n;
ngx_int_t rc;
ngx_queue_t *q, range;
ngx_quic_frame_t *f;
ngx_quic_connection_t *qc;
qc = c->quic;
if (ngx_queue_empty(&ns->frames)) {
return NGX_OK;
}
hlen = (nsi == 2) ? NGX_QUIC_MAX_SHORT_HEADER
: NGX_QUIC_MAX_LONG_HEADER;
hlen += EVP_GCM_TLS_TAG_LEN;
q = ngx_queue_head(&ns->frames);
do {
len = 0;
ngx_queue_init(&range);
do {
/* process group of frames that fits into packet */
f = ngx_queue_data(q, ngx_quic_frame_t, queue);
n = ngx_quic_create_frame(NULL, f);
if (len && hlen + len + n > qc->ctp.max_packet_size) {
break;
}
q = ngx_queue_next(q);
f->first = ngx_current_msec;
ngx_queue_remove(&f->queue);
ngx_queue_insert_tail(&range, &f->queue);
len += n;
} while (q != ngx_queue_sentinel(&ns->frames));
rc = ngx_quic_send_frames(c, &range);
if (rc == NGX_OK) {
/*
* frames are moved into the sent queue
* to wait for ack/be retransmitted
*/
ngx_queue_add(&ns->sent, &range);
} else if (rc == NGX_DONE) {
/* no ack is expected for this frames, can free them */
ngx_quic_free_frames(c, &range);
} else {
return NGX_ERROR;
}
} while (q != ngx_queue_sentinel(&ns->frames));
return NGX_OK;
}
static void
ngx_quic_free_frames(ngx_connection_t *c, ngx_queue_t *frames)
{
ngx_queue_t *q;
ngx_quic_frame_t *f;
q = ngx_queue_head(frames);
do {
f = ngx_queue_data(q, ngx_quic_frame_t, queue);
q = ngx_queue_next(q);
ngx_quic_free_frame(c, f);
} while (q != ngx_queue_sentinel(frames));
}
/* pack a group of frames [start; end) into memory p and send as single packet */
ngx_int_t
ngx_quic_frames_send(ngx_connection_t *c, ngx_quic_frame_t *start,
ngx_quic_frame_t *end, size_t total)
static ngx_int_t
ngx_quic_send_frames(ngx_connection_t *c, ngx_queue_t *frames)
{
ssize_t len;
u_char *p;
ngx_msec_t now;
ngx_str_t out, res;
ngx_quic_frame_t *f;
ngx_queue_t *q;
ngx_quic_frame_t *f, *start;
ngx_quic_header_t pkt;
ngx_quic_secrets_t *keys;
ngx_quic_namespace_t *ns;
@ -1481,24 +1614,41 @@ ngx_quic_frames_send(ngx_connection_t *c, ngx_quic_frame_t *start,
static u_char src[NGX_QUIC_DEFAULT_MAX_PACKET_SIZE];
static u_char dst[NGX_QUIC_DEFAULT_MAX_PACKET_SIZE];
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"sending frames %p...%p", start, end);
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "ngx_quic_send_frames");
q = ngx_queue_head(frames);
start = ngx_queue_data(q, ngx_quic_frame_t, queue);
ns = &c->quic->ns[ngx_quic_ns(start->level)];
ngx_memzero(&pkt, sizeof(ngx_quic_header_t));
p = src;
out.data = src;
for (f = start; f != end; f = f->next) {
for (q = ngx_queue_head(frames);
q != ngx_queue_sentinel(frames);
q = ngx_queue_next(q))
{
f = ngx_queue_data(q, ngx_quic_frame_t, queue);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, "frame: %s", f->info);
len = ngx_quic_create_frame(p, p + total, f);
len = ngx_quic_create_frame(p, f);
if (len == -1) {
return NGX_ERROR;
}
p += len;
f->pnum = ns->pnum;
}
if (start->level == ssl_encryption_initial) {
/* ack will not be sent in initial packets due to initial keys being
* discarded when handshake start.
* Thus consider initial packets as non-ack-eliciting
*/
pkt.need_ack = 0;
}
out.len = p - out.data;
@ -1508,14 +1658,13 @@ ngx_quic_frames_send(ngx_connection_t *c, ngx_quic_frame_t *start,
out.len++;
}
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"packet ready: %ui bytes at level %d",
out.len, start->level);
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
"packet ready: %ui bytes at level %d need_ack: %ui",
out.len, start->level, pkt.need_ack);
qc = c->quic;
keys = &c->quic->keys[start->level];
ns = &c->quic->ns[ngx_quic_ns(start->level)];
pkt.secret = &keys->server;
pkt.number = ns->pnum;
@ -1542,10 +1691,124 @@ ngx_quic_frames_send(ngx_connection_t *c, ngx_quic_frame_t *start,
ngx_quic_hexdump0(c->log, "packet to send", res.data, res.len);
c->send(c, res.data, res.len); // TODO: err handling
len = c->send(c, res.data, res.len);
if (len == NGX_ERROR || (size_t) len != res.len) {
return NGX_ERROR;
}
/* len == NGX_OK || NGX_AGAIN */
ns->pnum++;
now = ngx_current_msec;
start->last = now;
return pkt.need_ack ? NGX_OK : NGX_DONE;
}
static void
ngx_quic_retransmit_handler(ngx_event_t *ev)
{
ngx_uint_t i;
ngx_msec_t wait, nswait;
ngx_connection_t *c;
ngx_quic_connection_t *qc;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"retransmit timer");
c = ev->data;
qc = c->quic;
wait = 0;
for (i = 0; i < NGX_QUIC_NAMESPACE_LAST; i++) {
if (ngx_quic_retransmit_ns(c, &qc->ns[i], &nswait) != NGX_OK) {
ngx_quic_close_connection(c);
return;
}
if (i == 0) {
wait = nswait;
} else if (nswait > 0 && nswait < wait) {
wait = nswait;
}
}
if (wait > 0) {
ngx_add_timer(&qc->retry, wait);
}
}
static ngx_int_t
ngx_quic_retransmit_ns(ngx_connection_t *c, ngx_quic_namespace_t *ns,
ngx_msec_t *waitp)
{
uint64_t pn;
ngx_msec_t now, wait;
ngx_queue_t *q, range;
ngx_quic_frame_t *f, *start;
ngx_quic_connection_t *qc;
qc = c->quic;
now = ngx_current_msec;
wait = 0;
if (ngx_queue_empty(&ns->sent)) {
*waitp = 0;
return NGX_OK;
}
q = ngx_queue_head(&ns->sent);
start = ngx_queue_data(q, ngx_quic_frame_t, queue);
pn = start->pnum;
f = start;
do {
ngx_queue_init(&range);
/* send frames with same packet number to the wire */
do {
f = ngx_queue_data(q, ngx_quic_frame_t, queue);
if (start->first + qc->tp.max_idle_timeout < now) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"retransmission timeout");
return NGX_DECLINED;
}
if (f->pnum != pn) {
break;
}
q = ngx_queue_next(q);
ngx_queue_remove(&f->queue);
ngx_queue_insert_tail(&range, &f->queue);
} while (q != ngx_queue_sentinel(&ns->sent));
wait = start->last + qc->tp.max_ack_delay - now;
if ((ngx_msec_int_t) wait > 0) {
break;
}
/* NGX_DONE is impossible here, such frames don't get into this queue */
if (ngx_quic_send_frames(c, &range) != NGX_OK) {
return NGX_ERROR;
}
/* move frames group to the end of queue */
ngx_queue_add(&ns->sent, &range);
} while (q != ngx_queue_sentinel(&ns->sent));
*waitp = wait;
return NGX_OK;
}
@ -1903,6 +2166,7 @@ static ngx_quic_frame_t *
ngx_quic_alloc_frame(ngx_connection_t *c, size_t size)
{
u_char *p;
ngx_queue_t *q;
ngx_quic_frame_t *frame;
ngx_quic_connection_t *qc;
@ -1917,10 +2181,13 @@ ngx_quic_alloc_frame(ngx_connection_t *c, size_t size)
}
qc = c->quic;
frame = qc->free_frames;
if (frame) {
qc->free_frames = frame->next;
if (!ngx_queue_empty(&qc->free_frames)) {
q = ngx_queue_head(&qc->free_frames);
frame = ngx_queue_data(q, ngx_quic_frame_t, queue);
ngx_queue_remove(&frame->queue);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"reuse quic frame n:%ui", qc->nframes);
@ -1959,8 +2226,7 @@ ngx_quic_free_frame(ngx_connection_t *c, ngx_quic_frame_t *frame)
ngx_free(frame->data);
}
frame->next = qc->free_frames;
qc->free_frames = frame;
ngx_queue_insert_head(&qc->free_frames, &frame->queue);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"free quic frame n:%ui", qc->nframes);

View File

@ -1080,12 +1080,19 @@ not_allowed:
ssize_t
ngx_quic_create_frame(u_char *p, u_char *end, ngx_quic_frame_t *f)
ngx_quic_create_frame(u_char *p, ngx_quic_frame_t *f)
{
// TODO: handle end arg
/*
* QUIC-recovery, section 2:
*
* Ack-eliciting Frames: All frames other than ACK, PADDING, and
* CONNECTION_CLOSE are considered ack-eliciting.
*/
f->need_ack = 1;
switch (f->type) {
case NGX_QUIC_FT_ACK:
f->need_ack = 0;
return ngx_quic_create_ack(p, &f->u.ack);
case NGX_QUIC_FT_CRYPTO:
@ -1105,6 +1112,7 @@ ngx_quic_create_frame(u_char *p, u_char *end, ngx_quic_frame_t *f)
return ngx_quic_create_stream(p, &f->u.stream);
case NGX_QUIC_FT_CONNECTION_CLOSE:
f->need_ack = 0;
return ngx_quic_create_close(p, &f->u.close);
case NGX_QUIC_FT_MAX_STREAMS:
@ -1130,10 +1138,10 @@ ngx_quic_create_ack(u_char *p, ngx_quic_ack_frame_t *ack)
if (p == NULL) {
len = ngx_quic_varint_len(NGX_QUIC_FT_ACK);
len += ngx_quic_varint_len(ack->pn);
len += ngx_quic_varint_len(0);
len += ngx_quic_varint_len(ack->largest);
len += ngx_quic_varint_len(0);
len += ngx_quic_varint_len(0);
len += ngx_quic_varint_len(ack->first_range);
return len;
}
@ -1141,10 +1149,10 @@ ngx_quic_create_ack(u_char *p, ngx_quic_ack_frame_t *ack)
start = p;
ngx_quic_build_int(&p, NGX_QUIC_FT_ACK);
ngx_quic_build_int(&p, ack->pn);
ngx_quic_build_int(&p, 0);
ngx_quic_build_int(&p, ack->largest);
ngx_quic_build_int(&p, 0);
ngx_quic_build_int(&p, 0);
ngx_quic_build_int(&p, ack->first_range);
return p - start;
}

View File

@ -97,7 +97,6 @@
typedef struct {
ngx_uint_t pn;
uint64_t largest;
uint64_t delay;
uint64_t range_count;
@ -204,7 +203,13 @@ typedef struct ngx_quic_frame_s ngx_quic_frame_t;
struct ngx_quic_frame_s {
ngx_uint_t type;
enum ssl_encryption_level_t level;
ngx_quic_frame_t *next;
ngx_queue_t queue;
uint64_t pnum;
ngx_msec_t first;
ngx_msec_t last;
ngx_uint_t need_ack;
/* unsigned need_ack:1; */
u_char *data;
union {
ngx_quic_ack_frame_t ack;
@ -250,6 +255,9 @@ typedef struct {
uint64_t pn;
u_char *plaintext;
ngx_str_t payload; /* decrypted data */
ngx_uint_t need_ack;
/* unsigned need_ack:1; */
} ngx_quic_header_t;
@ -269,7 +277,7 @@ ngx_int_t ngx_quic_parse_handshake_header(ngx_quic_header_t *pkt);
ssize_t ngx_quic_parse_frame(ngx_quic_header_t *pkt, u_char *start, u_char *end,
ngx_quic_frame_t *frame);
ssize_t ngx_quic_create_frame(u_char *p, u_char *end, ngx_quic_frame_t *f);
ssize_t ngx_quic_create_frame(u_char *p, ngx_quic_frame_t *f);
ngx_int_t ngx_quic_parse_transport_params(u_char *p, u_char *end,
ngx_quic_tp_t *tp, ngx_log_t *log);