Extend mg_mqtt_{pub,sub} by adding qos and retain

This commit is contained in:
cpq 2021-06-10 19:27:19 +01:00
parent a15d518571
commit 3deafaea7f
8 changed files with 39 additions and 63 deletions

View File

@ -977,28 +977,18 @@ Create client MQTT connection.
```c
void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic,
struct mg_str *data);
struct mg_str *data, int qos, bool retain);
```
Publish message `data` to the topic `topic`.
### mg\_mqtt\_pubex()
```c
void mg_mqtt_pubex(struct mg_connection *, struct mg_str *topic,
struct mg_str *data, int qos, bool retain);
```
Like `mg_mqtt_pub()` but also allows to set QoS and retain flag.
Publish message `data` to the topic `topic` with given QoS and retain flag.
### mg\_mqtt\_sub()
```c
void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic);
void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic, int qos);
```
Subscribe to topic `topic`.
Subscribe to topic `topic` with given QoS.
### mg\_mqtt\_next\_sub()

View File

@ -29,9 +29,9 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
// MQTT connect is successful
struct mg_str topic = mg_str(s_topic), data = mg_str("hello");
LOG(LL_INFO, ("CONNECTED to %s", s_url));
mg_mqtt_sub(c, &topic);
mg_mqtt_sub(c, &topic, 1);
LOG(LL_INFO, ("SUBSCRIBED to %.*s", (int) topic.len, topic.ptr));
mg_mqtt_pub(c, &topic, &data);
mg_mqtt_pub(c, &topic, &data, 1, false);
LOG(LL_INFO, ("PUBSLISHED %.*s -> %.*s", (int) data.len, data.ptr,
(int) topic.len, topic.ptr));
} else if (ev == MG_EV_MQTT_MSG) {
@ -47,16 +47,16 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
}
int main(void) {
struct mg_mgr mgr; // Event manager
struct mg_mqtt_opts opts; // MQTT connection options
bool done = false; // Event handler flips it to true when done
mg_mgr_init(&mgr); // Initialise event manager
memset(&opts, 0, sizeof(opts)); // Set MQTT options
opts.qos = 1; // Set QoS to 1
opts.will_topic = mg_str(s_topic); // Set last will topic
opts.will_message = mg_str("goodbye"); // And last will message
struct mg_mgr mgr; // Event manager
struct mg_mqtt_opts opts; // MQTT connection options
bool done = false; // Event handler flips it to true when done
mg_mgr_init(&mgr); // Initialise event manager
memset(&opts, 0, sizeof(opts)); // Set MQTT options
opts.qos = 1; // Set QoS to 1
opts.will_topic = mg_str(s_topic); // Set last will topic
opts.will_message = mg_str("goodbye"); // And last will message
mg_mqtt_connect(&mgr, s_url, &opts, fn, &done); // Create client connection
while (done == false) mg_mgr_poll(&mgr, 1000); // Event loop
mg_mgr_free(&mgr); // Finished, cleanup
while (done == false) mg_mgr_poll(&mgr, 1000); // Event loop
mg_mgr_free(&mgr); // Finished, cleanup
return 0;
}

View File

@ -55,7 +55,7 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
mm->data.ptr, (int) mm->topic.len, mm->topic.ptr));
for (struct sub *sub = s_subs; sub != NULL; sub = sub->next) {
if (mg_strcmp(mm->topic, sub->topic) != 0) continue;
mg_mqtt_pub(sub->c, &mm->topic, &mm->data);
mg_mqtt_pub(sub->c, &mm->topic, &mm->data, 1, false);
}
break;
}

View File

@ -1921,8 +1921,8 @@ static void mqtt_login(struct mg_connection *c, const char *url,
}
}
void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data, int qos, bool retain) {
void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data, int qos, bool retain) {
uint8_t flags = (uint8_t)((qos & 3) << 1) | (retain ? 1 : 0);
uint32_t total_len = 2 + (uint32_t) topic->len + (uint32_t) data->len;
LOG(LL_DEBUG, ("%lu [%.*s] -> [%.*s]", c->id, (int) topic->len,
@ -1939,22 +1939,17 @@ void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic,
mg_send(c, data->ptr, data->len);
}
void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data) {
mg_mqtt_pubex(c, topic, data, 1, false);
}
void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic) {
void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic, int qos) {
static uint16_t s_id;
uint8_t qos = 1;
uint8_t qos_ = qos & 3;
uint32_t total_len = 2 + (uint32_t) topic->len + 2 + 1;
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, (uint8_t) MQTT_QOS(qos),
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, (uint8_t) MQTT_QOS(qos_),
total_len);
if (++s_id == 0) ++s_id;
mg_send_u16(c, mg_htons(s_id));
mg_send_u16(c, mg_htons((uint16_t) topic->len));
mg_send(c, topic->ptr, topic->len);
mg_send(c, &qos, sizeof(qos));
mg_send(c, &qos_, sizeof(qos_));
}
int mg_mqtt_parse(const uint8_t *buf, size_t len, struct mg_mqtt_message *m) {

View File

@ -907,11 +907,9 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
mg_event_handler_t fn, void *fn_data);
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
mg_event_handler_t fn, void *fn_data);
void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic,
struct mg_str *data);
void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data, int qos, bool retain);
void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic);
void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data, int qos, bool retain);
void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic, int qos);
int mg_mqtt_parse(const uint8_t *buf, size_t len, struct mg_mqtt_message *m);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
uint32_t len);

View File

@ -81,8 +81,8 @@ static void mqtt_login(struct mg_connection *c, const char *url,
}
}
void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data, int qos, bool retain) {
void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data, int qos, bool retain) {
uint8_t flags = (uint8_t)((qos & 3) << 1) | (retain ? 1 : 0);
uint32_t total_len = 2 + (uint32_t) topic->len + (uint32_t) data->len;
LOG(LL_DEBUG, ("%lu [%.*s] -> [%.*s]", c->id, (int) topic->len,
@ -99,22 +99,17 @@ void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic,
mg_send(c, data->ptr, data->len);
}
void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data) {
mg_mqtt_pubex(c, topic, data, 1, false);
}
void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic) {
void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic, int qos) {
static uint16_t s_id;
uint8_t qos = 1;
uint8_t qos_ = qos & 3;
uint32_t total_len = 2 + (uint32_t) topic->len + 2 + 1;
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, (uint8_t) MQTT_QOS(qos),
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, (uint8_t) MQTT_QOS(qos_),
total_len);
if (++s_id == 0) ++s_id;
mg_send_u16(c, mg_htons(s_id));
mg_send_u16(c, mg_htons((uint16_t) topic->len));
mg_send(c, topic->ptr, topic->len);
mg_send(c, &qos, sizeof(qos));
mg_send(c, &qos_, sizeof(qos_));
}
int mg_mqtt_parse(const uint8_t *buf, size_t len, struct mg_mqtt_message *m) {

View File

@ -47,11 +47,9 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
mg_event_handler_t fn, void *fn_data);
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
mg_event_handler_t fn, void *fn_data);
void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic,
struct mg_str *data);
void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data, int qos, bool retain);
void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic);
void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data, int qos, bool retain);
void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic, int qos);
int mg_mqtt_parse(const uint8_t *buf, size_t len, struct mg_mqtt_message *m);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
uint32_t len);

View File

@ -300,8 +300,8 @@ static void test_mqtt(void) {
c = mg_mqtt_connect(&mgr, url, NULL, mqtt_cb, buf);
for (i = 0; i < 100 && buf[0] == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(buf[0] == 'X');
mg_mqtt_sub(c, &topic);
mg_mqtt_pub(c, &topic, &data);
mg_mqtt_sub(c, &topic, 1);
mg_mqtt_pub(c, &topic, &data, 1, false);
for (i = 0; i < 100 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
// LOG(LL_INFO, ("[%s]", buf));
ASSERT(strcmp(buf, "Xx/f12/hi") == 0);
@ -319,8 +319,8 @@ static void test_mqtt(void) {
c = mg_mqtt_connect(&mgr, url, &opts, mqtt_cb, buf);
for (i = 0; i < 100 && buf[0] == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(buf[0] == 'X');
mg_mqtt_sub(c, &topic);
mg_mqtt_pub(c, &topic, &data);
mg_mqtt_sub(c, &topic, 1);
mg_mqtt_pub(c, &topic, &data, 1, false);
for (i = 0; i < 100 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(strcmp(buf, "Xx/f12/hi") == 0);