mirror of
https://github.com/nginx/nginx.git
synced 2025-06-06 00:42:40 +08:00
Stream: upstream and downstream limit rates.
This commit is contained in:
parent
62959c9133
commit
74942045d5
@ -18,7 +18,9 @@ typedef struct {
|
||||
ngx_msec_t timeout;
|
||||
ngx_msec_t next_upstream_timeout;
|
||||
size_t downstream_buf_size;
|
||||
size_t downstream_limit_rate;
|
||||
size_t upstream_buf_size;
|
||||
size_t upstream_limit_rate;
|
||||
ngx_uint_t next_upstream_tries;
|
||||
ngx_flag_t next_upstream;
|
||||
ngx_flag_t proxy_protocol;
|
||||
@ -132,6 +134,13 @@ static ngx_command_t ngx_stream_proxy_commands[] = {
|
||||
offsetof(ngx_stream_proxy_srv_conf_t, downstream_buf_size),
|
||||
NULL },
|
||||
|
||||
{ ngx_string("proxy_downstream_limit_rate"),
|
||||
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
|
||||
ngx_conf_set_size_slot,
|
||||
NGX_STREAM_SRV_CONF_OFFSET,
|
||||
offsetof(ngx_stream_proxy_srv_conf_t, downstream_limit_rate),
|
||||
NULL },
|
||||
|
||||
{ ngx_string("proxy_upstream_buffer"),
|
||||
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
|
||||
ngx_conf_set_size_slot,
|
||||
@ -139,6 +148,13 @@ static ngx_command_t ngx_stream_proxy_commands[] = {
|
||||
offsetof(ngx_stream_proxy_srv_conf_t, upstream_buf_size),
|
||||
NULL },
|
||||
|
||||
{ ngx_string("proxy_upstream_limit_rate"),
|
||||
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
|
||||
ngx_conf_set_size_slot,
|
||||
NGX_STREAM_SRV_CONF_OFFSET,
|
||||
offsetof(ngx_stream_proxy_srv_conf_t, upstream_limit_rate),
|
||||
NULL },
|
||||
|
||||
{ ngx_string("proxy_next_upstream"),
|
||||
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
|
||||
ngx_conf_set_flag_slot,
|
||||
@ -340,6 +356,7 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s)
|
||||
}
|
||||
|
||||
u->proxy_protocol = pscf->proxy_protocol;
|
||||
u->start_sec = ngx_time();
|
||||
|
||||
p = ngx_pnalloc(c->pool, pscf->downstream_buf_size);
|
||||
if (p == NULL) {
|
||||
@ -831,17 +848,56 @@ ngx_stream_proxy_upstream_handler(ngx_event_t *ev)
|
||||
static void
|
||||
ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream)
|
||||
{
|
||||
ngx_connection_t *c;
|
||||
ngx_stream_session_t *s;
|
||||
ngx_stream_upstream_t *u;
|
||||
ngx_connection_t *c, *pc;
|
||||
ngx_stream_session_t *s;
|
||||
ngx_stream_upstream_t *u;
|
||||
ngx_stream_proxy_srv_conf_t *pscf;
|
||||
|
||||
c = ev->data;
|
||||
s = c->data;
|
||||
u = s->upstream;
|
||||
|
||||
if (ev->timedout) {
|
||||
ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
|
||||
ngx_stream_proxy_finalize(s, NGX_DECLINED);
|
||||
|
||||
if (ev->delayed) {
|
||||
|
||||
ev->timedout = 0;
|
||||
ev->delayed = 0;
|
||||
|
||||
if (!ev->ready) {
|
||||
if (ngx_handle_read_event(ev, 0) != NGX_OK) {
|
||||
ngx_stream_proxy_finalize(s, NGX_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
if (u->upstream_buf.start) {
|
||||
pc = u->peer.connection;
|
||||
|
||||
if (!c->read->delayed && !pc->read->delayed) {
|
||||
pscf = ngx_stream_get_module_srv_conf(s,
|
||||
ngx_stream_proxy_module);
|
||||
ngx_add_timer(c->write, pscf->timeout);
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
} else {
|
||||
ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
|
||||
ngx_stream_proxy_finalize(s, NGX_DECLINED);
|
||||
return;
|
||||
}
|
||||
|
||||
} else if (ev->delayed) {
|
||||
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
|
||||
"stream connection delayed");
|
||||
|
||||
if (ngx_handle_read_event(ev, 0) != NGX_OK) {
|
||||
ngx_stream_proxy_finalize(s, NGX_ERROR);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@ -930,10 +986,12 @@ static ngx_int_t
|
||||
ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
|
||||
ngx_uint_t do_write)
|
||||
{
|
||||
size_t size;
|
||||
off_t *received, limit;
|
||||
size_t size, limit_rate;
|
||||
ssize_t n;
|
||||
ngx_buf_t *b;
|
||||
ngx_uint_t flags;
|
||||
ngx_msec_t delay;
|
||||
ngx_connection_t *c, *pc, *src, *dst;
|
||||
ngx_log_handler_pt handler;
|
||||
ngx_stream_upstream_t *u;
|
||||
@ -944,15 +1002,21 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
|
||||
c = s->connection;
|
||||
pc = u->upstream_buf.start ? u->peer.connection : NULL;
|
||||
|
||||
pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
|
||||
|
||||
if (from_upstream) {
|
||||
src = pc;
|
||||
dst = c;
|
||||
b = &u->upstream_buf;
|
||||
limit_rate = pscf->upstream_limit_rate;
|
||||
received = &u->received;
|
||||
|
||||
} else {
|
||||
src = c;
|
||||
dst = pc;
|
||||
b = &u->downstream_buf;
|
||||
limit_rate = pscf->downstream_limit_rate;
|
||||
received = &s->received;
|
||||
}
|
||||
|
||||
for ( ;; ) {
|
||||
@ -983,7 +1047,23 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
|
||||
|
||||
size = b->end - b->last;
|
||||
|
||||
if (size && src->read->ready) {
|
||||
if (size && src->read->ready && !src->read->delayed) {
|
||||
|
||||
if (limit_rate) {
|
||||
limit = (off_t) limit_rate * (ngx_time() - u->start_sec + 1)
|
||||
- *received;
|
||||
|
||||
if (limit <= 0) {
|
||||
src->read->delayed = 1;
|
||||
delay = (ngx_msec_t) (- limit * 1000 / limit_rate + 1);
|
||||
ngx_add_timer(src->read, delay);
|
||||
break;
|
||||
}
|
||||
|
||||
if (size > (size_t) limit) {
|
||||
size = limit;
|
||||
}
|
||||
}
|
||||
|
||||
n = src->recv(src, b->last, size);
|
||||
|
||||
@ -992,15 +1072,19 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
|
||||
}
|
||||
|
||||
if (n > 0) {
|
||||
if (from_upstream) {
|
||||
u->received += n;
|
||||
if (limit_rate) {
|
||||
delay = (ngx_msec_t) (n * 1000 / limit_rate);
|
||||
|
||||
} else {
|
||||
s->received += n;
|
||||
if (delay > 0) {
|
||||
src->read->delayed = 1;
|
||||
ngx_add_timer(src->read, delay);
|
||||
}
|
||||
}
|
||||
|
||||
do_write = 1;
|
||||
*received += n;
|
||||
b->last += n;
|
||||
do_write = 1;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -1012,8 +1096,6 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
|
||||
break;
|
||||
}
|
||||
|
||||
pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
|
||||
|
||||
if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) {
|
||||
handler = c->log->handler;
|
||||
c->log->handler = NULL;
|
||||
@ -1044,7 +1126,12 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_add_timer(c->read, pscf->timeout);
|
||||
if (!c->read->delayed && !pc->read->delayed) {
|
||||
ngx_add_timer(c->write, pscf->timeout);
|
||||
|
||||
} else if (c->write->timer_set) {
|
||||
ngx_del_timer(c->write);
|
||||
}
|
||||
}
|
||||
|
||||
return NGX_OK;
|
||||
@ -1207,7 +1294,9 @@ ngx_stream_proxy_create_srv_conf(ngx_conf_t *cf)
|
||||
conf->timeout = NGX_CONF_UNSET_MSEC;
|
||||
conf->next_upstream_timeout = NGX_CONF_UNSET_MSEC;
|
||||
conf->downstream_buf_size = NGX_CONF_UNSET_SIZE;
|
||||
conf->downstream_limit_rate = NGX_CONF_UNSET_SIZE;
|
||||
conf->upstream_buf_size = NGX_CONF_UNSET_SIZE;
|
||||
conf->upstream_limit_rate = NGX_CONF_UNSET_SIZE;
|
||||
conf->next_upstream_tries = NGX_CONF_UNSET_UINT;
|
||||
conf->next_upstream = NGX_CONF_UNSET;
|
||||
conf->proxy_protocol = NGX_CONF_UNSET;
|
||||
@ -1244,9 +1333,15 @@ ngx_stream_proxy_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
|
||||
ngx_conf_merge_size_value(conf->downstream_buf_size,
|
||||
prev->downstream_buf_size, 16384);
|
||||
|
||||
ngx_conf_merge_size_value(conf->downstream_limit_rate,
|
||||
prev->downstream_limit_rate, 0);
|
||||
|
||||
ngx_conf_merge_size_value(conf->upstream_buf_size,
|
||||
prev->upstream_buf_size, 16384);
|
||||
|
||||
ngx_conf_merge_size_value(conf->upstream_limit_rate,
|
||||
prev->upstream_limit_rate, 0);
|
||||
|
||||
ngx_conf_merge_uint_value(conf->next_upstream_tries,
|
||||
prev->next_upstream_tries, 0);
|
||||
|
||||
|
@ -83,6 +83,7 @@ typedef struct {
|
||||
ngx_buf_t downstream_buf;
|
||||
ngx_buf_t upstream_buf;
|
||||
off_t received;
|
||||
time_t start_sec;
|
||||
#if (NGX_STREAM_SSL)
|
||||
ngx_str_t ssl_name;
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user