mirror of
https://github.com/nginx/nginx.git
synced 2025-06-07 17:52:38 +08:00
Thread pools implementation.
This commit is contained in:
parent
08e05a4042
commit
305fc021db
1
auto/configure
vendored
1
auto/configure
vendored
@ -58,6 +58,7 @@ if [ "$NGX_PLATFORM" != win32 ]; then
|
|||||||
. auto/unix
|
. auto/unix
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
. auto/threads
|
||||||
. auto/modules
|
. auto/modules
|
||||||
. auto/lib/conf
|
. auto/lib/conf
|
||||||
|
|
||||||
|
@ -432,6 +432,12 @@ fi
|
|||||||
modules="$CORE_MODULES $EVENT_MODULES"
|
modules="$CORE_MODULES $EVENT_MODULES"
|
||||||
|
|
||||||
|
|
||||||
|
# thread pool module should be initialized after events
|
||||||
|
if [ $USE_THREADS = YES ]; then
|
||||||
|
modules="$modules $THREAD_POOL_MODULE"
|
||||||
|
fi
|
||||||
|
|
||||||
|
|
||||||
if [ $USE_OPENSSL = YES ]; then
|
if [ $USE_OPENSSL = YES ]; then
|
||||||
modules="$modules $OPENSSL_MODULE"
|
modules="$modules $OPENSSL_MODULE"
|
||||||
CORE_DEPS="$CORE_DEPS $OPENSSL_DEPS"
|
CORE_DEPS="$CORE_DEPS $OPENSSL_DEPS"
|
||||||
|
@ -190,6 +190,8 @@ do
|
|||||||
--without-poll_module) EVENT_POLL=NONE ;;
|
--without-poll_module) EVENT_POLL=NONE ;;
|
||||||
--with-aio_module) EVENT_AIO=YES ;;
|
--with-aio_module) EVENT_AIO=YES ;;
|
||||||
|
|
||||||
|
--with-threads) USE_THREADS=YES ;;
|
||||||
|
|
||||||
--with-file-aio) NGX_FILE_AIO=YES ;;
|
--with-file-aio) NGX_FILE_AIO=YES ;;
|
||||||
--with-ipv6) NGX_IPV6=YES ;;
|
--with-ipv6) NGX_IPV6=YES ;;
|
||||||
|
|
||||||
@ -351,6 +353,8 @@ cat << END
|
|||||||
--with-poll_module enable poll module
|
--with-poll_module enable poll module
|
||||||
--without-poll_module disable poll module
|
--without-poll_module disable poll module
|
||||||
|
|
||||||
|
--with-threads enable thread pool support
|
||||||
|
|
||||||
--with-file-aio enable file AIO support
|
--with-file-aio enable file AIO support
|
||||||
--with-ipv6 enable IPv6 support
|
--with-ipv6 enable IPv6 support
|
||||||
|
|
||||||
|
@ -193,6 +193,13 @@ UNIX_SRCS="$CORE_SRCS $EVENT_SRCS \
|
|||||||
|
|
||||||
POSIX_DEPS=src/os/unix/ngx_posix_config.h
|
POSIX_DEPS=src/os/unix/ngx_posix_config.h
|
||||||
|
|
||||||
|
THREAD_POOL_MODULE=ngx_thread_pool_module
|
||||||
|
THREAD_POOL_DEPS=src/core/ngx_thread_pool.h
|
||||||
|
THREAD_POOL_SRCS="src/core/ngx_thread_pool.c
|
||||||
|
src/os/unix/ngx_thread_cond.c
|
||||||
|
src/os/unix/ngx_thread_mutex.c
|
||||||
|
src/os/unix/ngx_thread_id.c"
|
||||||
|
|
||||||
FREEBSD_DEPS="src/os/unix/ngx_freebsd_config.h src/os/unix/ngx_freebsd.h"
|
FREEBSD_DEPS="src/os/unix/ngx_freebsd_config.h src/os/unix/ngx_freebsd.h"
|
||||||
FREEBSD_SRCS=src/os/unix/ngx_freebsd_init.c
|
FREEBSD_SRCS=src/os/unix/ngx_freebsd_init.c
|
||||||
FREEBSD_SENDFILE_SRCS=src/os/unix/ngx_freebsd_sendfile_chain.c
|
FREEBSD_SENDFILE_SRCS=src/os/unix/ngx_freebsd_sendfile_chain.c
|
||||||
|
@ -7,6 +7,10 @@ echo
|
|||||||
echo "Configuration summary"
|
echo "Configuration summary"
|
||||||
|
|
||||||
|
|
||||||
|
if [ $USE_THREADS = YES ]; then
|
||||||
|
echo " + using threads"
|
||||||
|
fi
|
||||||
|
|
||||||
if [ $USE_PCRE = DISABLED ]; then
|
if [ $USE_PCRE = DISABLED ]; then
|
||||||
echo " + PCRE library is disabled"
|
echo " + PCRE library is disabled"
|
||||||
|
|
||||||
|
20
auto/threads
Normal file
20
auto/threads
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
|
||||||
|
# Copyright (C) Nginx, Inc.
|
||||||
|
|
||||||
|
|
||||||
|
if [ $USE_THREADS = YES ]; then
|
||||||
|
|
||||||
|
if [ "$NGX_PLATFORM" = win32 ]; then
|
||||||
|
cat << END
|
||||||
|
|
||||||
|
$0: --with-threads is not supported on Windows
|
||||||
|
|
||||||
|
END
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
have=NGX_THREADS . auto/have
|
||||||
|
CORE_DEPS="$CORE_DEPS $THREAD_POOL_DEPS"
|
||||||
|
CORE_SRCS="$CORE_SRCS $THREAD_POOL_SRCS"
|
||||||
|
CORE_LIBS="$CORE_LIBS -lpthread"
|
||||||
|
fi
|
@ -22,6 +22,10 @@ typedef struct ngx_event_s ngx_event_t;
|
|||||||
typedef struct ngx_event_aio_s ngx_event_aio_t;
|
typedef struct ngx_event_aio_s ngx_event_aio_t;
|
||||||
typedef struct ngx_connection_s ngx_connection_t;
|
typedef struct ngx_connection_s ngx_connection_t;
|
||||||
|
|
||||||
|
#if (NGX_THREADS)
|
||||||
|
typedef struct ngx_thread_task_s ngx_thread_task_t;
|
||||||
|
#endif
|
||||||
|
|
||||||
typedef void (*ngx_event_handler_pt)(ngx_event_t *ev);
|
typedef void (*ngx_event_handler_pt)(ngx_event_t *ev);
|
||||||
typedef void (*ngx_connection_handler_pt)(ngx_connection_t *c);
|
typedef void (*ngx_connection_handler_pt)(ngx_connection_t *c);
|
||||||
|
|
||||||
|
631
src/core/ngx_thread_pool.c
Normal file
631
src/core/ngx_thread_pool.c
Normal file
@ -0,0 +1,631 @@
|
|||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright (C) Nginx, Inc.
|
||||||
|
* Copyright (C) Valentin V. Bartenev
|
||||||
|
* Copyright (C) Ruslan Ermilov
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
#include <ngx_config.h>
|
||||||
|
#include <ngx_core.h>
|
||||||
|
#include <ngx_thread_pool.h>
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
ngx_array_t pools;
|
||||||
|
} ngx_thread_pool_conf_t;
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
ngx_thread_mutex_t mtx;
|
||||||
|
ngx_uint_t count;
|
||||||
|
ngx_thread_task_t *first;
|
||||||
|
ngx_thread_task_t **last;
|
||||||
|
} ngx_thread_pool_queue_t;
|
||||||
|
|
||||||
|
|
||||||
|
struct ngx_thread_pool_s {
|
||||||
|
ngx_thread_cond_t cond;
|
||||||
|
|
||||||
|
ngx_thread_pool_queue_t queue;
|
||||||
|
|
||||||
|
ngx_log_t *log;
|
||||||
|
ngx_pool_t *pool;
|
||||||
|
|
||||||
|
ngx_str_t name;
|
||||||
|
ngx_uint_t threads;
|
||||||
|
ngx_uint_t max_queue;
|
||||||
|
|
||||||
|
u_char *file;
|
||||||
|
ngx_uint_t line;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log,
|
||||||
|
ngx_pool_t *pool);
|
||||||
|
static ngx_int_t ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue,
|
||||||
|
ngx_log_t *log);
|
||||||
|
static ngx_int_t ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue,
|
||||||
|
ngx_log_t *log);
|
||||||
|
static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp);
|
||||||
|
|
||||||
|
static void *ngx_thread_pool_cycle(void *data);
|
||||||
|
static void ngx_thread_pool_handler(ngx_event_t *ev);
|
||||||
|
|
||||||
|
static char *ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
|
||||||
|
|
||||||
|
static void *ngx_thread_pool_create_conf(ngx_cycle_t *cycle);
|
||||||
|
static char *ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf);
|
||||||
|
|
||||||
|
static ngx_int_t ngx_thread_pool_init_worker(ngx_cycle_t *cycle);
|
||||||
|
static void ngx_thread_pool_exit_worker(ngx_cycle_t *cycle);
|
||||||
|
|
||||||
|
|
||||||
|
static ngx_command_t ngx_thread_pool_commands[] = {
|
||||||
|
|
||||||
|
{ ngx_string("thread_pool"),
|
||||||
|
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23,
|
||||||
|
ngx_thread_pool,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
NULL },
|
||||||
|
|
||||||
|
ngx_null_command
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static ngx_core_module_t ngx_thread_pool_module_ctx = {
|
||||||
|
ngx_string("thread_pool"),
|
||||||
|
ngx_thread_pool_create_conf,
|
||||||
|
ngx_thread_pool_init_conf
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
ngx_module_t ngx_thread_pool_module = {
|
||||||
|
NGX_MODULE_V1,
|
||||||
|
&ngx_thread_pool_module_ctx, /* module context */
|
||||||
|
ngx_thread_pool_commands, /* module directives */
|
||||||
|
NGX_CORE_MODULE, /* module type */
|
||||||
|
NULL, /* init master */
|
||||||
|
NULL, /* init module */
|
||||||
|
ngx_thread_pool_init_worker, /* init process */
|
||||||
|
NULL, /* init thread */
|
||||||
|
NULL, /* exit thread */
|
||||||
|
ngx_thread_pool_exit_worker, /* exit process */
|
||||||
|
NULL, /* exit master */
|
||||||
|
NGX_MODULE_V1_PADDING
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static ngx_str_t ngx_thread_pool_default = ngx_string("default");
|
||||||
|
|
||||||
|
static ngx_uint_t ngx_thread_pool_task_id;
|
||||||
|
static ngx_thread_pool_queue_t ngx_thread_pool_done;
|
||||||
|
|
||||||
|
|
||||||
|
static ngx_int_t
|
||||||
|
ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
|
||||||
|
{
|
||||||
|
int err;
|
||||||
|
pthread_t tid;
|
||||||
|
ngx_uint_t n;
|
||||||
|
pthread_attr_t attr;
|
||||||
|
|
||||||
|
if (ngx_notify == NULL) {
|
||||||
|
ngx_log_error(NGX_LOG_ALERT, log, 0,
|
||||||
|
"the configured event method cannot be used with thread pools");
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ngx_thread_pool_queue_init(&tp->queue, log) != NGX_OK) {
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
|
||||||
|
(void) ngx_thread_pool_queue_destroy(&tp->queue, log);
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
tp->log = log;
|
||||||
|
tp->pool = pool;
|
||||||
|
|
||||||
|
err = pthread_attr_init(&attr);
|
||||||
|
if (err) {
|
||||||
|
ngx_log_error(NGX_LOG_ALERT, log, err,
|
||||||
|
"pthread_attr_init() failed");
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
|
||||||
|
if (err) {
|
||||||
|
ngx_log_error(NGX_LOG_ALERT, log, err,
|
||||||
|
"pthread_attr_setstacksize() failed");
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
for (n = 0; n < tp->threads; n++) {
|
||||||
|
err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
|
||||||
|
if (err) {
|
||||||
|
ngx_log_error(NGX_LOG_ALERT, log, err,
|
||||||
|
"pthread_create() failed");
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
(void) pthread_attr_destroy(&attr);
|
||||||
|
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static ngx_int_t
|
||||||
|
ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue, ngx_log_t *log)
|
||||||
|
{
|
||||||
|
queue->count = 0;
|
||||||
|
queue->first = NULL;
|
||||||
|
queue->last = &queue->first;
|
||||||
|
|
||||||
|
return ngx_thread_mutex_create(&queue->mtx, log);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static ngx_int_t
|
||||||
|
ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue, ngx_log_t *log)
|
||||||
|
{
|
||||||
|
return ngx_thread_mutex_destroy(&queue->mtx, log);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
|
||||||
|
{
|
||||||
|
/* TODO: exit threads */
|
||||||
|
|
||||||
|
(void) ngx_thread_cond_destroy(&tp->cond, tp->log);
|
||||||
|
(void) ngx_thread_pool_queue_destroy(&tp->queue, tp->log);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ngx_thread_task_t *
|
||||||
|
ngx_thread_task_alloc(ngx_pool_t *pool, size_t size)
|
||||||
|
{
|
||||||
|
ngx_thread_task_t *task;
|
||||||
|
|
||||||
|
task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size);
|
||||||
|
if (task == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
task->ctx = task + 1;
|
||||||
|
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ngx_int_t
|
||||||
|
ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
|
||||||
|
{
|
||||||
|
if (task->event.active) {
|
||||||
|
ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
|
||||||
|
"task #%ui already active", task->id);
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) {
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tp->queue.count >= tp->max_queue) {
|
||||||
|
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
|
||||||
|
|
||||||
|
ngx_log_error(NGX_LOG_ERR, tp->log, 0,
|
||||||
|
"thread pool \"%V\" queue overflow: %ui tasks waiting",
|
||||||
|
&tp->name, tp->queue.count);
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
task->event.active = 1;
|
||||||
|
|
||||||
|
task->id = ngx_thread_pool_task_id++;
|
||||||
|
task->next = NULL;
|
||||||
|
|
||||||
|
if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
|
||||||
|
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
*tp->queue.last = task;
|
||||||
|
tp->queue.last = &task->next;
|
||||||
|
|
||||||
|
tp->queue.count++;
|
||||||
|
|
||||||
|
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
|
||||||
|
|
||||||
|
ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
|
||||||
|
"task #%ui added to thread pool \"%V\"",
|
||||||
|
task->id, &tp->name);
|
||||||
|
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void *
|
||||||
|
ngx_thread_pool_cycle(void *data)
|
||||||
|
{
|
||||||
|
ngx_thread_pool_t *tp = data;
|
||||||
|
|
||||||
|
int err;
|
||||||
|
sigset_t set;
|
||||||
|
ngx_thread_task_t *task;
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
ngx_time_update();
|
||||||
|
#endif
|
||||||
|
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0,
|
||||||
|
"thread in pool \"%V\" started", &tp->name);
|
||||||
|
|
||||||
|
sigfillset(&set);
|
||||||
|
|
||||||
|
sigdelset(&set, SIGILL);
|
||||||
|
sigdelset(&set, SIGFPE);
|
||||||
|
sigdelset(&set, SIGSEGV);
|
||||||
|
sigdelset(&set, SIGBUS);
|
||||||
|
|
||||||
|
err = pthread_sigmask(SIG_BLOCK, &set, NULL);
|
||||||
|
if (err) {
|
||||||
|
ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
for ( ;; ) {
|
||||||
|
if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (tp->queue.count == 0) {
|
||||||
|
if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log)
|
||||||
|
!= NGX_OK)
|
||||||
|
{
|
||||||
|
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tp->queue.count--;
|
||||||
|
|
||||||
|
task = tp->queue.first;
|
||||||
|
tp->queue.first = task->next;
|
||||||
|
|
||||||
|
if (tp->queue.first == NULL) {
|
||||||
|
tp->queue.last = &tp->queue.first;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log) != NGX_OK) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
ngx_time_update();
|
||||||
|
#endif
|
||||||
|
|
||||||
|
ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
|
||||||
|
"run task #%ui in thread pool \"%V\"",
|
||||||
|
task->id, &tp->name);
|
||||||
|
|
||||||
|
task->handler(task->ctx, tp->log);
|
||||||
|
|
||||||
|
ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
|
||||||
|
"complete task #%ui in thread pool \"%V\"",
|
||||||
|
task->id, &tp->name);
|
||||||
|
|
||||||
|
task->next = NULL;
|
||||||
|
|
||||||
|
if (ngx_thread_mutex_lock(&ngx_thread_pool_done.mtx, tp->log)
|
||||||
|
!= NGX_OK)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ngx_thread_pool_done.last = task;
|
||||||
|
ngx_thread_pool_done.last = &task->next;
|
||||||
|
|
||||||
|
if (ngx_thread_mutex_unlock(&ngx_thread_pool_done.mtx, tp->log)
|
||||||
|
!= NGX_OK)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
(void) ngx_notify(ngx_thread_pool_handler);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
ngx_thread_pool_handler(ngx_event_t *ev)
|
||||||
|
{
|
||||||
|
ngx_event_t *event;
|
||||||
|
ngx_thread_task_t *task;
|
||||||
|
|
||||||
|
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler");
|
||||||
|
|
||||||
|
if (ngx_thread_mutex_lock(&ngx_thread_pool_done.mtx, ev->log) != NGX_OK) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
task = ngx_thread_pool_done.first;
|
||||||
|
ngx_thread_pool_done.first = NULL;
|
||||||
|
ngx_thread_pool_done.last = &ngx_thread_pool_done.first;
|
||||||
|
|
||||||
|
if (ngx_thread_mutex_unlock(&ngx_thread_pool_done.mtx, ev->log) != NGX_OK) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (task) {
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
|
||||||
|
"run completion handler for task #%ui", task->id);
|
||||||
|
|
||||||
|
event = &task->event;
|
||||||
|
task = task->next;
|
||||||
|
|
||||||
|
event->complete = 1;
|
||||||
|
event->active = 0;
|
||||||
|
|
||||||
|
event->handler(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void *
|
||||||
|
ngx_thread_pool_create_conf(ngx_cycle_t *cycle)
|
||||||
|
{
|
||||||
|
ngx_thread_pool_conf_t *tcf;
|
||||||
|
|
||||||
|
tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));
|
||||||
|
if (tcf == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ngx_array_init(&tcf->pools, cycle->pool, 4,
|
||||||
|
sizeof(ngx_thread_pool_t *))
|
||||||
|
!= NGX_OK)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return tcf;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static char *
|
||||||
|
ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)
|
||||||
|
{
|
||||||
|
ngx_thread_pool_conf_t *tcf = conf;
|
||||||
|
|
||||||
|
ngx_uint_t i;
|
||||||
|
ngx_thread_pool_t **tpp;
|
||||||
|
|
||||||
|
tpp = tcf->pools.elts;
|
||||||
|
|
||||||
|
for (i = 0; i < tcf->pools.nelts; i++) {
|
||||||
|
|
||||||
|
if (tpp[i]->threads) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tpp[i]->name.len == ngx_thread_pool_default.len
|
||||||
|
&& ngx_strncmp(tpp[i]->name.data, ngx_thread_pool_default.data,
|
||||||
|
ngx_thread_pool_default.len)
|
||||||
|
== 0)
|
||||||
|
{
|
||||||
|
tpp[i]->threads = 32;
|
||||||
|
tpp[i]->max_queue = 65536;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,
|
||||||
|
"unknown thread pool \"%V\" in %s:%ui",
|
||||||
|
&tpp[i]->name, tpp[i]->file, tpp[i]->line);
|
||||||
|
|
||||||
|
return NGX_CONF_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NGX_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static char *
|
||||||
|
ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
|
||||||
|
{
|
||||||
|
ngx_str_t *value;
|
||||||
|
ngx_uint_t i;
|
||||||
|
ngx_thread_pool_t *tp;
|
||||||
|
|
||||||
|
value = cf->args->elts;
|
||||||
|
|
||||||
|
tp = ngx_thread_pool_add(cf, &value[1]);
|
||||||
|
|
||||||
|
if (tp == NULL) {
|
||||||
|
return NGX_CONF_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tp->threads) {
|
||||||
|
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
|
||||||
|
"duplicate thread pool \"%V\"", &tp->name);
|
||||||
|
return NGX_CONF_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
tp->max_queue = 65536;
|
||||||
|
|
||||||
|
for (i = 2; i < cf->args->nelts; i++) {
|
||||||
|
|
||||||
|
if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {
|
||||||
|
|
||||||
|
tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8);
|
||||||
|
|
||||||
|
if (tp->threads == (ngx_uint_t) NGX_ERROR || tp->threads == 0) {
|
||||||
|
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
|
||||||
|
"invalid threads value \"%V\"", &value[i]);
|
||||||
|
return NGX_CONF_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {
|
||||||
|
|
||||||
|
tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10);
|
||||||
|
|
||||||
|
if (tp->max_queue == (ngx_uint_t) NGX_ERROR) {
|
||||||
|
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
|
||||||
|
"invalid max_queue value \"%V\"", &value[i]);
|
||||||
|
return NGX_CONF_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tp->threads == 0) {
|
||||||
|
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
|
||||||
|
"\"%V\" must have \"threads\" parameter",
|
||||||
|
&cmd->name);
|
||||||
|
return NGX_CONF_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NGX_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ngx_thread_pool_t *
|
||||||
|
ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name)
|
||||||
|
{
|
||||||
|
ngx_thread_pool_t *tp, **tpp;
|
||||||
|
ngx_thread_pool_conf_t *tcf;
|
||||||
|
|
||||||
|
if (name == NULL) {
|
||||||
|
name = &ngx_thread_pool_default;
|
||||||
|
}
|
||||||
|
|
||||||
|
tp = ngx_thread_pool_get(cf->cycle, name);
|
||||||
|
|
||||||
|
if (tp) {
|
||||||
|
return tp;
|
||||||
|
}
|
||||||
|
|
||||||
|
tp = ngx_pcalloc(cf->pool, sizeof(ngx_thread_pool_t));
|
||||||
|
if (tp == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
tp->name = *name;
|
||||||
|
tp->file = cf->conf_file->file.name.data;
|
||||||
|
tp->line = cf->conf_file->line;
|
||||||
|
|
||||||
|
tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
|
||||||
|
ngx_thread_pool_module);
|
||||||
|
|
||||||
|
tpp = ngx_array_push(&tcf->pools);
|
||||||
|
if (tpp == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
*tpp = tp;
|
||||||
|
|
||||||
|
return tp;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ngx_thread_pool_t *
|
||||||
|
ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name)
|
||||||
|
{
|
||||||
|
ngx_uint_t i;
|
||||||
|
ngx_thread_pool_t **tpp;
|
||||||
|
ngx_thread_pool_conf_t *tcf;
|
||||||
|
|
||||||
|
tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
|
||||||
|
ngx_thread_pool_module);
|
||||||
|
|
||||||
|
tpp = tcf->pools.elts;
|
||||||
|
|
||||||
|
for (i = 0; i < tcf->pools.nelts; i++) {
|
||||||
|
|
||||||
|
if (tpp[i]->name.len == name->len
|
||||||
|
&& ngx_strncmp(tpp[i]->name.data, name->data, name->len) == 0)
|
||||||
|
{
|
||||||
|
return tpp[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static ngx_int_t
|
||||||
|
ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
|
||||||
|
{
|
||||||
|
ngx_uint_t i;
|
||||||
|
ngx_thread_pool_t **tpp;
|
||||||
|
ngx_thread_pool_conf_t *tcf;
|
||||||
|
|
||||||
|
if (ngx_process != NGX_PROCESS_WORKER
|
||||||
|
&& ngx_process != NGX_PROCESS_SINGLE)
|
||||||
|
{
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
|
||||||
|
ngx_thread_pool_module);
|
||||||
|
|
||||||
|
if (tcf == NULL) {
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ngx_thread_pool_queue_init(&ngx_thread_pool_done, cycle->log)
|
||||||
|
!= NGX_OK)
|
||||||
|
{
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
tpp = tcf->pools.elts;
|
||||||
|
|
||||||
|
for (i = 0; i < tcf->pools.nelts; i++) {
|
||||||
|
if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
ngx_thread_pool_exit_worker(ngx_cycle_t *cycle)
|
||||||
|
{
|
||||||
|
ngx_uint_t i;
|
||||||
|
ngx_thread_pool_t **tpp;
|
||||||
|
ngx_thread_pool_conf_t *tcf;
|
||||||
|
|
||||||
|
if (ngx_process != NGX_PROCESS_WORKER
|
||||||
|
&& ngx_process != NGX_PROCESS_SINGLE)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
|
||||||
|
ngx_thread_pool_module);
|
||||||
|
|
||||||
|
if (tcf == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tpp = tcf->pools.elts;
|
||||||
|
|
||||||
|
for (i = 0; i < tcf->pools.nelts; i++) {
|
||||||
|
ngx_thread_pool_destroy(tpp[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
(void) ngx_thread_pool_queue_destroy(&ngx_thread_pool_done, cycle->log);
|
||||||
|
}
|
36
src/core/ngx_thread_pool.h
Normal file
36
src/core/ngx_thread_pool.h
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright (C) Nginx, Inc.
|
||||||
|
* Copyright (C) Valentin V. Bartenev
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
#ifndef _NGX_THREAD_POOL_H_INCLUDED_
|
||||||
|
#define _NGX_THREAD_POOL_H_INCLUDED_
|
||||||
|
|
||||||
|
|
||||||
|
#include <ngx_config.h>
|
||||||
|
#include <ngx_core.h>
|
||||||
|
#include <ngx_event.h>
|
||||||
|
|
||||||
|
|
||||||
|
struct ngx_thread_task_s {
|
||||||
|
ngx_thread_task_t *next;
|
||||||
|
ngx_uint_t id;
|
||||||
|
void *ctx;
|
||||||
|
void (*handler)(void *data, ngx_log_t *log);
|
||||||
|
ngx_event_t event;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct ngx_thread_pool_s ngx_thread_pool_t;
|
||||||
|
|
||||||
|
|
||||||
|
ngx_thread_pool_t *ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name);
|
||||||
|
ngx_thread_pool_t *ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name);
|
||||||
|
|
||||||
|
ngx_thread_task_t *ngx_thread_task_alloc(ngx_pool_t *pool, size_t size);
|
||||||
|
ngx_int_t ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task);
|
||||||
|
|
||||||
|
|
||||||
|
#endif /* _NGX_THREAD_POOL_H_INCLUDED_ */
|
@ -48,6 +48,7 @@ ngx_event_module_t ngx_aio_module_ctx = {
|
|||||||
NULL, /* disable an event */
|
NULL, /* disable an event */
|
||||||
NULL, /* add an connection */
|
NULL, /* add an connection */
|
||||||
ngx_aio_del_connection, /* delete an connection */
|
ngx_aio_del_connection, /* delete an connection */
|
||||||
|
NULL, /* trigger a notify */
|
||||||
NULL, /* process the changes */
|
NULL, /* process the changes */
|
||||||
ngx_aio_process_events, /* process the events */
|
ngx_aio_process_events, /* process the events */
|
||||||
ngx_aio_init, /* init the events */
|
ngx_aio_init, /* init the events */
|
||||||
|
@ -88,6 +88,7 @@ ngx_event_module_t ngx_devpoll_module_ctx = {
|
|||||||
ngx_devpoll_del_event, /* disable an event */
|
ngx_devpoll_del_event, /* disable an event */
|
||||||
NULL, /* add an connection */
|
NULL, /* add an connection */
|
||||||
NULL, /* delete an connection */
|
NULL, /* delete an connection */
|
||||||
|
NULL, /* trigger a notify */
|
||||||
NULL, /* process the changes */
|
NULL, /* process the changes */
|
||||||
ngx_devpoll_process_events, /* process the events */
|
ngx_devpoll_process_events, /* process the events */
|
||||||
ngx_devpoll_init, /* init the events */
|
ngx_devpoll_init, /* init the events */
|
||||||
|
@ -164,6 +164,7 @@ ngx_event_module_t ngx_epoll_module_ctx = {
|
|||||||
ngx_epoll_del_event, /* disable an event */
|
ngx_epoll_del_event, /* disable an event */
|
||||||
ngx_epoll_add_connection, /* add an connection */
|
ngx_epoll_add_connection, /* add an connection */
|
||||||
ngx_epoll_del_connection, /* delete an connection */
|
ngx_epoll_del_connection, /* delete an connection */
|
||||||
|
NULL, /* trigger a notify */
|
||||||
NULL, /* process the changes */
|
NULL, /* process the changes */
|
||||||
ngx_epoll_process_events, /* process the events */
|
ngx_epoll_process_events, /* process the events */
|
||||||
ngx_epoll_init, /* init the events */
|
ngx_epoll_init, /* init the events */
|
||||||
|
@ -172,6 +172,7 @@ ngx_event_module_t ngx_eventport_module_ctx = {
|
|||||||
ngx_eventport_del_event, /* disable an event */
|
ngx_eventport_del_event, /* disable an event */
|
||||||
NULL, /* add an connection */
|
NULL, /* add an connection */
|
||||||
NULL, /* delete an connection */
|
NULL, /* delete an connection */
|
||||||
|
NULL, /* trigger a notify */
|
||||||
NULL, /* process the changes */
|
NULL, /* process the changes */
|
||||||
ngx_eventport_process_events, /* process the events */
|
ngx_eventport_process_events, /* process the events */
|
||||||
ngx_eventport_init, /* init the events */
|
ngx_eventport_init, /* init the events */
|
||||||
|
@ -64,6 +64,7 @@ ngx_event_module_t ngx_iocp_module_ctx = {
|
|||||||
NULL, /* disable an event */
|
NULL, /* disable an event */
|
||||||
NULL, /* add an connection */
|
NULL, /* add an connection */
|
||||||
ngx_iocp_del_connection, /* delete an connection */
|
ngx_iocp_del_connection, /* delete an connection */
|
||||||
|
NULL, /* trigger a notify */
|
||||||
NULL, /* process the changes */
|
NULL, /* process the changes */
|
||||||
ngx_iocp_process_events, /* process the events */
|
ngx_iocp_process_events, /* process the events */
|
||||||
ngx_iocp_init, /* init the events */
|
ngx_iocp_init, /* init the events */
|
||||||
|
@ -89,6 +89,7 @@ ngx_event_module_t ngx_kqueue_module_ctx = {
|
|||||||
ngx_kqueue_del_event, /* disable an event */
|
ngx_kqueue_del_event, /* disable an event */
|
||||||
NULL, /* add an connection */
|
NULL, /* add an connection */
|
||||||
NULL, /* delete an connection */
|
NULL, /* delete an connection */
|
||||||
|
NULL, /* trigger a notify */
|
||||||
ngx_kqueue_process_changes, /* process the changes */
|
ngx_kqueue_process_changes, /* process the changes */
|
||||||
ngx_kqueue_process_events, /* process the events */
|
ngx_kqueue_process_events, /* process the events */
|
||||||
ngx_kqueue_init, /* init the events */
|
ngx_kqueue_init, /* init the events */
|
||||||
|
@ -39,6 +39,7 @@ ngx_event_module_t ngx_poll_module_ctx = {
|
|||||||
ngx_poll_del_event, /* disable an event */
|
ngx_poll_del_event, /* disable an event */
|
||||||
NULL, /* add an connection */
|
NULL, /* add an connection */
|
||||||
NULL, /* delete an connection */
|
NULL, /* delete an connection */
|
||||||
|
NULL, /* trigger a notify */
|
||||||
NULL, /* process the changes */
|
NULL, /* process the changes */
|
||||||
ngx_poll_process_events, /* process the events */
|
ngx_poll_process_events, /* process the events */
|
||||||
ngx_poll_init, /* init the events */
|
ngx_poll_init, /* init the events */
|
||||||
|
@ -130,6 +130,7 @@ ngx_event_module_t ngx_rtsig_module_ctx = {
|
|||||||
NULL, /* disable an event */
|
NULL, /* disable an event */
|
||||||
ngx_rtsig_add_connection, /* add an connection */
|
ngx_rtsig_add_connection, /* add an connection */
|
||||||
ngx_rtsig_del_connection, /* delete an connection */
|
ngx_rtsig_del_connection, /* delete an connection */
|
||||||
|
NULL, /* trigger a notify */
|
||||||
NULL, /* process the changes */
|
NULL, /* process the changes */
|
||||||
ngx_rtsig_process_events, /* process the events */
|
ngx_rtsig_process_events, /* process the events */
|
||||||
ngx_rtsig_init, /* init the events */
|
ngx_rtsig_init, /* init the events */
|
||||||
|
@ -47,6 +47,7 @@ ngx_event_module_t ngx_select_module_ctx = {
|
|||||||
ngx_select_del_event, /* disable an event */
|
ngx_select_del_event, /* disable an event */
|
||||||
NULL, /* add an connection */
|
NULL, /* add an connection */
|
||||||
NULL, /* delete an connection */
|
NULL, /* delete an connection */
|
||||||
|
NULL, /* trigger a notify */
|
||||||
NULL, /* process the changes */
|
NULL, /* process the changes */
|
||||||
ngx_select_process_events, /* process the events */
|
ngx_select_process_events, /* process the events */
|
||||||
ngx_select_init, /* init the events */
|
ngx_select_init, /* init the events */
|
||||||
|
@ -48,6 +48,7 @@ ngx_event_module_t ngx_select_module_ctx = {
|
|||||||
ngx_select_del_event, /* disable an event */
|
ngx_select_del_event, /* disable an event */
|
||||||
NULL, /* add an connection */
|
NULL, /* add an connection */
|
||||||
NULL, /* delete an connection */
|
NULL, /* delete an connection */
|
||||||
|
NULL, /* trigger a notify */
|
||||||
NULL, /* process the changes */
|
NULL, /* process the changes */
|
||||||
ngx_select_process_events, /* process the events */
|
ngx_select_process_events, /* process the events */
|
||||||
ngx_select_init, /* init the events */
|
ngx_select_init, /* init the events */
|
||||||
|
@ -178,7 +178,7 @@ ngx_event_module_t ngx_event_core_module_ctx = {
|
|||||||
ngx_event_core_create_conf, /* create configuration */
|
ngx_event_core_create_conf, /* create configuration */
|
||||||
ngx_event_core_init_conf, /* init configuration */
|
ngx_event_core_init_conf, /* init configuration */
|
||||||
|
|
||||||
{ NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL }
|
{ NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -200,6 +200,8 @@ typedef struct {
|
|||||||
ngx_int_t (*add_conn)(ngx_connection_t *c);
|
ngx_int_t (*add_conn)(ngx_connection_t *c);
|
||||||
ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);
|
ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);
|
||||||
|
|
||||||
|
ngx_int_t (*notify)(ngx_event_handler_pt handler);
|
||||||
|
|
||||||
ngx_int_t (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait);
|
ngx_int_t (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait);
|
||||||
ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
|
ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||||
ngx_uint_t flags);
|
ngx_uint_t flags);
|
||||||
@ -422,6 +424,8 @@ extern ngx_event_actions_t ngx_event_actions;
|
|||||||
#define ngx_add_conn ngx_event_actions.add_conn
|
#define ngx_add_conn ngx_event_actions.add_conn
|
||||||
#define ngx_del_conn ngx_event_actions.del_conn
|
#define ngx_del_conn ngx_event_actions.del_conn
|
||||||
|
|
||||||
|
#define ngx_notify ngx_event_actions.notify
|
||||||
|
|
||||||
#define ngx_add_timer ngx_event_add_timer
|
#define ngx_add_timer ngx_event_add_timer
|
||||||
#define ngx_del_timer ngx_event_del_timer
|
#define ngx_del_timer ngx_event_del_timer
|
||||||
|
|
||||||
|
@ -93,11 +93,11 @@ extern ssize_t sendfile(int s, int fd, int32_t *offset, size_t size);
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
#if (NGX_HAVE_FILE_AIO)
|
|
||||||
#if (NGX_HAVE_SYS_EVENTFD_H)
|
#if (NGX_HAVE_SYS_EVENTFD_H)
|
||||||
#include <sys/eventfd.h>
|
#include <sys/eventfd.h>
|
||||||
#endif
|
#endif
|
||||||
#include <sys/syscall.h>
|
#include <sys/syscall.h>
|
||||||
|
#if (NGX_HAVE_FILE_AIO)
|
||||||
#include <linux/aio_abi.h>
|
#include <linux/aio_abi.h>
|
||||||
typedef struct iocb ngx_aiocb_t;
|
typedef struct iocb ngx_aiocb_t;
|
||||||
#endif
|
#endif
|
||||||
|
@ -111,9 +111,61 @@ ngx_int_t ngx_cond_signal(ngx_cond_t *cv);
|
|||||||
|
|
||||||
#define ngx_thread_volatile
|
#define ngx_thread_volatile
|
||||||
|
|
||||||
|
#if (NGX_THREADS)
|
||||||
|
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
|
|
||||||
|
typedef pthread_mutex_t ngx_thread_mutex_t;
|
||||||
|
|
||||||
|
ngx_int_t ngx_thread_mutex_create(ngx_thread_mutex_t *mtx, ngx_log_t *log);
|
||||||
|
ngx_int_t ngx_thread_mutex_destroy(ngx_thread_mutex_t *mtx, ngx_log_t *log);
|
||||||
|
ngx_int_t ngx_thread_mutex_lock(ngx_thread_mutex_t *mtx, ngx_log_t *log);
|
||||||
|
ngx_int_t ngx_thread_mutex_unlock(ngx_thread_mutex_t *mtx, ngx_log_t *log);
|
||||||
|
|
||||||
|
|
||||||
|
typedef pthread_cond_t ngx_thread_cond_t;
|
||||||
|
|
||||||
|
ngx_int_t ngx_thread_cond_create(ngx_thread_cond_t *cond, ngx_log_t *log);
|
||||||
|
ngx_int_t ngx_thread_cond_destroy(ngx_thread_cond_t *cond, ngx_log_t *log);
|
||||||
|
ngx_int_t ngx_thread_cond_signal(ngx_thread_cond_t *cond, ngx_log_t *log);
|
||||||
|
ngx_int_t ngx_thread_cond_wait(ngx_thread_cond_t *cond, ngx_thread_mutex_t *mtx,
|
||||||
|
ngx_log_t *log);
|
||||||
|
|
||||||
|
|
||||||
|
#if (NGX_LINUX)
|
||||||
|
|
||||||
|
typedef pid_t ngx_tid_t;
|
||||||
|
#define NGX_TID_T_FMT "%P"
|
||||||
|
|
||||||
|
#elif (NGX_FREEBSD)
|
||||||
|
|
||||||
|
typedef uint32_t ngx_tid_t;
|
||||||
|
#define NGX_TID_T_FMT "%uD"
|
||||||
|
|
||||||
|
#elif (NGX_DARWIN)
|
||||||
|
|
||||||
|
typedef uint64_t ngx_tid_t;
|
||||||
|
#define NGX_TID_T_FMT "%uA"
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
|
typedef uint64_t ngx_tid_t;
|
||||||
|
#define NGX_TID_T_FMT "%uA"
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
ngx_tid_t ngx_thread_tid(void);
|
||||||
|
|
||||||
|
#define ngx_log_tid ngx_thread_tid()
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
#define ngx_log_tid 0
|
#define ngx_log_tid 0
|
||||||
#define NGX_TID_T_FMT "%d"
|
#define NGX_TID_T_FMT "%d"
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
#define ngx_mutex_trylock(m) NGX_OK
|
#define ngx_mutex_trylock(m) NGX_OK
|
||||||
#define ngx_mutex_lock(m)
|
#define ngx_mutex_lock(m)
|
||||||
#define ngx_mutex_unlock(m)
|
#define ngx_mutex_unlock(m)
|
||||||
|
87
src/os/unix/ngx_thread_cond.c
Normal file
87
src/os/unix/ngx_thread_cond.c
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright (C) Igor Sysoev
|
||||||
|
* Copyright (C) Nginx, Inc.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
#include <ngx_config.h>
|
||||||
|
#include <ngx_core.h>
|
||||||
|
|
||||||
|
|
||||||
|
ngx_int_t
|
||||||
|
ngx_thread_cond_create(ngx_thread_cond_t *cond, ngx_log_t *log)
|
||||||
|
{
|
||||||
|
ngx_err_t err;
|
||||||
|
|
||||||
|
err = pthread_cond_init(cond, NULL);
|
||||||
|
if (err == 0) {
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
|
||||||
|
"pthread_cond_init(%p)", cond);
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
ngx_log_error(NGX_LOG_EMERG, log, err, "pthread_cond_init() failed");
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ngx_int_t
|
||||||
|
ngx_thread_cond_destroy(ngx_thread_cond_t *cond, ngx_log_t *log)
|
||||||
|
{
|
||||||
|
ngx_err_t err;
|
||||||
|
|
||||||
|
err = pthread_cond_destroy(cond);
|
||||||
|
if (err == 0) {
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
|
||||||
|
"pthread_cond_destroy(%p)", cond);
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
ngx_log_error(NGX_LOG_EMERG, log, err, "pthread_cond_destroy() failed");
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ngx_int_t
|
||||||
|
ngx_thread_cond_signal(ngx_thread_cond_t *cond, ngx_log_t *log)
|
||||||
|
{
|
||||||
|
ngx_err_t err;
|
||||||
|
|
||||||
|
err = pthread_cond_signal(cond);
|
||||||
|
if (err == 0) {
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
|
||||||
|
"pthread_cond_signal(%p)", cond);
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
ngx_log_error(NGX_LOG_EMERG, log, err, "pthread_cond_signal() failed");
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ngx_int_t
|
||||||
|
ngx_thread_cond_wait(ngx_thread_cond_t *cond, ngx_thread_mutex_t *mtx,
|
||||||
|
ngx_log_t *log)
|
||||||
|
{
|
||||||
|
ngx_err_t err;
|
||||||
|
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
|
||||||
|
"pthread_cond_wait(%p) enter", cond);
|
||||||
|
|
||||||
|
err = pthread_cond_wait(cond, mtx);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
ngx_time_update();
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (err == 0) {
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
|
||||||
|
"pthread_cond_wait(%p) exit", cond);
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_cond_wait() failed");
|
||||||
|
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
70
src/os/unix/ngx_thread_id.c
Normal file
70
src/os/unix/ngx_thread_id.c
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright (C) Igor Sysoev
|
||||||
|
* Copyright (C) Nginx, Inc.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
#include <ngx_config.h>
|
||||||
|
#include <ngx_core.h>
|
||||||
|
#include <ngx_thread_pool.h>
|
||||||
|
|
||||||
|
|
||||||
|
#if (NGX_LINUX)
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Linux thread id is a pid of thread created by clone(2),
|
||||||
|
* glibc does not provide a wrapper for gettid().
|
||||||
|
*/
|
||||||
|
|
||||||
|
ngx_tid_t
|
||||||
|
ngx_thread_tid(void)
|
||||||
|
{
|
||||||
|
return syscall(SYS_gettid);
|
||||||
|
}
|
||||||
|
|
||||||
|
#elif (NGX_FREEBSD) && (__FreeBSD_version >= 900031)
|
||||||
|
|
||||||
|
#include <pthread_np.h>
|
||||||
|
|
||||||
|
ngx_tid_t
|
||||||
|
ngx_thread_tid(void)
|
||||||
|
{
|
||||||
|
return pthread_getthreadid_np();
|
||||||
|
}
|
||||||
|
|
||||||
|
#elif (NGX_DARWIN)
|
||||||
|
|
||||||
|
/*
|
||||||
|
* MacOSX thread has two thread ids:
|
||||||
|
*
|
||||||
|
* 1) MacOSX 10.6 (Snow Leoprad) has pthread_threadid_np() returning
|
||||||
|
* an uint64_t value, which is obtained using the __thread_selfid()
|
||||||
|
* syscall. It is a number above 300,000.
|
||||||
|
*/
|
||||||
|
|
||||||
|
ngx_tid_t
|
||||||
|
ngx_thread_tid(void)
|
||||||
|
{
|
||||||
|
uint64_t tid;
|
||||||
|
|
||||||
|
(void) pthread_threadid_np(NULL, &tid);
|
||||||
|
return tid;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* 2) Kernel thread mach_port_t returned by pthread_mach_thread_np().
|
||||||
|
* It is a number in range 100-100,000.
|
||||||
|
*
|
||||||
|
* return pthread_mach_thread_np(pthread_self());
|
||||||
|
*/
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
|
ngx_tid_t
|
||||||
|
ngx_thread_tid(void)
|
||||||
|
{
|
||||||
|
return (uint64_t) (uintptr_t) pthread_self();
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
174
src/os/unix/ngx_thread_mutex.c
Normal file
174
src/os/unix/ngx_thread_mutex.c
Normal file
@ -0,0 +1,174 @@
|
|||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright (C) Igor Sysoev
|
||||||
|
* Copyright (C) Nginx, Inc.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <ngx_config.h>
|
||||||
|
#include <ngx_core.h>
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* All modern pthread mutex implementations try to acquire a lock
|
||||||
|
* atomically in userland before going to sleep in kernel. Some
|
||||||
|
* spins before the sleeping.
|
||||||
|
*
|
||||||
|
* In Solaris since version 8 all mutex types spin before sleeping.
|
||||||
|
* The default spin count is 1000. It can be overridden using
|
||||||
|
* _THREAD_ADAPTIVE_SPIN=100 environment variable.
|
||||||
|
*
|
||||||
|
* In MacOSX all mutex types spin to acquire a lock protecting a mutex's
|
||||||
|
* internals. If the mutex is busy, thread calls Mach semaphore_wait().
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* PTHREAD_MUTEX_NORMAL lacks deadlock detection and is the fastest
|
||||||
|
* mutex type.
|
||||||
|
*
|
||||||
|
* Linux: No spinning. The internal name PTHREAD_MUTEX_TIMED_NP
|
||||||
|
* remains from the times when pthread_mutex_timedlock() was
|
||||||
|
* non-standard extension. Alias name: PTHREAD_MUTEX_FAST_NP.
|
||||||
|
* FreeBSD: No spinning.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* PTHREAD_MUTEX_ERRORCHECK is usually as fast as PTHREAD_MUTEX_NORMAL
|
||||||
|
* yet has lightweight deadlock detection.
|
||||||
|
*
|
||||||
|
* Linux: No spinning. The internal name: PTHREAD_MUTEX_ERRORCHECK_NP.
|
||||||
|
* FreeBSD: No spinning.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* PTHREAD_MUTEX_RECURSIVE allows recursive locking.
|
||||||
|
*
|
||||||
|
* Linux: No spinning. The internal name: PTHREAD_MUTEX_RECURSIVE_NP.
|
||||||
|
* FreeBSD: No spinning.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* PTHREAD_MUTEX_ADAPTIVE_NP spins on SMP systems before sleeping.
|
||||||
|
*
|
||||||
|
* Linux: No deadlock detection. Dynamically changes a spin count
|
||||||
|
* for each mutex from 10 to 100 based on spin count taken
|
||||||
|
* previously.
|
||||||
|
* FreeBSD: Deadlock detection. The default spin count is 2000.
|
||||||
|
* It can be overriden using LIBPTHREAD_SPINLOOPS environment
|
||||||
|
* variable or by pthread_mutex_setspinloops_np(). If a lock
|
||||||
|
* is still busy, sched_yield() can be called on both UP and
|
||||||
|
* SMP systems. The default yield loop count is zero, but
|
||||||
|
* it can be set by LIBPTHREAD_YIELDLOOPS environment
|
||||||
|
* variable or by pthread_mutex_setyieldloops_np().
|
||||||
|
* Solaris: No PTHREAD_MUTEX_ADAPTIVE_NP.
|
||||||
|
* MacOSX: No PTHREAD_MUTEX_ADAPTIVE_NP.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* PTHREAD_MUTEX_ELISION_NP is a Linux extension to elide locks using
|
||||||
|
* Intel Restricted Transactional Memory. It is the most suitable for
|
||||||
|
* rwlock pattern access because it allows simultaneous reads without lock.
|
||||||
|
* Supported since glibc 2.18.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* PTHREAD_MUTEX_DEFAULT is default mutex type.
|
||||||
|
*
|
||||||
|
* Linux: PTHREAD_MUTEX_NORMAL.
|
||||||
|
* FreeBSD: PTHREAD_MUTEX_ERRORCHECK.
|
||||||
|
* Solaris: PTHREAD_MUTEX_NORMAL.
|
||||||
|
* MacOSX: PTHREAD_MUTEX_NORMAL.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
ngx_int_t
|
||||||
|
ngx_thread_mutex_create(ngx_thread_mutex_t *mtx, ngx_log_t *log)
|
||||||
|
{
|
||||||
|
ngx_err_t err;
|
||||||
|
pthread_mutexattr_t attr;
|
||||||
|
|
||||||
|
err = pthread_mutexattr_init(&attr);
|
||||||
|
if (err != 0) {
|
||||||
|
ngx_log_error(NGX_LOG_EMERG, log, err,
|
||||||
|
"pthread_mutexattr_init() failed");
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
|
||||||
|
if (err != 0) {
|
||||||
|
ngx_log_error(NGX_LOG_EMERG, log, err,
|
||||||
|
"pthread_mutexattr_settype"
|
||||||
|
"(PTHREAD_MUTEX_ERRORCHECK) failed");
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
err = pthread_mutex_init(mtx, &attr);
|
||||||
|
if (err != 0) {
|
||||||
|
ngx_log_error(NGX_LOG_EMERG, log, err,
|
||||||
|
"pthread_mutex_init() failed");
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
err = pthread_mutexattr_destroy(&attr);
|
||||||
|
if (err != 0) {
|
||||||
|
ngx_log_error(NGX_LOG_ALERT, log, err,
|
||||||
|
"pthread_mutexattr_destroy() failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
|
||||||
|
"pthread_mutex_init(%p)", mtx);
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ngx_int_t
|
||||||
|
ngx_thread_mutex_destroy(ngx_thread_mutex_t *mtx, ngx_log_t *log)
|
||||||
|
{
|
||||||
|
ngx_err_t err;
|
||||||
|
|
||||||
|
err = pthread_mutex_destroy(mtx);
|
||||||
|
if (err != 0) {
|
||||||
|
ngx_log_error(NGX_LOG_ALERT, log, err,
|
||||||
|
"pthread_mutex_destroy() failed");
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
|
||||||
|
"pthread_mutex_destroy(%p)", mtx);
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ngx_int_t
|
||||||
|
ngx_thread_mutex_lock(ngx_thread_mutex_t *mtx, ngx_log_t *log)
|
||||||
|
{
|
||||||
|
ngx_err_t err;
|
||||||
|
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
|
||||||
|
"pthread_mutex_lock(%p) enter", mtx);
|
||||||
|
|
||||||
|
err = pthread_mutex_lock(mtx);
|
||||||
|
if (err == 0) {
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_mutex_lock() failed");
|
||||||
|
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ngx_int_t
|
||||||
|
ngx_thread_mutex_unlock(ngx_thread_mutex_t *mtx, ngx_log_t *log)
|
||||||
|
{
|
||||||
|
ngx_err_t err;
|
||||||
|
|
||||||
|
err = pthread_mutex_unlock(mtx);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
ngx_time_update();
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (err == 0) {
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
|
||||||
|
"pthread_mutex_unlock(%p) exit", mtx);
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_mutex_unlock() failed");
|
||||||
|
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user