21 #include "mqtt-codec/src/message.h" 22 #include "mqtt-codec/src/serialiser.h" 23 #include "mqtt-codec/src/parser.h" 33 #ifndef MQTT_REQUEST_POOL_SIZE 34 #define MQTT_REQUEST_POOL_SIZE 10 37 #define MQTT_CLIENT_CONNECTED bit(1) 39 #define MQTT_FLAG_RETAINED 1 41 #ifndef MQTT_NO_COMPAT 42 #define MQTT_MAX_BUFFER_SIZE MQTT_PAYLOAD_LENGTH 43 #define MQTT_MSG_PUBREC MQTT_TYPE_PUBREC 51 #ifndef MQTT_NO_COMPAT 61 MqttClient(
bool withDefaultPayloadParser =
true,
bool autoDestruct =
false);
72 if(seconds < pingRepeatTime) {
83 seconds = std::min(keepAlive, seconds);
84 if(seconds != pingRepeatTime) {
85 pingRepeatTime = seconds;
86 pingTimer.reset(seconds);
114 eventHandlers[type] = handler;
125 this->payloadParser = payloadParser;
137 eventHandlers[MQTT_TYPE_CONNACK] = handler;
148 eventHandlers[MQTT_TYPE_PUBACK] = handler;
149 eventHandlers[MQTT_TYPE_PUBREC] = handler;
159 eventHandlers[MQTT_TYPE_PUBLISH] = handler;
183 #ifndef MQTT_NO_COMPAT 189 uint8_t flags = (uint8_t)(retained + (QoS << 1));
190 return setWill(topic, message, flags);
210 this->onDelivery = onDelivery;
211 }
else if(QoS == 2) {
213 this->onDelivery = onDelivery;
215 debug_w(
"No callback is set for QoS == 0");
219 uint8_t flags = (uint8_t)(retained + (QoS << 1));
220 return publish(topic, message, flags);
228 this->subscriptionCallback = subscriptionCallback;
239 virtual bool onTcpReceive(
TcpClient& client,
char* data,
int size);
242 static int staticOnMessageBegin(
void* user_data, mqtt_message_t* message);
243 static int staticOnDataBegin(
void* user_data, mqtt_message_t* message);
244 static int staticOnDataPayload(
void* user_data, mqtt_message_t* message,
const char* data,
size_t length);
245 static int staticOnDataEnd(
void* user_data, mqtt_message_t* message);
246 static int staticOnMessageEnd(
void* user_data, mqtt_message_t* message);
247 int onMessageEnd(mqtt_message_t* message);
249 #ifndef MQTT_NO_COMPAT 251 static int onPuback(
MqttClient& client, mqtt_message_t* message)
257 if(client.onDelivery) {
259 if(message->common.type == MQTT_TYPE_PUBACK) {
260 msgId = message->puback.message_id;
261 }
else if(message->common.type == MQTT_TYPE_PUBREC) {
262 msgId = message->pubrec.message_id;
266 client.onDelivery(msgId, (
int)message->common.type);
274 static int onPublish(
MqttClient& client, mqtt_message_t* message)
276 if(message ==
nullptr) {
284 if(client.subscriptionCallback) {
285 String topic =
String((
const char*)message->publish.topic_name.data, message->publish.topic_name.length);
287 if(message->publish.content.data) {
288 content.
concat((
const char*)message->publish.content.data, message->publish.content.length);
290 client.subscriptionCallback(topic, content);
316 mqtt_message_t connectMessage;
317 bool connectQueued =
false;
318 mqtt_message_t* outgoingMessage =
nullptr;
319 mqtt_message_t incomingMessage;
322 mqtt_serialiser_t serialiser;
323 static const mqtt_parser_callbacks_t callbacks;
324 mqtt_parser_t parser;
333 #ifndef MQTT_NO_COMPAT
Class to manage URL instance.
Definition: Url.h:66
IpAddress getRemoteIp() const
Definition: TcpConnection.h:102
Definition: MqttClient.h:31
void setSslInitHandler(Ssl::Session::InitDelegate handler)
Set the SSL session initialisation callback.
Definition: TcpConnection.h:125
void setCompleteDelegate(TcpClientCompleteDelegate completeCb=nullptr)
Set or clear the callback for connection close.
Definition: TcpClient.h:98
Base class for read-only stream.
Definition: DataSourceStream.h:40
Template class to implement a polled timer.
Definition: PolledTimer.h:67
void onFinished(TcpClientState finishState) override
uint16_t getRemotePort() const
Definition: TcpConnection.h:107
void setPayloadParser(MqttPayloadParser payloadParser=nullptr)
Sets or clears a payload parser (for PUBLISH messages from the server to us)
Definition: MqttClient.h:123
#define debug_w
Definition: debug_progmem.h:98
void setDisconnectHandler(TcpClientCompleteDelegate handler)
Sets a handler to be called on disconnect from the server.
Definition: MqttClient.h:167
Definition: MqttClient.h:58
Ssl::Session * getSsl()
Get a pointer to the current SSL session object.
Definition: TcpConnection.h:148
bool connect(const Url &url, const String &uniqueClientName)
Connect to a MQTT server.
Definition: MqttClient.h:31
bool setWill(const String &topic, const String &message, uint8_t flags=0)
void setMessageHandler(MqttDelegate handler)
Sets a handler to be called after receiving a PUBLISH message from the server.
Definition: MqttClient.h:157
The String class.
Definition: WString.h:136
MqttClient(bool withDefaultPayloadParser=true, bool autoDestruct=false)
bool setWill(const String &topic, const String &message, int QoS, bool retained=false)
Definition: MqttClient.h:187
Definition: TcpClient.h:46
Definition: Delegate.h:20
TcpConnectionEvent
Definition: TcpConnection.h:25
Definition: MqttPayloadParser.h:29
bool publish(const String &topic, const String &message, uint8_t flags=0)
bool subscribe(const String &topic)
void setCallback(MqttStringSubscriptionCallback subscriptionCallback=nullptr)
Provide a function to be called when a message is received from the broker.
Definition: MqttClient.h:226
#define MQTT_PAYLOAD_LENGTH
Definition: MqttPayloadParser.h:27
bool publishWithQoS(const String &topic, const String &message, int QoS, bool retained=false, MqttMessageDeliveredCallback onDelivery=nullptr)
Definition: MqttClient.h:204
MqttClientState
Definition: MqttClient.h:31
void setConnectedHandler(MqttDelegate handler)
Sets a handler to be called after successful MQTT connection.
Definition: MqttClient.h:135
void onReadyToSendData(TcpConnectionEvent sourceEvent) override
bool isProcessing()
Definition: TcpClient.h:110
void setPublishedHandler(MqttDelegate handler)
Sets a handler to be called after receiving confirmation from the server (PUBACK or PUBREC) for a message published by the client.
Definition: MqttClient.h:146
void setPingRepeatTime(uint16_t seconds)
Definition: MqttClient.h:81
TcpClientState
Definition: TcpClient.h:29
IDataSourceStream * stream
The currently active stream being sent.
Definition: TcpClient.h:157
void setKeepAlive(uint16_t seconds)
Sets keep-alive time. That information is sent during connection to the server.
Definition: MqttClient.h:69
void setEventHandler(mqtt_type_t type, MqttDelegate handler)
Definition: MqttClient.h:112
bool unsubscribe(const String &topic)
bool concat(const String &str)
Definition: WString.h:317
TcpClientState getConnectionState()
Definition: TcpClient.h:115