From 0e49214c7eb241d32162445ac5904c8a9d31822c Mon Sep 17 00:00:00 2001 From: Sergey Lyubka Date: Sun, 1 Dec 2013 09:46:05 +0000 Subject: [PATCH] Added mutex protection for write calls --- build/src/core.c | 49 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/build/src/core.c b/build/src/core.c index 605f8b1a..28f3e259 100644 --- a/build/src/core.c +++ b/build/src/core.c @@ -59,10 +59,15 @@ typedef unsigned int uint32_t; typedef unsigned short uint16_t; typedef unsigned __int64 uint64_t; typedef __int64 int64_t; +typedef CRITICAL_SECTION mutex_t; #pragma comment(lib, "ws2_32.lib") #define snprintf _snprintf #define vsnprintf _vsnprintf #define INT64_FMT "I64d" +#define mutex_init(x) InitializeCriticalSection(x) +#define mutex_destroy(x) DeleteCriticalSection(x) +#define mutex_lock(x) EnterCriticalSection(x) +#define mutex_unlock(x) LeaveCriticalSection(x) #ifndef va_copy #define va_copy(x,y) x = y #endif // MINGW #defines va_copy @@ -78,6 +83,11 @@ typedef __int64 int64_t; #include #define closesocket(x) close(x) typedef int sock_t; +typedef pthread_mutex_t mutex_t; +#define mutex_init(x) pthread_mutex_init(x, NULL) +#define mutex_destroy(x) pthread_mutex_destroy(x) +#define mutex_lock(x) pthread_mutex_lock(x) +#define mutex_unlock(x) pthread_mutex_unlock(x) #define INVALID_SOCKET ((sock_t) -1) #define INT64_FMT PRId64 #endif @@ -102,9 +112,13 @@ struct linked_list_link { struct linked_list_link *prev, *next; }; #define MAX_REQUEST_SIZE 16384 #define IOBUF_SIZE 8192 #define MAX_PATH_SIZE 8192 + +#ifdef ENABLE_DBG #define DBG(x) do { printf("%s::%s() ", __FILE__, __func__); \ printf x; putchar('\n'); fflush(stdout); } while(0) -//#define DBG(x) +#else +#define DBG(x) +#endif union socket_address { struct sockaddr sa; @@ -156,7 +170,7 @@ enum connection_flags { CONN_CLOSE = 1, CONN_SPOOL_DONE = 2 }; struct mg_connection { struct linked_list_link link; // Linkage to server->active_connections struct mg_server *server; - struct mg_event event; + struct mg_event event; // NOTE(lsm): this has conn_data attribute sock_t client_sock; // Connected client union socket_address csa; // Client's socket address struct iobuf local_iobuf; @@ -168,6 +182,7 @@ struct mg_connection { struct mg_request_info request_info; char *path_info; int request_len; + mutex_t mutex; // Guards concurrent mg_write() and mg_printf() calls }; static const char *static_config_options[] = { @@ -476,8 +491,11 @@ static struct mg_connection *accept_new_connection(struct mg_server *server) { struct mg_connection *conn = NULL; if ((sock = accept(server->listening_sock, &sa.sa, &len)) == INVALID_SOCKET) { +#if 0 } else if (sock >= FD_SETSIZE) { + DBG((">fd_setsize")); closesocket(sock); +#endif } else if (!check_acl(server->config_options[ACCESS_CONTROL_LIST], ntohl(* (uint32_t *) &sa.sin.sin_addr))) { closesocket(sock); @@ -495,6 +513,7 @@ static struct mg_connection *accept_new_connection(struct mg_server *server) { conn->local_iobuf.size = MAX_REQUEST_SIZE; conn->remote_iobuf.buf = (char *) calloc(1, IOBUF_SIZE); conn->remote_iobuf.size = IOBUF_SIZE; + mutex_init(&conn->mutex); LINKED_LIST_ADD_TO_FRONT(&server->active_connections, &conn->link); DBG(("added conn %p", conn)); } @@ -507,6 +526,7 @@ static void close_conn(struct mg_connection *conn) { LINKED_LIST_REMOVE(&conn->link); closesocket(conn->client_sock); free(conn->remote_iobuf.buf); + mutex_destroy(&conn->mutex); free(conn); } @@ -756,12 +776,12 @@ static int convert_uri_to_file_name(struct mg_connection *conn, char *buf, } static int spool(struct iobuf *io, const void *buf, int len) { - char *ptr = io->buf; + char *p = io->buf; if (len <= 0) { } else if (len < io->size - io->len || - (ptr = realloc(io->buf, io->len + len - io->size)) != NULL) { - io->buf = ptr; + (p = (char *) realloc(io->buf, io->len + len - io->size)) != 0) { + io->buf = p; memcpy(io->buf + io->len, buf, len); io->len += len; } else { @@ -796,13 +816,21 @@ int mg_printf(struct mg_connection *conn, const char *fmt, ...) { va_list ap; int ret; va_start(ap, fmt); + mutex_lock(&conn->mutex); ret = vspool(&conn->remote_iobuf, fmt, ap); + mutex_unlock(&conn->mutex); va_end(ap); + send(conn->server->ctl[1], ".", 1, 0); // Wake up select call return ret; } int mg_write(struct mg_connection *conn, const void *buf, int len) { - return spool(&conn->remote_iobuf, buf, len); + int ret; + mutex_lock(&conn->mutex); + ret = spool(&conn->remote_iobuf, buf, len); + mutex_unlock(&conn->mutex); + send(conn->server->ctl[1], ".", 1, 0); // Wake up select call + return ret; } static int is_error(int n) { @@ -983,6 +1011,13 @@ void mg_poll_server(struct mg_server *server, unsigned int milliseconds) { tv.tv_usec = (milliseconds % 1000) * 1000; if (select(max_fd + 1, &read_set, &write_set, NULL, &tv) > 0) { + // If control socket is set, just read from it. It meant to wake up + // this select loop when another thread writes to any connection + if (FD_ISSET(server->ctl[0], &read_set)) { + char buf[500]; + recv(server->ctl[0], buf, sizeof(buf), 0); + } + // Accept new connections if (FD_ISSET(server->listening_sock, &read_set)) { while ((conn = accept_new_connection(server)) != NULL) { @@ -1080,6 +1115,8 @@ struct mg_server *mg_create_server(const char *opts[], mg_event_handler_t func, return server; } +// End of library, start of the application code + static int event_handler(struct mg_event *ev) { mg_printf(ev->conn, "%s", "HTTP/1.0 200 OK\r\n\r\n:-)\n"); return 1;