Request body: unbuffered reading.

The r->request_body_no_buffering flag was introduced.  It instructs
client request body reading code to avoid reading the whole body, and
to call post_handler early instead.  The caller should use the
ngx_http_read_unbuffered_request_body() function to read remaining
parts of the body.

Upstream module is now able to use this mode, if configured with
the proxy_request_buffering directive.
This commit is contained in:
Maxim Dounin 2015-03-23 21:09:19 +03:00
parent d02d2cff9b
commit 2743bb68ee
8 changed files with 306 additions and 25 deletions

View File

@ -292,6 +292,13 @@ static ngx_command_t ngx_http_proxy_commands[] = {
offsetof(ngx_http_proxy_loc_conf_t, upstream.buffering), offsetof(ngx_http_proxy_loc_conf_t, upstream.buffering),
NULL }, NULL },
{ ngx_string("proxy_request_buffering"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG,
ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_proxy_loc_conf_t, upstream.request_buffering),
NULL },
{ ngx_string("proxy_ignore_client_abort"), { ngx_string("proxy_ignore_client_abort"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG, NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG,
ngx_conf_set_flag_slot, ngx_conf_set_flag_slot,
@ -876,6 +883,15 @@ ngx_http_proxy_handler(ngx_http_request_t *r)
u->accel = 1; u->accel = 1;
if (!plcf->upstream.request_buffering
&& plcf->body_values == NULL && plcf->upstream.pass_request_body
&& !r->headers_in.chunked)
{
/* TODO: support chunked when using HTTP/1.1 */
r->request_body_no_buffering = 1;
}
rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init); rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
@ -1393,7 +1409,11 @@ ngx_http_proxy_create_request(ngx_http_request_t *r)
"http proxy header:%N\"%*s\"", "http proxy header:%N\"%*s\"",
(size_t) (b->last - b->pos), b->pos); (size_t) (b->last - b->pos), b->pos);
if (plcf->body_values == NULL && plcf->upstream.pass_request_body) { if (r->request_body_no_buffering) {
u->request_bufs = cl;
} else if (plcf->body_values == NULL && plcf->upstream.pass_request_body) {
body = u->request_bufs; body = u->request_bufs;
u->request_bufs = cl; u->request_bufs = cl;
@ -2582,6 +2602,7 @@ ngx_http_proxy_create_loc_conf(ngx_conf_t *cf)
conf->upstream.store_access = NGX_CONF_UNSET_UINT; conf->upstream.store_access = NGX_CONF_UNSET_UINT;
conf->upstream.next_upstream_tries = NGX_CONF_UNSET_UINT; conf->upstream.next_upstream_tries = NGX_CONF_UNSET_UINT;
conf->upstream.buffering = NGX_CONF_UNSET; conf->upstream.buffering = NGX_CONF_UNSET;
conf->upstream.request_buffering = NGX_CONF_UNSET;
conf->upstream.ignore_client_abort = NGX_CONF_UNSET; conf->upstream.ignore_client_abort = NGX_CONF_UNSET;
conf->upstream.force_ranges = NGX_CONF_UNSET; conf->upstream.force_ranges = NGX_CONF_UNSET;
@ -2691,6 +2712,9 @@ ngx_http_proxy_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_value(conf->upstream.buffering, ngx_conf_merge_value(conf->upstream.buffering,
prev->upstream.buffering, 1); prev->upstream.buffering, 1);
ngx_conf_merge_value(conf->upstream.request_buffering,
prev->upstream.request_buffering, 1);
ngx_conf_merge_value(conf->upstream.ignore_client_abort, ngx_conf_merge_value(conf->upstream.ignore_client_abort,
prev->upstream.ignore_client_abort, 0); prev->upstream.ignore_client_abort, 0);

View File

@ -138,6 +138,7 @@ ngx_int_t ngx_http_send_special(ngx_http_request_t *r, ngx_uint_t flags);
ngx_int_t ngx_http_read_client_request_body(ngx_http_request_t *r, ngx_int_t ngx_http_read_client_request_body(ngx_http_request_t *r,
ngx_http_client_body_handler_pt post_handler); ngx_http_client_body_handler_pt post_handler);
ngx_int_t ngx_http_read_unbuffered_request_body(ngx_http_request_t *r);
ngx_int_t ngx_http_send_header(ngx_http_request_t *r); ngx_int_t ngx_http_send_header(ngx_http_request_t *r);
ngx_int_t ngx_http_special_response_handler(ngx_http_request_t *r, ngx_int_t ngx_http_special_response_handler(ngx_http_request_t *r,

View File

@ -2525,6 +2525,11 @@ ngx_http_finalize_connection(ngx_http_request_t *r)
return; return;
} }
if (r->reading_body) {
r->keepalive = 0;
r->lingering_close = 1;
}
if (!ngx_terminate if (!ngx_terminate
&& !ngx_exiting && !ngx_exiting
&& r->keepalive && r->keepalive

View File

@ -473,6 +473,7 @@ struct ngx_http_request_s {
unsigned request_body_in_clean_file:1; unsigned request_body_in_clean_file:1;
unsigned request_body_file_group_access:1; unsigned request_body_file_group_access:1;
unsigned request_body_file_log_level:3; unsigned request_body_file_log_level:3;
unsigned request_body_no_buffering:1;
unsigned subrequest_in_memory:1; unsigned subrequest_in_memory:1;
unsigned waited:1; unsigned waited:1;
@ -509,6 +510,7 @@ struct ngx_http_request_s {
unsigned keepalive:1; unsigned keepalive:1;
unsigned lingering_close:1; unsigned lingering_close:1;
unsigned discard_body:1; unsigned discard_body:1;
unsigned reading_body:1;
unsigned internal:1; unsigned internal:1;
unsigned error_page:1; unsigned error_page:1;
unsigned filter_finalize:1; unsigned filter_finalize:1;

View File

@ -42,12 +42,14 @@ ngx_http_read_client_request_body(ngx_http_request_t *r,
#if (NGX_HTTP_SPDY) #if (NGX_HTTP_SPDY)
if (r->spdy_stream && r == r->main) { if (r->spdy_stream && r == r->main) {
r->request_body_no_buffering = 0;
rc = ngx_http_spdy_read_request_body(r, post_handler); rc = ngx_http_spdy_read_request_body(r, post_handler);
goto done; goto done;
} }
#endif #endif
if (r != r->main || r->request_body || r->discard_body) { if (r != r->main || r->request_body || r->discard_body) {
r->request_body_no_buffering = 0;
post_handler(r); post_handler(r);
return NGX_OK; return NGX_OK;
} }
@ -57,6 +59,10 @@ ngx_http_read_client_request_body(ngx_http_request_t *r,
goto done; goto done;
} }
if (r->request_body_no_buffering) {
r->request_body_in_file_only = 0;
}
rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t)); rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t));
if (rb == NULL) { if (rb == NULL) {
rc = NGX_HTTP_INTERNAL_SERVER_ERROR; rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
@ -79,6 +85,7 @@ ngx_http_read_client_request_body(ngx_http_request_t *r,
r->request_body = rb; r->request_body = rb;
if (r->headers_in.content_length_n < 0 && !r->headers_in.chunked) { if (r->headers_in.content_length_n < 0 && !r->headers_in.chunked) {
r->request_body_no_buffering = 0;
post_handler(r); post_handler(r);
return NGX_OK; return NGX_OK;
} }
@ -171,6 +178,8 @@ ngx_http_read_client_request_body(ngx_http_request_t *r,
} }
} }
r->request_body_no_buffering = 0;
post_handler(r); post_handler(r);
return NGX_OK; return NGX_OK;
@ -214,6 +223,21 @@ ngx_http_read_client_request_body(ngx_http_request_t *r,
done: done:
if (r->request_body_no_buffering
&& (rc == NGX_OK || rc == NGX_AGAIN))
{
if (rc == NGX_OK) {
r->request_body_no_buffering = 0;
} else {
/* rc == NGX_AGAIN */
r->reading_body = 1;
}
r->read_event_handler = ngx_http_block_reading;
post_handler(r);
}
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
r->main->count--; r->main->count--;
} }
@ -222,6 +246,26 @@ done:
} }
ngx_int_t
ngx_http_read_unbuffered_request_body(ngx_http_request_t *r)
{
ngx_int_t rc;
if (r->connection->read->timedout) {
r->connection->timedout = 1;
return NGX_HTTP_REQUEST_TIME_OUT;
}
rc = ngx_http_do_read_client_request_body(r);
if (rc == NGX_OK) {
r->reading_body = 0;
}
return rc;
}
static void static void
ngx_http_read_client_request_body_handler(ngx_http_request_t *r) ngx_http_read_client_request_body_handler(ngx_http_request_t *r)
{ {
@ -264,18 +308,43 @@ ngx_http_do_read_client_request_body(ngx_http_request_t *r)
for ( ;; ) { for ( ;; ) {
if (rb->buf->last == rb->buf->end) { if (rb->buf->last == rb->buf->end) {
/* pass buffer to request body filter chain */ if (rb->buf->pos != rb->buf->last) {
out.buf = rb->buf; /* pass buffer to request body filter chain */
out.next = NULL;
rc = ngx_http_request_body_filter(r, &out); out.buf = rb->buf;
out.next = NULL;
if (rc != NGX_OK) { rc = ngx_http_request_body_filter(r, &out);
return rc;
if (rc != NGX_OK) {
return rc;
}
} else {
/* update chains */
rc = ngx_http_request_body_filter(r, NULL);
if (rc != NGX_OK) {
return rc;
}
} }
if (rb->busy != NULL) { if (rb->busy != NULL) {
if (r->request_body_no_buffering) {
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
return NGX_AGAIN;
}
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
@ -342,6 +411,22 @@ ngx_http_do_read_client_request_body(ngx_http_request_t *r)
} }
if (!c->read->ready) { if (!c->read->ready) {
if (r->request_body_no_buffering
&& rb->buf->pos != rb->buf->last)
{
/* pass buffer to request body filter chain */
out.buf = rb->buf;
out.next = NULL;
rc = ngx_http_request_body_filter(r, &out);
if (rc != NGX_OK) {
return rc;
}
}
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
ngx_add_timer(c->read, clcf->client_body_timeout); ngx_add_timer(c->read, clcf->client_body_timeout);
@ -387,9 +472,10 @@ ngx_http_do_read_client_request_body(ngx_http_request_t *r)
} }
} }
r->read_event_handler = ngx_http_block_reading; if (!r->request_body_no_buffering) {
r->read_event_handler = ngx_http_block_reading;
rb->post_handler(r); rb->post_handler(r);
}
return NGX_OK; return NGX_OK;
} }
@ -1085,7 +1171,8 @@ ngx_http_request_body_save_filter(ngx_http_request_t *r, ngx_chain_t *in)
} }
if (rb->rest > 0 if (rb->rest > 0
&& rb->buf && rb->buf->last == rb->buf->end) && rb->buf && rb->buf->last == rb->buf->end
&& !r->request_body_no_buffering)
{ {
if (ngx_http_write_request_body(r) != NGX_OK) { if (ngx_http_write_request_body(r) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;

View File

@ -36,9 +36,12 @@ static void ngx_http_upstream_connect(ngx_http_request_t *r,
static ngx_int_t ngx_http_upstream_reinit(ngx_http_request_t *r, static ngx_int_t ngx_http_upstream_reinit(ngx_http_request_t *r,
ngx_http_upstream_t *u); ngx_http_upstream_t *u);
static void ngx_http_upstream_send_request(ngx_http_request_t *r, static void ngx_http_upstream_send_request(ngx_http_request_t *r,
ngx_http_upstream_t *u); ngx_http_upstream_t *u, ngx_uint_t do_write);
static ngx_int_t ngx_http_upstream_send_request_body(ngx_http_request_t *r,
ngx_http_upstream_t *u, ngx_uint_t do_write);
static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
ngx_http_upstream_t *u); ngx_http_upstream_t *u);
static void ngx_http_upstream_read_request_handler(ngx_http_request_t *r);
static void ngx_http_upstream_process_header(ngx_http_request_t *r, static void ngx_http_upstream_process_header(ngx_http_request_t *r,
ngx_http_upstream_t *u); ngx_http_upstream_t *u);
static ngx_int_t ngx_http_upstream_test_next(ngx_http_request_t *r, static ngx_int_t ngx_http_upstream_test_next(ngx_http_request_t *r,
@ -568,8 +571,11 @@ ngx_http_upstream_init_request(ngx_http_request_t *r)
u->output.pool = r->pool; u->output.pool = r->pool;
u->output.bufs.num = 1; u->output.bufs.num = 1;
u->output.bufs.size = clcf->client_body_buffer_size; u->output.bufs.size = clcf->client_body_buffer_size;
u->output.output_filter = ngx_chain_writer;
u->output.filter_ctx = &u->writer; if (u->output.output_filter == NULL) {
u->output.output_filter = ngx_chain_writer;
u->output.filter_ctx = &u->writer;
}
u->writer.pool = r->pool; u->writer.pool = r->pool;
@ -1432,7 +1438,7 @@ ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
#endif #endif
ngx_http_upstream_send_request(r, u); ngx_http_upstream_send_request(r, u, 1);
} }
@ -1536,7 +1542,7 @@ ngx_http_upstream_ssl_handshake(ngx_connection_t *c)
c = r->connection; c = r->connection;
ngx_http_upstream_send_request(r, u); ngx_http_upstream_send_request(r, u, 1);
ngx_http_run_posted_requests(c); ngx_http_run_posted_requests(c);
return; return;
@ -1724,7 +1730,8 @@ ngx_http_upstream_reinit(ngx_http_request_t *r, ngx_http_upstream_t *u)
static void static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u) ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u,
ngx_uint_t do_write)
{ {
ngx_int_t rc; ngx_int_t rc;
ngx_connection_t *c; ngx_connection_t *c;
@ -1741,21 +1748,25 @@ ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
c->log->action = "sending request to upstream"; c->log->action = "sending request to upstream";
rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs); rc = ngx_http_upstream_send_request_body(r, u, do_write);
u->request_sent = 1;
if (rc == NGX_ERROR) { if (rc == NGX_ERROR) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return; return;
} }
if (c->write->timer_set) { if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
ngx_del_timer(c->write); ngx_http_upstream_finalize_request(r, u, rc);
return;
} }
if (rc == NGX_AGAIN) { if (rc == NGX_AGAIN) {
ngx_add_timer(c->write, u->conf->send_timeout); if (!c->write->ready) {
ngx_add_timer(c->write, u->conf->send_timeout);
} else if (c->write->timer_set) {
ngx_del_timer(c->write);
}
if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) { if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, ngx_http_upstream_finalize_request(r, u,
@ -1768,6 +1779,10 @@ ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
/* rc == NGX_OK */ /* rc == NGX_OK */
if (c->write->timer_set) {
ngx_del_timer(c->write);
}
if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) { if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
if (ngx_tcp_push(c->fd) == NGX_ERROR) { if (ngx_tcp_push(c->fd) == NGX_ERROR) {
ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno, ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
@ -1797,6 +1812,123 @@ ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
} }
static ngx_int_t
ngx_http_upstream_send_request_body(ngx_http_request_t *r,
ngx_http_upstream_t *u, ngx_uint_t do_write)
{
int tcp_nodelay;
ngx_int_t rc;
ngx_chain_t *out, *cl, *ln;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream send request body");
if (!r->request_body_no_buffering) {
/* buffered request body */
if (!u->request_sent) {
u->request_sent = 1;
out = u->request_bufs;
} else {
out = NULL;
}
return ngx_output_chain(&u->output, out);
}
if (!u->request_sent) {
u->request_sent = 1;
out = u->request_bufs;
if (r->request_body->bufs) {
for (cl = out; cl->next; cl = out->next) { /* void */ }
cl->next = r->request_body->bufs;
r->request_body->bufs = NULL;
}
c = u->peer.connection;
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");
tcp_nodelay = 1;
if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
(const void *) &tcp_nodelay, sizeof(int)) == -1)
{
ngx_connection_error(c, ngx_socket_errno,
"setsockopt(TCP_NODELAY) failed");
return NGX_ERROR;
}
c->tcp_nodelay = NGX_TCP_NODELAY_SET;
}
r->read_event_handler = ngx_http_upstream_read_request_handler;
} else {
out = NULL;
}
for ( ;; ) {
if (do_write) {
rc = ngx_output_chain(&u->output, out);
if (rc == NGX_ERROR) {
return NGX_ERROR;
}
while (out) {
ln = out;
out = out->next;
ngx_free_chain(r->pool, ln);
}
if (rc == NGX_OK && !r->reading_body) {
break;
}
}
if (r->reading_body) {
/* read client request body */
rc = ngx_http_read_unbuffered_request_body(r);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
out = r->request_body->bufs;
r->request_body->bufs = NULL;
}
/* stop if there is nothing to send */
if (out == NULL) {
rc = NGX_AGAIN;
break;
}
do_write = 1;
}
if (!r->reading_body) {
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
r->read_event_handler =
ngx_http_upstream_rd_check_broken_connection;
}
}
return rc;
}
static void static void
ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
ngx_http_upstream_t *u) ngx_http_upstream_t *u)
@ -1830,7 +1962,29 @@ ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
return; return;
} }
ngx_http_upstream_send_request(r, u); ngx_http_upstream_send_request(r, u, 1);
}
static void
ngx_http_upstream_read_request_handler(ngx_http_request_t *r)
{
ngx_connection_t *c;
ngx_http_upstream_t *u;
c = r->connection;
u = r->upstream;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream read request handler");
if (c->read->timedout) {
c->timedout = 1;
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT);
return;
}
ngx_http_upstream_send_request(r, u, 0);
} }
@ -3626,7 +3780,9 @@ ngx_http_upstream_next(ngx_http_request_t *r, ngx_http_upstream_t *u,
"upstream timed out"); "upstream timed out");
} }
if (u->peer.cached && ft_type == NGX_HTTP_UPSTREAM_FT_ERROR) { if (u->peer.cached && ft_type == NGX_HTTP_UPSTREAM_FT_ERROR
&& (!u->request_sent || !r->request_body_no_buffering))
{
status = 0; status = 0;
/* TODO: inform balancer instead */ /* TODO: inform balancer instead */
@ -3674,6 +3830,7 @@ ngx_http_upstream_next(ngx_http_request_t *r, ngx_http_upstream_t *u,
if (u->peer.tries == 0 if (u->peer.tries == 0
|| !(u->conf->next_upstream & ft_type) || !(u->conf->next_upstream & ft_type)
|| (u->request_sent && r->request_body_no_buffering)
|| (timeout && ngx_current_msec - u->peer.start_time >= timeout)) || (timeout && ngx_current_msec - u->peer.start_time >= timeout))
{ {
#if (NGX_HTTP_CACHE) #if (NGX_HTTP_CACHE)

View File

@ -160,6 +160,7 @@ typedef struct {
ngx_uint_t store_access; ngx_uint_t store_access;
ngx_uint_t next_upstream_tries; ngx_uint_t next_upstream_tries;
ngx_flag_t buffering; ngx_flag_t buffering;
ngx_flag_t request_buffering;
ngx_flag_t pass_request_headers; ngx_flag_t pass_request_headers;
ngx_flag_t pass_request_body; ngx_flag_t pass_request_body;

View File

@ -1081,6 +1081,10 @@ ngx_http_variable_content_length(ngx_http_request_t *r,
v->no_cacheable = 0; v->no_cacheable = 0;
v->not_found = 0; v->not_found = 0;
} else if (r->reading_body) {
v->not_found = 1;
v->no_cacheable = 1;
} else if (r->headers_in.content_length_n >= 0) { } else if (r->headers_in.content_length_n >= 0) {
p = ngx_pnalloc(r->pool, NGX_OFF_T_LEN); p = ngx_pnalloc(r->pool, NGX_OFF_T_LEN);
if (p == NULL) { if (p == NULL) {