Add mg multithreaded example

PUBLISHED_FROM=8a3424ba6260b0351d7c06270dc141d804ae7f77
This commit is contained in:
Ruslan Valiullin 2017-05-02 16:45:34 +01:00 committed by Cesanta Bot
parent 663e928d65
commit ac857d18db
2 changed files with 127 additions and 0 deletions

View File

@ -0,0 +1,3 @@
PROG = multithreaded
MODULE_CFLAGS=-DMG_ENABLE_THREADS
include ../examples.mk

View File

@ -0,0 +1,124 @@
/*
* Copyright (c) 2014-2017 Cesanta Software Limited
* All rights reserved
*/
#include "mongoose.h"
static sig_atomic_t s_received_signal = 0;
static const char *s_http_port = "8000";
static const int s_num_worker_threads = 5;
static unsigned long s_next_id = 0;
static void signal_handler(int sig_num) {
signal(sig_num, signal_handler);
s_received_signal = sig_num;
}
static struct mg_serve_http_opts s_http_server_opts;
static sock_t sock[2];
// This info is passed to the worker thread
struct work_request {
unsigned long conn_id; // needed to identify the connection where to send the reply
// optionally, more data that could be required by worker
};
// This info is passed by the worker thread to mg_broadcast
struct work_result {
unsigned long conn_id;
int sleep_time;
};
static void on_work_complete(struct mg_connection *nc, int ev, void *ev_data) {
(void) ev;
char s[32];
struct mg_connection *c;
for (c = mg_next(nc->mgr, NULL); c != NULL; c = mg_next(nc->mgr, c)) {
if (c->user_data != NULL) {
struct work_result *res = (struct work_result *)ev_data;
if ((unsigned long)c->user_data == res->conn_id) {
sprintf(s, "conn_id:%lu sleep:%d", res->conn_id, res->sleep_time);
mg_send_head(c, 200, strlen(s), "Content-Type: text/plain");
mg_printf(c, "%s", s);
}
}
}
}
void *worker_thread_proc(void *param) {
struct mg_mgr *mgr = (struct mg_mgr *) param;
struct work_request req = {0};
while (s_received_signal == 0) {
if (read(sock[1], &req, sizeof(req)) < 0)
perror("Reading worker sock");
int r = rand() % 10;
sleep(r);
struct work_result res = {req.conn_id, r};
mg_broadcast(mgr, on_work_complete, (void *)&res, sizeof(res));
}
return NULL;
}
static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) {
(void) nc;
(void) ev_data;
switch (ev) {
case MG_EV_ACCEPT:
nc->user_data = (void *)++s_next_id;
break;
case MG_EV_HTTP_REQUEST: {
struct work_request req = {(unsigned long)nc->user_data};
if (write(sock[0], &req, sizeof(req)) < 0)
perror("Writing worker sock");
break;
}
case MG_EV_CLOSE: {
if (nc->user_data) nc->user_data = NULL;
}
}
}
int main(void) {
struct mg_mgr mgr;
struct mg_connection *nc;
int i;
if (mg_socketpair(sock, SOCK_STREAM) == 0) {
perror("Opening socket pair");
exit(1);
}
signal(SIGTERM, signal_handler);
signal(SIGINT, signal_handler);
mg_mgr_init(&mgr, NULL);
nc = mg_bind(&mgr, s_http_port, ev_handler);
if (nc == NULL) {
printf("Failed to create listener\n");
return 1;
}
mg_set_protocol_http_websocket(nc);
s_http_server_opts.document_root = "."; // Serve current directory
s_http_server_opts.enable_directory_listing = "no";
for (i = 0; i < s_num_worker_threads; i++) {
mg_start_thread(worker_thread_proc, &mgr);
}
printf("Started on port %s\n", s_http_port);
while (s_received_signal == 0) {
mg_mgr_poll(&mgr, 200);
}
mg_mgr_free(&mgr);
closesocket(sock[0]);
closesocket(sock[1]);
return 0;
}