From 2fe0a7a0f6feb13cf777c9e5e635f887e4cd040c Mon Sep 17 00:00:00 2001 From: Maxim Dounin Date: Thu, 15 Sep 2011 19:23:20 +0000 Subject: [PATCH] Proxy: basic HTTP/1.1 support (including keepalive). By default we still send requests using HTTP/1.0. This may be changed with new proxy_http_version directive. --- src/http/modules/ngx_http_proxy_module.c | 778 ++++++++++++++++++++++- 1 file changed, 773 insertions(+), 5 deletions(-) diff --git a/src/http/modules/ngx_http_proxy_module.c b/src/http/modules/ngx_http_proxy_module.c index 0dfa16d17..88ee42472 100644 --- a/src/http/modules/ngx_http_proxy_module.c +++ b/src/http/modules/ngx_http_proxy_module.c @@ -71,6 +71,8 @@ typedef struct { ngx_flag_t redirect; + ngx_uint_t http_version; + ngx_uint_t headers_hash_max_size; ngx_uint_t headers_hash_bucket_size; } ngx_http_proxy_loc_conf_t; @@ -80,6 +82,12 @@ typedef struct { ngx_http_status_t status; ngx_http_proxy_vars_t vars; size_t internal_body_length; + + ngx_uint_t state; + off_t size; + off_t length; + + ngx_uint_t head; /* unsigned head:1 */ } ngx_http_proxy_ctx_t; @@ -92,6 +100,15 @@ static ngx_int_t ngx_http_proxy_create_request(ngx_http_request_t *r); static ngx_int_t ngx_http_proxy_reinit_request(ngx_http_request_t *r); static ngx_int_t ngx_http_proxy_process_status_line(ngx_http_request_t *r); static ngx_int_t ngx_http_proxy_process_header(ngx_http_request_t *r); +static ngx_int_t ngx_http_proxy_input_filter_init(void *data); +static ngx_int_t ngx_http_proxy_copy_filter(ngx_event_pipe_t *p, + ngx_buf_t *buf); +static ngx_int_t ngx_http_proxy_chunked_filter(ngx_event_pipe_t *p, + ngx_buf_t *buf); +static ngx_int_t ngx_http_proxy_non_buffered_copy_filter(void *data, + ssize_t bytes); +static ngx_int_t ngx_http_proxy_non_buffered_chunked_filter(void *data, + ssize_t bytes); static void ngx_http_proxy_abort_request(ngx_http_request_t *r); static void ngx_http_proxy_finalize_request(ngx_http_request_t *r, ngx_int_t rc); @@ -157,6 +174,13 @@ static ngx_conf_bitmask_t ngx_http_proxy_next_upstream_masks[] = { }; +static ngx_conf_enum_t ngx_http_proxy_http_version[] = { + { ngx_string("1.0"), NGX_HTTP_VERSION_10 }, + { ngx_string("1.1"), NGX_HTTP_VERSION_11 }, + { ngx_null_string, 0 } +}; + + ngx_module_t ngx_http_proxy_module; @@ -432,6 +456,13 @@ static ngx_command_t ngx_http_proxy_commands[] = { offsetof(ngx_http_proxy_loc_conf_t, upstream.ignore_headers), &ngx_http_upstream_ignore_headers_masks }, + { ngx_string("proxy_http_version"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_enum_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_proxy_loc_conf_t, http_version), + &ngx_http_proxy_http_version }, + #if (NGX_HTTP_SSL) { ngx_string("proxy_ssl_session_reuse"), @@ -479,6 +510,7 @@ ngx_module_t ngx_http_proxy_module = { static char ngx_http_proxy_version[] = " HTTP/1.0" CRLF; +static char ngx_http_proxy_version_11[] = " HTTP/1.1" CRLF; static ngx_keyval_t ngx_http_proxy_headers[] = { @@ -486,6 +518,7 @@ static ngx_keyval_t ngx_http_proxy_headers[] = { { ngx_string("Connection"), ngx_string("close") }, { ngx_string("Keep-Alive"), ngx_string("") }, { ngx_string("Expect"), ngx_string("") }, + { ngx_string("Upgrade"), ngx_string("") }, { ngx_null_string, ngx_null_string } }; @@ -610,7 +643,12 @@ ngx_http_proxy_handler(ngx_http_request_t *r) return NGX_HTTP_INTERNAL_SERVER_ERROR; } - u->pipe->input_filter = ngx_event_pipe_copy_input_filter; + u->pipe->input_filter = ngx_http_proxy_copy_filter; + u->pipe->input_ctx = r; + + u->input_filter_init = ngx_http_proxy_input_filter_init; + u->input_filter = ngx_http_proxy_non_buffered_copy_filter; + u->input_filter_ctx = r; u->accel = 1; @@ -866,14 +904,20 @@ ngx_http_proxy_create_request(ngx_http_request_t *r) method.len++; } + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + + if (method.len == 5 + && ngx_strncasecmp(method.data, (u_char *) "HEAD ", 5) == 0) + { + ctx->head = 1; + } + len = method.len + sizeof(ngx_http_proxy_version) - 1 + sizeof(CRLF) - 1; escape = 0; loc_len = 0; unparsed_uri = 0; - ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); - if (plcf->proxy_lengths) { uri_len = ctx->vars.uri.len; @@ -1009,8 +1053,14 @@ ngx_http_proxy_create_request(ngx_http_request_t *r) u->uri.len = b->last - u->uri.data; - b->last = ngx_cpymem(b->last, ngx_http_proxy_version, - sizeof(ngx_http_proxy_version) - 1); + if (plcf->http_version == NGX_HTTP_VERSION_11) { + b->last = ngx_cpymem(b->last, ngx_http_proxy_version_11, + sizeof(ngx_http_proxy_version_11) - 1); + + } else { + b->last = ngx_cpymem(b->last, ngx_http_proxy_version, + sizeof(ngx_http_proxy_version) - 1); + } ngx_memzero(&e, sizeof(ngx_http_script_engine_t)); @@ -1158,8 +1208,11 @@ ngx_http_proxy_reinit_request(ngx_http_request_t *r) ctx->status.count = 0; ctx->status.start = NULL; ctx->status.end = NULL; + ctx->state = 0; r->upstream->process_header = ngx_http_proxy_process_status_line; + r->upstream->pipe->input_filter = ngx_http_proxy_copy_filter; + r->upstream->input_filter = ngx_http_proxy_non_buffered_copy_filter; r->state = 0; return NGX_OK; @@ -1250,6 +1303,8 @@ ngx_http_proxy_process_header(ngx_http_request_t *r) { ngx_int_t rc; ngx_table_elt_t *h; + ngx_http_upstream_t *u; + ngx_http_proxy_ctx_t *ctx; ngx_http_upstream_header_t *hh; ngx_http_upstream_main_conf_t *umcf; @@ -1345,6 +1400,30 @@ ngx_http_proxy_process_header(ngx_http_request_t *r) h->lowcase_key = (u_char *) "date"; } + /* clear content length if response is chunked */ + + u = r->upstream; + + if (u->headers_in.chunked) { + u->headers_in.content_length_n = -1; + } + + /* + * set u->keepalive if response has no body; this allows to keep + * connections alive in case of r->header_only or X-Accel-Redirect + */ + + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + + if (u->headers_in.status_n == NGX_HTTP_NO_CONTENT + || u->headers_in.status_n == NGX_HTTP_NOT_MODIFIED + || ctx->head + || (!u->headers_in.chunked + && u->headers_in.content_length_n == 0)) + { + u->keepalive = !u->headers_in.connection_close; + } + return NGX_OK; } @@ -1362,6 +1441,690 @@ ngx_http_proxy_process_header(ngx_http_request_t *r) } +static ngx_int_t +ngx_http_proxy_input_filter_init(void *data) +{ + ngx_http_request_t *r = data; + ngx_http_upstream_t *u; + ngx_http_proxy_ctx_t *ctx; + + u = r->upstream; + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + + ngx_log_debug4(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http proxy filter init s:%d h:%d c:%d l:%O", + u->headers_in.status_n, ctx->head, u->headers_in.chunked, + u->headers_in.content_length_n); + + /* as per RFC2616, 4.4 Message Length */ + + if (u->headers_in.status_n == NGX_HTTP_NO_CONTENT + || u->headers_in.status_n == NGX_HTTP_NOT_MODIFIED + || ctx->head) + { + /* 1xx, 204, and 304 and replies to HEAD requests */ + /* no 1xx since we don't send Expect and Upgrade */ + + u->pipe->length = 0; + u->length = 0; + u->keepalive = !u->headers_in.connection_close; + + } else if (u->headers_in.chunked) { + /* chunked */ + + u->pipe->input_filter = ngx_http_proxy_chunked_filter; + u->pipe->length = 3; /* "0" LF LF */ + + u->input_filter = ngx_http_proxy_non_buffered_chunked_filter; + u->length = -1; + + } else if (u->headers_in.content_length_n == 0) { + /* empty body: special case as filter won't be called */ + + u->pipe->length = 0; + u->length = 0; + u->keepalive = !u->headers_in.connection_close; + + } else { + /* content length or connection close */ + + u->pipe->length = u->headers_in.content_length_n; + u->length = u->headers_in.content_length_n; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_proxy_copy_filter(ngx_event_pipe_t *p, ngx_buf_t *buf) +{ + ngx_buf_t *b; + ngx_chain_t *cl; + ngx_http_request_t *r; + + if (buf->pos == buf->last) { + return NGX_OK; + } + + if (p->free) { + cl = p->free; + b = cl->buf; + p->free = cl->next; + ngx_free_chain(p->pool, cl); + + } else { + b = ngx_alloc_buf(p->pool); + if (b == NULL) { + return NGX_ERROR; + } + } + + ngx_memcpy(b, buf, sizeof(ngx_buf_t)); + b->shadow = buf; + b->tag = p->tag; + b->last_shadow = 1; + b->recycled = 1; + buf->shadow = b; + + cl = ngx_alloc_chain_link(p->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = b; + cl->next = NULL; + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num); + + if (p->in) { + *p->last_in = cl; + } else { + p->in = cl; + } + p->last_in = &cl->next; + + if (p->length == -1) { + return NGX_OK; + } + + p->length -= b->last - b->pos; + + if (p->length == 0) { + r = p->input_ctx; + p->upstream_done = 1; + r->upstream->keepalive = !r->upstream->headers_in.connection_close; + + } else if (p->length < 0) { + r = p->input_ctx; + p->upstream_done = 1; + + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "upstream sent too many data"); + } + + return NGX_OK; +} + + +static ngx_inline ngx_int_t +ngx_http_proxy_parse_chunked(ngx_http_request_t *r, ngx_buf_t *buf) +{ + u_char *pos, ch, c; + ngx_int_t rc; + ngx_http_proxy_ctx_t *ctx; + enum { + sw_chunk_start = 0, + sw_chunk_size, + sw_chunk_extension, + sw_chunk_extension_almost_done, + sw_chunk_data, + sw_after_data, + sw_after_data_almost_done, + sw_last_chunk_extension, + sw_last_chunk_extension_almost_done, + sw_trailer, + sw_trailer_almost_done, + sw_trailer_header, + sw_trailer_header_almost_done + } state; + + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + state = ctx->state; + + if (state == sw_chunk_data && ctx->size == 0) { + state = sw_after_data; + } + + rc = NGX_AGAIN; + + for (pos = buf->pos; pos < buf->last; pos++) { + + ch = *pos; + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http proxy chunked byte: %02Xd s:%d", ch, state); + + switch (state) { + + case sw_chunk_start: + if (ch >= '0' && ch <= '9') { + state = sw_chunk_size; + ctx->size = ch - '0'; + break; + } + + c = (u_char) (ch | 0x20); + + if (c >= 'a' && c <= 'f') { + state = sw_chunk_size; + ctx->size = c - 'a' + 10; + break; + } + + goto invalid; + + case sw_chunk_size: + if (ch >= '0' && ch <= '9') { + ctx->size = ctx->size * 16 + (ch - '0'); + break; + } + + c = (u_char) (ch | 0x20); + + if (c >= 'a' && c <= 'f') { + ctx->size = ctx->size * 16 + (c - 'a' + 10); + break; + } + + if (ctx->size == 0) { + + switch (ch) { + case CR: + state = sw_last_chunk_extension_almost_done; + break; + case LF: + state = sw_trailer; + break; + case ';': + state = sw_last_chunk_extension; + break; + default: + goto invalid; + } + + break; + } + + switch (ch) { + case CR: + state = sw_chunk_extension_almost_done; + break; + case LF: + state = sw_chunk_data; + break; + case ';': + state = sw_chunk_extension; + break; + default: + goto invalid; + } + + break; + + case sw_chunk_extension: + switch (ch) { + case CR: + state = sw_chunk_extension_almost_done; + break; + case LF: + state = sw_chunk_data; + } + break; + + case sw_chunk_extension_almost_done: + if (ch == LF) { + state = sw_chunk_data; + break; + } + goto invalid; + + case sw_chunk_data: + rc = NGX_OK; + goto data; + + case sw_after_data: + switch (ch) { + case CR: + state = sw_after_data_almost_done; + break; + case LF: + state = sw_chunk_start; + } + break; + + case sw_after_data_almost_done: + if (ch == LF) { + state = sw_chunk_start; + break; + } + goto invalid; + + case sw_last_chunk_extension: + switch (ch) { + case CR: + state = sw_last_chunk_extension_almost_done; + break; + case LF: + state = sw_trailer; + } + break; + + case sw_last_chunk_extension_almost_done: + if (ch == LF) { + state = sw_trailer; + break; + } + goto invalid; + + case sw_trailer: + switch (ch) { + case CR: + state = sw_trailer_almost_done; + break; + case LF: + goto done; + default: + state = sw_trailer_header; + } + break; + + case sw_trailer_almost_done: + if (ch == LF) { + goto done; + } + goto invalid; + + case sw_trailer_header: + switch (ch) { + case CR: + state = sw_trailer_header_almost_done; + break; + case LF: + state = sw_trailer; + } + break; + + case sw_trailer_header_almost_done: + if (ch == LF) { + state = sw_trailer; + break; + } + goto invalid; + + } + } + +data: + + ctx->state = state; + buf->pos = pos; + + switch (state) { + + case sw_chunk_start: + ctx->length = 3 /* "0" LF LF */; + break; + case sw_chunk_size: + ctx->length = 2 /* LF LF */ + + (ctx->size ? ctx->size + 4 /* LF "0" LF LF */ : 0); + break; + case sw_chunk_extension: + case sw_chunk_extension_almost_done: + ctx->length = 1 /* LF */ + ctx->size + 4 /* LF "0" LF LF */; + break; + case sw_chunk_data: + ctx->length = ctx->size + 4 /* LF "0" LF LF */; + break; + case sw_after_data: + case sw_after_data_almost_done: + ctx->length = 4 /* LF "0" LF LF */; + break; + case sw_last_chunk_extension: + case sw_last_chunk_extension_almost_done: + ctx->length = 2 /* LF LF */; + break; + case sw_trailer: + case sw_trailer_almost_done: + ctx->length = 1 /* LF */; + break; + case sw_trailer_header: + case sw_trailer_header_almost_done: + ctx->length = 2 /* LF LF */; + break; + + } + + return rc; + +done: + + return NGX_DONE; + +invalid: + + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, + "upstream sent invalid chunked response"); + + return NGX_ERROR; +} + + +static ngx_int_t +ngx_http_proxy_chunked_filter(ngx_event_pipe_t *p, ngx_buf_t *buf) +{ + ngx_int_t rc; + ngx_buf_t *b, **prev; + ngx_chain_t *cl; + ngx_http_request_t *r; + ngx_http_proxy_ctx_t *ctx; + + if (buf->pos == buf->last) { + return NGX_OK; + } + + r = p->input_ctx; + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + + b = NULL; + prev = &buf->shadow; + + for ( ;; ) { + + rc = ngx_http_proxy_parse_chunked(r, buf); + + if (rc == NGX_OK) { + + /* a chunk has been parsed successfully */ + + if (p->free) { + cl = p->free; + b = cl->buf; + p->free = cl->next; + ngx_free_chain(p->pool, cl); + + } else { + b = ngx_alloc_buf(p->pool); + if (b == NULL) { + return NGX_ERROR; + } + } + + ngx_memzero(b, sizeof(ngx_buf_t)); + + b->pos = buf->pos; + b->start = buf->start; + b->end = buf->end; + b->tag = p->tag; + b->temporary = 1; + b->recycled = 1; + + *prev = b; + prev = &b->shadow; + + cl = ngx_alloc_chain_link(p->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = b; + cl->next = NULL; + + if (p->in) { + *p->last_in = cl; + } else { + p->in = cl; + } + p->last_in = &cl->next; + + /* STUB */ b->num = buf->num; + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0, + "input buf #%d %p", b->num, b->pos); + + if (buf->last - buf->pos >= ctx->size) { + + buf->pos += ctx->size; + b->last = buf->pos; + ctx->size = 0; + + continue; + } + + ctx->size -= buf->last - buf->pos; + buf->pos = buf->last; + b->last = buf->last; + + continue; + } + + if (rc == NGX_DONE) { + + /* a whole response has been parsed successfully */ + + p->upstream_done = 1; + r->upstream->keepalive = !r->upstream->headers_in.connection_close; + + break; + } + + if (rc == NGX_AGAIN) { + + /* set p->length, minimal amount of data we want to see */ + + p->length = ctx->length; + + break; + } + + /* invalid response */ + + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, + "upstream sent invalid chunked response"); + + return NGX_ERROR; + } + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http proxy chunked state %d, length %d", + ctx->state, p->length); + + if (b) { + b->shadow = buf; + b->last_shadow = 1; + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0, + "input buf %p %z", b->pos, b->last - b->pos); + + return NGX_OK; + } + + /* there is no data record in the buf, add it to free chain */ + + if (ngx_event_pipe_add_free_buf(p, buf) != NGX_OK) { + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_proxy_non_buffered_copy_filter(void *data, ssize_t bytes) +{ + ngx_http_request_t *r = data; + + ngx_buf_t *b; + ngx_chain_t *cl, **ll; + ngx_http_upstream_t *u; + + u = r->upstream; + + for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) { + ll = &cl->next; + } + + cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs); + if (cl == NULL) { + return NGX_ERROR; + } + + *ll = cl; + + cl->buf->flush = 1; + cl->buf->memory = 1; + + b = &u->buffer; + + cl->buf->pos = b->last; + b->last += bytes; + cl->buf->last = b->last; + cl->buf->tag = u->output.tag; + + if (u->length == -1) { + return NGX_OK; + } + + u->length -= bytes; + + if (u->length == 0) { + u->keepalive = !u->headers_in.connection_close; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_proxy_non_buffered_chunked_filter(void *data, ssize_t bytes) +{ + ngx_http_request_t *r = data; + + ngx_int_t rc; + ngx_buf_t *b, *buf; + ngx_chain_t *cl, **ll; + ngx_http_upstream_t *u; + ngx_http_proxy_ctx_t *ctx; + + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + u = r->upstream; + buf = &u->buffer; + + buf->pos = buf->last; + buf->last += bytes; + + for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) { + ll = &cl->next; + } + + for ( ;; ) { + + rc = ngx_http_proxy_parse_chunked(r, buf); + + if (rc == NGX_OK) { + + /* a chunk has been parsed successfully */ + + cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs); + if (cl == NULL) { + return NGX_ERROR; + } + + *ll = cl; + ll = &cl->next; + + b = cl->buf; + + b->flush = 1; + b->memory = 1; + + b->pos = buf->pos; + b->tag = u->output.tag; + + if (buf->last - buf->pos >= ctx->size) { + buf->pos += ctx->size; + b->last = buf->pos; + ctx->size = 0; + + } else { + ctx->size -= buf->last - buf->pos; + buf->pos = buf->last; + b->last = buf->last; + } + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http proxy out buf %p %z", + b->pos, b->last - b->pos); + + continue; + } + + if (rc == NGX_DONE) { + + /* a whole response has been parsed successfully */ + + u->keepalive = !u->headers_in.connection_close; + u->length = 0; + + break; + } + + if (rc == NGX_AGAIN) { + break; + } + + /* invalid response */ + + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, + "upstream sent invalid chunked response"); + + return NGX_ERROR; + } + + /* provide continuous buffer for subrequests in memory */ + + if (r->subrequest_in_memory) { + + cl = u->out_bufs; + + if (cl) { + buf->pos = cl->buf->pos; + } + + buf->last = buf->pos; + + for (cl = u->out_bufs; cl; cl = cl->next) { + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http proxy in memory %p-%p %uz", + cl->buf->pos, cl->buf->last, ngx_buf_size(cl->buf)); + + if (buf->last == cl->buf->pos) { + buf->last = cl->buf->last; + continue; + } + + buf->last = ngx_movemem(buf->last, cl->buf->pos, + cl->buf->last - cl->buf->pos); + + cl->buf->pos = buf->last - (cl->buf->last - cl->buf->pos); + cl->buf->last = buf->last; + } + } + + return NGX_OK; +} + + static void ngx_http_proxy_abort_request(ngx_http_request_t *r) { @@ -1710,6 +2473,8 @@ ngx_http_proxy_create_loc_conf(ngx_conf_t *cf) conf->redirect = NGX_CONF_UNSET; conf->upstream.change_buffering = 1; + conf->http_version = NGX_CONF_UNSET_UINT; + conf->headers_hash_max_size = NGX_CONF_UNSET_UINT; conf->headers_hash_bucket_size = NGX_CONF_UNSET_UINT; @@ -2013,6 +2778,9 @@ ngx_http_proxy_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) } #endif + ngx_conf_merge_uint_value(conf->http_version, prev->http_version, + NGX_HTTP_VERSION_10); + ngx_conf_merge_uint_value(conf->headers_hash_max_size, prev->headers_hash_max_size, 512);