mongoose/src/mg_mqtt_server.c
Dmitry Frank c3d9d17e34 Remove "mongoose" prefix from includes
In certain files it was necessary to make tests work from public
mongoose repo, so this commit makes things consistent.

PUBLISHED_FROM=694454d0ff007229c65d524a2a2beaf126420f15
2018-02-05 20:20:05 +00:00

195 lines
5.6 KiB
C

/*
* Copyright (c) 2014 Cesanta Software Limited
* All rights reserved
*/
#include "mg_internal.h"
#include "mg_mqtt_server.h"
#if MG_ENABLE_MQTT_BROKER
static void mg_mqtt_session_init(struct mg_mqtt_broker *brk,
struct mg_mqtt_session *s,
struct mg_connection *nc) {
s->brk = brk;
s->subscriptions = NULL;
s->num_subscriptions = 0;
s->nc = nc;
}
static void mg_mqtt_add_session(struct mg_mqtt_session *s) {
LIST_INSERT_HEAD(&s->brk->sessions, s, link);
}
static void mg_mqtt_remove_session(struct mg_mqtt_session *s) {
LIST_REMOVE(s, link);
}
static void mg_mqtt_destroy_session(struct mg_mqtt_session *s) {
size_t i;
for (i = 0; i < s->num_subscriptions; i++) {
MG_FREE((void *) s->subscriptions[i].topic);
}
MG_FREE(s->subscriptions);
MG_FREE(s);
}
static void mg_mqtt_close_session(struct mg_mqtt_session *s) {
mg_mqtt_remove_session(s);
mg_mqtt_destroy_session(s);
}
void mg_mqtt_broker_init(struct mg_mqtt_broker *brk, void *user_data) {
LIST_INIT(&brk->sessions);
brk->user_data = user_data;
}
static void mg_mqtt_broker_handle_connect(struct mg_mqtt_broker *brk,
struct mg_connection *nc) {
struct mg_mqtt_session *s =
(struct mg_mqtt_session *) MG_CALLOC(1, sizeof *s);
if (s == NULL) {
/* LCOV_EXCL_START */
mg_mqtt_connack(nc, MG_EV_MQTT_CONNACK_SERVER_UNAVAILABLE);
return;
/* LCOV_EXCL_STOP */
}
/* TODO(mkm): check header (magic and version) */
mg_mqtt_session_init(brk, s, nc);
nc->priv_2 = s;
mg_mqtt_add_session(s);
mg_mqtt_connack(nc, MG_EV_MQTT_CONNACK_ACCEPTED);
}
static void mg_mqtt_broker_handle_subscribe(struct mg_connection *nc,
struct mg_mqtt_message *msg) {
struct mg_mqtt_session *ss = (struct mg_mqtt_session *) nc->priv_2;
uint8_t qoss[MG_MQTT_MAX_SESSION_SUBSCRIPTIONS];
size_t num_subs = 0;
struct mg_str topic;
uint8_t qos;
int pos;
struct mg_mqtt_topic_expression *te;
for (pos = 0;
(pos = mg_mqtt_next_subscribe_topic(msg, &topic, &qos, pos)) != -1;) {
if (num_subs >= sizeof(MG_MQTT_MAX_SESSION_SUBSCRIPTIONS) ||
(ss->num_subscriptions + num_subs >=
MG_MQTT_MAX_SESSION_SUBSCRIPTIONS)) {
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
return;
}
qoss[num_subs++] = qos;
}
if (num_subs > 0) {
te = (struct mg_mqtt_topic_expression *) MG_REALLOC(
ss->subscriptions,
sizeof(*ss->subscriptions) * (ss->num_subscriptions + num_subs));
if (te == NULL) {
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
return;
}
ss->subscriptions = te;
for (pos = 0;
pos < (int) msg->payload.len &&
(pos = mg_mqtt_next_subscribe_topic(msg, &topic, &qos, pos)) != -1;
ss->num_subscriptions++) {
te = &ss->subscriptions[ss->num_subscriptions];
te->topic = (char *) MG_MALLOC(topic.len + 1);
te->qos = qos;
memcpy((char *) te->topic, topic.p, topic.len);
((char *) te->topic)[topic.len] = '\0';
}
}
if (pos == (int) msg->payload.len) {
mg_mqtt_suback(nc, qoss, num_subs, msg->message_id);
} else {
/* We did not fully parse the payload, something must be wrong. */
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
}
}
static void mg_mqtt_broker_handle_publish(struct mg_mqtt_broker *brk,
struct mg_mqtt_message *msg) {
struct mg_mqtt_session *s;
size_t i;
for (s = mg_mqtt_next(brk, NULL); s != NULL; s = mg_mqtt_next(brk, s)) {
for (i = 0; i < s->num_subscriptions; i++) {
if (mg_mqtt_vmatch_topic_expression(s->subscriptions[i].topic,
msg->topic)) {
char buf[100], *p = buf;
mg_asprintf(&p, sizeof(buf), "%.*s", (int) msg->topic.len,
msg->topic.p);
if (p == NULL) {
return;
}
mg_mqtt_publish(s->nc, p, 0, 0, msg->payload.p, msg->payload.len);
if (p != buf) {
MG_FREE(p);
}
break;
}
}
}
}
void mg_mqtt_broker(struct mg_connection *nc, int ev, void *data) {
struct mg_mqtt_message *msg = (struct mg_mqtt_message *) data;
struct mg_mqtt_broker *brk;
if (nc->listener) {
brk = (struct mg_mqtt_broker *) nc->listener->priv_2;
} else {
brk = (struct mg_mqtt_broker *) nc->priv_2;
}
switch (ev) {
case MG_EV_ACCEPT:
if (nc->proto_data == NULL) mg_set_protocol_mqtt(nc);
nc->priv_2 = NULL; /* Clear up the inherited pointer to broker */
break;
case MG_EV_MQTT_CONNECT:
if (nc->priv_2 == NULL) {
mg_mqtt_broker_handle_connect(brk, nc);
} else {
/* Repeated CONNECT */
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
}
break;
case MG_EV_MQTT_SUBSCRIBE:
if (nc->priv_2 != NULL) {
mg_mqtt_broker_handle_subscribe(nc, msg);
} else {
/* Subscribe before CONNECT */
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
}
break;
case MG_EV_MQTT_PUBLISH:
if (nc->priv_2 != NULL) {
mg_mqtt_broker_handle_publish(brk, msg);
} else {
/* Publish before CONNECT */
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
}
break;
case MG_EV_CLOSE:
if (nc->listener && nc->priv_2 != NULL) {
mg_mqtt_close_session((struct mg_mqtt_session *) nc->priv_2);
}
break;
}
}
struct mg_mqtt_session *mg_mqtt_next(struct mg_mqtt_broker *brk,
struct mg_mqtt_session *s) {
return s == NULL ? LIST_FIRST(&brk->sessions) : LIST_NEXT(s, link);
}
#endif /* MG_ENABLE_MQTT_BROKER */