Refactor multithreading API

This commit is contained in:
Sergey Lyubka 2022-04-22 14:42:07 +01:00
parent 8174842b05
commit f19eec7fae
9 changed files with 167 additions and 149 deletions

View File

@ -677,38 +677,45 @@ char buf[100];
LOG(LL_INFO, ("%s", mg_straddr(&c->peer, buf, sizeof(buf))));
```
### mg\_mkpipe()
### mg\_wrapfd()
```c
struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *);
struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd,
mg_event_handler_t fn, void *fn_data);
```
Create a "pipe" connection which is safe to pass to a different task/thread
and used to wake up event manager from a different task. These
functions are designed to implement multi-threaded support, to handle two
common use cases:
- There are multiple consumer connections, e.g. connected websocket clients.
A server constantly pushes some data to all of them. In this case, a data
producer task should call `mg_mgr_wakeup()` as soon as more data is produced.
A pipe's event handler should push data to all client connection.
Use `c->label` to mark client connections.
- In order to serve a request, a long blocking operation should be performed.
In this case, request handler assigns some marker to `c->label` and then
spawns a handler task and gives a pipe to a
handler task. A handler does its job, and when data is ready, wakes up a
manager. A pipe's event handler pushes data to a marked connection.
Another task can wake up a sleeping event manager (in `mg_mgr_poll()` call)
using `mg_mgr_wakeup()`. When an event manager is woken up, a pipe
connection event handler function receives `MG_EV_READ` event.
Wrap a given file descriptor `fd` into a connection, and add that connection
to the event manager. An `fd` descriptor must suport `send()`, `recv()`,
`select()` syscalls, and be non-blocking. Mongoose will treat it as a TCP
socket. The `c->rem` and `c->loc` addresses will be empty.
Parameters:
- `fd` - A file descriptor to wrap
- `mgr` - An event manager
- `fn` - A pointer to event handler function
- `ud` - A user data pointer. It will be passed to `fn` as `fn_data` parameter
Return value: Pointer to created connection or `NULL` in case of error
Return value: Pointer to the created connection or `NULL` in case of error
### mg\_mkpipe()
```c
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data);
```
Create two interconnected sockets for inter-thread communication. One socket
is wrapped into a Mongoose connection and is added to the event manager.
Another socket is returned, and supposed to be passed to a worker thread.
When a worker thread `send()`s to socket any data, that wakes up `mgr` and
`fn` event handler reveives `MG_EV_READ` event. Also, `fn` can send any
data to a worker thread, which can be `recv()`ed by a worker thread.
Parameters:
- `mgr` - An event manager
- `fn` - A pointer to event handler function
- `fn_data` - A user data pointer. It will be passed to `fn` as `fn_data` parameter
Return value: created socket, or `-1` on error
Usage example: see [examples/multi-threaded](https://github.com/cesanta/mongoose/tree/master/examples/multi-threaded).

View File

@ -35,9 +35,36 @@ static void start_thread(void (*f)(void *), void *p) {
}
static void thread_function(void *param) {
struct mg_connection *c = param; // Pipe connection
int sock = (int) (size_t) param; // Paired socket. We own it
sleep(2); // Simulate long execution
mg_mgr_wakeup(c, "hi", 2); // Wakeup event manager
send(sock, "hi", 2, 2); // Wakeup event manager
close(sock); // Close the connection
}
static void link_conns(struct mg_connection *c1, struct mg_connection *c2) {
c1->fn_data = c2;
c2->fn_data = c1;
}
static void unlink_conns(struct mg_connection *c1, struct mg_connection *c2) {
c1->fn_data = c2->fn_data = NULL;
}
// Pipe event handler
static void pcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
struct mg_connection *parent = (struct mg_connection *) fn_data;
MG_INFO(("%lu %p %d %p", c->id, c->fd, ev, parent));
if (parent == NULL) { // If parent connection closed, close too
c->is_closing = 1;
} else if (ev == MG_EV_READ) { // Got data from the worker thread
mg_http_reply(parent, 200, "Host: foo.com\r\n", "%.*s\n", c->recv.len,
c->recv.buf); // Respond!
c->recv.len = 0; // Tell Mongoose we've consumed data
} else if (ev == MG_EV_OPEN) {
link_conns(c, parent);
} else if (ev == MG_EV_CLOSE) {
unlink_conns(c, parent);
}
}
// HTTP request callback
@ -50,32 +77,19 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n");
} else {
// Multithreading code path
c->label[0] = 'W'; // Mark us as waiting for data
start_thread(thread_function, fn_data); // Start handling thread
}
}
}
// Pipe event handler
static void pcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_READ) {
struct mg_connection *t;
for (t = c->mgr->conns; t != NULL; t = t->next) {
if (t->label[0] != 'W') continue; // Ignore un-marked connections
mg_http_reply(t, 200, "Host: foo.com\r\n", "%.*s\n", c->recv.len,
c->recv.buf); // Respond!
t->label[0] = 0; // Clear mark
int sock = mg_mkpipe(c->mgr, pcb, c); // Create pipe
start_thread(thread_function, (void *) (size_t) sock); // Start thread
}
} else if (ev == MG_EV_CLOSE) {
if (c->fn_data != NULL) unlink_conns(c, c->fn_data);
}
}
int main(void) {
struct mg_mgr mgr;
struct mg_connection *pipe; // Used to wake up event manager
mg_mgr_init(&mgr);
mg_log_set("3");
pipe = mg_mkpipe(&mgr, pcb, NULL); // Create pipe
mg_http_listen(&mgr, "http://localhost:8000", fn, pipe); // Create listener
mg_http_listen(&mgr, "http://localhost:8000", fn, NULL); // Create listener
for (;;) mg_mgr_poll(&mgr, 1000); // Event loop
mg_mgr_free(&mgr); // Cleanup
return 0;

View File

@ -2753,6 +2753,19 @@ struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url,
return c;
}
struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd,
mg_event_handler_t fn, void *fn_data) {
struct mg_connection *c = mg_alloc_conn(mgr);
if (c != NULL) {
c->fd = (void *) (size_t) fd;
c->fn = fn;
c->fn_data = fn_data;
mg_call(c, MG_EV_OPEN, NULL);
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
}
return c;
}
struct mg_timer *mg_timer_add(struct mg_mgr *mgr, uint64_t milliseconds,
unsigned flags, void (*fn)(void *), void *arg) {
struct mg_timer *t = (struct mg_timer *) calloc(1, sizeof(*t));
@ -3446,11 +3459,19 @@ void mg_connect_resolved(struct mg_connection *c) {
MG_DEBUG(("%lu %p", c->id, c->fd));
}
static SOCKET raccept(SOCKET sock, union usa *usa, socklen_t len) {
SOCKET s = INVALID_SOCKET;
do {
s = accept(sock, &usa->sa, &len);
} while (s == INVALID_SOCKET && errno == EINTR);
return s;
}
static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
struct mg_connection *c = NULL;
union usa usa;
socklen_t sa_len = sizeof(usa);
SOCKET fd = accept(FD(lsn), &usa.sa, &sa_len);
SOCKET fd = raccept(FD(lsn), &usa, sa_len);
if (fd == INVALID_SOCKET) {
#if MG_ARCH == MG_ARCH_AZURERTOS
// AzureRTOS, in non-block socket mode can mark listening socket readable
@ -3489,71 +3510,48 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
}
static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
socklen_t n = sizeof(usa[0].sin);
bool result = false;
SOCKET sock;
socklen_t len = sizeof(usa[0].sin);
bool success = false;
sock = sp[0] = sp[1] = INVALID_SOCKET;
(void) memset(&usa[0], 0, sizeof(usa[0]));
usa[0].sin.sin_family = AF_INET;
*(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001); // 127.0.0.1
*(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001U); // 127.0.0.1
usa[1] = usa[0];
if ((sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET &&
(sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET &&
bind(sp[0], &usa[0].sa, n) == 0 && bind(sp[1], &usa[1].sa, n) == 0 &&
getsockname(sp[0], &usa[0].sa, &n) == 0 &&
getsockname(sp[1], &usa[1].sa, &n) == 0 &&
connect(sp[0], &usa[1].sa, n) == 0 &&
connect(sp[1], &usa[0].sa, n) == 0) {
mg_set_non_blocking_mode(sp[1]); // Set close-on-exec
result = true;
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
bind(sock, &usa[0].sa, len) == 0 && listen(sock, 3) == 0 &&
getsockname(sock, &usa[0].sa, &len) == 0 &&
(sp[0] = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
connect(sp[0], &usa[0].sa, len) == 0 &&
(sp[1] = raccept(sock, &usa[1], len)) != INVALID_SOCKET) {
mg_set_non_blocking_mode(sp[1]);
success = true;
} else {
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
sp[0] = sp[1] = INVALID_SOCKET;
}
return result;
if (sock != INVALID_SOCKET) closesocket(sock);
return success;
}
bool mg_mgr_wakeup(struct mg_connection *c, const void *buf, size_t len) {
if (buf == NULL || len == 0) buf = (void *) "", len = 1;
return (size_t) send((SOCKET) (size_t) c->pfn_data, (const char *) buf, len,
MSG_NONBLOCKING) == len;
}
static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_READ) {
mg_iobuf_free(&c->recv);
} else if (ev == MG_EV_CLOSE) {
closesocket((SOCKET) (size_t) c->pfn_data);
}
(void) ev_data, (void) fn_data;
}
struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn,
void *fn_data) {
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data) {
union usa usa[2];
SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET};
struct mg_connection *c = NULL;
if (!mg_socketpair(sp, usa)) {
MG_ERROR(("Cannot create socket pair"));
} else if ((c = mg_alloc_conn(mgr)) == NULL) {
} else if ((c = mg_wrapfd(mgr, (int) sp[1], fn, fn_data)) == NULL) {
closesocket(sp[0]);
closesocket(sp[1]);
MG_ERROR(("OOM"));
sp[0] = sp[1] = INVALID_SOCKET;
} else {
MG_DEBUG(("pipe %lu", (unsigned long) sp[0]));
tomgaddr(&usa[0], &c->rem, false);
c->fd = S2PTR(sp[1]);
c->is_udp = 1;
c->pfn = pf1;
c->pfn_data = (void *) (size_t) sp[0];
c->fn = fn;
c->fn_data = fn_data;
mg_call(c, MG_EV_OPEN, NULL);
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
MG_DEBUG(("%lu %p pipe %lu", c->id, c->fd, (unsigned long) sp[0]));
}
return c;
return (int) sp[0];
}
static void mg_iotest(struct mg_mgr *mgr, int ms) {

View File

@ -948,6 +948,8 @@ struct mg_connection *mg_listen(struct mg_mgr *, const char *url,
mg_event_handler_t fn, void *fn_data);
struct mg_connection *mg_connect(struct mg_mgr *, const char *url,
mg_event_handler_t fn, void *fn_data);
struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd,
mg_event_handler_t fn, void *fn_data);
void mg_connect_resolved(struct mg_connection *);
bool mg_send(struct mg_connection *, const void *, size_t);
size_t mg_printf(struct mg_connection *, const char *fmt, ...);
@ -955,9 +957,7 @@ size_t mg_vprintf(struct mg_connection *, const char *fmt, va_list ap);
char *mg_straddr(struct mg_addr *, char *, size_t);
bool mg_aton(struct mg_str str, struct mg_addr *addr);
char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len);
struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *);
bool mg_mgr_wakeup(struct mg_connection *pipe, const void *buf, size_t len);
int mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *);
// These functions are used to integrate with custom network stacks
struct mg_connection *mg_alloc_conn(struct mg_mgr *);

View File

@ -205,6 +205,19 @@ struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url,
return c;
}
struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd,
mg_event_handler_t fn, void *fn_data) {
struct mg_connection *c = mg_alloc_conn(mgr);
if (c != NULL) {
c->fd = (void *) (size_t) fd;
c->fn = fn;
c->fn_data = fn_data;
mg_call(c, MG_EV_OPEN, NULL);
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
}
return c;
}
struct mg_timer *mg_timer_add(struct mg_mgr *mgr, uint64_t milliseconds,
unsigned flags, void (*fn)(void *), void *arg) {
struct mg_timer *t = (struct mg_timer *) calloc(1, sizeof(*t));

View File

@ -73,6 +73,8 @@ struct mg_connection *mg_listen(struct mg_mgr *, const char *url,
mg_event_handler_t fn, void *fn_data);
struct mg_connection *mg_connect(struct mg_mgr *, const char *url,
mg_event_handler_t fn, void *fn_data);
struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd,
mg_event_handler_t fn, void *fn_data);
void mg_connect_resolved(struct mg_connection *);
bool mg_send(struct mg_connection *, const void *, size_t);
size_t mg_printf(struct mg_connection *, const char *fmt, ...);
@ -80,9 +82,7 @@ size_t mg_vprintf(struct mg_connection *, const char *fmt, va_list ap);
char *mg_straddr(struct mg_addr *, char *, size_t);
bool mg_aton(struct mg_str str, struct mg_addr *addr);
char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len);
struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *);
bool mg_mgr_wakeup(struct mg_connection *pipe, const void *buf, size_t len);
int mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *);
// These functions are used to integrate with custom network stacks
struct mg_connection *mg_alloc_conn(struct mg_mgr *);

View File

@ -355,11 +355,19 @@ void mg_connect_resolved(struct mg_connection *c) {
MG_DEBUG(("%lu %p", c->id, c->fd));
}
static SOCKET raccept(SOCKET sock, union usa *usa, socklen_t len) {
SOCKET s = INVALID_SOCKET;
do {
s = accept(sock, &usa->sa, &len);
} while (s == INVALID_SOCKET && errno == EINTR);
return s;
}
static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
struct mg_connection *c = NULL;
union usa usa;
socklen_t sa_len = sizeof(usa);
SOCKET fd = accept(FD(lsn), &usa.sa, &sa_len);
SOCKET fd = raccept(FD(lsn), &usa, sa_len);
if (fd == INVALID_SOCKET) {
#if MG_ARCH == MG_ARCH_AZURERTOS
// AzureRTOS, in non-block socket mode can mark listening socket readable
@ -398,71 +406,48 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
}
static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
socklen_t n = sizeof(usa[0].sin);
bool result = false;
SOCKET sock;
socklen_t len = sizeof(usa[0].sin);
bool success = false;
sock = sp[0] = sp[1] = INVALID_SOCKET;
(void) memset(&usa[0], 0, sizeof(usa[0]));
usa[0].sin.sin_family = AF_INET;
*(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001); // 127.0.0.1
*(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001U); // 127.0.0.1
usa[1] = usa[0];
if ((sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET &&
(sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET &&
bind(sp[0], &usa[0].sa, n) == 0 && bind(sp[1], &usa[1].sa, n) == 0 &&
getsockname(sp[0], &usa[0].sa, &n) == 0 &&
getsockname(sp[1], &usa[1].sa, &n) == 0 &&
connect(sp[0], &usa[1].sa, n) == 0 &&
connect(sp[1], &usa[0].sa, n) == 0) {
mg_set_non_blocking_mode(sp[1]); // Set close-on-exec
result = true;
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
bind(sock, &usa[0].sa, len) == 0 && listen(sock, 3) == 0 &&
getsockname(sock, &usa[0].sa, &len) == 0 &&
(sp[0] = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
connect(sp[0], &usa[0].sa, len) == 0 &&
(sp[1] = raccept(sock, &usa[1], len)) != INVALID_SOCKET) {
mg_set_non_blocking_mode(sp[1]);
success = true;
} else {
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
sp[0] = sp[1] = INVALID_SOCKET;
}
return result;
if (sock != INVALID_SOCKET) closesocket(sock);
return success;
}
bool mg_mgr_wakeup(struct mg_connection *c, const void *buf, size_t len) {
if (buf == NULL || len == 0) buf = (void *) "", len = 1;
return (size_t) send((SOCKET) (size_t) c->pfn_data, (const char *) buf, len,
MSG_NONBLOCKING) == len;
}
static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_READ) {
mg_iobuf_free(&c->recv);
} else if (ev == MG_EV_CLOSE) {
closesocket((SOCKET) (size_t) c->pfn_data);
}
(void) ev_data, (void) fn_data;
}
struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn,
void *fn_data) {
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data) {
union usa usa[2];
SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET};
struct mg_connection *c = NULL;
if (!mg_socketpair(sp, usa)) {
MG_ERROR(("Cannot create socket pair"));
} else if ((c = mg_alloc_conn(mgr)) == NULL) {
} else if ((c = mg_wrapfd(mgr, (int) sp[1], fn, fn_data)) == NULL) {
closesocket(sp[0]);
closesocket(sp[1]);
MG_ERROR(("OOM"));
sp[0] = sp[1] = INVALID_SOCKET;
} else {
MG_DEBUG(("pipe %lu", (unsigned long) sp[0]));
tomgaddr(&usa[0], &c->rem, false);
c->fd = S2PTR(sp[1]);
c->is_udp = 1;
c->pfn = pf1;
c->pfn_data = (void *) (size_t) sp[0];
c->fn = fn;
c->fn_data = fn_data;
mg_call(c, MG_EV_OPEN, NULL);
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
MG_DEBUG(("%lu %p pipe %lu", c->id, c->fd, (unsigned long) sp[0]));
}
return c;
return (int) sp[0];
}
static void mg_iotest(struct mg_mgr *mgr, int ms) {

View File

@ -20,18 +20,12 @@ void mg_mgr_poll(struct mg_mgr *mgr, int ms) {
bool mg_send(struct mg_connection *c, const void *buf, size_t len) {
(void) c, (void) buf, (void) len;
return 0;
return false;
}
bool mg_mgr_wakeup(struct mg_connection *c, const void *buf, size_t len) {
(void) c, (void) buf, (void) len;
return 0;
}
struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn,
void *fn_data) {
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data) {
(void) mgr, (void) fn, (void) fn_data;
return NULL;
return -1;
}
void _fini(void);

View File

@ -1720,13 +1720,20 @@ static void eh6(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
(void) c, (void) ev_data;
}
#ifdef __arm__
int send(int sock, const void *buf, size_t len, int flags);
int send(int sock, const void *buf, size_t len, int flags) {
(void) sock, (void) buf, (void) len, (void) flags;
return -1;
}
#endif
static void test_pipe(void) {
struct mg_mgr mgr;
struct mg_connection *c;
int i, done = 0;
int i, sock, done = 0;
mg_mgr_init(&mgr);
ASSERT((c = mg_mkpipe(&mgr, eh6, (void *) &done)) != NULL);
mg_mgr_wakeup(c, "", 1);
ASSERT((sock = mg_mkpipe(&mgr, eh6, (void *) &done)) >= 0);
ASSERT(send(sock, "hi", 2, 0) == 2);
for (i = 0; i < 10 && done == 0; i++) mg_mgr_poll(&mgr, 1);
ASSERT(done == 1);
mg_mgr_free(&mgr);