From 5646caa572f09012741ae030be0250fdca529374 Mon Sep 17 00:00:00 2001 From: Sergey Lyubka Date: Wed, 16 Jan 2013 23:45:03 +0000 Subject: [PATCH] Using poll() instead of select() to prevent FD_SETSIZE overflows, for programs that open large number of descriptors --- mongoose.c | 144 ++++++++++++++++++++++++++++------------------------- 1 file changed, 76 insertions(+), 68 deletions(-) diff --git a/mongoose.c b/mongoose.c index 746c7ac1..124d33eb 100644 --- a/mongoose.c +++ b/mongoose.c @@ -176,13 +176,23 @@ typedef struct DIR { struct dirent result; } DIR; +#ifndef HAS_POLL +struct pollfd { + int fd; + short events; + short revents; +}; +#define POLLIN 1 +#endif + + // Mark required libraries #pragma comment(lib, "Ws2_32.lib") #else // UNIX specific #include #include -#include +#include #include #include #include @@ -435,7 +445,6 @@ struct file { // Describes listening socket, or socket which was accept()-ed by the master // thread and queued for future handling by the worker thread. struct socket { - struct socket *next; // Linkage SOCKET sock; // Listening socket union usa lsa; // Local socket address union usa rsa; // Remote socket address @@ -490,6 +499,7 @@ struct mg_context { void *user_data; // User-defined data struct socket *listening_sockets; + int num_listening_sockets; volatile int num_threads; // Number of threads pthread_mutex_t mutex; // Protects (max|num)_threads @@ -1231,6 +1241,33 @@ static struct dirent *readdir(DIR *dir) { return result; } +#ifndef HAVE_POLL +static int poll(struct pollfd *pfd, int n, int milliseconds) { + struct timeval tv; + fd_set set; + int i, result; + + tv.tv_sec = milliseconds / 1000; + tv.tv_usec = (milliseconds % 1000) * 1000; + FD_ZERO(&set); + + for (i = 0; i < n; i++) { + FD_SET((SOCKET) pfd[i].fd, &set); + pfd[i].revents = 0; + } + + if ((result = select(0, &set, NULL, NULL, &tv)) > 0) { + for (i = 0; i < n; i++) { + if (FD_ISSET(pfd[i].fd, &set)) { + pfd[i].revents = POLLIN; + } + } + } + + return result; +} +#endif // HAVE_POLL + #define set_close_on_exec(x) // No FD_CLOEXEC on Windows int mg_start_thread(mg_thread_func_t f, void *p) { @@ -1470,17 +1507,14 @@ static int64_t push(FILE *fp, SOCKET sock, SSL *ssl, const char *buf, // reading, must give up and close the connection and exit serving thread. static int wait_until_socket_is_readable(struct mg_connection *conn) { int result; - struct timeval tv; - fd_set set; + struct pollfd pfd; do { - tv.tv_sec = 0; - tv.tv_usec = 300 * 1000; - FD_ZERO(&set); - FD_SET(conn->client.sock, &set); - result = select(conn->client.sock + 1, &set, NULL, NULL, &tv); - if(result == 0 && conn->ssl != NULL) { - result = SSL_pending(conn->ssl); + pfd.fd = conn->client.sock; + pfd.events = POLLIN; + result = poll(&pfd, 1, 200); + if (result == 0 && conn->ssl != NULL) { + result = SSL_pending(conn->ssl); } } while ((result == 0 || (result < 0 && ERRNO == EINTR)) && conn->ctx->stop_flag == 0); @@ -1609,7 +1643,8 @@ int mg_printf(struct mg_connection *conn, const char *fmt, ...) { // vsnprintf() error, give up len = -1; cry(conn, "%s(%s, ...): vsnprintf() error", __func__, fmt); - } else if (len > (int) sizeof(mem) && (buf = (char *) malloc(len + 1)) != NULL) { + } else if (len > (int) sizeof(mem) && + (buf = (char *) malloc(len + 1)) != NULL) { // Local buffer is not large enough, allocate big buffer on heap va_start(ap, fmt); vsnprintf(buf, len + 1, fmt, ap); @@ -4272,12 +4307,11 @@ static void handle_request(struct mg_connection *conn) { } static void close_all_listening_sockets(struct mg_context *ctx) { - struct socket *sp, *tmp; - for (sp = ctx->listening_sockets; sp != NULL; sp = tmp) { - tmp = sp->next; - (void) closesocket(sp->sock); - free(sp); + int i; + for (i = 0; i < ctx->num_listening_sockets; i++) { + closesocket(ctx->listening_sockets[i].sock); } + free(ctx->listening_sockets); } // Valid listening port specification is: [ip_address:]port[s] @@ -4316,9 +4350,8 @@ static int parse_port_string(const struct vec *vec, struct socket *so) { static int set_ports_option(struct mg_context *ctx) { const char *list = ctx->config[LISTENING_PORTS]; int on = 1, success = 1; - SOCKET sock; struct vec vec; - struct socket so, *listener; + struct socket so; while (success && (list = next_option(list, &vec, NULL)) != NULL) { if (!parse_port_string(&vec, &so)) { @@ -4329,11 +4362,11 @@ static int set_ports_option(struct mg_context *ctx) { (ctx->ssl_ctx == NULL || ctx->config[SSL_CERTIFICATE] == NULL)) { cry(fc(ctx), "Cannot add SSL socket, is -ssl_certificate option set?"); success = 0; - } else if ((sock = socket(so.lsa.sa.sa_family, SOCK_STREAM, 6)) == + } else if ((so.sock = socket(so.lsa.sa.sa_family, SOCK_STREAM, 6)) == INVALID_SOCKET || // On Windows, SO_REUSEADDR is recommended only for // broadcast UDP sockets - setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &on, + setsockopt(so.sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &on, sizeof(on)) != 0 || // Set TCP keep-alive. This is needed because if HTTP-level // keep-alive is enabled, and client resets the connection, @@ -4342,27 +4375,22 @@ static int set_ports_option(struct mg_context *ctx) { // handshake will figure out that the client is down and // will close the server end. // Thanks to Igor Klopov who suggested the patch. - setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &on, + setsockopt(so.sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &on, sizeof(on)) != 0 || - bind(sock, &so.lsa.sa, sizeof(so.lsa)) != 0 || - listen(sock, SOMAXCONN) != 0) { - closesocket(sock); + bind(so.sock, &so.lsa.sa, sizeof(so.lsa)) != 0 || + listen(so.sock, SOMAXCONN) != 0) { + closesocket(so.sock); cry(fc(ctx), "%s: cannot bind to %.*s: %s", __func__, (int) vec.len, vec.ptr, strerror(ERRNO)); success = 0; - } else if ((listener = (struct socket *) - calloc(1, sizeof(*listener))) == NULL) { - // NOTE(lsm): order is important: call cry before closesocket(), - // cause closesocket() alters the errno. - cry(fc(ctx), "%s: %s", __func__, strerror(ERRNO)); - closesocket(sock); - success = 0; } else { - *listener = so; - listener->sock = sock; - set_close_on_exec(listener->sock); - listener->next = ctx->listening_sockets; - ctx->listening_sockets = listener; + set_close_on_exec(so.sock); + // TODO: handle realloc failure + ctx->listening_sockets = realloc(ctx->listening_sockets, + (ctx->num_listening_sockets + 1) * + sizeof(ctx->listening_sockets[0])); + ctx->listening_sockets[ctx->num_listening_sockets] = so; + ctx->num_listening_sockets++; } } @@ -4443,13 +4471,6 @@ static int check_acl(struct mg_context *ctx, uint32_t remote_ip) { return allowed == '+'; } -static void add_to_set(SOCKET fd, fd_set *set, int *max_fd) { - FD_SET(fd, set); - if (fd > (SOCKET) *max_fd) { - *max_fd = (int) fd; - } -} - #if !defined(_WIN32) static int set_uid_option(struct mg_context *ctx) { struct passwd *pw; @@ -4982,10 +5003,8 @@ static void accept_new_connection(const struct socket *listener, static void *master_thread(void *thread_func_param) { struct mg_context *ctx = thread_func_param; - fd_set read_set; - struct timeval tv; - struct socket *sp; - int max_fd; + struct pollfd *pfd; + int i; // Increase priority of the master thread #if defined(_WIN32) @@ -4998,33 +5017,22 @@ static void *master_thread(void *thread_func_param) { pthread_setschedparam(pthread_self(), SCHED_RR, &sched_param); #endif + pfd = calloc(ctx->num_listening_sockets, sizeof(pfd[0])); while (ctx->stop_flag == 0) { - FD_ZERO(&read_set); - max_fd = -1; - - // Add listening sockets to the read set - for (sp = ctx->listening_sockets; sp != NULL; sp = sp->next) { - add_to_set(sp->sock, &read_set, &max_fd); + for (i = 0; i < ctx->num_listening_sockets; i++) { + pfd[i].fd = ctx->listening_sockets[i].sock; + pfd[i].events = POLLIN; } - tv.tv_sec = 0; - tv.tv_usec = 200 * 1000; - - if (select(max_fd + 1, &read_set, NULL, NULL, &tv) < 0) { -#ifdef _WIN32 - // On windows, if read_set and write_set are empty, - // select() returns "Invalid parameter" error - // (at least on my Windows XP Pro). So in this case, we sleep here. - mg_sleep(1000); -#endif // _WIN32 - } else { - for (sp = ctx->listening_sockets; sp != NULL; sp = sp->next) { - if (ctx->stop_flag == 0 && FD_ISSET(sp->sock, &read_set)) { - accept_new_connection(sp, ctx); + if (poll(pfd, ctx->num_listening_sockets, 200) > 0) { + for (i = 0; i < ctx->num_listening_sockets; i++) { + if (ctx->stop_flag == 0 && pfd[i].revents == POLLIN) { + accept_new_connection(&ctx->listening_sockets[i], ctx); } } } } + free(pfd); DEBUG_TRACE(("stopping workers")); // Stop signal received: somebody called mg_stop. Quit.