This commit is contained in:
2025-10-07 00:18:25 -04:00
parent 970e081f64
commit 909dddeb77
1612 changed files with 0 additions and 159018 deletions

View File

@@ -1,56 +0,0 @@
/*
* SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef _MQTT5_CLIENT_PRIV_H_
#define _MQTT5_CLIENT_PRIV_H_
#include "mqtt5_client.h"
#include "mqtt_client_priv.h"
#include "mqtt5_msg.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct mqtt5_topic_alias {
char *topic;
uint16_t topic_len;
uint16_t topic_alias;
STAILQ_ENTRY(mqtt5_topic_alias) next;
} mqtt5_topic_alias_t;
STAILQ_HEAD(mqtt5_topic_alias_list_t, mqtt5_topic_alias);
typedef struct mqtt5_topic_alias_list_t *mqtt5_topic_alias_handle_t;
typedef struct mqtt5_topic_alias *mqtt5_topic_alias_item_t;
typedef struct {
esp_mqtt5_connection_property_storage_t connect_property_info;
esp_mqtt5_connection_will_property_storage_t will_property_info;
esp_mqtt5_connection_server_resp_property_t server_resp_property_info;
esp_mqtt5_disconnect_property_config_t disconnect_property_info;
const esp_mqtt5_publish_property_config_t *publish_property_info;
const esp_mqtt5_subscribe_property_config_t *subscribe_property_info;
const esp_mqtt5_unsubscribe_property_config_t *unsubscribe_property_info;
mqtt5_topic_alias_handle_t peer_topic_alias;
} mqtt5_config_storage_t;
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client);
void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_suback(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_disconnect(esp_mqtt5_client_handle_t client, int *disconnect_rsp_code);
esp_err_t esp_mqtt5_parse_connack(esp_mqtt5_client_handle_t client, int *connect_rsp_code);
void esp_mqtt5_client_destory(esp_mqtt5_client_handle_t client);
esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int qos, int retain);
esp_err_t esp_mqtt5_client_subscribe_check(esp_mqtt5_client_handle_t client, int qos);
esp_err_t esp_mqtt5_create_default_config(esp_mqtt5_client_handle_t client);
esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *msg_buf, size_t msg_read_len, char **msg_topic, size_t *msg_topic_len, char **msg_data, size_t *msg_data_len);
#ifdef __cplusplus
}
#endif //__cplusplus
#endif

View File

@@ -1,142 +0,0 @@
#ifndef MQTT5_MSG_H
#define MQTT5_MSG_H
#include <stdint.h>
#include <stdbool.h>
#include "sys/queue.h"
#include "mqtt_config.h"
#include "mqtt_msg.h"
#include "mqtt_client.h"
#ifdef __cplusplus
extern "C" {
#endif
enum mqtt_properties_type {
MQTT5_PROPERTY_PAYLOAD_FORMAT_INDICATOR = 0x01,
MQTT5_PROPERTY_MESSAGE_EXPIRY_INTERVAL = 0x02,
MQTT5_PROPERTY_CONTENT_TYPE = 0x03,
MQTT5_PROPERTY_RESPONSE_TOPIC = 0x08,
MQTT5_PROPERTY_CORRELATION_DATA = 0x09,
MQTT5_PROPERTY_SUBSCRIBE_IDENTIFIER = 0x0B,
MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL = 0x11,
MQTT5_PROPERTY_ASSIGNED_CLIENT_IDENTIFIER = 0x12,
MQTT5_PROPERTY_SERVER_KEEP_ALIVE = 0x13,
MQTT5_PROPERTY_AUTHENTICATION_METHOD = 0x15,
MQTT5_PROPERTY_AUTHENTICATION_DATA = 0x16,
MQTT5_PROPERTY_REQUEST_PROBLEM_INFO = 0x17,
MQTT5_PROPERTY_WILL_DELAY_INTERVAL = 0x18,
MQTT5_PROPERTY_REQUEST_RESP_INFO = 0x19,
MQTT5_PROPERTY_RESP_INFO = 0x1A,
MQTT5_PROPERTY_SERVER_REFERENCE = 0x1C,
MQTT5_PROPERTY_REASON_STRING = 0x1F,
MQTT5_PROPERTY_RECEIVE_MAXIMUM = 0x21,
MQTT5_PROPERTY_TOPIC_ALIAS_MAXIMIM = 0x22,
MQTT5_PROPERTY_TOPIC_ALIAS = 0x23,
MQTT5_PROPERTY_MAXIMUM_QOS = 0x24,
MQTT5_PROPERTY_RETAIN_AVAILABLE = 0x25,
MQTT5_PROPERTY_USER_PROPERTY = 0x26,
MQTT5_PROPERTY_MAXIMUM_PACKET_SIZE = 0x27,
MQTT5_PROPERTY_WILDCARD_SUBSCR_AVAILABLE = 0x28,
MQTT5_PROPERTY_SUBSCR_IDENTIFIER_AVAILABLE = 0x29,
MQTT5_PROPERTY_SHARED_SUBSCR_AVAILABLE = 0x2A,
};
typedef struct mqtt5_user_property {
char *key;
char *value;
STAILQ_ENTRY(mqtt5_user_property) next;
} mqtt5_user_property_t;
STAILQ_HEAD(mqtt5_user_property_list_t, mqtt5_user_property);
typedef struct mqtt5_user_property *mqtt5_user_property_item_t;
typedef struct {
uint32_t maximum_packet_size;
uint16_t receive_maximum;
uint16_t topic_alias_maximum;
uint8_t max_qos;
bool retain_available;
bool wildcard_subscribe_available;
bool subscribe_identifiers_available;
bool shared_subscribe_available;
char *response_info;
} esp_mqtt5_connection_server_resp_property_t;
typedef struct {
bool payload_format_indicator;
uint32_t message_expiry_interval;
uint16_t topic_alias;
char *response_topic;
int response_topic_len;
char *correlation_data;
uint16_t correlation_data_len;
char *content_type;
int content_type_len;
uint16_t subscribe_id;
} esp_mqtt5_publish_resp_property_t;
typedef struct {
uint32_t session_expiry_interval;
uint32_t maximum_packet_size;
uint16_t receive_maximum;
uint16_t topic_alias_maximum;
bool request_resp_info;
bool request_problem_info;
mqtt5_user_property_handle_t user_property;
} esp_mqtt5_connection_property_storage_t;
typedef struct {
uint32_t will_delay_interval;
uint32_t message_expiry_interval;
bool payload_format_indicator;
char *content_type;
char *response_topic;
char *correlation_data;
uint16_t correlation_data_len;
mqtt5_user_property_handle_t user_property;
} esp_mqtt5_connection_will_property_storage_t;
#define mqtt5_get_type mqtt_get_type
#define mqtt5_get_dup mqtt_get_dup
#define mqtt5_set_dup mqtt_set_dup
#define mqtt5_get_qos mqtt_get_qos
#define mqtt5_get_retain mqtt_get_retain
#define mqtt5_msg_init mqtt_msg_init
#define mqtt5_get_total_length mqtt_get_total_length
#define mqtt5_has_valid_msg_hdr mqtt_has_valid_msg_hdr
#define mqtt5_msg_pingreq mqtt_msg_pingreq
#define mqtt5_msg_pingresp mqtt_msg_pingresp
#define mqtt5_get_unsuback_data mqtt5_get_suback_data
#define mqtt5_get_pubcomp_data mqtt5_get_puback_data
uint16_t mqtt5_get_id(uint8_t *buffer, size_t length);
char *mqtt5_get_publish_property_payload(uint8_t *buffer, size_t buffer_length, char **msg_topic, size_t *msg_topic_len, esp_mqtt5_publish_resp_property_t *resp_property, uint16_t *property_len, size_t *payload_len, mqtt5_user_property_handle_t *user_property);
char *mqtt5_get_suback_data(uint8_t *buffer, size_t *length, mqtt5_user_property_handle_t *user_property);
char *mqtt5_get_puback_data(uint8_t *buffer, size_t *length, mqtt5_user_property_handle_t *user_property);
mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info, esp_mqtt5_connection_property_storage_t *property, esp_mqtt5_connection_will_property_storage_t *will_property);
mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id, const esp_mqtt5_publish_property_config_t *property, const char *resp_info);
esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, mqtt_connect_info_t *connection_info, esp_mqtt5_connection_property_storage_t *connection_property, esp_mqtt5_connection_server_resp_property_t *resp_property, int *reason_code, uint8_t *ack_flag, mqtt5_user_property_handle_t *user_property);
int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length);
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id, const esp_mqtt5_unsubscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info);
mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt5_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt5_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt5_msg_puback(mqtt_connection_t *connection, uint16_t message_id);
#ifdef __cplusplus
}
#endif
#endif /* MQTT5_MSG_H */

View File

@@ -1,150 +0,0 @@
/*
* SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef _MQTT_CLIENT_PRIV_H_
#define _MQTT_CLIENT_PRIV_H_
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include "esp_err.h"
#include "platform.h"
#include "esp_event.h"
#include "mqtt_client.h"
#include "mqtt_msg.h"
#ifdef MQTT_PROTOCOL_5
#include "mqtt5_client_priv.h"
#endif
#include "esp_transport.h"
#include "esp_transport_tcp.h"
#include "esp_transport_ssl.h"
#include "esp_transport_ws.h"
#include "esp_log.h"
#include "mqtt_outbox.h"
#include "freertos/event_groups.h"
#include <errno.h>
#include <string.h>
#include "mqtt_supported_features.h"
/* using uri parser */
#include "http_parser.h"
#ifdef __cplusplus
extern "C" {
#endif
#if CONFIG_LIBC_NEWLIB_NANO_FORMAT
#define NEWLIB_NANO_COMPAT_FORMAT PRIu32
#define NEWLIB_NANO_COMPAT_CAST(size_t_var) (uint32_t)size_t_var
#else
#define NEWLIB_NANO_COMPAT_FORMAT "zu"
#define NEWLIB_NANO_COMPAT_CAST(size_t_var) size_t_var
#endif
#ifdef MQTT_DISABLE_API_LOCKS
# define MQTT_API_LOCK(c)
# define MQTT_API_UNLOCK(c)
#else
# define MQTT_API_LOCK(c) xSemaphoreTakeRecursive(c->api_lock, portMAX_DELAY)
# define MQTT_API_UNLOCK(c) xSemaphoreGiveRecursive(c->api_lock)
#endif /* MQTT_USE_API_LOCKS */
typedef struct mqtt_state {
uint8_t *in_buffer;
int in_buffer_length;
size_t message_length;
size_t in_buffer_read_len;
mqtt_connection_t connection;
uint16_t pending_msg_id;
int pending_msg_type;
int pending_publish_qos;
} mqtt_state_t;
typedef struct {
esp_event_loop_handle_t event_loop_handle;
int task_stack;
int task_prio;
char *uri;
char *host;
char *path;
char *scheme;
int port;
bool auto_reconnect;
int network_timeout_ms;
int refresh_connection_after_ms;
int reconnect_timeout_ms;
char **alpn_protos;
int num_alpn_protos;
char *clientkey_password;
int clientkey_password_len;
bool use_global_ca_store;
esp_err_t ((*crt_bundle_attach)(void *conf));
const int *ciphersuites_list;
const char *cacert_buf;
size_t cacert_bytes;
const char *clientcert_buf;
size_t clientcert_bytes;
const char *clientkey_buf;
size_t clientkey_bytes;
const struct psk_key_hint *psk_hint_key;
bool skip_cert_common_name_check;
const char *common_name;
bool use_secure_element;
void *ds_data;
bool use_ecdsa_peripheral;
uint8_t ecdsa_key_efuse_blk;
int message_retransmit_timeout;
uint64_t outbox_limit;
esp_transport_handle_t transport;
struct ifreq * if_name;
esp_transport_keep_alive_t tcp_keep_alive_cfg;
} mqtt_config_storage_t;
typedef enum {
MQTT_STATE_INIT = 0,
MQTT_STATE_DISCONNECTED,
MQTT_STATE_CONNECTED,
MQTT_STATE_WAIT_RECONNECT,
} mqtt_client_state_t;
struct esp_mqtt_client {
esp_transport_list_handle_t transport_list;
esp_transport_handle_t transport;
mqtt_config_storage_t *config;
mqtt_state_t mqtt_state;
_Atomic mqtt_client_state_t state;
uint64_t refresh_connection_tick;
int64_t keepalive_tick;
uint64_t reconnect_tick;
#ifdef MQTT_PROTOCOL_5
mqtt5_config_storage_t *mqtt5_config;
uint16_t send_publish_packet_count; // This is for MQTT v5.0 flow control
#endif
int wait_timeout_ms;
int auto_reconnect;
esp_mqtt_event_t event;
bool run;
bool wait_for_ping_resp;
outbox_handle_t outbox;
EventGroupHandle_t status_bits;
SemaphoreHandle_t api_lock;
TaskHandle_t task_handle;
#if MQTT_EVENT_QUEUE_SIZE > 1
atomic_int queued_events;
#endif
};
bool esp_mqtt_set_if_config(char const *const new_config, char **old_config);
void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client);
#ifdef __cplusplus
}
#endif //__cplusplus
#endif

View File

@@ -1,118 +0,0 @@
/*
* This file is subject to the terms and conditions defined in
* file 'LICENSE', which is part of this source code package.
* Tuan PM <tuanpm at live dot com>
*/
#ifndef _MQTT_CONFIG_H_
#define _MQTT_CONFIG_H_
#include "sdkconfig.h"
#ifdef CONFIG_MQTT_PROTOCOL_311
#define MQTT_PROTOCOL_311
#endif
#ifdef CONFIG_MQTT_PROTOCOL_5
#define MQTT_PROTOCOL_5
#endif
#define MQTT_RECON_DEFAULT_MS (10*1000)
#ifdef CONFIG_MQTT_POLL_READ_TIMEOUT_MS
#define MQTT_POLL_READ_TIMEOUT_MS CONFIG_MQTT_POLL_READ_TIMEOUT_MS
#else
#define MQTT_POLL_READ_TIMEOUT_MS (1000)
#endif
#define MQTT_MSG_ID_INCREMENTAL CONFIG_MQTT_MSG_ID_INCREMENTAL
#define MQTT_SKIP_PUBLISH_IF_DISCONNECTED CONFIG_MQTT_SKIP_PUBLISH_IF_DISCONNECTED
#define MQTT_REPORT_DELETED_MESSAGES CONFIG_MQTT_REPORT_DELETED_MESSAGES
#if CONFIG_MQTT_BUFFER_SIZE
#define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE
#else
#define MQTT_BUFFER_SIZE_BYTE 1024
#endif
#if CONFIG_MQTT_TASK_PRIORITY
#define MQTT_TASK_PRIORITY CONFIG_MQTT_TASK_PRIORITY
#else
#define MQTT_TASK_PRIORITY 5
#endif
#if CONFIG_MQTT_TASK_STACK_SIZE
#define MQTT_TASK_STACK CONFIG_MQTT_TASK_STACK_SIZE
#else
#define MQTT_TASK_STACK (6*1024)
#endif
#define MQTT_KEEPALIVE_TICK (120)
#define MQTT_NETWORK_TIMEOUT_MS (10000)
#ifdef CONFIG_MQTT_TCP_DEFAULT_PORT
#define MQTT_TCP_DEFAULT_PORT CONFIG_MQTT_TCP_DEFAULT_PORT
#else
#define MQTT_TCP_DEFAULT_PORT 1883
#endif
#ifdef CONFIG_MQTT_SSL_DEFAULT_PORT
#define MQTT_SSL_DEFAULT_PORT CONFIG_MQTT_SSL_DEFAULT_PORT
#else
#define MQTT_SSL_DEFAULT_PORT 8883
#endif
#ifdef CONFIG_MQTT_WS_DEFAULT_PORT
#define MQTT_WS_DEFAULT_PORT CONFIG_MQTT_WS_DEFAULT_PORT
#else
#define MQTT_WS_DEFAULT_PORT 80
#endif
#ifdef CONFIG_MQTT_WSS_DEFAULT_PORT
#define MQTT_WSS_DEFAULT_PORT CONFIG_MQTT_WSS_DEFAULT_PORT
#else
#define MQTT_WSS_DEFAULT_PORT 443
#endif
#define MQTT_CORE_SELECTION_ENABLED CONFIG_MQTT_TASK_CORE_SELECTION_ENABLED
#ifdef CONFIG_MQTT_DISABLE_API_LOCKS
#define MQTT_DISABLE_API_LOCKS CONFIG_MQTT_DISABLE_API_LOCKS
#endif
#ifdef CONFIG_MQTT_USE_CORE_0
#define MQTT_TASK_CORE 0
#else
#ifdef CONFIG_MQTT_USE_CORE_1
#define MQTT_TASK_CORE 1
#else
#define MQTT_TASK_CORE 0
#endif
#endif
#ifdef CONFIG_MQTT_OUTBOX_EXPIRED_TIMEOUT_MS
#define OUTBOX_EXPIRED_TIMEOUT_MS CONFIG_MQTT_OUTBOX_EXPIRED_TIMEOUT_MS
#else
#define OUTBOX_EXPIRED_TIMEOUT_MS (30*1000)
#endif
#define MQTT_ENABLE_SSL CONFIG_MQTT_TRANSPORT_SSL
#define MQTT_ENABLE_WS CONFIG_MQTT_TRANSPORT_WEBSOCKET
#define MQTT_ENABLE_WSS CONFIG_MQTT_TRANSPORT_WEBSOCKET_SECURE
#define MQTT_DEFAULT_RETRANSMIT_TIMEOUT_MS 1000
#ifdef CONFIG_MQTT_EVENT_QUEUE_SIZE
#define MQTT_EVENT_QUEUE_SIZE CONFIG_MQTT_EVENT_QUEUE_SIZE
#else
#define MQTT_EVENT_QUEUE_SIZE 1
#endif
#ifdef CONFIG_MQTT_OUTBOX_DATA_ON_EXTERNAL_MEMORY
#define MQTT_OUTBOX_MEMORY MALLOC_CAP_SPIRAM
#else
#define MQTT_OUTBOX_MEMORY MALLOC_CAP_DEFAULT
#endif
#define OUTBOX_MAX_SIZE (4*1024)
#endif

View File

@@ -1,152 +0,0 @@
#ifndef MQTT_MSG_H
#define MQTT_MSG_H
#include <stdint.h>
#include <stdbool.h>
#include "mqtt_config.h"
#include "mqtt_client.h"
#ifdef __cplusplus
extern "C" {
#endif
/*
* Copyright (c) 2014, Stephen Robinson
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/
/* 7 6 5 4 3 2 1 0 */
/*| --- Message Type---- | DUP Flag | QoS Level | Retain | */
/* Remaining Length */
enum mqtt_message_type {
MQTT_MSG_TYPE_CONNECT = 1,
MQTT_MSG_TYPE_CONNACK = 2,
MQTT_MSG_TYPE_PUBLISH = 3,
MQTT_MSG_TYPE_PUBACK = 4,
MQTT_MSG_TYPE_PUBREC = 5,
MQTT_MSG_TYPE_PUBREL = 6,
MQTT_MSG_TYPE_PUBCOMP = 7,
MQTT_MSG_TYPE_SUBSCRIBE = 8,
MQTT_MSG_TYPE_SUBACK = 9,
MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
MQTT_MSG_TYPE_UNSUBACK = 11,
MQTT_MSG_TYPE_PINGREQ = 12,
MQTT_MSG_TYPE_PINGRESP = 13,
MQTT_MSG_TYPE_DISCONNECT = 14
};
typedef struct mqtt_message {
uint8_t *data;
size_t length;
size_t fragmented_msg_total_length; /*!< total len of fragmented messages (zero for all other messages) */
size_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */
} mqtt_message_t;
typedef struct mqtt_connect_info {
char *client_id;
char *username;
char *password;
char *will_topic;
char *will_message;
int64_t keepalive; /*!< keepalive=0 -> keepalive is disabled */
int will_length;
int will_qos;
int will_retain;
int clean_session;
esp_mqtt_protocol_ver_t protocol_ver;
} mqtt_connect_info_t;
typedef struct mqtt_connection {
mqtt_message_t outbound_message;
#if MQTT_MSG_ID_INCREMENTAL
uint16_t last_message_id; /*!< last used id if incremental message id configured */
#endif
uint8_t *buffer;
size_t buffer_length;
mqtt_connect_info_t information;
} mqtt_connection_t;
static inline int mqtt_get_type(const uint8_t *buffer)
{
return (buffer[0] & 0xf0) >> 4;
}
static inline int mqtt_get_connect_session_present(const uint8_t *buffer)
{
return buffer[2] & 0x01;
}
static inline int mqtt_get_connect_return_code(const uint8_t *buffer)
{
return buffer[3];
}
static inline int mqtt_get_dup(const uint8_t *buffer)
{
return (buffer[0] & 0x08) >> 3;
}
static inline void mqtt_set_dup(uint8_t *buffer)
{
buffer[0] |= 0x08;
}
static inline int mqtt_get_qos(const uint8_t *buffer)
{
return (buffer[0] & 0x06) >> 1;
}
static inline int mqtt_get_retain(const uint8_t *buffer)
{
return (buffer[0] & 0x01);
}
bool mqtt_header_complete(uint8_t *buffer, size_t buffer_length);
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len);
char *mqtt_get_publish_topic(uint8_t *buffer, size_t *length);
char *mqtt_get_publish_data(uint8_t *buffer, size_t *length);
char *mqtt_get_suback_data(uint8_t *buffer, size_t *length);
uint16_t mqtt_get_id(uint8_t *buffer, size_t length);
int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length);
esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size);
void mqtt_msg_buffer_destroy(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info);
mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id);
mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id) __attribute__((nonnull));
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id);
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection);
#ifdef __cplusplus
}
#endif
#endif /* MQTT_MSG_H */

View File

@@ -1,64 +0,0 @@
/*
* This file is subject to the terms and conditions defined in
* file 'LICENSE', which is part of this source code package.
* Tuan PM <tuanpm at live dot com>
*/
#ifndef _MQTT_OUTOBX_H_
#define _MQTT_OUTOBX_H_
#include "platform.h"
#include "esp_err.h"
#ifdef __cplusplus
extern "C" {
#endif
struct outbox_item;
typedef struct outbox_t *outbox_handle_t;
typedef struct outbox_item *outbox_item_handle_t;
typedef struct outbox_message *outbox_message_handle_t;
typedef long long outbox_tick_t;
typedef struct outbox_message {
uint8_t *data;
int len;
int msg_id;
int msg_qos;
int msg_type;
uint8_t *remaining_data;
int remaining_len;
} outbox_message_t;
typedef enum pending_state {
QUEUED,
TRANSMITTED,
ACKNOWLEDGED,
CONFIRMED
} pending_state_t;
outbox_handle_t outbox_init(void);
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick);
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick);
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos);
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item);
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
/**
* @brief Deletes single expired message returning it's message id
*
* @return msg id of the deleted message, -1 if no expired message in the outbox
*/
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
pending_state_t outbox_item_get_pending(outbox_item_handle_t item);
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick);
uint64_t outbox_get_size(outbox_handle_t outbox);
void outbox_destroy(outbox_handle_t outbox);
void outbox_delete_all_items(outbox_handle_t outbox);
#ifdef __cplusplus
}
#endif
#endif

View File

@@ -1,14 +0,0 @@
/*
* This file is subject to the terms and conditions defined in
* file 'LICENSE', which is part of this source code package.
* Tuan PM <tuanpm at live dot com>
*/
#ifndef _PLATFORM_H__
#define _PLATFORM_H__
//Support ESP32
#ifdef ESP_PLATFORM
#include "platform_esp32_idf.h"
#endif
#endif

View File

@@ -1,29 +0,0 @@
/*
* This file is subject to the terms and conditions defined in
* file 'LICENSE', which is part of this source code package.
* Tuan PM <tuanpm at live dot com>
*/
#ifndef _ESP_PLATFORM_H__
#define _ESP_PLATFORM_H__
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"
#include <stdint.h>
#include <sys/time.h>
char *platform_create_id_string(void);
int platform_random(int max);
uint64_t platform_tick_get_ms(void);
#define ESP_MEM_CHECK(TAG, a, action) if (!(a)) { \
ESP_LOGE(TAG,"%s(%d): %s", __FUNCTION__, __LINE__, "Memory exhausted"); \
action; \
}
#define ESP_OK_CHECK(TAG, a, action) if ((a) != ESP_OK) { \
ESP_LOGE(TAG,"%s(%d): %s", __FUNCTION__, __LINE__, "Failed"); \
action; \
}
#endif

View File

@@ -1,637 +0,0 @@
/*
* Copyright (c) 2014, Stephen Robinson
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <string.h>
#include "mqtt_client.h"
#include "mqtt_msg.h"
#include "mqtt_config.h"
#include "platform.h"
#define MQTT_MAX_FIXED_HEADER_SIZE 5
#define MQTT_3_1_VARIABLE_HEADER_SIZE 12
#define MQTT_3_1_1_VARIABLE_HEADER_SIZE 10
enum mqtt_connect_flag {
MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
MQTT_CONNECT_FLAG_WILL = 1 << 2,
MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
};
static int append_string(mqtt_connection_t *connection, const char *string, int len)
{
if (connection->outbound_message.length + len + 2 > connection->buffer_length) {
return -1;
}
connection->buffer[connection->outbound_message.length++] = len >> 8;
connection->buffer[connection->outbound_message.length++] = len & 0xff;
memcpy(connection->buffer + connection->outbound_message.length, string, len);
connection->outbound_message.length += len;
return len + 2;
}
static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t message_id)
{
// If message_id is zero then we should assign one, otherwise
// we'll use the one supplied by the caller
while (message_id == 0) {
#if MQTT_MSG_ID_INCREMENTAL
message_id = ++connection->last_message_id;
#else
message_id = platform_random(65535);
#endif
}
if (connection->outbound_message.length + 2 > connection->buffer_length) {
return 0;
}
connection->buffer[connection->outbound_message.length++] = message_id >> 8;
connection->buffer[connection->outbound_message.length++] = message_id & 0xff;
return message_id;
}
static int set_message_header_size(mqtt_connection_t *connection)
{
connection->outbound_message.length = MQTT_MAX_FIXED_HEADER_SIZE;
return MQTT_MAX_FIXED_HEADER_SIZE;
}
static mqtt_message_t *fail_message(mqtt_connection_t *connection)
{
connection->outbound_message.data = connection->buffer;
connection->outbound_message.length = 0;
return &connection->outbound_message;
}
static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain)
{
int message_length = connection->outbound_message.length - MQTT_MAX_FIXED_HEADER_SIZE;
int total_length = message_length;
int encoded_length = 0;
uint8_t encoded_lens[4] = {0};
// Check if we have fragmented message and update total_len
if (connection->outbound_message.fragmented_msg_total_length) {
total_length = connection->outbound_message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
}
// Encode MQTT message length
int len_bytes = 0; // size of encoded message length
do {
encoded_length = total_length % 128;
total_length /= 128;
if (total_length > 0) {
encoded_length |= 0x80;
}
encoded_lens[len_bytes] = encoded_length;
len_bytes++;
} while (total_length > 0);
// Sanity check for MQTT header
if (len_bytes + 1 > MQTT_MAX_FIXED_HEADER_SIZE) {
return fail_message(connection);
}
// Save the header bytes
connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
connection->outbound_message.data = connection->buffer + offs;
connection->outbound_message.fragmented_msg_data_offset -= offs;
// type byte
connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
// length bytes
for (int j = 0; j < len_bytes; j++) {
connection->buffer[offs++] = encoded_lens[j];
}
return &connection->outbound_message;
}
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len)
{
int i;
size_t totlen = 0;
for (i = 1; i < length; ++i) {
totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
if ((buffer[i] & 0x80) == 0) {
++i;
break;
}
}
totlen += i;
if (fixed_size_len) {
*fixed_size_len = i;
}
return totlen;
}
bool mqtt_header_complete(uint8_t *buffer, size_t buffer_length)
{
uint16_t i;
uint16_t topiclen;
for (i = 1; i < MQTT_MAX_FIXED_HEADER_SIZE; ++i) {
if (i >= buffer_length) {
return false;
}
if ((buffer[i] & 0x80) == 0) {
++i;
break;
}
}
// i is now the length of the fixed header
if (i + 2 >= buffer_length) {
return false;
}
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
i += topiclen;
if (mqtt_get_qos(buffer) > 0) {
i += 2;
}
// i is now the length of the fixed + variable header
return buffer_length >= i;
}
char *mqtt_get_publish_topic(uint8_t *buffer, size_t *length)
{
int i;
int topiclen;
for (i = 1; i < *length; ++i) {
if ((buffer[i] & 0x80) == 0) {
++i;
break;
}
}
if (i + 2 >= *length) {
return NULL;
}
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
if (i + topiclen > *length) {
return NULL;
}
*length = topiclen;
return (char *)(buffer + i);
}
char *mqtt_get_publish_data(uint8_t *buffer, size_t *length)
{
int i;
int totlen = 0;
int topiclen;
int blength = *length;
*length = 0;
for (i = 1; i < blength; ++i) {
totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
if ((buffer[i] & 0x80) == 0) {
++i;
break;
}
}
totlen += i;
if (i + 2 >= blength) {
return NULL;
}
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
if (i + topiclen >= blength) {
return NULL;
}
i += topiclen;
if (mqtt_get_qos(buffer) > 0) {
if (i + 2 >= blength) {
return NULL;
}
i += 2;
}
if (totlen < i) {
return NULL;
}
if (totlen <= blength) {
*length = totlen - i;
} else {
*length = blength - i;
}
return (char *)(buffer + i);
}
char *mqtt_get_suback_data(uint8_t *buffer, size_t *length)
{
// SUBACK payload length = total length - (fixed header (2 bytes) + variable header (2 bytes))
// This requires the remaining length to be encoded in 1 byte.
if (*length > 4) {
*length -= 4;
return (char *)(buffer + 4);
}
*length = 0;
return NULL;
}
uint16_t mqtt_get_id(uint8_t *buffer, size_t length)
{
if (length < 1) {
return 0;
}
switch (mqtt_get_type(buffer)) {
case MQTT_MSG_TYPE_PUBLISH: {
int i;
int topiclen;
for (i = 1; i < length; ++i) {
if ((buffer[i] & 0x80) == 0) {
++i;
break;
}
}
if (i + 2 >= length) {
return 0;
}
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
if (i + topiclen > length) {
return 0;
}
i += topiclen;
if (mqtt_get_qos(buffer) > 0) {
if (i + 2 > length) {
return 0;
}
//i += 2;
} else {
return 0;
}
return (buffer[i] << 8) | buffer[i + 1];
}
case MQTT_MSG_TYPE_PUBACK:
case MQTT_MSG_TYPE_PUBREC:
case MQTT_MSG_TYPE_PUBREL:
case MQTT_MSG_TYPE_PUBCOMP:
case MQTT_MSG_TYPE_SUBACK:
case MQTT_MSG_TYPE_UNSUBACK:
case MQTT_MSG_TYPE_SUBSCRIBE:
case MQTT_MSG_TYPE_UNSUBSCRIBE: {
// This requires the remaining length to be encoded in 1 byte,
// which it should be.
if (length >= 4 && (buffer[1] & 0x80) == 0) {
return (buffer[2] << 8) | buffer[3];
} else {
return 0;
}
}
default:
return 0;
}
}
mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info)
{
set_message_header_size(connection);
int header_len;
if (info->protocol_ver == MQTT_PROTOCOL_V_3_1) {
header_len = MQTT_3_1_VARIABLE_HEADER_SIZE;
} else {
header_len = MQTT_3_1_1_VARIABLE_HEADER_SIZE;
}
if (connection->outbound_message.length + header_len > connection->buffer_length) {
return fail_message(connection);
}
char *variable_header = (char *)(connection->buffer + connection->outbound_message.length);
connection->outbound_message.length += header_len;
int header_idx = 0;
variable_header[header_idx++] = 0; // Variable header length MSB
if (info->protocol_ver == MQTT_PROTOCOL_V_3_1) {
variable_header[header_idx++] = 6; // Variable header length LSB
memcpy(&variable_header[header_idx], "MQIsdp", 6); // Protocol name
header_idx = header_idx + 6;
variable_header[header_idx++] = 3; // Protocol version
} else {
/* Defaults to protocol version 3.1.1 values */
variable_header[header_idx++] = 4; // Variable header length LSB
memcpy(&variable_header[header_idx], "MQTT", 4); // Protocol name
header_idx = header_idx + 4;
variable_header[header_idx++] = 4; // Protocol version
}
int flags_offset = header_idx;
variable_header[header_idx++] = 0; // Flags
variable_header[header_idx++] = info->keepalive >> 8; // Keep-alive MSB
variable_header[header_idx] = info->keepalive & 0xff; // Keep-alive LSB
if (info->clean_session) {
variable_header[flags_offset] |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
}
if (info->client_id != NULL && info->client_id[0] != '\0') {
if (append_string(connection, info->client_id, strlen(info->client_id)) < 0) {
return fail_message(connection);
}
} else {
if (append_string(connection, "", 0) < 0) {
return fail_message(connection);
}
}
if (info->will_topic != NULL && info->will_topic[0] != '\0') {
if (append_string(connection, info->will_topic, strlen(info->will_topic)) < 0) {
return fail_message(connection);
}
if (append_string(connection, info->will_message, info->will_length) < 0) {
return fail_message(connection);
}
variable_header[flags_offset] |= MQTT_CONNECT_FLAG_WILL;
if (info->will_retain) {
variable_header[flags_offset] |= MQTT_CONNECT_FLAG_WILL_RETAIN;
}
variable_header[flags_offset] |= (info->will_qos & 3) << 3;
}
if (info->username != NULL && info->username[0] != '\0') {
if (append_string(connection, info->username, strlen(info->username)) < 0) {
return fail_message(connection);
}
variable_header[flags_offset] |= MQTT_CONNECT_FLAG_USERNAME;
}
if (info->password != NULL && info->password[0] != '\0') {
if (info->username == NULL || info->username[0] == '\0') {
/* In case if password is set without username, we need to set a zero length username.
* (otherwise we violate: MQTT-3.1.2-22: If the User Name Flag is set to 0 then the Password Flag MUST be set to 0.)
*/
if (append_string(connection, "", 0) < 0) {
return fail_message(connection);
}
variable_header[flags_offset] |= MQTT_CONNECT_FLAG_USERNAME;
}
if (append_string(connection, info->password, strlen(info->password)) < 0) {
return fail_message(connection);
}
variable_header[flags_offset] |= MQTT_CONNECT_FLAG_PASSWORD;
}
return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id)
{
set_message_header_size(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
}
if (append_string(connection, topic, strlen(topic)) < 0) {
return fail_message(connection);
}
if (data == NULL && data_length > 0) {
return fail_message(connection);
}
if (qos > 0) {
if ((*message_id = append_message_id(connection, 0)) == 0) {
return fail_message(connection);
}
} else {
*message_id = 0;
}
if (data != NULL) {
if (connection->outbound_message.length + data_length > connection->buffer_length) {
// Not enough size in buffer -> fragment this message
connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length;
memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length);
connection->outbound_message.length = connection->buffer_length;
connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset;
} else {
memcpy(connection->buffer + connection->outbound_message.length, data, data_length);
connection->outbound_message.length += data_length;
connection->outbound_message.fragmented_msg_total_length = 0;
}
}
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}
mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id)
{
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id)
{
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id)
{
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0);
}
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id)
{
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id)
{
set_message_header_size(connection);
if ((*message_id = append_message_id(connection, 0)) == 0) {
return fail_message(connection);
}
for (int topic_number = 0; topic_number < size; ++topic_number) {
if (topic_list[topic_number].filter[0] == '\0') {
return fail_message(connection);
}
if (append_string(connection, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)) < 0) {
return fail_message(connection);
}
if (connection->outbound_message.length + 1 > connection->buffer_length) {
return fail_message(connection);
}
connection->buffer[connection->outbound_message.length] = topic_list[topic_number].qos;
connection->outbound_message.length ++;
}
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id)
{
set_message_header_size(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
}
if ((*message_id = append_message_id(connection, 0)) == 0) {
return fail_message(connection);
}
if (append_string(connection, topic, strlen(topic)) < 0) {
return fail_message(connection);
}
return fini_message(connection, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0);
}
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection)
{
set_message_header_size(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection)
{
set_message_header_size(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection)
{
set_message_header_size(connection);
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}
/*
* check flags: [MQTT-2.2.2-1], [MQTT-2.2.2-2]
* returns 0 if flags are invalid, otherwise returns 1
*/
int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length)
{
int qos, dup;
if (length < 1) {
return 0;
}
switch (mqtt_get_type(buffer)) {
case MQTT_MSG_TYPE_CONNECT:
case MQTT_MSG_TYPE_CONNACK:
case MQTT_MSG_TYPE_PUBACK:
case MQTT_MSG_TYPE_PUBREC:
case MQTT_MSG_TYPE_PUBCOMP:
case MQTT_MSG_TYPE_SUBACK:
case MQTT_MSG_TYPE_UNSUBACK:
case MQTT_MSG_TYPE_PINGREQ:
case MQTT_MSG_TYPE_PINGRESP:
case MQTT_MSG_TYPE_DISCONNECT:
return (buffer[0] & 0x0f) == 0; /* all flag bits are 0 */
case MQTT_MSG_TYPE_PUBREL:
case MQTT_MSG_TYPE_SUBSCRIBE:
case MQTT_MSG_TYPE_UNSUBSCRIBE:
return (buffer[0] & 0x0f) == 0x02; /* only bit 1 is set */
case MQTT_MSG_TYPE_PUBLISH:
qos = mqtt_get_qos(buffer);
dup = mqtt_get_dup(buffer);
/*
* there is no qos=3 [MQTT-3.3.1-4]
* dup flag must be set to 0 for all qos=0 messages [MQTT-3.3.1-2]
*/
return (qos < 3) && ((qos > 0) || (dup == 0));
default:
return 0;
}
}
esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size)
{
memset(&connection->outbound_message, 0, sizeof(mqtt_message_t));
connection->buffer = (uint8_t *)calloc(buffer_size, sizeof(uint8_t));
if (!connection->buffer) {
return ESP_ERR_NO_MEM;
}
connection->buffer_length = buffer_size;
return ESP_OK;
}
void mqtt_msg_buffer_destroy(mqtt_connection_t *connection)
{
if (connection) {
free(connection->buffer);
}
}

View File

@@ -1,225 +0,0 @@
#include "mqtt_outbox.h"
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include "mqtt_config.h"
#include "sys/queue.h"
#include "esp_heap_caps.h"
#include "esp_log.h"
#ifndef CONFIG_MQTT_CUSTOM_OUTBOX
static const char *TAG = "outbox";
typedef struct outbox_item {
char *buffer;
int len;
int msg_id;
int msg_type;
int msg_qos;
outbox_tick_t tick;
pending_state_t pending;
STAILQ_ENTRY(outbox_item) next;
} outbox_item_t;
STAILQ_HEAD(outbox_list_t, outbox_item);
struct outbox_t {
_Atomic uint64_t size;
struct outbox_list_t *list;
};
outbox_handle_t outbox_init(void)
{
outbox_handle_t outbox = calloc(1, sizeof(struct outbox_t));
ESP_MEM_CHECK(TAG, outbox, return NULL);
outbox->list = calloc(1, sizeof(struct outbox_list_t));
ESP_MEM_CHECK(TAG, outbox->list, {free(outbox); return NULL;});
outbox->size = 0;
STAILQ_INIT(outbox->list);
return outbox;
}
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick)
{
outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t));
ESP_MEM_CHECK(TAG, item, return NULL);
item->msg_id = message->msg_id;
item->msg_type = message->msg_type;
item->msg_qos = message->msg_qos;
item->tick = tick;
item->len = message->len + message->remaining_len;
item->pending = QUEUED;
item->buffer = heap_caps_malloc(message->len + message->remaining_len, MQTT_OUTBOX_MEMORY);
ESP_MEM_CHECK(TAG, item->buffer, {
free(item);
return NULL;
});
memcpy(item->buffer, message->data, message->len);
if (message->remaining_data) {
memcpy(item->buffer + message->len, message->remaining_data, message->remaining_len);
}
STAILQ_INSERT_TAIL(outbox->list, item, next);
outbox->size += item->len;
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%"PRIu64, message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
return item;
}
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox->list, next) {
if (item->msg_id == msg_id) {
return item;
}
}
return NULL;
}
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox->list, next) {
if (item->pending == pending) {
if (tick) {
*tick = item->tick;
}
return item;
}
}
return NULL;
}
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox->list, next) {
if (item == item_to_delete) {
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
outbox->size -= item->len;
ESP_LOGD(TAG, "DELETE_ITEM msgid=%d, msg_type=%d, remain size=%"PRIu64, item_to_delete->msg_id, item_to_delete->msg_type, outbox_get_size(outbox));
free(item->buffer);
free(item);
return ESP_OK;
}
}
return ESP_FAIL;
}
uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
{
if (item) {
*len = item->len;
*msg_id = item->msg_id;
*msg_type = item->msg_type;
*qos = item->msg_qos;
return (uint8_t *)item->buffer;
}
return NULL;
}
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
if (item->msg_id == msg_id && (0xFF & (item->msg_type)) == msg_type) {
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
outbox->size -= item->len;
ESP_LOGD(TAG, "DELETE msgid=%d, msg_type=%d, remain size=%"PRIu64, msg_id, msg_type, outbox_get_size(outbox));
free(item->buffer);
free(item);
return ESP_OK;
}
}
return ESP_FAIL;
}
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
{
outbox_item_handle_t item = outbox_get(outbox, msg_id);
if (item) {
item->pending = pending;
return ESP_OK;
}
return ESP_FAIL;
}
pending_state_t outbox_item_get_pending(outbox_item_handle_t item)
{
if (item) {
return item->pending;
}
return QUEUED;
}
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick)
{
outbox_item_handle_t item = outbox_get(outbox, msg_id);
if (item) {
item->tick = tick;
return ESP_OK;
}
return ESP_FAIL;
}
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
{
int msg_id = -1;
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox->list, next) {
if (current_tick - item->tick > timeout) {
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
free(item->buffer);
outbox->size -= item->len;
msg_id = item->msg_id;
free(item);
ESP_LOGD(TAG, "DELETE_SINGLE_EXPIRED msgid=%d, remain size=%"PRIu64, msg_id, outbox_get_size(outbox));
return msg_id;
}
}
return msg_id;
}
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
{
int deleted_items = 0;
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
if (current_tick - item->tick > timeout) {
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
free(item->buffer);
outbox->size -= item->len;
ESP_LOGD(TAG, "DELETE_EXPIRED msgid=%d, remain size=%"PRIu64, item->msg_id, outbox_get_size(outbox));
free(item);
deleted_items ++;
}
}
return deleted_items;
}
uint64_t outbox_get_size(outbox_handle_t outbox)
{
return outbox->size;
}
void outbox_delete_all_items(outbox_handle_t outbox)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
outbox->size -= item->len;
ESP_LOGD(TAG, "DELETE_ALL_ITEMS msgid=%d, msg_type=%d, remain size=%"PRIu64, item->msg_id, item->msg_type, outbox_get_size(outbox));
free(item->buffer);
free(item);
}
}
void outbox_destroy(outbox_handle_t outbox)
{
outbox_delete_all_items(outbox);
free(outbox->list);
free(outbox);
}
#endif /* CONFIG_MQTT_CUSTOM_OUTBOX */

View File

@@ -1,48 +0,0 @@
#include "platform.h"
#ifdef ESP_PLATFORM
#include "esp_log.h"
#include "esp_mac.h"
#include "soc/soc_caps.h"
#include "esp_timer.h"
#include "esp_random.h"
#include <stdlib.h>
#include <stdint.h>
static const char *TAG = "platform";
#define MAX_ID_STRING (32)
#if defined SOC_WIFI_SUPPORTED
#define MAC_TYPE ESP_MAC_WIFI_STA
#elif defined SOC_EMAC_SUPPORTED
#define MAC_TYPE ESP_MAC_ETH
#elif defined SOC_IEEE802154_SUPPORTED
#define MAC_TYPE ESP_MAC_IEEE802154
#endif
char *platform_create_id_string(void)
{
char *id_string = calloc(1, MAX_ID_STRING);
ESP_MEM_CHECK(TAG, id_string, return NULL);
#ifndef MAC_TYPE
ESP_LOGW(TAG, "Soc doesn't provide MAC, client could be disconnected in case of device with same name in the broker.");
sprintf(id_string, "esp_mqtt_client_id");
#else
uint8_t mac[6];
esp_read_mac(mac, MAC_TYPE);
sprintf(id_string, "ESP32_%02x%02X%02X", mac[3], mac[4], mac[5]);
#endif
return id_string;
}
int platform_random(int max)
{
return esp_random() % max;
}
uint64_t platform_tick_get_ms(void)
{
return esp_timer_get_time()/(int64_t)1000;
}
#endif