mirror of
https://github.com/nginx/nginx.git
synced 2025-06-06 00:42:40 +08:00
Events: removed broken thread support from posted events.
It's mostly dead code. And the idea of thread support for this task has been deprecated.
This commit is contained in:
parent
3ca3f609cb
commit
2a81e05566
@ -951,40 +951,21 @@ ngx_close_connection(ngx_connection_t *c)
|
||||
* before we clean the connection
|
||||
*/
|
||||
|
||||
ngx_mutex_lock(ngx_posted_events_mutex);
|
||||
|
||||
if (c->read->prev) {
|
||||
ngx_delete_posted_event(c->read);
|
||||
}
|
||||
|
||||
if (c->write->prev) {
|
||||
ngx_delete_posted_event(c->write);
|
||||
}
|
||||
|
||||
c->read->closed = 1;
|
||||
c->write->closed = 1;
|
||||
|
||||
ngx_unlock(&c->lock);
|
||||
c->read->locked = 0;
|
||||
c->write->locked = 0;
|
||||
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
#else
|
||||
|
||||
if (c->read->prev) {
|
||||
ngx_delete_posted_event(c->read);
|
||||
}
|
||||
|
||||
if (c->write->prev) {
|
||||
ngx_delete_posted_event(c->write);
|
||||
}
|
||||
|
||||
c->read->closed = 1;
|
||||
c->write->closed = 1;
|
||||
|
||||
#endif
|
||||
|
||||
if (c->read->prev) {
|
||||
ngx_delete_posted_event(c->read);
|
||||
}
|
||||
|
||||
if (c->write->prev) {
|
||||
ngx_delete_posted_event(c->write);
|
||||
}
|
||||
|
||||
c->read->closed = 1;
|
||||
c->write->closed = 1;
|
||||
|
||||
ngx_reusable_connection(c, 0);
|
||||
|
||||
log_error = c->log_error;
|
||||
|
@ -3082,17 +3082,6 @@ ngx_udp_connect(ngx_udp_connection_t *uc)
|
||||
|
||||
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
|
||||
|
||||
#if (NGX_THREADS)
|
||||
|
||||
/* TODO: lock event when call completion handler */
|
||||
|
||||
rev->lock = &c->lock;
|
||||
wev->lock = &c->lock;
|
||||
rev->own_lock = &c->lock;
|
||||
wev->own_lock = &c->lock;
|
||||
|
||||
#endif
|
||||
|
||||
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, &uc->log, 0,
|
||||
"connect to %V, fd:%d #%uA", &uc->server, s, c->number);
|
||||
|
||||
|
@ -404,8 +404,6 @@ ngx_devpoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_mutex_lock(ngx_posted_events_mutex);
|
||||
|
||||
for (i = 0; i < events; i++) {
|
||||
|
||||
fd = event_list[i].fd;
|
||||
@ -495,19 +493,13 @@ ngx_devpoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
rev = c->read;
|
||||
|
||||
if ((revents & POLLIN) && rev->active) {
|
||||
|
||||
if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
|
||||
rev->posted_ready = 1;
|
||||
|
||||
} else {
|
||||
rev->ready = 1;
|
||||
}
|
||||
rev->ready = 1;
|
||||
|
||||
if (flags & NGX_POST_EVENTS) {
|
||||
queue = (ngx_event_t **) (rev->accept ?
|
||||
&ngx_posted_accept_events : &ngx_posted_events);
|
||||
queue = rev->accept ? &ngx_posted_accept_events
|
||||
: &ngx_posted_events;
|
||||
|
||||
ngx_locked_post_event(rev, queue);
|
||||
ngx_post_event(rev, queue);
|
||||
|
||||
} else {
|
||||
instance = rev->instance;
|
||||
@ -523,16 +515,10 @@ ngx_devpoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
wev = c->write;
|
||||
|
||||
if ((revents & POLLOUT) && wev->active) {
|
||||
|
||||
if (flags & NGX_POST_THREAD_EVENTS) {
|
||||
wev->posted_ready = 1;
|
||||
|
||||
} else {
|
||||
wev->ready = 1;
|
||||
}
|
||||
wev->ready = 1;
|
||||
|
||||
if (flags & NGX_POST_EVENTS) {
|
||||
ngx_locked_post_event(wev, &ngx_posted_events);
|
||||
ngx_post_event(wev, &ngx_posted_events);
|
||||
|
||||
} else {
|
||||
wev->handler(wev);
|
||||
@ -540,8 +526,6 @@ ngx_devpoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
}
|
||||
}
|
||||
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
@ -612,8 +612,6 @@ ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_mutex_lock(ngx_posted_events_mutex);
|
||||
|
||||
for (i = 0; i < events; i++) {
|
||||
c = event_list[i].data.ptr;
|
||||
|
||||
@ -674,18 +672,13 @@ ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
|
||||
}
|
||||
#endif
|
||||
|
||||
if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
|
||||
rev->posted_ready = 1;
|
||||
|
||||
} else {
|
||||
rev->ready = 1;
|
||||
}
|
||||
rev->ready = 1;
|
||||
|
||||
if (flags & NGX_POST_EVENTS) {
|
||||
queue = (ngx_event_t **) (rev->accept ?
|
||||
&ngx_posted_accept_events : &ngx_posted_events);
|
||||
queue = rev->accept ? &ngx_posted_accept_events
|
||||
: &ngx_posted_events;
|
||||
|
||||
ngx_locked_post_event(rev, queue);
|
||||
ngx_post_event(rev, queue);
|
||||
|
||||
} else {
|
||||
rev->handler(rev);
|
||||
@ -708,15 +701,10 @@ ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
|
||||
continue;
|
||||
}
|
||||
|
||||
if (flags & NGX_POST_THREAD_EVENTS) {
|
||||
wev->posted_ready = 1;
|
||||
|
||||
} else {
|
||||
wev->ready = 1;
|
||||
}
|
||||
wev->ready = 1;
|
||||
|
||||
if (flags & NGX_POST_EVENTS) {
|
||||
ngx_locked_post_event(wev, &ngx_posted_events);
|
||||
ngx_post_event(wev, &ngx_posted_events);
|
||||
|
||||
} else {
|
||||
wev->handler(wev);
|
||||
@ -724,8 +712,6 @@ ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
|
||||
}
|
||||
}
|
||||
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
@ -466,8 +466,6 @@ ngx_eventport_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_mutex_lock(ngx_posted_events_mutex);
|
||||
|
||||
for (i = 0; i < events; i++) {
|
||||
|
||||
if (event_list[i].portev_source == PORT_SOURCE_TIMER) {
|
||||
@ -534,19 +532,13 @@ ngx_eventport_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
wev->active = 0;
|
||||
|
||||
if (revents & POLLIN) {
|
||||
|
||||
if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
|
||||
rev->posted_ready = 1;
|
||||
|
||||
} else {
|
||||
rev->ready = 1;
|
||||
}
|
||||
rev->ready = 1;
|
||||
|
||||
if (flags & NGX_POST_EVENTS) {
|
||||
queue = (ngx_event_t **) (rev->accept ?
|
||||
&ngx_posted_accept_events : &ngx_posted_events);
|
||||
queue = rev->accept ? &ngx_posted_accept_events
|
||||
: &ngx_posted_events;
|
||||
|
||||
ngx_locked_post_event(rev, queue);
|
||||
ngx_post_event(rev, queue);
|
||||
|
||||
} else {
|
||||
rev->handler(rev);
|
||||
@ -574,16 +566,10 @@ ngx_eventport_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
}
|
||||
|
||||
if (revents & POLLOUT) {
|
||||
|
||||
if (flags & NGX_POST_THREAD_EVENTS) {
|
||||
wev->posted_ready = 1;
|
||||
|
||||
} else {
|
||||
wev->ready = 1;
|
||||
}
|
||||
wev->ready = 1;
|
||||
|
||||
if (flags & NGX_POST_EVENTS) {
|
||||
ngx_locked_post_event(wev, &ngx_posted_events);
|
||||
ngx_post_event(wev, &ngx_posted_events);
|
||||
|
||||
} else {
|
||||
wev->handler(wev);
|
||||
@ -600,8 +586,6 @@ ngx_eventport_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
}
|
||||
}
|
||||
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
@ -573,8 +573,6 @@ ngx_kqueue_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_mutex_lock(ngx_posted_events_mutex);
|
||||
|
||||
for (i = 0; i < events; i++) {
|
||||
|
||||
ngx_kqueue_dump_event(cycle->log, &event_list[i]);
|
||||
@ -626,24 +624,6 @@ ngx_kqueue_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
ev->active = 0;
|
||||
}
|
||||
|
||||
#if (NGX_THREADS)
|
||||
|
||||
if ((flags & NGX_POST_THREAD_EVENTS) && !ev->accept) {
|
||||
ev->posted_ready = 1;
|
||||
ev->posted_available = event_list[i].data;
|
||||
|
||||
if (event_list[i].flags & EV_EOF) {
|
||||
ev->posted_eof = 1;
|
||||
ev->posted_errno = event_list[i].fflags;
|
||||
}
|
||||
|
||||
ngx_locked_post_event(ev, &ngx_posted_events);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
ev->available = event_list[i].data;
|
||||
|
||||
if (event_list[i].flags & EV_EOF) {
|
||||
@ -674,9 +654,10 @@ ngx_kqueue_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
}
|
||||
|
||||
if (flags & NGX_POST_EVENTS) {
|
||||
queue = (ngx_event_t **) (ev->accept ? &ngx_posted_accept_events:
|
||||
&ngx_posted_events);
|
||||
ngx_locked_post_event(ev, queue);
|
||||
queue = ev->accept ? &ngx_posted_accept_events
|
||||
: &ngx_posted_events;
|
||||
|
||||
ngx_post_event(ev, queue);
|
||||
|
||||
continue;
|
||||
}
|
||||
@ -684,8 +665,6 @@ ngx_kqueue_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
ev->handler(ev);
|
||||
}
|
||||
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
@ -297,8 +297,6 @@ ngx_poll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_mutex_lock(ngx_posted_events_mutex);
|
||||
|
||||
for (i = 0; i < nevents && ready; i++) {
|
||||
|
||||
revents = event_list[i].revents;
|
||||
@ -372,31 +370,21 @@ ngx_poll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
|
||||
found = 1;
|
||||
|
||||
ev = c->read;
|
||||
ev->ready = 1;
|
||||
|
||||
if ((flags & NGX_POST_THREAD_EVENTS) && !ev->accept) {
|
||||
ev->posted_ready = 1;
|
||||
queue = ev->accept ? &ngx_posted_accept_events
|
||||
: &ngx_posted_events;
|
||||
|
||||
} else {
|
||||
ev->ready = 1;
|
||||
}
|
||||
|
||||
queue = (ngx_event_t **) (ev->accept ? &ngx_posted_accept_events:
|
||||
&ngx_posted_events);
|
||||
ngx_locked_post_event(ev, queue);
|
||||
ngx_post_event(ev, queue);
|
||||
}
|
||||
|
||||
if ((revents & POLLOUT) && c->write->active) {
|
||||
found = 1;
|
||||
|
||||
ev = c->write;
|
||||
ev->ready = 1;
|
||||
|
||||
if (flags & NGX_POST_THREAD_EVENTS) {
|
||||
ev->posted_ready = 1;
|
||||
|
||||
} else {
|
||||
ev->ready = 1;
|
||||
}
|
||||
|
||||
ngx_locked_post_event(ev, &ngx_posted_events);
|
||||
ngx_post_event(ev, &ngx_posted_events);
|
||||
}
|
||||
|
||||
if (found) {
|
||||
@ -405,8 +393,6 @@ ngx_poll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
|
||||
}
|
||||
}
|
||||
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
if (ready != 0) {
|
||||
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "poll ready != events");
|
||||
}
|
||||
|
@ -404,10 +404,10 @@ ngx_rtsig_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
|
||||
rev->ready = 1;
|
||||
|
||||
if (flags & NGX_POST_EVENTS) {
|
||||
queue = (ngx_event_t **) (rev->accept ?
|
||||
&ngx_posted_accept_events : &ngx_posted_events);
|
||||
queue = rev->accept ? &ngx_posted_accept_events
|
||||
: &ngx_posted_events;
|
||||
|
||||
ngx_locked_post_event(rev, queue);
|
||||
ngx_post_event(rev, queue);
|
||||
|
||||
} else {
|
||||
rev->handler(rev);
|
||||
@ -421,7 +421,7 @@ ngx_rtsig_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
|
||||
wev->ready = 1;
|
||||
|
||||
if (flags & NGX_POST_EVENTS) {
|
||||
ngx_locked_post_event(wev, &ngx_posted_events);
|
||||
ngx_post_event(wev, &ngx_posted_events);
|
||||
|
||||
} else {
|
||||
wev->handler(wev);
|
||||
@ -554,8 +554,6 @@ ngx_rtsig_process_overflow(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
continue;
|
||||
}
|
||||
|
||||
ngx_mutex_lock(ngx_posted_events_mutex);
|
||||
|
||||
for (i = 0; i < n; i++) {
|
||||
c = cycle->files[overflow_list[i].fd];
|
||||
|
||||
@ -573,18 +571,13 @@ ngx_rtsig_process_overflow(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
{
|
||||
tested++;
|
||||
|
||||
if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
|
||||
rev->posted_ready = 1;
|
||||
|
||||
} else {
|
||||
rev->ready = 1;
|
||||
}
|
||||
rev->ready = 1;
|
||||
|
||||
if (flags & NGX_POST_EVENTS) {
|
||||
queue = (ngx_event_t **) (rev->accept ?
|
||||
&ngx_posted_accept_events : &ngx_posted_events);
|
||||
queue = rev->accept ? &ngx_posted_accept_events
|
||||
: &ngx_posted_events;
|
||||
|
||||
ngx_locked_post_event(rev, queue);
|
||||
ngx_post_event(rev, queue);
|
||||
|
||||
} else {
|
||||
rev->handler(rev);
|
||||
@ -601,15 +594,10 @@ ngx_rtsig_process_overflow(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
{
|
||||
tested++;
|
||||
|
||||
if (flags & NGX_POST_THREAD_EVENTS) {
|
||||
wev->posted_ready = 1;
|
||||
|
||||
} else {
|
||||
wev->ready = 1;
|
||||
}
|
||||
wev->ready = 1;
|
||||
|
||||
if (flags & NGX_POST_EVENTS) {
|
||||
ngx_locked_post_event(wev, &ngx_posted_events);
|
||||
ngx_post_event(wev, &ngx_posted_events);
|
||||
|
||||
} else {
|
||||
wev->handler(wev);
|
||||
@ -617,8 +605,6 @@ ngx_rtsig_process_overflow(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
}
|
||||
}
|
||||
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
if (tested >= rtscf->overflow_test) {
|
||||
|
||||
if (ngx_linux_rtsig_max) {
|
||||
|
@ -305,8 +305,6 @@ ngx_select_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_mutex_lock(ngx_posted_events_mutex);
|
||||
|
||||
nready = 0;
|
||||
|
||||
for (i = 0; i < nevents; i++) {
|
||||
@ -332,16 +330,15 @@ ngx_select_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
if (found) {
|
||||
ev->ready = 1;
|
||||
|
||||
queue = (ngx_event_t **) (ev->accept ? &ngx_posted_accept_events:
|
||||
&ngx_posted_events);
|
||||
ngx_locked_post_event(ev, queue);
|
||||
queue = ev->accept ? &ngx_posted_accept_events
|
||||
: &ngx_posted_events;
|
||||
|
||||
ngx_post_event(ev, queue);
|
||||
|
||||
nready++;
|
||||
}
|
||||
}
|
||||
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
if (ready != nready) {
|
||||
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
|
||||
"select ready != events: %d:%d", ready, nready);
|
||||
|
@ -296,8 +296,6 @@ ngx_select_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_mutex_lock(ngx_posted_events_mutex);
|
||||
|
||||
nready = 0;
|
||||
|
||||
for (i = 0; i < nevents; i++) {
|
||||
@ -323,16 +321,15 @@ ngx_select_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
|
||||
if (found) {
|
||||
ev->ready = 1;
|
||||
|
||||
queue = (ngx_event_t **) (ev->accept ? &ngx_posted_accept_events:
|
||||
&ngx_posted_events);
|
||||
ngx_locked_post_event(ev, queue);
|
||||
queue = ev->accept ? &ngx_posted_accept_events
|
||||
: &ngx_posted_events;
|
||||
|
||||
ngx_post_event(ev, queue);
|
||||
|
||||
nready++;
|
||||
}
|
||||
}
|
||||
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
if (ready != nready) {
|
||||
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
|
||||
"select ready != events: %d:%d", ready, nready);
|
||||
|
@ -268,12 +268,7 @@ ngx_process_events_and_timers(ngx_cycle_t *cycle)
|
||||
"posted events %p", ngx_posted_events);
|
||||
|
||||
if (ngx_posted_events) {
|
||||
if (ngx_threaded) {
|
||||
ngx_wakeup_worker_thread(cycle);
|
||||
|
||||
} else {
|
||||
ngx_event_process_posted(cycle, &ngx_posted_events);
|
||||
}
|
||||
ngx_event_process_posted(cycle, &ngx_posted_events);
|
||||
}
|
||||
}
|
||||
|
||||
@ -617,13 +612,6 @@ ngx_event_process_init(ngx_cycle_t *cycle)
|
||||
|
||||
#endif
|
||||
|
||||
#if (NGX_THREADS)
|
||||
ngx_posted_events_mutex = ngx_mutex_init(cycle->log, 0);
|
||||
if (ngx_posted_events_mutex == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (ngx_event_timer_init(cycle->log) == NGX_ERROR) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
@ -712,10 +700,6 @@ ngx_event_process_init(ngx_cycle_t *cycle)
|
||||
for (i = 0; i < cycle->connection_n; i++) {
|
||||
rev[i].closed = 1;
|
||||
rev[i].instance = 1;
|
||||
#if (NGX_THREADS)
|
||||
rev[i].lock = &c[i].lock;
|
||||
rev[i].own_lock = &c[i].lock;
|
||||
#endif
|
||||
}
|
||||
|
||||
cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
|
||||
@ -727,10 +711,6 @@ ngx_event_process_init(ngx_cycle_t *cycle)
|
||||
wev = cycle->write_events;
|
||||
for (i = 0; i < cycle->connection_n; i++) {
|
||||
wev[i].closed = 1;
|
||||
#if (NGX_THREADS)
|
||||
wev[i].lock = &c[i].lock;
|
||||
wev[i].own_lock = &c[i].lock;
|
||||
#endif
|
||||
}
|
||||
|
||||
i = cycle->connection_n;
|
||||
|
@ -74,10 +74,6 @@ struct ngx_event_s {
|
||||
/* the pending eof reported by kqueue, epoll or in aio chain operation */
|
||||
unsigned pending_eof:1;
|
||||
|
||||
#if !(NGX_THREADS)
|
||||
unsigned posted_ready:1;
|
||||
#endif
|
||||
|
||||
#if (NGX_WIN32)
|
||||
/* setsockopt(SO_UPDATE_ACCEPT_CONTEXT) was successful */
|
||||
unsigned accept_context_updated:1;
|
||||
@ -135,30 +131,6 @@ struct ngx_event_s {
|
||||
unsigned channel:1;
|
||||
unsigned resolver:1;
|
||||
|
||||
#if (NGX_THREADS)
|
||||
|
||||
unsigned locked:1;
|
||||
|
||||
unsigned posted_ready:1;
|
||||
unsigned posted_timedout:1;
|
||||
unsigned posted_eof:1;
|
||||
|
||||
#if (NGX_HAVE_KQUEUE)
|
||||
/* the pending errno reported by kqueue */
|
||||
int posted_errno;
|
||||
#endif
|
||||
|
||||
#if (NGX_HAVE_KQUEUE) || (NGX_HAVE_IOCP)
|
||||
int posted_available;
|
||||
#else
|
||||
unsigned posted_available:1;
|
||||
#endif
|
||||
|
||||
ngx_atomic_t *lock;
|
||||
ngx_atomic_t *own_lock;
|
||||
|
||||
#endif
|
||||
|
||||
/* the links of the posted queue */
|
||||
ngx_event_t *next;
|
||||
ngx_event_t **prev;
|
||||
@ -519,7 +491,6 @@ extern ngx_atomic_t *ngx_stat_waiting;
|
||||
|
||||
#define NGX_UPDATE_TIME 1
|
||||
#define NGX_POST_EVENTS 2
|
||||
#define NGX_POST_THREAD_EVENTS 4
|
||||
|
||||
|
||||
extern sig_atomic_t ngx_event_timer_alarm;
|
||||
|
@ -262,13 +262,6 @@ ngx_event_accept(ngx_event_t *ev)
|
||||
(void) ngx_atomic_fetch_add(ngx_stat_handled, 1);
|
||||
#endif
|
||||
|
||||
#if (NGX_THREADS)
|
||||
rev->lock = &c->lock;
|
||||
wev->lock = &c->lock;
|
||||
rev->own_lock = &c->lock;
|
||||
wev->own_lock = &c->lock;
|
||||
#endif
|
||||
|
||||
if (ls->addr_ntop) {
|
||||
c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
|
||||
if (c->addr_text.data == NULL) {
|
||||
|
@ -104,17 +104,6 @@ ngx_event_connect_peer(ngx_peer_connection_t *pc)
|
||||
|
||||
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
|
||||
|
||||
#if (NGX_THREADS)
|
||||
|
||||
/* TODO: lock event when call completion handler */
|
||||
|
||||
rev->lock = pc->lock;
|
||||
wev->lock = pc->lock;
|
||||
rev->own_lock = &c->lock;
|
||||
wev->own_lock = &c->lock;
|
||||
|
||||
#endif
|
||||
|
||||
if (ngx_add_conn) {
|
||||
if (ngx_add_conn(c) == NGX_ERROR) {
|
||||
goto failed;
|
||||
|
@ -62,7 +62,7 @@ ngx_int_t ngx_event_mutex_unlock(ngx_event_mutex_t *m, ngx_log_t *log)
|
||||
ev = m->events;
|
||||
m->events = ev->next;
|
||||
|
||||
ev->next = (ngx_event_t *) ngx_posted_events;
|
||||
ev->next = ngx_posted_events;
|
||||
ngx_posted_events = ev;
|
||||
}
|
||||
|
||||
|
@ -10,23 +10,18 @@
|
||||
#include <ngx_event.h>
|
||||
|
||||
|
||||
ngx_thread_volatile ngx_event_t *ngx_posted_accept_events;
|
||||
ngx_thread_volatile ngx_event_t *ngx_posted_events;
|
||||
|
||||
#if (NGX_THREADS)
|
||||
ngx_mutex_t *ngx_posted_events_mutex;
|
||||
#endif
|
||||
ngx_event_t *ngx_posted_accept_events;
|
||||
ngx_event_t *ngx_posted_events;
|
||||
|
||||
|
||||
void
|
||||
ngx_event_process_posted(ngx_cycle_t *cycle,
|
||||
ngx_thread_volatile ngx_event_t **posted)
|
||||
ngx_event_process_posted(ngx_cycle_t *cycle, ngx_event_t **posted)
|
||||
{
|
||||
ngx_event_t *ev;
|
||||
|
||||
for ( ;; ) {
|
||||
|
||||
ev = (ngx_event_t *) *posted;
|
||||
ev = *posted;
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
|
||||
"posted event %p", ev);
|
||||
@ -40,134 +35,3 @@ ngx_event_process_posted(ngx_cycle_t *cycle,
|
||||
ev->handler(ev);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#if (NGX_THREADS) && !(NGX_WIN32)
|
||||
|
||||
void
|
||||
ngx_wakeup_worker_thread(ngx_cycle_t *cycle)
|
||||
{
|
||||
ngx_int_t i;
|
||||
#if 0
|
||||
ngx_uint_t busy;
|
||||
ngx_event_t *ev;
|
||||
|
||||
busy = 1;
|
||||
|
||||
if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (ev = (ngx_event_t *) ngx_posted_events; ev; ev = ev->next) {
|
||||
if (*(ev->lock) == 0) {
|
||||
busy = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
if (busy) {
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
for (i = 0; i < ngx_threads_n; i++) {
|
||||
if (ngx_threads[i].state == NGX_THREAD_FREE) {
|
||||
ngx_cond_signal(ngx_threads[i].cv);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ngx_int_t
|
||||
ngx_event_thread_process_posted(ngx_cycle_t *cycle)
|
||||
{
|
||||
ngx_event_t *ev;
|
||||
|
||||
for ( ;; ) {
|
||||
|
||||
ev = (ngx_event_t *) ngx_posted_events;
|
||||
|
||||
for ( ;; ) {
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
|
||||
"posted event %p", ev);
|
||||
|
||||
if (ev == NULL) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
if (ngx_trylock(ev->lock) == 0) {
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
|
||||
"posted event %p is busy", ev);
|
||||
|
||||
ev = ev->next;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (ev->lock != ev->own_lock) {
|
||||
if (*(ev->own_lock)) {
|
||||
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
|
||||
"the own lock of the posted event %p is busy", ev);
|
||||
ngx_unlock(ev->lock);
|
||||
ev = ev->next;
|
||||
continue;
|
||||
}
|
||||
*(ev->own_lock) = 1;
|
||||
}
|
||||
|
||||
ngx_delete_posted_event(ev);
|
||||
|
||||
ev->locked = 1;
|
||||
|
||||
ev->ready |= ev->posted_ready;
|
||||
ev->timedout |= ev->posted_timedout;
|
||||
ev->pending_eof |= ev->posted_eof;
|
||||
#if (NGX_HAVE_KQUEUE)
|
||||
ev->kq_errno |= ev->posted_errno;
|
||||
#endif
|
||||
if (ev->posted_available) {
|
||||
ev->available = ev->posted_available;
|
||||
}
|
||||
|
||||
ev->posted_ready = 0;
|
||||
ev->posted_timedout = 0;
|
||||
ev->posted_eof = 0;
|
||||
#if (NGX_HAVE_KQUEUE)
|
||||
ev->posted_errno = 0;
|
||||
#endif
|
||||
ev->posted_available = 0;
|
||||
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
ev->handler(ev);
|
||||
|
||||
ngx_mutex_lock(ngx_posted_events_mutex);
|
||||
|
||||
if (ev->locked) {
|
||||
ngx_unlock(ev->lock);
|
||||
|
||||
if (ev->lock != ev->own_lock) {
|
||||
ngx_unlock(ev->own_lock);
|
||||
}
|
||||
}
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
|
||||
"posted event %p is done", ev);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
void
|
||||
ngx_wakeup_worker_thread(ngx_cycle_t *cycle)
|
||||
{
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -14,16 +14,11 @@
|
||||
#include <ngx_event.h>
|
||||
|
||||
|
||||
#if (NGX_THREADS)
|
||||
extern ngx_mutex_t *ngx_posted_events_mutex;
|
||||
#endif
|
||||
|
||||
|
||||
#define ngx_locked_post_event(ev, queue) \
|
||||
#define ngx_post_event(ev, queue) \
|
||||
\
|
||||
if (ev->prev == NULL) { \
|
||||
ev->next = (ngx_event_t *) *queue; \
|
||||
ev->prev = (ngx_event_t **) queue; \
|
||||
ev->next = *queue; \
|
||||
ev->prev = queue; \
|
||||
*queue = ev; \
|
||||
\
|
||||
if (ev->next) { \
|
||||
@ -38,13 +33,6 @@ extern ngx_mutex_t *ngx_posted_events_mutex;
|
||||
}
|
||||
|
||||
|
||||
#define ngx_post_event(ev, queue) \
|
||||
\
|
||||
ngx_mutex_lock(ngx_posted_events_mutex); \
|
||||
ngx_locked_post_event(ev, queue); \
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
|
||||
#define ngx_delete_posted_event(ev) \
|
||||
\
|
||||
*(ev->prev) = ev->next; \
|
||||
@ -59,17 +47,11 @@ extern ngx_mutex_t *ngx_posted_events_mutex;
|
||||
|
||||
|
||||
|
||||
void ngx_event_process_posted(ngx_cycle_t *cycle,
|
||||
ngx_thread_volatile ngx_event_t **posted);
|
||||
void ngx_wakeup_worker_thread(ngx_cycle_t *cycle);
|
||||
|
||||
#if (NGX_THREADS)
|
||||
ngx_int_t ngx_event_thread_process_posted(ngx_cycle_t *cycle);
|
||||
#endif
|
||||
void ngx_event_process_posted(ngx_cycle_t *cycle, ngx_event_t **posted);
|
||||
|
||||
|
||||
extern ngx_thread_volatile ngx_event_t *ngx_posted_accept_events;
|
||||
extern ngx_thread_volatile ngx_event_t *ngx_posted_events;
|
||||
extern ngx_event_t *ngx_posted_accept_events;
|
||||
extern ngx_event_t *ngx_posted_events;
|
||||
|
||||
|
||||
#endif /* _NGX_EVENT_POSTED_H_INCLUDED_ */
|
||||
|
@ -98,24 +98,6 @@ ngx_event_expire_timers(void)
|
||||
if ((ngx_msec_int_t) (node->key - ngx_current_msec) <= 0) {
|
||||
ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer));
|
||||
|
||||
#if (NGX_THREADS)
|
||||
|
||||
if (ngx_threaded && ngx_trylock(ev->lock) == 0) {
|
||||
|
||||
/*
|
||||
* We cannot change the timer of the event that is being
|
||||
* handled by another thread. And we cannot easy walk
|
||||
* the rbtree to find next expired timer so we exit the loop.
|
||||
* However, it should be a rare case when the event that is
|
||||
* being handled has an expired timer.
|
||||
*/
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0,
|
||||
"event %p is busy in expire timers", ev);
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
|
||||
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
|
||||
"event timer del: %d: %M",
|
||||
ngx_event_ident(ev->data), ev->timer.key);
|
||||
@ -132,18 +114,6 @@ ngx_event_expire_timers(void)
|
||||
|
||||
ev->timer_set = 0;
|
||||
|
||||
#if (NGX_THREADS)
|
||||
if (ngx_threaded) {
|
||||
ev->posted_timedout = 1;
|
||||
|
||||
ngx_post_event(ev, &ngx_posted_events);
|
||||
|
||||
ngx_unlock(ev->lock);
|
||||
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
|
||||
ev->timedout = 1;
|
||||
|
||||
ev->handler(ev);
|
||||
|
@ -216,13 +216,6 @@ ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event,
|
||||
rev->log = cycle->log;
|
||||
wev->log = cycle->log;
|
||||
|
||||
#if (NGX_THREADS)
|
||||
rev->lock = &c->lock;
|
||||
wev->lock = &c->lock;
|
||||
rev->own_lock = &c->lock;
|
||||
wev->own_lock = &c->lock;
|
||||
#endif
|
||||
|
||||
rev->channel = 1;
|
||||
wev->channel = 1;
|
||||
|
||||
|
@ -1214,7 +1214,6 @@ ngx_wakeup_worker_threads(ngx_cycle_t *cycle)
|
||||
/* STUB */
|
||||
ngx_done_events(cycle);
|
||||
ngx_mutex_destroy(ngx_event_timer_mutex);
|
||||
ngx_mutex_destroy(ngx_posted_events_mutex);
|
||||
|
||||
return;
|
||||
}
|
||||
@ -1265,20 +1264,18 @@ ngx_worker_thread_cycle(void *data)
|
||||
return (ngx_thread_value_t) 1;
|
||||
}
|
||||
|
||||
ngx_mutex_lock(ngx_posted_events_mutex);
|
||||
|
||||
for ( ;; ) {
|
||||
thr->state = NGX_THREAD_FREE;
|
||||
|
||||
#if 0
|
||||
if (ngx_cond_wait(thr->cv, ngx_posted_events_mutex) == NGX_ERROR) {
|
||||
return (ngx_thread_value_t) 1;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (ngx_terminate) {
|
||||
thr->state = NGX_THREAD_EXIT;
|
||||
|
||||
ngx_mutex_unlock(ngx_posted_events_mutex);
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_CORE, cycle->log, 0,
|
||||
"thread " NGX_TID_T_FMT " is done",
|
||||
ngx_thread_self());
|
||||
@ -1288,6 +1285,7 @@ ngx_worker_thread_cycle(void *data)
|
||||
|
||||
thr->state = NGX_THREAD_BUSY;
|
||||
|
||||
#if 0
|
||||
if (ngx_event_thread_process_posted(cycle) == NGX_ERROR) {
|
||||
return (ngx_thread_value_t) 1;
|
||||
}
|
||||
@ -1295,6 +1293,7 @@ ngx_worker_thread_cycle(void *data)
|
||||
if (ngx_event_thread_process_posted(cycle) == NGX_ERROR) {
|
||||
return (ngx_thread_value_t) 1;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (ngx_process_changes) {
|
||||
if (ngx_process_changes(cycle, 1) == NGX_ERROR) {
|
||||
|
Loading…
Reference in New Issue
Block a user