From eb08985d678cbf55b05810966818028024e9c9e2 Mon Sep 17 00:00:00 2001 From: Robert Strouse Date: Mon, 2 May 2022 17:37:49 -0700 Subject: [PATCH] Added interface for AqualinkD. Still needs some testing but it should support pumps, chlorinators, heat setpoints, and temperature sensors. --- controller/Equipment.ts | 2 +- controller/comms/Comms.ts | 4 + controller/nixie/chemistry/Chlorinator.ts | 157 ++++---- controller/nixie/pumps/Pump.ts | 373 +++++++++--------- defaultConfig.json | 19 + web/Server.ts | 4 +- web/bindings/aqualinkD.json | 441 ++++++++++++++++++++++ web/interfaces/mqttInterface.ts | 126 ++++--- 8 files changed, 826 insertions(+), 300 deletions(-) create mode 100644 web/bindings/aqualinkD.json diff --git a/controller/Equipment.ts b/controller/Equipment.ts index 583132d6..8cd189d0 100644 --- a/controller/Equipment.ts +++ b/controller/Equipment.ts @@ -1021,7 +1021,7 @@ export class BodyCollection extends EqItemCollection { let body = this.find(elem => { if (typeof obj.id !== 'undefined') return obj.id === elem.id; else if (typeof obj.circuit !== 'undefined') return obj.circuit === elem.circuit; - else if (typeof obj.name !== 'undefined') return obj.name === body.name; + else if (typeof obj.name !== 'undefined') return obj.name === elem.name; else return false; }); return body; diff --git a/controller/comms/Comms.ts b/controller/comms/Comms.ts index 250bb1d9..fd21f1eb 100755 --- a/controller/comms/Comms.ts +++ b/controller/comms/Comms.ts @@ -34,6 +34,10 @@ export class Connection { let port = this.findPortById(0); return typeof port !== 'undefined' && port.mockPort ? true : false; } + public isPortEnabled(portId: number) { + let port: RS485Port = this.findPortById(portId); + return typeof port === 'undefined' ? false : port.enabled; + } public async deleteAuxPort(data: any): Promise { try { let portId = parseInt(data.portId, 10); diff --git a/controller/nixie/chemistry/Chlorinator.ts b/controller/nixie/chemistry/Chlorinator.ts index 88abe1bd..4a06f518 100644 --- a/controller/nixie/chemistry/Chlorinator.ts +++ b/controller/nixie/chemistry/Chlorinator.ts @@ -218,34 +218,40 @@ export class NixieChlorinator extends NixieEquipment { // Disable the control panel by sending an action 0 the chlorinator should respond with an action 1. //[16, 2, 80, 0][0][98, 16, 3] let success = await new Promise((resolve, reject) => { - let out = Outbound.create({ - portId: this.chlor.portId || 0, - protocol: Protocol.Chlorinator, - //dest: this.chlor.id, - dest: 1, - action: 0, - payload: [0], - retries: 3, // IntelliCenter tries 4 times to get a response. - response: Response.create({ protocol: Protocol.Chlorinator, action: 1 }), - onAbort: () => { this.chlor.superChlor = cstate.superChlor = false; this.setSuperChlor(cstate); }, - onComplete: (err) => { - if (err) { - // This flag is cleared in ChlorinatorStateMessage - this.chlor.superChlor = cstate.superChlor = false; - this.setSuperChlor(cstate); - cstate.status = 128; - resolve(false); - } - else { - // If this is successful the action 1 message will have been - // digested by ChlorinatorStateMessage and the lastComm will have been set clearing the - // communication lost flag. - resolve(true); + if (conn.isPortEnabled(this.chlor.portId || 0)) { + let out = Outbound.create({ + portId: this.chlor.portId || 0, + protocol: Protocol.Chlorinator, + //dest: this.chlor.id, + dest: 1, + action: 0, + payload: [0], + retries: 3, // IntelliCenter tries 4 times to get a response. + response: Response.create({ protocol: Protocol.Chlorinator, action: 1 }), + onAbort: () => { this.chlor.superChlor = cstate.superChlor = false; this.setSuperChlor(cstate); }, + onComplete: (err) => { + if (err) { + // This flag is cleared in ChlorinatorStateMessage + this.chlor.superChlor = cstate.superChlor = false; + this.setSuperChlor(cstate); + cstate.status = 128; + resolve(false); + } + else { + // If this is successful the action 1 message will have been + // digested by ChlorinatorStateMessage and the lastComm will have been set clearing the + // communication lost flag. + resolve(true); + } + cstate.emitEquipmentChange(); } - cstate.emitEquipmentChange(); - } - }); - conn.queueSendMessage(out); + }); + conn.queueSendMessage(out); + } + else { + cstate.status = 0; + resolve(true); + } }); return success; } catch (err) { logger.error(`Communication error with Chlorinator ${this.chlor.name} : ${err.message}`); } @@ -280,32 +286,39 @@ export class NixieChlorinator extends NixieEquipment { //[16, 2, 80, 17][0][115, 16, 3] cstate.targetOutput = cstate.superChlor ? 100 : setpoint; let success = await new Promise((resolve, reject) => { - let out = Outbound.create({ - portId: this.chlor.portId || 0, - protocol: Protocol.Chlorinator, - //dest: this.chlor.id, - dest: 1, - action: 17, - payload: [cstate.targetOutput], - retries: 7, // IntelliCenter tries 8 times to make this happen. - response: Response.create({ protocol: Protocol.Chlorinator, action: 18 }), - onAbort: () => {}, - onComplete: (err) => { - if (err) { - cstate.currentOutput = 0; - cstate.status = 128; - resolve(false); - } - else { - cstate.currentOutput = cstate.targetOutput; - this.setSuperChlor(cstate); - resolve(true); + if (conn.isPortEnabled(this.chlor.portId || 0)) { + let out = Outbound.create({ + portId: this.chlor.portId || 0, + protocol: Protocol.Chlorinator, + //dest: this.chlor.id, + dest: 1, + action: 17, + payload: [cstate.targetOutput], + retries: 7, // IntelliCenter tries 8 times to make this happen. + response: Response.create({ protocol: Protocol.Chlorinator, action: 18 }), + onAbort: () => { }, + onComplete: (err) => { + if (err) { + cstate.currentOutput = 0; + cstate.status = 128; + resolve(false); + } + else { + cstate.currentOutput = cstate.targetOutput; + this.setSuperChlor(cstate); + resolve(true); + } } - } - }); - // #338 - if (setpoint === 16) { out.appendPayloadByte(0); } - conn.queueSendMessage(out); + }); + // #338 + if (setpoint === 16) { out.appendPayloadByte(0); } + conn.queueSendMessage(out); + } + else { + cstate.currentOutput = cstate.targetOutput; + this.setSuperChlor(cstate); + resolve(true); + } }); cstate.emitEquipmentChange(); return success; @@ -322,27 +335,29 @@ export class NixieChlorinator extends NixieEquipment { // Ask the chlorinator for its model. //[16, 2, 80, 20][0][118, 16, 3] let success = await new Promise((resolve, reject) => { - let out = Outbound.create({ - portId: this.chlor.portId || 0, - protocol: Protocol.Chlorinator, - //dest: this.chlor.id, - dest: 1, - action: 20, - payload: [0], - retries: 3, // IntelliCenter tries 4 times to get a response. - response: Response.create({ protocol: Protocol.Chlorinator, action: 3 }), - onAbort: () => { }, - onComplete: (err) => { - if (err) resolve(false); - else resolve(true); - } - }); - conn.queueSendMessage(out); + if (conn.isPortEnabled(this.chlor.portId || 0)) { + let out = Outbound.create({ + portId: this.chlor.portId || 0, + protocol: Protocol.Chlorinator, + //dest: this.chlor.id, + dest: 1, + action: 20, + payload: [0], + retries: 3, // IntelliCenter tries 4 times to get a response. + response: Response.create({ protocol: Protocol.Chlorinator, action: 3 }), + onAbort: () => { }, + onComplete: (err) => { + if (err) resolve(false); + else resolve(true); + } + }); + conn.queueSendMessage(out); + } + else { resolve(true); } }); + return success; } - else return Promise.resolve(false); + else return false; } catch (err) { logger.error(`Communication error with Chlorinator ${this.chlor.name} : ${err.message}`); return Promise.reject(err);} - } - } diff --git a/controller/nixie/pumps/Pump.ts b/controller/nixie/pumps/Pump.ts index 221439f0..ca487f77 100644 --- a/controller/nixie/pumps/Pump.ts +++ b/controller/nixie/pumps/Pump.ts @@ -505,135 +505,147 @@ export class NixiePumpRS485 extends NixiePump { finally { this.suspendPolling = false; } }; protected async setDriveStateAsync(running: boolean = true) { - return new Promise((resolve, reject) => { - let out = Outbound.create({ - portId: this.pump.portId || 0, - protocol: Protocol.Pump, - dest: this.pump.address, - action: 6, - payload: running && this._targetSpeed > 0 ? [10] : [4], - retries: 1, - response: true, - onComplete: (err, msg: Outbound) => { - if (err) { - logger.error(`Error sending setDriveState for ${this.pump.name} : ${err.message}`); - reject(err); + if (conn.isPortEnabled(this.pump.portId || 0)) { + return new Promise((resolve, reject) => { + let out = Outbound.create({ + portId: this.pump.portId || 0, + protocol: Protocol.Pump, + dest: this.pump.address, + action: 6, + payload: running && this._targetSpeed > 0 ? [10] : [4], + retries: 1, + response: true, + onComplete: (err, msg: Outbound) => { + if (err) { + logger.error(`Error sending setDriveState for ${this.pump.name} : ${err.message}`); + reject(err); + } + else resolve(); } - else resolve(); - } + }); + conn.queueSendMessage(out); }); - conn.queueSendMessage(out); - }); + } }; protected async requestPumpStatus() { - return new Promise((resolve, reject) => { - let out = Outbound.create({ - portId: this.pump.portId || 0, - protocol: Protocol.Pump, - dest: this.pump.address, - action: 7, - payload: [], - retries: 2, - response: true, - onComplete: (err, msg) => { - if (err) { - logger.error(`Error sending requestPumpStatus for ${this.pump.name}: ${err.message}`); - reject(err); + if (conn.isPortEnabled(this.pump.portId || 0)) { + return new Promise((resolve, reject) => { + let out = Outbound.create({ + portId: this.pump.portId || 0, + protocol: Protocol.Pump, + dest: this.pump.address, + action: 7, + payload: [], + retries: 2, + response: true, + onComplete: (err, msg) => { + if (err) { + logger.error(`Error sending requestPumpStatus for ${this.pump.name}: ${err.message}`); + reject(err); + } + else resolve(); } - else resolve(); - } + }); + conn.queueSendMessage(out); }); - conn.queueSendMessage(out); - }) + } }; protected setPumpToRemoteControl(running: boolean = true) { - return new Promise((resolve, reject) => { - let out = Outbound.create({ - portId: this.pump.portId || 0, - protocol: Protocol.Pump, - dest: this.pump.address, - action: 4, - payload: running ? [255] : [0], // when stopAsync is called, pass false to return control to pump panel - // payload: spump.virtualControllerStatus === sys.board.valueMaps.virtualControllerStatus.getValue('running') ? [255] : [0], - retries: 1, - response: true, - onComplete: (err) => { - if (err) { - logger.error(`Error sending setPumpToRemoteControl for ${this.pump.name}: ${err.message}`); - reject(err); + if (conn.isPortEnabled(this.pump.portId || 0)) { + return new Promise((resolve, reject) => { + let out = Outbound.create({ + portId: this.pump.portId || 0, + protocol: Protocol.Pump, + dest: this.pump.address, + action: 4, + payload: running ? [255] : [0], // when stopAsync is called, pass false to return control to pump panel + // payload: spump.virtualControllerStatus === sys.board.valueMaps.virtualControllerStatus.getValue('running') ? [255] : [0], + retries: 1, + response: true, + onComplete: (err) => { + if (err) { + logger.error(`Error sending setPumpToRemoteControl for ${this.pump.name}: ${err.message}`); + reject(err); + } + else resolve(); } - else resolve(); - } + }); + conn.queueSendMessage(out); }); - conn.queueSendMessage(out); - }); + } } protected setPumpFeature(feature?: number) { - // empty payload (possibly 0?, too) is no feature - // 6: Feature 1 - return new Promise((resolve, reject) => { - let out = Outbound.create({ - portId: this.pump.portId || 0, - protocol: Protocol.Pump, - dest: this.pump.address, - action: 5, - payload: typeof feature === 'undefined' ? [] : [ feature ], - retries: 2, - response: true, - onComplete: (err, msg: Outbound) => { - if (err) { - logger.error(`Error sending setPumpManual for ${this.pump.name}: ${err.message}`); - reject(err); + if (conn.isPortEnabled(this.pump.portId || 0)) { + // empty payload (possibly 0?, too) is no feature + // 6: Feature 1 + return new Promise((resolve, reject) => { + let out = Outbound.create({ + portId: this.pump.portId || 0, + protocol: Protocol.Pump, + dest: this.pump.address, + action: 5, + payload: typeof feature === 'undefined' ? [] : [feature], + retries: 2, + response: true, + onComplete: (err, msg: Outbound) => { + if (err) { + logger.error(`Error sending setPumpManual for ${this.pump.name}: ${err.message}`); + reject(err); + } + else resolve(); } - else resolve(); - } + }); + conn.queueSendMessage(out); }); - conn.queueSendMessage(out); - }); + } }; protected async setPumpRPMAsync() { - return new Promise((resolve, reject) => { - let out = Outbound.create({ - portId: this.pump.portId || 0, - protocol: Protocol.Pump, - dest: this.pump.address, - action: 1, - payload: [2, 196, Math.floor(this._targetSpeed / 256), this._targetSpeed % 256], - retries: 1, - // timeout: 250, - response: true, - onComplete: (err, msg) => { - if (err) { - logger.error(`Error sending setPumpRPMAsync for ${this.pump.name}: ${err.message}`); - reject(err); + if (conn.isPortEnabled(this.pump.portId || 0)) { + return new Promise((resolve, reject) => { + let out = Outbound.create({ + portId: this.pump.portId || 0, + protocol: Protocol.Pump, + dest: this.pump.address, + action: 1, + payload: [2, 196, Math.floor(this._targetSpeed / 256), this._targetSpeed % 256], + retries: 1, + // timeout: 250, + response: true, + onComplete: (err, msg) => { + if (err) { + logger.error(`Error sending setPumpRPMAsync for ${this.pump.name}: ${err.message}`); + reject(err); + } + else resolve(); } - else resolve(); - } + }); + conn.queueSendMessage(out); }); - conn.queueSendMessage(out); - }); + } }; protected async setPumpGPMAsync() { - // packet for vf; vsf will override - return new Promise((resolve, reject) => { - let out = Outbound.create({ - portId: this.pump.portId || 0, - protocol: Protocol.Pump, - dest: this.pump.address, - action: 1, - payload: [2, 228, 0, this._targetSpeed], - retries: 1, - response: true, - onComplete: (err, msg) => { - if (err) { - logger.error(`Error sending setPumpGPMAsync for ${this.pump.name}: ${err.message}`); - reject(err); + if (conn.isPortEnabled(this.pump.portId || 0)) { + // packet for vf; vsf will override + return new Promise((resolve, reject) => { + let out = Outbound.create({ + portId: this.pump.portId || 0, + protocol: Protocol.Pump, + dest: this.pump.address, + action: 1, + payload: [2, 228, 0, this._targetSpeed], + retries: 1, + response: true, + onComplete: (err, msg) => { + if (err) { + logger.error(`Error sending setPumpGPMAsync for ${this.pump.name}: ${err.message}`); + reject(err); + } + else resolve(); } - else resolve(); - } + }); + conn.queueSendMessage(out); }); - conn.queueSendMessage(out); - }); + } }; public async closeAsync() { try { @@ -742,49 +754,53 @@ export class NixiePumpVSF extends NixiePumpRS485 { } protected async setPumpRPMAsync() { // vsf action is 10 for rpm - return new Promise((resolve, reject) => { - let out = Outbound.create({ - portId: this.pump.portId || 0, - protocol: Protocol.Pump, - dest: this.pump.address, - action: 10, - payload: [2, 196, Math.floor(this._targetSpeed / 256), this._targetSpeed % 256], - retries: 1, - // timeout: 250, - response: true, - onComplete: (err, msg) => { - if (err) { - logger.error(`Error sending setPumpRPMAsync for ${this.pump.name}: ${err.message}`); - reject(err); + if (conn.isPortEnabled(this.pump.portId || 0)) { + return new Promise((resolve, reject) => { + let out = Outbound.create({ + portId: this.pump.portId || 0, + protocol: Protocol.Pump, + dest: this.pump.address, + action: 10, + payload: [2, 196, Math.floor(this._targetSpeed / 256), this._targetSpeed % 256], + retries: 1, + // timeout: 250, + response: true, + onComplete: (err, msg) => { + if (err) { + logger.error(`Error sending setPumpRPMAsync for ${this.pump.name}: ${err.message}`); + reject(err); + } + else resolve(); } - else resolve(); - } + }); + conn.queueSendMessage(out); }); - conn.queueSendMessage(out); - }); + } }; protected async setPumpGPMAsync() { - // vsf payload; different from vf payload - return new Promise((resolve, reject) => { - let out = Outbound.create({ - portId: this.pump.portId || 0, - protocol: Protocol.Pump, - dest: this.pump.address, - action: 9, - payload: [2, 196, 0, this._targetSpeed], - retries: 1, - response: true, - onComplete: (err, msg) => { - if (err) { - logger.error(`Error sending setPumpGPMAsync for ${this.pump.name}: ${err.message}`); - reject(err); + if (conn.isPortEnabled(this.pump.portId || 0)) { + // vsf payload; different from vf payload + return new Promise((resolve, reject) => { + let out = Outbound.create({ + portId: this.pump.portId || 0, + protocol: Protocol.Pump, + dest: this.pump.address, + action: 9, + payload: [2, 196, 0, this._targetSpeed], + retries: 1, + response: true, + onComplete: (err, msg) => { + if (err) { + logger.error(`Error sending setPumpGPMAsync for ${this.pump.name}: ${err.message}`); + reject(err); + } + else resolve(); + return } - else resolve(); - return - } + }); + conn.queueSendMessage(out); }); - conn.queueSendMessage(out); - }); + } }; }; export class NixiePumpHWVS extends NixiePumpRS485 { @@ -823,54 +839,57 @@ export class NixiePumpHWVS extends NixiePumpRS485 { protected async requestPumpStatus() { return Promise.resolve(); }; protected setPumpFeature(feature?: number) { return Promise.resolve(); } protected setPumpToRemoteControl(running: boolean = true) { - // We do nothing on this pump to set it to remote control. That is unless we are turning it off. - return new Promise((resolve, reject) => { - if (!running) { + if (conn.isPortEnabled(this.pump.portId || 0)) { + // We do nothing on this pump to set it to remote control. That is unless we are turning it off. + return new Promise((resolve, reject) => { + if (!running) { + let out = Outbound.create({ + portId: this.pump.portId || 0, + protocol: Protocol.Hayward, + source: 12, // Use the broadcast address + dest: this.pump.address, + action: 1, + payload: [0], // when stopAsync is called, pass false to return control to pump panel + // payload: spump.virtualControllerStatus === sys.board.valueMaps.virtualControllerStatus.getValue('running') ? [255] : [0], + retries: 1, + response: Response.create({ protocol: Protocol.Hayward, action: 12, source: this.pump.address }), + onComplete: (err) => { + if (err) { + logger.error(`Error sending setPumpToRemoteControl for ${this.pump.name}: ${err.message}`); + reject(err); + } + else resolve(); + } + }); + conn.queueSendMessage(out); + } + else resolve(); + }); + } + } + protected async setPumpRPMAsync() { + if (conn.isPortEnabled(this.pump.portId || 0)) { + return new Promise((resolve, reject) => { + let pt = sys.board.valueMaps.pumpTypes.get(this.pump.type); let out = Outbound.create({ portId: this.pump.portId || 0, protocol: Protocol.Hayward, source: 12, // Use the broadcast address - dest: this.pump.address, + dest: this.pump.address - 96, action: 1, - payload: [0], // when stopAsync is called, pass false to return control to pump panel - // payload: spump.virtualControllerStatus === sys.board.valueMaps.virtualControllerStatus.getValue('running') ? [255] : [0], + payload: [Math.min(Math.round((this._targetSpeed / pt.maxSpeed) * 100), 100)], // when stopAsync is called, pass false to return control to pump panel retries: 1, response: Response.create({ protocol: Protocol.Hayward, action: 12, source: this.pump.address }), onComplete: (err) => { if (err) { - logger.error(`Error sending setPumpToRemoteControl for ${this.pump.name}: ${err.message}`); + logger.error(`Error sending setPumpRPM for ${this.pump.name}: ${err.message}`); reject(err); } else resolve(); } }); conn.queueSendMessage(out); - } - else resolve(); - }); - } - protected async setPumpRPMAsync() { - return new Promise((resolve, reject) => { - let pt = sys.board.valueMaps.pumpTypes.get(this.pump.type); - let out = Outbound.create({ - portId: this.pump.portId || 0, - protocol: Protocol.Hayward, - source: 12, // Use the broadcast address - dest: this.pump.address - 96, - action: 1, - payload: [Math.min(Math.round((this._targetSpeed / pt.maxSpeed) * 100), 100)], // when stopAsync is called, pass false to return control to pump panel - retries: 1, - response: Response.create({ protocol: Protocol.Hayward, action: 12, source: this.pump.address }), - onComplete: (err) => { - if (err) { - logger.error(`Error sending setPumpRPM for ${this.pump.name}: ${err.message}`); - reject(err); - } - else resolve(); - } }); - conn.queueSendMessage(out); - }); + } }; - } diff --git a/defaultConfig.json b/defaultConfig.json index 6cae6052..30a4e238 100755 --- a/defaultConfig.json +++ b/defaultConfig.json @@ -160,6 +160,25 @@ "changesOnly": true } }, + "aqualinkD": { + "name": "AquaLinkD", + "type": "mqtt", + "enabled": false, + "fileName": "aqualinkD.json", + "globals": {}, + "options": { + "protocol": "mqtt://", + "host": "192.168.0.1", + "port": 1883, + "username": "", + "password": "", + "rootTopic": "aqualinkd", + "retain": true, + "qos": 0, + "changesOnly": true + } + + }, "mqttAlt": { "name": "MQTTAlt", "type": "mqtt", diff --git a/web/Server.ts b/web/Server.ts index 24f238bf..b0d9c3f1 100755 --- a/web/Server.ts +++ b/web/Server.ts @@ -1066,7 +1066,6 @@ export class HttpInterfaceServer extends ProtoServer { catch (err) { } } } - export class InfluxInterfaceServer extends ProtoServer { public bindingsPath: string; public bindings: InfluxInterfaceBindings; @@ -1127,7 +1126,6 @@ export class InfluxInterfaceServer extends ProtoServer { } } } - export class MqttInterfaceServer extends ProtoServer { public bindingsPath: string; public bindings: HttpInterfaceBindings; @@ -1414,7 +1412,7 @@ export class REMInterfaceServer extends ProtoServer { } private isJSONString(s: string): boolean { if (typeof s !== 'string') return false; - if (typeof s.startsWith('{') || typeof s.startsWith('[')) return true; + if (s.startsWith('{') || s.startsWith('[')) return true; return false; } public async getApiService(url: string, data?: any, timeout: number = 3600): Promise { diff --git a/web/bindings/aqualinkD.json b/web/bindings/aqualinkD.json new file mode 100644 index 00000000..75966c82 --- /dev/null +++ b/web/bindings/aqualinkD.json @@ -0,0 +1,441 @@ +{ + "context": { + "name": "AqualinkD", + "options": { + "formatter": [ + { + "transform": ".toLowerCase()" + }, + { + "regexkey": "\\s", + "replace": "", + "description": "Remove whitespace" + }, + { + "regexkey": "\\/", + "replace": "", + "description": "Remove /" + }, + { + "regexkey": "\\+", + "replace": "", + "description": "Remove +" + }, + { + "regexkey": "\\$", + "replace": "", + "description": "Remove $" + }, + { + "regexkey": "\\#", + "replace": "", + "description": "Remove #" + } + ], + "rootTopic-DIRECTIONS": "You can override the root topic by renaming _rootTopic to rootTopic", + "_rootTopic": "@bind=(state.equipment.alias).replace(' ','-').replace('/','').toLowerCase();", + "clientId": "@bind='aqualinkd_njsPC_'+ Math.random().toString(16).substr(2, 8);" + } + }, + "subscriptions": [ + { + "topic": "Aux_1", + "description": "State of the Aux_1 circuit on AqualinkD", + "processor": [ "sys.board.circuits.setCircuitStateAsync(2, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_2", + "description": "State of the Aux_2 circuit on AqualinkD", + "processor": [ "sys.board.circuits.setCircuitStateAsync(3, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_3", + "description": "State of the Aux_3 circuit on AqualinkD", + "processor": [ "sys.board.circuits.setCircuitStateAsync(4, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_5", + "description": "State of the Aux_5 circuit on AqualinkD", + "processor": [ "sys.board.circuits.setCircuitStateAsync(5, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_4", + "description": "State of the Aux_4 circuit on AqualinkD", + "processor": [ "sys.board.circuits.setCircuitStateAsync(7, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_6", + "description": "State of the Aux_1 circuit on AqualinkD", + "processor": [ "sys.board.circuits.setCircuitStateAsync(8, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_7", + "description": "State of the Aux_7 circuit on AqualinkD", + "processor": [ "sys.board.circuits.setCircuitStateAsync(9, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_8", + "description": "State of the Aux_8 circuit on AqualinkD", + "processor": [ "sys.board.circuits.setCircuitStateAsync(10, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_B1", + "description": "State of the Aux_B1 feature on AqualinkD", + "processor": [ "sys.board.circuits.setFeatureStateAsync(1, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_B2", + "description": "State of the Aux_B2 feature on AqualinkD", + "processor": [ "sys.board.circuits.setFeatureStateAsync(2, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_B3", + "description": "State of the Aux_B3 feature on AqualinkD", + "processor": [ "sys.board.circuits.setFeatureStateAsync(3, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_B4", + "description": "State of the Aux_B4 feature on AqualinkD", + "processor": [ "sys.board.circuits.setFeatureStateAsync(4, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_B5", + "description": "State of the Aux_B5 feature on AqualinkD", + "processor": [ "sys.board.circuits.setFeatureStateAsync(5, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_B6", + "description": "State of the Aux_B6 feature on AqualinkD", + "processor": [ "sys.board.circuits.setFeatureStateAsync(6, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_B7", + "description": "State of the Aux_B7 feature on AqualinkD", + "processor": [ "sys.board.circuits.setFeatureStateAsync(7, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Aux_B8", + "description": "State of the Aux_B8 feature on AqualinkD", + "processor": [ "sys.board.circuits.setFeatureStateAsync(8, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Filter_Pump", + "description": "State of the Filter_Pump on AqualinkD", + "processor": [ + "if(state.circuits.getItemById(1).isOn === false) ", + " sys.board.circuits.setCircuitStateAsync(6, parseInt(value, 10) === 1);", + "else sys.board.circuits.setCircuitStateAsync(1, parseInt(value, 10) === 1);" + ] + }, + { + "topic": "Spa_Mode", + "description": "The body mode for the controller. If 1 then we are in spa mode otherwise we are in pool mode.", + "processor": [ "sys.board.circuits.setCircuitStateAsync(1, parseInt(value, 10) === 1);" ] + }, + { + "topic": "Temperature/Pool", + "description": "The current temperature emitted by the controller for the pool. However, we only want to set this when the pool circuit is on.", + "processor": [ + "if(!state.circuits.getItemById(6).isOn) state.temps.bodies.getItemById(1).temp = parseFloat(value);", + "else sys.board.setTempsAsync({ waterSensor1: parseFloat(value) });" + ] + }, + { + "topic": "Temperature/Spa", + "description": "The current temperature emitted by the controller for the spa. However, we only want to set this when the spa circuit is on.", + "processor": [ + "if(!state.circuits.getItemById(1).isOn) state.temps.bodies.getItemById(2).temp = parseFloat(value);", + "else sys.board.setTempsAsync({ waterSensor1: parseFloat(value) });" + ] + }, + { + "topic": "Temperature/Air", + "description": "The current temperature emitted by the controller for air temperature.", + "processor": [ "sys.board.setTempsAsync({ air: parseFloat(value) });" ] + }, + { + "topic": "Temperature/Solar", + "description": "The current temperature emitted by the controller for solar.", + "processor": [ "sys.board.setTempsAsync({ solarSensor1: parseFloat(value) });" ] + }, + { + "topic": "Pool_Heater/setpoint", + "description": "The setpoint for the pool body", + "processor": [ "let body = sys.bodies.getItemById(1); sys.board.setHeatSetpointAsync(body, parseInt(value, 10));" ] + }, + { + "topic": "Spa_Heater/setpoint", + "description": "The setpoint for the spa body", + "processor": [ "let body = sys.bodies.getItemById(2); sys.board.setHeatSetpointAsync(body, parseInt(value, 10));" ] + }, + { + "topic": "Filter_Pump/RPM", + "description": "The RPM for the first pump", + "processor": [ "state.pumps.getPumpByAddress(96).rpm = parseInt(value, 10);" ] + }, + { + "topic": "Filter_Pump/Watts", + "description": "The Watts for the first pump", + "processor": [ "state.pumps.getPumpByAddress(96).watts = parseInt(value, 10);" ] + }, + { + "topic": "Filter_Pump/GPM", + "description": "The flow for the first pump", + "processor": [ "state.pumps.getPumpByAddress(96).flow = parseInt(value, 10);" ] + }, + { + "topic": "Pump_1/RPM", + "description": "The RPM for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(97).rpm = parseInt(value, 10);" ] + }, + { + "topic": "Pump_1/Watts", + "description": "The Watts for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(97).watts = parseInt(value, 10);" ] + }, + { + "topic": "Pump_1/GPM", + "description": "The flow for the aux pump", + "processor": [ "state.pumps.getPumpByAddress(97).flow = parseInt(value, 10);" ] + }, + { + "topic": "Pump_2/RPM", + "description": "The RPM for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(98).rpm = parseInt(value, 10);" ] + }, + { + "topic": "Pump_2/Watts", + "description": "The Watts for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(98).watts = parseInt(value, 10);" ] + }, + { + "topic": "Pump_2/GPM", + "description": "The flow for the aux pump", + "processor": [ "state.pumps.getPumpByAddress(98).flow = parseInt(value, 10);" ] + }, + { + "topic": "Pump_3/RPM", + "description": "The RPM for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(99).rpm = parseInt(value, 10);" ] + }, + { + "topic": "Pump_3/Watts", + "description": "The Watts for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(99).watts = parseInt(value, 10);" ] + }, + { + "topic": "Pump_3/GPM", + "description": "The flow for the aux pump", + "processor": [ "state.pumps.getPumpByAddress(99).flow = parseInt(value, 10);" ] + }, + { + "topic": "Pump_4/RPM", + "description": "The RPM for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(100).rpm = parseInt(value, 10);" ] + }, + { + "topic": "Pump_4/Watts", + "description": "The Watts for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(100).watts = parseInt(value, 10);" ] + }, + { + "topic": "Pump_4/GPM", + "description": "The flow for the aux pump", + "processor": [ "state.pumps.getPumpByAddress(100).flow = parseInt(value, 10);" ] + }, + { + "topic": "Pump_5/RPM", + "description": "The RPM for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(101).rpm = parseInt(value, 10);" ] + }, + { + "topic": "Pump_5/Watts", + "description": "The Watts for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(101).watts = parseInt(value, 10);" ] + }, + { + "topic": "Pump_5/GPM", + "description": "The flow for the aux pump", + "processor": [ "state.pumps.getPumpByAddress(101).flow = parseInt(value, 10);" ] + }, + { + "topic": "Pump_6/RPM", + "description": "The RPM for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(102).rpm = parseInt(value, 10);" ] + }, + { + "topic": "Pump_6/Watts", + "description": "The Watts for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(102).watts = parseInt(value, 10);" ] + }, + { + "topic": "Pump_6/GPM", + "description": "The flow for the aux pump", + "processor": [ "state.pumps.getPumpByAddress(102).flow = parseInt(value, 10);" ] + }, + { + "topic": "Pump_7/RPM", + "description": "The RPM for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(103).rpm = parseInt(value, 10);" ] + }, + { + "topic": "Pump_7/Watts", + "description": "The Watts for aux the pump", + "processor": [ "state.pumps.getPumpByAddress(103).watts = parseInt(value, 10);" ] + }, + { + "topic": "Pump_7/GPM", + "description": "The flow for the aux pump", + "processor": [ "state.pumps.getPumpByAddress(103).flow = parseInt(value, 10);" ] + }, + { + "topic": "SWG/PPM", + "description": "The salt level for the chlorinator", + "processor": [ "state.chlorinators.getItemById(1).saltLevel = parseInt(value, 10);" ] + }, + { + "topic": "SWG/fullstatus", + "description": "The current status for the chlorinator", + "processor": [ + "switch(parseInt(value, 10)) {", + "case 8: state.chlorinators.getItemById(1).status = 5;", + "case 9: state.chlorinators.getItemById(1).status = 0;", + "case 16: state.chlorinators.getItemById(1).status = 4;", + "case 32: state.chlorinators.getItemById(1).status = 6;", + "case 64: state.chlorinators.getItemById(1).status = 7;", + "case 128: state.chlorinators.getItemById(1).status = 8;", + "case 255: state.chlorinators.getItemById(1).status = 128;", + "default: state.chlorinators.getItemById(1).status = parseInt(value, 10); }" + ] + }, + { + "topic": "SWG/Percent", + "description": "The current status for the chlorinator", + "processor": [ "sys.chlorinators.getItemById(1).poolSetpoint = state.chlorinators.getItemById(1).poolSetpoint = parseInt(value, 10);" ] + }, + { + "topic": "Freeze_Protect/setpoint", + "description": "The current freeze protection setpoint", + "processor": [ "sys.general.options.freezeThreshold = parseInt(value, 10);" ] + } + ], + "events": [ + { + "name": "config", + "description": "Don't flood the MQTT bus.", + "enabled": false + }, + { + "name": "circuit", + "description": "Populate the circuits topics", + "topics": [ + { + "topic": "Filter_Pump/set", + "message": "@bind=data.isOn ? 1 : 0;", + "description": "Bind the state", + "filter": "@bind=data.id; === 6;" + }, + { + "topic": "Spa_Mode/set", + "message": "@bind=data.isOn ? 1 : 0;", + "description": "Bind the state", + "filter": "@bind=data.id; === 1;" + }, + { + "topic": "Aux_1/set", + "message": "@bind=data.isOn ? 1 : 0;", + "description": "Bind the state", + "filter": "@bind=data.id; === 2;" + }, + { + "topic": "Aux_2/set", + "message": "@bind=data.isOn ? 1 : 0;", + "description": "Bind the state", + "filter": "@bind=data.id; === 3;" + }, + { + "topic": "Aux_3/set", + "message": "@bind=data.isOn ? 1 : 0;", + "description": "Bind the state", + "filter": "@bind=data.id; === 4;" + }, + { + "topic": "Aux_4/set", + "message": "@bind=data.isOn ? 1 : 0;", + "description": "Bind the state", + "filter": "@bind=data.id; === 5;" + }, + { + "topic": "Aux_5/set", + "message": "@bind=data.isOn ? 1 : 0;", + "description": "Bind the state", + "filter": "@bind=data.id; === 7;" + }, + { + "topic": "Aux_6/set", + "message": "@bind=data.isOn ? 1 : 0;", + "description": "Bind the state", + "filter": "@bind=data.id; === 8;" + }, + { + "topic": "Aux_7/set", + "message": "@bind=data.isOn ? 1 : 0;", + "description": "Bind the state", + "filter": "@bind=data.id; === 9;" + }, + { + "topic": "Aux_8/set", + "message": "@bind=data.isOn; ? 1 : 0;", + "description": "Bind the state", + "filter": "@bind=data.id === 10;" + } + ] + }, + { + "name": "feature", + "description": "Populate the features topics", + "topics": [ + { + "topic": "Aux_B@bind=data.id;/set", + "message": "@bind=data.isOn ? 1 : 0;", + "description": "Bind the state" + } + ] + }, + { + "name": "body", + "description": "Populate the body topic", + "filter": "@bind=data.isActive;", + "topics": [ + { + "topic": "Pool_Heater/setpoint/set", + "message": "@bind=data.heatSetpoint;", + "description": "Set the heat setpoint for the pool", + "filter": "@bind=data.id; === 1;" + }, + { + "topic": "Spa_Heater/setpoint/set", + "message": "@bind=data.heatSetpoint;", + "description": "Set the heat setpoint for the spa", + "filter": "@bind=data.id; === 2;" + } + ] + }, + { + "name": "chlorinator", + "description": "Populate the chlorinator topic", + "topics": [ + { + "topic": "SWG/Percent/set", + "message": "@bind=data.superChlor ? 100 : data.disabled ? 0 : data.poolSetpoint;", + "description": "Bind the setpoint topic. Unfortunately aqualink only supports one setpoint." + } + ] + }, + { + "name": "pump", + "description": "Populate the pumps topic. We will need to figure this one out. Perhaps if the RS485 comm port is disabled we don't try to send anything.", + "topics": [] + } + ] +} \ No newline at end of file diff --git a/web/interfaces/mqttInterface.ts b/web/interfaces/mqttInterface.ts index a96921b0..e93b47ed 100644 --- a/web/interfaces/mqttInterface.ts +++ b/web/interfaces/mqttInterface.ts @@ -20,8 +20,8 @@ import * as http from "http"; import * as https from "https"; import extend = require("extend"); import { logger } from "../../logger/Logger"; -import { sys } from "../../controller/Equipment"; -import { state } from "../../controller/State"; +import { PoolSystem, sys } from "../../controller/Equipment"; +import { State, state } from "../../controller/State"; import { InterfaceEvent, BaseInterfaceBindings } from "./baseInterface"; import { sys as sysAlias } from "../../controller/Equipment"; import { state as stateAlias } from "../../controller/State"; @@ -35,8 +35,9 @@ export class MqttInterfaceBindings extends BaseInterfaceBindings { this.subscribed = false; } private client: MqttClient; - private topics: string[] = []; + private topics: MqttTopicSubscription[] = []; declare events: MqttInterfaceEvent[]; + declare subscriptions: MqttTopicSubscription[]; private subscribed: boolean; // subscribed to events or not private sentInitialMessages = false; private init = () => { @@ -55,7 +56,6 @@ export class MqttInterfaceBindings extends BaseInterfaceBindings { url } this.client = connect(url, opts); - this.client.on('connect', () => { try { logger.info(`MQTT connected to ${url}`); @@ -82,7 +82,7 @@ export class MqttInterfaceBindings extends BaseInterfaceBindings { while (this.topics.length > 0) { let topic = this.topics.pop(); if (typeof topic !== 'undefined') { - this.client.unsubscribe(topic, (err, packet) => { + this.client.unsubscribe(topic.topicPath, (err, packet) => { if (err) logger.error(`Error unsubscribing from MQTT topic ${topic}`); else { logger.debug(`Unsubscribed from MQTT topic ${topic}`); @@ -92,47 +92,46 @@ export class MqttInterfaceBindings extends BaseInterfaceBindings { } } catch (err) { logger.error(`Error unsubcribing to MQTT topic: ${err.message}`); } } - private subscribe() { - try { - if (this.topics.length > 0) this.unsubscribe(); - let root = this.rootTopic(); - this.topics.push(`${root}/state/+/setState`, - `${root}/state/+/setstate`, - `${root}/state/+/toggleState`, - `${root}/state/+/togglestate`, - `${root}/state/body/setPoint`, - `${root}/state/body/setpoint`, - `${root}/state/body/heatMode`, - `${root}/state/body/heatmode`, - `${root}/state/+/setTheme`, - `${root}/state/+/settheme`, - `${root}/state/temps`, - `${root}/config/tempSensors`, - `${root}/config/chemController`, - `${root}/state/chemController`, - `${root}/config/chlorinator`, - `${root}/state/chlorinator`); - for (let i = 0; i < this.topics.length; i++) { - let topic = this.topics[i]; - // await new Promise((resolve, reject) => { - this.client.subscribe(topic, (err, granted) => { - if (!err) { - logger.debug(`MQTT subscribed to ${JSON.stringify(granted)}`); - // resolve(); - } - else { - logger.error(`MQTT Subscribe: ${err}`); - // reject(err); - } - }); - // }); - + protected subscribe() { + if (this.topics.length > 0) this.unsubscribe(); + let root = this.rootTopic(); + if (typeof this.subscriptions !== 'undefined') { + for (let i = 0; i < this.subscriptions.length; i++) { + let sub = this.subscriptions[i]; + this.topics.push(new MqttTopicSubscription(root, sub)); } - this.client.on('message', async (topic, msg) => { try { await this.messageHandler(topic, msg) } catch (err) { logger.error(`Error processing MQTT request ${err}.`) }; }) - this.subscribed = true; - } catch (err) { logger.error(`Error subcribing to MQTT topics`); } + } + else { + let arrTopics = [`state/+/setState`, + `state/+/setstate`, + `Pumpstate/+/toggleState`, + `Pumpstate/+/togglestate`, + `Pumpstate/body/setPoint`, + `Pumpstate/body/setpoint`, + `Pumpstate/body/heatMode`, + `Pumpstate/body/heatmode`, + `Pumpstate/+/setTheme`, + `Pumpstate/+/settheme`, + `Pumpstate/temps`, + `Pumpconfig/tempSensors`, + `Pumpconfig/chemController`, + `Pumpstate/chemController`, + `Pumpconfig/chlorinator`, + `Pumpstate/chlorinator`]; + for (let i = 0; i < arrTopics.length; i++) { + this.topics.push(new MqttTopicSubscription(root, { topic: arrTopics[i] })); + } + } + for (let i = 0; i < this.topics.length; i++) { + let topic = this.topics[i]; + this.client.subscribe(topic.topicPath, (err, granted) => { + if (!err) logger.debug(`MQTT subscribed to ${JSON.stringify(granted)}`); + else logger.error(`MQTT Subscribe: ${err}`); + }); + } + this.client.on('message', async (topic, msg) => { try { await this.messageHandler(topic, msg) } catch (err) { logger.error(`Error processing MQTT request ${err}.`) }; }) + this.subscribed = true; } - // this will take in the MQTT Formatter options and format each token that is bound // otherwise, it's the same as the base buildTokens fn. // This could be combined into one fn but for now it's specific to MQTT formatting of topics @@ -178,7 +177,6 @@ export class MqttInterfaceBindings extends BaseInterfaceBindings { } return toks; } - private rootTopic = () => { let toks = {}; let baseOpts = extend(true, { headers: {} }, this.cfg.options, this.context.options); @@ -187,7 +185,6 @@ export class MqttInterfaceBindings extends BaseInterfaceBindings { topic = this.replaceTokens(baseOpts.rootTopic, toks); return topic; } - public bindEvent(evt: string, ...data: any) { try { if (!this.sentInitialMessages && evt === 'controller' && data[0].status.val === 1) { @@ -286,6 +283,16 @@ export class MqttInterfaceBindings extends BaseInterfaceBindings { try { let msg = message.toString(); if (msg[0] === '{') msg = JSON.parse(msg); + + let sub: MqttTopicSubscription = this.topics.find(elem => topic === elem.topicPath); + if (typeof sub === 'undefined') return; + // Alright so now lets process our results. + if (typeof sub.processor === 'function') { + let value = msg; + sub.processor(sub, sys, state, value); + state.emitEquipmentChanges(); + return; + } const topics = topic.split('/'); logger.debug(`MQTT: Inbound ${topic}: ${message.toString()}`); if (topic.startsWith(this.rootTopic() + '/') && typeof msg === 'object') { @@ -452,11 +459,34 @@ export class MqttInterfaceBindings extends BaseInterfaceBindings { } } } - class MqttInterfaceEvent extends InterfaceEvent { public topics: IMQTT[] } - +class MqttSubscriptions { + public subscriptions: IMQTTSubscription[] +} +class MqttTopicSubscription { + root: string; + topic: string; + processor: (sub: MqttTopicSubscription, sys: PoolSystem, state: State, value: any) => void; + constructor(root: string, sub: any) { + this.root = sub.root || root; + this.topic = sub.topic; + if (typeof sub.processor !== 'undefined') { + let fnBody = Array.isArray(sub.processor) ? sub.processor.join('\n') : sub.processor; + try { + this.processor = new Function('sub', 'sys', 'state', 'value', fnBody) as (sub: MqttTopicSubscription, sys: PoolSystem, state: State, value: any) => void; + } catch (err) { logger.error(`Error compiling subscription processor: ${err} -- ${fnBody}`); } + } + } + public get topicPath(): string { return `${this.root}/${this.topic}` }; + +} +export interface IMQTTSubscription { + topic: string, + description: string, + processor?:string +} export interface IMQTT { topic: string; message: string;