Skip to content

Commit

Permalink
implement MQTT sending
Browse files Browse the repository at this point in the history
  • Loading branch information
LeoDJ committed Jun 25, 2024
1 parent 3e44e6a commit 7d19ec2
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 74 deletions.
4 changes: 4 additions & 0 deletions software/src/globals.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

#include "secrets.h" // don't forget to create secrets.h based on the .sample file


#define USE_INFLUXDB 0
#define USE_MQTT 1

#define DBG Serial1

#define UPDATE_INTERVAL 5000 // ms
Expand Down
16 changes: 10 additions & 6 deletions software/src/janitza.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ bool Janitza::generateInfluxCommands() {
uint16_t mbBufIdx = 0;

influxQueryLen += sprintf(_influxQueryBuf + influxQueryLen, "%s", _influxMeasurement); // measurement name
influxQueryLen += sprintf(_influxQueryBuf + influxQueryLen, ",serial=%d,phase=%s ", _serialNumber,
influxQueryLen += sprintf(_influxQueryBuf + influxQueryLen, ",serial=%lu,phase=%s ", _serialNumber,
phaseStr); // print tags here

for (int i = REG_DEF_START; i < _regDefLen; i++) {
for (size_t i = REG_DEF_START; i < _regDefLen; i++) {
registerDefinition_t* regDef = &_regDef[i];

// if (regDef->address == 0) { // skip "disabled" addresses
Expand All @@ -124,7 +124,7 @@ bool Janitza::generateInfluxCommands() {

float val = getValue(regDef, mbBufIdx);
if (!isnanf(val)) {
influxQueryLen += sprintf(_influxQueryBuf + influxQueryLen, "%s=%g,", regDef->influxStr, val);
influxQueryLen += sprintf(_influxQueryBuf + influxQueryLen, "%s=%g,", regDef->nameStr, val);
}
}
mbBufIdx += registerDataTypeSize[regDef->type] / 2; // increment modbus buffer index by type size
Expand Down Expand Up @@ -161,10 +161,10 @@ JsonDocument Janitza::generateJson() {

if (regDef->phaseTag != P_NONE) {
const char *phaseStr = phaseTagStr[regDef->phaseTag];
doc["values"][regDef->influxStr][phaseStr] = val;
doc["values"][regDef->nameStr][phaseStr] = val;
}
else {
doc["values"][regDef->influxStr] = val;
doc["values"][regDef->nameStr] = val;
}
}

Expand All @@ -177,7 +177,7 @@ uint16_t Janitza::getContiguousRegisters(uint16_t startIndex) {
return _regDefLen - 1;
}

for (int i = startIndex; i < _regDefLen - 1; i++) { // loop until second to last element
for (size_t i = startIndex; i < _regDefLen - 1; i++) { // loop until second to last element
registerDefinition_t def = _regDef[i];
uint16_t nextAddr = def.address + (registerDataTypeSize[def.type] / 2);
if (_regDef[i + 1].address != nextAddr) {
Expand Down Expand Up @@ -242,6 +242,10 @@ uint32_t Janitza::readSerialNumber() {
return _mb.getResponseBuffer(0) << 16 | _mb.getResponseBuffer(1);
}

uint32_t Janitza::getSerialNumber() {
return _serialNumber;
}

bool Janitza::modbusReadBulk(int16_t* destBuf, uint16_t startAddr, uint16_t count) {
int iterations = (count + (MB_CHUNK_SIZE - 1)) / MB_CHUNK_SIZE; // division, but rounded up
for (int i = 0; i < iterations; i++) {
Expand Down
3 changes: 2 additions & 1 deletion software/src/janitza.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ typedef struct {
uint8_t applyCtRatio : 1;
uint8_t type : 3;
uint8_t phaseTag : 3;
const char *influxStr;
const char *nameStr;
} registerDefinition_t;


Expand Down Expand Up @@ -73,6 +73,7 @@ class Janitza {
void setInfluxSendCallback(void (*influxSendRequest)(char *lineProtocolCommand), const char *measurementName);
void useRS485(uint32_t dePin, uint32_t rePin, void (*preTransmission)(), void (*postTransmission)()); // supply pin for RS485 transceiver direction (DE / RE)
uint32_t readSerialNumber();
uint32_t getSerialNumber();
bool read();
JsonDocument generateJson();
bool generateInfluxCommands();
Expand Down
128 changes: 64 additions & 64 deletions software/src/janitzaRegDefs/UMG96RM.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,72 +2,72 @@

registerDefinition_t regDef_UMG96RM[] = {
// addr multipl CTratio type phase name
{ 754, 1, false, Janitza::INT, Janitza::P_NONE, "SN" }, // index 0 has to be serial number
{ 10, 1, false, Janitza::FLOAT, Janitza::P_NONE, "CT_prim" }, // index 1 has to be CT primary
{ 12, 1, false, Janitza::FLOAT, Janitza::P_NONE, "CT_sec" }, // index 2 has to be CT secondary
{ 754, 1, false, Janitza::INT, Janitza::P_NONE, "SN" }, // index 0 has to be serial number
{ 10, 1, false, Janitza::FLOAT, Janitza::P_NONE, "CT_prim" }, // index 1 has to be CT primary
{ 12, 1, false, Janitza::FLOAT, Janitza::P_NONE, "CT_sec" }, // index 2 has to be CT secondary


{ 19000, 1, false, Janitza::FLOAT, Janitza::P_L1, "U_LN" },
{ 19002, 1, false, Janitza::FLOAT, Janitza::P_L2, "U_LN" },
{ 19004, 1, false, Janitza::FLOAT, Janitza::P_L3, "U_LN" },
{ 19006, 1, false, Janitza::FLOAT, Janitza::P_L1L2, "U_LL" },
{ 19008, 1, false, Janitza::FLOAT, Janitza::P_L2L3, "U_LL" },
{ 19010, 1, false, Janitza::FLOAT, Janitza::P_L3L1, "U_LL" },
{ 19012, 1, false, Janitza::FLOAT, Janitza::P_L1, "I" },
{ 19014, 1, false, Janitza::FLOAT, Janitza::P_L2, "I" },
{ 19016, 1, false, Janitza::FLOAT, Janitza::P_L3, "I" },
{ 19018, 1, false, Janitza::FLOAT, Janitza::P_ALL, "I_N" },
{ 19020, 1, false, Janitza::FLOAT, Janitza::P_L1, "P" },
{ 19022, 1, false, Janitza::FLOAT, Janitza::P_L2, "P" },
{ 19024, 1, false, Janitza::FLOAT, Janitza::P_L3, "P" },
{ 19026, 1, false, Janitza::FLOAT, Janitza::P_ALL, "P" },
{ 19028, 1, false, Janitza::FLOAT, Janitza::P_L1, "Q" },
{ 19030, 1, false, Janitza::FLOAT, Janitza::P_L2, "Q" },
{ 19032, 1, false, Janitza::FLOAT, Janitza::P_L3, "Q" },
{ 19034, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Q" },
{ 19036, 1, false, Janitza::FLOAT, Janitza::P_L1, "S" },
{ 19038, 1, false, Janitza::FLOAT, Janitza::P_L2, "S" },
{ 19040, 1, false, Janitza::FLOAT, Janitza::P_L3, "S" },
{ 19042, 1, false, Janitza::FLOAT, Janitza::P_ALL, "S" },
{ 19044, 1, false, Janitza::FLOAT, Janitza::P_L1, "CosPhi" },
{ 19046, 1, false, Janitza::FLOAT, Janitza::P_L2, "CosPhi" },
{ 19048, 1, false, Janitza::FLOAT, Janitza::P_L3, "CosPhi" },
{ 19050, 1, false, Janitza::FLOAT, Janitza::P_NONE, "F" },
{ 19052, 1, false, Janitza::FLOAT, Janitza::P_NONE, "Rotation" },
{ 19054, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wp" },
{ 19056, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wp" },
{ 19058, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wp" },
{ 19060, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wp" },
{ 19062, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wp_Consumed" },
{ 19064, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wp_Consumed" },
{ 19066, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wp_Consumed" },
{ 19068, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wp_Consumed" },
{ 19070, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wp_Delivered" },
{ 19072, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wp_Delivered" },
{ 19074, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wp_Delivered" },
{ 19076, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wp_Delivered" },
{ 19078, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wq" },
{ 19080, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wq" },
{ 19082, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wq" },
{ 19084, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wq" },
{ 19086, 1, false, Janitza::FLOAT, Janitza::P_L1, "Ws" },
{ 19088, 1, false, Janitza::FLOAT, Janitza::P_L2, "Ws" },
{ 19090, 1, false, Janitza::FLOAT, Janitza::P_L3, "Ws" },
{ 19092, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Ws" },
{ 19094, 1, false, Janitza::FLOAT, Janitza::P_L1, "Ws_ind" },
{ 19096, 1, false, Janitza::FLOAT, Janitza::P_L2, "Ws_ind" },
{ 19098, 1, false, Janitza::FLOAT, Janitza::P_L3, "Ws_ind" },
{ 19100, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Ws_ind" },
{ 19102, 1, false, Janitza::FLOAT, Janitza::P_L1, "Ws_cap" },
{ 19104, 1, false, Janitza::FLOAT, Janitza::P_L2, "Ws_cap" },
{ 19106, 1, false, Janitza::FLOAT, Janitza::P_L3, "Ws_cap" },
{ 19108, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Ws_cap" },
{ 19110, 1, false, Janitza::FLOAT, Janitza::P_L1, "THD_U" },
{ 19112, 1, false, Janitza::FLOAT, Janitza::P_L2, "THD_U" },
{ 19114, 1, false, Janitza::FLOAT, Janitza::P_L3, "THD_U" },
{ 19116, 1, false, Janitza::FLOAT, Janitza::P_L1, "THD_I" },
{ 19118, 1, false, Janitza::FLOAT, Janitza::P_L2, "THD_I" },
{ 19120, 1, false, Janitza::FLOAT, Janitza::P_L3, "THD_I" },
{ 19000, 1, false, Janitza::FLOAT, Janitza::P_L1, "U_LN" }, // V
{ 19002, 1, false, Janitza::FLOAT, Janitza::P_L2, "U_LN" }, // V
{ 19004, 1, false, Janitza::FLOAT, Janitza::P_L3, "U_LN" }, // V
{ 19006, 1, false, Janitza::FLOAT, Janitza::P_L1L2, "U_LL" }, // V
{ 19008, 1, false, Janitza::FLOAT, Janitza::P_L2L3, "U_LL" }, // V
{ 19010, 1, false, Janitza::FLOAT, Janitza::P_L3L1, "U_LL" }, // V
{ 19012, 1, false, Janitza::FLOAT, Janitza::P_L1, "I" }, // A
{ 19014, 1, false, Janitza::FLOAT, Janitza::P_L2, "I" }, // A
{ 19016, 1, false, Janitza::FLOAT, Janitza::P_L3, "I" }, // A
{ 19018, 1, false, Janitza::FLOAT, Janitza::P_ALL, "I_N" }, // A
{ 19020, 1, false, Janitza::FLOAT, Janitza::P_L1, "P" }, // W
{ 19022, 1, false, Janitza::FLOAT, Janitza::P_L2, "P" }, // W
{ 19024, 1, false, Janitza::FLOAT, Janitza::P_L3, "P" }, // W
{ 19026, 1, false, Janitza::FLOAT, Janitza::P_ALL, "P" }, // W
{ 19028, 1, false, Janitza::FLOAT, Janitza::P_L1, "Q" }, // VA
{ 19030, 1, false, Janitza::FLOAT, Janitza::P_L2, "Q" }, // VA
{ 19032, 1, false, Janitza::FLOAT, Janitza::P_L3, "Q" }, // VA
{ 19034, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Q" }, // VA
{ 19036, 1, false, Janitza::FLOAT, Janitza::P_L1, "S" }, // var
{ 19038, 1, false, Janitza::FLOAT, Janitza::P_L2, "S" }, // var
{ 19040, 1, false, Janitza::FLOAT, Janitza::P_L3, "S" }, // var
{ 19042, 1, false, Janitza::FLOAT, Janitza::P_ALL, "S" }, // var
{ 19044, 1, false, Janitza::FLOAT, Janitza::P_L1, "CosPhi" }, // -
{ 19046, 1, false, Janitza::FLOAT, Janitza::P_L2, "CosPhi" }, // -
{ 19048, 1, false, Janitza::FLOAT, Janitza::P_L3, "CosPhi" }, // -
{ 19050, 1, false, Janitza::FLOAT, Janitza::P_NONE, "F" }, // Hz
{ 19052, 1, false, Janitza::FLOAT, Janitza::P_NONE, "Rotation" }, // -1: left, 0: none, 1: right
{ 19054, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wp" }, // Wh
{ 19056, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wp" }, // Wh
{ 19058, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wp" }, // Wh
{ 19060, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wp" }, // Wh
{ 19062, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wp_Consumed" }, // Wh
{ 19064, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wp_Consumed" }, // Wh
{ 19066, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wp_Consumed" }, // Wh
{ 19068, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wp_Consumed" }, // Wh
{ 19070, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wp_Delivered" }, // Wh
{ 19072, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wp_Delivered" }, // Wh
{ 19074, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wp_Delivered" }, // Wh
{ 19076, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wp_Delivered" }, // Wh
{ 19078, 1, false, Janitza::FLOAT, Janitza::P_L1, "Wq" }, // VAh
{ 19080, 1, false, Janitza::FLOAT, Janitza::P_L2, "Wq" }, // VAh
{ 19082, 1, false, Janitza::FLOAT, Janitza::P_L3, "Wq" }, // VAh
{ 19084, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Wq" }, // VAh
{ 19086, 1, false, Janitza::FLOAT, Janitza::P_L1, "Ws" }, // varh
{ 19088, 1, false, Janitza::FLOAT, Janitza::P_L2, "Ws" }, // varh
{ 19090, 1, false, Janitza::FLOAT, Janitza::P_L3, "Ws" }, // varh
{ 19092, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Ws" }, // varh
{ 19094, 1, false, Janitza::FLOAT, Janitza::P_L1, "Ws_ind" }, // varh
{ 19096, 1, false, Janitza::FLOAT, Janitza::P_L2, "Ws_ind" }, // varh
{ 19098, 1, false, Janitza::FLOAT, Janitza::P_L3, "Ws_ind" }, // varh
{ 19100, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Ws_ind" }, // varh
{ 19102, 1, false, Janitza::FLOAT, Janitza::P_L1, "Ws_cap" }, // varh
{ 19104, 1, false, Janitza::FLOAT, Janitza::P_L2, "Ws_cap" }, // varh
{ 19106, 1, false, Janitza::FLOAT, Janitza::P_L3, "Ws_cap" }, // varh
{ 19108, 1, false, Janitza::FLOAT, Janitza::P_ALL, "Ws_cap" }, // varh
{ 19110, 1, false, Janitza::FLOAT, Janitza::P_L1, "THD_U" }, // %
{ 19112, 1, false, Janitza::FLOAT, Janitza::P_L2, "THD_U" }, // %
{ 19114, 1, false, Janitza::FLOAT, Janitza::P_L3, "THD_U" }, // %
{ 19116, 1, false, Janitza::FLOAT, Janitza::P_L1, "THD_I" }, // %
{ 19118, 1, false, Janitza::FLOAT, Janitza::P_L2, "THD_I" }, // %
{ 19120, 1, false, Janitza::FLOAT, Janitza::P_L3, "THD_I" }, // %

{ 5896, 1, false, Janitza::INT, Janitza::P_NONE, "OperatingTime" }, // seconds

Expand Down
19 changes: 17 additions & 2 deletions software/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "janitzaRegDefs/UMG96RM.h"
#include "eth.h"
#include "ArduinoJson.h"
#include "mqtt.h"

Janitza janitza;
EthernetClient client;
Expand Down Expand Up @@ -68,27 +69,41 @@ void setup() {
MODUBS_SERIAL.begin(MODBUS_BAUD);
janitza.setDebugSerial(DBG);
janitza.useRS485(MODBUS_DE_PIN, MODBUS_RE_PIN, preTransmission, postTransmission);
#if USE_INFLUXDB
janitza.setInfluxSendCallback(sendInfluxRequest, INFLUX_MEASUREMENT);
#endif
janitza.init(MODUBS_SERIAL, MODBUS_ADDR, regDef_UMG96RM, sizeof(regDef_UMG96RM)); //blocks until modbus connected


// janitza.read();

initEthernet();
connectEthernet();
mqttInit();
}

uint32_t lastUpdate = 0;

void loop () {
if (millis() - lastUpdate >= UPDATE_INTERVAL) {
lastUpdate = millis();
// janitza.readAndSendToInflux();

#if USE_INFLUXDB
janitza.readAndSendToInflux();
#endif

#if USE_MQTT
if (janitza.read()) {
doc = janitza.generateJson();
serializeJson(doc, DBG);
uint8_t buf[4096];
size_t len = serializeJson(doc, buf);
String topic = MQTT_BASE_TOPIC;
topic += String(janitza.getSerialNumber());
mqttPublish(topic.c_str(), buf, len);
}
#endif
}

handleHttpResponse();
mqttLoop();
}
69 changes: 69 additions & 0 deletions software/src/mqtt.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@

#include <PubSubClient.h>
#include "eth.h"

EthernetClient ethClient;
PubSubClient mqttClient(ethClient);
uint32_t mqttLastReconnect = 0;

bool mqttPublish(const char* topic, const uint8_t* payload, size_t plength, bool retained = false) {
DBG.printf("MQTT publishing %s [%d]: ", topic, plength);
DBG.write(payload, plength);
DBG.printf("\n");
if (mqttClient.connected()) {
bool success = mqttClient.publish(topic, payload, plength, retained);
if (!success) {
DBG.println("MQTT publish error");
}
return success;
}
DBG.println("MQTT publish error: Not connected");
return false;
}

bool mqttPublish(const char* topic, const uint8_t* payload, bool retained = false) {
if (mqttClient.connected()) {
return mqttClient.publish(topic, payload, retained);
}
return false;
}

void mqttCallback(char* topic, byte* payload, unsigned int length) {
// handle message arrived
DBG.printf("MQTT /%s: ", topic);
DBG.write(payload, length);
DBG.println();
}

bool mqttReconnect() {
DBG.print("MQTT connecting... ");
if (mqttClient.connect(HOSTNAME, MQTT_USER, MQTT_PASSWORD)) {
// mqttClient.publish("test", "hello world");
// mqttClient.subscribe("test");
DBG.println("success!");
}
else {
DBG.println("failed.");
}
return mqttClient.connected();
}

void mqttLoop() {
#if USE_MQTT
if (!mqttClient.connected()) {
if (millis() - mqttLastReconnect > 5000) {
mqttLastReconnect = millis();
mqttReconnect();
}
}
mqttClient.loop();
#endif
}

void mqttInit() {
DBG.println("MQTT init");
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
mqttClient.setCallback(mqttCallback);
mqttClient.setBufferSize(4096);
mqttReconnect();
}
10 changes: 9 additions & 1 deletion software/src/secrets.h.sample
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
// rename to secrets.h

#define HOSTNAME "Janitza-Modbus2MQTT"

#define INFLUX_HOST "hostnameofinflux"
#define INFLUX_PORT 1883
#define INFLUX_DB "databaseName"
#define INFLUX_MEASUREMENT "measurementName"
//#define INFLUX_USER "username" // actually not used, use base64 below
//#define INFLUX_PASS "password"
#define INFLUX_AUTH_BASE64 "dXNlcjpwYXNz" // format: user:pass base64 encoded
#define INFLUX_AUTH_BASE64 "dXNlcjpwYXNz" // format: user:pass base64 encoded

#define MQTT_HOST "mqttServer"
#define MQTT_PORT 1883
#define MQTT_USER "mqttUser" // set to NULL when unused (without "")
#define MQTT_PASSWORD "mqttPasswd" // set to NULL when unused (without "")
#define MQTT_BASE_TOPIC "power/janitza/" // has to end with slash

0 comments on commit 7d19ec2

Please sign in to comment.