Better handling of MONGOOSE_NO_SOCKETPAIR

This commit is contained in:
Sergey Lyubka 2014-02-04 14:04:54 +00:00
parent e70d98b7b0
commit a34d1ec00a
2 changed files with 20 additions and 25 deletions

View File

@ -129,8 +129,8 @@ is returned.
void *param); void *param);
This is an interface primarily designed to push arbitrary data to websocket This is an interface primarily designed to push arbitrary data to websocket
connections at any time. This function could be called from any thread. When connections at any time. This function could be called from the IO thread only.
it returns, an IO thread called `func()` on each active connection, When it returns, an IO thread calls `func()` on each active connection,
passing `param` as an extra parameter. It is allowed to call `mg_send_data()` or passing `param` as an extra parameter. It is allowed to call `mg_send_data()` or
`mg_websocket_write()` within a callback, cause `func` is executed in the `mg_websocket_write()` within a callback, cause `func` is executed in the
context of the IO thread. context of the IO thread.

View File

@ -306,7 +306,9 @@ struct mg_server {
SSL_CTX *ssl_ctx; // Server SSL context SSL_CTX *ssl_ctx; // Server SSL context
SSL_CTX *client_ssl_ctx; // Client SSL context SSL_CTX *client_ssl_ctx; // Client SSL context
#endif #endif
#ifndef MONGOOSE_NO_SOCKETPAIR
sock_t ctl[2]; // Control socketpair. Used to wake up from select() call sock_t ctl[2]; // Control socketpair. Used to wake up from select() call
#endif
}; };
// Expandable IO buffer // Expandable IO buffer
@ -785,6 +787,7 @@ int mg_printf(struct mg_connection *conn, const char *fmt, ...) {
return len; return len;
} }
#ifndef MONGOOSE_NO_SOCKETPAIR
static int mg_socketpair(sock_t sp[2]) { static int mg_socketpair(sock_t sp[2]) {
struct sockaddr_in sa; struct sockaddr_in sa;
sock_t sock, ret = -1; sock_t sock, ret = -1;
@ -816,6 +819,7 @@ static int mg_socketpair(sock_t sp[2]) {
return ret; return ret;
} }
#endif
static int is_error(int n) { static int is_error(int n) {
return n == 0 || return n == 0 ||
@ -3110,7 +3114,8 @@ int mg_parse_header(const char *s, const char *var_name, char *buf,
} }
#ifdef MONGOOSE_USE_LUA #ifdef MONGOOSE_USE_LUA
#include "lua_5.2.1.h" #include <lua.h>
#include <lauxlib.h>
#ifdef _WIN32 #ifdef _WIN32
static void *mmap(void *addr, int64_t len, int prot, int flags, int fd, static void *mmap(void *addr, int64_t len, int prot, int flags, int fd,
@ -3783,19 +3788,6 @@ static void transfer_file_data(struct connection *conn) {
} }
} }
static void execute_iteration(struct mg_server *server) {
struct ll *lp, *tmp;
struct connection *conn;
union { mg_handler_t f; void *p; } msg[2];
recv(server->ctl[1], (void *) msg, sizeof(msg), 0);
LINKED_LIST_FOREACH(&server->active_connections, lp, tmp) {
conn = LINKED_LIST_ENTRY(lp, struct connection, link);
conn->mg_conn.connection_param = msg[1].p;
msg[0].f(&conn->mg_conn);
}
}
void add_to_set(sock_t sock, fd_set *set, sock_t *max_fd) { void add_to_set(sock_t sock, fd_set *set, sock_t *max_fd) {
FD_SET(sock, set); FD_SET(sock, set);
if (sock > *max_fd) { if (sock > *max_fd) {
@ -3817,7 +3809,9 @@ unsigned int mg_poll_server(struct mg_server *server, int milliseconds) {
FD_ZERO(&read_set); FD_ZERO(&read_set);
FD_ZERO(&write_set); FD_ZERO(&write_set);
add_to_set(server->listening_sock, &read_set, &max_fd); add_to_set(server->listening_sock, &read_set, &max_fd);
#ifndef MONGOOSE_NO_SOCKETPAIR
add_to_set(server->ctl[1], &read_set, &max_fd); add_to_set(server->ctl[1], &read_set, &max_fd);
#endif
LINKED_LIST_FOREACH(&server->active_connections, lp, tmp) { LINKED_LIST_FOREACH(&server->active_connections, lp, tmp) {
conn = LINKED_LIST_ENTRY(lp, struct connection, link); conn = LINKED_LIST_ENTRY(lp, struct connection, link);
@ -3841,10 +3835,6 @@ unsigned int mg_poll_server(struct mg_server *server, int milliseconds) {
tv.tv_usec = (milliseconds % 1000) * 1000; tv.tv_usec = (milliseconds % 1000) * 1000;
if (select(max_fd + 1, &read_set, &write_set, NULL, &tv) > 0) { if (select(max_fd + 1, &read_set, &write_set, NULL, &tv) > 0) {
if (FD_ISSET(server->ctl[1], &read_set)) {
execute_iteration(server);
}
// Accept new connections // Accept new connections
if (FD_ISSET(server->listening_sock, &read_set)) { if (FD_ISSET(server->listening_sock, &read_set)) {
// We're not looping here, and accepting just one connection at // We're not looping here, and accepting just one connection at
@ -3909,8 +3899,10 @@ void mg_destroy_server(struct mg_server **server) {
// Do one last poll, see https://github.com/cesanta/mongoose/issues/286 // Do one last poll, see https://github.com/cesanta/mongoose/issues/286
mg_poll_server(s, 0); mg_poll_server(s, 0);
closesocket(s->listening_sock); closesocket(s->listening_sock);
#ifndef MONGOOSE_NO_SOCKETPAIR
closesocket(s->ctl[0]); closesocket(s->ctl[0]);
closesocket(s->ctl[1]); closesocket(s->ctl[1]);
#endif
LINKED_LIST_FOREACH(&s->active_connections, lp, tmp) { LINKED_LIST_FOREACH(&s->active_connections, lp, tmp) {
close_conn(LINKED_LIST_ENTRY(lp, struct connection, link)); close_conn(LINKED_LIST_ENTRY(lp, struct connection, link));
} }
@ -3929,11 +3921,14 @@ void mg_destroy_server(struct mg_server **server) {
// Apply function to all active connections. // Apply function to all active connections.
void mg_iterate_over_connections(struct mg_server *server, mg_handler_t handler, void mg_iterate_over_connections(struct mg_server *server, mg_handler_t handler,
void *param) { void *param) {
// Send closure (function + parameter) to the IO thread to execute struct ll *lp, *tmp;
union { mg_handler_t f; void *p; } msg[2]; struct connection *conn;
msg[0].f = handler;
msg[1].p = param; LINKED_LIST_FOREACH(&server->active_connections, lp, tmp) {
send(server->ctl[0], (void *) msg, sizeof(msg), 0); conn = LINKED_LIST_ENTRY(lp, struct connection, link);
conn->mg_conn.connection_param = param;
handler(&conn->mg_conn);
}
} }
static int get_var(const char *data, size_t data_len, const char *name, static int get_var(const char *data, size_t data_len, const char *name,