From a15d5185719a8c06b0076f4094427d6f49fe2675 Mon Sep 17 00:00:00 2001 From: cpq Date: Thu, 10 Jun 2021 19:15:50 +0100 Subject: [PATCH] Add mg_mqtt_pubex --- docs/README.md | 9 +++++++++ mongoose.c | 26 +++++++++++++++++++------- mongoose.h | 8 +++++--- src/mqtt.c | 13 +++++++++---- src/mqtt.h | 8 +++++--- src/ws.c | 15 +++++++++++---- 6 files changed, 58 insertions(+), 21 deletions(-) diff --git a/docs/README.md b/docs/README.md index b3c66e12..e9852d62 100644 --- a/docs/README.md +++ b/docs/README.md @@ -982,6 +982,15 @@ void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic, Publish message `data` to the topic `topic`. +### mg\_mqtt\_pubex() + +```c +void mg_mqtt_pubex(struct mg_connection *, struct mg_str *topic, + struct mg_str *data, int qos, bool retain); +``` + +Like `mg_mqtt_pub()` but also allows to set QoS and retain flag. + ### mg\_mqtt\_sub() diff --git a/mongoose.c b/mongoose.c index 0bd177f7..ca7c8c7c 100644 --- a/mongoose.c +++ b/mongoose.c @@ -1921,9 +1921,9 @@ static void mqtt_login(struct mg_connection *c, const char *url, } } -void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic, - struct mg_str *data) { - uint8_t flags = MQTT_QOS(1); +void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic, + struct mg_str *data, int qos, bool retain) { + uint8_t flags = (uint8_t)((qos & 3) << 1) | (retain ? 1 : 0); uint32_t total_len = 2 + (uint32_t) topic->len + (uint32_t) data->len; LOG(LL_DEBUG, ("%lu [%.*s] -> [%.*s]", c->id, (int) topic->len, (char *) topic->ptr, (int) data->len, (char *) data->ptr)); @@ -1939,6 +1939,11 @@ void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic, mg_send(c, data->ptr, data->len); } +void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic, + struct mg_str *data) { + mg_mqtt_pubex(c, topic, data, 1, false); +} + void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic) { static uint16_t s_id; uint8_t qos = 1; @@ -4364,6 +4369,9 @@ static void mg_ws_cb(struct mg_connection *c, int ev, void *ev_data, char *s = (char *) c->recv.buf + msg.header_len; struct mg_ws_message m = {{s, msg.data_len}, msg.flags}; switch (msg.flags & WEBSOCKET_FLAGS_MASK_OP) { + case WEBSOCKET_OP_CONTINUE: + mg_call(c, MG_EV_WS_CTL, &m); + break; case WEBSOCKET_OP_PING: LOG(LL_DEBUG, ("%s", "WS PONG")); mg_ws_send(c, s, msg.data_len, WEBSOCKET_OP_PONG); @@ -4372,15 +4380,19 @@ static void mg_ws_cb(struct mg_connection *c, int ev, void *ev_data, case WEBSOCKET_OP_PONG: mg_call(c, MG_EV_WS_CTL, &m); break; + case WEBSOCKET_OP_TEXT: + case WEBSOCKET_OP_BINARY: + mg_call(c, MG_EV_WS_MSG, &m); + break; case WEBSOCKET_OP_CLOSE: LOG(LL_ERROR, ("%lu Got WS CLOSE", c->id)); mg_call(c, MG_EV_WS_CTL, &m); c->is_closing = 1; - return; - default: { - mg_call(c, MG_EV_WS_MSG, &m); break; - } + default: + // Per RFC6455, close conn when an unknown op is recvd + mg_error(c, "unknown WS op %d", msg.flags & WEBSOCKET_FLAGS_MASK_OP); + break; } mg_iobuf_delete(&c->recv, msg.header_len + msg.data_len); } diff --git a/mongoose.h b/mongoose.h index 5da8b471..1f0008d3 100644 --- a/mongoose.h +++ b/mongoose.h @@ -893,9 +893,9 @@ struct mg_mqtt_opts { }; struct mg_mqtt_message { - struct mg_str topic; // Parsed topic - struct mg_str data; // Parsed message - struct mg_str dgram; // Whole MQTT datagram, including headers + struct mg_str topic; // Parsed topic + struct mg_str data; // Parsed message + struct mg_str dgram; // Whole MQTT datagram, including headers uint16_t id; // Set for PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH uint8_t cmd; // MQTT command, one of MQTT_CMD_* uint8_t qos; // Quality of service @@ -909,6 +909,8 @@ struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url, mg_event_handler_t fn, void *fn_data); void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic, struct mg_str *data); +void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic, + struct mg_str *data, int qos, bool retain); void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic); int mg_mqtt_parse(const uint8_t *buf, size_t len, struct mg_mqtt_message *m); void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags, diff --git a/src/mqtt.c b/src/mqtt.c index 3a198987..f883f9bb 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -1,8 +1,8 @@ +#include "mqtt.h" #include "arch.h" #include "base64.h" #include "event.h" #include "log.h" -#include "mqtt.h" #include "private.h" #include "url.h" #include "util.h" @@ -81,9 +81,9 @@ static void mqtt_login(struct mg_connection *c, const char *url, } } -void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic, - struct mg_str *data) { - uint8_t flags = MQTT_QOS(1); +void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic, + struct mg_str *data, int qos, bool retain) { + uint8_t flags = (uint8_t)((qos & 3) << 1) | (retain ? 1 : 0); uint32_t total_len = 2 + (uint32_t) topic->len + (uint32_t) data->len; LOG(LL_DEBUG, ("%lu [%.*s] -> [%.*s]", c->id, (int) topic->len, (char *) topic->ptr, (int) data->len, (char *) data->ptr)); @@ -99,6 +99,11 @@ void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic, mg_send(c, data->ptr, data->len); } +void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic, + struct mg_str *data) { + mg_mqtt_pubex(c, topic, data, 1, false); +} + void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic) { static uint16_t s_id; uint8_t qos = 1; diff --git a/src/mqtt.h b/src/mqtt.h index 695e34e6..6f78ffea 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -33,9 +33,9 @@ struct mg_mqtt_opts { }; struct mg_mqtt_message { - struct mg_str topic; // Parsed topic - struct mg_str data; // Parsed message - struct mg_str dgram; // Whole MQTT datagram, including headers + struct mg_str topic; // Parsed topic + struct mg_str data; // Parsed message + struct mg_str dgram; // Whole MQTT datagram, including headers uint16_t id; // Set for PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH uint8_t cmd; // MQTT command, one of MQTT_CMD_* uint8_t qos; // Quality of service @@ -49,6 +49,8 @@ struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url, mg_event_handler_t fn, void *fn_data); void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic, struct mg_str *data); +void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic, + struct mg_str *data, int qos, bool retain); void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic); int mg_mqtt_parse(const uint8_t *buf, size_t len, struct mg_mqtt_message *m); void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags, diff --git a/src/ws.c b/src/ws.c index 1e806352..32158cb9 100644 --- a/src/ws.c +++ b/src/ws.c @@ -134,6 +134,9 @@ static void mg_ws_cb(struct mg_connection *c, int ev, void *ev_data, char *s = (char *) c->recv.buf + msg.header_len; struct mg_ws_message m = {{s, msg.data_len}, msg.flags}; switch (msg.flags & WEBSOCKET_FLAGS_MASK_OP) { + case WEBSOCKET_OP_CONTINUE: + mg_call(c, MG_EV_WS_CTL, &m); + break; case WEBSOCKET_OP_PING: LOG(LL_DEBUG, ("%s", "WS PONG")); mg_ws_send(c, s, msg.data_len, WEBSOCKET_OP_PONG); @@ -142,15 +145,19 @@ static void mg_ws_cb(struct mg_connection *c, int ev, void *ev_data, case WEBSOCKET_OP_PONG: mg_call(c, MG_EV_WS_CTL, &m); break; + case WEBSOCKET_OP_TEXT: + case WEBSOCKET_OP_BINARY: + mg_call(c, MG_EV_WS_MSG, &m); + break; case WEBSOCKET_OP_CLOSE: LOG(LL_ERROR, ("%lu Got WS CLOSE", c->id)); mg_call(c, MG_EV_WS_CTL, &m); c->is_closing = 1; - return; - default: { - mg_call(c, MG_EV_WS_MSG, &m); break; - } + default: + // Per RFC6455, close conn when an unknown op is recvd + mg_error(c, "unknown WS op %d", msg.flags & WEBSOCKET_FLAGS_MASK_OP); + break; } mg_iobuf_delete(&c->recv, msg.header_len + msg.data_len); }