mirror of
https://github.com/nginx/nginx.git
synced 2025-06-07 17:52:38 +08:00
Threads: writing via threads pools in event pipe.
The "aio_write" directive is introduced, which enables use of aio for writing. Currently it is meaningful only with "aio threads". Note that aio operations can be done by both event pipe and output chain, so proper mapping between r->aio and p->aio is provided when calling ngx_event_pipe() and in output filter. In collaboration with Valentin Bartenev.
This commit is contained in:
parent
10c8c8d6a4
commit
348f705c00
@ -112,6 +112,14 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
#if (NGX_THREADS)
|
||||
if (p->aio) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
||||
"pipe read upstream: aio");
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
#endif
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
||||
"pipe read upstream: %d", p->upstream->read->ready);
|
||||
|
||||
@ -258,19 +266,6 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
|
||||
break;
|
||||
}
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
if (ngx_event_flags & NGX_USE_LEVEL_EVENT
|
||||
&& p->upstream->read->active
|
||||
&& p->upstream->read->ready)
|
||||
{
|
||||
if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
|
||||
== NGX_ERROR)
|
||||
{
|
||||
return NGX_ABORT;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (rc != NGX_OK) {
|
||||
return rc;
|
||||
}
|
||||
@ -475,8 +470,10 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
||||
"pipe write chain");
|
||||
|
||||
if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
|
||||
return NGX_ABORT;
|
||||
rc = ngx_event_pipe_write_chain_to_temp_file(p);
|
||||
|
||||
if (rc != NGX_OK) {
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
@ -499,6 +496,18 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
||||
"pipe write downstream: %d", downstream->write->ready);
|
||||
|
||||
#if (NGX_THREADS)
|
||||
|
||||
if (p->writing) {
|
||||
rc = ngx_event_pipe_write_chain_to_temp_file(p);
|
||||
|
||||
if (rc == NGX_ABORT) {
|
||||
return NGX_ABORT;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
flushed = 0;
|
||||
|
||||
for ( ;; ) {
|
||||
@ -532,6 +541,10 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
|
||||
p->out = NULL;
|
||||
}
|
||||
|
||||
if (p->writing) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (p->in) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
||||
"pipe write downstream flush in");
|
||||
@ -608,7 +621,7 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
|
||||
|
||||
p->out = p->out->next;
|
||||
|
||||
} else if (!p->cacheable && p->in) {
|
||||
} else if (!p->cacheable && !p->writing && p->in) {
|
||||
cl = p->in;
|
||||
|
||||
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
||||
@ -710,12 +723,38 @@ ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
|
||||
ssize_t size, bsize, n;
|
||||
ngx_buf_t *b;
|
||||
ngx_uint_t prev_last_shadow;
|
||||
ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl;
|
||||
ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free;
|
||||
|
||||
#if (NGX_THREADS)
|
||||
|
||||
if (p->writing) {
|
||||
|
||||
if (p->aio) {
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
out = p->writing;
|
||||
p->writing = NULL;
|
||||
|
||||
n = ngx_write_chain_to_temp_file(p->temp_file, NULL);
|
||||
|
||||
if (n == NGX_ERROR) {
|
||||
return NGX_ABORT;
|
||||
}
|
||||
|
||||
goto done;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
if (p->buf_to_file) {
|
||||
fl.buf = p->buf_to_file;
|
||||
fl.next = p->in;
|
||||
out = &fl;
|
||||
out = ngx_alloc_chain_link(p->pool);
|
||||
if (out == NULL) {
|
||||
return NGX_ABORT;
|
||||
}
|
||||
|
||||
out->buf = p->buf_to_file;
|
||||
out->next = p->in;
|
||||
|
||||
} else {
|
||||
out = p->in;
|
||||
@ -775,12 +814,31 @@ ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
|
||||
p->last_in = &p->in;
|
||||
}
|
||||
|
||||
#if (NGX_THREADS)
|
||||
p->temp_file->thread_write = p->thread_handler ? 1 : 0;
|
||||
p->temp_file->file.thread_task = p->thread_task;
|
||||
p->temp_file->file.thread_handler = p->thread_handler;
|
||||
p->temp_file->file.thread_ctx = p->thread_ctx;
|
||||
#endif
|
||||
|
||||
n = ngx_write_chain_to_temp_file(p->temp_file, out);
|
||||
|
||||
if (n == NGX_ERROR) {
|
||||
return NGX_ABORT;
|
||||
}
|
||||
|
||||
#if (NGX_THREADS)
|
||||
|
||||
if (n == NGX_AGAIN) {
|
||||
p->writing = out;
|
||||
p->thread_task = p->temp_file->file.thread_task;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
done:
|
||||
|
||||
#endif
|
||||
|
||||
if (p->buf_to_file) {
|
||||
p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
|
||||
n -= p->buf_to_file->last - p->buf_to_file->pos;
|
||||
|
@ -30,6 +30,8 @@ struct ngx_event_pipe_s {
|
||||
ngx_chain_t *in;
|
||||
ngx_chain_t **last_in;
|
||||
|
||||
ngx_chain_t *writing;
|
||||
|
||||
ngx_chain_t *out;
|
||||
ngx_chain_t *free;
|
||||
ngx_chain_t *busy;
|
||||
@ -45,6 +47,13 @@ struct ngx_event_pipe_s {
|
||||
ngx_event_pipe_output_filter_pt output_filter;
|
||||
void *output_ctx;
|
||||
|
||||
#if (NGX_THREADS)
|
||||
ngx_int_t (*thread_handler)(ngx_thread_task_t *task,
|
||||
ngx_file_t *file);
|
||||
void *thread_ctx;
|
||||
ngx_thread_task_t *thread_task;
|
||||
#endif
|
||||
|
||||
unsigned read:1;
|
||||
unsigned cacheable:1;
|
||||
unsigned single_buf:1;
|
||||
@ -56,6 +65,7 @@ struct ngx_event_pipe_s {
|
||||
unsigned downstream_done:1;
|
||||
unsigned downstream_error:1;
|
||||
unsigned cyclic_temp_file:1;
|
||||
unsigned aio:1;
|
||||
|
||||
ngx_int_t allocated;
|
||||
ngx_bufs_t bufs;
|
||||
|
@ -402,6 +402,13 @@ static ngx_command_t ngx_http_core_commands[] = {
|
||||
0,
|
||||
NULL },
|
||||
|
||||
{ ngx_string("aio_write"),
|
||||
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG,
|
||||
ngx_conf_set_flag_slot,
|
||||
NGX_HTTP_LOC_CONF_OFFSET,
|
||||
offsetof(ngx_http_core_loc_conf_t, aio_write),
|
||||
NULL },
|
||||
|
||||
{ ngx_string("read_ahead"),
|
||||
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
|
||||
ngx_conf_set_size_slot,
|
||||
@ -3608,6 +3615,7 @@ ngx_http_core_create_loc_conf(ngx_conf_t *cf)
|
||||
clcf->sendfile = NGX_CONF_UNSET;
|
||||
clcf->sendfile_max_chunk = NGX_CONF_UNSET_SIZE;
|
||||
clcf->aio = NGX_CONF_UNSET;
|
||||
clcf->aio_write = NGX_CONF_UNSET;
|
||||
#if (NGX_THREADS)
|
||||
clcf->thread_pool = NGX_CONF_UNSET_PTR;
|
||||
clcf->thread_pool_value = NGX_CONF_UNSET_PTR;
|
||||
@ -3829,6 +3837,7 @@ ngx_http_core_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
|
||||
prev->sendfile_max_chunk, 0);
|
||||
#if (NGX_HAVE_FILE_AIO || NGX_THREADS)
|
||||
ngx_conf_merge_value(conf->aio, prev->aio, NGX_HTTP_AIO_OFF);
|
||||
ngx_conf_merge_value(conf->aio_write, prev->aio_write, 0);
|
||||
#endif
|
||||
#if (NGX_THREADS)
|
||||
ngx_conf_merge_ptr_value(conf->thread_pool, prev->thread_pool, NULL);
|
||||
|
@ -404,6 +404,7 @@ struct ngx_http_core_loc_conf_s {
|
||||
ngx_flag_t internal; /* internal */
|
||||
ngx_flag_t sendfile; /* sendfile */
|
||||
ngx_flag_t aio; /* aio */
|
||||
ngx_flag_t aio_write; /* aio_write */
|
||||
ngx_flag_t tcp_nopush; /* tcp_nopush */
|
||||
ngx_flag_t tcp_nodelay; /* tcp_nodelay */
|
||||
ngx_flag_t reset_timedout_connection; /* reset_timedout_connection */
|
||||
|
@ -76,6 +76,13 @@ static void
|
||||
static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data);
|
||||
static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data,
|
||||
ssize_t bytes);
|
||||
#if (NGX_THREADS)
|
||||
static ngx_int_t ngx_http_upstream_thread_handler(ngx_thread_task_t *task,
|
||||
ngx_file_t *file);
|
||||
static void ngx_http_upstream_thread_event_handler(ngx_event_t *ev);
|
||||
#endif
|
||||
static ngx_int_t ngx_http_upstream_output_filter(void *data,
|
||||
ngx_chain_t *chain);
|
||||
static void ngx_http_upstream_process_downstream(ngx_http_request_t *r);
|
||||
static void ngx_http_upstream_process_upstream(ngx_http_request_t *r,
|
||||
ngx_http_upstream_t *u);
|
||||
@ -2870,7 +2877,7 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
|
||||
|
||||
p = u->pipe;
|
||||
|
||||
p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
|
||||
p->output_filter = ngx_http_upstream_output_filter;
|
||||
p->output_ctx = r;
|
||||
p->tag = u->output.tag;
|
||||
p->bufs = u->conf->bufs;
|
||||
@ -2913,6 +2920,13 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
|
||||
p->max_temp_file_size = u->conf->max_temp_file_size;
|
||||
p->temp_file_write_size = u->conf->temp_file_write_size;
|
||||
|
||||
#if (NGX_THREADS)
|
||||
if (clcf->aio == NGX_HTTP_AIO_THREADS && clcf->aio_write) {
|
||||
p->thread_handler = ngx_http_upstream_thread_handler;
|
||||
p->thread_ctx = r;
|
||||
}
|
||||
#endif
|
||||
|
||||
p->preread_bufs = ngx_alloc_chain_link(r->pool);
|
||||
if (p->preread_bufs == NULL) {
|
||||
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
|
||||
@ -3487,6 +3501,97 @@ ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes)
|
||||
}
|
||||
|
||||
|
||||
#if (NGX_THREADS)
|
||||
|
||||
static ngx_int_t
|
||||
ngx_http_upstream_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
|
||||
{
|
||||
ngx_str_t name;
|
||||
ngx_event_pipe_t *p;
|
||||
ngx_thread_pool_t *tp;
|
||||
ngx_http_request_t *r;
|
||||
ngx_http_core_loc_conf_t *clcf;
|
||||
|
||||
r = file->thread_ctx;
|
||||
p = r->upstream->pipe;
|
||||
|
||||
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
|
||||
tp = clcf->thread_pool;
|
||||
|
||||
if (tp == NULL) {
|
||||
if (ngx_http_complex_value(r, clcf->thread_pool_value, &name)
|
||||
!= NGX_OK)
|
||||
{
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
tp = ngx_thread_pool_get((ngx_cycle_t *) ngx_cycle, &name);
|
||||
|
||||
if (tp == NULL) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"thread pool \"%V\" not found", &name);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
task->event.data = r;
|
||||
task->event.handler = ngx_http_upstream_thread_event_handler;
|
||||
|
||||
if (ngx_thread_task_post(tp, task) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
r->main->blocked++;
|
||||
r->aio = 1;
|
||||
p->aio = 1;
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ngx_http_upstream_thread_event_handler(ngx_event_t *ev)
|
||||
{
|
||||
ngx_connection_t *c;
|
||||
ngx_http_request_t *r;
|
||||
|
||||
r = ev->data;
|
||||
c = r->connection;
|
||||
|
||||
ngx_http_set_log_request(c->log, r);
|
||||
|
||||
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
|
||||
"http upstream thread: \"%V?%V\"", &r->uri, &r->args);
|
||||
|
||||
r->main->blocked--;
|
||||
r->aio = 0;
|
||||
|
||||
r->write_event_handler(r);
|
||||
|
||||
ngx_http_run_posted_requests(c);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain)
|
||||
{
|
||||
ngx_int_t rc;
|
||||
ngx_event_pipe_t *p;
|
||||
ngx_http_request_t *r;
|
||||
|
||||
r = data;
|
||||
p = r->upstream->pipe;
|
||||
|
||||
rc = ngx_http_output_filter(r, chain);
|
||||
|
||||
p->aio = r->aio;
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ngx_http_upstream_process_downstream(ngx_http_request_t *r)
|
||||
{
|
||||
@ -3505,6 +3610,10 @@ ngx_http_upstream_process_downstream(ngx_http_request_t *r)
|
||||
|
||||
c->log->action = "sending to client";
|
||||
|
||||
#if (NGX_THREADS)
|
||||
p->aio = r->aio;
|
||||
#endif
|
||||
|
||||
if (wev->timedout) {
|
||||
|
||||
if (wev->delayed) {
|
||||
@ -3634,6 +3743,12 @@ ngx_http_upstream_process_request(ngx_http_request_t *r,
|
||||
|
||||
p = u->pipe;
|
||||
|
||||
#if (NGX_THREADS)
|
||||
if (p->writing) {
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (u->peer.connection) {
|
||||
|
||||
if (u->store) {
|
||||
|
Loading…
Reference in New Issue
Block a user