Merge branch 'task/mqtt_budget' into 'master'

esp_rmaker_mqtt: Add MQTT budgeting to control the number of messages sent

See merge request app-frameworks/esp-rainmaker!334
This commit is contained in:
Piyush Shah
2022-11-24 03:35:27 +08:00
8 changed files with 295 additions and 6 deletions

View File

@@ -1,5 +1,15 @@
# Changes
## 21-Nov-2022 (esp_rmaker_mqtt: Add MQTT budgeting to control the number of messages sent)
- Due to some poor, non-optimised coding or bugs, it is possible that the node keeps bombarding the MQTT
broker with publish messages. To prevent this, a concept of MQTT Budgeting has been added.
- By default, a node will be given a budget of 100 (`CONFIG_ESP_RMAKER_MQTT_DEFAULT_BUDGET`), which will
go on incrementing by 1 (`CONFIG_ESP_RMAKER_MQTT_BUDGET_REVIVE_COUNT` every 5 seconds (`CONFIG_ESP_RMAKER_MQTT_BUDGET_REVIVE_PERIOD`),
limited to a max value of 1024 (`CONFIG_ESP_RMAKER_MQTT_MAX_BUDGET`).
- Budget will be decremented by 1 for every MQTT publish and messages will be dropped if budget is 0.
- This behaviour is enabled by default and can be disabled by disabling `CONFIG_ESP_RMAKER_MQTT_ENABLE_BUDGETING`.
## 16-Nov-2022 (mqtt_topics: Added support for AWS basic ingest topics.)
- AWS Basic Ingest Topics optimize data flow by removing the publish/subscribe message broker from the ingestion path, making it more cost effective. You can refer the official docs [here](https://docs.aws.amazon.com/iot/latest/developerguide/iot-basic-ingest.html).

View File

@@ -36,7 +36,9 @@ endif()
set(core_priv_includes "src/core")
# MQTT
set(mqtt_srcs "src/mqtt/esp_rmaker_mqtt.c")
set(mqtt_srcs "src/mqtt/esp_rmaker_mqtt.c"
"src/mqtt/esp_rmaker_mqtt_budget.c")
set(mqtt_priv_includes "src/mqtt")
# OTA
set(ota_srcs "src/ota/esp_rmaker_ota.c"
@@ -56,7 +58,7 @@ set(standard_types_srcs "src/standard_types/esp_rmaker_standard_params.c"
idf_component_register(SRCS ${core_srcs} ${mqtt_srcs} ${ota_srcs} ${standard_types_srcs} ${console_srcs}
INCLUDE_DIRS "include"
PRIV_INCLUDE_DIRS ${core_priv_includes} ${ota_priv_includes} ${console_priv_includes}
PRIV_INCLUDE_DIRS ${core_priv_includes} ${ota_priv_includes} ${console_priv_includes} ${mqtt_priv_includes}
REQUIRES rmaker_common
PRIV_REQUIRES ${priv_req})

View File

@@ -59,6 +59,46 @@ menu "ESP RainMaker Config"
This config enables the use of AWS Basic Ingest Topics for Node to Cloud communication,
which eliminates the MQTT Broker and thus reduces messaging cost.
config ESP_RMAKER_MQTT_ENABLE_BUDGETING
bool "Enable MQTT budgeting"
default y
help
Enable MQTT budgeting, which will control the number of MQTT messages sent by the node.
config ESP_RMAKER_MQTT_DEFAULT_BUDGET
int "Default MQTT Budget"
depends on ESP_RMAKER_MQTT_ENABLE_BUDGETING
default 100
range 64 ESP_RMAKER_MQTT_MAX_BUDGET
help
Default MQTT budget. Budget will reduce on sending an MQTT message and increase based on
ESP_RMAKER_MQTT_BUDGET_REVIVE_PERIOD. If no budget is available, MQTT message will be dropped.
config ESP_RMAKER_MQTT_MAX_BUDGET
int "Max MQTT Budget"
depends on ESP_RMAKER_MQTT_ENABLE_BUDGETING
default 1024
range 64 2048
help
Maximum budget that the node can have. No additional budget will be allocated if this count is reached.
config ESP_RMAKER_MQTT_BUDGET_REVIVE_PERIOD
int "MQTT Budget revive period"
depends on ESP_RMAKER_MQTT_ENABLE_BUDGETING
default 5
range 5 600
help
Period in seconds after which the MQTT budget should revive (by ESP_RMAKER_MQTT_BUDGET_REVIVE_COUNT).
This is used to limit the messages being sent by the node.
config ESP_RMAKER_MQTT_BUDGET_REVIVE_COUNT
int "MQTT Budget revive count"
depends on ESP_RMAKER_MQTT_ENABLE_BUDGETING
default 1
range 1 16
help
The count by which the budget will be increased periodically based on ESP_RMAKER_MQTT_BUDGET_REVIVE_PERIOD.
config ESP_RMAKER_MAX_PARAM_DATA_SIZE
int "Maximum Parameters' data size"
default 1024

View File

@@ -32,6 +32,12 @@ esp_rmaker_mqtt_conn_params_t *esp_rmaker_mqtt_get_conn_params(void);
*/
esp_err_t esp_rmaker_mqtt_init(esp_rmaker_mqtt_conn_params_t *conn_params);
/* Deinitialize ESP RainMaker MQTT
*
* Call this function after MQTT has disconnected.
*/
void esp_rmaker_mqtt_deinit(void);
/** MQTT Connect
*
* Starts the connection attempts to the MQTT broker as per the configuration

View File

@@ -30,4 +30,4 @@
#define TO_NODE_TOPIC_SUFFIX "to-node"
#define INSIGHTS_TOPIC_SUFFIX "diagnostics/from-node"
#define MQTT_TOPIC_BUFFER_SIZE 150
#define MQTT_TOPIC_BUFFER_SIZE 150

View File

@@ -17,6 +17,8 @@
#include <esp_rmaker_client_data.h>
#include <esp_rmaker_core.h>
#include "esp_rmaker_mqtt_budget.h"
static const char *TAG = "esp_rmaker_mqtt";
static esp_rmaker_mqtt_config_t g_mqtt_config;
@@ -42,23 +44,44 @@ esp_err_t esp_rmaker_mqtt_init(esp_rmaker_mqtt_conn_params_t *conn_params)
esp_rmaker_mqtt_glue_setup(&g_mqtt_config);
}
if (g_mqtt_config.init) {
return g_mqtt_config.init(conn_params);
esp_err_t err = g_mqtt_config.init(conn_params);
if (err == ESP_OK) {
if (esp_rmaker_mqtt_budgeting_init() != ESP_OK) {
ESP_LOGE(TAG, "Failied to initialise MQTT Budgeting.");
}
}
return err;
}
ESP_LOGW(TAG, "esp_rmaker_mqtt_init not registered");
return ESP_OK;
}
void esp_rmaker_mqtt_deinit(void)
{
esp_rmaker_mqtt_budgeting_deinit();
if (g_mqtt_config.deinit) {
return g_mqtt_config.deinit();
}
ESP_LOGW(TAG, "esp_rmaker_mqtt_deinit not registered");
}
esp_err_t esp_rmaker_mqtt_connect(void)
{
if (g_mqtt_config.connect) {
return g_mqtt_config.connect();
esp_err_t err = g_mqtt_config.connect();
if (err == ESP_OK) {
esp_rmaker_mqtt_budgeting_start();
}
return err;
}
ESP_LOGW(TAG, "esp_rmaker_mqtt_connect not registered");
return ESP_OK;
}
esp_err_t esp_rmaker_mqtt_disconnect(void)
{
esp_rmaker_mqtt_budgeting_stop();
if (g_mqtt_config.disconnect) {
return g_mqtt_config.disconnect();
}
@@ -86,8 +109,16 @@ esp_err_t esp_rmaker_mqtt_unsubscribe(const char *topic)
esp_err_t esp_rmaker_mqtt_publish(const char *topic, void *data, size_t data_len, uint8_t qos, int *msg_id)
{
if (esp_rmaker_mqtt_is_budget_available() != true) {
ESP_LOGE(TAG, "Out of MQTT Budget. Dropping publish message.");
return ESP_FAIL;
}
if (g_mqtt_config.publish) {
return g_mqtt_config.publish(topic, data, data_len, qos, msg_id);
esp_err_t err = g_mqtt_config.publish(topic, data, data_len, qos, msg_id);
if (err == ESP_OK) {
esp_rmaker_mqtt_decrease_budget(1);
}
return err;
}
ESP_LOGW(TAG, "esp_rmaker_mqtt_publish not registered");
return ESP_OK;

View File

@@ -0,0 +1,182 @@
/*
* SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <sdkconfig.h>
#include <esp_log.h>
static const char *TAG = "esp_rmaker_mqtt_budget";
#ifdef CONFIG_ESP_RMAKER_MQTT_ENABLE_BUDGETING
#include <freertos/FreeRTOS.h>
#include <freertos/timers.h>
#include <freertos/semphr.h>
#define DEFAULT_BUDGET CONFIG_ESP_RMAKER_MQTT_DEFAULT_BUDGET
#define MAX_BUDGET CONFIG_ESP_RMAKER_MQTT_MAX_BUDGET
#define BUDGET_REVIVE_COUNT CONFIG_ESP_RMAKER_MQTT_BUDGET_REVIVE_COUNT
#define BUDGET_REVIVE_PERIOD CONFIG_ESP_RMAKER_MQTT_BUDGET_REVIVE_PERIOD
static int16_t mqtt_budget = DEFAULT_BUDGET;
static TimerHandle_t mqtt_budget_timer;
static SemaphoreHandle_t mqtt_budget_lock;
#define SEMAPHORE_DELAY_MSEC 500
bool esp_rmaker_mqtt_is_budget_available(void)
{
if (mqtt_budget_lock == NULL) {
ESP_LOGW(TAG, "MQTT budgeting not started yet. Allowing publish.");
return true;
}
if (xSemaphoreTake(mqtt_budget_lock, SEMAPHORE_DELAY_MSEC/portTICK_PERIOD_MS) != pdTRUE) {
ESP_LOGW(TAG, "Could not acquire MQTT budget lock. Allowing publish.");
return true;
}
int16_t budget = mqtt_budget;
xSemaphoreGive(mqtt_budget_lock);
return budget ? true : false;
}
esp_err_t esp_rmaker_mqtt_increase_budget(uint8_t budget)
{
if (mqtt_budget_lock == NULL) {
ESP_LOGW(TAG, "MQTT budgeting not started. Not increasing the budget.");
return ESP_FAIL;
}
if (xSemaphoreTake(mqtt_budget_lock, SEMAPHORE_DELAY_MSEC/portTICK_PERIOD_MS) != pdTRUE) {
ESP_LOGE(TAG, "Failed to increase MQTT budget.");
return ESP_FAIL;
}
mqtt_budget += budget;
if (mqtt_budget > MAX_BUDGET) {
mqtt_budget = MAX_BUDGET;
}
xSemaphoreGive(mqtt_budget_lock);
ESP_LOGD(TAG, "MQTT budget increased to %d", mqtt_budget);
return ESP_OK;
}
esp_err_t esp_rmaker_mqtt_decrease_budget(uint8_t budget)
{
if (mqtt_budget_lock == NULL) {
ESP_LOGW(TAG, "MQTT budgeting not started. Not decreasing the budget.");
return ESP_FAIL;
}
if (xSemaphoreTake(mqtt_budget_lock, SEMAPHORE_DELAY_MSEC/portTICK_PERIOD_MS) != pdTRUE) {
ESP_LOGE(TAG, "Failed to decrease MQTT budget.");
return ESP_FAIL;
}
mqtt_budget -= budget;
if (mqtt_budget < 0) {
mqtt_budget = 0;
}
xSemaphoreGive(mqtt_budget_lock);
ESP_LOGD(TAG, "MQTT budget decreased to %d.", mqtt_budget);
return ESP_OK;
}
static void esp_rmaker_mqtt_revive_budget(TimerHandle_t handle)
{
esp_rmaker_mqtt_increase_budget(BUDGET_REVIVE_COUNT);
}
esp_err_t esp_rmaker_mqtt_budgeting_start(void)
{
if (mqtt_budget_timer) {
xTimerStart(mqtt_budget_timer, 0);
return ESP_OK;
}
return ESP_FAIL;
}
esp_err_t esp_rmaker_mqtt_budgeting_stop(void)
{
if (mqtt_budget_timer) {
xTimerStop(mqtt_budget_timer, 100);
return ESP_OK;
}
return ESP_FAIL;
}
esp_err_t esp_rmaker_mqtt_budgeting_deinit(void)
{
if (mqtt_budget_timer) {
esp_rmaker_mqtt_budgeting_stop();
xTimerDelete(mqtt_budget_timer, 100);
mqtt_budget_timer = NULL;
}
if (mqtt_budget_lock) {
vSemaphoreDelete(mqtt_budget_lock);
mqtt_budget_lock = NULL;
}
return ESP_OK;
}
esp_err_t esp_rmaker_mqtt_budgeting_init(void)
{
if (mqtt_budget_timer) {
ESP_LOGI(TAG, "MQTT budgeting already initialised.");
return ESP_OK;
}
mqtt_budget_lock = xSemaphoreCreateMutex();
if (!mqtt_budget_lock) {
return ESP_FAIL;
}
mqtt_budget_timer = xTimerCreate("mqtt_budget_tm", (BUDGET_REVIVE_PERIOD * 1000) / portTICK_PERIOD_MS,
pdTRUE, NULL, esp_rmaker_mqtt_revive_budget);
if (mqtt_budget_timer) {
ESP_LOGI(TAG, "MQTT Budgeting initialised. Default: %d, Max: %d, Revive count: %d, Revive period: %d",
DEFAULT_BUDGET, MAX_BUDGET, BUDGET_REVIVE_COUNT, BUDGET_REVIVE_PERIOD);
return ESP_OK;
}
return ESP_FAIL;
}
#else /* ! CONFIG_ESP_RMAKER_MQTT_ENABLE_BUDGETING */
esp_err_t esp_rmaker_mqtt_budgeting_init(void)
{
/* Adding a print only here, because this is always going to be the first function
* to be invoked since it is called from MQTT init. Else, MQTT itself is going to fail.
*/
ESP_LOGW(TAG, "MQTT Budgeting is not enabled.");
return ESP_FAIL;
}
esp_err_t esp_rmaker_mqtt_budgeting_deinit(void)
{
return ESP_FAIL;
}
esp_err_t esp_rmaker_mqtt_budgeting_stop(void)
{
return ESP_FAIL;
}
esp_err_t esp_rmaker_mqtt_budgeting_start(void)
{
return ESP_FAIL;
}
esp_err_t esp_rmaker_mqtt_increase_budget(uint8_t budget)
{
return ESP_OK;
}
esp_err_t esp_rmaker_mqtt_decrease_budget(uint8_t budget)
{
return ESP_OK;
}
bool esp_rmaker_mqtt_is_budget_available(void)
{
return true;
}
#endif /* ! CONFIG_ESP_RMAKER_MQTT_ENABLE_BUDGETING */

View File

@@ -0,0 +1,18 @@
/*
* SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once
#include <stdint.h>
#include <esp_err.h>
esp_err_t esp_rmaker_mqtt_budgeting_init(void);
esp_err_t esp_rmaker_mqtt_budgeting_deinit(void);
esp_err_t esp_rmaker_mqtt_budgeting_stop(void);
esp_err_t esp_rmaker_mqtt_budgeting_start(void);
esp_err_t esp_rmaker_mqtt_increase_budget(uint8_t budget);
esp_err_t esp_rmaker_mqtt_decrease_budget(uint8_t budget);
bool esp_rmaker_mqtt_is_budget_available(void);