LoboMQ
Loading...
Searching...
No Matches
BrokerTopic.cpp
Go to the documentation of this file.
1
10
11bool hasWildcardCheck(const char topic[]) {
12 for (int i = 0; i < strlen(topic); i++) {
13 if (topic[i] == '+' || topic[i] == '#')
14 return true;
15 }
16 return false;
17}
18
19BrokerTopic::BrokerTopic(): logger(disableLogger()), topic(""), hasWildcards(false) {}
20
21BrokerTopic::BrokerTopic(Elog *_logger, const char topic[]) {
22 logger = _logger;
23 //inserts topic to the attribute
24 strncpy(this->topic, topic, sizeof(this->topic) - 1);
25 this->topic[sizeof(this->topic) - 1] = '\0';
26
27 this->hasWildcards = hasWildcardCheck(this->topic);
28
29 //initializes the queue
30 messagesQueue = xQueueCreate(10, sizeof(PublishContent));
31 if (messagesQueue == NULL) {
32 logger->log(ERROR, "[BROKER TOPIC %s] Couldn't create the message queue.", this->topic);
33 return;
34 }
35 logger->log(DEBUG, "[BROKER TOPIC %s] Created.", this->topic);
36}
37
38const char* BrokerTopic::getTopic() const {
39 return topic;
40}
41
42int BrokerTopic::getSubscribersAmount() const {
43 return subscribers.size();
44}
45
46const std::vector<std::array<uint8_t, 6>>& BrokerTopic::getSubscribers() const {
47 return subscribers;
48}
49
50const char* BrokerTopic::getFilename() const {
51 return filename;
52}
53
54void BrokerTopic::setFilename(const char* filename) {
55 strncpy(this->filename, filename, sizeof(this->filename) - 1);
56 this->filename[sizeof(this->filename)-1] = '\0';
57}
58
59bool addPeer(const uint8_t *mac) {
60 if(!esp_now_is_peer_exist(mac)) { //if peer not registered
61 //Register peer
62 esp_now_peer_info_t peerInfo;
63 memset(&peerInfo, 0, sizeof(peerInfo));
64 memcpy(peerInfo.peer_addr, mac, 6);
65 peerInfo.channel = 0;
66 peerInfo.encrypt = false;
67
68 if (esp_now_add_peer(&peerInfo) != ESP_OK)
69 return false;
70 }
71 return true;
72}
73
74bool removePeer(const uint8_t *mac) {
75 //Check if peer exists
76 if (esp_now_is_peer_exist(mac)) {
77 if (esp_now_del_peer(mac) != ESP_OK)
78 return false;
79
80 return true;
81 } else {
82 return false;
83 }
84}
85
86bool BrokerTopic::subscribe(const uint8_t *mac) const {
87 if (!isSubscribed(mac)) {
88 const_cast<BrokerTopic*>(this)->subscribers.push_back(*reinterpret_cast<const std::array<uint8_t, 6>*>(mac));
89 }
90 return true;
91}
92
93bool BrokerTopic::subscribe(const std::array<uint8_t, 6>& mac) const {
94 if (!isSubscribed(mac.data())) {
95 const_cast<BrokerTopic*>(this)->subscribers.push_back(mac);
96 }
97 return true;
98}
99
100bool BrokerTopic::unsubscribe(const uint8_t *mac) {
101 //Find the subscriber in the vector
102 auto it = std::find_if(subscribers.begin(), subscribers.end(), [&](const std::array<uint8_t, 6>& subscriber) {
103 return memcmp(subscriber.data(), mac, 6) == 0;
104 });
105
106 if (it != subscribers.end()) { //if found
107 subscribers.erase(it);
108 removePeer(mac);
109 return true;
110 }
111
112 return false; //Subscriber not found
113}
114
115bool BrokerTopic::isSubscribed(const uint8_t *mac) const {
116 for (const auto& subscriber : subscribers) { //checks every subscriber
117 if (memcmp(mac, subscriber.data(), 6) == 0) //if the mac was found
118 return true; //already subscribed
119 }
120 return false; //not subscribed
121}
122
123std::string BrokerTopic::getSubscribersString() const {
124 std::string result = "";
125 for (const auto& mac : subscribers) {
126 char macChar[18];
127 snprintf(macChar, sizeof(macChar), "%02X:%02X:%02X:%02X:%02X:%02X",
128 mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
129 result = result + macChar + "\n";
130 }
131 return result;
132}
133
134void BrokerTopic::publish(PublishContent pubContent, std::vector<std::array<uint8_t, 6>>& alreadySentMacs) const {
135 for (const auto& subscriber : subscribers) { //goes through every subscriber
136 //Checks if the message was already sent to this subscriber's MAC
137 bool alreadySent = false;
138 for (const auto& sentMac : alreadySentMacs) {
139 if (sentMac == subscriber) {
140 alreadySent = true;
141 break;
142 }
143 }
144 if (!alreadySent) {
145 if (addPeer(subscriber.data())) {
146 esp_now_send(subscriber.data(), (uint8_t *)&pubContent, sizeof(PublishContent));
147 alreadySentMacs.push_back(subscriber);
148 } else {
149 logger->log(ERROR, "[BROKER TOPIC %s] Couldn't add peer %02X:%02X:%02X:%02X:%02X:%02X, it won't receive.",
150 topic, subscriber[0], subscriber[1], subscriber[2], subscriber[3], subscriber[4], subscriber[5]);
151 }
152 }
153 }
154 logger->log(DEBUG, "[BROKER TOPIC %s] Sent message to %d subscribers.", topic, subscribers.size());
155}
156
157bool BrokerTopic::isPublishable(const char *publishTopic) const {
158 //compares publishTopic with topic
159 if (strcmp(publishTopic, topic) == 0) //if both topics are the same
160 return true;
161 else if (!hasWildcards)
162 return false;
163 else {
164 int publishLen = strlen(publishTopic);
165 int subscribeLen = strlen(topic);
166
167 int i = 0, j = 0;
168
169 while (i < publishLen && j < subscribeLen) { //runs through every character in the topics
170 char pubChar = publishTopic[i];
171 char subChar = topic[j];
172 //if current chars are different and not wildcards, incompatibles
173 if (pubChar != subChar && subChar != '+' && subChar != '#')
174 return false;
175 if (subChar == '+') { //'+' only accepts 1 level, goes to the next topic separator '/'
176 while (publishTopic[i] != '/' && i < publishLen)
177 i++;
178 j++;
179 } else if (subChar == '#') { //'#' will accept any topic level, compatibles
180 return true;
181 } else {
182 i++; j++;
183 }
184 }
185 //if only one of the topics has been traversed, incompatibles
186 if (i < publishLen || j < subscribeLen)
187 return false;
188 else
189 return true;
190 }
191}
192
193std::string BrokerTopic::toString() const {
194 std::string result = "Topic: " + std::string(topic) + "\n";
195 result += "Subscribers:\n";
196 result += getSubscribersString();
197 return result;
198}
Elog * disableLogger()
Creates a logger without the ability to print messages.
Definition Logger.cpp:52
Structure that contains the fields used by a publish message, apart from those inherited from the Mes...
Definition PubSub.h:65