Implemented creation of server unidirectional streams.

The ngx_quic_create_stream() function is a generic function extracted from
the ngx_quic_handle_stream_frame() function.
This commit is contained in:
Vladimir Homutov 2020-03-18 13:49:39 +03:00
parent d36684447c
commit 2973465556

View File

@ -22,6 +22,8 @@ typedef struct {
ngx_rbtree_node_t sentinel;
ngx_msec_t timeout;
ngx_connection_handler_pt handler;
ngx_uint_t id_counter;
} ngx_quic_streams_t;
@ -101,6 +103,8 @@ static void ngx_quic_rbtree_insert_stream(ngx_rbtree_node_t *temp,
ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
static ngx_quic_stream_node_t *ngx_quic_find_stream(ngx_rbtree_t *rbtree,
ngx_uint_t key);
static ngx_quic_stream_node_t *ngx_quic_create_stream(ngx_connection_t *c,
ngx_uint_t id);
static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
size_t size);
static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
@ -916,9 +920,6 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f)
{
ngx_buf_t *b;
ngx_log_t *log;
ngx_pool_t *pool;
ngx_event_t *rev, *wev;
ngx_quic_connection_t *qc;
ngx_quic_stream_node_t *sn;
@ -945,69 +946,16 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
sn = ngx_pcalloc(c->pool, sizeof(ngx_quic_stream_node_t));
sn = ngx_quic_create_stream(c, f->stream_id);
if (sn == NULL) {
return NGX_ERROR;
}
sn->c = ngx_get_connection(-1, c->log); // TODO: free on connection termination
if (sn->c == NULL) {
return NGX_ERROR;
}
pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
if (pool == NULL) {
/* XXX free connection */
return NGX_ERROR;
}
log = ngx_palloc(pool, sizeof(ngx_log_t));
if (log == NULL) {
/* XXX free pool and connection */
return NGX_ERROR;
}
*log = *c->log;
pool->log = log;
sn->c->log = log;
sn->c->pool = pool;
sn->c->listening = c->listening;
sn->c->sockaddr = c->sockaddr;
sn->c->local_sockaddr = c->local_sockaddr;
rev = sn->c->read;
wev = sn->c->write;
rev->ready = 1;
rev->log = c->log;
wev->log = c->log;
sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
sn->node.key = f->stream_id;
sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone
if (sn->b == NULL) {
return NGX_ERROR;
}
b = sn->b;
ngx_memcpy(b->start, f->data, f->length);
b->last = b->start + f->length;
ngx_rbtree_insert(&qc->streams.tree, &sn->node);
sn->s.id = f->stream_id;
sn->s.unidirectional = (sn->s.id & 0x02) ? 1 : 0;
sn->s.parent = c;
sn->c->qs = &sn->s;
sn->c->recv = ngx_quic_stream_recv;
sn->c->send = ngx_quic_stream_send;
sn->c->send_chain = ngx_quic_stream_send_chain;
qc->streams.handler(sn->c);
return NGX_OK;
@ -1184,8 +1132,34 @@ ngx_quic_send_packet(ngx_connection_t *c, ngx_quic_connection_t *qc,
ngx_connection_t *
ngx_quic_create_uni_stream(ngx_connection_t *c)
{
/* XXX */
return NULL;
ngx_uint_t id;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
ngx_quic_stream_node_t *sn;
qs = c->qs;
qc = qs->parent->quic;
/*
* A stream ID is a 62-bit integer that is unique for all streams
* on a connection.
*
* 0x3 | Server-Initiated, Unidirectional
*/
id = (qc->streams.id_counter << 2) | 0x3;
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"creating server uni stream #%ui id %ui",
qc->streams.id_counter, id);
qc->streams.id_counter++;
sn = ngx_quic_create_stream(qs->parent, id);
if (sn == NULL) {
return NULL;
}
return sn->c;
}
@ -1254,6 +1228,81 @@ ngx_quic_find_stream(ngx_rbtree_t *rbtree, ngx_uint_t key)
}
static ngx_quic_stream_node_t *
ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id)
{
ngx_log_t *log;
ngx_pool_t *pool;
ngx_event_t *rev, *wev;
ngx_quic_connection_t *qc;
ngx_quic_stream_node_t *sn;
qc = c->quic;
sn = ngx_pcalloc(c->pool, sizeof(ngx_quic_stream_node_t));
if (sn == NULL) {
return NULL;
}
sn->c = ngx_get_connection(-1, c->log); // TODO: free on connection termination
if (sn->c == NULL) {
return NULL;
}
pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
if (pool == NULL) {
/* XXX free connection */
// TODO: add pool cleanup handdler
return NULL;
}
log = ngx_palloc(pool, sizeof(ngx_log_t));
if (log == NULL) {
/* XXX free pool and connection */
return NULL;
}
*log = *c->log;
pool->log = log;
sn->c->log = log;
sn->c->pool = pool;
sn->c->listening = c->listening;
sn->c->sockaddr = c->sockaddr;
sn->c->local_sockaddr = c->local_sockaddr;
rev = sn->c->read;
wev = sn->c->write;
rev->ready = 1;
rev->log = c->log;
wev->log = c->log;
sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
sn->node.key =id;
sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone
if (sn->b == NULL) {
return NULL;
}
ngx_rbtree_insert(&qc->streams.tree, &sn->node);
sn->s.id = id;
sn->s.unidirectional = (sn->s.id & 0x02) ? 1 : 0;
sn->s.parent = c;
sn->c->qs = &sn->s;
sn->c->recv = ngx_quic_stream_recv;
sn->c->send = ngx_quic_stream_send;
sn->c->send_chain = ngx_quic_stream_send_chain;
return sn;
}
static ssize_t
ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
{