Add buf,len params to mg_mgr_wakeup()

This commit is contained in:
Sergey Lyubka 2022-01-19 09:25:01 +00:00
parent 4cd830a0bf
commit fe813ee247
9 changed files with 36 additions and 24 deletions

View File

@ -687,14 +687,17 @@ Usage example: see [examples/multi-threaded](https://github.com/cesanta/mongoose
### mg\_mgr\_wakeup()
```c
void mg_mgr_wakeup(struct mg_connection *pipe);
void mg_mgr_wakeup(struct mg_connection *pipe, const void *buf, size_len len);
```
Wake up an event manager that sleeps in `mg_mgr_poll()` call. This function
must be called from a separate task/thread. Parameters:
must be called from a separate task/thread. A calling thread can pass
some specific data to the IO thread via `buf`, `len`. Parameters:
Parameters:
- `pipe` - a special connection created by the `mg_mkpipe()` call
- `buf` - a data to send to the pipe connection. Use `""` if there is no data
- `len` - a data length
Return value: None

View File

@ -37,7 +37,7 @@ static void start_thread(void (*f)(void *), void *p) {
static void thread_function(void *param) {
struct mg_connection *c = param; // Pipe connection
sleep(2); // Simulate long execution
mg_mgr_wakeup(c); // Wakeup event manager
mg_mgr_wakeup(c, "hi", 2); // Wakeup event manager
}
// HTTP request callback
@ -62,8 +62,9 @@ static void pcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
struct mg_connection *t;
for (t = c->mgr->conns; t != NULL; t = t->next) {
if (t->label[0] != 'W') continue; // Ignore un-marked connections
mg_http_reply(t, 200, "Host: foo.com\r\n", "hi\n"); // Respond!
t->label[0] = 0; // Clear mark
mg_http_reply(t, 200, "Host: foo.com\r\n", "%.*s\n", c->recv.len,
c->recv.buf); // Respond!
t->label[0] = 0; // Clear mark
}
}
}

View File

@ -3510,13 +3510,17 @@ static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
return result;
}
void mg_mgr_wakeup(struct mg_connection *c) {
void mg_mgr_wakeup(struct mg_connection *c, const void *buf, size_t len) {
LOG(LL_INFO, ("skt: %p", c->pfn_data));
send((SOCKET) (size_t) c->pfn_data, "\x01", 1, MSG_NONBLOCKING);
send((SOCKET) (size_t) c->pfn_data, (const char *) buf, len, MSG_NONBLOCKING);
}
static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_READ) mg_iobuf_free(&c->recv);
if (ev == MG_EV_READ) {
mg_iobuf_free(&c->recv);
} else if (ev == MG_EV_CLOSE) {
closesocket((SOCKET) (size_t) c->pfn_data);
}
(void) ev_data, (void) fn_data;
}

View File

@ -637,6 +637,10 @@ struct mg_fs {
bool (*mkd)(const char *path); // Create directory
};
extern struct mg_fs mg_fs_posix; // POSIX open/close/read/write/seek
extern struct mg_fs mg_fs_packed; // Packed FS, see examples/complete
extern struct mg_fs mg_fs_fat; // FAT FS
// File descriptor
struct mg_fd {
void *fd;
@ -649,10 +653,6 @@ char *mg_file_read(struct mg_fs *fs, const char *path, size_t *size);
bool mg_file_write(struct mg_fs *fs, const char *path, const void *, size_t);
bool mg_file_printf(struct mg_fs *fs, const char *path, const char *fmt, ...);
extern struct mg_fs mg_fs_posix; // POSIX open/close/read/write/seek
extern struct mg_fs mg_fs_packed; // Packed FS, see examples/complete
extern struct mg_fs mg_fs_fat; // FAT FS
@ -882,7 +882,7 @@ bool mg_aton(struct mg_str str, struct mg_addr *addr);
char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len);
struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *);
void mg_mgr_wakeup(struct mg_connection *pipe);
void mg_mgr_wakeup(struct mg_connection *pipe, const void *buf, size_t len);

View File

@ -21,6 +21,10 @@ struct mg_fs {
bool (*mkd)(const char *path); // Create directory
};
extern struct mg_fs mg_fs_posix; // POSIX open/close/read/write/seek
extern struct mg_fs mg_fs_packed; // Packed FS, see examples/complete
extern struct mg_fs mg_fs_fat; // FAT FS
// File descriptor
struct mg_fd {
void *fd;
@ -32,7 +36,3 @@ void mg_fs_close(struct mg_fd *fd);
char *mg_file_read(struct mg_fs *fs, const char *path, size_t *size);
bool mg_file_write(struct mg_fs *fs, const char *path, const void *, size_t);
bool mg_file_printf(struct mg_fs *fs, const char *path, const char *fmt, ...);
extern struct mg_fs mg_fs_posix; // POSIX open/close/read/write/seek
extern struct mg_fs mg_fs_packed; // Packed FS, see examples/complete
extern struct mg_fs mg_fs_fat; // FAT FS

View File

@ -76,4 +76,4 @@ bool mg_aton(struct mg_str str, struct mg_addr *addr);
char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len);
struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *);
void mg_mgr_wakeup(struct mg_connection *pipe);
void mg_mgr_wakeup(struct mg_connection *pipe, const void *buf, size_t len);

View File

@ -444,13 +444,17 @@ static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
return result;
}
void mg_mgr_wakeup(struct mg_connection *c) {
void mg_mgr_wakeup(struct mg_connection *c, const void *buf, size_t len) {
LOG(LL_INFO, ("skt: %p", c->pfn_data));
send((SOCKET) (size_t) c->pfn_data, "\x01", 1, MSG_NONBLOCKING);
send((SOCKET) (size_t) c->pfn_data, (const char *) buf, len, MSG_NONBLOCKING);
}
static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_READ) mg_iobuf_free(&c->recv);
if (ev == MG_EV_READ) {
mg_iobuf_free(&c->recv);
} else if (ev == MG_EV_CLOSE) {
closesocket((SOCKET) (size_t) c->pfn_data);
}
(void) ev_data, (void) fn_data;
}

View File

@ -31,8 +31,8 @@ bool mg_send(struct mg_connection *c, const void *buf, size_t len) {
return 0;
}
void mg_mgr_wakeup(struct mg_connection *c) {
(void) c;
void mg_mgr_wakeup(struct mg_connection *c, const void *buf, size_t len) {
(void) c, (void) buf, (void) len;
}
struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn,

View File

@ -1504,7 +1504,7 @@ static void test_pipe(void) {
int i, done = 0;
mg_mgr_init(&mgr);
ASSERT((c = mg_mkpipe(&mgr, eh6, (void *) &done)) != NULL);
mg_mgr_wakeup(c);
mg_mgr_wakeup(c, "", 1);
for (i = 0; i < 10 && done == 0; i++) mg_mgr_poll(&mgr, 1);
ASSERT(done == 1);
mg_mgr_free(&mgr);