Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: switch context when handling AC charger MQTT messages #571

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion include/MqttHandleHuawei.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,40 @@
#include <Huawei_can.h>
#include <espMqttClient.h>
#include <TaskSchedulerDeclarations.h>
#include <mutex>
#include <deque>
#include <functional>

class MqttHandleHuaweiClass {
public:
void init(Scheduler& scheduler);

private:
void loop();
void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total);

enum class Topic : unsigned {
LimitOnlineVoltage,
LimitOnlineCurrent,
LimitOfflineVoltage,
LimitOfflineCurrent,
Mode
};

void onMqttMessage(Topic t,
const espMqttClientTypes::MessageProperties& properties,
const char* topic, const uint8_t* payload, size_t len,
size_t index, size_t total);

Task _loopTask;

uint32_t _lastPublishStats;
uint32_t _lastPublish;

// MQTT callbacks to process updates on subscribed topics are executed in
// the MQTT thread's context. we use this queue to switch processing the
// user requests into the main loop's context (TaskScheduler context).
mutable std::mutex _mqttMutex;
std::deque<std::function<void()>> _mqttCallbacks;
};

extern MqttHandleHuaweiClass MqttHandleHuawei;
184 changes: 96 additions & 88 deletions src/MqttHandleHuawei.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@
#include "WebApi_Huawei.h"
#include <ctime>

#define TOPIC_SUB_LIMIT_ONLINE_VOLTAGE "limit_online_voltage"
#define TOPIC_SUB_LIMIT_ONLINE_CURRENT "limit_online_current"
#define TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE "limit_offline_voltage"
#define TOPIC_SUB_LIMIT_OFFLINE_CURRENT "limit_offline_current"
#define TOPIC_SUB_MODE "mode"

MqttHandleHuaweiClass MqttHandleHuawei;

void MqttHandleHuaweiClass::init(Scheduler& scheduler)
Expand All @@ -25,19 +19,22 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler)
_loopTask.setIterations(TASK_FOREVER);
_loopTask.enable();

using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;
using std::placeholders::_4;
using std::placeholders::_5;
using std::placeholders::_6;
String const& prefix = MqttSettings.getPrefix();

auto subscribe = [&prefix, this](char const* subTopic, Topic t) {
String fullTopic(prefix + "huawei/cmd/" + subTopic);
MqttSettings.subscribe(fullTopic.c_str(), 0,
std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, t,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5, std::placeholders::_6));
};

String topic = MqttSettings.getPrefix();
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_ONLINE_VOLTAGE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_ONLINE_CURRENT).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_OFFLINE_CURRENT).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_MODE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
subscribe("limit_online_voltage", Topic::LimitOnlineVoltage);
subscribe("limit_online_current", Topic::LimitOnlineCurrent);
subscribe("limit_offline_voltage", Topic::LimitOfflineVoltage);
subscribe("limit_offline_current", Topic::LimitOfflineCurrent);
subscribe("mode", Topic::Mode);

_lastPublish = millis();

Expand All @@ -46,13 +43,21 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler)

void MqttHandleHuaweiClass::loop()
{
if (!MqttSettings.getConnected() ) {
const CONFIG_T& config = Configuration.get();

std::unique_lock<std::mutex> mqttLock(_mqttMutex);

if (!config.Huawei.Enabled) {
_mqttCallbacks.clear();
return;
}

const CONFIG_T& config = Configuration.get();
for (auto& callback : _mqttCallbacks) { callback(); }
_mqttCallbacks.clear();

if (!config.Huawei.Enabled) {
mqttLock.unlock();

if (!MqttSettings.getConnected() ) {
return;
}

Expand All @@ -78,76 +83,79 @@ void MqttHandleHuaweiClass::loop()
}


void MqttHandleHuaweiClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total)
void MqttHandleHuaweiClass::onMqttMessage(Topic t,
const espMqttClientTypes::MessageProperties& properties,
const char* topic, const uint8_t* payload, size_t len,
size_t index, size_t total)
{
const CONFIG_T& config = Configuration.get();

// ignore messages if Huawei is disabled
if (!config.Huawei.Enabled) {
return;
std::string strValue(reinterpret_cast<const char*>(payload), len);
float payload_val = -1;
try {
payload_val = std::stof(strValue);
}

char token_topic[MQTT_MAX_TOPIC_STRLEN + 40]; // respect all subtopics
strncpy(token_topic, topic, MQTT_MAX_TOPIC_STRLEN + 40); // convert const char* to char*

char* setting;
char* rest = &token_topic[strlen(config.Mqtt.Topic)];

strtok_r(rest, "/", &rest); // Remove "huawei"
strtok_r(rest, "/", &rest); // Remove "cmd"

setting = strtok_r(rest, "/", &rest);

if (setting == NULL) {
catch (std::invalid_argument const& e) {
MessageOutput.printf("Huawei MQTT handler: cannot parse payload of topic '%s' as float: %s\r\n",
topic, strValue.c_str());
return;
}

char* strlimit = new char[len + 1];
memcpy(strlimit, payload, len);
strlimit[len] = '\0';
float payload_val = strtof(strlimit, NULL);
delete[] strlimit;

if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_VOLTAGE)) {
// Set voltage limit
MessageOutput.printf("Limit Voltage: %f V\r\n", payload_val);
HuaweiCan.setValue(payload_val, HUAWEI_ONLINE_VOLTAGE);

} else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE)) {
// Set current limit
MessageOutput.printf("Offline Limit Voltage: %f V\r\n", payload_val);
HuaweiCan.setValue(payload_val, HUAWEI_OFFLINE_VOLTAGE);

} else if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_CURRENT)) {
// Set current limit
MessageOutput.printf("Limit Current: %f A\r\n", payload_val);
HuaweiCan.setValue(payload_val, HUAWEI_ONLINE_CURRENT);

} else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_CURRENT)) {
// Set current limit
MessageOutput.printf("Offline Limit Current: %f A\r\n", payload_val);
HuaweiCan.setValue(payload_val, HUAWEI_OFFLINE_CURRENT);

} else if (!strcmp(setting, TOPIC_SUB_MODE)) {
// Control power on/off
if(payload_val == 3) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Full internal control");
HuaweiCan.setMode(HUAWEI_MODE_AUTO_INT);
}

if(payload_val == 2) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Internal on/off control, external power limit");
HuaweiCan.setMode(HUAWEI_MODE_AUTO_EXT);
}

if(payload_val == 1) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned ON");
HuaweiCan.setMode(HUAWEI_MODE_ON);
}

if(payload_val == 0) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned OFF");
HuaweiCan.setMode(HUAWEI_MODE_OFF);
}
}
std::lock_guard<std::mutex> mqttLock(_mqttMutex);

switch (t) {
case Topic::LimitOnlineVoltage:
MessageOutput.printf("Limit Voltage: %f V\r\n", payload_val);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue,
&HuaweiCan, payload_val, HUAWEI_ONLINE_VOLTAGE));
break;

case Topic::LimitOfflineVoltage:
MessageOutput.printf("Offline Limit Voltage: %f V\r\n", payload_val);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue,
&HuaweiCan, payload_val, HUAWEI_OFFLINE_VOLTAGE));
break;

case Topic::LimitOnlineCurrent:
MessageOutput.printf("Limit Current: %f A\r\n", payload_val);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue,
&HuaweiCan, payload_val, HUAWEI_ONLINE_CURRENT));
break;

case Topic::LimitOfflineCurrent:
MessageOutput.printf("Offline Limit Current: %f A\r\n", payload_val);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue,
&HuaweiCan, payload_val, HUAWEI_OFFLINE_CURRENT));
break;

case Topic::Mode:
switch (static_cast<int>(payload_val)) {
case 3:
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Full internal control");
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode,
&HuaweiCan, HUAWEI_MODE_AUTO_INT));
break;

case 2:
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Internal on/off control, external power limit");
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode,
&HuaweiCan, HUAWEI_MODE_AUTO_EXT));
break;

case 1:
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned ON");
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode,
&HuaweiCan, HUAWEI_MODE_ON));
break;

case 0:
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned OFF");
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode,
&HuaweiCan, HUAWEI_MODE_OFF));
break;

default:
MessageOutput.printf("[Huawei MQTT::] Invalid mode %.0f\r\n", payload_val);
break;
}
break;
}
}