Thread pools: fixed the waiting tasks accounting.

Behave like POSIX semaphores.  If N worker threads are waiting for tasks,
at least that number of tasks should be allowed to be put into the queue.
This commit is contained in:
Ruslan Ermilov 2015-03-19 13:00:48 +03:00
parent 20d07074e3
commit afe1fcffaa

View File

@ -25,7 +25,7 @@ typedef struct {
struct ngx_thread_pool_s { struct ngx_thread_pool_s {
ngx_thread_pool_queue_t queue; ngx_thread_pool_queue_t queue;
ngx_uint_t waiting; ngx_int_t waiting;
ngx_thread_cond_t cond; ngx_thread_cond_t cond;
ngx_log_t *log; ngx_log_t *log;
@ -33,7 +33,7 @@ struct ngx_thread_pool_s {
ngx_str_t name; ngx_str_t name;
ngx_uint_t threads; ngx_uint_t threads;
ngx_uint_t max_queue; ngx_int_t max_queue;
u_char *file; u_char *file;
ngx_uint_t line; ngx_uint_t line;
@ -219,7 +219,7 @@ ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
ngx_log_error(NGX_LOG_ERR, tp->log, 0, ngx_log_error(NGX_LOG_ERR, tp->log, 0,
"thread pool \"%V\" queue overflow: %ui tasks waiting", "thread pool \"%V\" queue overflow: %i tasks waiting",
&tp->name, tp->waiting); &tp->name, tp->waiting);
return NGX_ERROR; return NGX_ERROR;
} }
@ -283,7 +283,10 @@ ngx_thread_pool_cycle(void *data)
return NULL; return NULL;
} }
while (tp->waiting == 0) { /* the number may become negative */
tp->waiting--;
while (tp->queue.first == NULL) {
if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log) if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log)
!= NGX_OK) != NGX_OK)
{ {
@ -292,8 +295,6 @@ ngx_thread_pool_cycle(void *data)
} }
} }
tp->waiting--;
task = tp->queue.first; task = tp->queue.first;
tp->queue.first = task->next; tp->queue.first = task->next;
@ -476,7 +477,7 @@ ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10); tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10);
if (tp->max_queue == (ngx_uint_t) NGX_ERROR) { if (tp->max_queue == NGX_ERROR) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid max_queue value \"%V\"", &value[i]); "invalid max_queue value \"%V\"", &value[i]);
return NGX_CONF_ERROR; return NGX_CONF_ERROR;