diff --git a/include/MqttHandleHuawei.h b/include/MqttHandleHuawei.h index e25a82e0f..f518ed9d4 100644 --- a/include/MqttHandleHuawei.h +++ b/include/MqttHandleHuawei.h @@ -5,6 +5,9 @@ #include #include #include +#include +#include +#include class MqttHandleHuaweiClass { public: @@ -12,13 +15,30 @@ class MqttHandleHuaweiClass { private: void loop(); - void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total); + + enum class Topic : unsigned { + LimitOnlineVoltage, + LimitOnlineCurrent, + LimitOfflineVoltage, + LimitOfflineCurrent, + Mode + }; + + void onMqttMessage(Topic t, + const espMqttClientTypes::MessageProperties& properties, + const char* topic, const uint8_t* payload, size_t len, + size_t index, size_t total); Task _loopTask; uint32_t _lastPublishStats; uint32_t _lastPublish; + // MQTT callbacks to process updates on subscribed topics are executed in + // the MQTT thread's context. we use this queue to switch processing the + // user requests into the main loop's context (TaskScheduler context). + mutable std::mutex _mqttMutex; + std::deque> _mqttCallbacks; }; extern MqttHandleHuaweiClass MqttHandleHuawei; \ No newline at end of file diff --git a/src/MqttHandleHuawei.cpp b/src/MqttHandleHuawei.cpp index 06e5d22ad..4330dc7c2 100644 --- a/src/MqttHandleHuawei.cpp +++ b/src/MqttHandleHuawei.cpp @@ -10,12 +10,6 @@ #include "WebApi_Huawei.h" #include -#define TOPIC_SUB_LIMIT_ONLINE_VOLTAGE "limit_online_voltage" -#define TOPIC_SUB_LIMIT_ONLINE_CURRENT "limit_online_current" -#define TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE "limit_offline_voltage" -#define TOPIC_SUB_LIMIT_OFFLINE_CURRENT "limit_offline_current" -#define TOPIC_SUB_MODE "mode" - MqttHandleHuaweiClass MqttHandleHuawei; void MqttHandleHuaweiClass::init(Scheduler& scheduler) @@ -25,19 +19,22 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler) _loopTask.setIterations(TASK_FOREVER); _loopTask.enable(); - using std::placeholders::_1; - using std::placeholders::_2; - using std::placeholders::_3; - using std::placeholders::_4; - using std::placeholders::_5; - using std::placeholders::_6; + String const& prefix = MqttSettings.getPrefix(); + + auto subscribe = [&prefix, this](char const* subTopic, Topic t) { + String fullTopic(prefix + "huawei/cmd/" + subTopic); + MqttSettings.subscribe(fullTopic.c_str(), 0, + std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, t, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4, + std::placeholders::_5, std::placeholders::_6)); + }; - String topic = MqttSettings.getPrefix(); - MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_ONLINE_VOLTAGE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); - MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_ONLINE_CURRENT).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); - MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); - MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_OFFLINE_CURRENT).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); - MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_MODE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); + subscribe("limit_online_voltage", Topic::LimitOnlineVoltage); + subscribe("limit_online_current", Topic::LimitOnlineCurrent); + subscribe("limit_offline_voltage", Topic::LimitOfflineVoltage); + subscribe("limit_offline_current", Topic::LimitOfflineCurrent); + subscribe("mode", Topic::Mode); _lastPublish = millis(); @@ -46,13 +43,21 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler) void MqttHandleHuaweiClass::loop() { - if (!MqttSettings.getConnected() ) { + const CONFIG_T& config = Configuration.get(); + + std::unique_lock mqttLock(_mqttMutex); + + if (!config.Huawei.Enabled) { + _mqttCallbacks.clear(); return; } - const CONFIG_T& config = Configuration.get(); + for (auto& callback : _mqttCallbacks) { callback(); } + _mqttCallbacks.clear(); - if (!config.Huawei.Enabled) { + mqttLock.unlock(); + + if (!MqttSettings.getConnected() ) { return; } @@ -78,76 +83,79 @@ void MqttHandleHuaweiClass::loop() } -void MqttHandleHuaweiClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) +void MqttHandleHuaweiClass::onMqttMessage(Topic t, + const espMqttClientTypes::MessageProperties& properties, + const char* topic, const uint8_t* payload, size_t len, + size_t index, size_t total) { - const CONFIG_T& config = Configuration.get(); - - // ignore messages if Huawei is disabled - if (!config.Huawei.Enabled) { - return; + std::string strValue(reinterpret_cast(payload), len); + float payload_val = -1; + try { + payload_val = std::stof(strValue); } - - char token_topic[MQTT_MAX_TOPIC_STRLEN + 40]; // respect all subtopics - strncpy(token_topic, topic, MQTT_MAX_TOPIC_STRLEN + 40); // convert const char* to char* - - char* setting; - char* rest = &token_topic[strlen(config.Mqtt.Topic)]; - - strtok_r(rest, "/", &rest); // Remove "huawei" - strtok_r(rest, "/", &rest); // Remove "cmd" - - setting = strtok_r(rest, "/", &rest); - - if (setting == NULL) { + catch (std::invalid_argument const& e) { + MessageOutput.printf("Huawei MQTT handler: cannot parse payload of topic '%s' as float: %s\r\n", + topic, strValue.c_str()); return; } - char* strlimit = new char[len + 1]; - memcpy(strlimit, payload, len); - strlimit[len] = '\0'; - float payload_val = strtof(strlimit, NULL); - delete[] strlimit; - - if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_VOLTAGE)) { - // Set voltage limit - MessageOutput.printf("Limit Voltage: %f V\r\n", payload_val); - HuaweiCan.setValue(payload_val, HUAWEI_ONLINE_VOLTAGE); - - } else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE)) { - // Set current limit - MessageOutput.printf("Offline Limit Voltage: %f V\r\n", payload_val); - HuaweiCan.setValue(payload_val, HUAWEI_OFFLINE_VOLTAGE); - - } else if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_CURRENT)) { - // Set current limit - MessageOutput.printf("Limit Current: %f A\r\n", payload_val); - HuaweiCan.setValue(payload_val, HUAWEI_ONLINE_CURRENT); - - } else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_CURRENT)) { - // Set current limit - MessageOutput.printf("Offline Limit Current: %f A\r\n", payload_val); - HuaweiCan.setValue(payload_val, HUAWEI_OFFLINE_CURRENT); - - } else if (!strcmp(setting, TOPIC_SUB_MODE)) { - // Control power on/off - if(payload_val == 3) { - MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Full internal control"); - HuaweiCan.setMode(HUAWEI_MODE_AUTO_INT); - } - - if(payload_val == 2) { - MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Internal on/off control, external power limit"); - HuaweiCan.setMode(HUAWEI_MODE_AUTO_EXT); - } - - if(payload_val == 1) { - MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned ON"); - HuaweiCan.setMode(HUAWEI_MODE_ON); - } - - if(payload_val == 0) { - MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned OFF"); - HuaweiCan.setMode(HUAWEI_MODE_OFF); - } - } + std::lock_guard mqttLock(_mqttMutex); + + switch (t) { + case Topic::LimitOnlineVoltage: + MessageOutput.printf("Limit Voltage: %f V\r\n", payload_val); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, + &HuaweiCan, payload_val, HUAWEI_ONLINE_VOLTAGE)); + break; + + case Topic::LimitOfflineVoltage: + MessageOutput.printf("Offline Limit Voltage: %f V\r\n", payload_val); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, + &HuaweiCan, payload_val, HUAWEI_OFFLINE_VOLTAGE)); + break; + + case Topic::LimitOnlineCurrent: + MessageOutput.printf("Limit Current: %f A\r\n", payload_val); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, + &HuaweiCan, payload_val, HUAWEI_ONLINE_CURRENT)); + break; + + case Topic::LimitOfflineCurrent: + MessageOutput.printf("Offline Limit Current: %f A\r\n", payload_val); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, + &HuaweiCan, payload_val, HUAWEI_OFFLINE_CURRENT)); + break; + + case Topic::Mode: + switch (static_cast(payload_val)) { + case 3: + MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Full internal control"); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, + &HuaweiCan, HUAWEI_MODE_AUTO_INT)); + break; + + case 2: + MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Internal on/off control, external power limit"); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, + &HuaweiCan, HUAWEI_MODE_AUTO_EXT)); + break; + + case 1: + MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned ON"); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, + &HuaweiCan, HUAWEI_MODE_ON)); + break; + + case 0: + MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned OFF"); + _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, + &HuaweiCan, HUAWEI_MODE_OFF)); + break; + + default: + MessageOutput.printf("[Huawei MQTT::] Invalid mode %.0f\r\n", payload_val); + break; + } + break; + } } \ No newline at end of file