LoboMQ
Loading...
Searching...
No Matches
Broker.cpp
Go to the documentation of this file.
1
8#include "LoboMQ/Broker.h"
11
12#define SUBSCRIBETASKS 1
13#define UNSUBSCRIBETASKS 1
14#define PUBLISHTASKS 1
15
16std::vector<BrokerTopic> topicsVector; //each topic has messages and subscribers
17
18//Message queues
19QueueHandle_t subMsgQueue;
20QueueHandle_t unsubMsgQueue;
21QueueHandle_t pubMsgQueue;
22
23Elog *logger;
24bool gPersistence;
25int gCsSdPin;
26SemaphoreHandle_t mutex;
27
28MACAddrList *gWhitelist;
29
30void SubscribeTask(void *parameter) {
31 for (;;) {
32 SubscribeTaskParams *subParams;
33 if (xQueueReceive(subMsgQueue, &subParams, portMAX_DELAY) == pdPASS) { //gets the message from the queue
34 //Extracts fields from the subscription parameters
35 SubscribeAnnouncement *subAnnounce = subParams->subAnnounce;
36 const uint8_t *mac = subParams->mac;
37
38 logger->log(INFO, "Subscribing to '%s' by %02X:%02X:%02X:%02X:%02X:%02X.",
39 subAnnounce->topic, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
40
41 bool subscribed = false;
42 for (const auto& topicObject : topicsVector) { //checks every topicObject to subscribe to the proper ones
43 if (strcmp(subAnnounce->topic, topicObject.getTopic()) == 0) { //if the topic in message is the same as the topicObject
44 if (!topicObject.isSubscribed(mac)) {
45 topicObject.subscribe(mac);
46 if (gPersistence)
47 writeBTToFile(const_cast<BrokerTopic*>(&topicObject), logger, &mutex, portMAX_DELAY);
48 } else {
49 logger->log(INFO, "\tAlready subscribed to '%s'.", topicObject.getTopic());
50 }
51 subscribed = true; //if the topic was found in the vector
52 break; //exit loop, no need to keep searching topics
53 }
54 }
55
56 if (!subscribed) { //if it's a topic not existing in the vector
57 logger->log(INFO, "Topic '%s' not found, creating a new topic.", subAnnounce->topic);
58
59 BrokerTopic newTopic(logger, subAnnounce->topic);
60 newTopic.subscribe(mac);
61 if (gPersistence) {
62 newTopic.setFilename(replaceChars(subAnnounce->topic).c_str()); //translates the topic to have a compatible filename
63 writeBTToFile(&newTopic, logger, &mutex, portMAX_DELAY);
64 }
65 topicsVector.push_back(newTopic);
66 }
67 free(subParams->subAnnounce);
68 free(subParams);
69 }
70 vTaskDelay(pdMS_TO_TICKS(1000));
71 }
72
73 vTaskDelete(NULL);
74}
75
76void UnsubscribeTask(void *parameter) {
77 UnsubscribeTaskParams *unsubParams;
78
79 for (;;) {
80 if (xQueueReceive(unsubMsgQueue, &unsubParams, portMAX_DELAY) == pdPASS) { //gets the message from the queue
81 //Extracts fields from the unsubscription parameters
82 UnsubscribeAnnouncement *unsubAnnounce = unsubParams->unsubAnnounce;
83 const uint8_t *mac = unsubParams->mac;
84
85 logger->log(INFO, "Unsubscribing %02X:%02X:%02X:%02X:%02X:%02X from '%s'.",
86 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5], unsubAnnounce->topic);
87
88 bool unsubscribed = false;
89 for (auto it = topicsVector.begin(); it != topicsVector.end(); ++it) { //checks every topicObject
90 if (strcmp(unsubAnnounce->topic, it->getTopic()) == 0) { //if the topic in message is the same as the topicObject
91 if (it->isSubscribed(mac)) {
92 it->unsubscribe(mac);
93 if (it->getSubscribersAmount() == 0) { //if topicObject has no subscribers,
94 logger->log(INFO, "Topic '%s' has no subscribers, is being deleted.", it->getTopic());
95 if (gPersistence)
96 deleteBTFile(it->getFilename(), logger, &mutex, portMAX_DELAY);
97 topicsVector.erase(it); //delete topicObject from topicsVector
98 } else {
99 if (gPersistence)
100 writeBTToFile(&(*it), logger, &mutex, portMAX_DELAY);
101 }
102 unsubscribed = true;
103 break; //exit loop, no need to keep searching topics
104 }
105 }
106 }
107
108 if (!unsubscribed) { //if it's a topic not existing in the vector
109 logger->log(INFO, "\tTopic '%s' not found, it was not subscribed.", unsubAnnounce->topic);
110 }
111 free(unsubParams->unsubAnnounce);
112 free(unsubParams);
113 }
114 vTaskDelay(pdMS_TO_TICKS(1000));
115 }
116 vTaskDelete(NULL);
117}
118
119void PublishTask(void *parameter) {
120 PublishTaskParams *pubParams;
121 for (;;) {
122 if (xQueueReceive(pubMsgQueue, &pubParams, portMAX_DELAY) == pdPASS) { //gets the message from the queue
123 //Extracts fields from the publication parameters
124 PublishContent *pubContent = pubParams->pubContent;
125 const uint8_t *mac = pubParams->mac;
126
127 bool sent = false;
128 std::vector<std::array<uint8_t, 6>> alreadySentMacs;
129 for (const auto& topicObject : topicsVector) { //checks every topicObject to send the message to the proper ones
130 if (topicObject.isPublishable(pubContent->topic)) { //if the topic in message is compatible with the topicObject
131 topicObject.publish(*pubContent, alreadySentMacs);
132 sent = true; //the topic was found in the vector
133 }
134 }
135
136 logger->log(INFO, "Received a %dB message by %02X:%02X:%02X:%02X:%02X:%02X with topic '%s':",
137 pubContent->contentSize, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5], pubContent->topic);
138 if (!sent) { //if it's a topic not existing in the vector
139 logger->log(INFO, "\tTopic '%s' not found (has no subscribers, so it isn't published).", pubContent->topic);
140 } else {
141 logger->log(INFO, "\tSent to %d subscribers.", alreadySentMacs.size());
142 }
143 free(pubParams->pubContent);
144 free(pubParams);
145 }
146 vTaskDelay(pdMS_TO_TICKS(1000));
147 }
148
149 vTaskDelete(NULL);
150}
151
152void OnDataRecv(const uint8_t *mac, const uint8_t *incomingData, int len) {
153 if (gWhitelist != BRO_DEFAULT_WHITELIST && !gWhitelist->isInList(mac)) {
154 logger->log(INFO, "Ignored message from %02X:%02X:%02X:%02X:%02X:%02X, it's not in the whitelist.",
155 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
156 return;
157 }
158
159 MessageType msgType = ((MessageBase*)incomingData)->type;
160 switch (msgType) {
161 case MSGTYPE_SUBSCRIBE: {
162 logger->log(DEBUG, "Received a subscribe message from %02X:%02X:%02X:%02X:%02X:%02X, adding it to the queue.",
163 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
164 //Initialize subTaskParams with the subscribe message
165 SubscribeTaskParams *subTaskParams = (SubscribeTaskParams*)malloc(sizeof(SubscribeTaskParams));
166 if (subTaskParams == NULL) {
167 logger->log(ERROR, "Couldn't allocate memory for subscribe task params.");
168 return;
169 }
170 subTaskParams->subAnnounce = (SubscribeAnnouncement*)malloc(sizeof(SubscribeAnnouncement));
171 if (subTaskParams->subAnnounce == NULL) {
172 logger->log(ERROR, "Couldn't allocate memory for subscribe announcement.");
173 free(subTaskParams);
174 return;
175 }
176 memcpy(subTaskParams->subAnnounce, (SubscribeAnnouncement*)incomingData, sizeof(SubscribeAnnouncement));
177 subTaskParams->mac = mac;
178
179 if (xQueueSend(subMsgQueue, &subTaskParams, pdMS_TO_TICKS(1000)) != pdTRUE) {
180 logger->log(ERROR, "Couldn't send the subscribe message to the queue.");
181 free(subTaskParams->subAnnounce);
182 free(subTaskParams);
183 return;
184 }
185 } break;
186 case MSGTYPE_UNSUBSCRIBE: {
187 logger->log(DEBUG, "Received an unsubscribe message from %02X:%02X:%02X:%02X:%02X:%02X, adding it to the queue.",
188 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
189 //Initialize unsubTaskParams with the unsubscribe message
190 UnsubscribeTaskParams *unsubTaskParams = (UnsubscribeTaskParams*)malloc(sizeof(UnsubscribeTaskParams));
191 if (unsubTaskParams == NULL) {
192 logger->log(ERROR, "Couldn't allocate memory for unsubscribe task params.");
193 return;
194 }
195 unsubTaskParams->unsubAnnounce = (UnsubscribeAnnouncement*)malloc(sizeof(UnsubscribeAnnouncement));
196 if (unsubTaskParams->unsubAnnounce == NULL) {
197 logger->log(ERROR, "Couldn't allocate memory for unsubscribe announcement.");
198 free(unsubTaskParams);
199 return;
200 }
201 memcpy(unsubTaskParams->unsubAnnounce, (UnsubscribeAnnouncement*)incomingData, sizeof(UnsubscribeAnnouncement));
202 unsubTaskParams->mac = mac;
203
204 if (xQueueSend(unsubMsgQueue, &unsubTaskParams, pdMS_TO_TICKS(1000)) != pdTRUE) {
205 logger->log(ERROR, "Couldn't send the unsubscribe message to the queue.");
206 return;
207 }
208 } break;
209 case MSGTYPE_PUBLISH: {
210 logger->log(DEBUG, "Received a publish message from %02X:%02X:%02X:%02X:%02X:%02X, adding it to the queue",
211 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
212 //Initialize publishTaskParams with the publish message
213 PublishTaskParams *pubTaskParams = (PublishTaskParams*)malloc(sizeof(PublishTaskParams));
214 if (pubTaskParams == NULL) {
215 logger->log(ERROR, "Couldn't allocate memory for publish task params.");
216 return;
217 }
218 pubTaskParams->pubContent = (PublishContent*)malloc(sizeof(PublishContent));
219 if (pubTaskParams->pubContent == NULL) {
220 logger->log(ERROR, "Couldn't allocate memory for publish content.");
221 free(pubTaskParams);
222 return;
223 }
224 memcpy(pubTaskParams->pubContent, (PublishContent*)incomingData, sizeof(PublishContent));
225 pubTaskParams->mac = mac;
226 if (xQueueSend(pubMsgQueue, &pubTaskParams, pdMS_TO_TICKS(1000)) != pdTRUE) {
227 logger->log(ERROR, "Couldn't send the publish message to the queue.");
228 return;
229 }
230 } break;
231
232 default: {
233 //Log with "invalid message" and the sender's mac
234 logger->log(INFO, "Invalid message type received from %02X:%02X:%02X:%02X:%02X:%02X.",
235 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
236 } break;
237 }
238}
239
240LMQErrType initBroker(MACAddrList *whitelist, Elog *_logger, bool persistence, int csSdPin) {
241 gWhitelist = whitelist;
242 logger = _logger;
243 randomSeed(analogRead(0)); //to generate random numbers
244 logger->log(DEBUG, "Initializing broker...");
245
246 //Retrieve topics from SD card (if persistence is enabled)
247 gCsSdPin = csSdPin;
248 gPersistence = persistence;
249 if (gPersistence) {
250 mutex = xSemaphoreCreateBinary();
251 if(mutex == NULL) {
252 logger->log(ERROR, "Failed to create mutex, trying to continue without persistence.");
253 gPersistence = false;
254 } else {
255 xSemaphoreGive(mutex);
256 if (!initializeSDCard(gCsSdPin, logger, &mutex, portMAX_DELAY)) {
257 logger->log(WARNING, "Couldn't initialize SD card for persistence, continuing without it.");
258 gPersistence = false;
259 } else {
260 logger->log(INFO, "SD card initialized for persistence.");
261 }
262 }
263
264 //Restore topics from SD card
265 restoreBTs(&topicsVector, logger, &mutex, portMAX_DELAY);
266 }
267
268
269 WiFi.mode(WIFI_STA);
270 //Initialize ESP-NOW and set up receive callback
271 if (esp_now_init() != ESP_OK || esp_now_register_recv_cb(OnDataRecv) != ESP_OK) {
272 logger->log(CRITICAL, "Couldn't initialize ESP-NOW, aborting!");
274 }
275
276 subMsgQueue = xQueueCreate(10, sizeof(SubscribeTaskParams));
277 if (subMsgQueue == NULL) {
278 logger->log(CRITICAL, "Couldn't create the subscribe message queue, aborting!");
280 }
281
282 unsubMsgQueue = xQueueCreate(10, sizeof(UnsubscribeTaskParams));
283 if (unsubMsgQueue == NULL) {
284 logger->log(CRITICAL, "Couldn't create the unsubscribe message queue, aborting!");
286 }
287
288 pubMsgQueue = xQueueCreate(10, sizeof(PublishTaskParams));
289 if (pubMsgQueue == NULL) {
290 logger->log(CRITICAL, "Couldn't create the publish message queue, aborting!");
292 }
293
294
295 char taskName[20];
296
297 for (int i = 0; i < SUBSCRIBETASKS; i++) {
298 snprintf(taskName, sizeof(taskName), "SubscribeTask%d", 0+1);
299 if (xTaskCreate(SubscribeTask, taskName, 10000, (void *) i, 1, NULL) != pdPASS) {
300 logger->log(CRITICAL, "Couldn't create the subscribe task, aborting!");
302 }
303 }
304
305 for (int i = 0; i < UNSUBSCRIBETASKS; i++) {
306 snprintf(taskName, sizeof(taskName), "UnsubscribeTask%d", i+1);
307 if (xTaskCreate(UnsubscribeTask, taskName, 10000, (void *) i, 1, NULL) != pdPASS) {
308 logger->log(CRITICAL, "Couldn't create the unsubscribe task, aborting!");
310 }
311 }
312
313 for (int i = 0; i < PUBLISHTASKS; i++) {
314 snprintf(taskName, sizeof(taskName), "PublishTask%d", i+1);
315 if (xTaskCreate(PublishTask, taskName, 10000, (void *) i, 1, NULL) != pdPASS) {
316 logger->log(CRITICAL, "Couldn't create the publish task, aborting!");
318 }
319 }
320 logger->log(INFO, "Broker is running at %s!", WiFi.macAddress().c_str());
321 return LMQ_ERR_SUCCESS;
322}
LMQErrType initBroker(MACAddrList *whitelist, Elog *_logger, bool persistence, int csSdPin)
Initializes the broker.
Definition Broker.cpp:240
LMQErrType
Enumerates every error code that can be returned by the library functions.
Definition Includes.h:28
@ LMQ_ERR_XQUEUECREATE_FAIL
Couldn't create the broker queue.
Definition Includes.h:45
@ LMQ_ERR_BAD_ESP_CONFIG
Couldn't initialize ESP-NOW.
Definition Includes.h:39
@ LMQ_ERR_XTASKCREATE_FAIL
Couldn't create the broker task.
Definition Includes.h:47
@ LMQ_ERR_SUCCESS
No error, operation successful.
Definition Includes.h:30
MessageType
Enumerates every type of message sent between the broker and the clients.
Definition PubSub.h:25
@ MSGTYPE_UNSUBSCRIBE
Unsubscribe message, sent from subscriber to broker.
Definition PubSub.h:29
@ MSGTYPE_PUBLISH
Publish message, sent from publisher to broker or from broker to subscriber.
Definition PubSub.h:31
@ MSGTYPE_SUBSCRIBE
Subscribe message, sent from subscriber to broker.
Definition PubSub.h:27
A class for managing a list of MAC addresses.
Definition MACAddrList.h:27
bool isInList(const uint8_t *mac) const
Checks if a MAC address is in the list.
Structure that contains the fields used by every message.
Definition PubSub.h:38
Structure that contains the fields used by a publish message, apart from those inherited from the Mes...
Definition PubSub.h:65
char topic[MAXTOPICLENGTH]
Topic where the message is published to.
Definition PubSub.h:66
size_t contentSize
Size of the content.
Definition PubSub.h:67
Structure that contains the fields used by a subscribe message, apart from those inherited from the M...
Definition PubSub.h:47
char topic[MAXTOPICLENGTH]
Topic that the subscriber shows interest in.
Definition PubSub.h:48
Structure that contains the fields used by a unsubscribe message, apart from those inherited from the...
Definition PubSub.h:56
char topic[MAXTOPICLENGTH]
Topic that the subscriber no longer shows interest in.
Definition PubSub.h:57