Handle #1562 - add udp/tcp extra param to mg_mkpipe()

This commit is contained in:
Sergey Lyubka 2022-05-31 18:23:32 +01:00
parent dc486a2aa3
commit a3d63c095a
11 changed files with 68 additions and 47 deletions

View File

@ -13,7 +13,7 @@ VCFLAGS = /nologo /W3 /O2 /MD /I. $(DEFS) $(TFLAGS)
IPV6 ?= 1
ASAN ?= -fsanitize=address,undefined -fno-sanitize-recover=all
ASAN_OPTIONS ?= detect_leaks=1
EXAMPLES := $(dir $(wildcard examples/*/Makefile))
EXAMPLES := $(dir $(wildcard examples/*/Makefile)) examples/stm32/nucleo-f746zg-baremetal
PREFIX ?= /usr/local
VERSION ?= $(shell cut -d'"' -f2 src/version.h)
COMMON_CFLAGS ?= $(WARN) $(INCS) $(DEFS) -DMG_ENABLE_IPV6=$(IPV6) $(TFLAGS) $(EXTRA)

View File

@ -701,7 +701,7 @@ Return value: Pointer to the created connection or `NULL` in case of error
### mg\_mkpipe()
```c
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data);
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data, bool udp);
```
Create two interconnected sockets for inter-thread communication. One socket
@ -710,11 +710,13 @@ Another socket is returned, and supposed to be passed to a worker thread.
When a worker thread `send()`s to socket any data, that wakes up `mgr` and
`fn` event handler reveives `MG_EV_READ` event. Also, `fn` can send any
data to a worker thread, which can be `recv()`ed by a worker thread.
If a socketpair is UDP, then it is guaranteed to send a
Parameters:
- `mgr` - An event manager
- `fn` - A pointer to event handler function
- `fn_data` - A user data pointer. It will be passed to `fn` as `fn_data` parameter
- `mgr` - an event manager
- `fn` - a pointer to event handler function
- `fn_data` - a user data pointer. It will be passed to `fn` as `fn_data` parameter
- `udp` - tells to create UDP or TCP socketpair.
Return value: created socket, or `-1` on error

View File

@ -77,7 +77,7 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n");
} else {
// Multithreading code path
int sock = mg_mkpipe(c->mgr, pcb, c); // Create pipe
int sock = mg_mkpipe(c->mgr, pcb, c, true); // Create pipe
start_thread(thread_function, (void *) (size_t) sock); // Start thread
}
} else if (ev == MG_EV_CLOSE) {

View File

@ -13,7 +13,7 @@ LDFLAGS ?= -Tlink.ld -nostartfiles -nostdlib --specs nano.specs \
SOURCES = boot.c main.c syscalls.c $(ROOT)/drivers/mip_driver_stm32.c \
$(ROOT)/mongoose.c
all build: $(TARGET).bin
all build example: $(TARGET).bin
$(TARGET).bin: $(TARGET).elf
$(DOCKER) $(CROSS)-objcopy -O binary $< $@

View File

@ -3076,11 +3076,6 @@ bool mg_send(struct mg_connection *c, const void *buf, size_t len) {
}
return res;
}
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data) {
(void) mgr, (void) fn, (void) fn_data;
return -1;
}
#endif // MG_ENABLE_MIP
#ifdef MG_ENABLE_LINES
@ -4338,9 +4333,9 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
}
}
static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
static bool mg_socketpair(SOCKET sp[2], union usa usa[2], bool udp) {
SOCKET sock;
socklen_t len = sizeof(usa[0].sin);
socklen_t n = sizeof(usa[0].sin);
bool success = false;
sock = sp[0] = sp[1] = INVALID_SOCKET;
@ -4349,15 +4344,26 @@ static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
*(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001U); // 127.0.0.1
usa[1] = usa[0];
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
bind(sock, &usa[0].sa, len) == 0 &&
listen(sock, MG_SOCK_LISTEN_BACKLOG_SIZE) == 0 &&
getsockname(sock, &usa[0].sa, &len) == 0 &&
(sp[0] = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
connect(sp[0], &usa[0].sa, len) == 0 &&
(sp[1] = raccept(sock, &usa[1], len)) != INVALID_SOCKET) {
mg_set_non_blocking_mode(sp[1]);
if (udp && (sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET &&
(sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET &&
bind(sp[0], &usa[0].sa, n) == 0 && bind(sp[1], &usa[1].sa, n) == 0 &&
getsockname(sp[0], &usa[0].sa, &n) == 0 &&
getsockname(sp[1], &usa[1].sa, &n) == 0 &&
connect(sp[0], &usa[1].sa, n) == 0 &&
connect(sp[1], &usa[0].sa, n) == 0) {
success = true;
} else if (!udp &&
(sock = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
bind(sock, &usa[0].sa, n) == 0 &&
listen(sock, MG_SOCK_LISTEN_BACKLOG_SIZE) == 0 &&
getsockname(sock, &usa[0].sa, &n) == 0 &&
(sp[0] = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
connect(sp[0], &usa[0].sa, n) == 0 &&
(sp[1] = raccept(sock, &usa[1], n)) != INVALID_SOCKET) {
success = true;
}
if (success) {
mg_set_non_blocking_mode(sp[1]);
} else {
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
@ -4367,11 +4373,12 @@ static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
return success;
}
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data) {
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data,
bool udp) {
union usa usa[2];
SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET};
struct mg_connection *c = NULL;
if (!mg_socketpair(sp, usa)) {
if (!mg_socketpair(sp, usa, udp)) {
MG_ERROR(("Cannot create socket pair"));
} else if ((c = mg_wrapfd(mgr, (int) sp[1], fn, fn_data)) == NULL) {
closesocket(sp[0]);

View File

@ -1021,7 +1021,7 @@ size_t mg_vprintf(struct mg_connection *, const char *fmt, va_list ap);
char *mg_straddr(struct mg_addr *, char *, size_t);
bool mg_aton(struct mg_str str, struct mg_addr *addr);
char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len);
int mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *);
int mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *, bool udp);
// These functions are used to integrate with custom network stacks
struct mg_connection *mg_alloc_conn(struct mg_mgr *);

View File

@ -785,9 +785,4 @@ bool mg_send(struct mg_connection *c, const void *buf, size_t len) {
}
return res;
}
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data) {
(void) mgr, (void) fn, (void) fn_data;
return -1;
}
#endif // MG_ENABLE_MIP

View File

@ -84,7 +84,7 @@ size_t mg_vprintf(struct mg_connection *, const char *fmt, va_list ap);
char *mg_straddr(struct mg_addr *, char *, size_t);
bool mg_aton(struct mg_str str, struct mg_addr *addr);
char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len);
int mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *);
int mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *, bool udp);
// These functions are used to integrate with custom network stacks
struct mg_connection *mg_alloc_conn(struct mg_mgr *);

View File

@ -412,9 +412,9 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
}
}
static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
static bool mg_socketpair(SOCKET sp[2], union usa usa[2], bool udp) {
SOCKET sock;
socklen_t len = sizeof(usa[0].sin);
socklen_t n = sizeof(usa[0].sin);
bool success = false;
sock = sp[0] = sp[1] = INVALID_SOCKET;
@ -423,15 +423,26 @@ static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
*(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001U); // 127.0.0.1
usa[1] = usa[0];
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
bind(sock, &usa[0].sa, len) == 0 &&
listen(sock, MG_SOCK_LISTEN_BACKLOG_SIZE) == 0 &&
getsockname(sock, &usa[0].sa, &len) == 0 &&
(sp[0] = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
connect(sp[0], &usa[0].sa, len) == 0 &&
(sp[1] = raccept(sock, &usa[1], len)) != INVALID_SOCKET) {
mg_set_non_blocking_mode(sp[1]);
if (udp && (sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET &&
(sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET &&
bind(sp[0], &usa[0].sa, n) == 0 && bind(sp[1], &usa[1].sa, n) == 0 &&
getsockname(sp[0], &usa[0].sa, &n) == 0 &&
getsockname(sp[1], &usa[1].sa, &n) == 0 &&
connect(sp[0], &usa[1].sa, n) == 0 &&
connect(sp[1], &usa[0].sa, n) == 0) {
success = true;
} else if (!udp &&
(sock = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
bind(sock, &usa[0].sa, n) == 0 &&
listen(sock, MG_SOCK_LISTEN_BACKLOG_SIZE) == 0 &&
getsockname(sock, &usa[0].sa, &n) == 0 &&
(sp[0] = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
connect(sp[0], &usa[0].sa, n) == 0 &&
(sp[1] = raccept(sock, &usa[1], n)) != INVALID_SOCKET) {
success = true;
}
if (success) {
mg_set_non_blocking_mode(sp[1]);
} else {
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
@ -441,11 +452,12 @@ static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
return success;
}
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data) {
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data,
bool udp) {
union usa usa[2];
SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET};
struct mg_connection *c = NULL;
if (!mg_socketpair(sp, usa)) {
if (!mg_socketpair(sp, usa, udp)) {
MG_ERROR(("Cannot create socket pair"));
} else if ((c = mg_wrapfd(mgr, (int) sp[1], fn, fn_data)) == NULL) {
closesocket(sp[0]);

View File

@ -23,8 +23,8 @@ bool mg_send(struct mg_connection *c, const void *buf, size_t len) {
return false;
}
int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data) {
(void) mgr, (void) fn, (void) fn_data;
int mg_mkpipe(struct mg_mgr *m, mg_event_handler_t fn, void *d, bool udp) {
(void) m, (void) fn, (void) d, (void) udp;
return -1;
}

View File

@ -1821,11 +1821,11 @@ int send(int sock, const void *buf, size_t len, int flags) {
}
#endif
static void test_pipe(void) {
static void test_pipe_proto(bool is_udp) {
struct mg_mgr mgr;
int i, sock, done = 0;
mg_mgr_init(&mgr);
ASSERT((sock = mg_mkpipe(&mgr, eh6, (void *) &done)) >= 0);
ASSERT((sock = mg_mkpipe(&mgr, eh6, (void *) &done, is_udp)) >= 0);
ASSERT(send(sock, "hi", 2, 0) == 2);
for (i = 0; i < 10 && done == 0; i++) mg_mgr_poll(&mgr, 1);
ASSERT(done == 1);
@ -1833,6 +1833,11 @@ static void test_pipe(void) {
ASSERT(mgr.conns == NULL);
}
static void test_pipe(void) {
test_pipe_proto(true);
test_pipe_proto(false);
}
static void u1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_CONNECT) {
((int *) fn_data)[0] += 1;