MQTT QoS + v5

This commit is contained in:
Sergio R. Caprile 2023-05-31 21:23:26 -03:00
parent 1dde754724
commit f8579131e8
3 changed files with 134 additions and 37 deletions

View File

@ -3372,9 +3372,9 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) mm.topic.len,
mm.topic.ptr, (int) mm.data.len, mm.data.ptr));
if (mm.qos > 0) {
uint16_t id = mg_htons(mm.id);
uint16_t id = mg_ntohs(mm.id);
uint32_t remaining_len = sizeof(id);
if (c->is_mqtt5) remaining_len += 1;
if (c->is_mqtt5) remaining_len += 2; // 3.4.2
mg_mqtt_send_header(
c, mm.qos == 2 ? MQTT_CMD_PUBREC : MQTT_CMD_PUBACK, 0,
@ -3386,7 +3386,21 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
mg_send(c, &zero, sizeof(zero));
}
}
mg_call(c, MG_EV_MQTT_MSG, &mm); // let the app handle qos2 stuff
mg_call(c, MG_EV_MQTT_MSG, &mm); // let the app handle qos stuff
break;
}
case MQTT_CMD_PUBREC: { // MQTT5: 3.5.2-1 TODO(): variable header rc
uint16_t id = mg_ntohs(mm.id);
uint32_t remaining_len = sizeof(id); // MQTT5 3.6.2-1
mg_mqtt_send_header(c, MQTT_CMD_PUBREL, 2, remaining_len);
mg_send(c, &id, sizeof(id)); // MQTT5 3.6.1-1, flags = 2
break;
}
case MQTT_CMD_PUBREL: { // MQTT5: 3.6.2-1 TODO(): variable header rc
uint16_t id = mg_ntohs(mm.id);
uint32_t remaining_len = sizeof(id); // MQTT5 3.7.2-1
mg_mqtt_send_header(c, MQTT_CMD_PUBCOMP, 0, remaining_len);
mg_send(c, &id, sizeof(id));
break;
}
}

View File

@ -445,9 +445,9 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) mm.topic.len,
mm.topic.ptr, (int) mm.data.len, mm.data.ptr));
if (mm.qos > 0) {
uint16_t id = mg_htons(mm.id);
uint16_t id = mg_ntohs(mm.id);
uint32_t remaining_len = sizeof(id);
if (c->is_mqtt5) remaining_len += 1;
if (c->is_mqtt5) remaining_len += 2; // 3.4.2
mg_mqtt_send_header(
c, mm.qos == 2 ? MQTT_CMD_PUBREC : MQTT_CMD_PUBACK, 0,
@ -459,7 +459,21 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
mg_send(c, &zero, sizeof(zero));
}
}
mg_call(c, MG_EV_MQTT_MSG, &mm); // let the app handle qos2 stuff
mg_call(c, MG_EV_MQTT_MSG, &mm); // let the app handle qos stuff
break;
}
case MQTT_CMD_PUBREC: { // MQTT5: 3.5.2-1 TODO(): variable header rc
uint16_t id = mg_ntohs(mm.id);
uint32_t remaining_len = sizeof(id); // MQTT5 3.6.2-1
mg_mqtt_send_header(c, MQTT_CMD_PUBREL, 2, remaining_len);
mg_send(c, &id, sizeof(id)); // MQTT5 3.6.1-1, flags = 2
break;
}
case MQTT_CMD_PUBREL: { // MQTT5: 3.6.2-1 TODO(): variable header rc
uint16_t id = mg_ntohs(mm.id);
uint32_t remaining_len = sizeof(id); // MQTT5 3.7.2-1
mg_mqtt_send_header(c, MQTT_CMD_PUBCOMP, 0, remaining_len);
mg_send(c, &id, sizeof(id));
break;
}
}

View File

@ -343,9 +343,13 @@ static void test_sntp(void) {
struct mqtt_data {
char *buf;
size_t bufsize;
int subscribed;
int published;
int flags;
};
#define flags_subscribed (1 << 0)
#define flags_published (1 << 1)
#define flags_received (1 << 2)
#define flags_released (1 << 3)
#define flags_completed (1 << 4)
static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
struct mqtt_data *test_data = (struct mqtt_data *) fnd;
@ -356,10 +360,15 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
} else if (ev == MG_EV_MQTT_CMD) {
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
if (mm->cmd == MQTT_CMD_SUBACK) {
test_data->subscribed = 1;
}
if (mm->cmd == MQTT_CMD_PUBACK) {
test_data->published = 1;
test_data->flags = flags_subscribed;
} else if (mm->cmd == MQTT_CMD_PUBACK) { // here we assume the broker
test_data->flags = flags_published; // reported no errors,
} else if (mm->cmd == MQTT_CMD_PUBREC) { // either no var header or
test_data->flags |= flags_received; // reason code 0x00
} else if (mm->cmd == MQTT_CMD_PUBREL) {
test_data->flags |= flags_released;
} else if (mm->cmd == MQTT_CMD_PUBCOMP) {
test_data->flags |= flags_completed;
}
} else if (ev == MG_EV_MQTT_MSG) {
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
@ -418,7 +427,7 @@ static void construct_props(struct mg_mqtt_prop *props) {
static void test_mqtt_base(void);
static void test_mqtt_base(void) {
char buf[50] = {0};
struct mqtt_data test_data = {buf, 50, 0, 0};
struct mqtt_data test_data = {buf, 50, 0};
struct mg_mgr mgr;
struct mg_connection *c;
const char *url = "mqtt://broker.hivemq.com:1883";
@ -438,42 +447,67 @@ static void test_mqtt_base(void) {
static void test_mqtt_ver(uint8_t mqtt_version) {
char buf[50] = {0}, client_id[16], will_topic[16];
struct mqtt_data test_data = {buf, 50, 0, 0};
struct mqtt_data test_data = {buf, 50, 0};
struct mg_mgr mgr;
struct mg_str topic = mg_str("x/f12"), data = mg_str("hi");
struct mg_connection *c;
struct mg_mqtt_opts opts;
struct mg_mqtt_prop properties[4];
const char *url = "mqtt://broker.hivemq.com:1883";
int i;
int i, retries;
mg_mgr_init(&mgr);
// Connect with empty client ID
// Connect with empty client ID, no options, ergo no MQTT != 3.1.1
if (mqtt_version != 4) goto connect_with_options;
c = mg_mqtt_connect(&mgr, url, NULL, mqtt_cb, &test_data);
for (i = 0; i < 300 && buf[0] == 0; i++) mg_mgr_poll(&mgr, 10);
if (buf[0] != 'X') MG_INFO(("[%s]", buf));
ASSERT(buf[0] == 'X');
ASSERT(test_data.subscribed == 0);
ASSERT(test_data.flags == 0);
// Subscribe with QoS1
opts.topic = topic, opts.qos = 1;
mg_mqtt_sub(c, &opts);
for (i = 0; i < 500 && test_data.subscribed == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.subscribed == 1);
ASSERT(test_data.published == 0);
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.flags == flags_subscribed);
test_data.flags = 0;
opts.topic = topic, opts.message = data, opts.qos = 1, opts.retain = false;
// Publish with QoS0 to subscribed topic and check reception
opts.topic = topic, opts.message = data, opts.qos = 0, opts.retain = false;
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && test_data.published == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.published == 1);
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.flags == 0); // No PUBACK for QoS0
for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
if (strcmp(buf, "Xx/f12/hi") != 0) MG_INFO(("[%s]", buf));
ASSERT(strcmp(buf, "Xx/f12/hi") == 0);
memset(buf + 1, 0, sizeof(buf) - 1);
test_data.flags = 0;
// Set params
test_data.subscribed = 0;
test_data.published = 0;
// Publish with QoS1 to subscribed topic and check reception
opts.topic = topic, opts.message = data, opts.qos = 1, opts.retain = false;
retries = 1;
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
} while (test_data.flags == 0 && retries--);
ASSERT(test_data.flags == flags_published);
for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
if (strcmp(buf, "Xx/f12/hi") != 0) MG_INFO(("[%s]", buf));
ASSERT(strcmp(buf, "Xx/f12/hi") == 0);
memset(buf + 1, 0, sizeof(buf) - 1);
test_data.flags = 0;
// Disconnect !
mg_mgr_free(&mgr);
ASSERT(mgr.conns == NULL);
connect_with_options:
// (Re-)connect with options: version, clean session, last will, keepalive
// time
test_data.flags = 0;
memset(buf, 0, sizeof(buf));
memset(&opts, 0, sizeof(opts));
mg_mgr_init(&mgr);
opts.clean = true, opts.qos = 1, opts.retain = true, opts.keepalive = 20;
opts.version = mqtt_version;
@ -484,27 +518,61 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
for (i = 0; i < 300 && buf[0] == 0; i++) mg_mgr_poll(&mgr, 10);
if (buf[0] != 'X') MG_INFO(("[%s]", buf));
ASSERT(buf[0] == 'X');
ASSERT(test_data.subscribed == 0);
ASSERT(test_data.flags == 0);
opts.topic = topic, opts.qos = 1;
// Subscribe with QoS2 (reception downgrades to published QoS)
opts.topic = topic, opts.qos = 2;
mg_mqtt_sub(c, &opts);
for (i = 0; i < 500 && test_data.subscribed == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.subscribed == 1);
ASSERT(test_data.published == 0);
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.flags == flags_subscribed);
test_data.flags = 0;
// Publish with QoS1 to subscribed topic and check reception
opts.topic = topic, opts.message = data, opts.qos = 1, opts.retain = false;
if (mqtt_version == 5) {
opts.props = properties;
opts.num_props = 4;
construct_props(properties);
}
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && test_data.published == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.published == 1);
retries = 1;
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
} while (test_data.flags == 0 && retries--);
ASSERT(test_data.flags == flags_published);
for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
if (strcmp(buf, "Xx/f12/hi") != 0) MG_INFO(("[%s]", buf));
ASSERT(strcmp(buf, "Xx/f12/hi") == 0);
test_data.flags = 0;
// Publish with QoS2 to subscribed topic and check (simultaneous) reception
opts.topic = topic, opts.message = data, opts.qos = 2, opts.retain = false;
if (mqtt_version == 5) {
opts.props = properties;
opts.num_props = 4;
construct_props(properties);
}
retries = 1;
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && !(test_data.flags & flags_received); i++)
mg_mgr_poll(&mgr, 10);
} while (!(test_data.flags & flags_received) && retries--);
ASSERT(test_data.flags & flags_received);
test_data.flags &= ~flags_received;
// Mongoose sent PUBREL, wait for PUBCOMP
for (i = 0; i < 500 && !(test_data.flags & flags_completed); i++)
mg_mgr_poll(&mgr, 10);
// TODO(): retry sending PUBREL on failure after an expected timeout
ASSERT(test_data.flags & flags_completed);
for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
if (strcmp(buf, "Xx/f12/hi") != 0) MG_INFO(("[%s]", buf));
ASSERT(strcmp(buf, "Xx/f12/hi") == 0);
for (i = 0; i < 500 && !(test_data.flags & flags_released); i++)
mg_mgr_poll(&mgr, 10);
ASSERT(test_data.flags & flags_released); // Mongoose sent PUBCOMP
memset(buf + 1, 0, sizeof(buf) - 1);
test_data.flags = 0;
mg_mgr_free(&mgr);
ASSERT(mgr.conns == NULL);
@ -2156,7 +2224,7 @@ static void test_http_chunked_case(mg_event_handler_t s, mg_event_handler_t c,
struct mg_mgr mgr;
uint32_t i, crc = 0, expected_crc = mg_crc32(0, expected, strlen(expected));
struct mg_connection *conn;
static uint16_t port = 32344; // To prevent bind errors on Windows
static uint16_t port = 32344; // To prevent bind errors on Windows
mg_snprintf(url, sizeof(url), "http://127.0.0.1:%d", port++);
mg_mgr_init(&mgr);
mg_http_listen(&mgr, url, s, NULL);
@ -2816,7 +2884,8 @@ static void test_poll(void) {
int count = 0, i;
struct mg_mgr mgr;
mg_mgr_init(&mgr);
mg_http_listen(&mgr, "http://127.0.0.1:42346", ph, &count); // To prevent bind errors on Windows
mg_http_listen(&mgr, "http://127.0.0.1:42346", ph,
&count); // To prevent bind errors on Windows
for (i = 0; i < 10; i++) mg_mgr_poll(&mgr, 0);
ASSERT(count == 10);
mg_mgr_free(&mgr);