From d9994f3fb6854d41ba5ecc38d0f6319f3d5b7ccd Mon Sep 17 00:00:00 2001 From: Piyush Shah Date: Mon, 21 Nov 2022 18:54:20 +0530 Subject: [PATCH] esp_rmaker_mqtt: Add MQTT budgeting to control the number of messages sent --- CHANGES.md | 10 + components/esp_rainmaker/CMakeLists.txt | 6 +- components/esp_rainmaker/Kconfig.projbuild | 40 ++++ .../esp_rainmaker/include/esp_rmaker_mqtt.h | 6 + .../src/core/esp_rmaker_mqtt_topics.h | 2 +- .../esp_rainmaker/src/mqtt/esp_rmaker_mqtt.c | 37 +++- .../src/mqtt/esp_rmaker_mqtt_budget.c | 182 ++++++++++++++++++ .../src/mqtt/esp_rmaker_mqtt_budget.h | 18 ++ 8 files changed, 295 insertions(+), 6 deletions(-) create mode 100644 components/esp_rainmaker/src/mqtt/esp_rmaker_mqtt_budget.c create mode 100644 components/esp_rainmaker/src/mqtt/esp_rmaker_mqtt_budget.h diff --git a/CHANGES.md b/CHANGES.md index 3b87352..0b65184 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,15 @@ # Changes +## 21-Nov-2022 (esp_rmaker_mqtt: Add MQTT budgeting to control the number of messages sent) + +- Due to some poor, non-optimised coding or bugs, it is possible that the node keeps bombarding the MQTT +broker with publish messages. To prevent this, a concept of MQTT Budgeting has been added. +- By default, a node will be given a budget of 100 (`CONFIG_ESP_RMAKER_MQTT_DEFAULT_BUDGET`), which will + go on incrementing by 1 (`CONFIG_ESP_RMAKER_MQTT_BUDGET_REVIVE_COUNT` every 5 seconds (`CONFIG_ESP_RMAKER_MQTT_BUDGET_REVIVE_PERIOD`), + limited to a max value of 1024 (`CONFIG_ESP_RMAKER_MQTT_MAX_BUDGET`). +- Budget will be decremented by 1 for every MQTT publish and messages will be dropped if budget is 0. +- This behaviour is enabled by default and can be disabled by disabling `CONFIG_ESP_RMAKER_MQTT_ENABLE_BUDGETING`. + ## 16-Nov-2022 (mqtt_topics: Added support for AWS basic ingest topics.) - AWS Basic Ingest Topics optimize data flow by removing the publish/subscribe message broker from the ingestion path, making it more cost effective. You can refer the official docs [here](https://docs.aws.amazon.com/iot/latest/developerguide/iot-basic-ingest.html). diff --git a/components/esp_rainmaker/CMakeLists.txt b/components/esp_rainmaker/CMakeLists.txt index 71306b9..d8d43cc 100644 --- a/components/esp_rainmaker/CMakeLists.txt +++ b/components/esp_rainmaker/CMakeLists.txt @@ -36,7 +36,9 @@ endif() set(core_priv_includes "src/core") # MQTT -set(mqtt_srcs "src/mqtt/esp_rmaker_mqtt.c") +set(mqtt_srcs "src/mqtt/esp_rmaker_mqtt.c" + "src/mqtt/esp_rmaker_mqtt_budget.c") +set(mqtt_priv_includes "src/mqtt") # OTA set(ota_srcs "src/ota/esp_rmaker_ota.c" @@ -56,7 +58,7 @@ set(standard_types_srcs "src/standard_types/esp_rmaker_standard_params.c" idf_component_register(SRCS ${core_srcs} ${mqtt_srcs} ${ota_srcs} ${standard_types_srcs} ${console_srcs} INCLUDE_DIRS "include" - PRIV_INCLUDE_DIRS ${core_priv_includes} ${ota_priv_includes} ${console_priv_includes} + PRIV_INCLUDE_DIRS ${core_priv_includes} ${ota_priv_includes} ${console_priv_includes} ${mqtt_priv_includes} REQUIRES rmaker_common PRIV_REQUIRES ${priv_req}) diff --git a/components/esp_rainmaker/Kconfig.projbuild b/components/esp_rainmaker/Kconfig.projbuild index 6e71cc6..6d76c54 100644 --- a/components/esp_rainmaker/Kconfig.projbuild +++ b/components/esp_rainmaker/Kconfig.projbuild @@ -59,6 +59,46 @@ menu "ESP RainMaker Config" This config enables the use of AWS Basic Ingest Topics for Node to Cloud communication, which eliminates the MQTT Broker and thus reduces messaging cost. + config ESP_RMAKER_MQTT_ENABLE_BUDGETING + bool "Enable MQTT budgeting" + default y + help + Enable MQTT budgeting, which will control the number of MQTT messages sent by the node. + + config ESP_RMAKER_MQTT_DEFAULT_BUDGET + int "Default MQTT Budget" + depends on ESP_RMAKER_MQTT_ENABLE_BUDGETING + default 100 + range 64 ESP_RMAKER_MQTT_MAX_BUDGET + help + Default MQTT budget. Budget will reduce on sending an MQTT message and increase based on + ESP_RMAKER_MQTT_BUDGET_REVIVE_PERIOD. If no budget is available, MQTT message will be dropped. + + config ESP_RMAKER_MQTT_MAX_BUDGET + int "Max MQTT Budget" + depends on ESP_RMAKER_MQTT_ENABLE_BUDGETING + default 1024 + range 64 2048 + help + Maximum budget that the node can have. No additional budget will be allocated if this count is reached. + + config ESP_RMAKER_MQTT_BUDGET_REVIVE_PERIOD + int "MQTT Budget revive period" + depends on ESP_RMAKER_MQTT_ENABLE_BUDGETING + default 5 + range 5 600 + help + Period in seconds after which the MQTT budget should revive (by ESP_RMAKER_MQTT_BUDGET_REVIVE_COUNT). + This is used to limit the messages being sent by the node. + + config ESP_RMAKER_MQTT_BUDGET_REVIVE_COUNT + int "MQTT Budget revive count" + depends on ESP_RMAKER_MQTT_ENABLE_BUDGETING + default 1 + range 1 16 + help + The count by which the budget will be increased periodically based on ESP_RMAKER_MQTT_BUDGET_REVIVE_PERIOD. + config ESP_RMAKER_MAX_PARAM_DATA_SIZE int "Maximum Parameters' data size" default 1024 diff --git a/components/esp_rainmaker/include/esp_rmaker_mqtt.h b/components/esp_rainmaker/include/esp_rmaker_mqtt.h index 1d29d71..eba3615 100644 --- a/components/esp_rainmaker/include/esp_rmaker_mqtt.h +++ b/components/esp_rainmaker/include/esp_rmaker_mqtt.h @@ -32,6 +32,12 @@ esp_rmaker_mqtt_conn_params_t *esp_rmaker_mqtt_get_conn_params(void); */ esp_err_t esp_rmaker_mqtt_init(esp_rmaker_mqtt_conn_params_t *conn_params); +/* Deinitialize ESP RainMaker MQTT + * + * Call this function after MQTT has disconnected. + */ +void esp_rmaker_mqtt_deinit(void); + /** MQTT Connect * * Starts the connection attempts to the MQTT broker as per the configuration diff --git a/components/esp_rainmaker/src/core/esp_rmaker_mqtt_topics.h b/components/esp_rainmaker/src/core/esp_rmaker_mqtt_topics.h index 6fffc75..6f3b74a 100644 --- a/components/esp_rainmaker/src/core/esp_rmaker_mqtt_topics.h +++ b/components/esp_rainmaker/src/core/esp_rmaker_mqtt_topics.h @@ -30,4 +30,4 @@ #define TO_NODE_TOPIC_SUFFIX "to-node" #define INSIGHTS_TOPIC_SUFFIX "diagnostics/from-node" -#define MQTT_TOPIC_BUFFER_SIZE 150 \ No newline at end of file +#define MQTT_TOPIC_BUFFER_SIZE 150 diff --git a/components/esp_rainmaker/src/mqtt/esp_rmaker_mqtt.c b/components/esp_rainmaker/src/mqtt/esp_rmaker_mqtt.c index d6dd3c3..f46007a 100644 --- a/components/esp_rainmaker/src/mqtt/esp_rmaker_mqtt.c +++ b/components/esp_rainmaker/src/mqtt/esp_rmaker_mqtt.c @@ -17,6 +17,8 @@ #include #include +#include "esp_rmaker_mqtt_budget.h" + static const char *TAG = "esp_rmaker_mqtt"; static esp_rmaker_mqtt_config_t g_mqtt_config; @@ -42,23 +44,44 @@ esp_err_t esp_rmaker_mqtt_init(esp_rmaker_mqtt_conn_params_t *conn_params) esp_rmaker_mqtt_glue_setup(&g_mqtt_config); } if (g_mqtt_config.init) { - return g_mqtt_config.init(conn_params); + esp_err_t err = g_mqtt_config.init(conn_params); + if (err == ESP_OK) { + if (esp_rmaker_mqtt_budgeting_init() != ESP_OK) { + ESP_LOGE(TAG, "Failied to initialise MQTT Budgeting."); + } + } + return err; } ESP_LOGW(TAG, "esp_rmaker_mqtt_init not registered"); return ESP_OK; } +void esp_rmaker_mqtt_deinit(void) +{ + esp_rmaker_mqtt_budgeting_deinit(); + if (g_mqtt_config.deinit) { + return g_mqtt_config.deinit(); + } + ESP_LOGW(TAG, "esp_rmaker_mqtt_deinit not registered"); +} + esp_err_t esp_rmaker_mqtt_connect(void) { if (g_mqtt_config.connect) { - return g_mqtt_config.connect(); + esp_err_t err = g_mqtt_config.connect(); + if (err == ESP_OK) { + esp_rmaker_mqtt_budgeting_start(); + } + return err; } ESP_LOGW(TAG, "esp_rmaker_mqtt_connect not registered"); return ESP_OK; } + esp_err_t esp_rmaker_mqtt_disconnect(void) { + esp_rmaker_mqtt_budgeting_stop(); if (g_mqtt_config.disconnect) { return g_mqtt_config.disconnect(); } @@ -86,8 +109,16 @@ esp_err_t esp_rmaker_mqtt_unsubscribe(const char *topic) esp_err_t esp_rmaker_mqtt_publish(const char *topic, void *data, size_t data_len, uint8_t qos, int *msg_id) { + if (esp_rmaker_mqtt_is_budget_available() != true) { + ESP_LOGE(TAG, "Out of MQTT Budget. Dropping publish message."); + return ESP_FAIL; + } if (g_mqtt_config.publish) { - return g_mqtt_config.publish(topic, data, data_len, qos, msg_id); + esp_err_t err = g_mqtt_config.publish(topic, data, data_len, qos, msg_id); + if (err == ESP_OK) { + esp_rmaker_mqtt_decrease_budget(1); + } + return err; } ESP_LOGW(TAG, "esp_rmaker_mqtt_publish not registered"); return ESP_OK; diff --git a/components/esp_rainmaker/src/mqtt/esp_rmaker_mqtt_budget.c b/components/esp_rainmaker/src/mqtt/esp_rmaker_mqtt_budget.c new file mode 100644 index 0000000..94778d5 --- /dev/null +++ b/components/esp_rainmaker/src/mqtt/esp_rmaker_mqtt_budget.c @@ -0,0 +1,182 @@ + +/* + * SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include + +static const char *TAG = "esp_rmaker_mqtt_budget"; + +#ifdef CONFIG_ESP_RMAKER_MQTT_ENABLE_BUDGETING + +#include +#include +#include + +#define DEFAULT_BUDGET CONFIG_ESP_RMAKER_MQTT_DEFAULT_BUDGET +#define MAX_BUDGET CONFIG_ESP_RMAKER_MQTT_MAX_BUDGET +#define BUDGET_REVIVE_COUNT CONFIG_ESP_RMAKER_MQTT_BUDGET_REVIVE_COUNT +#define BUDGET_REVIVE_PERIOD CONFIG_ESP_RMAKER_MQTT_BUDGET_REVIVE_PERIOD + +static int16_t mqtt_budget = DEFAULT_BUDGET; +static TimerHandle_t mqtt_budget_timer; +static SemaphoreHandle_t mqtt_budget_lock; +#define SEMAPHORE_DELAY_MSEC 500 + +bool esp_rmaker_mqtt_is_budget_available(void) +{ + if (mqtt_budget_lock == NULL) { + ESP_LOGW(TAG, "MQTT budgeting not started yet. Allowing publish."); + return true; + } + if (xSemaphoreTake(mqtt_budget_lock, SEMAPHORE_DELAY_MSEC/portTICK_PERIOD_MS) != pdTRUE) { + ESP_LOGW(TAG, "Could not acquire MQTT budget lock. Allowing publish."); + return true; + } + int16_t budget = mqtt_budget; + xSemaphoreGive(mqtt_budget_lock); + return budget ? true : false; +} + +esp_err_t esp_rmaker_mqtt_increase_budget(uint8_t budget) +{ + if (mqtt_budget_lock == NULL) { + ESP_LOGW(TAG, "MQTT budgeting not started. Not increasing the budget."); + return ESP_FAIL; + } + if (xSemaphoreTake(mqtt_budget_lock, SEMAPHORE_DELAY_MSEC/portTICK_PERIOD_MS) != pdTRUE) { + ESP_LOGE(TAG, "Failed to increase MQTT budget."); + return ESP_FAIL; + } + mqtt_budget += budget; + if (mqtt_budget > MAX_BUDGET) { + mqtt_budget = MAX_BUDGET; + } + xSemaphoreGive(mqtt_budget_lock); + ESP_LOGD(TAG, "MQTT budget increased to %d", mqtt_budget); + return ESP_OK; +} + +esp_err_t esp_rmaker_mqtt_decrease_budget(uint8_t budget) +{ + if (mqtt_budget_lock == NULL) { + ESP_LOGW(TAG, "MQTT budgeting not started. Not decreasing the budget."); + return ESP_FAIL; + } + if (xSemaphoreTake(mqtt_budget_lock, SEMAPHORE_DELAY_MSEC/portTICK_PERIOD_MS) != pdTRUE) { + ESP_LOGE(TAG, "Failed to decrease MQTT budget."); + return ESP_FAIL; + } + mqtt_budget -= budget; + if (mqtt_budget < 0) { + mqtt_budget = 0; + } + xSemaphoreGive(mqtt_budget_lock); + ESP_LOGD(TAG, "MQTT budget decreased to %d.", mqtt_budget); + return ESP_OK; +} + +static void esp_rmaker_mqtt_revive_budget(TimerHandle_t handle) +{ + esp_rmaker_mqtt_increase_budget(BUDGET_REVIVE_COUNT); +} + +esp_err_t esp_rmaker_mqtt_budgeting_start(void) +{ + if (mqtt_budget_timer) { + xTimerStart(mqtt_budget_timer, 0); + return ESP_OK; + } + return ESP_FAIL; +} + +esp_err_t esp_rmaker_mqtt_budgeting_stop(void) +{ + if (mqtt_budget_timer) { + xTimerStop(mqtt_budget_timer, 100); + return ESP_OK; + } + return ESP_FAIL; +} + +esp_err_t esp_rmaker_mqtt_budgeting_deinit(void) +{ + if (mqtt_budget_timer) { + esp_rmaker_mqtt_budgeting_stop(); + xTimerDelete(mqtt_budget_timer, 100); + mqtt_budget_timer = NULL; + } + if (mqtt_budget_lock) { + vSemaphoreDelete(mqtt_budget_lock); + mqtt_budget_lock = NULL; + } + return ESP_OK; +} + +esp_err_t esp_rmaker_mqtt_budgeting_init(void) +{ + if (mqtt_budget_timer) { + ESP_LOGI(TAG, "MQTT budgeting already initialised."); + return ESP_OK; + } + + mqtt_budget_lock = xSemaphoreCreateMutex(); + if (!mqtt_budget_lock) { + return ESP_FAIL; + } + + mqtt_budget_timer = xTimerCreate("mqtt_budget_tm", (BUDGET_REVIVE_PERIOD * 1000) / portTICK_PERIOD_MS, + pdTRUE, NULL, esp_rmaker_mqtt_revive_budget); + if (mqtt_budget_timer) { + ESP_LOGI(TAG, "MQTT Budgeting initialised. Default: %d, Max: %d, Revive count: %d, Revive period: %d", + DEFAULT_BUDGET, MAX_BUDGET, BUDGET_REVIVE_COUNT, BUDGET_REVIVE_PERIOD); + return ESP_OK; + } + return ESP_FAIL; +} + +#else /* ! CONFIG_ESP_RMAKER_MQTT_ENABLE_BUDGETING */ + +esp_err_t esp_rmaker_mqtt_budgeting_init(void) +{ + /* Adding a print only here, because this is always going to be the first function + * to be invoked since it is called from MQTT init. Else, MQTT itself is going to fail. + */ + ESP_LOGW(TAG, "MQTT Budgeting is not enabled."); + return ESP_FAIL; +} + +esp_err_t esp_rmaker_mqtt_budgeting_deinit(void) +{ + return ESP_FAIL; +} + +esp_err_t esp_rmaker_mqtt_budgeting_stop(void) +{ + return ESP_FAIL; +} + +esp_err_t esp_rmaker_mqtt_budgeting_start(void) +{ + return ESP_FAIL; +} + +esp_err_t esp_rmaker_mqtt_increase_budget(uint8_t budget) +{ + return ESP_OK; +} + +esp_err_t esp_rmaker_mqtt_decrease_budget(uint8_t budget) +{ + return ESP_OK; +} + +bool esp_rmaker_mqtt_is_budget_available(void) +{ + return true; +} + +#endif /* ! CONFIG_ESP_RMAKER_MQTT_ENABLE_BUDGETING */ diff --git a/components/esp_rainmaker/src/mqtt/esp_rmaker_mqtt_budget.h b/components/esp_rainmaker/src/mqtt/esp_rmaker_mqtt_budget.h new file mode 100644 index 0000000..c7e0526 --- /dev/null +++ b/components/esp_rainmaker/src/mqtt/esp_rmaker_mqtt_budget.h @@ -0,0 +1,18 @@ + +/* + * SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include + +esp_err_t esp_rmaker_mqtt_budgeting_init(void); +esp_err_t esp_rmaker_mqtt_budgeting_deinit(void); +esp_err_t esp_rmaker_mqtt_budgeting_stop(void); +esp_err_t esp_rmaker_mqtt_budgeting_start(void); +esp_err_t esp_rmaker_mqtt_increase_budget(uint8_t budget); +esp_err_t esp_rmaker_mqtt_decrease_budget(uint8_t budget); +bool esp_rmaker_mqtt_is_budget_available(void);