From 41a451e286cb6de9e0a0ad97f91a1dcac17ef68f Mon Sep 17 00:00:00 2001 From: Vladimir Homutov Date: Mon, 12 Nov 2018 16:29:30 +0300 Subject: [PATCH] Stream: proxy_requests directive. The directive allows to drop binding between a client and existing UDP stream session after receiving a specified number of packets. First packet from the same client address and port will start a new session. Old session continues to exist and will terminate at moment defined by configuration: either after receiving the expected number of responses, or after timeout, as specified by the "proxy_responses" and/or "proxy_timeout" directives. By default, proxy_requests is zero (disabled). --- src/event/ngx_event.h | 1 + src/event/ngx_event_udp.c | 15 +++++++++++++-- src/stream/ngx_stream_proxy_module.c | 27 +++++++++++++++++++++++++-- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h index c3979fb46..bb77c4ae6 100644 --- a/src/event/ngx_event.h +++ b/src/event/ngx_event.h @@ -509,6 +509,7 @@ void ngx_event_recvmsg(ngx_event_t *ev); void ngx_udp_rbtree_insert_value(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel); #endif +void ngx_delete_udp_connection(void *data); ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle); ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle); u_char *ngx_accept_log_error(ngx_log_t *log, u_char *buf, size_t len); diff --git a/src/event/ngx_event_udp.c b/src/event/ngx_event_udp.c index 53b090475..65eb22fd2 100644 --- a/src/event/ngx_event_udp.c +++ b/src/event/ngx_event_udp.c @@ -23,7 +23,6 @@ static void ngx_close_accepted_udp_connection(ngx_connection_t *c); static ssize_t ngx_udp_shared_recv(ngx_connection_t *c, u_char *buf, size_t size); static ngx_int_t ngx_insert_udp_connection(ngx_connection_t *c); -static void ngx_delete_udp_connection(void *data); static ngx_connection_t *ngx_lookup_udp_connection(ngx_listening_t *ls, struct sockaddr *sockaddr, socklen_t socklen, struct sockaddr *local_sockaddr, socklen_t local_socklen); @@ -558,11 +557,15 @@ ngx_insert_udp_connection(ngx_connection_t *c) } -static void +void ngx_delete_udp_connection(void *data) { ngx_connection_t *c = data; + if (c->udp == NULL) { + return; + } + ngx_rbtree_delete(&c->listening->rbtree, &c->udp->node); c->udp = NULL; @@ -643,4 +646,12 @@ ngx_lookup_udp_connection(ngx_listening_t *ls, struct sockaddr *sockaddr, return NULL; } +#else + +void +ngx_delete_udp_connection(void *data) +{ + return; +} + #endif diff --git a/src/stream/ngx_stream_proxy_module.c b/src/stream/ngx_stream_proxy_module.c index 56829058a..1fcf21e8f 100644 --- a/src/stream/ngx_stream_proxy_module.c +++ b/src/stream/ngx_stream_proxy_module.c @@ -26,6 +26,7 @@ typedef struct { size_t buffer_size; size_t upload_rate; size_t download_rate; + ngx_uint_t requests; ngx_uint_t responses; ngx_uint_t next_upstream_tries; ngx_flag_t next_upstream; @@ -195,6 +196,13 @@ static ngx_command_t ngx_stream_proxy_commands[] = { offsetof(ngx_stream_proxy_srv_conf_t, download_rate), NULL }, + { ngx_string("proxy_requests"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, requests), + NULL }, + { ngx_string("proxy_responses"), NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, ngx_conf_set_num_slot, @@ -1341,11 +1349,14 @@ ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream) } else { if (s->connection->type == SOCK_DGRAM) { - if (pscf->responses == NGX_MAX_INT32_VALUE) { + + if (pscf->responses == NGX_MAX_INT32_VALUE + || (u->responses >= pscf->responses * u->requests)) + { /* * successfully terminate timed out UDP session - * with unspecified number of responses + * if expected number of responses was received */ handler = c->log->handler; @@ -1692,6 +1703,14 @@ ngx_stream_proxy_test_finalize(ngx_stream_session_t *s, if (c->type == SOCK_DGRAM) { + if (pscf->requests && u->requests < pscf->requests) { + return NGX_DECLINED; + } + + if (pscf->requests) { + ngx_delete_udp_connection(c); + } + if (pscf->responses == NGX_MAX_INT32_VALUE || u->responses < pscf->responses * u->requests) { @@ -1943,6 +1962,7 @@ ngx_stream_proxy_create_srv_conf(ngx_conf_t *cf) conf->buffer_size = NGX_CONF_UNSET_SIZE; conf->upload_rate = NGX_CONF_UNSET_SIZE; conf->download_rate = NGX_CONF_UNSET_SIZE; + conf->requests = NGX_CONF_UNSET_UINT; conf->responses = NGX_CONF_UNSET_UINT; conf->next_upstream_tries = NGX_CONF_UNSET_UINT; conf->next_upstream = NGX_CONF_UNSET; @@ -1987,6 +2007,9 @@ ngx_stream_proxy_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_size_value(conf->download_rate, prev->download_rate, 0); + ngx_conf_merge_uint_value(conf->requests, + prev->requests, 0); + ngx_conf_merge_uint_value(conf->responses, prev->responses, NGX_MAX_INT32_VALUE);