diff --git a/mongoose.c b/mongoose.c index 93662b94..d04744d3 100644 --- a/mongoose.c +++ b/mongoose.c @@ -3511,6 +3511,7 @@ struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd, c->fd = (void *) (size_t) fd; c->fn = fn; c->fn_data = fn_data; + MG_EPOLL_ADD(c); mg_call(c, MG_EV_OPEN, NULL); LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); } @@ -3536,10 +3537,18 @@ void mg_mgr_free(struct mg_mgr *mgr) { FreeRTOS_DeleteSocketSet(mgr->ss); #endif MG_DEBUG(("All connections closed")); +#if MG_ENABLE_EPOLL + if (mgr->epoll_fd >= 0) close(mgr->epoll_fd), mgr->epoll_fd = -1; +#endif } void mg_mgr_init(struct mg_mgr *mgr) { memset(mgr, 0, sizeof(*mgr)); +#if MG_ENABLE_EPOLL + if ((mgr->epoll_fd = epoll_create1(0)) < 0) MG_ERROR(("epoll: %d", errno)); +#else + mgr->epoll_fd = -1; +#endif #if MG_ARCH == MG_ARCH_WIN32 && MG_ENABLE_WINSOCK // clang-format off { WSADATA data; WSAStartup(MAKEWORD(2, 2), &data); } @@ -4091,6 +4100,9 @@ static void iolog(struct mg_connection *c, char *buf, long n, bool r) { } else { mg_iobuf_del(&c->send, 0, (size_t) n); // if (c->send.len == 0) mg_iobuf_resize(&c->send, 0); + if (c->send.len == 0) { + MG_EPOLL_MOD(c, 0); + } mg_call(c, MG_EV_WRITE, &n); } } @@ -4207,6 +4219,7 @@ bool mg_open_listener(struct mg_connection *c, const char *url) { setlocaddr(fd, &c->loc); mg_set_non_blocking_mode(fd); c->fd = S2PTR(fd); + MG_EPOLL_ADD(c); success = true; } } @@ -4259,6 +4272,9 @@ static void write_conn(struct mg_connection *c) { static void close_conn(struct mg_connection *c) { if (FD(c) != INVALID_SOCKET) { +#if MG_ENABLE_EPOLL + epoll_ctl(c->mgr->epoll_fd, EPOLL_CTL_DEL, FD(c), NULL); +#endif closesocket(FD(c)); #if MG_ARCH == MG_ARCH_FREERTOS_TCP FreeRTOS_FD_CLR(c->fd, c->mgr->ss, eSELECT_ALL); @@ -4275,6 +4291,7 @@ static void connect_conn(struct mg_connection *c) { if (getpeername(FD(c), &usa.sa, &n) == 0) { c->is_connecting = 0; mg_call(c, MG_EV_CONNECT, NULL); + MG_EPOLL_MOD(c, 0); if (c->is_tls_hs) mg_tls_handshake(c); } else { mg_error(c, "socket error"); @@ -4308,6 +4325,7 @@ void mg_connect_resolved(struct mg_connection *c) { if (FD(c) == INVALID_SOCKET) { mg_error(c, "socket(): %d", MG_SOCK_ERRNO); } else if (c->is_udp) { + MG_EPOLL_ADD(c); mg_call(c, MG_EV_RESOLVE, NULL); mg_call(c, MG_EV_CONNECT, NULL); } else { @@ -4315,6 +4333,7 @@ void mg_connect_resolved(struct mg_connection *c) { socklen_t slen = tousa(&c->rem, &usa); mg_set_non_blocking_mode(FD(c)); setsockopts(c); + MG_EPOLL_ADD(c); mg_call(c, MG_EV_RESOLVE, NULL); if ((rc = connect(FD(c), &usa.sa, slen)) == 0) { mg_call(c, MG_EV_CONNECT, NULL); @@ -4366,6 +4385,7 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) { MG_DEBUG(("%lu accepted %s", c->id, buf)); LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); c->fd = S2PTR(fd); + MG_EPOLL_ADD(c); mg_set_non_blocking_mode(FD(c)); setsockopts(c); c->is_accepted = 1; @@ -4469,6 +4489,28 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) { FreeRTOS_FD_CLR(c->fd, mgr->ss, eSELECT_READ | eSELECT_EXCEPT | eSELECT_WRITE); } +#elif MG_ENABLE_EPOLL + size_t max = 1; + for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) { + c->is_readable = c->is_writable = 0; + if (mg_tls_pending(c) > 0) ms = 1, c->is_readable = 1; + if (can_write(c)) MG_EPOLL_MOD(c, 1); + max++; + } + struct epoll_event *evs = (struct epoll_event *) alloca(max * sizeof(evs[0])); + int n = epoll_wait(mgr->epoll_fd, evs, (int) max, ms); + for (int i = 0; i < n; i++) { + struct mg_connection *c = (struct mg_connection *) evs[i].data.ptr; + if (evs[i].events & EPOLLERR) { + mg_error(c, "socket error"); + } else if (c->is_readable == 0) { + bool rd = evs[i].events & (EPOLLIN | EPOLLHUP); + bool wr = evs[i].events & EPOLLOUT; + c->is_readable = can_read(c) && rd ? 1U : 0; + c->is_writable = can_write(c) && wr ? 1U : 0; + } + } + (void) skip_iotest; #elif MG_ENABLE_POLL nfds_t n = 0; for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) n++; diff --git a/mongoose.h b/mongoose.h index 4dcfe03e..35becd90 100644 --- a/mongoose.h +++ b/mongoose.h @@ -435,7 +435,9 @@ extern int SockSet(SOCKET hSock, int Type, int Prop, void *pbuf, int size); #include #endif -#if !defined(MG_ENABLE_POLL) && (defined(__linux__) || defined(__APPLE__)) +#if !defined(MG_ENABLE_EPOLL) && defined(__linux__) +#define MG_ENABLE_EPOLL 1 +#elif !defined(MG_ENABLE_POLL) #define MG_ENABLE_POLL 1 #endif @@ -457,11 +459,15 @@ extern int SockSet(SOCKET hSock, int Type, int Prop, void *pbuf, int size); #include #include #include -#if defined(MG_ENABLE_POLL) && MG_ENABLE_POLL + +#if defined(MG_ENABLE_EPOLL) && MG_ENABLE_EPOLL +#include +#elif defined(MG_ENABLE_POLL) && MG_ENABLE_POLL #include #else #include #endif + #include #include #include @@ -611,6 +617,10 @@ int sscanf(const char *, const char *, ...); #define MG_ENABLE_POLL 0 #endif +#ifndef MG_ENABLE_EPOLL +#define MG_ENABLE_EPOLL 0 +#endif + #ifndef MG_ENABLE_FATFS #define MG_ENABLE_FATFS 0 #endif @@ -706,6 +716,23 @@ int sscanf(const char *, const char *, ...); #endif #endif +#if MG_ENABLE_EPOLL +#define MG_EPOLL_ADD(c) \ + do { \ + struct epoll_event ev = {EPOLLIN | EPOLLERR | EPOLLHUP, {c}}; \ + epoll_ctl(c->mgr->epoll_fd, EPOLL_CTL_ADD, (int) (size_t) c->fd, &ev); \ + } while (0) +#define MG_EPOLL_MOD(c, wr) \ + do { \ + struct epoll_event ev = {EPOLLIN | EPOLLERR | EPOLLHUP, {c}}; \ + if (wr) ev.events |= EPOLLOUT; \ + epoll_ctl(c->mgr->epoll_fd, EPOLL_CTL_MOD, (int) (size_t) c->fd, &ev); \ + } while (0) +#else +#define MG_EPOLL_ADD(c) +#define MG_EPOLL_MOD(c, wr) +#endif + @@ -1013,6 +1040,7 @@ struct mg_mgr { uint16_t mqtt_id; // MQTT IDs for pub/sub void *active_dns_requests; // DNS requests in progress struct mg_timer *timers; // Active timers + int epoll_fd; // Used when MG_EPOLL_ENABLE=1 void *priv; // Used by the MIP stack size_t extraconnsize; // Used by the MIP stack #if MG_ARCH == MG_ARCH_FREERTOS_TCP diff --git a/src/arch_unix.h b/src/arch_unix.h index 8e336eed..7eaeeeb1 100644 --- a/src/arch_unix.h +++ b/src/arch_unix.h @@ -8,7 +8,9 @@ #include #endif -#if !defined(MG_ENABLE_POLL) && (defined(__linux__) || defined(__APPLE__)) +#if !defined(MG_ENABLE_EPOLL) && defined(__linux__) +#define MG_ENABLE_EPOLL 1 +#elif !defined(MG_ENABLE_POLL) #define MG_ENABLE_POLL 1 #endif @@ -30,11 +32,15 @@ #include #include #include -#if defined(MG_ENABLE_POLL) && MG_ENABLE_POLL + +#if defined(MG_ENABLE_EPOLL) && MG_ENABLE_EPOLL +#include +#elif defined(MG_ENABLE_POLL) && MG_ENABLE_POLL #include #else #include #endif + #include #include #include diff --git a/src/config.h b/src/config.h index 28a8afe5..67229c76 100644 --- a/src/config.h +++ b/src/config.h @@ -12,6 +12,10 @@ #define MG_ENABLE_POLL 0 #endif +#ifndef MG_ENABLE_EPOLL +#define MG_ENABLE_EPOLL 0 +#endif + #ifndef MG_ENABLE_FATFS #define MG_ENABLE_FATFS 0 #endif @@ -106,3 +110,20 @@ #define MG_ENABLE_FILE 0 #endif #endif + +#if MG_ENABLE_EPOLL +#define MG_EPOLL_ADD(c) \ + do { \ + struct epoll_event ev = {EPOLLIN | EPOLLERR | EPOLLHUP, {c}}; \ + epoll_ctl(c->mgr->epoll_fd, EPOLL_CTL_ADD, (int) (size_t) c->fd, &ev); \ + } while (0) +#define MG_EPOLL_MOD(c, wr) \ + do { \ + struct epoll_event ev = {EPOLLIN | EPOLLERR | EPOLLHUP, {c}}; \ + if (wr) ev.events |= EPOLLOUT; \ + epoll_ctl(c->mgr->epoll_fd, EPOLL_CTL_MOD, (int) (size_t) c->fd, &ev); \ + } while (0) +#else +#define MG_EPOLL_ADD(c) +#define MG_EPOLL_MOD(c, wr) +#endif diff --git a/src/net.c b/src/net.c index ce9a5417..24d953c1 100644 --- a/src/net.c +++ b/src/net.c @@ -215,6 +215,7 @@ struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd, c->fd = (void *) (size_t) fd; c->fn = fn; c->fn_data = fn_data; + MG_EPOLL_ADD(c); mg_call(c, MG_EV_OPEN, NULL); LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); } @@ -240,10 +241,18 @@ void mg_mgr_free(struct mg_mgr *mgr) { FreeRTOS_DeleteSocketSet(mgr->ss); #endif MG_DEBUG(("All connections closed")); +#if MG_ENABLE_EPOLL + if (mgr->epoll_fd >= 0) close(mgr->epoll_fd), mgr->epoll_fd = -1; +#endif } void mg_mgr_init(struct mg_mgr *mgr) { memset(mgr, 0, sizeof(*mgr)); +#if MG_ENABLE_EPOLL + if ((mgr->epoll_fd = epoll_create1(0)) < 0) MG_ERROR(("epoll: %d", errno)); +#else + mgr->epoll_fd = -1; +#endif #if MG_ARCH == MG_ARCH_WIN32 && MG_ENABLE_WINSOCK // clang-format off { WSADATA data; WSAStartup(MAKEWORD(2, 2), &data); } diff --git a/src/net.h b/src/net.h index 357ff11a..aab62dd1 100644 --- a/src/net.h +++ b/src/net.h @@ -30,6 +30,7 @@ struct mg_mgr { uint16_t mqtt_id; // MQTT IDs for pub/sub void *active_dns_requests; // DNS requests in progress struct mg_timer *timers; // Active timers + int epoll_fd; // Used when MG_EPOLL_ENABLE=1 void *priv; // Used by the MIP stack size_t extraconnsize; // Used by the MIP stack #if MG_ARCH == MG_ARCH_FREERTOS_TCP diff --git a/src/sock.c b/src/sock.c index 85977855..b1664144 100644 --- a/src/sock.c +++ b/src/sock.c @@ -143,6 +143,9 @@ static void iolog(struct mg_connection *c, char *buf, long n, bool r) { } else { mg_iobuf_del(&c->send, 0, (size_t) n); // if (c->send.len == 0) mg_iobuf_resize(&c->send, 0); + if (c->send.len == 0) { + MG_EPOLL_MOD(c, 0); + } mg_call(c, MG_EV_WRITE, &n); } } @@ -259,6 +262,7 @@ bool mg_open_listener(struct mg_connection *c, const char *url) { setlocaddr(fd, &c->loc); mg_set_non_blocking_mode(fd); c->fd = S2PTR(fd); + MG_EPOLL_ADD(c); success = true; } } @@ -311,6 +315,9 @@ static void write_conn(struct mg_connection *c) { static void close_conn(struct mg_connection *c) { if (FD(c) != INVALID_SOCKET) { +#if MG_ENABLE_EPOLL + epoll_ctl(c->mgr->epoll_fd, EPOLL_CTL_DEL, FD(c), NULL); +#endif closesocket(FD(c)); #if MG_ARCH == MG_ARCH_FREERTOS_TCP FreeRTOS_FD_CLR(c->fd, c->mgr->ss, eSELECT_ALL); @@ -327,6 +334,7 @@ static void connect_conn(struct mg_connection *c) { if (getpeername(FD(c), &usa.sa, &n) == 0) { c->is_connecting = 0; mg_call(c, MG_EV_CONNECT, NULL); + MG_EPOLL_MOD(c, 0); if (c->is_tls_hs) mg_tls_handshake(c); } else { mg_error(c, "socket error"); @@ -360,6 +368,7 @@ void mg_connect_resolved(struct mg_connection *c) { if (FD(c) == INVALID_SOCKET) { mg_error(c, "socket(): %d", MG_SOCK_ERRNO); } else if (c->is_udp) { + MG_EPOLL_ADD(c); mg_call(c, MG_EV_RESOLVE, NULL); mg_call(c, MG_EV_CONNECT, NULL); } else { @@ -367,6 +376,7 @@ void mg_connect_resolved(struct mg_connection *c) { socklen_t slen = tousa(&c->rem, &usa); mg_set_non_blocking_mode(FD(c)); setsockopts(c); + MG_EPOLL_ADD(c); mg_call(c, MG_EV_RESOLVE, NULL); if ((rc = connect(FD(c), &usa.sa, slen)) == 0) { mg_call(c, MG_EV_CONNECT, NULL); @@ -418,6 +428,7 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) { MG_DEBUG(("%lu accepted %s", c->id, buf)); LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); c->fd = S2PTR(fd); + MG_EPOLL_ADD(c); mg_set_non_blocking_mode(FD(c)); setsockopts(c); c->is_accepted = 1; @@ -521,6 +532,28 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) { FreeRTOS_FD_CLR(c->fd, mgr->ss, eSELECT_READ | eSELECT_EXCEPT | eSELECT_WRITE); } +#elif MG_ENABLE_EPOLL + size_t max = 1; + for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) { + c->is_readable = c->is_writable = 0; + if (mg_tls_pending(c) > 0) ms = 1, c->is_readable = 1; + if (can_write(c)) MG_EPOLL_MOD(c, 1); + max++; + } + struct epoll_event *evs = (struct epoll_event *) alloca(max * sizeof(evs[0])); + int n = epoll_wait(mgr->epoll_fd, evs, (int) max, ms); + for (int i = 0; i < n; i++) { + struct mg_connection *c = (struct mg_connection *) evs[i].data.ptr; + if (evs[i].events & EPOLLERR) { + mg_error(c, "socket error"); + } else if (c->is_readable == 0) { + bool rd = evs[i].events & (EPOLLIN | EPOLLHUP); + bool wr = evs[i].events & EPOLLOUT; + c->is_readable = can_read(c) && rd ? 1U : 0; + c->is_writable = can_write(c) && wr ? 1U : 0; + } + } + (void) skip_iotest; #elif MG_ENABLE_POLL nfds_t n = 0; for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) n++; diff --git a/test/unit_test.c b/test/unit_test.c index be99a597..9e3968a7 100644 --- a/test/unit_test.c +++ b/test/unit_test.c @@ -521,6 +521,8 @@ static void fcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { fd->closed = 1; c->is_closing = 1; (void) c; + } else if (ev == MG_EV_CLOSE) { + fd->closed = 1; } } @@ -545,13 +547,13 @@ static int fetch(struct mg_mgr *mgr, char *buf, const char *url, } mg_tls_init(c, &opts); if (c->tls == NULL) fd.closed = 1; - // c->is_hexdumping = 1; } + // c->is_hexdumping = 1; va_start(ap, fmt); mg_vprintf(c, fmt, ap); va_end(ap); buf[0] = '\0'; - for (i = 0; i < 250 && buf[0] == '\0'; i++) mg_mgr_poll(mgr, 1); + for (i = 0; i < 50 && buf[0] == '\0'; i++) mg_mgr_poll(mgr, 1); if (!fd.closed) c->is_closing = 1; mg_mgr_poll(mgr, 1); return fd.code;