Added support for offloading Linux sendfile() in thread pools.

This commit is contained in:
Valentin Bartenev 2015-03-14 17:37:30 +03:00
parent a7ad493aa6
commit 7ed1a9681b
2 changed files with 187 additions and 8 deletions

View File

@ -184,6 +184,10 @@ struct ngx_connection_s {
unsigned busy_count:2; unsigned busy_count:2;
#endif #endif
#if (NGX_THREADS)
ngx_thread_task_t *sendfile_task;
#endif
#if (NGX_OLD_THREADS) #if (NGX_OLD_THREADS)
ngx_atomic_t lock; ngx_atomic_t lock;
#endif #endif

View File

@ -13,6 +13,18 @@
static ssize_t ngx_linux_sendfile(ngx_connection_t *c, ngx_buf_t *file, static ssize_t ngx_linux_sendfile(ngx_connection_t *c, ngx_buf_t *file,
size_t size); size_t size);
#if (NGX_THREADS)
#include <ngx_thread_pool.h>
#if !(NGX_HAVE_SENDFILE64)
#error sendfile64() is required!
#endif
static ngx_int_t ngx_linux_sendfile_thread(ngx_connection_t *c, ngx_buf_t *file,
size_t size, size_t *sent);
static void ngx_linux_sendfile_thread_handler(void *data, ngx_log_t *log);
#endif
/* /*
* On Linux up to 2.4.21 sendfile() (syscall #187) works with 32-bit * On Linux up to 2.4.21 sendfile() (syscall #187) works with 32-bit
@ -35,8 +47,8 @@ ngx_chain_t *
ngx_linux_sendfile_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit) ngx_linux_sendfile_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
{ {
int tcp_nodelay; int tcp_nodelay;
off_t send, prev_send, sent; off_t send, prev_send;
size_t file_size; size_t file_size, sent;
ssize_t n; ssize_t n;
ngx_err_t err; ngx_err_t err;
ngx_buf_t *file; ngx_buf_t *file;
@ -44,6 +56,10 @@ ngx_linux_sendfile_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
ngx_chain_t *cl; ngx_chain_t *cl;
ngx_iovec_t header; ngx_iovec_t header;
struct iovec headers[NGX_IOVS_PREALLOCATE]; struct iovec headers[NGX_IOVS_PREALLOCATE];
#if (NGX_THREADS)
ngx_int_t rc;
ngx_uint_t thread_handled, thread_complete;
#endif
wev = c->write; wev = c->write;
@ -66,6 +82,10 @@ ngx_linux_sendfile_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
for ( ;; ) { for ( ;; ) {
prev_send = send; prev_send = send;
#if (NGX_THREADS)
thread_handled = 0;
thread_complete = 0;
#endif
/* create the iovec and coalesce the neighbouring bufs */ /* create the iovec and coalesce the neighbouring bufs */
@ -158,14 +178,39 @@ ngx_linux_sendfile_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
return NGX_CHAIN_ERROR; return NGX_CHAIN_ERROR;
} }
#endif #endif
n = ngx_linux_sendfile(c, file, file_size);
if (n == NGX_ERROR) { #if (NGX_THREADS)
return NGX_CHAIN_ERROR; if (file->file->thread_handler) {
rc = ngx_linux_sendfile_thread(c, file, file_size, &sent);
switch (rc) {
case NGX_OK:
thread_handled = 1;
break;
case NGX_DONE:
thread_complete = 1;
break;
case NGX_AGAIN:
break;
default: /* NGX_ERROR */
return NGX_CHAIN_ERROR;
}
} else
#endif
{
n = ngx_linux_sendfile(c, file, file_size);
if (n == NGX_ERROR) {
return NGX_CHAIN_ERROR;
}
sent = (n == NGX_AGAIN) ? 0 : n;
} }
sent = (n == NGX_AGAIN) ? 0 : n;
} else { } else {
n = ngx_writev(c, &header); n = ngx_writev(c, &header);
@ -180,7 +225,17 @@ ngx_linux_sendfile_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
in = ngx_chain_update_sent(in, sent); in = ngx_chain_update_sent(in, sent);
if (send - prev_send != sent) { if ((size_t) (send - prev_send) != sent) {
#if (NGX_THREADS)
if (thread_handled) {
return in;
}
if (thread_complete) {
send = prev_send + sent;
continue;
}
#endif
wev->ready = 0; wev->ready = 0;
return in; return in;
} }
@ -242,3 +297,123 @@ eintr:
return n; return n;
} }
#if (NGX_THREADS)
typedef struct {
ngx_buf_t *file;
ngx_socket_t socket;
size_t size;
size_t sent;
ngx_err_t err;
} ngx_linux_sendfile_ctx_t;
static ngx_int_t
ngx_linux_sendfile_thread(ngx_connection_t *c, ngx_buf_t *file, size_t size,
size_t *sent)
{
ngx_uint_t flags;
ngx_event_t *wev;
ngx_thread_task_t *task;
ngx_linux_sendfile_ctx_t *ctx;
ngx_log_debug3(NGX_LOG_DEBUG_CORE, c->log, 0,
"linux sendfile thread: %d, %uz, %O",
file->file->fd, size, file->file_pos);
task = c->sendfile_task;
if (task == NULL) {
task = ngx_thread_task_alloc(c->pool, sizeof(ngx_linux_sendfile_ctx_t));
if (task == NULL) {
return NGX_ERROR;
}
task->handler = ngx_linux_sendfile_thread_handler;
c->sendfile_task = task;
}
ctx = task->ctx;
wev = c->write;
if (task->event.complete) {
task->event.complete = 0;
if (ctx->err && ctx->err != NGX_EAGAIN) {
wev->error = 1;
ngx_connection_error(c, ctx->err, "sendfile() failed");
return NGX_ERROR;
}
*sent = ctx->sent;
return (ctx->sent == ctx->size) ? NGX_DONE : NGX_AGAIN;
}
ctx->file = file;
ctx->socket = c->fd;
ctx->size = size;
if (wev->active) {
flags = (ngx_event_flags & NGX_USE_CLEAR_EVENT) ? NGX_CLEAR_EVENT
: NGX_LEVEL_EVENT;
if (ngx_del_event(wev, NGX_WRITE_EVENT, flags) == NGX_ERROR) {
return NGX_ERROR;
}
}
if (file->file->thread_handler(task, file->file) != NGX_OK) {
return NGX_ERROR;
}
*sent = 0;
return NGX_OK;
}
static void
ngx_linux_sendfile_thread_handler(void *data, ngx_log_t *log)
{
ngx_linux_sendfile_ctx_t *ctx = data;
off_t offset;
ssize_t n;
ngx_buf_t *file;
ngx_log_debug0(NGX_LOG_DEBUG_CORE, log, 0, "linux sendfile thread handler");
file = ctx->file;
offset = file->file_pos;
again:
n = sendfile(ctx->socket, file->file->fd, &offset, ctx->size);
if (n == -1) {
ctx->err = ngx_errno;
} else {
ctx->sent = n;
ctx->err = 0;
}
#if 0
ngx_time_update();
#endif
ngx_log_debug4(NGX_LOG_DEBUG_EVENT, log, 0,
"sendfile: %z (err: %i) of %uz @%O",
n, ctx->err, ctx->size, file->file_pos);
if (ctx->err == NGX_EINTR) {
goto again;
}
}
#endif /* NGX_THREADS */