Stream: port from NGINX+.

This commit is contained in:
Ruslan Ermilov 2015-04-20 13:05:11 +03:00
parent a2dac51398
commit c799c82faa
20 changed files with 6079 additions and 2 deletions

View File

@ -10,6 +10,7 @@ mkdir -p $NGX_OBJS/src/core $NGX_OBJS/src/event $NGX_OBJS/src/event/modules \
$NGX_OBJS/src/http $NGX_OBJS/src/http/modules \
$NGX_OBJS/src/http/modules/perl \
$NGX_OBJS/src/mail \
$NGX_OBJS/src/stream \
$NGX_OBJS/src/misc
@ -121,6 +122,32 @@ END
fi
# the stream dependences and include paths
if [ $STREAM = YES ]; then
ngx_all_srcs="$ngx_all_srcs $STREAM_SRCS"
ngx_deps=`echo $STREAM_DEPS \
| sed -e "s/ *\([^ ][^ ]*\)/$ngx_regex_cont\1/g" \
-e "s/\//$ngx_regex_dirsep/g"`
ngx_incs=`echo $STREAM_INCS \
| sed -e "s/ *\([^ ][^ ]*\)/$ngx_regex_cont$ngx_include_opt\1/g" \
-e "s/\//$ngx_regex_dirsep/g"`
cat << END >> $NGX_MAKEFILE
STREAM_DEPS = $ngx_deps
STREAM_INCS = $ngx_include_opt$ngx_incs
END
fi
ngx_all_srcs="$ngx_all_srcs $NGX_MISC_SRCS"
@ -306,6 +333,36 @@ END
fi
# the stream sources
if [ $STREAM = YES ]; then
if test -n "$NGX_PCH"; then
ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS)"
else
ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) \$(CORE_INCS) \$(STREAM_INCS)"
fi
for ngx_src in $STREAM_SRCS
do
ngx_src=`echo $ngx_src | sed -e "s/\//$ngx_regex_dirsep/g"`
ngx_obj=`echo $ngx_src \
| sed -e "s#^\(.*\.\)cpp\\$#$ngx_objs_dir\1$ngx_objext#g" \
-e "s#^\(.*\.\)cc\\$#$ngx_objs_dir\1$ngx_objext#g" \
-e "s#^\(.*\.\)c\\$#$ngx_objs_dir\1$ngx_objext#g" \
-e "s#^\(.*\.\)S\\$#$ngx_objs_dir\1$ngx_objext#g"`
cat << END >> $NGX_MAKEFILE
$ngx_obj: \$(CORE_DEPS) \$(STREAM_DEPS)$ngx_cont$ngx_src
$ngx_cc$ngx_tab$ngx_objout$ngx_obj$ngx_tab$ngx_src$NGX_AUX
END
done
fi
# the misc sources
if test -n "$NGX_MISC_SRCS"; then

View File

@ -435,6 +435,12 @@ if [ $MAIL_SSL = YES ]; then
fi
if [ $STREAM_SSL = YES ]; then
have=NGX_STREAM_SSL . auto/have
USE_OPENSSL=YES
fi
modules="$CORE_MODULES $EVENT_MODULES"
@ -505,6 +511,36 @@ if [ $MAIL = YES ]; then
fi
if [ $STREAM = YES ]; then
have=NGX_STREAM . auto/have
modules="$modules $STREAM_MODULES"
if [ $STREAM_SSL = YES ]; then
modules="$modules $STREAM_SSL_MODULE"
STREAM_DEPS="$STREAM_DEPS $STREAM_SSL_DEPS"
STREAM_SRCS="$STREAM_SRCS $STREAM_SSL_SRCS"
fi
if [ $STREAM_UPSTREAM_HASH = YES ]; then
modules="$modules $STREAM_UPSTREAM_HASH_MODULE"
STREAM_SRCS="$STREAM_SRCS $STREAM_UPSTREAM_HASH_SRCS"
fi
if [ $STREAM_UPSTREAM_LEAST_CONN = YES ]; then
modules="$modules $STREAM_UPSTREAM_LEAST_CONN_MODULE"
STREAM_SRCS="$STREAM_SRCS $STREAM_UPSTREAM_LEAST_CONN_SRCS"
fi
if [ $STREAM_UPSTREAM_ZONE = YES ]; then
have=NGX_STREAM_UPSTREAM_ZONE . auto/have
modules="$modules $STREAM_UPSTREAM_ZONE_MODULE"
STREAM_SRCS="$STREAM_SRCS $STREAM_UPSTREAM_ZONE_SRCS"
fi
NGX_ADDON_DEPS="$NGX_ADDON_DEPS \$(STREAM_DEPS)"
fi
if [ $NGX_GOOGLE_PERFTOOLS = YES ]; then
modules="$modules $NGX_GOOGLE_PERFTOOLS_MODULE"
NGX_MISC_SRCS="$NGX_MISC_SRCS $NGX_GOOGLE_PERFTOOLS_SRCS"

View File

@ -114,6 +114,12 @@ MAIL_POP3=YES
MAIL_IMAP=YES
MAIL_SMTP=YES
STREAM=NO
STREAM_SSL=NO
STREAM_UPSTREAM_HASH=YES
STREAM_UPSTREAM_LEAST_CONN=YES
STREAM_UPSTREAM_ZONE=YES
NGX_ADDONS=
USE_PCRE=NO
@ -275,6 +281,15 @@ use the \"--without-http_limit_conn_module\" option instead"
--without-mail_imap_module) MAIL_IMAP=NO ;;
--without-mail_smtp_module) MAIL_SMTP=NO ;;
--with-stream) STREAM=YES ;;
--with-stream_ssl_module) STREAM_SSL=YES ;;
--without-stream_upstream_hash_module)
STREAM_UPSTREAM_HASH=NO ;;
--without-stream_upstream_least_conn_module)
STREAM_UPSTREAM_LEAST_CONN=NO ;;
--without-stream_upstream_zone_module)
STREAM_UPSTREAM_ZONE=NO ;;
--with-google_perftools_module) NGX_GOOGLE_PERFTOOLS=YES ;;
--with-cpp_test_module) NGX_CPP_TEST=YES ;;
@ -436,6 +451,15 @@ cat << END
--without-mail_imap_module disable ngx_mail_imap_module
--without-mail_smtp_module disable ngx_mail_smtp_module
--with-stream enable TCP proxy module
--with-stream_ssl_module enable ngx_stream_ssl_module
--without-stream_upstream_hash_module
disable ngx_stream_upstream_hash_module
--without-stream_upstream_least_conn_module
disable ngx_stream_upstream_least_conn_module
--without-stream_upstream_zone_module
disable ngx_stream_upstream_zone_module
--with-google_perftools_module enable ngx_google_perftools_module
--with-cpp_test_module enable ngx_cpp_test_module

View File

@ -554,6 +554,40 @@ MAIL_AUTH_HTTP_SRCS="src/mail/ngx_mail_auth_http_module.c"
MAIL_PROXY_MODULE="ngx_mail_proxy_module"
MAIL_PROXY_SRCS="src/mail/ngx_mail_proxy_module.c"
STREAM_INCS="src/stream"
STREAM_DEPS="src/stream/ngx_stream.h \
src/stream/ngx_stream_upstream.h \
src/stream/ngx_stream_upstream_round_robin.h"
STREAM_MODULES="ngx_stream_module \
ngx_stream_core_module \
ngx_stream_proxy_module \
ngx_stream_upstream_module"
STREAM_SRCS="src/stream/ngx_stream.c \
src/stream/ngx_stream_handler.c \
src/stream/ngx_stream_core_module.c \
src/stream/ngx_stream_proxy_module.c \
src/stream/ngx_stream_upstream.c \
src/stream/ngx_stream_upstream_round_robin.c"
STREAM_SSL_MODULE="ngx_stream_ssl_module"
STREAM_SSL_DEPS="src/stream/ngx_stream_ssl_module.h"
STREAM_SSL_SRCS="src/stream/ngx_stream_ssl_module.c"
STREAM_UPSTREAM_HASH_MODULE=ngx_stream_upstream_hash_module
STREAM_UPSTREAM_HASH_SRCS=src/stream/ngx_stream_upstream_hash_module.c
STREAM_UPSTREAM_LEAST_CONN_MODULE=ngx_stream_upstream_least_conn_module
STREAM_UPSTREAM_LEAST_CONN_SRCS=" \
src/stream/ngx_stream_upstream_least_conn_module.c"
STREAM_UPSTREAM_ZONE_MODULE=ngx_stream_upstream_zone_module
STREAM_UPSTREAM_ZONE_SRCS=src/stream/ngx_stream_upstream_zone_module.c
NGX_GOOGLE_PERFTOOLS_MODULE=ngx_google_perftools_module
NGX_GOOGLE_PERFTOOLS_SRCS=src/misc/ngx_google_perftools_module.c

View File

@ -86,7 +86,7 @@ static ngx_str_t err_levels[] = {
static const char *debug_levels[] = {
"debug_core", "debug_alloc", "debug_mutex", "debug_event",
"debug_http", "debug_mail", "debug_mysql"
"debug_http", "debug_mail", "debug_mysql", "debug_stream"
};

View File

@ -30,6 +30,7 @@
#define NGX_LOG_DEBUG_HTTP 0x100
#define NGX_LOG_DEBUG_MAIL 0x200
#define NGX_LOG_DEBUG_MYSQL 0x400
#define NGX_LOG_DEBUG_STREAM 0x800
/*
* do not forget to update debug_levels[] in src/core/ngx_log.c
@ -37,7 +38,7 @@
*/
#define NGX_LOG_DEBUG_FIRST NGX_LOG_DEBUG_CORE
#define NGX_LOG_DEBUG_LAST NGX_LOG_DEBUG_MYSQL
#define NGX_LOG_DEBUG_LAST NGX_LOG_DEBUG_STREAM
#define NGX_LOG_DEBUG_CONNECTION 0x80000000
#define NGX_LOG_DEBUG_ALL 0x7ffffff0

557
src/stream/ngx_stream.c Normal file
View File

@ -0,0 +1,557 @@
/*
* Copyright (C) Roman Arutyunyan
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>
#include <ngx_stream.h>
static char *ngx_stream_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static ngx_int_t ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
ngx_stream_listen_t *listen);
static char *ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports);
static ngx_int_t ngx_stream_add_addrs(ngx_conf_t *cf, ngx_stream_port_t *stport,
ngx_stream_conf_addr_t *addr);
#if (NGX_HAVE_INET6)
static ngx_int_t ngx_stream_add_addrs6(ngx_conf_t *cf,
ngx_stream_port_t *stport, ngx_stream_conf_addr_t *addr);
#endif
static ngx_int_t ngx_stream_cmp_conf_addrs(const void *one, const void *two);
ngx_uint_t ngx_stream_max_module;
static ngx_command_t ngx_stream_commands[] = {
{ ngx_string("stream"),
NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
ngx_stream_block,
0,
0,
NULL },
ngx_null_command
};
static ngx_core_module_t ngx_stream_module_ctx = {
ngx_string("stream"),
NULL,
NULL
};
ngx_module_t ngx_stream_module = {
NGX_MODULE_V1,
&ngx_stream_module_ctx, /* module context */
ngx_stream_commands, /* module directives */
NGX_CORE_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static char *
ngx_stream_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
char *rv;
ngx_uint_t i, m, mi, s;
ngx_conf_t pcf;
ngx_array_t ports;
ngx_stream_listen_t *listen;
ngx_stream_module_t *module;
ngx_stream_conf_ctx_t *ctx;
ngx_stream_core_srv_conf_t **cscfp;
ngx_stream_core_main_conf_t *cmcf;
/* the main stream context */
ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t));
if (ctx == NULL) {
return NGX_CONF_ERROR;
}
*(ngx_stream_conf_ctx_t **) conf = ctx;
/* count the number of the stream modules and set up their indices */
ngx_stream_max_module = 0;
for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_STREAM_MODULE) {
continue;
}
ngx_modules[m]->ctx_index = ngx_stream_max_module++;
}
/* the stream main_conf context, it's the same in the all stream contexts */
ctx->main_conf = ngx_pcalloc(cf->pool,
sizeof(void *) * ngx_stream_max_module);
if (ctx->main_conf == NULL) {
return NGX_CONF_ERROR;
}
/*
* the stream null srv_conf context, it is used to merge
* the server{}s' srv_conf's
*/
ctx->srv_conf = ngx_pcalloc(cf->pool,
sizeof(void *) * ngx_stream_max_module);
if (ctx->srv_conf == NULL) {
return NGX_CONF_ERROR;
}
/*
* create the main_conf's and the null srv_conf's of the all stream modules
*/
for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_STREAM_MODULE) {
continue;
}
module = ngx_modules[m]->ctx;
mi = ngx_modules[m]->ctx_index;
if (module->create_main_conf) {
ctx->main_conf[mi] = module->create_main_conf(cf);
if (ctx->main_conf[mi] == NULL) {
return NGX_CONF_ERROR;
}
}
if (module->create_srv_conf) {
ctx->srv_conf[mi] = module->create_srv_conf(cf);
if (ctx->srv_conf[mi] == NULL) {
return NGX_CONF_ERROR;
}
}
}
/* parse inside the stream{} block */
pcf = *cf;
cf->ctx = ctx;
cf->module_type = NGX_STREAM_MODULE;
cf->cmd_type = NGX_STREAM_MAIN_CONF;
rv = ngx_conf_parse(cf, NULL);
if (rv != NGX_CONF_OK) {
*cf = pcf;
return rv;
}
/* init stream{} main_conf's, merge the server{}s' srv_conf's */
cmcf = ctx->main_conf[ngx_stream_core_module.ctx_index];
cscfp = cmcf->servers.elts;
for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_STREAM_MODULE) {
continue;
}
module = ngx_modules[m]->ctx;
mi = ngx_modules[m]->ctx_index;
/* init stream{} main_conf's */
cf->ctx = ctx;
if (module->init_main_conf) {
rv = module->init_main_conf(cf, ctx->main_conf[mi]);
if (rv != NGX_CONF_OK) {
*cf = pcf;
return rv;
}
}
for (s = 0; s < cmcf->servers.nelts; s++) {
/* merge the server{}s' srv_conf's */
cf->ctx = cscfp[s]->ctx;
if (module->merge_srv_conf) {
rv = module->merge_srv_conf(cf,
ctx->srv_conf[mi],
cscfp[s]->ctx->srv_conf[mi]);
if (rv != NGX_CONF_OK) {
*cf = pcf;
return rv;
}
}
}
}
*cf = pcf;
if (ngx_array_init(&ports, cf->temp_pool, 4, sizeof(ngx_stream_conf_port_t))
!= NGX_OK)
{
return NGX_CONF_ERROR;
}
listen = cmcf->listen.elts;
for (i = 0; i < cmcf->listen.nelts; i++) {
if (ngx_stream_add_ports(cf, &ports, &listen[i]) != NGX_OK) {
return NGX_CONF_ERROR;
}
}
return ngx_stream_optimize_servers(cf, &ports);
}
static ngx_int_t
ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
ngx_stream_listen_t *listen)
{
in_port_t p;
ngx_uint_t i;
struct sockaddr *sa;
struct sockaddr_in *sin;
ngx_stream_conf_port_t *port;
ngx_stream_conf_addr_t *addr;
#if (NGX_HAVE_INET6)
struct sockaddr_in6 *sin6;
#endif
sa = (struct sockaddr *) &listen->sockaddr;
switch (sa->sa_family) {
#if (NGX_HAVE_INET6)
case AF_INET6:
sin6 = (struct sockaddr_in6 *) sa;
p = sin6->sin6_port;
break;
#endif
#if (NGX_HAVE_UNIX_DOMAIN)
case AF_UNIX:
p = 0;
break;
#endif
default: /* AF_INET */
sin = (struct sockaddr_in *) sa;
p = sin->sin_port;
break;
}
port = ports->elts;
for (i = 0; i < ports->nelts; i++) {
if (p == port[i].port && sa->sa_family == port[i].family) {
/* a port is already in the port list */
port = &port[i];
goto found;
}
}
/* add a port to the port list */
port = ngx_array_push(ports);
if (port == NULL) {
return NGX_ERROR;
}
port->family = sa->sa_family;
port->port = p;
if (ngx_array_init(&port->addrs, cf->temp_pool, 2,
sizeof(ngx_stream_conf_addr_t))
!= NGX_OK)
{
return NGX_ERROR;
}
found:
addr = ngx_array_push(&port->addrs);
if (addr == NULL) {
return NGX_ERROR;
}
addr->sockaddr = (struct sockaddr *) &listen->sockaddr;
addr->socklen = listen->socklen;
addr->ctx = listen->ctx;
addr->bind = listen->bind;
addr->wildcard = listen->wildcard;
addr->so_keepalive = listen->so_keepalive;
#if (NGX_HAVE_KEEPALIVE_TUNABLE)
addr->tcp_keepidle = listen->tcp_keepidle;
addr->tcp_keepintvl = listen->tcp_keepintvl;
addr->tcp_keepcnt = listen->tcp_keepcnt;
#endif
#if (NGX_STREAM_SSL)
addr->ssl = listen->ssl;
#endif
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
addr->ipv6only = listen->ipv6only;
#endif
return NGX_OK;
}
static char *
ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports)
{
ngx_uint_t i, p, last, bind_wildcard;
ngx_listening_t *ls;
ngx_stream_port_t *stport;
ngx_stream_conf_port_t *port;
ngx_stream_conf_addr_t *addr;
ngx_stream_core_srv_conf_t *cscf;
port = ports->elts;
for (p = 0; p < ports->nelts; p++) {
ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts,
sizeof(ngx_stream_conf_addr_t), ngx_stream_cmp_conf_addrs);
addr = port[p].addrs.elts;
last = port[p].addrs.nelts;
/*
* if there is the binding to the "*:port" then we need to bind()
* to the "*:port" only and ignore the other bindings
*/
if (addr[last - 1].wildcard) {
addr[last - 1].bind = 1;
bind_wildcard = 1;
} else {
bind_wildcard = 0;
}
i = 0;
while (i < last) {
if (bind_wildcard && !addr[i].bind) {
i++;
continue;
}
ls = ngx_create_listening(cf, addr[i].sockaddr, addr[i].socklen);
if (ls == NULL) {
return NGX_CONF_ERROR;
}
ls->addr_ntop = 1;
ls->handler = ngx_stream_init_connection;
ls->pool_size = 256;
cscf = addr->ctx->srv_conf[ngx_stream_core_module.ctx_index];
ls->logp = cscf->error_log;
ls->log.data = &ls->addr_text;
ls->log.handler = ngx_accept_log_error;
ls->keepalive = addr[i].so_keepalive;
#if (NGX_HAVE_KEEPALIVE_TUNABLE)
ls->keepidle = addr[i].tcp_keepidle;
ls->keepintvl = addr[i].tcp_keepintvl;
ls->keepcnt = addr[i].tcp_keepcnt;
#endif
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
ls->ipv6only = addr[i].ipv6only;
#endif
stport = ngx_palloc(cf->pool, sizeof(ngx_stream_port_t));
if (stport == NULL) {
return NGX_CONF_ERROR;
}
ls->servers = stport;
if (i == last - 1) {
stport->naddrs = last;
} else {
stport->naddrs = 1;
i = 0;
}
switch (ls->sockaddr->sa_family) {
#if (NGX_HAVE_INET6)
case AF_INET6:
if (ngx_stream_add_addrs6(cf, stport, addr) != NGX_OK) {
return NGX_CONF_ERROR;
}
break;
#endif
default: /* AF_INET */
if (ngx_stream_add_addrs(cf, stport, addr) != NGX_OK) {
return NGX_CONF_ERROR;
}
break;
}
addr++;
last--;
}
}
return NGX_CONF_OK;
}
static ngx_int_t
ngx_stream_add_addrs(ngx_conf_t *cf, ngx_stream_port_t *stport,
ngx_stream_conf_addr_t *addr)
{
u_char *p;
size_t len;
ngx_uint_t i;
struct sockaddr_in *sin;
ngx_stream_in_addr_t *addrs;
u_char buf[NGX_SOCKADDR_STRLEN];
stport->addrs = ngx_pcalloc(cf->pool,
stport->naddrs * sizeof(ngx_stream_in_addr_t));
if (stport->addrs == NULL) {
return NGX_ERROR;
}
addrs = stport->addrs;
for (i = 0; i < stport->naddrs; i++) {
sin = (struct sockaddr_in *) addr[i].sockaddr;
addrs[i].addr = sin->sin_addr.s_addr;
addrs[i].conf.ctx = addr[i].ctx;
#if (NGX_STREAM_SSL)
addrs[i].conf.ssl = addr[i].ssl;
#endif
len = ngx_sock_ntop(addr[i].sockaddr, addr[i].socklen, buf,
NGX_SOCKADDR_STRLEN, 1);
p = ngx_pnalloc(cf->pool, len);
if (p == NULL) {
return NGX_ERROR;
}
ngx_memcpy(p, buf, len);
addrs[i].conf.addr_text.len = len;
addrs[i].conf.addr_text.data = p;
}
return NGX_OK;
}
#if (NGX_HAVE_INET6)
static ngx_int_t
ngx_stream_add_addrs6(ngx_conf_t *cf, ngx_stream_port_t *stport,
ngx_stream_conf_addr_t *addr)
{
u_char *p;
size_t len;
ngx_uint_t i;
struct sockaddr_in6 *sin6;
ngx_stream_in6_addr_t *addrs6;
u_char buf[NGX_SOCKADDR_STRLEN];
stport->addrs = ngx_pcalloc(cf->pool,
stport->naddrs * sizeof(ngx_stream_in6_addr_t));
if (stport->addrs == NULL) {
return NGX_ERROR;
}
addrs6 = stport->addrs;
for (i = 0; i < stport->naddrs; i++) {
sin6 = (struct sockaddr_in6 *) addr[i].sockaddr;
addrs6[i].addr6 = sin6->sin6_addr;
addrs6[i].conf.ctx = addr[i].ctx;
#if (NGX_STREAM_SSL)
addrs6[i].conf.ssl = addr[i].ssl;
#endif
len = ngx_sock_ntop(addr[i].sockaddr, addr[i].socklen, buf,
NGX_SOCKADDR_STRLEN, 1);
p = ngx_pnalloc(cf->pool, len);
if (p == NULL) {
return NGX_ERROR;
}
ngx_memcpy(p, buf, len);
addrs6[i].conf.addr_text.len = len;
addrs6[i].conf.addr_text.data = p;
}
return NGX_OK;
}
#endif
static ngx_int_t
ngx_stream_cmp_conf_addrs(const void *one, const void *two)
{
ngx_stream_conf_addr_t *first, *second;
first = (ngx_stream_conf_addr_t *) one;
second = (ngx_stream_conf_addr_t *) two;
if (first->wildcard) {
/* a wildcard must be the last resort, shift it to the end */
return 1;
}
if (second->wildcard) {
/* a wildcard must be the last resort, shift it to the end */
return -1;
}
if (first->bind && !second->bind) {
/* shift explicit bind()ed addresses to the start */
return -1;
}
if (!first->bind && second->bind) {
/* shift explicit bind()ed addresses to the start */
return 1;
}
/* do not sort by default */
return 0;
}

215
src/stream/ngx_stream.h Normal file
View File

@ -0,0 +1,215 @@
/*
* Copyright (C) Roman Arutyunyan
* Copyright (C) Nginx, Inc.
*/
#ifndef _NGX_STREAM_H_INCLUDED_
#define _NGX_STREAM_H_INCLUDED_
#include <ngx_config.h>
#include <ngx_core.h>
#if (NGX_STREAM_SSL)
#include <ngx_stream_ssl_module.h>
#endif
typedef struct ngx_stream_session_s ngx_stream_session_t;
#include <ngx_stream_upstream.h>
#include <ngx_stream_upstream_round_robin.h>
typedef struct {
void **main_conf;
void **srv_conf;
} ngx_stream_conf_ctx_t;
typedef struct {
u_char sockaddr[NGX_SOCKADDRLEN];
socklen_t socklen;
/* server ctx */
ngx_stream_conf_ctx_t *ctx;
unsigned bind:1;
unsigned wildcard:1;
#if (NGX_STREAM_SSL)
unsigned ssl:1;
#endif
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
unsigned ipv6only:1;
#endif
unsigned so_keepalive:2;
#if (NGX_HAVE_KEEPALIVE_TUNABLE)
int tcp_keepidle;
int tcp_keepintvl;
int tcp_keepcnt;
#endif
} ngx_stream_listen_t;
typedef struct {
ngx_stream_conf_ctx_t *ctx;
ngx_str_t addr_text;
#if (NGX_STREAM_SSL)
ngx_uint_t ssl; /* unsigned ssl:1; */
#endif
} ngx_stream_addr_conf_t;
typedef struct {
in_addr_t addr;
ngx_stream_addr_conf_t conf;
} ngx_stream_in_addr_t;
#if (NGX_HAVE_INET6)
typedef struct {
struct in6_addr addr6;
ngx_stream_addr_conf_t conf;
} ngx_stream_in6_addr_t;
#endif
typedef struct {
/* ngx_stream_in_addr_t or ngx_stream_in6_addr_t */
void *addrs;
ngx_uint_t naddrs;
} ngx_stream_port_t;
typedef struct {
int family;
in_port_t port;
ngx_array_t addrs; /* array of ngx_stream_conf_addr_t */
} ngx_stream_conf_port_t;
typedef struct {
struct sockaddr *sockaddr;
socklen_t socklen;
ngx_stream_conf_ctx_t *ctx;
unsigned bind:1;
unsigned wildcard:1;
#if (NGX_STREAM_SSL)
unsigned ssl:1;
#endif
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
unsigned ipv6only:1;
#endif
unsigned so_keepalive:2;
#if (NGX_HAVE_KEEPALIVE_TUNABLE)
int tcp_keepidle;
int tcp_keepintvl;
int tcp_keepcnt;
#endif
} ngx_stream_conf_addr_t;
typedef struct {
ngx_array_t servers; /* ngx_stream_core_srv_conf_t */
ngx_array_t listen; /* ngx_stream_listen_t */
} ngx_stream_core_main_conf_t;
typedef void (*ngx_stream_handler_pt)(ngx_stream_session_t *s);
typedef struct {
ngx_stream_handler_pt handler;
ngx_stream_conf_ctx_t *ctx;
u_char *file_name;
ngx_int_t line;
ngx_log_t *error_log;
} ngx_stream_core_srv_conf_t;
struct ngx_stream_session_s {
uint32_t signature; /* "STRM" */
ngx_connection_t *connection;
off_t received;
ngx_log_handler_pt log_handler;
void **ctx;
void **main_conf;
void **srv_conf;
ngx_stream_upstream_t *upstream;
};
typedef struct {
void *(*create_main_conf)(ngx_conf_t *cf);
char *(*init_main_conf)(ngx_conf_t *cf, void *conf);
void *(*create_srv_conf)(ngx_conf_t *cf);
char *(*merge_srv_conf)(ngx_conf_t *cf, void *prev,
void *conf);
} ngx_stream_module_t;
#define NGX_STREAM_MODULE 0x4d525453 /* "STRM" */
#define NGX_STREAM_MAIN_CONF 0x02000000
#define NGX_STREAM_SRV_CONF 0x04000000
#define NGX_STREAM_UPS_CONF 0x08000000
#define NGX_STREAM_MAIN_CONF_OFFSET offsetof(ngx_stream_conf_ctx_t, main_conf)
#define NGX_STREAM_SRV_CONF_OFFSET offsetof(ngx_stream_conf_ctx_t, srv_conf)
#define ngx_stream_get_module_ctx(s, module) (s)->ctx[module.ctx_index]
#define ngx_stream_set_ctx(s, c, module) s->ctx[module.ctx_index] = c;
#define ngx_stream_delete_ctx(s, module) s->ctx[module.ctx_index] = NULL;
#define ngx_stream_get_module_main_conf(s, module) \
(s)->main_conf[module.ctx_index]
#define ngx_stream_get_module_srv_conf(s, module) \
(s)->srv_conf[module.ctx_index]
#define ngx_stream_conf_get_module_main_conf(cf, module) \
((ngx_stream_conf_ctx_t *) cf->ctx)->main_conf[module.ctx_index]
#define ngx_stream_conf_get_module_srv_conf(cf, module) \
((ngx_stream_conf_ctx_t *) cf->ctx)->srv_conf[module.ctx_index]
#define ngx_stream_cycle_get_module_main_conf(cycle, module) \
(cycle->conf_ctx[ngx_stream_module.index] ? \
((ngx_stream_conf_ctx_t *) cycle->conf_ctx[ngx_stream_module.index]) \
->main_conf[module.ctx_index]: \
NULL)
#define ngx_stream_set_connection_log(c, l) \
\
c->log->file = l->file; \
c->log->next = l->next; \
c->log->writer = l->writer; \
c->log->wdata = l->wdata; \
if (!(c->log->log_level & NGX_LOG_DEBUG_CONNECTION)) { \
c->log->log_level = l->log_level; \
}
void ngx_stream_init_connection(ngx_connection_t *c);
void ngx_stream_close_connection(ngx_connection_t *c);
extern ngx_module_t ngx_stream_module;
extern ngx_uint_t ngx_stream_max_module;
extern ngx_module_t ngx_stream_core_module;
#endif /* _NGX_STREAM_H_INCLUDED_ */

View File

@ -0,0 +1,495 @@
/*
* Copyright (C) Roman Arutyunyan
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_stream.h>
static void *ngx_stream_core_create_main_conf(ngx_conf_t *cf);
static void *ngx_stream_core_create_srv_conf(ngx_conf_t *cf);
static char *ngx_stream_core_merge_srv_conf(ngx_conf_t *cf, void *parent,
void *child);
static char *ngx_stream_core_error_log(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_stream_core_server(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_stream_core_listen(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static ngx_command_t ngx_stream_core_commands[] = {
{ ngx_string("server"),
NGX_STREAM_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
ngx_stream_core_server,
0,
0,
NULL },
{ ngx_string("listen"),
NGX_STREAM_SRV_CONF|NGX_CONF_1MORE,
ngx_stream_core_listen,
NGX_STREAM_SRV_CONF_OFFSET,
0,
NULL },
{ ngx_string("error_log"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_1MORE,
ngx_stream_core_error_log,
NGX_STREAM_SRV_CONF_OFFSET,
0,
NULL },
ngx_null_command
};
static ngx_stream_module_t ngx_stream_core_module_ctx = {
ngx_stream_core_create_main_conf, /* create main configuration */
NULL, /* init main configuration */
ngx_stream_core_create_srv_conf, /* create server configuration */
ngx_stream_core_merge_srv_conf /* merge server configuration */
};
ngx_module_t ngx_stream_core_module = {
NGX_MODULE_V1,
&ngx_stream_core_module_ctx, /* module context */
ngx_stream_core_commands, /* module directives */
NGX_STREAM_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static void *
ngx_stream_core_create_main_conf(ngx_conf_t *cf)
{
ngx_stream_core_main_conf_t *cmcf;
cmcf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_core_main_conf_t));
if (cmcf == NULL) {
return NULL;
}
if (ngx_array_init(&cmcf->servers, cf->pool, 4,
sizeof(ngx_stream_core_srv_conf_t *))
!= NGX_OK)
{
return NULL;
}
if (ngx_array_init(&cmcf->listen, cf->pool, 4, sizeof(ngx_stream_listen_t))
!= NGX_OK)
{
return NULL;
}
return cmcf;
}
static void *
ngx_stream_core_create_srv_conf(ngx_conf_t *cf)
{
ngx_stream_core_srv_conf_t *cscf;
cscf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_core_srv_conf_t));
if (cscf == NULL) {
return NULL;
}
/*
* set by ngx_pcalloc():
*
* cscf->handler = NULL;
* cscf->error_log = NULL;
*/
cscf->file_name = cf->conf_file->file.name.data;
cscf->line = cf->conf_file->line;
return cscf;
}
static char *
ngx_stream_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_stream_core_srv_conf_t *prev = parent;
ngx_stream_core_srv_conf_t *conf = child;
if (conf->handler == NULL) {
ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
"no handler for server in %s:%ui",
conf->file_name, conf->line);
return NGX_CONF_ERROR;
}
if (conf->error_log == NULL) {
if (prev->error_log) {
conf->error_log = prev->error_log;
} else {
conf->error_log = &cf->cycle->new_log;
}
}
return NGX_CONF_OK;
}
static char *
ngx_stream_core_error_log(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_stream_core_srv_conf_t *cscf = conf;
return ngx_log_set_log(cf, &cscf->error_log);
}
static char *
ngx_stream_core_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
char *rv;
void *mconf;
ngx_uint_t m;
ngx_conf_t pcf;
ngx_stream_module_t *module;
ngx_stream_conf_ctx_t *ctx, *stream_ctx;
ngx_stream_core_srv_conf_t *cscf, **cscfp;
ngx_stream_core_main_conf_t *cmcf;
ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t));
if (ctx == NULL) {
return NGX_CONF_ERROR;
}
stream_ctx = cf->ctx;
ctx->main_conf = stream_ctx->main_conf;
/* the server{}'s srv_conf */
ctx->srv_conf = ngx_pcalloc(cf->pool,
sizeof(void *) * ngx_stream_max_module);
if (ctx->srv_conf == NULL) {
return NGX_CONF_ERROR;
}
for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_STREAM_MODULE) {
continue;
}
module = ngx_modules[m]->ctx;
if (module->create_srv_conf) {
mconf = module->create_srv_conf(cf);
if (mconf == NULL) {
return NGX_CONF_ERROR;
}
ctx->srv_conf[ngx_modules[m]->ctx_index] = mconf;
}
}
/* the server configuration context */
cscf = ctx->srv_conf[ngx_stream_core_module.ctx_index];
cscf->ctx = ctx;
cmcf = ctx->main_conf[ngx_stream_core_module.ctx_index];
cscfp = ngx_array_push(&cmcf->servers);
if (cscfp == NULL) {
return NGX_CONF_ERROR;
}
*cscfp = cscf;
/* parse inside server{} */
pcf = *cf;
cf->ctx = ctx;
cf->cmd_type = NGX_STREAM_SRV_CONF;
rv = ngx_conf_parse(cf, NULL);
*cf = pcf;
return rv;
}
static char *
ngx_stream_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
size_t len, off;
in_port_t port;
ngx_str_t *value;
ngx_url_t u;
ngx_uint_t i;
struct sockaddr *sa;
struct sockaddr_in *sin;
ngx_stream_listen_t *ls;
ngx_stream_core_main_conf_t *cmcf;
#if (NGX_HAVE_INET6)
struct sockaddr_in6 *sin6;
#endif
value = cf->args->elts;
ngx_memzero(&u, sizeof(ngx_url_t));
u.url = value[1];
u.listen = 1;
if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
if (u.err) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"%s in \"%V\" of the \"listen\" directive",
u.err, &u.url);
}
return NGX_CONF_ERROR;
}
cmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_core_module);
ls = cmcf->listen.elts;
for (i = 0; i < cmcf->listen.nelts; i++) {
sa = (struct sockaddr *) ls[i].sockaddr;
if (sa->sa_family != u.family) {
continue;
}
switch (sa->sa_family) {
#if (NGX_HAVE_INET6)
case AF_INET6:
off = offsetof(struct sockaddr_in6, sin6_addr);
len = 16;
sin6 = (struct sockaddr_in6 *) sa;
port = sin6->sin6_port;
break;
#endif
#if (NGX_HAVE_UNIX_DOMAIN)
case AF_UNIX:
off = offsetof(struct sockaddr_un, sun_path);
len = sizeof(((struct sockaddr_un *) sa)->sun_path);
port = 0;
break;
#endif
default: /* AF_INET */
off = offsetof(struct sockaddr_in, sin_addr);
len = 4;
sin = (struct sockaddr_in *) sa;
port = sin->sin_port;
break;
}
if (ngx_memcmp(ls[i].sockaddr + off, u.sockaddr + off, len) != 0) {
continue;
}
if (port != u.port) {
continue;
}
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"duplicate \"%V\" address and port pair", &u.url);
return NGX_CONF_ERROR;
}
ls = ngx_array_push(&cmcf->listen);
if (ls == NULL) {
return NGX_CONF_ERROR;
}
ngx_memzero(ls, sizeof(ngx_stream_listen_t));
ngx_memcpy(ls->sockaddr, u.sockaddr, u.socklen);
ls->socklen = u.socklen;
ls->wildcard = u.wildcard;
ls->ctx = cf->ctx;
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
ls->ipv6only = 1;
#endif
for (i = 2; i < cf->args->nelts; i++) {
if (ngx_strcmp(value[i].data, "bind") == 0) {
ls->bind = 1;
continue;
}
if (ngx_strncmp(value[i].data, "ipv6only=o", 10) == 0) {
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
struct sockaddr *sa;
u_char buf[NGX_SOCKADDR_STRLEN];
sa = (struct sockaddr *) ls->sockaddr;
if (sa->sa_family == AF_INET6) {
if (ngx_strcmp(&value[i].data[10], "n") == 0) {
ls->ipv6only = 1;
} else if (ngx_strcmp(&value[i].data[10], "ff") == 0) {
ls->ipv6only = 0;
} else {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid ipv6only flags \"%s\"",
&value[i].data[9]);
return NGX_CONF_ERROR;
}
ls->bind = 1;
} else {
len = ngx_sock_ntop(sa, ls->socklen, buf,
NGX_SOCKADDR_STRLEN, 1);
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"ipv6only is not supported "
"on addr \"%*s\", ignored", len, buf);
}
continue;
#else
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"bind ipv6only is not supported "
"on this platform");
return NGX_CONF_ERROR;
#endif
}
if (ngx_strcmp(value[i].data, "ssl") == 0) {
#if (NGX_STREAM_SSL)
ls->ssl = 1;
continue;
#else
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"the \"ssl\" parameter requires "
"ngx_stream_ssl_module");
return NGX_CONF_ERROR;
#endif
}
if (ngx_strncmp(value[i].data, "so_keepalive=", 13) == 0) {
if (ngx_strcmp(&value[i].data[13], "on") == 0) {
ls->so_keepalive = 1;
} else if (ngx_strcmp(&value[i].data[13], "off") == 0) {
ls->so_keepalive = 2;
} else {
#if (NGX_HAVE_KEEPALIVE_TUNABLE)
u_char *p, *end;
ngx_str_t s;
end = value[i].data + value[i].len;
s.data = value[i].data + 13;
p = ngx_strlchr(s.data, end, ':');
if (p == NULL) {
p = end;
}
if (p > s.data) {
s.len = p - s.data;
ls->tcp_keepidle = ngx_parse_time(&s, 1);
if (ls->tcp_keepidle == (time_t) NGX_ERROR) {
goto invalid_so_keepalive;
}
}
s.data = (p < end) ? (p + 1) : end;
p = ngx_strlchr(s.data, end, ':');
if (p == NULL) {
p = end;
}
if (p > s.data) {
s.len = p - s.data;
ls->tcp_keepintvl = ngx_parse_time(&s, 1);
if (ls->tcp_keepintvl == (time_t) NGX_ERROR) {
goto invalid_so_keepalive;
}
}
s.data = (p < end) ? (p + 1) : end;
if (s.data < end) {
s.len = end - s.data;
ls->tcp_keepcnt = ngx_atoi(s.data, s.len);
if (ls->tcp_keepcnt == NGX_ERROR) {
goto invalid_so_keepalive;
}
}
if (ls->tcp_keepidle == 0 && ls->tcp_keepintvl == 0
&& ls->tcp_keepcnt == 0)
{
goto invalid_so_keepalive;
}
ls->so_keepalive = 1;
#else
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"the \"so_keepalive\" parameter accepts "
"only \"on\" or \"off\" on this platform");
return NGX_CONF_ERROR;
#endif
}
ls->bind = 1;
continue;
#if (NGX_HAVE_KEEPALIVE_TUNABLE)
invalid_so_keepalive:
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid so_keepalive value: \"%s\"",
&value[i].data[13]);
return NGX_CONF_ERROR;
#endif
}
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"the invalid \"%V\" parameter", &value[i]);
return NGX_CONF_ERROR;
}
return NGX_CONF_OK;
}

View File

@ -0,0 +1,296 @@
/*
* Copyright (C) Roman Arutyunyan
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>
#include <ngx_stream.h>
static u_char *ngx_stream_log_error(ngx_log_t *log, u_char *buf, size_t len);
static void ngx_stream_init_session(ngx_connection_t *c);
#if (NGX_STREAM_SSL)
static void ngx_stream_ssl_init_connection(ngx_ssl_t *ssl, ngx_connection_t *c);
static void ngx_stream_ssl_handshake_handler(ngx_connection_t *c);
#endif
void
ngx_stream_init_connection(ngx_connection_t *c)
{
u_char text[NGX_SOCKADDR_STRLEN];
size_t len;
ngx_uint_t i;
struct sockaddr *sa;
ngx_stream_port_t *port;
struct sockaddr_in *sin;
ngx_stream_in_addr_t *addr;
ngx_stream_session_t *s;
ngx_stream_addr_conf_t *addr_conf;
#if (NGX_HAVE_INET6)
struct sockaddr_in6 *sin6;
ngx_stream_in6_addr_t *addr6;
#endif
ngx_stream_core_srv_conf_t *cscf;
/* find the server configuration for the address:port */
port = c->listening->servers;
if (port->naddrs > 1) {
/*
* There are several addresses on this port and one of them
* is the "*:port" wildcard so getsockname() is needed to determine
* the server address.
*
* AcceptEx() already gave this address.
*/
if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) {
ngx_stream_close_connection(c);
return;
}
sa = c->local_sockaddr;
switch (sa->sa_family) {
#if (NGX_HAVE_INET6)
case AF_INET6:
sin6 = (struct sockaddr_in6 *) sa;
addr6 = port->addrs;
/* the last address is "*" */
for (i = 0; i < port->naddrs - 1; i++) {
if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) {
break;
}
}
addr_conf = &addr6[i].conf;
break;
#endif
default: /* AF_INET */
sin = (struct sockaddr_in *) sa;
addr = port->addrs;
/* the last address is "*" */
for (i = 0; i < port->naddrs - 1; i++) {
if (addr[i].addr == sin->sin_addr.s_addr) {
break;
}
}
addr_conf = &addr[i].conf;
break;
}
} else {
switch (c->local_sockaddr->sa_family) {
#if (NGX_HAVE_INET6)
case AF_INET6:
addr6 = port->addrs;
addr_conf = &addr6[0].conf;
break;
#endif
default: /* AF_INET */
addr = port->addrs;
addr_conf = &addr[0].conf;
break;
}
}
s = ngx_pcalloc(c->pool, sizeof(ngx_stream_session_t));
if (s == NULL) {
ngx_stream_close_connection(c);
return;
}
s->signature = NGX_STREAM_MODULE;
s->main_conf = addr_conf->ctx->main_conf;
s->srv_conf = addr_conf->ctx->srv_conf;
s->connection = c;
c->data = s;
cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
ngx_stream_set_connection_log(c, cscf->error_log);
len = ngx_sock_ntop(c->sockaddr, c->socklen, text, NGX_SOCKADDR_STRLEN, 1);
ngx_log_error(NGX_LOG_INFO, c->log, 0, "*%uA client %*s connected to %V",
c->number, len, text, &addr_conf->addr_text);
c->log->connection = c->number;
c->log->handler = ngx_stream_log_error;
c->log->data = s;
c->log->action = "initializing connection";
c->log_error = NGX_ERROR_INFO;
#if (NGX_STREAM_SSL)
{
ngx_stream_ssl_conf_t *sslcf;
sslcf = ngx_stream_get_module_srv_conf(s, ngx_stream_ssl_module);
if (addr_conf->ssl) {
c->log->action = "SSL handshaking";
if (sslcf->ssl.ctx == NULL) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"no \"ssl_certificate\" is defined "
"in server listening on SSL port");
ngx_stream_close_connection(c);
return;
}
ngx_stream_ssl_init_connection(&sslcf->ssl, c);
return;
}
}
#endif
ngx_stream_init_session(c);
}
static void
ngx_stream_init_session(ngx_connection_t *c)
{
ngx_stream_session_t *s;
ngx_stream_core_srv_conf_t *cscf;
s = c->data;
c->log->action = "handling client connection";
cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_stream_max_module);
if (s->ctx == NULL) {
ngx_stream_close_connection(c);
return;
}
cscf->handler(s);
}
#if (NGX_STREAM_SSL)
static void
ngx_stream_ssl_init_connection(ngx_ssl_t *ssl, ngx_connection_t *c)
{
ngx_stream_session_t *s;
ngx_stream_ssl_conf_t *sslcf;
if (ngx_ssl_create_connection(ssl, c, 0) == NGX_ERROR) {
ngx_stream_close_connection(c);
return;
}
if (ngx_ssl_handshake(c) == NGX_AGAIN) {
s = c->data;
sslcf = ngx_stream_get_module_srv_conf(s, ngx_stream_ssl_module);
ngx_add_timer(c->read, sslcf->handshake_timeout);
c->ssl->handler = ngx_stream_ssl_handshake_handler;
return;
}
ngx_stream_ssl_handshake_handler(c);
}
static void
ngx_stream_ssl_handshake_handler(ngx_connection_t *c)
{
if (!c->ssl->handshaked) {
ngx_stream_close_connection(c);
return;
}
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
ngx_stream_init_session(c);
}
#endif
void
ngx_stream_close_connection(ngx_connection_t *c)
{
ngx_pool_t *pool;
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
"close stream connection: %d", c->fd);
#if (NGX_STREAM_SSL)
if (c->ssl) {
if (ngx_ssl_shutdown(c) == NGX_AGAIN) {
c->ssl->handler = ngx_stream_close_connection;
return;
}
}
#endif
#if (NGX_STAT_STUB)
(void) ngx_atomic_fetch_add(ngx_stat_active, -1);
#endif
pool = c->pool;
ngx_close_connection(c);
ngx_destroy_pool(pool);
}
static u_char *
ngx_stream_log_error(ngx_log_t *log, u_char *buf, size_t len)
{
u_char *p;
ngx_stream_session_t *s;
if (log->action) {
p = ngx_snprintf(buf, len, " while %s", log->action);
len -= p - buf;
buf = p;
}
s = log->data;
p = ngx_snprintf(buf, len, ", client: %V, server: %V",
&s->connection->addr_text,
&s->connection->listening->addr_text);
if (s->log_handler) {
return s->log_handler(log, p, len);
}
return p;
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,456 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_stream.h>
#define NGX_DEFAULT_CIPHERS "HIGH:!aNULL:!MD5"
#define NGX_DEFAULT_ECDH_CURVE "prime256v1"
static void *ngx_stream_ssl_create_conf(ngx_conf_t *cf);
static char *ngx_stream_ssl_merge_conf(ngx_conf_t *cf, void *parent,
void *child);
static char *ngx_stream_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_stream_ssl_session_cache(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static ngx_conf_bitmask_t ngx_stream_ssl_protocols[] = {
{ ngx_string("SSLv2"), NGX_SSL_SSLv2 },
{ ngx_string("SSLv3"), NGX_SSL_SSLv3 },
{ ngx_string("TLSv1"), NGX_SSL_TLSv1 },
{ ngx_string("TLSv1.1"), NGX_SSL_TLSv1_1 },
{ ngx_string("TLSv1.2"), NGX_SSL_TLSv1_2 },
{ ngx_null_string, 0 }
};
static ngx_command_t ngx_stream_ssl_commands[] = {
{ ngx_string("ssl_handshake_timeout"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_STREAM_SRV_CONF_OFFSET,
offsetof(ngx_stream_ssl_conf_t, handshake_timeout),
NULL },
{ ngx_string("ssl_certificate"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_STREAM_SRV_CONF_OFFSET,
offsetof(ngx_stream_ssl_conf_t, certificate),
NULL },
{ ngx_string("ssl_certificate_key"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_STREAM_SRV_CONF_OFFSET,
offsetof(ngx_stream_ssl_conf_t, certificate_key),
NULL },
{ ngx_string("ssl_password_file"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
ngx_stream_ssl_password_file,
NGX_STREAM_SRV_CONF_OFFSET,
0,
NULL },
{ ngx_string("ssl_dhparam"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_STREAM_SRV_CONF_OFFSET,
offsetof(ngx_stream_ssl_conf_t, dhparam),
NULL },
{ ngx_string("ssl_ecdh_curve"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_STREAM_SRV_CONF_OFFSET,
offsetof(ngx_stream_ssl_conf_t, ecdh_curve),
NULL },
{ ngx_string("ssl_protocols"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_1MORE,
ngx_conf_set_bitmask_slot,
NGX_STREAM_SRV_CONF_OFFSET,
offsetof(ngx_stream_ssl_conf_t, protocols),
&ngx_stream_ssl_protocols },
{ ngx_string("ssl_ciphers"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_STREAM_SRV_CONF_OFFSET,
offsetof(ngx_stream_ssl_conf_t, ciphers),
NULL },
{ ngx_string("ssl_prefer_server_ciphers"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
ngx_conf_set_flag_slot,
NGX_STREAM_SRV_CONF_OFFSET,
offsetof(ngx_stream_ssl_conf_t, prefer_server_ciphers),
NULL },
{ ngx_string("ssl_session_cache"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE12,
ngx_stream_ssl_session_cache,
NGX_STREAM_SRV_CONF_OFFSET,
0,
NULL },
{ ngx_string("ssl_session_tickets"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
ngx_conf_set_flag_slot,
NGX_STREAM_SRV_CONF_OFFSET,
offsetof(ngx_stream_ssl_conf_t, session_tickets),
NULL },
{ ngx_string("ssl_session_ticket_key"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_array_slot,
NGX_STREAM_SRV_CONF_OFFSET,
offsetof(ngx_stream_ssl_conf_t, session_ticket_keys),
NULL },
{ ngx_string("ssl_session_timeout"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_sec_slot,
NGX_STREAM_SRV_CONF_OFFSET,
offsetof(ngx_stream_ssl_conf_t, session_timeout),
NULL },
ngx_null_command
};
static ngx_stream_module_t ngx_stream_ssl_module_ctx = {
NULL, /* create main configuration */
NULL, /* init main configuration */
ngx_stream_ssl_create_conf, /* create server configuration */
ngx_stream_ssl_merge_conf /* merge server configuration */
};
ngx_module_t ngx_stream_ssl_module = {
NGX_MODULE_V1,
&ngx_stream_ssl_module_ctx, /* module context */
ngx_stream_ssl_commands, /* module directives */
NGX_STREAM_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static ngx_str_t ngx_stream_ssl_sess_id_ctx = ngx_string("STREAM");
static void *
ngx_stream_ssl_create_conf(ngx_conf_t *cf)
{
ngx_stream_ssl_conf_t *scf;
scf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_ssl_conf_t));
if (scf == NULL) {
return NULL;
}
/*
* set by ngx_pcalloc():
*
* scf->protocols = 0;
* scf->certificate = { 0, NULL };
* scf->certificate_key = { 0, NULL };
* scf->dhparam = { 0, NULL };
* scf->ecdh_curve = { 0, NULL };
* scf->ciphers = { 0, NULL };
* scf->shm_zone = NULL;
*/
scf->handshake_timeout = NGX_CONF_UNSET_MSEC;
scf->passwords = NGX_CONF_UNSET_PTR;
scf->prefer_server_ciphers = NGX_CONF_UNSET;
scf->builtin_session_cache = NGX_CONF_UNSET;
scf->session_timeout = NGX_CONF_UNSET;
scf->session_tickets = NGX_CONF_UNSET;
scf->session_ticket_keys = NGX_CONF_UNSET_PTR;
return scf;
}
static char *
ngx_stream_ssl_merge_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_stream_ssl_conf_t *prev = parent;
ngx_stream_ssl_conf_t *conf = child;
ngx_pool_cleanup_t *cln;
ngx_conf_merge_msec_value(conf->handshake_timeout,
prev->handshake_timeout, 60000);
ngx_conf_merge_value(conf->session_timeout,
prev->session_timeout, 300);
ngx_conf_merge_value(conf->prefer_server_ciphers,
prev->prefer_server_ciphers, 0);
ngx_conf_merge_bitmask_value(conf->protocols, prev->protocols,
(NGX_CONF_BITMASK_SET|NGX_SSL_SSLv3|NGX_SSL_TLSv1
|NGX_SSL_TLSv1_1|NGX_SSL_TLSv1_2));
ngx_conf_merge_str_value(conf->certificate, prev->certificate, "");
ngx_conf_merge_str_value(conf->certificate_key, prev->certificate_key, "");
ngx_conf_merge_ptr_value(conf->passwords, prev->passwords, NULL);
ngx_conf_merge_str_value(conf->dhparam, prev->dhparam, "");
ngx_conf_merge_str_value(conf->ecdh_curve, prev->ecdh_curve,
NGX_DEFAULT_ECDH_CURVE);
ngx_conf_merge_str_value(conf->ciphers, prev->ciphers, NGX_DEFAULT_CIPHERS);
conf->ssl.log = cf->log;
if (conf->certificate.len == 0) {
return NGX_CONF_OK;
}
if (conf->certificate_key.len == 0) {
ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
"no \"ssl_certificate_key\" is defined "
"for certificate \"%V\"",
&conf->certificate);
return NGX_CONF_ERROR;
}
if (ngx_ssl_create(&conf->ssl, conf->protocols, NULL) != NGX_OK) {
return NGX_CONF_ERROR;
}
cln = ngx_pool_cleanup_add(cf->pool, 0);
if (cln == NULL) {
return NGX_CONF_ERROR;
}
cln->handler = ngx_ssl_cleanup_ctx;
cln->data = &conf->ssl;
if (ngx_ssl_certificate(cf, &conf->ssl, &conf->certificate,
&conf->certificate_key, conf->passwords)
!= NGX_OK)
{
return NGX_CONF_ERROR;
}
if (SSL_CTX_set_cipher_list(conf->ssl.ctx,
(const char *) conf->ciphers.data)
== 0)
{
ngx_ssl_error(NGX_LOG_EMERG, cf->log, 0,
"SSL_CTX_set_cipher_list(\"%V\") failed",
&conf->ciphers);
return NGX_CONF_ERROR;
}
if (conf->prefer_server_ciphers) {
SSL_CTX_set_options(conf->ssl.ctx, SSL_OP_CIPHER_SERVER_PREFERENCE);
}
SSL_CTX_set_tmp_rsa_callback(conf->ssl.ctx, ngx_ssl_rsa512_key_callback);
if (ngx_ssl_dhparam(cf, &conf->ssl, &conf->dhparam) != NGX_OK) {
return NGX_CONF_ERROR;
}
if (ngx_ssl_ecdh_curve(cf, &conf->ssl, &conf->ecdh_curve) != NGX_OK) {
return NGX_CONF_ERROR;
}
ngx_conf_merge_value(conf->builtin_session_cache,
prev->builtin_session_cache, NGX_SSL_NONE_SCACHE);
if (conf->shm_zone == NULL) {
conf->shm_zone = prev->shm_zone;
}
if (ngx_ssl_session_cache(&conf->ssl, &ngx_stream_ssl_sess_id_ctx,
conf->builtin_session_cache,
conf->shm_zone, conf->session_timeout)
!= NGX_OK)
{
return NGX_CONF_ERROR;
}
ngx_conf_merge_value(conf->session_tickets,
prev->session_tickets, 1);
#ifdef SSL_OP_NO_TICKET
if (!conf->session_tickets) {
SSL_CTX_set_options(conf->ssl.ctx, SSL_OP_NO_TICKET);
}
#endif
ngx_conf_merge_ptr_value(conf->session_ticket_keys,
prev->session_ticket_keys, NULL);
if (ngx_ssl_session_ticket_keys(cf, &conf->ssl, conf->session_ticket_keys)
!= NGX_OK)
{
return NGX_CONF_ERROR;
}
return NGX_CONF_OK;
}
static char *
ngx_stream_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_stream_ssl_conf_t *scf = conf;
ngx_str_t *value;
if (scf->passwords != NGX_CONF_UNSET_PTR) {
return "is duplicate";
}
value = cf->args->elts;
scf->passwords = ngx_ssl_read_password_file(cf, &value[1]);
if (scf->passwords == NULL) {
return NGX_CONF_ERROR;
}
return NGX_CONF_OK;
}
static char *
ngx_stream_ssl_session_cache(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_stream_ssl_conf_t *scf = conf;
size_t len;
ngx_str_t *value, name, size;
ngx_int_t n;
ngx_uint_t i, j;
value = cf->args->elts;
for (i = 1; i < cf->args->nelts; i++) {
if (ngx_strcmp(value[i].data, "off") == 0) {
scf->builtin_session_cache = NGX_SSL_NO_SCACHE;
continue;
}
if (ngx_strcmp(value[i].data, "none") == 0) {
scf->builtin_session_cache = NGX_SSL_NONE_SCACHE;
continue;
}
if (ngx_strcmp(value[i].data, "builtin") == 0) {
scf->builtin_session_cache = NGX_SSL_DFLT_BUILTIN_SCACHE;
continue;
}
if (value[i].len > sizeof("builtin:") - 1
&& ngx_strncmp(value[i].data, "builtin:", sizeof("builtin:") - 1)
== 0)
{
n = ngx_atoi(value[i].data + sizeof("builtin:") - 1,
value[i].len - (sizeof("builtin:") - 1));
if (n == NGX_ERROR) {
goto invalid;
}
scf->builtin_session_cache = n;
continue;
}
if (value[i].len > sizeof("shared:") - 1
&& ngx_strncmp(value[i].data, "shared:", sizeof("shared:") - 1)
== 0)
{
len = 0;
for (j = sizeof("shared:") - 1; j < value[i].len; j++) {
if (value[i].data[j] == ':') {
break;
}
len++;
}
if (len == 0) {
goto invalid;
}
name.len = len;
name.data = value[i].data + sizeof("shared:") - 1;
size.len = value[i].len - j - 1;
size.data = name.data + len + 1;
n = ngx_parse_size(&size);
if (n == NGX_ERROR) {
goto invalid;
}
if (n < (ngx_int_t) (8 * ngx_pagesize)) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"session cache \"%V\" is too small",
&value[i]);
return NGX_CONF_ERROR;
}
scf->shm_zone = ngx_shared_memory_add(cf, &name, n,
&ngx_stream_ssl_module);
if (scf->shm_zone == NULL) {
return NGX_CONF_ERROR;
}
scf->shm_zone->init = ngx_ssl_session_cache_init;
continue;
}
goto invalid;
}
if (scf->shm_zone && scf->builtin_session_cache == NGX_CONF_UNSET) {
scf->builtin_session_cache = NGX_SSL_NO_BUILTIN_SCACHE;
}
return NGX_CONF_OK;
invalid:
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid session cache \"%V\"", &value[i]);
return NGX_CONF_ERROR;
}

View File

@ -0,0 +1,49 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Nginx, Inc.
*/
#ifndef _NGX_STREAM_SSL_H_INCLUDED_
#define _NGX_STREAM_SSL_H_INCLUDED_
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_stream.h>
typedef struct {
ngx_msec_t handshake_timeout;
ngx_flag_t prefer_server_ciphers;
ngx_ssl_t ssl;
ngx_uint_t protocols;
ssize_t builtin_session_cache;
time_t session_timeout;
ngx_str_t certificate;
ngx_str_t certificate_key;
ngx_str_t dhparam;
ngx_str_t ecdh_curve;
ngx_str_t ciphers;
ngx_array_t *passwords;
ngx_shm_zone_t *shm_zone;
ngx_flag_t session_tickets;
ngx_array_t *session_ticket_keys;
} ngx_stream_ssl_conf_t;
extern ngx_module_t ngx_stream_ssl_module;
#endif /* _NGX_STREAM_SSL_H_INCLUDED_ */

View File

@ -0,0 +1,462 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_stream.h>
static char *ngx_stream_upstream(ngx_conf_t *cf, ngx_command_t *cmd,
void *dummy);
static char *ngx_stream_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static void *ngx_stream_upstream_create_main_conf(ngx_conf_t *cf);
static char *ngx_stream_upstream_init_main_conf(ngx_conf_t *cf, void *conf);
static ngx_command_t ngx_stream_upstream_commands[] = {
{ ngx_string("upstream"),
NGX_STREAM_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_TAKE1,
ngx_stream_upstream,
0,
0,
NULL },
{ ngx_string("server"),
NGX_STREAM_UPS_CONF|NGX_CONF_1MORE,
ngx_stream_upstream_server,
NGX_STREAM_SRV_CONF_OFFSET,
0,
NULL },
ngx_null_command
};
static ngx_stream_module_t ngx_stream_upstream_module_ctx = {
ngx_stream_upstream_create_main_conf, /* create main configuration */
ngx_stream_upstream_init_main_conf, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
};
ngx_module_t ngx_stream_upstream_module = {
NGX_MODULE_V1,
&ngx_stream_upstream_module_ctx, /* module context */
ngx_stream_upstream_commands, /* module directives */
NGX_STREAM_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static char *
ngx_stream_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy)
{
char *rv;
void *mconf;
ngx_str_t *value;
ngx_url_t u;
ngx_uint_t m;
ngx_conf_t pcf;
ngx_stream_module_t *module;
ngx_stream_conf_ctx_t *ctx, *stream_ctx;
ngx_stream_upstream_srv_conf_t *uscf;
ngx_memzero(&u, sizeof(ngx_url_t));
value = cf->args->elts;
u.host = value[1];
u.no_resolve = 1;
u.no_port = 1;
uscf = ngx_stream_upstream_add(cf, &u, NGX_STREAM_UPSTREAM_CREATE
|NGX_STREAM_UPSTREAM_WEIGHT
|NGX_STREAM_UPSTREAM_MAX_FAILS
|NGX_STREAM_UPSTREAM_FAIL_TIMEOUT
|NGX_STREAM_UPSTREAM_DOWN
|NGX_STREAM_UPSTREAM_BACKUP);
if (uscf == NULL) {
return NGX_CONF_ERROR;
}
ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t));
if (ctx == NULL) {
return NGX_CONF_ERROR;
}
stream_ctx = cf->ctx;
ctx->main_conf = stream_ctx->main_conf;
/* the upstream{}'s srv_conf */
ctx->srv_conf = ngx_pcalloc(cf->pool,
sizeof(void *) * ngx_stream_max_module);
if (ctx->srv_conf == NULL) {
return NGX_CONF_ERROR;
}
ctx->srv_conf[ngx_stream_upstream_module.ctx_index] = uscf;
uscf->srv_conf = ctx->srv_conf;
for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_STREAM_MODULE) {
continue;
}
module = ngx_modules[m]->ctx;
if (module->create_srv_conf) {
mconf = module->create_srv_conf(cf);
if (mconf == NULL) {
return NGX_CONF_ERROR;
}
ctx->srv_conf[ngx_modules[m]->ctx_index] = mconf;
}
}
uscf->servers = ngx_array_create(cf->pool, 4,
sizeof(ngx_stream_upstream_server_t));
if (uscf->servers == NULL) {
return NGX_CONF_ERROR;
}
/* parse inside upstream{} */
pcf = *cf;
cf->ctx = ctx;
cf->cmd_type = NGX_STREAM_UPS_CONF;
rv = ngx_conf_parse(cf, NULL);
*cf = pcf;
if (rv != NGX_CONF_OK) {
return rv;
}
if (uscf->servers->nelts == 0) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"no servers are inside upstream");
return NGX_CONF_ERROR;
}
return rv;
}
static char *
ngx_stream_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_stream_upstream_srv_conf_t *uscf = conf;
time_t fail_timeout;
ngx_str_t *value, s;
ngx_url_t u;
ngx_int_t weight, max_fails;
ngx_uint_t i;
ngx_stream_upstream_server_t *us;
us = ngx_array_push(uscf->servers);
if (us == NULL) {
return NGX_CONF_ERROR;
}
ngx_memzero(us, sizeof(ngx_stream_upstream_server_t));
value = cf->args->elts;
weight = 1;
max_fails = 1;
fail_timeout = 10;
for (i = 2; i < cf->args->nelts; i++) {
if (ngx_strncmp(value[i].data, "weight=", 7) == 0) {
if (!(uscf->flags & NGX_STREAM_UPSTREAM_WEIGHT)) {
goto not_supported;
}
weight = ngx_atoi(&value[i].data[7], value[i].len - 7);
if (weight == NGX_ERROR || weight == 0) {
goto invalid;
}
continue;
}
if (ngx_strncmp(value[i].data, "max_fails=", 10) == 0) {
if (!(uscf->flags & NGX_STREAM_UPSTREAM_MAX_FAILS)) {
goto not_supported;
}
max_fails = ngx_atoi(&value[i].data[10], value[i].len - 10);
if (max_fails == NGX_ERROR) {
goto invalid;
}
continue;
}
if (ngx_strncmp(value[i].data, "fail_timeout=", 13) == 0) {
if (!(uscf->flags & NGX_STREAM_UPSTREAM_FAIL_TIMEOUT)) {
goto not_supported;
}
s.len = value[i].len - 13;
s.data = &value[i].data[13];
fail_timeout = ngx_parse_time(&s, 1);
if (fail_timeout == (time_t) NGX_ERROR) {
goto invalid;
}
continue;
}
if (ngx_strcmp(value[i].data, "backup") == 0) {
if (!(uscf->flags & NGX_STREAM_UPSTREAM_BACKUP)) {
goto not_supported;
}
us->backup = 1;
continue;
}
if (ngx_strcmp(value[i].data, "down") == 0) {
if (!(uscf->flags & NGX_STREAM_UPSTREAM_DOWN)) {
goto not_supported;
}
us->down = 1;
continue;
}
goto invalid;
}
ngx_memzero(&u, sizeof(ngx_url_t));
u.url = value[1];
if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
if (u.err) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"%s in upstream \"%V\"", u.err, &u.url);
}
return NGX_CONF_ERROR;
}
if (u.no_port) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"no port in upstream \"%V\"", &u.url);
return NGX_CONF_ERROR;
}
us->name = u.url;
us->addrs = u.addrs;
us->naddrs = u.naddrs;
us->weight = weight;
us->max_fails = max_fails;
us->fail_timeout = fail_timeout;
return NGX_CONF_OK;
invalid:
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid parameter \"%V\"", &value[i]);
return NGX_CONF_ERROR;
not_supported:
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"balancing method does not support parameter \"%V\"",
&value[i]);
return NGX_CONF_ERROR;
}
ngx_stream_upstream_srv_conf_t *
ngx_stream_upstream_add(ngx_conf_t *cf, ngx_url_t *u, ngx_uint_t flags)
{
ngx_uint_t i;
ngx_stream_upstream_server_t *us;
ngx_stream_upstream_srv_conf_t *uscf, **uscfp;
ngx_stream_upstream_main_conf_t *umcf;
if (!(flags & NGX_STREAM_UPSTREAM_CREATE)) {
if (ngx_parse_url(cf->pool, u) != NGX_OK) {
if (u->err) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"%s in upstream \"%V\"", u->err, &u->url);
}
return NULL;
}
}
umcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_upstream_module);
uscfp = umcf->upstreams.elts;
for (i = 0; i < umcf->upstreams.nelts; i++) {
if (uscfp[i]->host.len != u->host.len
|| ngx_strncasecmp(uscfp[i]->host.data, u->host.data, u->host.len)
!= 0)
{
continue;
}
if ((flags & NGX_STREAM_UPSTREAM_CREATE)
&& (uscfp[i]->flags & NGX_STREAM_UPSTREAM_CREATE))
{
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"duplicate upstream \"%V\"", &u->host);
return NULL;
}
if ((uscfp[i]->flags & NGX_STREAM_UPSTREAM_CREATE) && !u->no_port) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
"upstream \"%V\" may not have port %d",
&u->host, u->port);
return NULL;
}
if ((flags & NGX_STREAM_UPSTREAM_CREATE) && !uscfp[i]->no_port) {
ngx_log_error(NGX_LOG_WARN, cf->log, 0,
"upstream \"%V\" may not have port %d in %s:%ui",
&u->host, uscfp[i]->port,
uscfp[i]->file_name, uscfp[i]->line);
return NULL;
}
if (uscfp[i]->port != u->port) {
continue;
}
if (flags & NGX_STREAM_UPSTREAM_CREATE) {
uscfp[i]->flags = flags;
}
return uscfp[i];
}
uscf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_srv_conf_t));
if (uscf == NULL) {
return NULL;
}
uscf->flags = flags;
uscf->host = u->host;
uscf->file_name = cf->conf_file->file.name.data;
uscf->line = cf->conf_file->line;
uscf->port = u->port;
uscf->no_port = u->no_port;
if (u->naddrs == 1) {
uscf->servers = ngx_array_create(cf->pool, 1,
sizeof(ngx_stream_upstream_server_t));
if (uscf->servers == NULL) {
return NULL;
}
us = ngx_array_push(uscf->servers);
if (us == NULL) {
return NULL;
}
ngx_memzero(us, sizeof(ngx_stream_upstream_server_t));
us->addrs = u->addrs;
us->naddrs = 1;
}
uscfp = ngx_array_push(&umcf->upstreams);
if (uscfp == NULL) {
return NULL;
}
*uscfp = uscf;
return uscf;
}
static void *
ngx_stream_upstream_create_main_conf(ngx_conf_t *cf)
{
ngx_stream_upstream_main_conf_t *umcf;
umcf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_main_conf_t));
if (umcf == NULL) {
return NULL;
}
if (ngx_array_init(&umcf->upstreams, cf->pool, 4,
sizeof(ngx_stream_upstream_srv_conf_t *))
!= NGX_OK)
{
return NULL;
}
return umcf;
}
static char *
ngx_stream_upstream_init_main_conf(ngx_conf_t *cf, void *conf)
{
ngx_stream_upstream_main_conf_t *umcf = conf;
ngx_uint_t i;
ngx_stream_upstream_init_pt init;
ngx_stream_upstream_srv_conf_t **uscfp;
uscfp = umcf->upstreams.elts;
for (i = 0; i < umcf->upstreams.nelts; i++) {
init = uscfp[i]->peer.init_upstream
? uscfp[i]->peer.init_upstream
: ngx_stream_upstream_init_round_robin;
if (init(cf, uscfp[i]) != NGX_OK) {
return NGX_CONF_ERROR;
}
}
return NGX_CONF_OK;
}

View File

@ -0,0 +1,103 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Nginx, Inc.
*/
#ifndef _NGX_STREAM_UPSTREAM_H_INCLUDED_
#define _NGX_STREAM_UPSTREAM_H_INCLUDED_
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_stream.h>
#include <ngx_event_connect.h>
#define NGX_STREAM_UPSTREAM_CREATE 0x0001
#define NGX_STREAM_UPSTREAM_WEIGHT 0x0002
#define NGX_STREAM_UPSTREAM_MAX_FAILS 0x0004
#define NGX_STREAM_UPSTREAM_FAIL_TIMEOUT 0x0008
#define NGX_STREAM_UPSTREAM_DOWN 0x0010
#define NGX_STREAM_UPSTREAM_BACKUP 0x0020
typedef struct {
ngx_array_t upstreams;
/* ngx_stream_upstream_srv_conf_t */
} ngx_stream_upstream_main_conf_t;
typedef struct ngx_stream_upstream_srv_conf_s ngx_stream_upstream_srv_conf_t;
typedef ngx_int_t (*ngx_stream_upstream_init_pt)(ngx_conf_t *cf,
ngx_stream_upstream_srv_conf_t *us);
typedef ngx_int_t (*ngx_stream_upstream_init_peer_pt)(ngx_stream_session_t *s,
ngx_stream_upstream_srv_conf_t *us);
typedef struct {
ngx_stream_upstream_init_pt init_upstream;
ngx_stream_upstream_init_peer_pt init;
void *data;
} ngx_stream_upstream_peer_t;
typedef struct {
ngx_str_t name;
ngx_addr_t *addrs;
ngx_uint_t naddrs;
ngx_uint_t weight;
ngx_uint_t max_fails;
time_t fail_timeout;
unsigned down:1;
unsigned backup:1;
} ngx_stream_upstream_server_t;
struct ngx_stream_upstream_srv_conf_s {
ngx_stream_upstream_peer_t peer;
void **srv_conf;
ngx_array_t *servers;
/* ngx_stream_upstream_server_t */
ngx_uint_t flags;
ngx_str_t host;
u_char *file_name;
ngx_uint_t line;
in_port_t port;
ngx_uint_t no_port; /* unsigned no_port:1 */
#if (NGX_STREAM_UPSTREAM_ZONE)
ngx_shm_zone_t *shm_zone;
#endif
};
typedef struct {
ngx_peer_connection_t peer;
ngx_buf_t downstream_buf;
ngx_buf_t upstream_buf;
off_t received;
#if (NGX_STREAM_SSL)
ngx_str_t ssl_name;
#endif
} ngx_stream_upstream_t;
ngx_stream_upstream_srv_conf_t *ngx_stream_upstream_add(ngx_conf_t *cf,
ngx_url_t *u, ngx_uint_t flags);
#define ngx_stream_conf_upstream_srv_conf(uscf, module) \
uscf->srv_conf[module.ctx_index]
extern ngx_module_t ngx_stream_upstream_module;
#endif /* _NGX_STREAM_UPSTREAM_H_INCLUDED_ */

View File

@ -0,0 +1,657 @@
/*
* Copyright (C) Roman Arutyunyan
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_stream.h>
typedef struct {
uint32_t hash;
ngx_str_t *server;
} ngx_stream_upstream_chash_point_t;
typedef struct {
ngx_uint_t number;
ngx_stream_upstream_chash_point_t point[1];
} ngx_stream_upstream_chash_points_t;
typedef struct {
ngx_stream_upstream_chash_points_t *points;
} ngx_stream_upstream_hash_srv_conf_t;
typedef struct {
/* the round robin data must be first */
ngx_stream_upstream_rr_peer_data_t rrp;
ngx_stream_upstream_hash_srv_conf_t *conf;
ngx_str_t key;
ngx_uint_t tries;
ngx_uint_t rehash;
uint32_t hash;
ngx_event_get_peer_pt get_rr_peer;
} ngx_stream_upstream_hash_peer_data_t;
static ngx_int_t ngx_stream_upstream_init_hash(ngx_conf_t *cf,
ngx_stream_upstream_srv_conf_t *us);
static ngx_int_t ngx_stream_upstream_init_hash_peer(ngx_stream_session_t *s,
ngx_stream_upstream_srv_conf_t *us);
static ngx_int_t ngx_stream_upstream_get_hash_peer(ngx_peer_connection_t *pc,
void *data);
static ngx_int_t ngx_stream_upstream_init_chash(ngx_conf_t *cf,
ngx_stream_upstream_srv_conf_t *us);
static int ngx_libc_cdecl
ngx_stream_upstream_chash_cmp_points(const void *one, const void *two);
static ngx_uint_t ngx_stream_upstream_find_chash_point(
ngx_stream_upstream_chash_points_t *points, uint32_t hash);
static ngx_int_t ngx_stream_upstream_init_chash_peer(ngx_stream_session_t *s,
ngx_stream_upstream_srv_conf_t *us);
static ngx_int_t ngx_stream_upstream_get_chash_peer(ngx_peer_connection_t *pc,
void *data);
static void *ngx_stream_upstream_hash_create_conf(ngx_conf_t *cf);
static char *ngx_stream_upstream_hash(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static ngx_command_t ngx_stream_upstream_hash_commands[] = {
{ ngx_string("hash"),
NGX_STREAM_UPS_CONF|NGX_CONF_TAKE12,
ngx_stream_upstream_hash,
NGX_STREAM_SRV_CONF_OFFSET,
0,
NULL },
ngx_null_command
};
static ngx_stream_module_t ngx_stream_upstream_hash_module_ctx = {
NULL, /* create main configuration */
NULL, /* init main configuration */
ngx_stream_upstream_hash_create_conf, /* create server configuration */
NULL, /* merge server configuration */
};
ngx_module_t ngx_stream_upstream_hash_module = {
NGX_MODULE_V1,
&ngx_stream_upstream_hash_module_ctx, /* module context */
ngx_stream_upstream_hash_commands, /* module directives */
NGX_STREAM_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static ngx_int_t
ngx_stream_upstream_init_hash(ngx_conf_t *cf,
ngx_stream_upstream_srv_conf_t *us)
{
if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) {
return NGX_ERROR;
}
us->peer.init = ngx_stream_upstream_init_hash_peer;
return NGX_OK;
}
static ngx_int_t
ngx_stream_upstream_init_hash_peer(ngx_stream_session_t *s,
ngx_stream_upstream_srv_conf_t *us)
{
ngx_stream_upstream_hash_srv_conf_t *hcf;
ngx_stream_upstream_hash_peer_data_t *hp;
hp = ngx_palloc(s->connection->pool,
sizeof(ngx_stream_upstream_hash_peer_data_t));
if (hp == NULL) {
return NGX_ERROR;
}
s->upstream->peer.data = &hp->rrp;
if (ngx_stream_upstream_init_round_robin_peer(s, us) != NGX_OK) {
return NGX_ERROR;
}
s->upstream->peer.get = ngx_stream_upstream_get_hash_peer;
hcf = ngx_stream_conf_upstream_srv_conf(us,
ngx_stream_upstream_hash_module);
hp->key = s->connection->addr_text;
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
"upstream hash key:\"%V\"", &hp->key);
hp->conf = hcf;
hp->tries = 0;
hp->rehash = 0;
hp->hash = 0;
hp->get_rr_peer = ngx_stream_upstream_get_round_robin_peer;
return NGX_OK;
}
static ngx_int_t
ngx_stream_upstream_get_hash_peer(ngx_peer_connection_t *pc, void *data)
{
ngx_stream_upstream_hash_peer_data_t *hp = data;
time_t now;
u_char buf[NGX_INT_T_LEN];
size_t size;
uint32_t hash;
ngx_int_t w;
uintptr_t m;
ngx_uint_t i, n, p;
ngx_stream_upstream_rr_peer_t *peer;
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"get hash peer, try: %ui", pc->tries);
ngx_stream_upstream_rr_peers_wlock(hp->rrp.peers);
if (hp->tries > 20 || hp->rrp.peers->single) {
ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
return hp->get_rr_peer(pc, &hp->rrp);
}
now = ngx_time();
pc->connection = NULL;
for ( ;; ) {
/*
* Hash expression is compatible with Cache::Memcached:
* ((crc32([REHASH] KEY) >> 16) & 0x7fff) + PREV_HASH
* with REHASH omitted at the first iteration.
*/
ngx_crc32_init(hash);
if (hp->rehash > 0) {
size = ngx_sprintf(buf, "%ui", hp->rehash) - buf;
ngx_crc32_update(&hash, buf, size);
}
ngx_crc32_update(&hash, hp->key.data, hp->key.len);
ngx_crc32_final(hash);
hash = (hash >> 16) & 0x7fff;
hp->hash += hash;
hp->rehash++;
if (!hp->rrp.peers->weighted) {
p = hp->hash % hp->rrp.peers->number;
peer = hp->rrp.peers->peer;
for (i = 0; i < p; i++) {
peer = peer->next;
}
} else {
w = hp->hash % hp->rrp.peers->total_weight;
for (peer = hp->rrp.peers->peer, i = 0;
peer;
peer = peer->next, i++)
{
w -= peer->weight;
if (w < 0) {
break;
}
}
p = i;
}
n = p / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));
if (hp->rrp.tried[n] & m) {
goto next;
}
ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"get hash peer, value:%uD, peer:%ui", hp->hash, p);
if (peer->down) {
goto next;
}
if (peer->max_fails
&& peer->fails >= peer->max_fails
&& now - peer->checked <= peer->fail_timeout)
{
goto next;
}
break;
next:
if (++hp->tries > 20) {
ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
return hp->get_rr_peer(pc, &hp->rrp);
}
}
hp->rrp.current = peer;
pc->sockaddr = peer->sockaddr;
pc->socklen = peer->socklen;
pc->name = &peer->name;
peer->conns++;
if (now - peer->checked > peer->fail_timeout) {
peer->checked = now;
}
ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
hp->rrp.tried[n] |= m;
return NGX_OK;
}
static ngx_int_t
ngx_stream_upstream_init_chash(ngx_conf_t *cf,
ngx_stream_upstream_srv_conf_t *us)
{
u_char *host, *port, c;
size_t host_len, port_len, size;
uint32_t hash, base_hash, prev_hash;
ngx_str_t *server;
ngx_uint_t npoints, i, j;
ngx_stream_upstream_rr_peer_t *peer;
ngx_stream_upstream_rr_peers_t *peers;
ngx_stream_upstream_chash_points_t *points;
ngx_stream_upstream_hash_srv_conf_t *hcf;
if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) {
return NGX_ERROR;
}
us->peer.init = ngx_stream_upstream_init_chash_peer;
peers = us->peer.data;
npoints = peers->total_weight * 160;
size = sizeof(ngx_stream_upstream_chash_points_t)
+ sizeof(ngx_stream_upstream_chash_point_t) * (npoints - 1);
points = ngx_palloc(cf->pool, size);
if (points == NULL) {
return NGX_ERROR;
}
points->number = 0;
for (peer = peers->peer; peer; peer = peer->next) {
server = &peer->server;
/*
* Hash expression is compatible with Cache::Memcached::Fast:
* crc32(HOST \0 PORT PREV_HASH).
*/
if (server->len >= 5
&& ngx_strncasecmp(server->data, (u_char *) "unix:", 5) == 0)
{
host = server->data + 5;
host_len = server->len - 5;
port = NULL;
port_len = 0;
goto done;
}
for (j = 0; j < server->len; j++) {
c = server->data[server->len - j - 1];
if (c == ':') {
host = server->data;
host_len = server->len - j - 1;
port = server->data + server->len - j;
port_len = j;
goto done;
}
if (c < '0' || c > '9') {
break;
}
}
host = server->data;
host_len = server->len;
port = NULL;
port_len = 0;
done:
ngx_crc32_init(base_hash);
ngx_crc32_update(&base_hash, host, host_len);
ngx_crc32_update(&base_hash, (u_char *) "", 1);
ngx_crc32_update(&base_hash, port, port_len);
prev_hash = 0;
npoints = peer->weight * 160;
for (j = 0; j < npoints; j++) {
hash = base_hash;
ngx_crc32_update(&hash, (u_char *) &prev_hash, sizeof(uint32_t));
ngx_crc32_final(hash);
points->point[points->number].hash = hash;
points->point[points->number].server = server;
points->number++;
prev_hash = hash;
}
}
ngx_qsort(points->point,
points->number,
sizeof(ngx_stream_upstream_chash_point_t),
ngx_stream_upstream_chash_cmp_points);
for (i = 0, j = 1; j < points->number; j++) {
if (points->point[i].hash != points->point[j].hash) {
points->point[++i] = points->point[j];
}
}
points->number = i + 1;
hcf = ngx_stream_conf_upstream_srv_conf(us,
ngx_stream_upstream_hash_module);
hcf->points = points;
return NGX_OK;
}
static int ngx_libc_cdecl
ngx_stream_upstream_chash_cmp_points(const void *one, const void *two)
{
ngx_stream_upstream_chash_point_t *first =
(ngx_stream_upstream_chash_point_t *) one;
ngx_stream_upstream_chash_point_t *second =
(ngx_stream_upstream_chash_point_t *) two;
if (first->hash < second->hash) {
return -1;
} else if (first->hash > second->hash) {
return 1;
} else {
return 0;
}
}
static ngx_uint_t
ngx_stream_upstream_find_chash_point(ngx_stream_upstream_chash_points_t *points,
uint32_t hash)
{
ngx_uint_t i, j, k;
ngx_stream_upstream_chash_point_t *point;
/* find first point >= hash */
point = &points->point[0];
i = 0;
j = points->number;
while (i < j) {
k = (i + j) / 2;
if (hash > point[k].hash) {
i = k + 1;
} else if (hash < point[k].hash) {
j = k;
} else {
return k;
}
}
return i;
}
static ngx_int_t
ngx_stream_upstream_init_chash_peer(ngx_stream_session_t *s,
ngx_stream_upstream_srv_conf_t *us)
{
uint32_t hash;
ngx_stream_upstream_hash_srv_conf_t *hcf;
ngx_stream_upstream_hash_peer_data_t *hp;
if (ngx_stream_upstream_init_hash_peer(s, us) != NGX_OK) {
return NGX_ERROR;
}
s->upstream->peer.get = ngx_stream_upstream_get_chash_peer;
hp = s->upstream->peer.data;
hcf = ngx_stream_conf_upstream_srv_conf(us,
ngx_stream_upstream_hash_module);
hash = ngx_crc32_long(hp->key.data, hp->key.len);
ngx_stream_upstream_rr_peers_rlock(hp->rrp.peers);
hp->hash = ngx_stream_upstream_find_chash_point(hcf->points, hash);
ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
return NGX_OK;
}
static ngx_int_t
ngx_stream_upstream_get_chash_peer(ngx_peer_connection_t *pc, void *data)
{
ngx_stream_upstream_hash_peer_data_t *hp = data;
time_t now;
intptr_t m;
ngx_str_t *server;
ngx_int_t total;
ngx_uint_t i, n, best_i;
ngx_stream_upstream_rr_peer_t *peer, *best;
ngx_stream_upstream_chash_point_t *point;
ngx_stream_upstream_chash_points_t *points;
ngx_stream_upstream_hash_srv_conf_t *hcf;
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"get consistent hash peer, try: %ui", pc->tries);
ngx_stream_upstream_rr_peers_wlock(hp->rrp.peers);
pc->connection = NULL;
now = ngx_time();
hcf = hp->conf;
points = hcf->points;
point = &points->point[0];
for ( ;; ) {
server = point[hp->hash % points->number].server;
ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"consistent hash peer:%uD, server:\"%V\"",
hp->hash, server);
best = NULL;
best_i = 0;
total = 0;
for (peer = hp->rrp.peers->peer, i = 0;
peer;
peer = peer->next, i++)
{
n = i / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
if (hp->rrp.tried[n] & m) {
continue;
}
if (peer->down) {
continue;
}
if (peer->server.len != server->len
|| ngx_strncmp(peer->server.data, server->data, server->len)
!= 0)
{
continue;
}
if (peer->max_fails
&& peer->fails >= peer->max_fails
&& now - peer->checked <= peer->fail_timeout)
{
continue;
}
peer->current_weight += peer->effective_weight;
total += peer->effective_weight;
if (peer->effective_weight < peer->weight) {
peer->effective_weight++;
}
if (best == NULL || peer->current_weight > best->current_weight) {
best = peer;
best_i = i;
}
}
if (best) {
best->current_weight -= total;
break;
}
hp->hash++;
hp->tries++;
if (hp->tries >= points->number) {
ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
return NGX_BUSY;
}
}
hp->rrp.current = best;
pc->sockaddr = best->sockaddr;
pc->socklen = best->socklen;
pc->name = &best->name;
best->conns++;
if (now - best->checked > best->fail_timeout) {
best->checked = now;
}
ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
n = best_i / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << best_i % (8 * sizeof(uintptr_t));
hp->rrp.tried[n] |= m;
return NGX_OK;
}
static void *
ngx_stream_upstream_hash_create_conf(ngx_conf_t *cf)
{
ngx_stream_upstream_hash_srv_conf_t *conf;
conf = ngx_palloc(cf->pool, sizeof(ngx_stream_upstream_hash_srv_conf_t));
if (conf == NULL) {
return NULL;
}
conf->points = NULL;
return conf;
}
static char *
ngx_stream_upstream_hash(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_str_t *value;
ngx_stream_upstream_srv_conf_t *uscf;
value = cf->args->elts;
if (ngx_strcmp(value[1].data, "$remote_addr")) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"unsupported hash key \"%V\", use $remote_addr",
&value[1]);
return NGX_CONF_ERROR;
}
uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module);
if (uscf->peer.init_upstream) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
"load balancing method redefined");
}
uscf->flags = NGX_STREAM_UPSTREAM_CREATE
|NGX_STREAM_UPSTREAM_WEIGHT
|NGX_STREAM_UPSTREAM_MAX_FAILS
|NGX_STREAM_UPSTREAM_FAIL_TIMEOUT
|NGX_STREAM_UPSTREAM_DOWN;
if (cf->args->nelts == 2) {
uscf->peer.init_upstream = ngx_stream_upstream_init_hash;
} else if (ngx_strcmp(value[2].data, "consistent") == 0) {
uscf->peer.init_upstream = ngx_stream_upstream_init_chash;
} else {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid parameter \"%V\"", &value[2]);
return NGX_CONF_ERROR;
}
return NGX_CONF_OK;
}

View File

@ -0,0 +1,305 @@
/*
* Copyright (C) Maxim Dounin
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_stream.h>
static ngx_int_t ngx_stream_upstream_init_least_conn_peer(
ngx_stream_session_t *s, ngx_stream_upstream_srv_conf_t *us);
static ngx_int_t ngx_stream_upstream_get_least_conn_peer(
ngx_peer_connection_t *pc, void *data);
static char *ngx_stream_upstream_least_conn(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static ngx_command_t ngx_stream_upstream_least_conn_commands[] = {
{ ngx_string("least_conn"),
NGX_STREAM_UPS_CONF|NGX_CONF_NOARGS,
ngx_stream_upstream_least_conn,
0,
0,
NULL },
ngx_null_command
};
static ngx_stream_module_t ngx_stream_upstream_least_conn_module_ctx = {
NULL, /* create main configuration */
NULL, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
};
ngx_module_t ngx_stream_upstream_least_conn_module = {
NGX_MODULE_V1,
&ngx_stream_upstream_least_conn_module_ctx, /* module context */
ngx_stream_upstream_least_conn_commands, /* module directives */
NGX_STREAM_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static ngx_int_t
ngx_stream_upstream_init_least_conn(ngx_conf_t *cf,
ngx_stream_upstream_srv_conf_t *us)
{
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, cf->log, 0,
"init least conn");
if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) {
return NGX_ERROR;
}
us->peer.init = ngx_stream_upstream_init_least_conn_peer;
return NGX_OK;
}
static ngx_int_t
ngx_stream_upstream_init_least_conn_peer(ngx_stream_session_t *s,
ngx_stream_upstream_srv_conf_t *us)
{
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
"init least conn peer");
if (ngx_stream_upstream_init_round_robin_peer(s, us) != NGX_OK) {
return NGX_ERROR;
}
s->upstream->peer.get = ngx_stream_upstream_get_least_conn_peer;
return NGX_OK;
}
static ngx_int_t
ngx_stream_upstream_get_least_conn_peer(ngx_peer_connection_t *pc, void *data)
{
ngx_stream_upstream_rr_peer_data_t *rrp = data;
time_t now;
uintptr_t m;
ngx_int_t rc, total;
ngx_uint_t i, n, p, many;
ngx_stream_upstream_rr_peer_t *peer, *best;
ngx_stream_upstream_rr_peers_t *peers;
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"get least conn peer, try: %ui", pc->tries);
if (rrp->peers->single) {
return ngx_stream_upstream_get_round_robin_peer(pc, rrp);
}
pc->connection = NULL;
now = ngx_time();
peers = rrp->peers;
ngx_stream_upstream_rr_peers_wlock(peers);
best = NULL;
total = 0;
#if (NGX_SUPPRESS_WARN)
many = 0;
p = 0;
#endif
for (peer = peers->peer, i = 0;
peer;
peer = peer->next, i++)
{
n = i / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
if (rrp->tried[n] & m) {
continue;
}
if (peer->down) {
continue;
}
if (peer->max_fails
&& peer->fails >= peer->max_fails
&& now - peer->checked <= peer->fail_timeout)
{
continue;
}
/*
* select peer with least number of connections; if there are
* multiple peers with the same number of connections, select
* based on round-robin
*/
if (best == NULL
|| peer->conns * best->weight < best->conns * peer->weight)
{
best = peer;
many = 0;
p = i;
} else if (peer->conns * best->weight == best->conns * peer->weight) {
many = 1;
}
}
if (best == NULL) {
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"get least conn peer, no peer found");
goto failed;
}
if (many) {
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"get least conn peer, many");
for (peer = best, i = p;
peer;
peer = peer->next, i++)
{
n = i / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
if (rrp->tried[n] & m) {
continue;
}
if (peer->down) {
continue;
}
if (peer->conns * best->weight != best->conns * peer->weight) {
continue;
}
if (peer->max_fails
&& peer->fails >= peer->max_fails
&& now - peer->checked <= peer->fail_timeout)
{
continue;
}
peer->current_weight += peer->effective_weight;
total += peer->effective_weight;
if (peer->effective_weight < peer->weight) {
peer->effective_weight++;
}
if (peer->current_weight > best->current_weight) {
best = peer;
p = i;
}
}
}
best->current_weight -= total;
if (now - best->checked > best->fail_timeout) {
best->checked = now;
}
pc->sockaddr = best->sockaddr;
pc->socklen = best->socklen;
pc->name = &best->name;
best->conns++;
rrp->current = best;
n = p / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));
rrp->tried[n] |= m;
ngx_stream_upstream_rr_peers_unlock(peers);
return NGX_OK;
failed:
if (peers->next) {
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"get least conn peer, backup servers");
rrp->peers = peers->next;
n = (rrp->peers->number + (8 * sizeof(uintptr_t) - 1))
/ (8 * sizeof(uintptr_t));
for (i = 0; i < n; i++) {
rrp->tried[i] = 0;
}
ngx_stream_upstream_rr_peers_unlock(peers);
rc = ngx_stream_upstream_get_least_conn_peer(pc, rrp);
if (rc != NGX_BUSY) {
return rc;
}
ngx_stream_upstream_rr_peers_wlock(peers);
}
/* all peers failed, mark them as live for quick recovery */
for (peer = peers->peer; peer; peer = peer->next) {
peer->fails = 0;
}
ngx_stream_upstream_rr_peers_unlock(peers);
pc->name = peers->name;
return NGX_BUSY;
}
static char *
ngx_stream_upstream_least_conn(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_stream_upstream_srv_conf_t *uscf;
uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module);
if (uscf->peer.init_upstream) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
"load balancing method redefined");
}
uscf->peer.init_upstream = ngx_stream_upstream_init_least_conn;
uscf->flags = NGX_STREAM_UPSTREAM_CREATE
|NGX_STREAM_UPSTREAM_WEIGHT
|NGX_STREAM_UPSTREAM_MAX_FAILS
|NGX_STREAM_UPSTREAM_FAIL_TIMEOUT
|NGX_STREAM_UPSTREAM_DOWN
|NGX_STREAM_UPSTREAM_BACKUP;
return NGX_CONF_OK;
}

View File

@ -0,0 +1,697 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_stream.h>
#define ngx_stream_upstream_tries(p) ((p)->number \
+ ((p)->next ? (p)->next->number : 0))
static ngx_stream_upstream_rr_peer_t *ngx_stream_upstream_get_peer(
ngx_stream_upstream_rr_peer_data_t *rrp);
#if (NGX_STREAM_SSL)
static ngx_int_t ngx_stream_upstream_set_round_robin_peer_session(
ngx_peer_connection_t *pc, void *data);
static void ngx_stream_upstream_save_round_robin_peer_session(
ngx_peer_connection_t *pc, void *data);
#endif
ngx_int_t
ngx_stream_upstream_init_round_robin(ngx_conf_t *cf,
ngx_stream_upstream_srv_conf_t *us)
{
ngx_url_t u;
ngx_uint_t i, j, n, w;
ngx_stream_upstream_server_t *server;
ngx_stream_upstream_rr_peer_t *peer, **peerp;
ngx_stream_upstream_rr_peers_t *peers, *backup;
us->peer.init = ngx_stream_upstream_init_round_robin_peer;
if (us->servers) {
server = us->servers->elts;
n = 0;
w = 0;
for (i = 0; i < us->servers->nelts; i++) {
if (server[i].backup) {
continue;
}
n += server[i].naddrs;
w += server[i].naddrs * server[i].weight;
}
if (n == 0) {
ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
"no servers in upstream \"%V\" in %s:%ui",
&us->host, us->file_name, us->line);
return NGX_ERROR;
}
peers = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peers_t));
if (peers == NULL) {
return NGX_ERROR;
}
peer = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peer_t) * n);
if (peer == NULL) {
return NGX_ERROR;
}
peers->single = (n == 1);
peers->number = n;
peers->weighted = (w != n);
peers->total_weight = w;
peers->name = &us->host;
n = 0;
peerp = &peers->peer;
for (i = 0; i < us->servers->nelts; i++) {
if (server[i].backup) {
continue;
}
for (j = 0; j < server[i].naddrs; j++) {
peer[n].sockaddr = server[i].addrs[j].sockaddr;
peer[n].socklen = server[i].addrs[j].socklen;
peer[n].name = server[i].addrs[j].name;
peer[n].weight = server[i].weight;
peer[n].effective_weight = server[i].weight;
peer[n].current_weight = 0;
peer[n].max_fails = server[i].max_fails;
peer[n].fail_timeout = server[i].fail_timeout;
peer[n].down = server[i].down;
peer[n].server = server[i].name;
*peerp = &peer[n];
peerp = &peer[n].next;
n++;
}
}
us->peer.data = peers;
/* backup servers */
n = 0;
w = 0;
for (i = 0; i < us->servers->nelts; i++) {
if (!server[i].backup) {
continue;
}
n += server[i].naddrs;
w += server[i].naddrs * server[i].weight;
}
if (n == 0) {
return NGX_OK;
}
backup = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peers_t));
if (backup == NULL) {
return NGX_ERROR;
}
peer = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peer_t) * n);
if (peer == NULL) {
return NGX_ERROR;
}
peers->single = 0;
backup->single = 0;
backup->number = n;
backup->weighted = (w != n);
backup->total_weight = w;
backup->name = &us->host;
n = 0;
peerp = &backup->peer;
for (i = 0; i < us->servers->nelts; i++) {
if (!server[i].backup) {
continue;
}
for (j = 0; j < server[i].naddrs; j++) {
peer[n].sockaddr = server[i].addrs[j].sockaddr;
peer[n].socklen = server[i].addrs[j].socklen;
peer[n].name = server[i].addrs[j].name;
peer[n].weight = server[i].weight;
peer[n].effective_weight = server[i].weight;
peer[n].current_weight = 0;
peer[n].max_fails = server[i].max_fails;
peer[n].fail_timeout = server[i].fail_timeout;
peer[n].down = server[i].down;
peer[n].server = server[i].name;
*peerp = &peer[n];
peerp = &peer[n].next;
n++;
}
}
peers->next = backup;
return NGX_OK;
}
/* an upstream implicitly defined by proxy_pass, etc. */
if (us->port == 0) {
ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
"no port in upstream \"%V\" in %s:%ui",
&us->host, us->file_name, us->line);
return NGX_ERROR;
}
ngx_memzero(&u, sizeof(ngx_url_t));
u.host = us->host;
u.port = us->port;
if (ngx_inet_resolve_host(cf->pool, &u) != NGX_OK) {
if (u.err) {
ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
"%s in upstream \"%V\" in %s:%ui",
u.err, &us->host, us->file_name, us->line);
}
return NGX_ERROR;
}
n = u.naddrs;
peers = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peers_t));
if (peers == NULL) {
return NGX_ERROR;
}
peer = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peer_t) * n);
if (peer == NULL) {
return NGX_ERROR;
}
peers->single = (n == 1);
peers->number = n;
peers->weighted = 0;
peers->total_weight = n;
peers->name = &us->host;
peerp = &peers->peer;
for (i = 0; i < u.naddrs; i++) {
peer[i].sockaddr = u.addrs[i].sockaddr;
peer[i].socklen = u.addrs[i].socklen;
peer[i].name = u.addrs[i].name;
peer[i].weight = 1;
peer[i].effective_weight = 1;
peer[i].current_weight = 0;
peer[i].max_fails = 1;
peer[i].fail_timeout = 10;
*peerp = &peer[i];
peerp = &peer[i].next;
}
us->peer.data = peers;
/* implicitly defined upstream has no backup servers */
return NGX_OK;
}
ngx_int_t
ngx_stream_upstream_init_round_robin_peer(ngx_stream_session_t *s,
ngx_stream_upstream_srv_conf_t *us)
{
ngx_uint_t n;
ngx_stream_upstream_rr_peer_data_t *rrp;
rrp = s->upstream->peer.data;
if (rrp == NULL) {
rrp = ngx_palloc(s->connection->pool,
sizeof(ngx_stream_upstream_rr_peer_data_t));
if (rrp == NULL) {
return NGX_ERROR;
}
s->upstream->peer.data = rrp;
}
rrp->peers = us->peer.data;
rrp->current = NULL;
n = rrp->peers->number;
if (rrp->peers->next && rrp->peers->next->number > n) {
n = rrp->peers->next->number;
}
if (n <= 8 * sizeof(uintptr_t)) {
rrp->tried = &rrp->data;
rrp->data = 0;
} else {
n = (n + (8 * sizeof(uintptr_t) - 1)) / (8 * sizeof(uintptr_t));
rrp->tried = ngx_pcalloc(s->connection->pool, n * sizeof(uintptr_t));
if (rrp->tried == NULL) {
return NGX_ERROR;
}
}
s->upstream->peer.get = ngx_stream_upstream_get_round_robin_peer;
s->upstream->peer.free = ngx_stream_upstream_free_round_robin_peer;
s->upstream->peer.tries = ngx_stream_upstream_tries(rrp->peers);
#if (NGX_STREAM_SSL)
s->upstream->peer.set_session =
ngx_stream_upstream_set_round_robin_peer_session;
s->upstream->peer.save_session =
ngx_stream_upstream_save_round_robin_peer_session;
#endif
return NGX_OK;
}
ngx_int_t
ngx_stream_upstream_get_round_robin_peer(ngx_peer_connection_t *pc, void *data)
{
ngx_stream_upstream_rr_peer_data_t *rrp = data;
ngx_int_t rc;
ngx_uint_t i, n;
ngx_stream_upstream_rr_peer_t *peer;
ngx_stream_upstream_rr_peers_t *peers;
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"get rr peer, try: %ui", pc->tries);
pc->connection = NULL;
peers = rrp->peers;
ngx_stream_upstream_rr_peers_wlock(peers);
if (peers->single) {
peer = peers->peer;
if (peer->down) {
goto failed;
}
rrp->current = peer;
} else {
/* there are several peers */
peer = ngx_stream_upstream_get_peer(rrp);
if (peer == NULL) {
goto failed;
}
ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"get rr peer, current: %p %i",
peer, peer->current_weight);
}
pc->sockaddr = peer->sockaddr;
pc->socklen = peer->socklen;
pc->name = &peer->name;
peer->conns++;
ngx_stream_upstream_rr_peers_unlock(peers);
return NGX_OK;
failed:
if (peers->next) {
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "backup servers");
rrp->peers = peers->next;
n = (rrp->peers->number + (8 * sizeof(uintptr_t) - 1))
/ (8 * sizeof(uintptr_t));
for (i = 0; i < n; i++) {
rrp->tried[i] = 0;
}
ngx_stream_upstream_rr_peers_unlock(peers);
rc = ngx_stream_upstream_get_round_robin_peer(pc, rrp);
if (rc != NGX_BUSY) {
return rc;
}
ngx_stream_upstream_rr_peers_wlock(peers);
}
/* all peers failed, mark them as live for quick recovery */
for (peer = peers->peer; peer; peer = peer->next) {
peer->fails = 0;
}
ngx_stream_upstream_rr_peers_unlock(peers);
pc->name = peers->name;
return NGX_BUSY;
}
static ngx_stream_upstream_rr_peer_t *
ngx_stream_upstream_get_peer(ngx_stream_upstream_rr_peer_data_t *rrp)
{
time_t now;
uintptr_t m;
ngx_int_t total;
ngx_uint_t i, n, p;
ngx_stream_upstream_rr_peer_t *peer, *best;
now = ngx_time();
best = NULL;
total = 0;
#if (NGX_SUPPRESS_WARN)
p = 0;
#endif
for (peer = rrp->peers->peer, i = 0;
peer;
peer = peer->next, i++)
{
n = i / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
if (rrp->tried[n] & m) {
continue;
}
if (peer->down) {
continue;
}
if (peer->max_fails
&& peer->fails >= peer->max_fails
&& now - peer->checked <= peer->fail_timeout)
{
continue;
}
peer->current_weight += peer->effective_weight;
total += peer->effective_weight;
if (peer->effective_weight < peer->weight) {
peer->effective_weight++;
}
if (best == NULL || peer->current_weight > best->current_weight) {
best = peer;
p = i;
}
}
if (best == NULL) {
return NULL;
}
rrp->current = best;
n = p / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));
rrp->tried[n] |= m;
best->current_weight -= total;
if (now - best->checked > best->fail_timeout) {
best->checked = now;
}
return best;
}
void
ngx_stream_upstream_free_round_robin_peer(ngx_peer_connection_t *pc, void *data,
ngx_uint_t state)
{
ngx_stream_upstream_rr_peer_data_t *rrp = data;
time_t now;
ngx_stream_upstream_rr_peer_t *peer;
ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"free rr peer %ui %ui", pc->tries, state);
peer = rrp->current;
ngx_stream_upstream_rr_peers_rlock(rrp->peers);
ngx_stream_upstream_rr_peer_lock(rrp->peers, peer);
if (rrp->peers->single) {
peer->conns--;
ngx_stream_upstream_rr_peer_unlock(rrp->peers, peer);
ngx_stream_upstream_rr_peers_unlock(rrp->peers);
pc->tries = 0;
return;
}
if (state & NGX_PEER_FAILED) {
now = ngx_time();
peer->fails++;
peer->accessed = now;
peer->checked = now;
if (peer->max_fails) {
peer->effective_weight -= peer->weight / peer->max_fails;
}
ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"free rr peer failed: %p %i",
peer, peer->effective_weight);
if (peer->effective_weight < 0) {
peer->effective_weight = 0;
}
} else {
/* mark peer live if check passed */
if (peer->accessed < peer->checked) {
peer->fails = 0;
}
}
peer->conns--;
ngx_stream_upstream_rr_peer_unlock(rrp->peers, peer);
ngx_stream_upstream_rr_peers_unlock(rrp->peers);
if (pc->tries) {
pc->tries--;
}
}
#if (NGX_STREAM_SSL)
static ngx_int_t
ngx_stream_upstream_set_round_robin_peer_session(ngx_peer_connection_t *pc,
void *data)
{
ngx_stream_upstream_rr_peer_data_t *rrp = data;
ngx_int_t rc;
ngx_ssl_session_t *ssl_session;
ngx_stream_upstream_rr_peer_t *peer;
#if (NGX_STREAM_UPSTREAM_ZONE)
int len;
#if OPENSSL_VERSION_NUMBER >= 0x0090707fL
const
#endif
u_char *p;
ngx_stream_upstream_rr_peers_t *peers;
u_char buf[NGX_SSL_MAX_SESSION_SIZE];
#endif
peer = rrp->current;
#if (NGX_STREAM_UPSTREAM_ZONE)
peers = rrp->peers;
if (peers->shpool) {
ngx_stream_upstream_rr_peers_rlock(peers);
ngx_stream_upstream_rr_peer_lock(peers, peer);
if (peer->ssl_session == NULL) {
ngx_stream_upstream_rr_peer_unlock(peers, peer);
ngx_stream_upstream_rr_peers_unlock(peers);
return NGX_OK;
}
len = peer->ssl_session_len;
ngx_memcpy(buf, peer->ssl_session, len);
ngx_stream_upstream_rr_peer_unlock(peers, peer);
ngx_stream_upstream_rr_peers_unlock(peers);
p = buf;
ssl_session = d2i_SSL_SESSION(NULL, &p, len);
rc = ngx_ssl_set_session(pc->connection, ssl_session);
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"set session: %p", ssl_session);
ngx_ssl_free_session(ssl_session);
return rc;
}
#endif
ssl_session = peer->ssl_session;
rc = ngx_ssl_set_session(pc->connection, ssl_session);
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"set session: %p", ssl_session);
return rc;
}
static void
ngx_stream_upstream_save_round_robin_peer_session(ngx_peer_connection_t *pc,
void *data)
{
ngx_stream_upstream_rr_peer_data_t *rrp = data;
ngx_ssl_session_t *old_ssl_session, *ssl_session;
ngx_stream_upstream_rr_peer_t *peer;
#if (NGX_STREAM_UPSTREAM_ZONE)
int len;
u_char *p;
ngx_stream_upstream_rr_peers_t *peers;
u_char buf[NGX_SSL_MAX_SESSION_SIZE];
#endif
#if (NGX_STREAM_UPSTREAM_ZONE)
peers = rrp->peers;
if (peers->shpool) {
ssl_session = SSL_get0_session(pc->connection->ssl->connection);
if (ssl_session == NULL) {
return;
}
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"save session: %p", ssl_session);
len = i2d_SSL_SESSION(ssl_session, NULL);
/* do not cache too big session */
if (len > NGX_SSL_MAX_SESSION_SIZE) {
return;
}
p = buf;
(void) i2d_SSL_SESSION(ssl_session, &p);
peer = rrp->current;
ngx_stream_upstream_rr_peers_rlock(peers);
ngx_stream_upstream_rr_peer_lock(peers, peer);
if (len > peer->ssl_session_len) {
ngx_shmtx_lock(&peers->shpool->mutex);
if (peer->ssl_session) {
ngx_slab_free_locked(peers->shpool, peer->ssl_session);
}
peer->ssl_session = ngx_slab_alloc_locked(peers->shpool, len);
ngx_shmtx_unlock(&peers->shpool->mutex);
if (peer->ssl_session == NULL) {
peer->ssl_session_len = 0;
ngx_stream_upstream_rr_peer_unlock(peers, peer);
ngx_stream_upstream_rr_peers_unlock(peers);
return;
}
peer->ssl_session_len = len;
}
ngx_memcpy(peer->ssl_session, buf, len);
ngx_stream_upstream_rr_peer_unlock(peers, peer);
ngx_stream_upstream_rr_peers_unlock(peers);
return;
}
#endif
ssl_session = ngx_ssl_get_session(pc->connection);
if (ssl_session == NULL) {
return;
}
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"save session: %p", ssl_session);
peer = rrp->current;
old_ssl_session = peer->ssl_session;
peer->ssl_session = ssl_session;
if (old_ssl_session) {
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
"old session: %p", old_ssl_session);
/* TODO: may block */
ngx_ssl_free_session(old_ssl_session);
}
}
#endif

View File

@ -0,0 +1,138 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Nginx, Inc.
*/
#ifndef _NGX_STREAM_UPSTREAM_ROUND_ROBIN_H_INCLUDED_
#define _NGX_STREAM_UPSTREAM_ROUND_ROBIN_H_INCLUDED_
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_stream.h>
typedef struct ngx_stream_upstream_rr_peer_s ngx_stream_upstream_rr_peer_t;
struct ngx_stream_upstream_rr_peer_s {
struct sockaddr *sockaddr;
socklen_t socklen;
ngx_str_t name;
ngx_str_t server;
ngx_int_t current_weight;
ngx_int_t effective_weight;
ngx_int_t weight;
ngx_uint_t conns;
ngx_uint_t fails;
time_t accessed;
time_t checked;
ngx_uint_t max_fails;
time_t fail_timeout;
ngx_uint_t down; /* unsigned down:1; */
#if (NGX_STREAM_SSL)
void *ssl_session;
int ssl_session_len;
#endif
ngx_stream_upstream_rr_peer_t *next;
#if (NGX_STREAM_UPSTREAM_ZONE)
ngx_atomic_t lock;
#endif
};
typedef struct ngx_stream_upstream_rr_peers_s ngx_stream_upstream_rr_peers_t;
struct ngx_stream_upstream_rr_peers_s {
ngx_uint_t number;
#if (NGX_STREAM_UPSTREAM_ZONE)
ngx_slab_pool_t *shpool;
ngx_atomic_t rwlock;
#endif
ngx_uint_t total_weight;
unsigned single:1;
unsigned weighted:1;
ngx_str_t *name;
ngx_stream_upstream_rr_peers_t *next;
ngx_stream_upstream_rr_peer_t *peer;
};
#if (NGX_STREAM_UPSTREAM_ZONE)
#define ngx_stream_upstream_rr_peers_rlock(peers) \
\
if (peers->shpool) { \
ngx_rwlock_rlock(&peers->rwlock); \
}
#define ngx_stream_upstream_rr_peers_wlock(peers) \
\
if (peers->shpool) { \
ngx_rwlock_wlock(&peers->rwlock); \
}
#define ngx_stream_upstream_rr_peers_unlock(peers) \
\
if (peers->shpool) { \
ngx_rwlock_unlock(&peers->rwlock); \
}
#define ngx_stream_upstream_rr_peer_lock(peers, peer) \
\
if (peers->shpool) { \
ngx_rwlock_wlock(&peer->lock); \
}
#define ngx_stream_upstream_rr_peer_unlock(peers, peer) \
\
if (peers->shpool) { \
ngx_rwlock_unlock(&peer->lock); \
}
#else
#define ngx_stream_upstream_rr_peers_rlock(peers)
#define ngx_stream_upstream_rr_peers_wlock(peers)
#define ngx_stream_upstream_rr_peers_unlock(peers)
#define ngx_stream_upstream_rr_peer_lock(peers, peer)
#define ngx_stream_upstream_rr_peer_unlock(peers, peer)
#endif
typedef struct {
ngx_stream_upstream_rr_peers_t *peers;
ngx_stream_upstream_rr_peer_t *current;
uintptr_t *tried;
uintptr_t data;
} ngx_stream_upstream_rr_peer_data_t;
ngx_int_t ngx_stream_upstream_init_round_robin(ngx_conf_t *cf,
ngx_stream_upstream_srv_conf_t *us);
ngx_int_t ngx_stream_upstream_init_round_robin_peer(ngx_stream_session_t *s,
ngx_stream_upstream_srv_conf_t *us);
ngx_int_t ngx_stream_upstream_get_round_robin_peer(ngx_peer_connection_t *pc,
void *data);
void ngx_stream_upstream_free_round_robin_peer(ngx_peer_connection_t *pc,
void *data, ngx_uint_t state);
#endif /* _NGX_STREAM_UPSTREAM_ROUND_ROBIN_H_INCLUDED_ */

View File

@ -0,0 +1,207 @@
/*
* Copyright (C) Ruslan Ermilov
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_stream.h>
static char *ngx_stream_upstream_zone(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static ngx_int_t ngx_stream_upstream_init_zone(ngx_shm_zone_t *shm_zone,
void *data);
static ngx_command_t ngx_stream_upstream_zone_commands[] = {
{ ngx_string("zone"),
NGX_STREAM_UPS_CONF|NGX_CONF_TAKE2,
ngx_stream_upstream_zone,
0,
0,
NULL },
ngx_null_command
};
static ngx_stream_module_t ngx_stream_upstream_zone_module_ctx = {
NULL, /* create main configuration */
NULL, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
};
ngx_module_t ngx_stream_upstream_zone_module = {
NGX_MODULE_V1,
&ngx_stream_upstream_zone_module_ctx, /* module context */
ngx_stream_upstream_zone_commands, /* module directives */
NGX_STREAM_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static char *
ngx_stream_upstream_zone(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ssize_t size;
ngx_str_t *value;
ngx_stream_upstream_srv_conf_t *uscf;
uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module);
value = cf->args->elts;
if (!value[1].len) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid zone name \"%V\"", &value[1]);
return NGX_CONF_ERROR;
}
size = ngx_parse_size(&value[2]);
if (size == NGX_ERROR) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid zone size \"%V\"", &value[2]);
return NGX_CONF_ERROR;
}
if (size < (ssize_t) (8 * ngx_pagesize)) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"zone \"%V\" is too small", &value[1]);
return NGX_CONF_ERROR;
}
uscf->shm_zone = ngx_shared_memory_add(cf, &value[1], size,
&ngx_stream_upstream_module);
if (uscf->shm_zone == NULL) {
return NGX_CONF_ERROR;
}
if (uscf->shm_zone->data) {
uscf = uscf->shm_zone->data;
ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
"upstream \"%V\" in %s:%ui "
"is already bound to zone \"%V\"",
&uscf->host, uscf->file_name, uscf->line,
&value[1]);
return NGX_CONF_ERROR;
}
uscf->shm_zone->init = ngx_stream_upstream_init_zone;
uscf->shm_zone->data = uscf;
uscf->shm_zone->noreuse = 1;
return NGX_CONF_OK;
}
static ngx_int_t
ngx_stream_upstream_init_zone(ngx_shm_zone_t *shm_zone, void *data)
{
ngx_stream_upstream_srv_conf_t *ouscf = data;
size_t len;
ngx_slab_pool_t *shpool;
ngx_stream_upstream_rr_peer_t *peer, **peerp;
ngx_stream_upstream_rr_peers_t *peers, *backup;
ngx_stream_upstream_srv_conf_t *uscf;
uscf = shm_zone->data;
if (ouscf) {
ngx_log_error(NGX_LOG_EMERG, shm_zone->shm.log, 0,
"zone \"%V\" cannot be reused", &shm_zone->shm.name);
return NGX_ERROR;
}
shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;
if (shm_zone->shm.exists) {
return NGX_ERROR;
}
/* copy peers to shared memory */
len = sizeof(" in upstream zone \"\"") + shm_zone->shm.name.len;
shpool->log_ctx = ngx_slab_alloc(shpool, len);
if (shpool->log_ctx == NULL) {
return NGX_ERROR;
}
ngx_sprintf(shpool->log_ctx, " in upstream zone \"%V\"%Z",
&shm_zone->shm.name);
peers = ngx_slab_alloc(shpool, sizeof(ngx_stream_upstream_rr_peers_t));
if (peers == NULL) {
return NGX_ERROR;
}
ngx_memcpy(peers, uscf->peer.data, sizeof(ngx_stream_upstream_rr_peers_t));
peers->shpool = shpool;
for (peerp = &peers->peer; *peerp; peerp = &peer->next) {
/* pool is unlocked */
peer = ngx_slab_calloc_locked(shpool,
sizeof(ngx_stream_upstream_rr_peer_t));
if (peer == NULL) {
return NGX_ERROR;
}
ngx_memcpy(peer, *peerp, sizeof(ngx_stream_upstream_rr_peer_t));
*peerp = peer;
}
if (peers->next == NULL) {
goto done;
}
backup = ngx_slab_alloc(shpool, sizeof(ngx_stream_upstream_rr_peers_t));
if (backup == NULL) {
return NGX_ERROR;
}
ngx_memcpy(backup, peers->next, sizeof(ngx_stream_upstream_rr_peers_t));
backup->shpool = shpool;
for (peerp = &backup->peer; *peerp; peerp = &peer->next) {
/* pool is unlocked */
peer = ngx_slab_calloc_locked(shpool,
sizeof(ngx_stream_upstream_rr_peer_t));
if (peer == NULL) {
return NGX_ERROR;
}
ngx_memcpy(peer, *peerp, sizeof(ngx_stream_upstream_rr_peer_t));
*peerp = peer;
}
peers->next = backup;
done:
uscf->peer.data = peers;
return NGX_OK;
}