From 7d19ec26750adf1b2eecb4cd44fb9418482d51b1 Mon Sep 17 00:00:00 2001 From: LeoDJ Date: Tue, 25 Jun 2024 03:01:09 +0200 Subject: [PATCH] implement MQTT sending --- software/src/globals.h | 4 + software/src/janitza.cpp | 16 ++-- software/src/janitza.h | 3 +- software/src/janitzaRegDefs/UMG96RM.h | 128 +++++++++++++------------- software/src/main.cpp | 19 +++- software/src/mqtt.h | 69 ++++++++++++++ software/src/secrets.h.sample | 10 +- 7 files changed, 175 insertions(+), 74 deletions(-) create mode 100644 software/src/mqtt.h diff --git a/software/src/globals.h b/software/src/globals.h index 7ff7595..aee92a1 100644 --- a/software/src/globals.h +++ b/software/src/globals.h @@ -3,6 +3,10 @@ #include "secrets.h" // don't forget to create secrets.h based on the .sample file + +#define USE_INFLUXDB 0 +#define USE_MQTT 1 + #define DBG Serial1 #define UPDATE_INTERVAL 5000 // ms diff --git a/software/src/janitza.cpp b/software/src/janitza.cpp index 97723e4..af3a7a2 100644 --- a/software/src/janitza.cpp +++ b/software/src/janitza.cpp @@ -110,10 +110,10 @@ bool Janitza::generateInfluxCommands() { uint16_t mbBufIdx = 0; influxQueryLen += sprintf(_influxQueryBuf + influxQueryLen, "%s", _influxMeasurement); // measurement name - influxQueryLen += sprintf(_influxQueryBuf + influxQueryLen, ",serial=%d,phase=%s ", _serialNumber, + influxQueryLen += sprintf(_influxQueryBuf + influxQueryLen, ",serial=%lu,phase=%s ", _serialNumber, phaseStr); // print tags here - for (int i = REG_DEF_START; i < _regDefLen; i++) { + for (size_t i = REG_DEF_START; i < _regDefLen; i++) { registerDefinition_t* regDef = &_regDef[i]; // if (regDef->address == 0) { // skip "disabled" addresses @@ -124,7 +124,7 @@ bool Janitza::generateInfluxCommands() { float val = getValue(regDef, mbBufIdx); if (!isnanf(val)) { - influxQueryLen += sprintf(_influxQueryBuf + influxQueryLen, "%s=%g,", regDef->influxStr, val); + influxQueryLen += sprintf(_influxQueryBuf + influxQueryLen, "%s=%g,", regDef->nameStr, val); } } mbBufIdx += registerDataTypeSize[regDef->type] / 2; // increment modbus buffer index by type size @@ -161,10 +161,10 @@ JsonDocument Janitza::generateJson() { if (regDef->phaseTag != P_NONE) { const char *phaseStr = phaseTagStr[regDef->phaseTag]; - doc["values"][regDef->influxStr][phaseStr] = val; + doc["values"][regDef->nameStr][phaseStr] = val; } else { - doc["values"][regDef->influxStr] = val; + doc["values"][regDef->nameStr] = val; } } @@ -177,7 +177,7 @@ uint16_t Janitza::getContiguousRegisters(uint16_t startIndex) { return _regDefLen - 1; } - for (int i = startIndex; i < _regDefLen - 1; i++) { // loop until second to last element + for (size_t i = startIndex; i < _regDefLen - 1; i++) { // loop until second to last element registerDefinition_t def = _regDef[i]; uint16_t nextAddr = def.address + (registerDataTypeSize[def.type] / 2); if (_regDef[i + 1].address != nextAddr) { @@ -242,6 +242,10 @@ uint32_t Janitza::readSerialNumber() { return _mb.getResponseBuffer(0) << 16 | _mb.getResponseBuffer(1); } +uint32_t Janitza::getSerialNumber() { + return _serialNumber; +} + bool Janitza::modbusReadBulk(int16_t* destBuf, uint16_t startAddr, uint16_t count) { int iterations = (count + (MB_CHUNK_SIZE - 1)) / MB_CHUNK_SIZE; // division, but rounded up for (int i = 0; i < iterations; i++) { diff --git a/software/src/janitza.h b/software/src/janitza.h index ec181c5..8689672 100644 --- a/software/src/janitza.h +++ b/software/src/janitza.h @@ -14,7 +14,7 @@ typedef struct { uint8_t applyCtRatio : 1; uint8_t type : 3; uint8_t phaseTag : 3; - const char *influxStr; + const char *nameStr; } registerDefinition_t; @@ -73,6 +73,7 @@ class Janitza { void setInfluxSendCallback(void (*influxSendRequest)(char *lineProtocolCommand), const char *measurementName); void useRS485(uint32_t dePin, uint32_t rePin, void (*preTransmission)(), void (*postTransmission)()); // supply pin for RS485 transceiver direction (DE / RE) uint32_t readSerialNumber(); + uint32_t getSerialNumber(); bool read(); JsonDocument generateJson(); bool generateInfluxCommands(); diff --git a/software/src/janitzaRegDefs/UMG96RM.h b/software/src/janitzaRegDefs/UMG96RM.h index e983323..98a1a3b 100644 --- a/software/src/janitzaRegDefs/UMG96RM.h +++ b/software/src/janitzaRegDefs/UMG96RM.h @@ -2,72 +2,72 @@ registerDefinition_t regDef_UMG96RM[] = { // addr multipl CTratio type phase name - { 754, 1, false, Janitza::INT, Janitza::P_NONE, "SN" }, // index 0 has to be serial number - { 10, 1, false, Janitza::FLOAT, Janitza::P_NONE, "CT_prim" }, // index 1 has to be CT primary - { 12, 1, false, Janitza::FLOAT, Janitza::P_NONE, "CT_sec" }, // index 2 has to be CT secondary + { 754, 1, false, Janitza::INT, Janitza::P_NONE, "SN" }, // index 0 has to be serial number + { 10, 1, false, Janitza::FLOAT, Janitza::P_NONE, "CT_prim" }, // index 1 has to be CT primary + { 12, 1, false, Janitza::FLOAT, Janitza::P_NONE, "CT_sec" }, // index 2 has to be CT secondary - { 19000, 1, false, Janitza::FLOAT, Janitza::P_L1, "U_LN" }, - { 19002, 1, false, Janitza::FLOAT, Janitza::P_L2, "U_LN" }, - { 19004, 1, false, Janitza::FLOAT, Janitza::P_L3, "U_LN" }, - { 19006, 1, false, Janitza::FLOAT, Janitza::P_L1L2, "U_LL" }, - { 19008, 1, false, Janitza::FLOAT, Janitza::P_L2L3, "U_LL" }, - { 19010, 1, false, Janitza::FLOAT, Janitza::P_L3L1, "U_LL" }, - { 19012, 1, false, Janitza::FLOAT, Janitza::P_L1, "I" }, - { 19014, 1, false, Janitza::FLOAT, Janitza::P_L2, "I" }, - { 19016, 1, false, Janitza::FLOAT, Janitza::P_L3, "I" }, - { 19018, 1, false, Janitza::FLOAT, Janitza::P_ALL, "I_N" }, - { 19020, 1, false, Janitza::FLOAT, Janitza::P_L1, "P" }, - { 19022, 1, false, Janitza::FLOAT, Janitza::P_L2, "P" }, - { 19024, 1, false, Janitza::FLOAT, Janitza::P_L3, "P" }, - { 19026, 1, false, Janitza::FLOAT, Janitza::P_ALL, "P" }, - { 19028, 1, false, Janitza::FLOAT, Janitza::P_L1, "Q" }, - { 19030, 1, false, Janitza::FLOAT, Janitza::P_L2, "Q" }, - { 19032, 1, false, Janitza::FLOAT, Janitza::P_L3, "Q" }, - { 19034, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Q" }, - { 19036, 1, false, Janitza::FLOAT, Janitza::P_L1, "S" }, - { 19038, 1, false, Janitza::FLOAT, Janitza::P_L2, "S" }, - { 19040, 1, false, Janitza::FLOAT, Janitza::P_L3, "S" }, - { 19042, 1, false, Janitza::FLOAT, Janitza::P_ALL, "S" }, - { 19044, 1, false, Janitza::FLOAT, Janitza::P_L1, "CosPhi" }, - { 19046, 1, false, Janitza::FLOAT, Janitza::P_L2, "CosPhi" }, - { 19048, 1, false, Janitza::FLOAT, Janitza::P_L3, "CosPhi" }, - { 19050, 1, false, Janitza::FLOAT, Janitza::P_NONE, "F" }, - { 19052, 1, false, Janitza::FLOAT, Janitza::P_NONE, "Rotation" }, - { 19054, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wp" }, - { 19056, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wp" }, - { 19058, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wp" }, - { 19060, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wp" }, - { 19062, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wp_Consumed" }, - { 19064, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wp_Consumed" }, - { 19066, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wp_Consumed" }, - { 19068, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wp_Consumed" }, - { 19070, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wp_Delivered" }, - { 19072, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wp_Delivered" }, - { 19074, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wp_Delivered" }, - { 19076, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wp_Delivered" }, - { 19078, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wq" }, - { 19080, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wq" }, - { 19082, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wq" }, - { 19084, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wq" }, - { 19086, 1, false, Janitza::FLOAT, Janitza::P_L1, "Ws" }, - { 19088, 1, false, Janitza::FLOAT, Janitza::P_L2, "Ws" }, - { 19090, 1, false, Janitza::FLOAT, Janitza::P_L3, "Ws" }, - { 19092, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Ws" }, - { 19094, 1, false, Janitza::FLOAT, Janitza::P_L1, "Ws_ind" }, - { 19096, 1, false, Janitza::FLOAT, Janitza::P_L2, "Ws_ind" }, - { 19098, 1, false, Janitza::FLOAT, Janitza::P_L3, "Ws_ind" }, - { 19100, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Ws_ind" }, - { 19102, 1, false, Janitza::FLOAT, Janitza::P_L1, "Ws_cap" }, - { 19104, 1, false, Janitza::FLOAT, Janitza::P_L2, "Ws_cap" }, - { 19106, 1, false, Janitza::FLOAT, Janitza::P_L3, "Ws_cap" }, - { 19108, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Ws_cap" }, - { 19110, 1, false, Janitza::FLOAT, Janitza::P_L1, "THD_U" }, - { 19112, 1, false, Janitza::FLOAT, Janitza::P_L2, "THD_U" }, - { 19114, 1, false, Janitza::FLOAT, Janitza::P_L3, "THD_U" }, - { 19116, 1, false, Janitza::FLOAT, Janitza::P_L1, "THD_I" }, - { 19118, 1, false, Janitza::FLOAT, Janitza::P_L2, "THD_I" }, - { 19120, 1, false, Janitza::FLOAT, Janitza::P_L3, "THD_I" }, + { 19000, 1, false, Janitza::FLOAT, Janitza::P_L1, "U_LN" }, // V + { 19002, 1, false, Janitza::FLOAT, Janitza::P_L2, "U_LN" }, // V + { 19004, 1, false, Janitza::FLOAT, Janitza::P_L3, "U_LN" }, // V + { 19006, 1, false, Janitza::FLOAT, Janitza::P_L1L2, "U_LL" }, // V + { 19008, 1, false, Janitza::FLOAT, Janitza::P_L2L3, "U_LL" }, // V + { 19010, 1, false, Janitza::FLOAT, Janitza::P_L3L1, "U_LL" }, // V + { 19012, 1, false, Janitza::FLOAT, Janitza::P_L1, "I" }, // A + { 19014, 1, false, Janitza::FLOAT, Janitza::P_L2, "I" }, // A + { 19016, 1, false, Janitza::FLOAT, Janitza::P_L3, "I" }, // A + { 19018, 1, false, Janitza::FLOAT, Janitza::P_ALL, "I_N" }, // A + { 19020, 1, false, Janitza::FLOAT, Janitza::P_L1, "P" }, // W + { 19022, 1, false, Janitza::FLOAT, Janitza::P_L2, "P" }, // W + { 19024, 1, false, Janitza::FLOAT, Janitza::P_L3, "P" }, // W + { 19026, 1, false, Janitza::FLOAT, Janitza::P_ALL, "P" }, // W + { 19028, 1, false, Janitza::FLOAT, Janitza::P_L1, "Q" }, // VA + { 19030, 1, false, Janitza::FLOAT, Janitza::P_L2, "Q" }, // VA + { 19032, 1, false, Janitza::FLOAT, Janitza::P_L3, "Q" }, // VA + { 19034, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Q" }, // VA + { 19036, 1, false, Janitza::FLOAT, Janitza::P_L1, "S" }, // var + { 19038, 1, false, Janitza::FLOAT, Janitza::P_L2, "S" }, // var + { 19040, 1, false, Janitza::FLOAT, Janitza::P_L3, "S" }, // var + { 19042, 1, false, Janitza::FLOAT, Janitza::P_ALL, "S" }, // var + { 19044, 1, false, Janitza::FLOAT, Janitza::P_L1, "CosPhi" }, // - + { 19046, 1, false, Janitza::FLOAT, Janitza::P_L2, "CosPhi" }, // - + { 19048, 1, false, Janitza::FLOAT, Janitza::P_L3, "CosPhi" }, // - + { 19050, 1, false, Janitza::FLOAT, Janitza::P_NONE, "F" }, // Hz + { 19052, 1, false, Janitza::FLOAT, Janitza::P_NONE, "Rotation" }, // -1: left, 0: none, 1: right + { 19054, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wp" }, // Wh + { 19056, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wp" }, // Wh + { 19058, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wp" }, // Wh + { 19060, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wp" }, // Wh + { 19062, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wp_Consumed" }, // Wh + { 19064, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wp_Consumed" }, // Wh + { 19066, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wp_Consumed" }, // Wh + { 19068, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wp_Consumed" }, // Wh + { 19070, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wp_Delivered" }, // Wh + { 19072, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wp_Delivered" }, // Wh + { 19074, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wp_Delivered" }, // Wh + { 19076, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wp_Delivered" }, // Wh + { 19078, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wq" }, // VAh + { 19080, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wq" }, // VAh + { 19082, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wq" }, // VAh + { 19084, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wq" }, // VAh + { 19086, 1, false, Janitza::FLOAT, Janitza::P_L1, "Ws" }, // varh + { 19088, 1, false, Janitza::FLOAT, Janitza::P_L2, "Ws" }, // varh + { 19090, 1, false, Janitza::FLOAT, Janitza::P_L3, "Ws" }, // varh + { 19092, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Ws" }, // varh + { 19094, 1, false, Janitza::FLOAT, Janitza::P_L1, "Ws_ind" }, // varh + { 19096, 1, false, Janitza::FLOAT, Janitza::P_L2, "Ws_ind" }, // varh + { 19098, 1, false, Janitza::FLOAT, Janitza::P_L3, "Ws_ind" }, // varh + { 19100, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Ws_ind" }, // varh + { 19102, 1, false, Janitza::FLOAT, Janitza::P_L1, "Ws_cap" }, // varh + { 19104, 1, false, Janitza::FLOAT, Janitza::P_L2, "Ws_cap" }, // varh + { 19106, 1, false, Janitza::FLOAT, Janitza::P_L3, "Ws_cap" }, // varh + { 19108, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Ws_cap" }, // varh + { 19110, 1, false, Janitza::FLOAT, Janitza::P_L1, "THD_U" }, // % + { 19112, 1, false, Janitza::FLOAT, Janitza::P_L2, "THD_U" }, // % + { 19114, 1, false, Janitza::FLOAT, Janitza::P_L3, "THD_U" }, // % + { 19116, 1, false, Janitza::FLOAT, Janitza::P_L1, "THD_I" }, // % + { 19118, 1, false, Janitza::FLOAT, Janitza::P_L2, "THD_I" }, // % + { 19120, 1, false, Janitza::FLOAT, Janitza::P_L3, "THD_I" }, // % { 5896, 1, false, Janitza::INT, Janitza::P_NONE, "OperatingTime" }, // seconds diff --git a/software/src/main.cpp b/software/src/main.cpp index 364fcf9..f98b00b 100644 --- a/software/src/main.cpp +++ b/software/src/main.cpp @@ -4,6 +4,7 @@ #include "janitzaRegDefs/UMG96RM.h" #include "eth.h" #include "ArduinoJson.h" +#include "mqtt.h" Janitza janitza; EthernetClient client; @@ -68,7 +69,9 @@ void setup() { MODUBS_SERIAL.begin(MODBUS_BAUD); janitza.setDebugSerial(DBG); janitza.useRS485(MODBUS_DE_PIN, MODBUS_RE_PIN, preTransmission, postTransmission); + #if USE_INFLUXDB janitza.setInfluxSendCallback(sendInfluxRequest, INFLUX_MEASUREMENT); + #endif janitza.init(MODUBS_SERIAL, MODBUS_ADDR, regDef_UMG96RM, sizeof(regDef_UMG96RM)); //blocks until modbus connected @@ -76,6 +79,7 @@ void setup() { initEthernet(); connectEthernet(); + mqttInit(); } uint32_t lastUpdate = 0; @@ -83,12 +87,23 @@ uint32_t lastUpdate = 0; void loop () { if (millis() - lastUpdate >= UPDATE_INTERVAL) { lastUpdate = millis(); - // janitza.readAndSendToInflux(); + + #if USE_INFLUXDB + janitza.readAndSendToInflux(); + #endif + + #if USE_MQTT if (janitza.read()) { doc = janitza.generateJson(); - serializeJson(doc, DBG); + uint8_t buf[4096]; + size_t len = serializeJson(doc, buf); + String topic = MQTT_BASE_TOPIC; + topic += String(janitza.getSerialNumber()); + mqttPublish(topic.c_str(), buf, len); } + #endif } handleHttpResponse(); + mqttLoop(); } \ No newline at end of file diff --git a/software/src/mqtt.h b/software/src/mqtt.h new file mode 100644 index 0000000..ab9633e --- /dev/null +++ b/software/src/mqtt.h @@ -0,0 +1,69 @@ + +#include +#include "eth.h" + +EthernetClient ethClient; +PubSubClient mqttClient(ethClient); +uint32_t mqttLastReconnect = 0; + +bool mqttPublish(const char* topic, const uint8_t* payload, size_t plength, bool retained = false) { + DBG.printf("MQTT publishing %s [%d]: ", topic, plength); + DBG.write(payload, plength); + DBG.printf("\n"); + if (mqttClient.connected()) { + bool success = mqttClient.publish(topic, payload, plength, retained); + if (!success) { + DBG.println("MQTT publish error"); + } + return success; + } + DBG.println("MQTT publish error: Not connected"); + return false; +} + +bool mqttPublish(const char* topic, const uint8_t* payload, bool retained = false) { + if (mqttClient.connected()) { + return mqttClient.publish(topic, payload, retained); + } + return false; +} + +void mqttCallback(char* topic, byte* payload, unsigned int length) { + // handle message arrived + DBG.printf("MQTT /%s: ", topic); + DBG.write(payload, length); + DBG.println(); +} + +bool mqttReconnect() { + DBG.print("MQTT connecting... "); + if (mqttClient.connect(HOSTNAME, MQTT_USER, MQTT_PASSWORD)) { + // mqttClient.publish("test", "hello world"); + // mqttClient.subscribe("test"); + DBG.println("success!"); + } + else { + DBG.println("failed."); + } + return mqttClient.connected(); +} + +void mqttLoop() { + #if USE_MQTT + if (!mqttClient.connected()) { + if (millis() - mqttLastReconnect > 5000) { + mqttLastReconnect = millis(); + mqttReconnect(); + } + } + mqttClient.loop(); + #endif +} + +void mqttInit() { + DBG.println("MQTT init"); + mqttClient.setServer(MQTT_HOST, MQTT_PORT); + mqttClient.setCallback(mqttCallback); + mqttClient.setBufferSize(4096); + mqttReconnect(); +} \ No newline at end of file diff --git a/software/src/secrets.h.sample b/software/src/secrets.h.sample index 512c9a3..c3fa200 100644 --- a/software/src/secrets.h.sample +++ b/software/src/secrets.h.sample @@ -1,9 +1,17 @@ // rename to secrets.h +#define HOSTNAME "Janitza-Modbus2MQTT" + #define INFLUX_HOST "hostnameofinflux" #define INFLUX_PORT 1883 #define INFLUX_DB "databaseName" #define INFLUX_MEASUREMENT "measurementName" //#define INFLUX_USER "username" // actually not used, use base64 below //#define INFLUX_PASS "password" -#define INFLUX_AUTH_BASE64 "dXNlcjpwYXNz" // format: user:pass base64 encoded \ No newline at end of file +#define INFLUX_AUTH_BASE64 "dXNlcjpwYXNz" // format: user:pass base64 encoded + +#define MQTT_HOST "mqttServer" +#define MQTT_PORT 1883 +#define MQTT_USER "mqttUser" // set to NULL when unused (without "") +#define MQTT_PASSWORD "mqttPasswd" // set to NULL when unused (without "") +#define MQTT_BASE_TOPIC "power/janitza/" // has to end with slash