From fe2f82e30304aec9ac6d50ea880bf66762f62eda Mon Sep 17 00:00:00 2001 From: Bernhard Kirchen Date: Sat, 30 Dec 2023 17:21:48 +0100 Subject: [PATCH 1/2] Fix: switch context when handling AC charger MQTT messages MQTT message callbacks are executed in the MQTT thread context. when processing topics that control the huawei AC charger, we must avoid executing methods that are not thread-safe. this change bound the methods to be called to the respective parameters and executes them in the TaskScheduler context, such that they no longer need to be thread-safe. --- include/MqttHandleHuawei.h | 8 +++++++ src/MqttHandleHuawei.cpp | 45 ++++++++++++++++++++++++-------------- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/include/MqttHandleHuawei.h b/include/MqttHandleHuawei.h index e25a82e0f..0ced25edb 100644 --- a/include/MqttHandleHuawei.h +++ b/include/MqttHandleHuawei.h @@ -5,6 +5,9 @@ #include #include #include +#include +#include +#include class MqttHandleHuaweiClass { public: @@ -19,6 +22,11 @@ class MqttHandleHuaweiClass { uint32_t _lastPublishStats; uint32_t _lastPublish; + // MQTT callbacks to process updates on subscribed topics are executed in + // the MQTT thread's context. we use this queue to switch processing the + // user requests into the main loop's context (TaskScheduler context). + mutable std::mutex _mqttMutex; + std::deque> _mqttCallbacks; }; extern MqttHandleHuaweiClass MqttHandleHuawei; \ No newline at end of file diff --git a/src/MqttHandleHuawei.cpp b/src/MqttHandleHuawei.cpp index 06e5d22ad..f14bfe1f6 100644 --- a/src/MqttHandleHuawei.cpp +++ b/src/MqttHandleHuawei.cpp @@ -46,13 +46,21 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler) void MqttHandleHuaweiClass::loop() { - if (!MqttSettings.getConnected() ) { + const CONFIG_T& config = Configuration.get(); + + std::unique_lock mqttLock(_mqttMutex); + + if (!config.Huawei.Enabled) { + _mqttCallbacks.clear(); return; } - const CONFIG_T& config = Configuration.get(); + for (auto& callback : _mqttCallbacks) { callback(); } + _mqttCallbacks.clear(); - if (!config.Huawei.Enabled) { + mqttLock.unlock(); + + if (!MqttSettings.getConnected() ) { return; } @@ -82,11 +90,6 @@ void MqttHandleHuaweiClass::onMqttMessage(const espMqttClientTypes::MessagePrope { const CONFIG_T& config = Configuration.get(); - // ignore messages if Huawei is disabled - if (!config.Huawei.Enabled) { - return; - } - char token_topic[MQTT_MAX_TOPIC_STRLEN + 40]; // respect all subtopics strncpy(token_topic, topic, MQTT_MAX_TOPIC_STRLEN + 40); // convert const char* to char* @@ -108,46 +111,56 @@ void MqttHandleHuaweiClass::onMqttMessage(const espMqttClientTypes::MessagePrope float payload_val = strtof(strlimit, NULL); delete[] strlimit; + std::lock_guard mqttLock(_mqttMutex); + if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_VOLTAGE)) { // Set voltage limit MessageOutput.printf("Limit Voltage: %f V\r\n", payload_val); - HuaweiCan.setValue(payload_val, HUAWEI_ONLINE_VOLTAGE); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, + &HuaweiCan, payload_val, HUAWEI_ONLINE_VOLTAGE)); } else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE)) { // Set current limit MessageOutput.printf("Offline Limit Voltage: %f V\r\n", payload_val); - HuaweiCan.setValue(payload_val, HUAWEI_OFFLINE_VOLTAGE); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, + &HuaweiCan, payload_val, HUAWEI_OFFLINE_VOLTAGE)); } else if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_CURRENT)) { // Set current limit MessageOutput.printf("Limit Current: %f A\r\n", payload_val); - HuaweiCan.setValue(payload_val, HUAWEI_ONLINE_CURRENT); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, + &HuaweiCan, payload_val, HUAWEI_ONLINE_CURRENT)); } else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_CURRENT)) { // Set current limit MessageOutput.printf("Offline Limit Current: %f A\r\n", payload_val); - HuaweiCan.setValue(payload_val, HUAWEI_OFFLINE_CURRENT); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, + &HuaweiCan, payload_val, HUAWEI_OFFLINE_CURRENT)); } else if (!strcmp(setting, TOPIC_SUB_MODE)) { // Control power on/off if(payload_val == 3) { MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Full internal control"); - HuaweiCan.setMode(HUAWEI_MODE_AUTO_INT); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, + &HuaweiCan, HUAWEI_MODE_AUTO_INT)); } if(payload_val == 2) { MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Internal on/off control, external power limit"); - HuaweiCan.setMode(HUAWEI_MODE_AUTO_EXT); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, + &HuaweiCan, HUAWEI_MODE_AUTO_EXT)); } if(payload_val == 1) { MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned ON"); - HuaweiCan.setMode(HUAWEI_MODE_ON); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, + &HuaweiCan, HUAWEI_MODE_ON)); } if(payload_val == 0) { MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned OFF"); - HuaweiCan.setMode(HUAWEI_MODE_OFF); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, + &HuaweiCan, HUAWEI_MODE_OFF)); } } } \ No newline at end of file From 463226082fa8afaede2439aa5f4795ed7d72f72c Mon Sep 17 00:00:00 2001 From: Bernhard Kirchen Date: Sat, 30 Dec 2023 18:31:50 +0100 Subject: [PATCH 2/2] clean up Huawei MQTT handler * bind the callback to a topic (enum value) such that there is no need to tokenize the full topic (string) to find out what value is being processed. tokenizing is expensive. * get rid of using the config in the callback, which improves thread-safety since the MQTT callback is running in the MQTT thread. * prefer C++ method stof to convert MQTT value to a float, which saves us from using new and delete for a buffer in particular. * prefer switch statements over if-else-trees. * split long lines. * get rid of topic #defines. * fix indention. --- include/MqttHandleHuawei.h | 14 ++- src/MqttHandleHuawei.cpp | 173 ++++++++++++++++++------------------- 2 files changed, 97 insertions(+), 90 deletions(-) diff --git a/include/MqttHandleHuawei.h b/include/MqttHandleHuawei.h index 0ced25edb..f518ed9d4 100644 --- a/include/MqttHandleHuawei.h +++ b/include/MqttHandleHuawei.h @@ -15,7 +15,19 @@ class MqttHandleHuaweiClass { private: void loop(); - void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total); + + enum class Topic : unsigned { + LimitOnlineVoltage, + LimitOnlineCurrent, + LimitOfflineVoltage, + LimitOfflineCurrent, + Mode + }; + + void onMqttMessage(Topic t, + const espMqttClientTypes::MessageProperties& properties, + const char* topic, const uint8_t* payload, size_t len, + size_t index, size_t total); Task _loopTask; diff --git a/src/MqttHandleHuawei.cpp b/src/MqttHandleHuawei.cpp index f14bfe1f6..4330dc7c2 100644 --- a/src/MqttHandleHuawei.cpp +++ b/src/MqttHandleHuawei.cpp @@ -10,12 +10,6 @@ #include "WebApi_Huawei.h" #include -#define TOPIC_SUB_LIMIT_ONLINE_VOLTAGE "limit_online_voltage" -#define TOPIC_SUB_LIMIT_ONLINE_CURRENT "limit_online_current" -#define TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE "limit_offline_voltage" -#define TOPIC_SUB_LIMIT_OFFLINE_CURRENT "limit_offline_current" -#define TOPIC_SUB_MODE "mode" - MqttHandleHuaweiClass MqttHandleHuawei; void MqttHandleHuaweiClass::init(Scheduler& scheduler) @@ -25,19 +19,22 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler) _loopTask.setIterations(TASK_FOREVER); _loopTask.enable(); - using std::placeholders::_1; - using std::placeholders::_2; - using std::placeholders::_3; - using std::placeholders::_4; - using std::placeholders::_5; - using std::placeholders::_6; + String const& prefix = MqttSettings.getPrefix(); + + auto subscribe = [&prefix, this](char const* subTopic, Topic t) { + String fullTopic(prefix + "huawei/cmd/" + subTopic); + MqttSettings.subscribe(fullTopic.c_str(), 0, + std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, t, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4, + std::placeholders::_5, std::placeholders::_6)); + }; - String topic = MqttSettings.getPrefix(); - MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_ONLINE_VOLTAGE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); - MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_ONLINE_CURRENT).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); - MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); - MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_OFFLINE_CURRENT).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); - MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_MODE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); + subscribe("limit_online_voltage", Topic::LimitOnlineVoltage); + subscribe("limit_online_current", Topic::LimitOnlineCurrent); + subscribe("limit_offline_voltage", Topic::LimitOfflineVoltage); + subscribe("limit_offline_current", Topic::LimitOfflineCurrent); + subscribe("mode", Topic::Mode); _lastPublish = millis(); @@ -86,81 +83,79 @@ void MqttHandleHuaweiClass::loop() } -void MqttHandleHuaweiClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) +void MqttHandleHuaweiClass::onMqttMessage(Topic t, + const espMqttClientTypes::MessageProperties& properties, + const char* topic, const uint8_t* payload, size_t len, + size_t index, size_t total) { - const CONFIG_T& config = Configuration.get(); - - char token_topic[MQTT_MAX_TOPIC_STRLEN + 40]; // respect all subtopics - strncpy(token_topic, topic, MQTT_MAX_TOPIC_STRLEN + 40); // convert const char* to char* - - char* setting; - char* rest = &token_topic[strlen(config.Mqtt.Topic)]; - - strtok_r(rest, "/", &rest); // Remove "huawei" - strtok_r(rest, "/", &rest); // Remove "cmd" - - setting = strtok_r(rest, "/", &rest); - - if (setting == NULL) { + std::string strValue(reinterpret_cast(payload), len); + float payload_val = -1; + try { + payload_val = std::stof(strValue); + } + catch (std::invalid_argument const& e) { + MessageOutput.printf("Huawei MQTT handler: cannot parse payload of topic '%s' as float: %s\r\n", + topic, strValue.c_str()); return; } - char* strlimit = new char[len + 1]; - memcpy(strlimit, payload, len); - strlimit[len] = '\0'; - float payload_val = strtof(strlimit, NULL); - delete[] strlimit; - std::lock_guard mqttLock(_mqttMutex); - if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_VOLTAGE)) { - // Set voltage limit - MessageOutput.printf("Limit Voltage: %f V\r\n", payload_val); - _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, - &HuaweiCan, payload_val, HUAWEI_ONLINE_VOLTAGE)); - - } else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE)) { - // Set current limit - MessageOutput.printf("Offline Limit Voltage: %f V\r\n", payload_val); - _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, - &HuaweiCan, payload_val, HUAWEI_OFFLINE_VOLTAGE)); - - } else if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_CURRENT)) { - // Set current limit - MessageOutput.printf("Limit Current: %f A\r\n", payload_val); - _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, - &HuaweiCan, payload_val, HUAWEI_ONLINE_CURRENT)); - - } else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_CURRENT)) { - // Set current limit - MessageOutput.printf("Offline Limit Current: %f A\r\n", payload_val); - _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, - &HuaweiCan, payload_val, HUAWEI_OFFLINE_CURRENT)); - - } else if (!strcmp(setting, TOPIC_SUB_MODE)) { - // Control power on/off - if(payload_val == 3) { - MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Full internal control"); - _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, - &HuaweiCan, HUAWEI_MODE_AUTO_INT)); - } - - if(payload_val == 2) { - MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Internal on/off control, external power limit"); - _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, - &HuaweiCan, HUAWEI_MODE_AUTO_EXT)); - } - - if(payload_val == 1) { - MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned ON"); - _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, - &HuaweiCan, HUAWEI_MODE_ON)); - } - - if(payload_val == 0) { - MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned OFF"); - _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, - &HuaweiCan, HUAWEI_MODE_OFF)); - } - } + switch (t) { + case Topic::LimitOnlineVoltage: + MessageOutput.printf("Limit Voltage: %f V\r\n", payload_val); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, + &HuaweiCan, payload_val, HUAWEI_ONLINE_VOLTAGE)); + break; + + case Topic::LimitOfflineVoltage: + MessageOutput.printf("Offline Limit Voltage: %f V\r\n", payload_val); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, + &HuaweiCan, payload_val, HUAWEI_OFFLINE_VOLTAGE)); + break; + + case Topic::LimitOnlineCurrent: + MessageOutput.printf("Limit Current: %f A\r\n", payload_val); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, + &HuaweiCan, payload_val, HUAWEI_ONLINE_CURRENT)); + break; + + case Topic::LimitOfflineCurrent: + MessageOutput.printf("Offline Limit Current: %f A\r\n", payload_val); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, + &HuaweiCan, payload_val, HUAWEI_OFFLINE_CURRENT)); + break; + + case Topic::Mode: + switch (static_cast(payload_val)) { + case 3: + MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Full internal control"); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, + &HuaweiCan, HUAWEI_MODE_AUTO_INT)); + break; + + case 2: + MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Internal on/off control, external power limit"); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, + &HuaweiCan, HUAWEI_MODE_AUTO_EXT)); + break; + + case 1: + MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned ON"); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, + &HuaweiCan, HUAWEI_MODE_ON)); + break; + + case 0: + MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned OFF"); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, + &HuaweiCan, HUAWEI_MODE_OFF)); + break; + + default: + MessageOutput.printf("[Huawei MQTT::] Invalid mode %.0f\r\n", payload_val); + break; + } + break; + } } \ No newline at end of file