mirror of
https://github.com/nginx/nginx.git
synced 2024-12-12 02:09:04 +08:00
QUIC: generic buffering for stream input.
Previously each stream had an input buffer. Now memory is allocated as bytes arrive. Generic buffering mechanism is used for this.
This commit is contained in:
parent
8f0d5edf63
commit
a8c8b33144
@ -79,7 +79,8 @@ struct ngx_quic_stream_s {
|
||||
uint64_t id;
|
||||
uint64_t acked;
|
||||
uint64_t send_max_data;
|
||||
ngx_buf_t *b;
|
||||
uint64_t recv_max_data;
|
||||
ngx_chain_t *in;
|
||||
ngx_quic_frames_stream_t *fs;
|
||||
ngx_uint_t cancelable; /* unsigned cancelable:1; */
|
||||
};
|
||||
|
@ -432,8 +432,6 @@ ngx_quic_detect_lost(ngx_connection_t *c)
|
||||
void
|
||||
ngx_quic_resend_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
|
||||
{
|
||||
size_t n;
|
||||
ngx_buf_t *b;
|
||||
ngx_queue_t *q;
|
||||
ngx_quic_frame_t *f, *start;
|
||||
ngx_quic_stream_t *qs;
|
||||
@ -497,13 +495,7 @@ ngx_quic_resend_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
|
||||
break;
|
||||
}
|
||||
|
||||
b = qs->b;
|
||||
n = qs->fs->received + (b->pos - b->start) + (b->end - b->last);
|
||||
|
||||
if (f->u.max_stream_data.limit < n) {
|
||||
f->u.max_stream_data.limit = n;
|
||||
}
|
||||
|
||||
f->u.max_stream_data.limit = qs->recv_max_data;
|
||||
ngx_quic_queue_frame(qc, f);
|
||||
break;
|
||||
|
||||
|
@ -13,7 +13,6 @@
|
||||
#define NGX_QUIC_BUFFER_SIZE 4096
|
||||
|
||||
|
||||
static void ngx_quic_free_bufs(ngx_connection_t *c, ngx_chain_t *in);
|
||||
static ngx_chain_t *ngx_quic_split_bufs(ngx_connection_t *c, ngx_chain_t *in,
|
||||
size_t len);
|
||||
|
||||
@ -84,7 +83,7 @@ ngx_quic_free_frame(ngx_connection_t *c, ngx_quic_frame_t *frame)
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
void
|
||||
ngx_quic_free_bufs(ngx_connection_t *c, ngx_chain_t *in)
|
||||
{
|
||||
ngx_buf_t *b, *shadow;
|
||||
|
@ -28,6 +28,7 @@ ngx_chain_t *ngx_quic_copy_buf(ngx_connection_t *c, u_char *data,
|
||||
size_t len);
|
||||
ngx_chain_t *ngx_quic_copy_chain(ngx_connection_t *c, ngx_chain_t *in,
|
||||
size_t limit);
|
||||
void ngx_quic_free_bufs(ngx_connection_t *c, ngx_chain_t *in);
|
||||
|
||||
ngx_int_t ngx_quic_handle_ordered_frame(ngx_connection_t *c,
|
||||
ngx_quic_frames_stream_t *fs, ngx_quic_frame_t *frame,
|
||||
|
@ -16,7 +16,7 @@
|
||||
static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c,
|
||||
uint64_t id);
|
||||
static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c,
|
||||
uint64_t id, size_t rcvbuf_size);
|
||||
uint64_t id);
|
||||
static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
|
||||
size_t size);
|
||||
static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
|
||||
@ -30,7 +30,6 @@ static void ngx_quic_stream_cleanup_handler(void *data);
|
||||
ngx_connection_t *
|
||||
ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi)
|
||||
{
|
||||
size_t rcvbuf_size;
|
||||
uint64_t id;
|
||||
ngx_quic_stream_t *qs, *nqs;
|
||||
ngx_quic_connection_t *qc;
|
||||
@ -58,7 +57,6 @@ ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi)
|
||||
qc->streams.server_max_streams_bidi, id);
|
||||
|
||||
qc->streams.server_streams_bidi++;
|
||||
rcvbuf_size = qc->tp.initial_max_stream_data_bidi_local;
|
||||
|
||||
} else {
|
||||
if (qc->streams.server_streams_uni
|
||||
@ -81,10 +79,9 @@ ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi)
|
||||
qc->streams.server_max_streams_uni, id);
|
||||
|
||||
qc->streams.server_streams_uni++;
|
||||
rcvbuf_size = 0;
|
||||
}
|
||||
|
||||
nqs = ngx_quic_create_stream(qs->parent, id, rcvbuf_size);
|
||||
nqs = ngx_quic_create_stream(qs->parent, id);
|
||||
if (nqs == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
@ -235,7 +232,6 @@ ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
|
||||
static ngx_quic_stream_t *
|
||||
ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
|
||||
{
|
||||
size_t n;
|
||||
uint64_t min_id;
|
||||
ngx_quic_stream_t *qs;
|
||||
ngx_quic_connection_t *qc;
|
||||
@ -272,7 +268,6 @@ ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
|
||||
min_id = (qc->streams.client_streams_uni << 2)
|
||||
| NGX_QUIC_STREAM_UNIDIRECTIONAL;
|
||||
qc->streams.client_streams_uni = (id >> 2) + 1;
|
||||
n = qc->tp.initial_max_stream_data_uni;
|
||||
|
||||
} else {
|
||||
|
||||
@ -296,11 +291,6 @@ ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
|
||||
|
||||
min_id = (qc->streams.client_streams_bidi << 2);
|
||||
qc->streams.client_streams_bidi = (id >> 2) + 1;
|
||||
n = qc->tp.initial_max_stream_data_bidi_remote;
|
||||
}
|
||||
|
||||
if (n < NGX_QUIC_STREAM_BUFSIZE) {
|
||||
n = NGX_QUIC_STREAM_BUFSIZE;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -314,7 +304,7 @@ ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
|
||||
|
||||
for ( /* void */ ; min_id < id; min_id += 0x04) {
|
||||
|
||||
qs = ngx_quic_create_stream(c, min_id, n);
|
||||
qs = ngx_quic_create_stream(c, min_id);
|
||||
if (qs == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
@ -326,12 +316,12 @@ ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
|
||||
}
|
||||
}
|
||||
|
||||
return ngx_quic_create_stream(c, id, n);
|
||||
return ngx_quic_create_stream(c, id);
|
||||
}
|
||||
|
||||
|
||||
static ngx_quic_stream_t *
|
||||
ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
|
||||
ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
|
||||
{
|
||||
ngx_log_t *log;
|
||||
ngx_pool_t *pool;
|
||||
@ -360,12 +350,6 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
|
||||
qs->parent = c;
|
||||
qs->id = id;
|
||||
|
||||
qs->b = ngx_create_temp_buf(pool, rcvbuf_size);
|
||||
if (qs->b == NULL) {
|
||||
ngx_destroy_pool(pool);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
qs->fs = ngx_pcalloc(pool, sizeof(ngx_quic_frames_stream_t));
|
||||
if (qs->fs == NULL) {
|
||||
ngx_destroy_pool(pool);
|
||||
@ -420,13 +404,19 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
|
||||
if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
|
||||
if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
|
||||
qs->send_max_data = qc->ctp.initial_max_stream_data_uni;
|
||||
|
||||
} else {
|
||||
qs->recv_max_data = qc->tp.initial_max_stream_data_uni;
|
||||
}
|
||||
|
||||
} else {
|
||||
if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
|
||||
qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_remote;
|
||||
qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_local;
|
||||
|
||||
} else {
|
||||
qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
|
||||
qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote;
|
||||
}
|
||||
}
|
||||
|
||||
@ -449,8 +439,9 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
|
||||
static ssize_t
|
||||
ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
|
||||
{
|
||||
ssize_t len;
|
||||
ssize_t len, n;
|
||||
ngx_buf_t *b;
|
||||
ngx_chain_t *cl, **ll;
|
||||
ngx_event_t *rev;
|
||||
ngx_connection_t *pc;
|
||||
ngx_quic_frame_t *frame;
|
||||
@ -458,7 +449,6 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
|
||||
ngx_quic_connection_t *qc;
|
||||
|
||||
qs = c->quic;
|
||||
b = qs->b;
|
||||
pc = qs->parent;
|
||||
qc = ngx_quic_get_connection(pc);
|
||||
rev = c->read;
|
||||
@ -467,11 +457,11 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"quic stream recv id:0x%xL eof:%d avail:%z",
|
||||
qs->id, rev->pending_eof, b->last - b->pos);
|
||||
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"quic stream recv id:0x%xL eof:%d",
|
||||
qs->id, rev->pending_eof);
|
||||
|
||||
if (b->pos == b->last) {
|
||||
if (qs->in == NULL) {
|
||||
rev->ready = 0;
|
||||
|
||||
if (rev->pending_eof) {
|
||||
@ -484,16 +474,33 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
len = ngx_min(b->last - b->pos, (ssize_t) size);
|
||||
len = 0;
|
||||
cl = qs->in;
|
||||
|
||||
ngx_memcpy(buf, b->pos, len);
|
||||
for (ll = &cl; *ll; ll = &(*ll)->next) {
|
||||
b = (*ll)->buf;
|
||||
|
||||
n = ngx_min(b->last - b->pos, (ssize_t) size);
|
||||
buf = ngx_cpymem(buf, b->pos, n);
|
||||
|
||||
len += n;
|
||||
size -= n;
|
||||
b->pos += n;
|
||||
|
||||
if (b->pos != b->last) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
qs->in = *ll;
|
||||
*ll = NULL;
|
||||
|
||||
ngx_quic_free_bufs(pc, cl);
|
||||
|
||||
b->pos += len;
|
||||
qc->streams.received += len;
|
||||
qs->recv_max_data += len;
|
||||
|
||||
if (b->pos == b->last) {
|
||||
b->pos = b->start;
|
||||
b->last = b->start;
|
||||
if (qs->in == NULL) {
|
||||
rev->ready = rev->pending_eof;
|
||||
}
|
||||
|
||||
@ -510,8 +517,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
|
||||
frame->level = ssl_encryption_application;
|
||||
frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
|
||||
frame->u.max_stream_data.id = qs->id;
|
||||
frame->u.max_stream_data.limit = qs->fs->received + (b->pos - b->start)
|
||||
+ (b->end - b->last);
|
||||
frame->u.max_stream_data.limit = qs->recv_max_data;
|
||||
|
||||
ngx_quic_queue_frame(qc, frame);
|
||||
}
|
||||
@ -714,6 +720,7 @@ ngx_quic_stream_cleanup_handler(void *data)
|
||||
|
||||
ngx_rbtree_delete(&qc->streams.tree, &qs->node);
|
||||
ngx_quic_free_frames(pc, &qs->fs->frames);
|
||||
ngx_quic_free_bufs(pc, qs->in);
|
||||
|
||||
if (qc->closing) {
|
||||
/* schedule handler call to continue ngx_quic_close_connection() */
|
||||
@ -808,9 +815,7 @@ ngx_int_t
|
||||
ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
|
||||
ngx_quic_frame_t *frame)
|
||||
{
|
||||
size_t window;
|
||||
uint64_t last;
|
||||
ngx_buf_t *b;
|
||||
ngx_pool_t *pool;
|
||||
ngx_connection_t *sc;
|
||||
ngx_quic_stream_t *qs;
|
||||
@ -846,10 +851,8 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
|
||||
|
||||
sc = qs->connection;
|
||||
fs = qs->fs;
|
||||
b = qs->b;
|
||||
window = b->end - b->last;
|
||||
|
||||
if (last > window) {
|
||||
if (last > qs->recv_max_data) {
|
||||
qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
|
||||
goto cleanup;
|
||||
}
|
||||
@ -867,10 +870,8 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
|
||||
}
|
||||
|
||||
fs = qs->fs;
|
||||
b = qs->b;
|
||||
window = (b->pos - b->start) + (b->end - b->last);
|
||||
|
||||
if (last > fs->received && last - fs->received > window) {
|
||||
if (last > qs->recv_max_data) {
|
||||
qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
|
||||
return NGX_ERROR;
|
||||
}
|
||||
@ -892,10 +893,11 @@ cleanup:
|
||||
ngx_int_t
|
||||
ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data)
|
||||
{
|
||||
ssize_t n;
|
||||
uint64_t id;
|
||||
ngx_buf_t *b;
|
||||
ngx_event_t *rev;
|
||||
ngx_chain_t *cl;
|
||||
ngx_chain_t *cl, **ll;
|
||||
ngx_quic_stream_t *qs;
|
||||
ngx_quic_connection_t *qc;
|
||||
ngx_quic_stream_frame_t *f;
|
||||
@ -905,24 +907,34 @@ ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data)
|
||||
|
||||
f = &frame->u.stream;
|
||||
id = f->stream_id;
|
||||
cl = frame->data;
|
||||
|
||||
b = qs->b;
|
||||
for (ll = &qs->in; *ll; ll = &(*ll)->next) {
|
||||
if ((*ll)->next) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0,
|
||||
"quic no space in stream buffer");
|
||||
/* append to last buffer */
|
||||
|
||||
b = (*ll)->buf;
|
||||
|
||||
while (cl && b->last != b->end) {
|
||||
n = ngx_min(cl->buf->last - cl->buf->pos, b->end - b->last);
|
||||
b->last = ngx_cpymem(b->last, cl->buf->pos, n);
|
||||
cl->buf->pos += n;
|
||||
|
||||
if (cl->buf->pos == cl->buf->last) {
|
||||
cl = cl->next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cl = ngx_quic_copy_chain(c, cl, 0);
|
||||
if (cl == NGX_CHAIN_ERROR) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if ((size_t) (b->end - b->last) < f->length) {
|
||||
b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
|
||||
b->pos = b->start;
|
||||
}
|
||||
|
||||
for (cl = frame->data; cl; cl = cl->next) {
|
||||
b->last = ngx_cpymem(b->last, cl->buf->pos,
|
||||
cl->buf->last - cl->buf->pos);
|
||||
}
|
||||
*ll = cl;
|
||||
|
||||
rev = qs->connection->read;
|
||||
rev->ready = 1;
|
||||
@ -995,8 +1007,7 @@ ngx_int_t
|
||||
ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
|
||||
ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f)
|
||||
{
|
||||
size_t n;
|
||||
ngx_buf_t *b;
|
||||
uint64_t limit;
|
||||
ngx_quic_frame_t *frame;
|
||||
ngx_quic_stream_t *qs;
|
||||
ngx_quic_connection_t *qc;
|
||||
@ -1023,14 +1034,12 @@ ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
b = qs->b;
|
||||
n = b->end - b->last;
|
||||
limit = qs->recv_max_data;
|
||||
|
||||
qs->connection->listening->handler(qs->connection);
|
||||
|
||||
} else {
|
||||
b = qs->b;
|
||||
n = qs->fs->received + (b->pos - b->start) + (b->end - b->last);
|
||||
limit = qs->recv_max_data;
|
||||
}
|
||||
|
||||
frame = ngx_quic_alloc_frame(c);
|
||||
@ -1041,7 +1050,7 @@ ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
|
||||
frame->level = pkt->level;
|
||||
frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
|
||||
frame->u.max_stream_data.id = f->id;
|
||||
frame->u.max_stream_data.limit = n;
|
||||
frame->u.max_stream_data.limit = limit;
|
||||
|
||||
ngx_quic_queue_frame(qc, frame);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user