// NOTE(review): the digits fused to the start of each line appear to be line
// numbers of the original file left behind by the listing extraction.
// Upper bounds of the task-creation loops for SubscribeTask and UnsubscribeTask.
12#define SUBSCRIBETASKS 1
13#define UNSUBSCRIBETASKS 1
// Topics known to the broker; iterated by the subscribe, unsubscribe and
// publish tasks, appended to on subscribe and erased from on unsubscribe.
16std::vector<BrokerTopic> topicsVector;
// Queues filled by OnDataRecv (xQueueSend) and drained by the matching task
// (xQueueReceive); the items are pointers to request structures.
19QueueHandle_t subMsgQueue;
20QueueHandle_t unsubMsgQueue;
21QueueHandle_t pubMsgQueue;
// Created with xSemaphoreCreateBinary and passed by address to the persistence
// helpers (initializeSDCard, restoreBTs, writeBTToFile, deleteBTFile).
26SemaphoreHandle_t mutex;
// Task body that handles subscribe requests taken from subMsgQueue.
//
// Visible steps: wait for a request pointer, search topicsVector for a topic
// whose name equals subAnnounce->topic, and subscribe the request's MAC to it
// (persisting the topic with writeBTToFile). A log line covers the
// already-subscribed case. When no topic matches, a new topic is subscribed,
// given a filename derived from the topic name, persisted and appended to
// topicsVector. The announcement buffer is then freed.
//
// @param parameter Task argument; not referenced in the visible lines.
//
// NOTE(review): this listing has gaps (the fused leading numbers skip values),
// so declarations, else-branches and closing braces are not all visible.
// NOTE(review): topicObject is bound as `const auto&`, yet subscribe() is
// called on it and a const_cast is needed for writeBTToFile. Presumably the
// loop should bind a non-const reference; confirm against BrokerTopic.
30void SubscribeTask(
void *parameter) {
// Blocks without timeout (portMAX_DELAY) until a request pointer is queued.
33 if (xQueueReceive(subMsgQueue, &subParams, portMAX_DELAY) == pdPASS) {
// The MAC is read through the pointer stored in the request by OnDataRecv.
36 const uint8_t *mac = subParams->mac;
38 logger->log(INFO,
"Subscribing to '%s' by %02X:%02X:%02X:%02X:%02X:%02X.",
39 subAnnounce->
topic, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
41 bool subscribed =
false;
// Linear search of the known topics by exact name match.
42 for (
const auto& topicObject : topicsVector) {
43 if (strcmp(subAnnounce->
topic, topicObject.getTopic()) == 0) {
44 if (!topicObject.isSubscribed(mac)) {
45 topicObject.subscribe(mac);
// Persist the updated topic.
47 writeBTToFile(
const_cast<BrokerTopic*
>(&topicObject), logger, &mutex, portMAX_DELAY);
49 logger->log(INFO,
"\tAlready subscribed to '%s'.", topicObject.getTopic());
// Topic not found: build, persist and register a new one.
57 logger->log(INFO,
"Topic '%s' not found, creating a new topic.", subAnnounce->
topic);
60 newTopic.subscribe(mac);
62 newTopic.setFilename(replaceChars(subAnnounce->
topic).c_str());
63 writeBTToFile(&newTopic, logger, &mutex, portMAX_DELAY);
65 topicsVector.push_back(newTopic);
// Release the announcement buffer carried by the request.
67 free(subParams->subAnnounce);
// Pause for 1000 ms.
70 vTaskDelay(pdMS_TO_TICKS(1000));
// Task body that handles unsubscribe requests taken from unsubMsgQueue.
//
// Visible steps: wait for a request pointer, search topicsVector for a topic
// whose name equals unsubAnnounce->topic and, when the request's MAC is
// subscribed to it, either delete the topic (file and vector entry) once it has
// no subscribers left or persist it with writeBTToFile. A log line covers the
// topic-not-found case. The announcement buffer is then freed.
//
// @param parameter Task argument; not referenced in the visible lines.
//
// NOTE(review): this listing has gaps (the fused leading numbers skip values),
// so the actual unsubscribe call, else-branches and closing braces are not
// visible here.
// NOTE(review): topicsVector.erase(it) invalidates `it`, while the loop header
// still performs ++it. Confirm the loop is left immediately after the erase
// (e.g. via break), or use the iterator returned by erase.
76void UnsubscribeTask(
void *parameter) {
// Blocks without timeout (portMAX_DELAY) until a request pointer is queued.
80 if (xQueueReceive(unsubMsgQueue, &unsubParams, portMAX_DELAY) == pdPASS) {
// The MAC is read through the pointer stored in the request by OnDataRecv.
83 const uint8_t *mac = unsubParams->mac;
85 logger->log(INFO,
"Unsubscribing %02X:%02X:%02X:%02X:%02X:%02X from '%s'.",
86 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5], unsubAnnounce->
topic);
88 bool unsubscribed =
false;
// Linear search of the known topics by exact name match.
89 for (
auto it = topicsVector.begin(); it != topicsVector.end(); ++it) {
90 if (strcmp(unsubAnnounce->
topic, it->getTopic()) == 0) {
91 if (it->isSubscribed(mac)) {
// A topic left without subscribers is removed from storage and memory.
93 if (it->getSubscribersAmount() == 0) {
94 logger->log(INFO,
"Topic '%s' has no subscribers, is being deleted.", it->getTopic());
96 deleteBTFile(it->getFilename(), logger, &mutex, portMAX_DELAY);
97 topicsVector.erase(it);
// Persist the topic.
100 writeBTToFile(&(*it), logger, &mutex, portMAX_DELAY);
109 logger->log(INFO,
"\tTopic '%s' not found, it was not subscribed.", unsubAnnounce->
topic);
// Release the announcement buffer carried by the request.
111 free(unsubParams->unsubAnnounce);
// Pause for 1000 ms.
114 vTaskDelay(pdMS_TO_TICKS(1000));
// Task body that handles publish requests taken from pubMsgQueue.
//
// Visible steps: wait for a request pointer, call publish() on every topic in
// topicsVector for which isPublishable(pubContent->topic) holds, passing a
// shared alreadySentMacs list, then log the outcome and free the content buffer.
//
// @param parameter Task argument; not referenced in the visible lines.
//
// NOTE(review): this listing has gaps (the fused leading numbers skip values),
// so declarations, branches and closing braces are not all visible here.
// NOTE(review): "%d" is used with pubContent->contentSize and
// alreadySentMacs.size(), both size_t. Presumably harmless where size_t and int
// have the same width, but a cast or matching specifier would be safer; confirm
// what the logger's formatter supports.
119void PublishTask(
void *parameter) {
// Blocks without timeout (portMAX_DELAY) until a request pointer is queued.
122 if (xQueueReceive(pubMsgQueue, &pubParams, portMAX_DELAY) == pdPASS) {
// The MAC is read through the pointer stored in the request by OnDataRecv.
125 const uint8_t *mac = pubParams->mac;
// Handed to every publish() call; its final size is logged as the number of
// subscribers the message was sent to.
128 std::vector<std::array<uint8_t, 6>> alreadySentMacs;
129 for (
const auto& topicObject : topicsVector) {
130 if (topicObject.isPublishable(pubContent->
topic)) {
131 topicObject.publish(*pubContent, alreadySentMacs);
136 logger->log(INFO,
"Received a %dB message by %02X:%02X:%02X:%02X:%02X:%02X with topic '%s':",
137 pubContent->
contentSize, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5], pubContent->
topic);
139 logger->log(INFO,
"\tTopic '%s' not found (has no subscribers, so it isn't published).", pubContent->
topic);
141 logger->log(INFO,
"\tSent to %d subscribers.", alreadySentMacs.size());
// Release the content buffer carried by the request.
143 free(pubParams->pubContent);
// Pause for 1000 ms.
146 vTaskDelay(pdMS_TO_TICKS(1000));
// Receive callback registered with esp_now_register_recv_cb.
//
// Visible steps: drop (and log) messages whose sender MAC is not in the
// whitelist when a non-default whitelist is set; otherwise, for subscribe,
// unsubscribe and publish messages, allocate a request structure plus a payload
// buffer, store the sender MAC pointer in it and enqueue the request pointer on
// the matching queue, waiting up to 1000 ms. Allocation and enqueue failures
// are logged. Unrecognized message types are logged as invalid.
//
// @param mac          Sender address; checked against the whitelist, logged and
//                     stored in the queued request.
// @param incomingData Not referenced in the visible lines (presumably the raw
//                     message bytes; TODO confirm).
// @param len          Not referenced in the visible lines (presumably the
//                     length of incomingData; TODO confirm).
//
// NOTE(review): this listing has gaps (the fused leading numbers skip values),
// so the message-type dispatch, the allocations, returns and closing braces are
// not visible here.
// NOTE(review): `...->mac = mac` stores the callback's pointer argument, and the
// tasks dereference it later, after dequeuing. If the buffer behind `mac` is
// only valid during the callback, the tasks read stale memory. Confirm, and
// consider copying the 6 address bytes into the request instead.
152void OnDataRecv(
const uint8_t *mac,
const uint8_t *incomingData,
int len) {
// Whitelist filter; skipped when gWhitelist is the default whitelist value.
153 if (gWhitelist != BRO_DEFAULT_WHITELIST && !gWhitelist->
isInList(mac)) {
154 logger->log(INFO,
"Ignored message from %02X:%02X:%02X:%02X:%02X:%02X, it's not in the whitelist.",
155 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
// --- Subscribe message ---
162 logger->log(DEBUG,
"Received a subscribe message from %02X:%02X:%02X:%02X:%02X:%02X, adding it to the queue.",
163 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
166 if (subTaskParams == NULL) {
167 logger->log(ERROR,
"Couldn't allocate memory for subscribe task params.");
171 if (subTaskParams->subAnnounce == NULL) {
172 logger->log(ERROR,
"Couldn't allocate memory for subscribe announcement.");
177 subTaskParams->mac = mac;
// The queue carries the request pointer; wait at most 1000 ms for space.
179 if (xQueueSend(subMsgQueue, &subTaskParams, pdMS_TO_TICKS(1000)) != pdTRUE) {
180 logger->log(ERROR,
"Couldn't send the subscribe message to the queue.");
181 free(subTaskParams->subAnnounce);
// --- Unsubscribe message ---
187 logger->log(DEBUG,
"Received an unsubscribe message from %02X:%02X:%02X:%02X:%02X:%02X, adding it to the queue.",
188 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
191 if (unsubTaskParams == NULL) {
192 logger->log(ERROR,
"Couldn't allocate memory for unsubscribe task params.");
196 if (unsubTaskParams->unsubAnnounce == NULL) {
197 logger->log(ERROR,
"Couldn't allocate memory for unsubscribe announcement.");
198 free(unsubTaskParams);
202 unsubTaskParams->mac = mac;
// The queue carries the request pointer; wait at most 1000 ms for space.
204 if (xQueueSend(unsubMsgQueue, &unsubTaskParams, pdMS_TO_TICKS(1000)) != pdTRUE) {
205 logger->log(ERROR,
"Couldn't send the unsubscribe message to the queue.");
// --- Publish message ---
210 logger->log(DEBUG,
"Received a publish message from %02X:%02X:%02X:%02X:%02X:%02X, adding it to the queue",
211 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
214 if (pubTaskParams == NULL) {
215 logger->log(ERROR,
"Couldn't allocate memory for publish task params.");
219 if (pubTaskParams->pubContent == NULL) {
220 logger->log(ERROR,
"Couldn't allocate memory for publish content.");
225 pubTaskParams->mac = mac;
// The queue carries the request pointer; wait at most 1000 ms for space.
226 if (xQueueSend(pubMsgQueue, &pubTaskParams, pdMS_TO_TICKS(1000)) != pdTRUE) {
227 logger->log(ERROR,
"Couldn't send the publish message to the queue.");
// --- Any other message type ---
234 logger->log(INFO,
"Invalid message type received from %02X:%02X:%02X:%02X:%02X:%02X.",
235 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
// Body of the broker initialization routine (presumably initBroker; its
// signature line is not part of this listing).
//
// Visible steps: store the whitelist and persistence settings, seed the random
// generator, create the semaphore used by the persistence helpers, initialize
// the SD card and restore persisted topics, initialize ESP-NOW and register
// OnDataRecv as receive callback, check the three message queues, and create
// the subscribe, unsubscribe and publish tasks. Failures of ESP-NOW, queue or
// task creation are logged as CRITICAL; semaphore or SD-card failures only
// disable persistence.
//
// NOTE(review): this listing has gaps (the fused leading numbers skip values),
// so the queue creation calls, return statements and closing braces are not
// visible here.
241 gWhitelist = whitelist;
243 randomSeed(analogRead(0));
244 logger->log(DEBUG,
"Initializing broker...");
248 gPersistence = persistence;
250 mutex = xSemaphoreCreateBinary();
252 logger->log(ERROR,
"Failed to create mutex, trying to continue without persistence.");
253 gPersistence =
false;
// A semaphore from xSemaphoreCreateBinary starts out empty, so it is given once
// here to make it available to the first taker.
255 xSemaphoreGive(mutex);
256 if (!initializeSDCard(gCsSdPin, logger, &mutex, portMAX_DELAY)) {
257 logger->log(WARNING,
"Couldn't initialize SD card for persistence, continuing without it.");
258 gPersistence =
false;
260 logger->log(INFO,
"SD card initialized for persistence.");
// Reload previously persisted topics into topicsVector.
265 restoreBTs(&topicsVector, logger, &mutex, portMAX_DELAY);
271 if (esp_now_init() != ESP_OK || esp_now_register_recv_cb(OnDataRecv) != ESP_OK) {
272 logger->log(CRITICAL,
"Couldn't initialize ESP-NOW, aborting!");
277 if (subMsgQueue == NULL) {
278 logger->log(CRITICAL,
"Couldn't create the subscribe message queue, aborting!");
283 if (unsubMsgQueue == NULL) {
284 logger->log(CRITICAL,
"Couldn't create the unsubscribe message queue, aborting!");
289 if (pubMsgQueue == NULL) {
290 logger->log(CRITICAL,
"Couldn't create the publish message queue, aborting!");
// Task names carry a 1-based index so that several tasks of one kind can be
// told apart; the loop index is also passed as the task parameter.
297 for (
int i = 0; i < SUBSCRIBETASKS; i++) {
298 snprintf(taskName,
sizeof(taskName),
"SubscribeTask%d", i+1);
299 if (xTaskCreate(SubscribeTask, taskName, 10000, (
void *) i, 1, NULL) != pdPASS) {
300 logger->log(CRITICAL,
"Couldn't create the subscribe task, aborting!");
305 for (
int i = 0; i < UNSUBSCRIBETASKS; i++) {
306 snprintf(taskName,
sizeof(taskName),
"UnsubscribeTask%d", i+1);
307 if (xTaskCreate(UnsubscribeTask, taskName, 10000, (
void *) i, 1, NULL) != pdPASS) {
308 logger->log(CRITICAL,
"Couldn't create the unsubscribe task, aborting!");
313 for (
int i = 0; i < PUBLISHTASKS; i++) {
314 snprintf(taskName,
sizeof(taskName),
"PublishTask%d", i+1);
315 if (xTaskCreate(PublishTask, taskName, 10000, (
void *) i, 1, NULL) != pdPASS) {
316 logger->log(CRITICAL,
"Couldn't create the publish task, aborting!");
320 logger->log(INFO,
"Broker is running at %s!", WiFi.macAddress().c_str());
LMQErrType initBroker(MACAddrList *whitelist, Elog *_logger, bool persistence, int csSdPin)
Initializes the broker.
LMQErrType
Enumerates every error code that can be returned by the library functions.
@ LMQ_ERR_XQUEUECREATE_FAIL
Couldn't create the broker queue.
@ LMQ_ERR_BAD_ESP_CONFIG
Couldn't initialize ESP-NOW.
@ LMQ_ERR_XTASKCREATE_FAIL
Couldn't create the broker task.
@ LMQ_ERR_SUCCESS
No error, operation successful.
MessageType
Enumerates every type of message sent between the broker and the clients.
@ MSGTYPE_UNSUBSCRIBE
Unsubscribe message, sent from subscriber to broker.
@ MSGTYPE_PUBLISH
Publish message, sent from publisher to broker or from broker to subscriber.
@ MSGTYPE_SUBSCRIBE
Subscribe message, sent from subscriber to broker.
A class for managing a list of MAC addresses.
bool isInList(const uint8_t *mac) const
Checks if a MAC address is in the list.
Structure that contains the fields used by every message.
Structure that contains the fields used by a publish message, apart from those inherited from the Mes...
char topic[MAXTOPICLENGTH]
Topic where the message is published to.
size_t contentSize
Size of the content.
Structure that contains the fields used by a subscribe message, apart from those inherited from the M...
char topic[MAXTOPICLENGTH]
Topic that the subscriber shows interest in.
Structure that contains the fields used by an unsubscribe message, apart from those inherited from the...
char topic[MAXTOPICLENGTH]
Topic that the subscriber no longer shows interest in.