fix r2378, run posted requests after upstream event handling

This commit is contained in:
Igor Sysoev 2008-12-08 18:28:06 +00:00
parent 17b5953e71
commit 3e15a9712a

View File

@ -32,12 +32,16 @@ static void ngx_http_upstream_send_response(ngx_http_request_t *r,
ngx_http_upstream_t *u); ngx_http_upstream_t *u);
static void static void
ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r); ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r);
static void ngx_http_upstream_process_non_buffered_body(ngx_event_t *ev); static void ngx_http_upstream_process_non_buffered_upstream(ngx_event_t *ev);
static void
ngx_http_upstream_process_non_buffered_request(ngx_http_request_t *r,
ngx_uint_t do_write);
static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data); static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data);
static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data, static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data,
ssize_t bytes); ssize_t bytes);
static void ngx_http_upstream_process_downstream(ngx_http_request_t *r); static void ngx_http_upstream_process_downstream(ngx_http_request_t *r);
static void ngx_http_upstream_process_body(ngx_event_t *ev); static void ngx_http_upstream_process_upstream(ngx_event_t *rev);
static void ngx_http_upstream_process_request(ngx_http_request_t *r);
static void ngx_http_upstream_store(ngx_http_request_t *r, static void ngx_http_upstream_store(ngx_http_request_t *r,
ngx_http_upstream_t *u); ngx_http_upstream_t *u);
static void ngx_http_upstream_dummy_handler(ngx_event_t *wev); static void ngx_http_upstream_dummy_handler(ngx_event_t *wev);
@ -1651,7 +1655,7 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
u->input_filter_ctx = r; u->input_filter_ctx = r;
} }
u->read_event_handler = ngx_http_upstream_process_non_buffered_body; u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
r->write_event_handler = r->write_event_handler =
ngx_http_upstream_process_non_buffered_downstream; ngx_http_upstream_process_non_buffered_downstream;
@ -1689,7 +1693,7 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
return; return;
} }
ngx_http_upstream_process_non_buffered_body(c->write); ngx_http_upstream_process_non_buffered_downstream(r);
} else { } else {
u->buffer.pos = u->buffer.start; u->buffer.pos = u->buffer.start;
@ -1701,7 +1705,7 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
} }
if (u->peer.connection->read->ready) { if (u->peer.connection->read->ready) {
ngx_http_upstream_process_non_buffered_body( ngx_http_upstream_process_non_buffered_upstream(
u->peer.connection->read); u->peer.connection->read);
} }
} }
@ -1832,69 +1836,85 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
p->send_timeout = clcf->send_timeout; p->send_timeout = clcf->send_timeout;
p->send_lowat = clcf->send_lowat; p->send_lowat = clcf->send_lowat;
u->read_event_handler = ngx_http_upstream_process_body; u->read_event_handler = ngx_http_upstream_process_upstream;
r->write_event_handler = ngx_http_upstream_process_downstream; r->write_event_handler = ngx_http_upstream_process_downstream;
ngx_http_upstream_process_body(u->peer.connection->read); ngx_http_upstream_process_upstream(u->peer.connection->read);
} }
static void static void
ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r) ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r)
{ {
ngx_http_upstream_process_non_buffered_body(r->connection->write); ngx_event_t *wev;
ngx_connection_t *c;
ngx_http_upstream_t *u;
c = r->connection;
u = r->upstream;
wev = c->write;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process non buffered downstream");
c->log->action = "sending to client";
if (wev->timedout) {
c->timedout = 1;
ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out");
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
ngx_http_upstream_process_non_buffered_request(r, 1);
} }
static void static void
ngx_http_upstream_process_non_buffered_body(ngx_event_t *ev) ngx_http_upstream_process_non_buffered_upstream(ngx_event_t *rev)
{
ngx_connection_t *c;
ngx_http_request_t *r;
ngx_http_upstream_t *u;
c = rev->data;
r = c->data;
u = r->upstream;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process non buffered upstream");
c->log->action = "reading upstream";
if (rev->timedout) {
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
ngx_http_upstream_process_non_buffered_request(r, 0);
}
static void
ngx_http_upstream_process_non_buffered_request(ngx_http_request_t *r,
ngx_uint_t do_write)
{ {
size_t size; size_t size;
ssize_t n; ssize_t n;
ngx_buf_t *b; ngx_buf_t *b;
ngx_int_t rc; ngx_int_t rc;
ngx_uint_t do_write; ngx_connection_t *downstream, *upstream;
ngx_connection_t *c, *downstream, *upstream;
ngx_http_request_t *r;
ngx_http_upstream_t *u; ngx_http_upstream_t *u;
ngx_http_core_loc_conf_t *clcf; ngx_http_core_loc_conf_t *clcf;
c = ev->data;
r = c->data;
u = r->upstream; u = r->upstream;
if (ev->write) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process non buffered downstream");
c->log->action = "sending to client";
} else {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process non buffered upstream");
c->log->action = "reading upstream";
}
if (ev->timedout) {
if (ev->write) {
c->timedout = 1;
ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out");
} else {
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
}
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
downstream = r->connection; downstream = r->connection;
upstream = u->peer.connection; upstream = u->peer.connection;
b = &u->buffer; b = &u->buffer;
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); do_write = do_write || u->length == 0;
do_write = ev->write || u->length == 0;
for ( ;; ) { for ( ;; ) {
@ -1960,6 +1980,8 @@ ngx_http_upstream_process_non_buffered_body(ngx_event_t *ev)
break; break;
} }
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
if (downstream->data == r) { if (downstream->data == r) {
if (ngx_handle_write_event(downstream->write, clcf->send_lowat) if (ngx_handle_write_event(downstream->write, clcf->send_lowat)
!= NGX_OK) != NGX_OK)
@ -2042,58 +2064,41 @@ ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes)
static void static void
ngx_http_upstream_process_downstream(ngx_http_request_t *r) ngx_http_upstream_process_downstream(ngx_http_request_t *r)
{ {
ngx_http_upstream_process_body(r->connection->write); ngx_event_t *wev;
} ngx_connection_t *c;
static void
ngx_http_upstream_process_body(ngx_event_t *ev)
{
ngx_temp_file_t *tf;
ngx_event_pipe_t *p; ngx_event_pipe_t *p;
ngx_connection_t *c, *downstream;
ngx_http_request_t *r;
ngx_http_upstream_t *u; ngx_http_upstream_t *u;
c = ev->data; c = r->connection;
r = c->data;
u = r->upstream; u = r->upstream;
downstream = r->connection; p = u->pipe;
wev = c->write;
if (ev->write) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process downstream"); "http upstream process downstream");
c->log->action = "sending to client"; c->log->action = "sending to client";
} else { if (wev->timedout) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process upstream");
c->log->action = "reading upstream";
}
p = u->pipe; if (wev->delayed) {
if (ev->timedout) { wev->timedout = 0;
if (ev->write) { wev->delayed = 0;
if (ev->delayed) {
ev->timedout = 0; if (!wev->ready) {
ev->delayed = 0; ngx_add_timer(wev, p->send_timeout);
if (!ev->ready) { if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
ngx_add_timer(ev, p->send_timeout);
if (ngx_handle_write_event(ev, p->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, 0); ngx_http_upstream_finalize_request(r, u, 0);
return;
} }
return; return;
} }
if (ngx_event_pipe(p, ev->write) == NGX_ABORT) { if (ngx_event_pipe(p, wev->write) == NGX_ABORT) {
if (downstream->destroyed) { if (c->destroyed) {
return; return;
} }
@ -2108,26 +2113,22 @@ ngx_http_upstream_process_body(ngx_event_t *ev)
} }
} else { } else {
p->upstream_error = 1;
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
}
} else { if (wev->delayed) {
if (ev->write && ev->delayed) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http downstream delayed"); "http downstream delayed");
if (ngx_handle_write_event(ev, p->send_lowat) != NGX_OK) { if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, 0); ngx_http_upstream_finalize_request(r, u, 0);
return;
} }
return; return;
} }
if (ngx_event_pipe(p, ev->write) == NGX_ABORT) { if (ngx_event_pipe(p, 1) == NGX_ABORT) {
if (downstream->destroyed) { if (c->destroyed) {
return; return;
} }
@ -2136,6 +2137,60 @@ ngx_http_upstream_process_body(ngx_event_t *ev)
} }
} }
ngx_http_upstream_process_request(r);
}
static void
ngx_http_upstream_process_upstream(ngx_event_t *rev)
{
ngx_connection_t *c;
ngx_event_pipe_t *p;
ngx_http_request_t *r;
ngx_http_upstream_t *u;
c = rev->data;
r = c->data;
u = r->upstream;
p = u->pipe;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process upstream");
c->log->action = "reading upstream";
if (rev->timedout) {
p->upstream_error = 1;
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
} else {
c = r->connection;
if (ngx_event_pipe(p, 0) == NGX_ABORT) {
if (c->destroyed) {
return;
}
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
}
ngx_http_upstream_process_request(r);
}
static void
ngx_http_upstream_process_request(ngx_http_request_t *r)
{
ngx_temp_file_t *tf;
ngx_event_pipe_t *p;
ngx_http_upstream_t *u;
u = r->upstream;
p = u->pipe;
if (u->peer.connection) { if (u->peer.connection) {
if (u->store) { if (u->store) {
@ -2186,7 +2241,7 @@ ngx_http_upstream_process_body(ngx_event_t *ev)
#endif #endif
if (p->upstream_done || p->upstream_eof || p->upstream_error) { if (p->upstream_done || p->upstream_eof || p->upstream_error) {
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream exit: %p", p->out); "http upstream exit: %p", p->out);
#if 0 #if 0
ngx_http_busy_unlock(u->conf->busy_lock, &u->busy_lock); ngx_http_busy_unlock(u->conf->busy_lock, &u->busy_lock);
@ -2197,7 +2252,7 @@ ngx_http_upstream_process_body(ngx_event_t *ev)
} }
if (p->downstream_error) { if (p->downstream_error) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream downstream error"); "http upstream downstream error");
if (!u->cacheable && u->peer.connection) { if (!u->cacheable && u->peer.connection) {