Improve MQTT tests resiliency

This commit is contained in:
Sergio R. Caprile 2023-06-07 15:38:03 -03:00
parent 0168a312f2
commit c061e60664

View File

@ -341,8 +341,10 @@ static void test_sntp(void) {
}
struct mqtt_data {
char *buf;
size_t bufsize;
char *topic;
char *msg;
size_t topicsize;
size_t msgsize;
int flags;
};
#define flags_subscribed (1 << 0)
@ -353,10 +355,12 @@ struct mqtt_data {
static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
struct mqtt_data *test_data = (struct mqtt_data *) fnd;
char *buf = test_data->buf;
char *buf = test_data->msg;
if (ev == MG_EV_MQTT_OPEN) {
buf[0] = *(int *) evd == 0 ? 'X' : 'Y';
} else if (ev == MG_EV_CLOSE) {
buf[0] = 0;
} else if (ev == MG_EV_MQTT_CMD) {
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
if (mm->cmd == MQTT_CMD_SUBACK) {
@ -372,8 +376,10 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
}
} else if (ev == MG_EV_MQTT_MSG) {
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
snprintf(buf + 1, test_data->bufsize, "%.*s/%.*s", (int) mm->topic.len,
mm->topic.ptr, (int) mm->data.len, mm->data.ptr);
snprintf(test_data->topic, test_data->topicsize, "%.*s",
(int) mm->topic.len, mm->topic.ptr);
snprintf(buf + 1, test_data->msgsize - 2, "%.*s", (int) mm->data.len,
mm->data.ptr);
if (mm->cmd == MQTT_CMD_PUBLISH && c->is_mqtt5) {
size_t pos = 0;
@ -426,8 +432,8 @@ static void construct_props(struct mg_mqtt_prop *props) {
static void test_mqtt_base(void);
static void test_mqtt_base(void) {
char buf[50] = {0};
struct mqtt_data test_data = {buf, 50, 0};
char buf[10] = {0}; // we won't use it
struct mqtt_data test_data = {buf, buf, 10, 10, 0};
struct mg_mgr mgr;
struct mg_connection *c;
const char *url = "mqtt://broker.hivemq.com:1883";
@ -445,11 +451,24 @@ static void test_mqtt_base(void) {
ASSERT(mgr.conns == NULL);
}
static void check_mqtt_message(struct mg_mqtt_opts *opts,
struct mqtt_data *data, bool enforce) {
if (opts->topic.len != strlen(data->topic) ||
strcmp(opts->topic.ptr, data->topic)) {
MG_INFO(("TOPIC[%s]", data->topic));
if (enforce) ASSERT(0);
}
if (*data->msg != 'X' || opts->message.len != (strlen(&data->msg[1])) ||
strcmp(opts->message.ptr, &data->msg[1])) {
MG_INFO(("MSG[%s]", data->msg));
if (enforce) ASSERT(0);
}
}
static void test_mqtt_ver(uint8_t mqtt_version) {
char buf[50] = {0}, client_id[16], will_topic[16];
struct mqtt_data test_data = {buf, 50, 0};
char tbuf[16], mbuf[50] = {0}, client_id[16], topic[16];
struct mqtt_data test_data = {tbuf, mbuf, 16, 50, 0};
struct mg_mgr mgr;
struct mg_str topic = mg_str("x/f12"), data = mg_str("hi");
struct mg_connection *c;
struct mg_mqtt_opts opts;
struct mg_mqtt_prop properties[4];
@ -460,99 +479,105 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
// Connect with empty client ID, no options, ergo no MQTT != 3.1.1
if (mqtt_version != 4) goto connect_with_options;
c = mg_mqtt_connect(&mgr, url, NULL, mqtt_cb, &test_data);
for (i = 0; i < 300 && buf[0] == 0; i++) mg_mgr_poll(&mgr, 10);
if (buf[0] != 'X') MG_INFO(("[%s]", buf));
ASSERT(buf[0] == 'X');
for (i = 0; i < 300 && mbuf[0] == 0; i++) mg_mgr_poll(&mgr, 10);
if (mbuf[0] != 'X') MG_INFO(("[%s]", mbuf));
ASSERT(mbuf[0] == 'X');
ASSERT(test_data.flags == 0);
// Subscribe with QoS1
opts.topic = topic, opts.qos = 1;
opts.topic = mg_str(mg_random_str(topic, sizeof(topic)));
opts.qos = 1;
mg_mqtt_sub(c, &opts);
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.flags == flags_subscribed);
test_data.flags = 0;
// Publish with QoS0 to subscribed topic and check reception
opts.topic = topic, opts.message = data, opts.qos = 0, opts.retain = false;
// keep former opts.topic
opts.message = mg_str("hi0"), opts.qos = 0, opts.retain = false;
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
if (strcmp(buf, "Xx/f12/hi") != 0) MG_INFO(("[%s]", buf));
ASSERT(!(test_data.flags & flags_published)); // No PUBACK for QoS0
ASSERT(strcmp(buf, "Xx/f12/hi") == 0);
memset(buf + 1, 0, sizeof(buf) - 1);
for (i = 0; i < 500 && mbuf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(!(test_data.flags & flags_published)); // No PUBACK for QoS0
check_mqtt_message(&opts, &test_data, false); // We may not get the msg
memset(mbuf + 1, 0, sizeof(mbuf) - 1);
test_data.flags = 0;
// Publish with QoS1 to subscribed topic and check reception
opts.topic = topic, opts.message = data, opts.qos = 1, opts.retain = false;
// keep former opts.topic
opts.message = mg_str("hi1"), opts.qos = 1, opts.retain = false;
retries = 0; // don't do retries for test speed
do { // retry on failure after an expected timeout
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
} while (test_data.flags == 0 && retries--);
ASSERT(test_data.flags == flags_published);
for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
if (strcmp(buf, "Xx/f12/hi") != 0) MG_INFO(("[%s]", buf));
ASSERT(strcmp(buf, "Xx/f12/hi") == 0);
memset(buf + 1, 0, sizeof(buf) - 1);
for (i = 0; i < 500 && mbuf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
check_mqtt_message(&opts, &test_data, true);
memset(mbuf + 1, 0, sizeof(mbuf) - 1);
test_data.flags = 0;
// Disconnect !
mg_mqtt_disconnect(c, NULL);
for (i = 0; i < 10 && mbuf[0] != 0; i++) mg_mgr_poll(&mgr, 10);
mg_mgr_free(&mgr);
ASSERT(mgr.conns == NULL);
connect_with_options:
// (Re-)connect with options: version, clean session, last will, keepalive
// time
// time. Don't set retain, some runners are not random
test_data.flags = 0;
memset(buf, 0, sizeof(buf));
memset(mbuf, 0, sizeof(mbuf));
memset(&opts, 0, sizeof(opts));
mg_mgr_init(&mgr);
opts.clean = true, opts.qos = 1, opts.retain = true, opts.keepalive = 20;
opts.clean = true, opts.qos = 1, opts.retain = false, opts.keepalive = 20;
opts.version = mqtt_version;
opts.topic = mg_str(mg_random_str(will_topic, sizeof(will_topic)));
opts.topic = mg_str(mg_random_str(topic, sizeof(topic)));
opts.message = mg_str("mg_will_messsage");
opts.client_id = mg_str(mg_random_str(client_id, sizeof(client_id)));
c = mg_mqtt_connect(&mgr, url, &opts, mqtt_cb, &test_data);
for (i = 0; i < 300 && buf[0] == 0; i++) mg_mgr_poll(&mgr, 10);
if (buf[0] != 'X') MG_INFO(("[%s]", buf));
ASSERT(buf[0] == 'X');
for (i = 0; i < 300 && mbuf[0] == 0; i++) mg_mgr_poll(&mgr, 10);
if (mbuf[0] != 'X') MG_INFO(("[%s]", mbuf));
ASSERT(mbuf[0] == 'X');
ASSERT(test_data.flags == 0);
// Subscribe with QoS2 (reception downgrades to published QoS)
opts.topic = topic, opts.qos = 2;
opts.topic = mg_str(mg_random_str(topic, sizeof(topic)));
opts.qos = 2;
mg_mqtt_sub(c, &opts);
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.flags == flags_subscribed);
test_data.flags = 0;
// Publish with QoS1 to subscribed topic and check reception
opts.topic = topic, opts.message = data, opts.qos = 1, opts.retain = false;
// keep former opts.topic
opts.message = mg_str("hi1"), opts.qos = 1, opts.retain = false;
if (mqtt_version == 5) {
opts.props = properties;
opts.num_props = 4;
construct_props(properties);
}
retries = 0; // don't do retries for test speed
do { // retry on failure after an expected timeout
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && test_data.flags == 0; i++) mg_mgr_poll(&mgr, 10);
} while (test_data.flags == 0 && retries--);
ASSERT(test_data.flags == flags_published);
for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
if (strcmp(buf, "Xx/f12/hi") != 0) MG_INFO(("[%s]", buf));
ASSERT(strcmp(buf, "Xx/f12/hi") == 0);
for (i = 0; i < 500 && mbuf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
check_mqtt_message(&opts, &test_data, true);
memset(mbuf + 1, 0, sizeof(mbuf) - 1);
test_data.flags = 0;
// Publish with QoS2 to subscribed topic and check (simultaneous) reception
opts.topic = topic, opts.message = data, opts.qos = 2, opts.retain = false;
// keep former opts.topic
opts.message = mg_str("hi2"), opts.qos = 2, opts.retain = false;
if (mqtt_version == 5) {
opts.props = properties;
opts.num_props = 4;
construct_props(properties);
}
retries = 0; // don't do retries for test speed
do { // retry on failure after an expected timeout
do { // retry on failure after an expected timeout
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && !(test_data.flags & flags_received); i++)
mg_mgr_poll(&mgr, 10);
@ -565,15 +590,15 @@ connect_with_options:
// TODO(): retry sending PUBREL on failure after an expected timeout
// or broker sends PUBREC again
ASSERT(test_data.flags & flags_completed);
for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
if (strcmp(buf, "Xx/f12/hi") != 0) MG_INFO(("[%s]", buf));
ASSERT(strcmp(buf, "Xx/f12/hi") == 0);
for (i = 0; i < 500 && mbuf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
check_mqtt_message(&opts, &test_data, true);
for (i = 0; i < 500 && !(test_data.flags & flags_released); i++)
mg_mgr_poll(&mgr, 10);
ASSERT(test_data.flags & flags_released); // Mongoose sent PUBCOMP
memset(buf + 1, 0, sizeof(buf) - 1);
memset(mbuf + 1, 0, sizeof(mbuf) - 1);
test_data.flags = 0;
// dirty disconnect
mg_mgr_free(&mgr);
ASSERT(mgr.conns == NULL);
}
@ -582,7 +607,6 @@ static void test_mqtt(void) {
test_mqtt_base();
test_mqtt_ver(4);
test_mqtt_ver(5);
test_mqtt_base();
}
static void eh1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {