MqttClient.h
Go to the documentation of this file.
1 /****
2  * Sming Framework Project - Open Source framework for high efficiency native ESP8266 development.
3  * Created 2015 by Skurydin Alexey
4  * http://github.com/SmingHub/Sming
5  * All files of the Sming Core are provided under the LGPL v3 license.
6  *
7  * MqttClient.h
8  *
9  ****/
10 
11 #pragma once
12 
13 #include "TcpClient.h"
14 #include "Url.h"
15 #include <BitManipulations.h>
16 #include <WString.h>
17 #include <WHashMap.h>
18 #include <Data/ObjectQueue.h>
19 #include <Platform/Timers.h>
20 #include "Mqtt/MqttPayloadParser.h"
21 #include "mqtt-codec/src/message.h"
22 #include "mqtt-codec/src/serialiser.h"
23 #include "mqtt-codec/src/parser.h"
24 
32 
/*
 * Request pool size. Defaults to 10 and may be overridden by defining
 * MQTT_REQUEST_POOL_SIZE before this header is included.
 * NOTE(review): presumably the capacity of MqttRequestQueue - confirm against its declaration.
 */
#ifndef MQTT_REQUEST_POOL_SIZE
#define MQTT_REQUEST_POOL_SIZE 10
#endif

// Client state bit. NOTE(review): presumably tested against MqttClient::flags ("set when connected") - confirm in the .cpp
#define MQTT_CLIENT_CONNECTED bit(1)

// Value of the "retained" bit (bit 0) in the `flags` argument of setWill()/publish()
#define MQTT_FLAG_RETAINED 1

// Legacy macro names kept as aliases; define MQTT_NO_COMPAT to leave them out.
#ifndef MQTT_NO_COMPAT
#define MQTT_MAX_BUFFER_SIZE MQTT_PAYLOAD_LENGTH
#define MQTT_MSG_PUBREC MQTT_TYPE_PUBREC
#endif
45 
46 class MqttClient;
47 
50 
51 #ifndef MQTT_NO_COMPAT
52 
56 #endif
57 
58 class MqttClient : protected TcpClient
59 {
60 public:
/**
 * @brief Construct an MQTT client.
 * @param withDefaultPayloadParser NOTE(review): presumably installs a built-in payload parser
 *                                 when true - confirm in the implementation file
 * @param autoDestruct NOTE(review): presumably forwarded to the TcpClient base - confirm
 */
MqttClient(bool withDefaultPayloadParser = true, bool autoDestruct = false);

/// Destructor; defined outside this header.
~MqttClient();
64 
69  void setKeepAlive(uint16_t seconds) //send to broker
70  {
71  keepAlive = seconds;
72  if(seconds < pingRepeatTime) {
73  setPingRepeatTime(seconds);
74  }
75  }
76 
82  {
83  seconds = std::min(keepAlive, seconds);
84  if(seconds != pingRepeatTime) {
85  pingRepeatTime = seconds;
86  pingTimer.reset(seconds);
87  }
88  }
89 
/**
 * @brief Sets the last-will message.
 * @param topic Will topic
 * @param message Will message
 * @param flags Bit 0 is the retained flag (MQTT_FLAG_RETAINED), the QoS level starts at bit 1
 *              (this is how the compatibility overload taking `QoS`/`retained` packs them)
 * @retval bool NOTE(review): presumably true on success - confirm in the implementation file
 */
bool setWill(const String& topic, const String& message, uint8_t flags = 0);

/**
 * @brief Connect to a MQTT server.
 * @param url Server URL
 * @param uniqueClientName Client name; as the parameter name says, expected to be unique
 * @retval bool NOTE(review): presumably true when the connection attempt was started - confirm
 */
bool connect(const Url& url, const String& uniqueClientName);

/**
 * @brief Publish a message whose content is given as a String.
 * @param flags Same bit layout as for setWill(): bit 0 retained, QoS from bit 1
 */
bool publish(const String& topic, const String& message, uint8_t flags = 0);
/**
 * @brief Publish a message whose content is read from a stream.
 * NOTE(review): ownership of `stream` is not visible from this header - confirm before relying on it.
 */
bool publish(const String& topic, IDataSourceStream* stream, uint8_t flags = 0);

/// Subscribe to / unsubscribe from a topic.
bool subscribe(const String& topic);
bool unsubscribe(const String& topic);
111 
112  void setEventHandler(mqtt_type_t type, MqttDelegate handler)
113  {
114  eventHandlers[type] = handler;
115  }
116 
123  void setPayloadParser(MqttPayloadParser payloadParser = nullptr)
124  {
125  this->payloadParser = payloadParser;
126  }
127 
128  /* [ Convenience methods ] */
129 
136  {
137  eventHandlers[MQTT_TYPE_CONNACK] = handler;
138  }
139 
147  {
148  eventHandlers[MQTT_TYPE_PUBACK] = handler;
149  eventHandlers[MQTT_TYPE_PUBREC] = handler;
150  }
151 
158  {
159  eventHandlers[MQTT_TYPE_PUBLISH] = handler;
160  }
161 
168  {
170  }
171 
172  using TcpClient::getSsl;
174 
176 
179 
182 
183 #ifndef MQTT_NO_COMPAT
184 
187  bool setWill(const String& topic, const String& message, int QoS, bool retained = false)
188  {
189  uint8_t flags = (uint8_t)(retained + (QoS << 1));
190  return setWill(topic, message, flags);
191  }
192 
204  bool publishWithQoS(const String& topic, const String& message, int QoS, bool retained = false,
205  MqttMessageDeliveredCallback onDelivery = nullptr)
206  {
207  if(onDelivery) {
208  if(QoS == 1) {
209  setEventHandler(MQTT_TYPE_PUBACK, onPuback);
210  this->onDelivery = onDelivery;
211  } else if(QoS == 2) {
212  setEventHandler(MQTT_TYPE_PUBREC, onPuback);
213  this->onDelivery = onDelivery;
214  } else {
215  debug_w("No callback is set for QoS == 0");
216  }
217  }
218 
219  uint8_t flags = (uint8_t)(retained + (QoS << 1));
220  return publish(topic, message, flags);
221  }
222 
226  void setCallback(MqttStringSubscriptionCallback subscriptionCallback = nullptr)
227  {
228  this->subscriptionCallback = subscriptionCallback;
229  setEventHandler(MQTT_TYPE_PUBLISH, onPublish);
230  }
231 #endif
232 
protected:
    // Overrides of base-class hooks; definitions are outside this header.
    void onReadyToSendData(TcpConnectionEvent sourceEvent) override;
    void onFinished(TcpClientState finishState) override;

private:
    // TCP methods
    // NOTE(review): presumably receives raw bytes from the connection and feeds the MQTT parser - confirm
    virtual bool onTcpReceive(TcpClient& client, char* data, int size);

    // MQTT parser methods
    // Static callbacks taking an opaque `user_data` pointer.
    // NOTE(review): presumably `user_data` is the MqttClient instance and these are the
    // entries of the `callbacks` table - confirm in the implementation file.
    static int staticOnMessageBegin(void* user_data, mqtt_message_t* message);
    static int staticOnDataBegin(void* user_data, mqtt_message_t* message);
    static int staticOnDataPayload(void* user_data, mqtt_message_t* message, const char* data, size_t length);
    static int staticOnDataEnd(void* user_data, mqtt_message_t* message);
    static int staticOnMessageEnd(void* user_data, mqtt_message_t* message);
    // Instance-level counterpart of staticOnMessageEnd
    int onMessageEnd(mqtt_message_t* message);
248 
249 #ifndef MQTT_NO_COMPAT
250 
251  static int onPuback(MqttClient& client, mqtt_message_t* message)
252  {
253  if(!message) {
254  return 1;
255  }
256 
257  if(client.onDelivery) {
258  uint16_t msgId = 0;
259  if(message->common.type == MQTT_TYPE_PUBACK) {
260  msgId = message->puback.message_id;
261  } else if(message->common.type == MQTT_TYPE_PUBREC) {
262  msgId = message->pubrec.message_id;
263  }
264 
265  if(msgId) {
266  client.onDelivery(msgId, (int)message->common.type);
267  }
268  }
269 
270  return 0;
271  }
272 
274  static int onPublish(MqttClient& client, mqtt_message_t* message)
275  {
276  if(message == nullptr) {
277  return -1;
278  }
279 
280  if(message->common.length > MQTT_PAYLOAD_LENGTH) {
281  return -2;
282  }
283 
284  if(client.subscriptionCallback) {
285  String topic = String((const char*)message->publish.topic_name.data, message->publish.topic_name.length);
286  String content;
287  if(message->publish.content.data) {
288  content.concat((const char*)message->publish.content.data, message->publish.content.length);
289  }
290  client.subscriptionCallback(topic, content);
291  }
292 
293  return 0;
294  }
295 #endif
296 
private:
    // NOTE(review): presumably the server URL given to connect() - confirm in the implementation file
    Url url;

    // callbacks
    // Handlers stored per MQTT message type; written by setEventHandler() and the convenience setters
    HandlerMap eventHandlers;
    // Optional payload parser; set or cleared through setPayloadParser()
    MqttPayloadParser payloadParser = nullptr;

    // states
    MqttClientState state = eMCS_Ready;
    MqttPayloadParserState payloadState = {};

    // keep-alives and pings
    // Both values are in seconds. The setters keep pingRepeatTime <= keepAlive.
    uint16_t keepAlive = 60;
    uint16_t pingRepeatTime = 20;

    // messages
    MqttRequestQueue requestQueue;
    mqtt_message_t connectMessage;
    bool connectQueued = false;
    mqtt_message_t* outgoingMessage = nullptr;
    mqtt_message_t incomingMessage;

    // parsers and serializers
    mqtt_serialiser_t serialiser;
    static const mqtt_parser_callbacks_t callbacks;
    mqtt_parser_t parser;

    // client flags
    uint8_t flags = 0;
    /* Bit positions: 7 6 5 4 3 2 1 0
     *
     * One bit is set when connected ...
     * NOTE(review): presumably bit 1, matching MQTT_CLIENT_CONNECTED (bit(1)) - confirm.
     */

#ifndef MQTT_NO_COMPAT
    // Compatibility callbacks used by onPuback() and onPublish() respectively
    MqttMessageDeliveredCallback onDelivery = nullptr;
    MqttStringSubscriptionCallback subscriptionCallback = nullptr;
#endif
337 };
338 
Class to manage URL instance.
Definition: Url.h:66
IpAddress getRemoteIp() const
Definition: TcpConnection.h:102
Definition: MqttClient.h:31
void setSslInitHandler(Ssl::Session::InitDelegate handler)
Set the SSL session initialisation callback.
Definition: TcpConnection.h:125
void setCompleteDelegate(TcpClientCompleteDelegate completeCb=nullptr)
Set or clear the callback for connection close.
Definition: TcpClient.h:98
Base class for read-only stream.
Definition: DataSourceStream.h:40
Template class to implement a polled timer.
Definition: PolledTimer.h:67
void onFinished(TcpClientState finishState) override
uint16_t getRemotePort() const
Definition: TcpConnection.h:107
void setPayloadParser(MqttPayloadParser payloadParser=nullptr)
Sets or clears a payload parser (for PUBLISH messages from the server to us)
Definition: MqttClient.h:123
#define debug_w
Definition: debug_progmem.h:98
void setDisconnectHandler(TcpClientCompleteDelegate handler)
Sets a handler to be called on disconnect from the server.
Definition: MqttClient.h:167
Definition: MqttClient.h:58
Ssl::Session * getSsl()
Get a pointer to the current SSL session object.
Definition: TcpConnection.h:148
bool connect(const Url &url, const String &uniqueClientName)
Connect to a MQTT server.
Definition: MqttClient.h:31
bool setWill(const String &topic, const String &message, uint8_t flags=0)
void setMessageHandler(MqttDelegate handler)
Sets a handler to be called after receiving a PUBLISH message from the server.
Definition: MqttClient.h:157
The String class.
Definition: WString.h:136
MqttClient(bool withDefaultPayloadParser=true, bool autoDestruct=false)
bool setWill(const String &topic, const String &message, int QoS, bool retained=false)
Definition: MqttClient.h:187
Definition: TcpClient.h:46
Definition: Delegate.h:20
TcpConnectionEvent
Definition: TcpConnection.h:25
Definition: MqttPayloadParser.h:29
bool publish(const String &topic, const String &message, uint8_t flags=0)
bool subscribe(const String &topic)
void setCallback(MqttStringSubscriptionCallback subscriptionCallback=nullptr)
Provide a function to be called when a message is received from the broker.
Definition: MqttClient.h:226
#define MQTT_PAYLOAD_LENGTH
Definition: MqttPayloadParser.h:27
bool publishWithQoS(const String &topic, const String &message, int QoS, bool retained=false, MqttMessageDeliveredCallback onDelivery=nullptr)
Definition: MqttClient.h:204
MqttClientState
Definition: MqttClient.h:31
void setConnectedHandler(MqttDelegate handler)
Sets a handler to be called after successful MQTT connection.
Definition: MqttClient.h:135
void onReadyToSendData(TcpConnectionEvent sourceEvent) override
bool isProcessing()
Definition: TcpClient.h:110
void setPublishedHandler(MqttDelegate handler)
Sets a handler to be called after receiving confirmation from the server for a published message from...
Definition: MqttClient.h:146
void setPingRepeatTime(uint16_t seconds)
Definition: MqttClient.h:81
TcpClientState
Definition: TcpClient.h:29
IDataSourceStream * stream
The currently active stream being sent.
Definition: TcpClient.h:157
void setKeepAlive(uint16_t seconds)
Sets keep-alive time. That information is sent during connection to the server.
Definition: MqttClient.h:69
void setEventHandler(mqtt_type_t type, MqttDelegate handler)
Definition: MqttClient.h:112
bool unsubscribe(const String &topic)
bool concat(const String &str)
Definition: WString.h:317
TcpClientState getConnectionState()
Definition: TcpClient.h:115