MqttClient.h
Go to the documentation of this file.
1 /****
2  * Sming Framework Project - Open Source framework for high efficiency native ESP8266 development.
3  * Created 2015 by Skurydin Alexey
4  * http://github.com/SmingHub/Sming
5  * All files of the Sming Core are provided under the LGPL v3 license.
6  *
7  * MqttClient.h
8  *
9  ****/
10 
11 #pragma once
12 
13 #include "TcpClient.h"
14 #include "Url.h"
15 #include <BitManipulations.h>
16 #include <WString.h>
17 #include <WHashMap.h>
18 #include <Data/ObjectQueue.h>
19 #include "Mqtt/MqttPayloadParser.h"
20 #include "mqtt-codec/src/message.h"
21 #include "mqtt-codec/src/serialiser.h"
22 #include "mqtt-codec/src/parser.h"
23 
31 
32 #ifndef MQTT_REQUEST_POOL_SIZE
33 #define MQTT_REQUEST_POOL_SIZE 10
34 #endif
35 
36 #define MQTT_CLIENT_CONNECTED bit(1)
37 
38 #define MQTT_FLAG_RETAINED 1
39 
40 #ifndef MQTT_NO_COMPAT
41 #define MQTT_MAX_BUFFER_SIZE MQTT_PAYLOAD_LENGTH
42 #define MQTT_MSG_PUBREC MQTT_TYPE_PUBREC
43 #endif
44 
45 class MqttClient;
46 
49 
50 #ifndef MQTT_NO_COMPAT
51 
55 #endif
56 
57 class MqttClient : protected TcpClient
58 {
59 public:
60  MqttClient(bool withDefaultPayloadParser = true, bool autoDestruct = false);
61 
62  ~MqttClient();
63 
68  void setKeepAlive(uint16_t seconds) //send to broker
69  {
70  keepAlive = seconds;
71  }
72 
77  void setPingRepeatTime(unsigned seconds);
78 
86  bool setWill(const String& topic, const String& message, uint8_t flags = 0);
87 
93  bool connect(const Url& url, const String& uniqueClientName);
94 
95  bool publish(const String& topic, const String& message, uint8_t flags = 0);
96  bool publish(const String& topic, IDataSourceStream* stream, uint8_t flags = 0);
97 
98  bool subscribe(const String& topic);
99  bool unsubscribe(const String& topic);
100 
101  void setEventHandler(mqtt_type_t type, MqttDelegate handler)
102  {
103  eventHandler[type] = handler;
104  }
105 
112  void setPayloadParser(MqttPayloadParser payloadParser = nullptr)
113  {
114  this->payloadParser = payloadParser;
115  }
116 
117  /* [ Convenience methods ] */
118 
125  {
126  eventHandler[MQTT_TYPE_CONNACK] = handler;
127  }
128 
136  {
137  eventHandler[MQTT_TYPE_PUBACK] = handler;
138  eventHandler[MQTT_TYPE_PUBREC] = handler;
139  }
140 
147  {
148  eventHandler[MQTT_TYPE_PUBLISH] = handler;
149  }
150 
157  {
159  }
160 
161  using TcpClient::getSsl;
163 
165 
168 
171 
172 #ifndef MQTT_NO_COMPAT
173 
176  bool setWill(const String& topic, const String& message, int QoS, bool retained = false)
177  {
178  uint8_t flags = (uint8_t)(retained + (QoS << 1));
179  return setWill(topic, message, flags);
180  }
181 
193  bool publishWithQoS(const String& topic, const String& message, int QoS, bool retained = false,
194  MqttMessageDeliveredCallback onDelivery = nullptr)
195  {
196  if(onDelivery) {
197  if(QoS == 1) {
198  setEventHandler(MQTT_TYPE_PUBACK, onPuback);
199  this->onDelivery = onDelivery;
200  } else if(QoS == 2) {
201  setEventHandler(MQTT_TYPE_PUBREC, onPuback);
202  this->onDelivery = onDelivery;
203  } else {
204  debug_w("No callback is set for QoS == 0");
205  }
206  }
207 
208  uint8_t flags = (uint8_t)(retained + (QoS << 1));
209  return publish(topic, message, flags);
210  }
211 
215  void setCallback(MqttStringSubscriptionCallback subscriptionCallback = nullptr)
216  {
217  this->subscriptionCallback = subscriptionCallback;
218  setEventHandler(MQTT_TYPE_PUBLISH, onPublish);
219  }
220 #endif
221 
222 protected:
223  void onReadyToSendData(TcpConnectionEvent sourceEvent) override;
224  void onFinished(TcpClientState finishState) override;
225 
226 private:
227  // TCP methods
228  virtual bool onTcpReceive(TcpClient& client, char* data, int size);
229 
230  // MQTT parser methods
231  static int staticOnMessageBegin(void* user_data, mqtt_message_t* message);
232  static int staticOnDataBegin(void* user_data, mqtt_message_t* message);
233  static int staticOnDataPayload(void* user_data, mqtt_message_t* message, const char* data, size_t length);
234  static int staticOnDataEnd(void* user_data, mqtt_message_t* message);
235  static int staticOnMessageEnd(void* user_data, mqtt_message_t* message);
236 
237 #ifndef MQTT_NO_COMPAT
238 
239  static int onPuback(MqttClient& client, mqtt_message_t* message)
240  {
241  if(!message) {
242  return 1;
243  }
244 
245  if(client.onDelivery) {
246  uint16_t msgId = 0;
247  if(message->common.type == MQTT_TYPE_PUBACK) {
248  msgId = message->puback.message_id;
249  } else if(message->common.type == MQTT_TYPE_PUBREC) {
250  msgId = message->pubrec.message_id;
251  }
252 
253  if(msgId) {
254  client.onDelivery(msgId, (int)message->common.type);
255  }
256  }
257 
258  return 0;
259  }
260 
262  static int onPublish(MqttClient& client, mqtt_message_t* message)
263  {
264  if(message == nullptr) {
265  return -1;
266  }
267 
268  if(message->common.length > MQTT_PAYLOAD_LENGTH) {
269  return -2;
270  }
271 
272  if(client.subscriptionCallback) {
273  String topic = String((const char*)message->publish.topic_name.data, message->publish.topic_name.length);
274  String content;
275  if(message->publish.content.data) {
276  content.concat((const char*)message->publish.content.data, message->publish.content.length);
277  }
278  client.subscriptionCallback(topic, content);
279  }
280 
281  return 0;
282  }
283 #endif
284 
285 private:
286  Url url;
287 
288  // callbacks
290  MqttPayloadParser payloadParser = nullptr;
291 
292  // states
293  MqttClientState state = eMCS_Ready;
294  MqttPayloadParserState payloadState = {};
295 
296  // keep-alives and pings
297  uint16_t keepAlive = 60;
298  unsigned pingRepeatTime = 20;
299  unsigned long lastMessage = 0;
300 
301  // messages
302  MqttRequestQueue requestQueue;
303  mqtt_message_t connectMessage;
304  bool connectQueued = false;
305  mqtt_message_t* outgoingMessage = nullptr;
306  mqtt_message_t incomingMessage;
307 
308  // parsers and serializers
309  static mqtt_serialiser_t serialiser;
310  static mqtt_parser_callbacks_t callbacks;
311  mqtt_parser_t parser;
312 
313  // client flags
314  uint8_t flags = 0;
315  /* 7 8 6 5 4 3 2 1 0
316  * |
317  * --- set when connected ...
318  */
319 
320 #ifndef MQTT_NO_COMPAT
321  MqttMessageDeliveredCallback onDelivery = nullptr;
322  MqttStringSubscriptionCallback subscriptionCallback = nullptr;
323 #endif
324 };
325 
Class to manage URL instance.
Definition: Url.h:66
IpAddress getRemoteIp() const
Definition: TcpConnection.h:89
HashMap class template.
Definition: WHashMap.h:37
Definition: MqttClient.h:30
void setSslInitHandler(Ssl::Session::InitDelegate handler)
Set the SSL session initialisation callback.
Definition: TcpConnection.h:112
void setCompleteDelegate(TcpClientCompleteDelegate completeCb=nullptr)
Set or clear the callback for connection close.
Definition: TcpClient.h:98
Base class for data source stream.
Definition: DataSourceStream.h:39
void onFinished(TcpClientState finishState) override
uint16_t getRemotePort() const
Definition: TcpConnection.h:94
void setPayloadParser(MqttPayloadParser payloadParser=nullptr)
Sets or clears a payload parser (for PUBLISH messages from the server to us)
Definition: MqttClient.h:112
#define debug_w
Definition: debug_progmem.h:98
void setDisconnectHandler(TcpClientCompleteDelegate handler)
Sets a handler to be called on disconnect from the server.
Definition: MqttClient.h:156
Definition: MqttClient.h:57
Ssl::Session * getSsl()
Get a pointer to the current SSL session object.
Definition: TcpConnection.h:135
bool connect(const Url &url, const String &uniqueClientName)
Connect to a MQTT server.
Definition: MqttClient.h:30
void setPingRepeatTime(unsigned seconds)
bool setWill(const String &topic, const String &message, uint8_t flags=0)
void setMessageHandler(MqttDelegate handler)
Sets a handler to be called after receiving a PUBLISH message from the server.
Definition: MqttClient.h:146
The String class.
Definition: WString.h:136
MqttClient(bool withDefaultPayloadParser=true, bool autoDestruct=false)
bool setWill(const String &topic, const String &message, int QoS, bool retained=false)
Definition: MqttClient.h:176
Definition: TcpClient.h:46
Definition: Delegate.h:20
TcpConnectionEvent
Definition: TcpConnection.h:25
Definition: MqttPayloadParser.h:29
bool publish(const String &topic, const String &message, uint8_t flags=0)
bool subscribe(const String &topic)
void setCallback(MqttStringSubscriptionCallback subscriptionCallback=nullptr)
Provide a function to be called when a message is received from the broker.
Definition: MqttClient.h:215
#define MQTT_PAYLOAD_LENGTH
Definition: MqttPayloadParser.h:27
bool publishWithQoS(const String &topic, const String &message, int QoS, bool retained=false, MqttMessageDeliveredCallback onDelivery=nullptr)
Definition: MqttClient.h:193
MqttClientState
Definition: MqttClient.h:30
ObjectQueue< mqtt_message_t, MQTT_REQUEST_POOL_SIZE > MqttRequestQueue
Definition: MqttClient.h:48
void setConnectedHandler(MqttDelegate handler)
Sets a handler to be called after successful MQTT connection.
Definition: MqttClient.h:124
Delegate< void(uint16_t msgId, int type)> MqttMessageDeliveredCallback
Definition: MqttClient.h:54
void onReadyToSendData(TcpConnectionEvent sourceEvent) override
bool isProcessing()
Definition: TcpClient.h:110
void setPublishedHandler(MqttDelegate handler)
Sets a handler to be called after receiving confirmation from the server for a published message from...
Definition: MqttClient.h:135
Delegate< int(MqttClient &client, mqtt_message_t *message)> MqttDelegate
Definition: MqttClient.h:45
Delegate< void(String topic, String message)> MqttStringSubscriptionCallback
Definition: MqttClient.h:52
TcpClientState
Definition: TcpClient.h:29
IDataSourceStream * stream
The currently active stream being sent.
Definition: TcpClient.h:145
void setKeepAlive(uint16_t seconds)
Sets keep-alive time. That information is sent during connection to the server.
Definition: MqttClient.h:68
void setEventHandler(mqtt_type_t type, MqttDelegate handler)
Definition: MqttClient.h:101
bool unsubscribe(const String &topic)
bool concat(const String &str)
Definition: WString.h:283
TcpClientState getConnectionState()
Definition: TcpClient.h:115