nginx/src/event/ngx_event_pipe.c
Maxim Dounin e66073c4d3 Upstream: fixed cache corruption and socket leaks with aio_write.
The ngx_event_pipe() function wasn't called on write events with
wev->delayed set.  As a result, threaded writing results weren't
properly collected in ngx_event_pipe_write_to_downstream() when a
write event was triggered for a completed write.

Further, this wasn't detected, as p->aio was reset by a thread completion
handler, and results were later collected in ngx_event_pipe_read_upstream()
instead of scheduling a new write of additional data.  If this happened
on the last reading from an upstream, last part of the response was never
written to the cache file.

Similar problems might also happen in case of timeouts when writing to
client, as this also results in ngx_event_pipe() not being called on write
events.  In this scenario socket leaks were observed.

The fix is to check whether p->writing is set in ngx_event_pipe_read_upstream(),
and therefore collect the results of previous write operations on read events
as well, similar to how this is done in ngx_event_pipe_write_to_downstream().
This is enough to fix the wev->delayed case.  Additionally, we now call
ngx_event_pipe() from ngx_http_upstream_process_request() if there are
uncollected write operations (p->writing and !p->aio).  This also fixes
the wev->timedout case.
2017-01-20 21:14:19 +03:00

1111 lines
27 KiB
C

/*
* Copyright (C) Igor Sysoev
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>
#include <ngx_event_pipe.h>
/* Forward declarations of the file-local helpers defined further below. */
static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);
static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
/*
 * Pipe entry point: alternates between writing to the downstream and
 * reading from the upstream until a read pass neither reads anything nor
 * reports the upstream as blocked, then re-arms the read and write events
 * and their timers.
 *
 * p         the pipe to run
 * do_write  non-zero if the first pass should start with a downstream write
 *
 * Returns NGX_ABORT if either direction aborts or an event cannot be
 * handled, and NGX_OK otherwise (including the case when the downstream
 * writer reports NGX_BUSY).
 */
ngx_int_t
ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{
    ngx_int_t     status;
    ngx_uint_t    close_flag;
    ngx_event_t  *rd, *wr;

    do {

        if (do_write) {
            p->log->action = "sending to client";

            status = ngx_event_pipe_write_to_downstream(p);

            if (status == NGX_BUSY) {
                return NGX_OK;
            }

            if (status == NGX_ABORT) {
                return NGX_ABORT;
            }
        }

        /* both progress flags are re-evaluated by every read pass */

        p->upstream_blocked = 0;
        p->read = 0;

        p->log->action = "reading upstream";

        status = ngx_event_pipe_read_upstream(p);

        if (status == NGX_ABORT) {
            return NGX_ABORT;
        }

        /* every pass after the first one starts with a write */

        do_write = 1;

    } while (p->read || p->upstream_blocked);

    if (p->upstream->fd != (ngx_socket_t) -1) {

        rd = p->upstream->read;

        close_flag = 0;

        if (rd->eof || rd->error) {
            close_flag = NGX_CLOSE_EVENT;
        }

        if (ngx_handle_read_event(rd, close_flag) != NGX_OK) {
            return NGX_ABORT;
        }

        /* a delayed event keeps whatever timer it currently has */

        if (rd->delayed) {
            /* void */

        } else if (rd->active && !rd->ready) {
            ngx_add_timer(rd, p->read_timeout);

        } else if (rd->timer_set) {
            ngx_del_timer(rd);
        }
    }

    if (p->downstream->fd == (ngx_socket_t) -1
        || p->downstream->data != p->output_ctx)
    {
        return NGX_OK;
    }

    wr = p->downstream->write;

    if (ngx_handle_write_event(wr, p->send_lowat) != NGX_OK) {
        return NGX_ABORT;
    }

    if (wr->delayed) {
        return NGX_OK;
    }

    if (wr->active && !wr->ready) {
        ngx_add_timer(wr, p->send_timeout);

    } else if (wr->timer_set) {
        ngx_del_timer(wr);
    }

    return NGX_OK;
}
/*
 * Reads the upstream response into raw buffers and feeds every filled
 * buffer to p->input_filter.
 *
 * Reading continues while the upstream is ready (or preread data exists)
 * and a buffer can be obtained: from p->free_raw_bufs, by allocating a new
 * one while p->allocated < p->bufs.num, or by writing part of p->in to the
 * temporary file.  It stops on eof/error/done, when the rate limit is
 * exhausted, or when no buffer is available.
 *
 * Sets p->read when something was read and p->upstream_blocked when reading
 * stopped only because buffers must first be sent to a ready downstream.
 *
 * Returns NGX_OK normally, NGX_ABORT on an allocation or input filter
 * failure, NGX_AGAIN while an asynchronous operation is in flight
 * (NGX_THREADS); any other non-NGX_OK result of
 * ngx_event_pipe_write_chain_to_temp_file() is passed through unchanged,
 * except NGX_BUSY inside the read loop, which only stops reading.
 */
static ngx_int_t
ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
{
    off_t         limit;
    ssize_t       n, size;
    ngx_int_t     rc;
    ngx_buf_t    *b;
    ngx_msec_t    delay;
    ngx_chain_t  *chain, *cl, *ln;

    if (p->upstream_eof || p->upstream_error || p->upstream_done) {
        return NGX_OK;
    }

#if (NGX_THREADS)

    if (p->aio) {
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe read upstream: aio");
        return NGX_AGAIN;
    }

    if (p->writing) {

        /* collect the result of a previously started temp file write */

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe read upstream: writing");

        rc = ngx_event_pipe_write_chain_to_temp_file(p);

        if (rc != NGX_OK) {
            return rc;
        }
    }

#endif

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                   "pipe read upstream: %d", p->upstream->read->ready);

    for ( ;; ) {

        if (p->upstream_eof || p->upstream_error || p->upstream_done) {
            break;
        }

        if (p->preread_bufs == NULL && !p->upstream->read->ready) {
            break;
        }

        if (p->preread_bufs) {

            /* use the pre-read bufs if they exist */

            chain = p->preread_bufs;
            p->preread_bufs = NULL;
            n = p->preread_size;

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe preread: %z", n);

            if (n) {
                p->read = 1;
            }

        } else {

#if (NGX_HAVE_KQUEUE)

            /*
             * kqueue notifies about the end of file or a pending error.
             * This test allows not to allocate a buf on these conditions
             * and not to call c->recv_chain().
             *
             * The shortcut is not taken for SSL connections: there
             * "available" describes the socket only, while data may still
             * be buffered inside the SSL layer, so c->recv_chain() has to
             * be called to find out whether the response really ended.
             */

            if (p->upstream->read->available == 0
                && p->upstream->read->pending_eof
#if (NGX_SSL)
                && !p->upstream->ssl
#endif
                )
            {
                p->upstream->read->ready = 0;
                p->upstream->read->eof = 1;
                p->upstream_eof = 1;
                p->read = 1;

                if (p->upstream->read->kq_errno) {

                    /* a pending error turns the eof into an error */

                    p->upstream->read->error = 1;
                    p->upstream_error = 1;
                    p->upstream_eof = 0;

                    ngx_log_error(NGX_LOG_ERR, p->log,
                                  p->upstream->read->kq_errno,
                                  "kevent() reported that upstream "
                                  "closed connection");
                }

                break;
            }
#endif

            if (p->limit_rate) {
                if (p->upstream->read->delayed) {
                    break;
                }

                /* bytes still allowed by the rate limit at this second */

                limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1)
                        - p->read_length;

                if (limit <= 0) {

                    /* over the limit: postpone reading with a timer */

                    p->upstream->read->delayed = 1;
                    delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1);
                    ngx_add_timer(p->upstream->read, delay);
                    break;
                }

            } else {
                limit = 0;
            }

            if (p->free_raw_bufs) {

                /* use the free bufs if they exist */

                chain = p->free_raw_bufs;

                if (p->single_buf) {
                    p->free_raw_bufs = p->free_raw_bufs->next;
                    chain->next = NULL;

                } else {
                    p->free_raw_bufs = NULL;
                }

            } else if (p->allocated < p->bufs.num) {

                /* allocate a new buf if it's still allowed */

                b = ngx_create_temp_buf(p->pool, p->bufs.size);
                if (b == NULL) {
                    return NGX_ABORT;
                }

                p->allocated++;

                chain = ngx_alloc_chain_link(p->pool);
                if (chain == NULL) {
                    return NGX_ABORT;
                }

                chain->buf = b;
                chain->next = NULL;

            } else if (!p->cacheable
                       && p->downstream->data == p->output_ctx
                       && p->downstream->write->ready
                       && !p->downstream->write->delayed)
            {
                /*
                 * if the bufs are not needed to be saved in a cache and
                 * a downstream is ready then write the bufs to a downstream
                 */

                p->upstream_blocked = 1;

                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe downstream ready");

                break;

            } else if (p->cacheable
                       || p->temp_file->offset < p->max_temp_file_size)
            {

                /*
                 * if it is allowed, then save some bufs from p->in
                 * to a temporary file, and add them to a p->out chain
                 */

                rc = ngx_event_pipe_write_chain_to_temp_file(p);

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe temp offset: %O", p->temp_file->offset);

                if (rc == NGX_BUSY) {
                    break;
                }

                if (rc != NGX_OK) {
                    return rc;
                }

                /* the write released raw bufs, take them for reading */

                chain = p->free_raw_bufs;

                if (p->single_buf) {
                    p->free_raw_bufs = p->free_raw_bufs->next;
                    chain->next = NULL;

                } else {
                    p->free_raw_bufs = NULL;
                }

            } else {

                /* there are no bufs to read in */

                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "no pipe bufs to read in");

                break;
            }

            n = p->upstream->recv_chain(p->upstream, chain, limit);

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe recv chain: %z", n);

            /* put the chain back in front of the free raw bufs */

            if (p->free_raw_bufs) {
                chain->next = p->free_raw_bufs;
            }

            p->free_raw_bufs = chain;

            if (n == NGX_ERROR) {
                p->upstream_error = 1;
                break;
            }

            if (n == NGX_AGAIN) {
                if (p->single_buf) {
                    ngx_event_pipe_remove_shadow_links(chain->buf);
                }

                break;
            }

            p->read = 1;

            if (n == 0) {
                p->upstream_eof = 1;
                break;
            }
        }

        /* time the bytes just read are worth under the rate limit */

        delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;

        p->read_length += n;
        cl = chain;
        p->free_raw_bufs = NULL;

        /* distribute the n bytes read over the bufs of the chain */

        while (cl && n > 0) {

            ngx_event_pipe_remove_shadow_links(cl->buf);

            size = cl->buf->end - cl->buf->last;

            if (n >= size) {

                /* the buf is full: pass it to the input filter */

                cl->buf->last = cl->buf->end;

                /* STUB */ cl->buf->num = p->num++;

                if (p->input_filter(p, cl->buf) == NGX_ERROR) {
                    return NGX_ABORT;
                }

                n -= size;
                ln = cl;
                cl = cl->next;
                ngx_free_chain(p->pool, ln);

            } else {

                /* a partially filled buf stays in the chain */

                cl->buf->last += n;
                n = 0;
            }
        }

        if (cl) {

            /* the rest of the chain goes in front of p->free_raw_bufs */

            for (ln = cl; ln->next; ln = ln->next) { /* void */ }

            ln->next = p->free_raw_bufs;
            p->free_raw_bufs = cl;
        }

        if (delay > 0) {
            p->upstream->read->delayed = 1;
            ngx_add_timer(p->upstream->read, delay);
            break;
        }
    }

#if (NGX_DEBUG)

    for (cl = p->busy; cl; cl = cl->next) {
        ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe buf busy s:%d t:%d f:%d "
                       "%p, pos %p, size: %z "
                       "file: %O, size: %O",
                       (cl->buf->shadow ? 1 : 0),
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);
    }

    for (cl = p->out; cl; cl = cl->next) {
        ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe buf out  s:%d t:%d f:%d "
                       "%p, pos %p, size: %z "
                       "file: %O, size: %O",
                       (cl->buf->shadow ? 1 : 0),
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);
    }

    for (cl = p->in; cl; cl = cl->next) {
        ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe buf in   s:%d t:%d f:%d "
                       "%p, pos %p, size: %z "
                       "file: %O, size: %O",
                       (cl->buf->shadow ? 1 : 0),
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);
    }

    for (cl = p->free_raw_bufs; cl; cl = cl->next) {
        ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe buf free s:%d t:%d f:%d "
                       "%p, pos %p, size: %z "
                       "file: %O, size: %O",
                       (cl->buf->shadow ? 1 : 0),
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                   "pipe length: %O", p->length);

#endif

    if (p->free_raw_bufs && p->length != -1) {

        /*
         * the first free raw buf already holds at least p->length bytes,
         * so it is passed to the input filter without waiting to be filled
         */

        cl = p->free_raw_bufs;

        if (cl->buf->last - cl->buf->pos >= p->length) {

            p->free_raw_bufs = cl->next;

            /* STUB */ cl->buf->num = p->num++;

            if (p->input_filter(p, cl->buf) == NGX_ERROR) {
                return NGX_ABORT;
            }

            ngx_free_chain(p->pool, cl);
        }
    }

    if (p->length == 0) {
        p->upstream_done = 1;
        p->read = 1;
    }

    if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {

        /* reading is over: pass the first (possibly partial) buf on */

        /* STUB */ p->free_raw_bufs->buf->num = p->num++;

        if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
            return NGX_ABORT;
        }

        p->free_raw_bufs = p->free_raw_bufs->next;

        if (p->free_bufs && p->buf_to_file == NULL) {

            /* release the memory of the raw bufs nothing refers to */

            for (cl = p->free_raw_bufs; cl; cl = cl->next) {
                if (cl->buf->shadow == NULL) {
                    ngx_pfree(p->pool, cl->buf->start);
                }
            }
        }
    }

    if (p->cacheable && (p->in || p->buf_to_file)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe write chain");

        rc = ngx_event_pipe_write_chain_to_temp_file(p);

        if (rc != NGX_OK) {
            return rc;
        }
    }

    return NGX_OK;
}
/*
 * Passes buffered response data to the downstream via p->output_filter.
 *
 * Once the upstream side is finished (eof, error or done) the remaining
 * p->out and p->in chains are handed to the output filter as they are.
 * Otherwise output is assembled from p->out and, for a non-cacheable
 * response with no temp file write in progress, from p->in, as long as
 * the total size of busy recycled buffers stays within p->busy_size.
 *
 * Returns NGX_OK when there is nothing more to do for now, NGX_BUSY when
 * empty flushes keep repeating (the "flushed" counter exceeds 10),
 * NGX_ABORT on failure, and the result of ngx_event_pipe_drain_chains()
 * once a downstream error is seen.
 */
static ngx_int_t
ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
{
u_char *prev;
size_t bsize;
ngx_int_t rc;
ngx_uint_t flush, flushed, prev_last_shadow;
ngx_chain_t *out, **ll, *cl;
ngx_connection_t *downstream;
downstream = p->downstream;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream: %d", downstream->write->ready);
#if (NGX_THREADS)
/*
 * a temp file write was started earlier: let the temp file writer handle
 * it; only NGX_ABORT is acted upon here, any other result falls through
 */
if (p->writing) {
rc = ngx_event_pipe_write_chain_to_temp_file(p);
if (rc == NGX_ABORT) {
return NGX_ABORT;
}
}
#endif
/* counts the flushes issued below with an empty output chain */
flushed = 0;
for ( ;; ) {
/* the downstream has failed: just release all the chains */
if (p->downstream_error) {
return ngx_event_pipe_drain_chains(p);
}
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
/* pass the p->out and p->in chains to the output filter */
for (cl = p->busy; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
if (p->out) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush out");
for (cl = p->out; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
rc = p->output_filter(p->output_ctx, p->out);
if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}
p->out = NULL;
}
/*
 * a temp file write is still pending: p->in is not sent and the
 * downstream is not marked as done yet
 */
if (p->writing) {
break;
}
if (p->in) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush in");
for (cl = p->in; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
rc = p->output_filter(p->output_ctx, p->in);
if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}
p->in = NULL;
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream done");
/* TODO: free unused bufs */
p->downstream_done = 1;
break;
}
/*
 * nothing is sent while the downstream connection is handled by another
 * context, is not ready for writing, or is delayed
 */
if (downstream->data != p->output_ctx
|| !downstream->write->ready
|| downstream->write->delayed)
{
break;
}
/* bsize is the size of the busy recycled bufs */
prev = NULL;
bsize = 0;
for (cl = p->busy; cl; cl = cl->next) {
if (cl->buf->recycled) {
/* a buf with the same start as the last counted one is counted once */
if (prev == cl->buf->start) {
continue;
}
bsize += cl->buf->end - cl->buf->start;
prev = cl->buf->start;
}
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write busy: %uz", bsize);
out = NULL;
/* too much is busy already: only flush, with an empty chain */
if (bsize >= (size_t) p->busy_size) {
flush = 1;
goto flush;
}
flush = 0;
ll = NULL;
prev_last_shadow = 1;
/* build the "out" chain: p->out bufs are taken first, then p->in bufs */
for ( ;; ) {
if (p->out) {
cl = p->out;
if (cl->buf->recycled) {
ngx_log_error(NGX_LOG_ALERT, p->log, 0,
"recycled buffer in pipe out chain");
}
p->out = p->out->next;
} else if (!p->cacheable && !p->writing && p->in) {
cl = p->in;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write buf ls:%d %p %z",
cl->buf->last_shadow,
cl->buf->pos,
cl->buf->last - cl->buf->pos);
/*
 * the busy limit is checked only for a recycled buf that follows
 * a buf marked last_shadow (or is the first one taken from p->in)
 */
if (cl->buf->recycled && prev_last_shadow) {
if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
flush = 1;
break;
}
bsize += cl->buf->end - cl->buf->start;
}
prev_last_shadow = cl->buf->last_shadow;
p->in = p->in->next;
} else {
break;
}
/* append the link taken above to the tail of "out" */
cl->next = NULL;
if (out) {
*ll = cl;
} else {
out = cl;
}
ll = &cl->next;
}
flush:
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write: out:%p, f:%ui", out, flush);
if (out == NULL) {
/* nothing to send and no flush requested: done for now */
if (!flush) {
break;
}
/* a workaround for AIO */
if (flushed++ > 10) {
return NGX_BUSY;
}
}
rc = p->output_filter(p->output_ctx, out);
/* the chains are updated before rc is examined */
ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);
if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}
for (cl = p->free; cl; cl = cl->next) {
if (cl->buf->temp_file) {
/* temp file bufs need no further handling in these two modes */
if (p->cacheable || !p->cyclic_temp_file) {
continue;
}
/* reset p->temp_offset if all bufs had been sent */
if (cl->buf->file_last == p->temp_file->offset) {
p->temp_file->offset = 0;
}
}
/* TODO: free buf if p->free_bufs && upstream done */
/* add the free shadow raw buf to p->free_raw_bufs */
if (cl->buf->last_shadow) {
if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
return NGX_ABORT;
}
cl->buf->last_shadow = 0;
}
cl->buf->shadow = NULL;
}
}
return NGX_OK;
}
/*
 * Writes buffers from p->in (preceded by p->buf_to_file, if it is set) to
 * the temporary file, describes the written range with a file buffer in
 * p->out, and returns the raw buffers that were written to
 * p->free_raw_bufs.
 *
 * With NGX_THREADS, when p->writing is set the function instead collects
 * the result of the write started earlier.
 *
 * Returns NGX_OK on success, NGX_BUSY if not even the first buffer fits
 * the size limits (non-cacheable case only), NGX_AGAIN if a write is in
 * progress (NGX_THREADS only), and NGX_ABORT on a write or allocation
 * failure.
 */
static ngx_int_t
ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
{
ssize_t size, bsize, n;
ngx_buf_t *b;
ngx_uint_t prev_last_shadow;
ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free;
#if (NGX_THREADS)
if (p->writing) {
/* the operation has not completed yet */
if (p->aio) {
return NGX_AGAIN;
}
/*
 * take the chain saved when the write was started and call the
 * writer with a NULL chain to get the result of that write
 */
out = p->writing;
p->writing = NULL;
n = ngx_write_chain_to_temp_file(p->temp_file, NULL);
if (n == NGX_ERROR) {
return NGX_ABORT;
}
goto done;
}
#endif
/* p->buf_to_file, if set, is written in front of the p->in chain */
if (p->buf_to_file) {
out = ngx_alloc_chain_link(p->pool);
if (out == NULL) {
return NGX_ABORT;
}
out->buf = p->buf_to_file;
out->next = p->in;
} else {
out = p->in;
}
if (!p->cacheable) {
/*
 * not cacheable: write only a leading part of the chain, limited by
 * p->temp_file_write_size and p->max_temp_file_size
 */
size = 0;
cl = out;
ll = NULL;
prev_last_shadow = 1;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe offset: %O", p->temp_file->offset);
do {
bsize = cl->buf->last - cl->buf->pos;
ngx_log_debug4(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe buf ls:%d %p, pos %p, size: %z",
cl->buf->last_shadow, cl->buf->start,
cl->buf->pos, bsize);
/* the chain is cut only right after a buf marked last_shadow */
if (prev_last_shadow
&& ((size + bsize > p->temp_file_write_size)
|| (p->temp_file->offset + size + bsize
> p->max_temp_file_size)))
{
break;
}
prev_last_shadow = cl->buf->last_shadow;
size += bsize;
ll = &cl->next;
cl = cl->next;
} while (cl);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);
/* not even the first buf fits the limits */
if (ll == NULL) {
return NGX_BUSY;
}
if (cl) {
/* the bufs starting at cl stay in p->in, the part before is written */
p->in = cl;
*ll = NULL;
} else {
p->in = NULL;
p->last_in = &p->in;
}
} else {
/* cacheable: the whole chain is written */
p->in = NULL;
p->last_in = &p->in;
}
#if (NGX_THREADS)
/* hand the pipe's thread settings over to the temp file */
if (p->thread_handler) {
p->temp_file->thread_write = 1;
p->temp_file->file.thread_task = p->thread_task;
p->temp_file->file.thread_handler = p->thread_handler;
p->temp_file->file.thread_ctx = p->thread_ctx;
}
#endif
n = ngx_write_chain_to_temp_file(p->temp_file, out);
if (n == NGX_ERROR) {
return NGX_ABORT;
}
#if (NGX_THREADS)
if (n == NGX_AGAIN) {
/* remember the chain and the task until the result is collected */
p->writing = out;
p->thread_task = p->temp_file->file.thread_task;
return NGX_AGAIN;
}
done:
#endif
if (p->buf_to_file) {
/*
 * the file offset is set to the size of p->buf_to_file and that size
 * is subtracted from n, so n counts only the bytes that follow it;
 * the chain link carrying p->buf_to_file is skipped
 */
p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
n -= p->buf_to_file->last - p->buf_to_file->pos;
p->buf_to_file = NULL;
out = out->next;
}
if (n > 0) {
/* update previous buffer or add new buffer */
if (p->out) {
for (cl = p->out; cl->next; cl = cl->next) { /* void */ }
b = cl->buf;
/* the last p->out buf ends at the current offset: extend it */
if (b->file_last == p->temp_file->offset) {
p->temp_file->offset += n;
b->file_last = p->temp_file->offset;
goto free;
}
last_out = &cl->next;
} else {
last_out = &p->out;
}
cl = ngx_chain_get_free_buf(p->pool, &p->free);
if (cl == NULL) {
return NGX_ABORT;
}
b = cl->buf;
/* a new file buf covering the n bytes just written */
ngx_memzero(b, sizeof(ngx_buf_t));
b->tag = p->tag;
b->file = &p->temp_file->file;
b->file_pos = p->temp_file->offset;
p->temp_file->offset += n;
b->file_last = p->temp_file->offset;
b->in_file = 1;
b->temp_file = 1;
*last_out = cl;
}
free:
/* find the tail of p->free_raw_bufs */
for (last_free = &p->free_raw_bufs;
*last_free != NULL;
last_free = &(*last_free)->next)
{
/* void */
}
/*
 * move every written chain link to p->free; for a buf marked
 * last_shadow, its raw buf (b->shadow) is emptied and appended to
 * p->free_raw_bufs
 */
for (cl = out; cl; cl = next) {
next = cl->next;
cl->next = p->free;
p->free = cl;
b = cl->buf;
if (b->last_shadow) {
tl = ngx_alloc_chain_link(p->pool);
if (tl == NULL) {
return NGX_ABORT;
}
tl->buf = b->shadow;
tl->next = NULL;
*last_free = tl;
last_free = &tl->next;
b->shadow->pos = b->shadow->start;
b->shadow->last = b->shadow->start;
ngx_event_pipe_remove_shadow_links(b->shadow);
}
}
return NGX_OK;
}
/*
 * The copy input filter.
 *
 * Wraps the data of the raw buffer "buf" into a shadow buffer taken from
 * p->free, appends that buffer to the p->in chain and, unless p->length
 * is -1, decreases p->length by the size of the data.  An empty "buf" is
 * ignored.
 *
 * Returns NGX_OK, or NGX_ERROR if no chain link could be obtained.
 */
ngx_int_t
ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
{
    ngx_buf_t    *copy;
    ngx_chain_t  *link;

    if (buf->last == buf->pos) {
        /* nothing to pass on */
        return NGX_OK;
    }

    link = ngx_chain_get_free_buf(p->pool, &p->free);

    if (link == NULL) {
        return NGX_ERROR;
    }

    copy = link->buf;

    /* the copy starts as an exact duplicate of the raw buf descriptor */

    ngx_memcpy(copy, buf, sizeof(ngx_buf_t));

    copy->tag = p->tag;
    copy->recycled = 1;
    copy->last_shadow = 1;

    /* the raw buf and its copy refer to each other */

    copy->shadow = buf;
    buf->shadow = copy;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", copy->num);

    /* append to p->in and remember the new tail */

    if (p->in == NULL) {
        p->in = link;

    } else {
        *p->last_in = link;
    }

    p->last_in = &link->next;

    if (p->length != -1) {
        p->length -= copy->last - copy->pos;
    }

    return NGX_OK;
}
/*
 * Detaches "buf" from its shadow buffers: follows the ->shadow links
 * starting at buf->shadow up to and including the buffer marked
 * last_shadow, clearing the temporary and recycled flags and the shadow
 * link of each, then clears last_shadow on the final one and buf->shadow
 * itself.  Does nothing if "buf" has no shadow.
 */
static ngx_inline void
ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
{
    ngx_buf_t  *cur, *following;

    cur = buf->shadow;

    if (cur == NULL) {
        return;
    }

    for ( ;; ) {

        /* the link has to be read before it is cleared */

        following = cur->shadow;

        cur->temporary = 0;
        cur->recycled = 0;
        cur->shadow = NULL;

        if (cur->last_shadow) {
            cur->last_shadow = 0;
            break;
        }

        cur = following;
    }

    buf->shadow = NULL;
}
/*
 * Returns the raw buffer "b" to p->free_raw_bufs.
 *
 * The buffer is emptied first: pos and last are reset to b->start, or to
 * p->buf_to_file->last when "b" starts at the same address as
 * p->buf_to_file.  It is then linked in at the head of the list, unless
 * the current head is partially filled, in which case it is linked in
 * right after the head.
 *
 * Returns NGX_OK, or NGX_ERROR if a chain link cannot be allocated.
 */
ngx_int_t
ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
{
    ngx_chain_t  *link, *head;

    link = ngx_alloc_chain_link(p->pool);

    if (link == NULL) {
        return NGX_ERROR;
    }

    if (p->buf_to_file == NULL || b->start != p->buf_to_file->start) {
        b->pos = b->start;
        b->last = b->start;

    } else {
        /* same memory as p->buf_to_file: keep what it already holds */
        b->pos = p->buf_to_file->last;
        b->last = p->buf_to_file->last;
    }

    b->shadow = NULL;
    link->buf = b;

    head = p->free_raw_bufs;

    if (head == NULL || head->buf->pos == head->buf->last) {

        /* the list is empty or its first buf is empty: add to the start */

        link->next = head;
        p->free_raw_bufs = link;

        return NGX_OK;
    }

    /* the first free buf is partially filled, thus add the free buf after it */

    link->next = head->next;
    head->next = link;

    return NGX_OK;
}
/*
 * Releases everything queued in the pipe: the p->busy, p->out and p->in
 * chains are emptied in that order, every chain link is moved to p->free,
 * and the raw buffer behind each buffer marked last_shadow is given back
 * through ngx_event_pipe_add_free_buf().
 *
 * Returns NGX_OK when all three chains are empty, or NGX_ABORT if a raw
 * buffer cannot be returned.
 */
static ngx_int_t
ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
{
    ngx_chain_t  *link, *rest;

    while (p->busy || p->out || p->in) {

        /* detach the first non-empty chain */

        if (p->busy) {
            link = p->busy;
            p->busy = NULL;

        } else if (p->out) {
            link = p->out;
            p->out = NULL;

        } else {
            link = p->in;
            p->in = NULL;
        }

        for ( /* void */ ; link; link = rest) {

            if (link->buf->last_shadow) {
                if (ngx_event_pipe_add_free_buf(p, link->buf->shadow)
                    != NGX_OK)
                {
                    return NGX_ABORT;
                }

                link->buf->last_shadow = 0;
            }

            link->buf->shadow = NULL;

            /* push the link onto p->free */

            rest = link->next;
            link->next = p->free;
            p->free = link;
        }
    }

    return NGX_OK;
}