mirror of
https://github.com/nginx/nginx.git
synced 2024-12-12 18:29:00 +08:00
a746bab7c1
As long as ngx_event_pipe() has more data read from upstream than specified in p->length it's passed to input filter even if buffer isn't yet full. This allows to process data with known length without relying on connection close to signal data end. By default p->length is set to -1 in upstream module, i.e. end of data is indicated by connection close. To set it from per-protocol handlers upstream input_filter_init() now called in buffered mode (as well as in unbuffered mode).
1025 lines
25 KiB
C
1025 lines
25 KiB
C
|
|
/*
|
|
* Copyright (C) Igor Sysoev
|
|
*/
|
|
|
|
|
|
#include <ngx_config.h>
|
|
#include <ngx_core.h>
|
|
#include <ngx_event.h>
|
|
#include <ngx_event_pipe.h>
|
|
|
|
|
|
static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
|
|
static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);
|
|
|
|
static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
|
|
static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
|
|
static ngx_inline void ngx_event_pipe_free_shadow_raw_buf(ngx_chain_t **free,
|
|
ngx_buf_t *buf);
|
|
static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
|
|
|
|
|
|
ngx_int_t
|
|
ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
|
|
{
|
|
u_int flags;
|
|
ngx_int_t rc;
|
|
ngx_event_t *rev, *wev;
|
|
|
|
for ( ;; ) {
|
|
if (do_write) {
|
|
p->log->action = "sending to client";
|
|
|
|
rc = ngx_event_pipe_write_to_downstream(p);
|
|
|
|
if (rc == NGX_ABORT) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
if (rc == NGX_BUSY) {
|
|
return NGX_OK;
|
|
}
|
|
}
|
|
|
|
p->read = 0;
|
|
p->upstream_blocked = 0;
|
|
|
|
p->log->action = "reading upstream";
|
|
|
|
if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
if (!p->read && !p->upstream_blocked) {
|
|
break;
|
|
}
|
|
|
|
do_write = 1;
|
|
}
|
|
|
|
if (p->upstream->fd != -1) {
|
|
rev = p->upstream->read;
|
|
|
|
flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
|
|
|
|
if (ngx_handle_read_event(rev, flags) != NGX_OK) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
if (rev->active && !rev->ready) {
|
|
ngx_add_timer(rev, p->read_timeout);
|
|
|
|
} else if (rev->timer_set) {
|
|
ngx_del_timer(rev);
|
|
}
|
|
}
|
|
|
|
if (p->downstream->fd != -1 && p->downstream->data == p->output_ctx) {
|
|
wev = p->downstream->write;
|
|
if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
if (!wev->delayed) {
|
|
if (wev->active && !wev->ready) {
|
|
ngx_add_timer(wev, p->send_timeout);
|
|
|
|
} else if (wev->timer_set) {
|
|
ngx_del_timer(wev);
|
|
}
|
|
}
|
|
}
|
|
|
|
return NGX_OK;
|
|
}
|
|
|
|
|
|
static ngx_int_t
|
|
ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
|
|
{
|
|
ssize_t n, size;
|
|
ngx_int_t rc;
|
|
ngx_buf_t *b;
|
|
ngx_chain_t *chain, *cl, *ln;
|
|
|
|
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
|
|
return NGX_OK;
|
|
}
|
|
|
|
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe read upstream: %d", p->upstream->read->ready);
|
|
|
|
for ( ;; ) {
|
|
|
|
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
|
|
break;
|
|
}
|
|
|
|
if (p->preread_bufs == NULL && !p->upstream->read->ready) {
|
|
break;
|
|
}
|
|
|
|
if (p->preread_bufs) {
|
|
|
|
/* use the pre-read bufs if they exist */
|
|
|
|
chain = p->preread_bufs;
|
|
p->preread_bufs = NULL;
|
|
n = p->preread_size;
|
|
|
|
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe preread: %z", n);
|
|
|
|
if (n) {
|
|
p->read = 1;
|
|
}
|
|
|
|
} else {
|
|
|
|
#if (NGX_HAVE_KQUEUE)
|
|
|
|
/*
|
|
* kqueue notifies about the end of file or a pending error.
|
|
* This test allows not to allocate a buf on these conditions
|
|
* and not to call c->recv_chain().
|
|
*/
|
|
|
|
if (p->upstream->read->available == 0
|
|
&& p->upstream->read->pending_eof)
|
|
{
|
|
p->upstream->read->ready = 0;
|
|
p->upstream->read->eof = 1;
|
|
p->upstream_eof = 1;
|
|
p->read = 1;
|
|
|
|
if (p->upstream->read->kq_errno) {
|
|
p->upstream->read->error = 1;
|
|
p->upstream_error = 1;
|
|
p->upstream_eof = 0;
|
|
|
|
ngx_log_error(NGX_LOG_ERR, p->log,
|
|
p->upstream->read->kq_errno,
|
|
"kevent() reported that upstream "
|
|
"closed connection");
|
|
}
|
|
|
|
break;
|
|
}
|
|
#endif
|
|
|
|
if (p->free_raw_bufs) {
|
|
|
|
/* use the free bufs if they exist */
|
|
|
|
chain = p->free_raw_bufs;
|
|
if (p->single_buf) {
|
|
p->free_raw_bufs = p->free_raw_bufs->next;
|
|
chain->next = NULL;
|
|
} else {
|
|
p->free_raw_bufs = NULL;
|
|
}
|
|
|
|
} else if (p->allocated < p->bufs.num) {
|
|
|
|
/* allocate a new buf if it's still allowed */
|
|
|
|
b = ngx_create_temp_buf(p->pool, p->bufs.size);
|
|
if (b == NULL) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
p->allocated++;
|
|
|
|
chain = ngx_alloc_chain_link(p->pool);
|
|
if (chain == NULL) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
chain->buf = b;
|
|
chain->next = NULL;
|
|
|
|
} else if (!p->cacheable
|
|
&& p->downstream->data == p->output_ctx
|
|
&& p->downstream->write->ready
|
|
&& !p->downstream->write->delayed)
|
|
{
|
|
/*
|
|
* if the bufs are not needed to be saved in a cache and
|
|
* a downstream is ready then write the bufs to a downstream
|
|
*/
|
|
|
|
p->upstream_blocked = 1;
|
|
|
|
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe downstream ready");
|
|
|
|
break;
|
|
|
|
} else if (p->cacheable
|
|
|| p->temp_file->offset < p->max_temp_file_size)
|
|
{
|
|
|
|
/*
|
|
* if it is allowed, then save some bufs from r->in
|
|
* to a temporary file, and add them to a r->out chain
|
|
*/
|
|
|
|
rc = ngx_event_pipe_write_chain_to_temp_file(p);
|
|
|
|
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe temp offset: %O", p->temp_file->offset);
|
|
|
|
if (rc == NGX_BUSY) {
|
|
break;
|
|
}
|
|
|
|
if (rc == NGX_AGAIN) {
|
|
if (ngx_event_flags & NGX_USE_LEVEL_EVENT
|
|
&& p->upstream->read->active
|
|
&& p->upstream->read->ready)
|
|
{
|
|
if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
|
|
== NGX_ERROR)
|
|
{
|
|
return NGX_ABORT;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (rc != NGX_OK) {
|
|
return rc;
|
|
}
|
|
|
|
chain = p->free_raw_bufs;
|
|
if (p->single_buf) {
|
|
p->free_raw_bufs = p->free_raw_bufs->next;
|
|
chain->next = NULL;
|
|
} else {
|
|
p->free_raw_bufs = NULL;
|
|
}
|
|
|
|
} else {
|
|
|
|
/* there are no bufs to read in */
|
|
|
|
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"no pipe bufs to read in");
|
|
|
|
break;
|
|
}
|
|
|
|
n = p->upstream->recv_chain(p->upstream, chain);
|
|
|
|
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe recv chain: %z", n);
|
|
|
|
if (p->free_raw_bufs) {
|
|
chain->next = p->free_raw_bufs;
|
|
}
|
|
p->free_raw_bufs = chain;
|
|
|
|
if (n == NGX_ERROR) {
|
|
p->upstream_error = 1;
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
if (n == NGX_AGAIN) {
|
|
if (p->single_buf) {
|
|
ngx_event_pipe_remove_shadow_links(chain->buf);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
p->read = 1;
|
|
|
|
if (n == 0) {
|
|
p->upstream_eof = 1;
|
|
break;
|
|
}
|
|
}
|
|
|
|
p->read_length += n;
|
|
cl = chain;
|
|
p->free_raw_bufs = NULL;
|
|
|
|
while (cl && n > 0) {
|
|
|
|
ngx_event_pipe_remove_shadow_links(cl->buf);
|
|
|
|
size = cl->buf->end - cl->buf->last;
|
|
|
|
if (n >= size) {
|
|
cl->buf->last = cl->buf->end;
|
|
|
|
/* STUB */ cl->buf->num = p->num++;
|
|
|
|
if (p->input_filter(p, cl->buf) == NGX_ERROR) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
n -= size;
|
|
ln = cl;
|
|
cl = cl->next;
|
|
ngx_free_chain(p->pool, ln);
|
|
|
|
} else {
|
|
cl->buf->last += n;
|
|
n = 0;
|
|
}
|
|
}
|
|
|
|
if (cl) {
|
|
for (ln = cl; ln->next; ln = ln->next) { /* void */ }
|
|
|
|
ln->next = p->free_raw_bufs;
|
|
p->free_raw_bufs = cl;
|
|
}
|
|
}
|
|
|
|
#if (NGX_DEBUG)
|
|
|
|
for (cl = p->busy; cl; cl = cl->next) {
|
|
ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe buf busy s:%d t:%d f:%d "
|
|
"%p, pos %p, size: %z "
|
|
"file: %O, size: %z",
|
|
(cl->buf->shadow ? 1 : 0),
|
|
cl->buf->temporary, cl->buf->in_file,
|
|
cl->buf->start, cl->buf->pos,
|
|
cl->buf->last - cl->buf->pos,
|
|
cl->buf->file_pos,
|
|
cl->buf->file_last - cl->buf->file_pos);
|
|
}
|
|
|
|
for (cl = p->out; cl; cl = cl->next) {
|
|
ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe buf out s:%d t:%d f:%d "
|
|
"%p, pos %p, size: %z "
|
|
"file: %O, size: %z",
|
|
(cl->buf->shadow ? 1 : 0),
|
|
cl->buf->temporary, cl->buf->in_file,
|
|
cl->buf->start, cl->buf->pos,
|
|
cl->buf->last - cl->buf->pos,
|
|
cl->buf->file_pos,
|
|
cl->buf->file_last - cl->buf->file_pos);
|
|
}
|
|
|
|
for (cl = p->in; cl; cl = cl->next) {
|
|
ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe buf in s:%d t:%d f:%d "
|
|
"%p, pos %p, size: %z "
|
|
"file: %O, size: %z",
|
|
(cl->buf->shadow ? 1 : 0),
|
|
cl->buf->temporary, cl->buf->in_file,
|
|
cl->buf->start, cl->buf->pos,
|
|
cl->buf->last - cl->buf->pos,
|
|
cl->buf->file_pos,
|
|
cl->buf->file_last - cl->buf->file_pos);
|
|
}
|
|
|
|
for (cl = p->free_raw_bufs; cl; cl = cl->next) {
|
|
ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe buf free s:%d t:%d f:%d "
|
|
"%p, pos %p, size: %z "
|
|
"file: %O, size: %z",
|
|
(cl->buf->shadow ? 1 : 0),
|
|
cl->buf->temporary, cl->buf->in_file,
|
|
cl->buf->start, cl->buf->pos,
|
|
cl->buf->last - cl->buf->pos,
|
|
cl->buf->file_pos,
|
|
cl->buf->file_last - cl->buf->file_pos);
|
|
}
|
|
|
|
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe length: %O", p->length);
|
|
|
|
#endif
|
|
|
|
if (p->free_raw_bufs && p->length != -1) {
|
|
cl = p->free_raw_bufs;
|
|
|
|
if (cl->buf->last - cl->buf->pos >= p->length) {
|
|
|
|
/* STUB */ cl->buf->num = p->num++;
|
|
|
|
if (p->input_filter(p, cl->buf) == NGX_ERROR) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
p->free_raw_bufs = cl->next;
|
|
}
|
|
}
|
|
|
|
if (p->length == 0) {
|
|
p->upstream_done = 1;
|
|
p->read = 1;
|
|
}
|
|
|
|
if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
|
|
|
|
/* STUB */ p->free_raw_bufs->buf->num = p->num++;
|
|
|
|
if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
p->free_raw_bufs = p->free_raw_bufs->next;
|
|
|
|
if (p->free_bufs && p->buf_to_file == NULL) {
|
|
for (cl = p->free_raw_bufs; cl; cl = cl->next) {
|
|
if (cl->buf->shadow == NULL) {
|
|
ngx_pfree(p->pool, cl->buf->start);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (p->cacheable && p->in) {
|
|
if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
|
|
return NGX_ABORT;
|
|
}
|
|
}
|
|
|
|
return NGX_OK;
|
|
}
|
|
|
|
|
|
static ngx_int_t
|
|
ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
|
|
{
|
|
u_char *prev;
|
|
size_t bsize;
|
|
ngx_int_t rc;
|
|
ngx_uint_t flush, flushed, prev_last_shadow;
|
|
ngx_chain_t *out, **ll, *cl, file;
|
|
ngx_connection_t *downstream;
|
|
|
|
downstream = p->downstream;
|
|
|
|
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe write downstream: %d", downstream->write->ready);
|
|
|
|
flushed = 0;
|
|
|
|
for ( ;; ) {
|
|
if (p->downstream_error) {
|
|
return ngx_event_pipe_drain_chains(p);
|
|
}
|
|
|
|
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
|
|
|
|
/* pass the p->out and p->in chains to the output filter */
|
|
|
|
for (cl = p->busy; cl; cl = cl->next) {
|
|
cl->buf->recycled = 0;
|
|
}
|
|
|
|
if (p->out) {
|
|
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe write downstream flush out");
|
|
|
|
for (cl = p->out; cl; cl = cl->next) {
|
|
cl->buf->recycled = 0;
|
|
}
|
|
|
|
rc = p->output_filter(p->output_ctx, p->out);
|
|
|
|
if (rc == NGX_ERROR) {
|
|
p->downstream_error = 1;
|
|
return ngx_event_pipe_drain_chains(p);
|
|
}
|
|
|
|
p->out = NULL;
|
|
}
|
|
|
|
if (p->in) {
|
|
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe write downstream flush in");
|
|
|
|
for (cl = p->in; cl; cl = cl->next) {
|
|
cl->buf->recycled = 0;
|
|
}
|
|
|
|
rc = p->output_filter(p->output_ctx, p->in);
|
|
|
|
if (rc == NGX_ERROR) {
|
|
p->downstream_error = 1;
|
|
return ngx_event_pipe_drain_chains(p);
|
|
}
|
|
|
|
p->in = NULL;
|
|
}
|
|
|
|
if (p->cacheable && p->buf_to_file) {
|
|
|
|
file.buf = p->buf_to_file;
|
|
file.next = NULL;
|
|
|
|
if (ngx_write_chain_to_temp_file(p->temp_file, &file)
|
|
== NGX_ERROR)
|
|
{
|
|
return NGX_ABORT;
|
|
}
|
|
}
|
|
|
|
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe write downstream done");
|
|
|
|
/* TODO: free unused bufs */
|
|
|
|
p->downstream_done = 1;
|
|
break;
|
|
}
|
|
|
|
if (downstream->data != p->output_ctx
|
|
|| !downstream->write->ready
|
|
|| downstream->write->delayed)
|
|
{
|
|
break;
|
|
}
|
|
|
|
/* bsize is the size of the busy recycled bufs */
|
|
|
|
prev = NULL;
|
|
bsize = 0;
|
|
|
|
for (cl = p->busy; cl; cl = cl->next) {
|
|
|
|
if (cl->buf->recycled) {
|
|
if (prev == cl->buf->start) {
|
|
continue;
|
|
}
|
|
|
|
bsize += cl->buf->end - cl->buf->start;
|
|
prev = cl->buf->start;
|
|
}
|
|
}
|
|
|
|
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe write busy: %uz", bsize);
|
|
|
|
out = NULL;
|
|
|
|
if (bsize >= (size_t) p->busy_size) {
|
|
flush = 1;
|
|
goto flush;
|
|
}
|
|
|
|
flush = 0;
|
|
ll = NULL;
|
|
prev_last_shadow = 1;
|
|
|
|
for ( ;; ) {
|
|
if (p->out) {
|
|
cl = p->out;
|
|
|
|
if (cl->buf->recycled
|
|
&& bsize + cl->buf->last - cl->buf->pos > p->busy_size)
|
|
{
|
|
flush = 1;
|
|
break;
|
|
}
|
|
|
|
p->out = p->out->next;
|
|
|
|
ngx_event_pipe_free_shadow_raw_buf(&p->free_raw_bufs, cl->buf);
|
|
|
|
} else if (!p->cacheable && p->in) {
|
|
cl = p->in;
|
|
|
|
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe write buf ls:%d %p %z",
|
|
cl->buf->last_shadow,
|
|
cl->buf->pos,
|
|
cl->buf->last - cl->buf->pos);
|
|
|
|
if (cl->buf->recycled
|
|
&& cl->buf->last_shadow
|
|
&& bsize + cl->buf->last - cl->buf->pos > p->busy_size)
|
|
{
|
|
if (!prev_last_shadow) {
|
|
p->in = p->in->next;
|
|
|
|
cl->next = NULL;
|
|
|
|
if (out) {
|
|
*ll = cl;
|
|
} else {
|
|
out = cl;
|
|
}
|
|
}
|
|
|
|
flush = 1;
|
|
break;
|
|
}
|
|
|
|
prev_last_shadow = cl->buf->last_shadow;
|
|
|
|
p->in = p->in->next;
|
|
|
|
} else {
|
|
break;
|
|
}
|
|
|
|
if (cl->buf->recycled) {
|
|
bsize += cl->buf->last - cl->buf->pos;
|
|
}
|
|
|
|
cl->next = NULL;
|
|
|
|
if (out) {
|
|
*ll = cl;
|
|
} else {
|
|
out = cl;
|
|
}
|
|
ll = &cl->next;
|
|
}
|
|
|
|
flush:
|
|
|
|
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe write: out:%p, f:%d", out, flush);
|
|
|
|
if (out == NULL) {
|
|
|
|
if (!flush) {
|
|
break;
|
|
}
|
|
|
|
/* a workaround for AIO */
|
|
if (flushed++ > 10) {
|
|
return NGX_BUSY;
|
|
}
|
|
}
|
|
|
|
rc = p->output_filter(p->output_ctx, out);
|
|
|
|
if (rc == NGX_ERROR) {
|
|
p->downstream_error = 1;
|
|
return ngx_event_pipe_drain_chains(p);
|
|
}
|
|
|
|
ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);
|
|
|
|
for (cl = p->free; cl; cl = cl->next) {
|
|
|
|
if (cl->buf->temp_file) {
|
|
if (p->cacheable || !p->cyclic_temp_file) {
|
|
continue;
|
|
}
|
|
|
|
/* reset p->temp_offset if all bufs had been sent */
|
|
|
|
if (cl->buf->file_last == p->temp_file->offset) {
|
|
p->temp_file->offset = 0;
|
|
}
|
|
}
|
|
|
|
/* TODO: free buf if p->free_bufs && upstream done */
|
|
|
|
/* add the free shadow raw buf to p->free_raw_bufs */
|
|
|
|
if (cl->buf->last_shadow) {
|
|
if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
cl->buf->last_shadow = 0;
|
|
}
|
|
|
|
cl->buf->shadow = NULL;
|
|
}
|
|
}
|
|
|
|
return NGX_OK;
|
|
}
|
|
|
|
|
|
static ngx_int_t
|
|
ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
|
|
{
|
|
ssize_t size, bsize;
|
|
ngx_buf_t *b;
|
|
ngx_chain_t *cl, *tl, *next, *out, **ll, **last_free, fl;
|
|
|
|
if (p->buf_to_file) {
|
|
fl.buf = p->buf_to_file;
|
|
fl.next = p->in;
|
|
out = &fl;
|
|
|
|
} else {
|
|
out = p->in;
|
|
}
|
|
|
|
if (!p->cacheable) {
|
|
|
|
size = 0;
|
|
cl = out;
|
|
ll = NULL;
|
|
|
|
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe offset: %O", p->temp_file->offset);
|
|
|
|
do {
|
|
bsize = cl->buf->last - cl->buf->pos;
|
|
|
|
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
|
"pipe buf %p, pos %p, size: %z",
|
|
cl->buf->start, cl->buf->pos, bsize);
|
|
|
|
if ((size + bsize > p->temp_file_write_size)
|
|
|| (p->temp_file->offset + size + bsize > p->max_temp_file_size))
|
|
{
|
|
break;
|
|
}
|
|
|
|
size += bsize;
|
|
ll = &cl->next;
|
|
cl = cl->next;
|
|
|
|
} while (cl);
|
|
|
|
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);
|
|
|
|
if (ll == NULL) {
|
|
return NGX_BUSY;
|
|
}
|
|
|
|
if (cl) {
|
|
p->in = cl;
|
|
*ll = NULL;
|
|
|
|
} else {
|
|
p->in = NULL;
|
|
p->last_in = &p->in;
|
|
}
|
|
|
|
} else {
|
|
p->in = NULL;
|
|
p->last_in = &p->in;
|
|
}
|
|
|
|
if (ngx_write_chain_to_temp_file(p->temp_file, out) == NGX_ERROR) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
for (last_free = &p->free_raw_bufs;
|
|
*last_free != NULL;
|
|
last_free = &(*last_free)->next)
|
|
{
|
|
/* void */
|
|
}
|
|
|
|
if (p->buf_to_file) {
|
|
p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
|
|
p->buf_to_file = NULL;
|
|
out = out->next;
|
|
}
|
|
|
|
for (cl = out; cl; cl = next) {
|
|
next = cl->next;
|
|
cl->next = NULL;
|
|
|
|
b = cl->buf;
|
|
b->file = &p->temp_file->file;
|
|
b->file_pos = p->temp_file->offset;
|
|
p->temp_file->offset += b->last - b->pos;
|
|
b->file_last = p->temp_file->offset;
|
|
|
|
b->in_file = 1;
|
|
b->temp_file = 1;
|
|
|
|
if (p->out) {
|
|
*p->last_out = cl;
|
|
} else {
|
|
p->out = cl;
|
|
}
|
|
p->last_out = &cl->next;
|
|
|
|
if (b->last_shadow) {
|
|
|
|
tl = ngx_alloc_chain_link(p->pool);
|
|
if (tl == NULL) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
tl->buf = b->shadow;
|
|
tl->next = NULL;
|
|
|
|
*last_free = tl;
|
|
last_free = &tl->next;
|
|
|
|
b->shadow->pos = b->shadow->start;
|
|
b->shadow->last = b->shadow->start;
|
|
|
|
ngx_event_pipe_remove_shadow_links(b->shadow);
|
|
}
|
|
}
|
|
|
|
return NGX_OK;
|
|
}
|
|
|
|
|
|
/* the copy input filter */
|
|
|
|
ngx_int_t
|
|
ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|
{
|
|
ngx_buf_t *b;
|
|
ngx_chain_t *cl;
|
|
|
|
if (buf->pos == buf->last) {
|
|
return NGX_OK;
|
|
}
|
|
|
|
if (p->free) {
|
|
cl = p->free;
|
|
b = cl->buf;
|
|
p->free = cl->next;
|
|
ngx_free_chain(p->pool, cl);
|
|
|
|
} else {
|
|
b = ngx_alloc_buf(p->pool);
|
|
if (b == NULL) {
|
|
return NGX_ERROR;
|
|
}
|
|
}
|
|
|
|
ngx_memcpy(b, buf, sizeof(ngx_buf_t));
|
|
b->shadow = buf;
|
|
b->tag = p->tag;
|
|
b->last_shadow = 1;
|
|
b->recycled = 1;
|
|
buf->shadow = b;
|
|
|
|
cl = ngx_alloc_chain_link(p->pool);
|
|
if (cl == NULL) {
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
cl->buf = b;
|
|
cl->next = NULL;
|
|
|
|
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
|
|
|
|
if (p->in) {
|
|
*p->last_in = cl;
|
|
} else {
|
|
p->in = cl;
|
|
}
|
|
p->last_in = &cl->next;
|
|
|
|
if (p->length == -1) {
|
|
return NGX_OK;
|
|
}
|
|
|
|
p->length -= b->last - b->pos;
|
|
|
|
return NGX_OK;
|
|
}
|
|
|
|
|
|
static ngx_inline void
|
|
ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
|
|
{
|
|
ngx_buf_t *b, *next;
|
|
|
|
b = buf->shadow;
|
|
|
|
if (b == NULL) {
|
|
return;
|
|
}
|
|
|
|
while (!b->last_shadow) {
|
|
next = b->shadow;
|
|
|
|
b->temporary = 0;
|
|
b->recycled = 0;
|
|
|
|
b->shadow = NULL;
|
|
b = next;
|
|
}
|
|
|
|
b->temporary = 0;
|
|
b->recycled = 0;
|
|
b->last_shadow = 0;
|
|
|
|
b->shadow = NULL;
|
|
|
|
buf->shadow = NULL;
|
|
}
|
|
|
|
|
|
static ngx_inline void
|
|
ngx_event_pipe_free_shadow_raw_buf(ngx_chain_t **free, ngx_buf_t *buf)
|
|
{
|
|
ngx_buf_t *s;
|
|
ngx_chain_t *cl, **ll;
|
|
|
|
if (buf->shadow == NULL) {
|
|
return;
|
|
}
|
|
|
|
for (s = buf->shadow; !s->last_shadow; s = s->shadow) { /* void */ }
|
|
|
|
ll = free;
|
|
|
|
for (cl = *free; cl; cl = cl->next) {
|
|
if (cl->buf == s) {
|
|
*ll = cl->next;
|
|
break;
|
|
}
|
|
|
|
if (cl->buf->shadow) {
|
|
break;
|
|
}
|
|
|
|
ll = &cl->next;
|
|
}
|
|
}
|
|
|
|
|
|
ngx_int_t
|
|
ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
|
|
{
|
|
ngx_chain_t *cl;
|
|
|
|
cl = ngx_alloc_chain_link(p->pool);
|
|
if (cl == NULL) {
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
b->pos = b->start;
|
|
b->last = b->start;
|
|
b->shadow = NULL;
|
|
|
|
cl->buf = b;
|
|
|
|
if (p->free_raw_bufs == NULL) {
|
|
p->free_raw_bufs = cl;
|
|
cl->next = NULL;
|
|
|
|
return NGX_OK;
|
|
}
|
|
|
|
if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {
|
|
|
|
/* add the free buf to the list start */
|
|
|
|
cl->next = p->free_raw_bufs;
|
|
p->free_raw_bufs = cl;
|
|
|
|
return NGX_OK;
|
|
}
|
|
|
|
/* the first free buf is partialy filled, thus add the free buf after it */
|
|
|
|
cl->next = p->free_raw_bufs->next;
|
|
p->free_raw_bufs->next = cl;
|
|
|
|
return NGX_OK;
|
|
}
|
|
|
|
|
|
static ngx_int_t
|
|
ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
|
|
{
|
|
ngx_chain_t *cl, *tl;
|
|
|
|
for ( ;; ) {
|
|
if (p->busy) {
|
|
cl = p->busy;
|
|
p->busy = NULL;
|
|
|
|
} else if (p->out) {
|
|
cl = p->out;
|
|
p->out = NULL;
|
|
|
|
} else if (p->in) {
|
|
cl = p->in;
|
|
p->in = NULL;
|
|
|
|
} else {
|
|
return NGX_OK;
|
|
}
|
|
|
|
while (cl) {
|
|
if (cl->buf->last_shadow) {
|
|
if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
|
|
return NGX_ABORT;
|
|
}
|
|
|
|
cl->buf->last_shadow = 0;
|
|
}
|
|
|
|
cl->buf->shadow = NULL;
|
|
tl = cl->next;
|
|
cl->next = p->free;
|
|
p->free = cl;
|
|
cl = tl;
|
|
}
|
|
}
|
|
}
|