Skip to content

Commit

Permalink
pull master updates
Browse files Browse the repository at this point in the history
  • Loading branch information
tagyoureit committed Aug 6, 2022
1 parent 85d5fcb commit cb43ee8
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 422 deletions.
1 change: 0 additions & 1 deletion app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ export async function stopPacketCaptureAsync() {
export async function stopAsync(): Promise<void> {
try {
console.log('Shutting down open processes');
// await sys.board.virtualPumpControllers.stopAsync();
await webApp.stopAutoBackup();
await sys.stopAsync();
await state.stopAsync();
Expand Down
110 changes: 64 additions & 46 deletions controller/comms/Comms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@ You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { EventEmitter } from 'events';
import * as SerialPort from 'serialport';
import * as MockBinding from '@serialport/binding-mock';
import { SerialPort, SerialPortMock, SerialPortOpenOptions } from 'serialport'
import { AutoDetectTypes } from '@serialport/bindings-cpp';
import { config } from '../../config/Config';
import { logger } from '../../logger/Logger';
import * as net from 'net';
import { setTimeout, setInterval } from 'timers';
import { setTimeout } from 'timers';
import { Message, Outbound, Inbound, Response } from './messages/Messages';
import { InvalidEquipmentDataError, InvalidOperationError, MessageError, OutboundMessageError } from '../Errors';
import { InvalidEquipmentDataError, InvalidOperationError, OutboundMessageError } from '../Errors';
import { utils } from "../Constants";
import { sys } from "../Equipment";
import { webApp } from "../../web/Server";
import { messagesMock } from './messages/mock/MessagesMock'
const extend = require("extend");
export class Connection {
constructor() { }
Expand Down Expand Up @@ -87,6 +88,7 @@ export class Connection {
pdata.netConnect = typeof data.netConnect !== 'undefined' ? utils.makeBool(data.netConnect) : utils.makeBool(pdata.netConnect);
pdata.rs485Port = typeof data.rs485Port !== 'undefined' ? data.rs485Port : pdata.rs485Port;
pdata.inactivityRetry = typeof data.inactivityRetry === 'number' ? data.inactivityRetry : pdata.inactivityRetry;
pdata.mockPort = typeof data.mockPort !== 'undefined' ? utils.makeBool(data.mockPort) : utils.makeBool(pdata.mockPort);
if (pdata.netConnect) {
pdata.netHost = typeof data.netHost !== 'undefined' ? data.netHost : pdata.netHost;
pdata.netPort = typeof data.netPort === 'number' ? data.netPort : pdata.netPort;
Expand Down Expand Up @@ -262,7 +264,7 @@ export class RS485Port {
public isOpen: boolean = false;
public closing: boolean = false;
private _cfg: any;
private _port: any;
private _port: SerialPort | SerialPortMock | net.Socket;
public mockPort: boolean = false;
private isPaused: boolean = false;
private connTimer: NodeJS.Timeout;
Expand All @@ -282,7 +284,7 @@ export class RS485Port {
if (this.isOpen) await this.closeAsync();
if (typeof cfg !== 'undefined') this._cfg = cfg;
if (!this._cfg.enabled) return true;
if (this._cfg.netConnect && !this._cfg.mockPort) {
if (this._cfg.netConnect) {
let sock: net.Socket = this._port as net.Socket;
if (typeof this._port !== 'undefined' && this.isOpen) {
// This used to try to reconnect and recreate events even though the socket was already connected. This resulted in
Expand Down Expand Up @@ -384,23 +386,26 @@ export class RS485Port {
});
}
else {
if (typeof this._port !== 'undefined' && this._port.isOpen) {
if (typeof this._port !== 'undefined' && this.isOpen) {
// This used to try to reconnect even though the serial port was already connected. This resulted in
// instances where an access denied error was emitted. So if the port is open we will simply return.
this.resetConnTimer();
return true;
}
let sp: SerialPort = null;
let sp: SerialPort | SerialPortMock = null;
if (this._cfg.mockPort) {
this.mockPort = true;
SerialPort.Binding = MockBinding;
let portPath = 'FAKE_PORT';
MockBinding.createPort(portPath, { echo: false, record: true });
sp = new SerialPort(portPath, { autoOpen: false });
let portPath = 'MOCK_PORT';
SerialPortMock.binding.createPort(portPath)
// SerialPortMock.binding = SerialPortMock;
// SerialPortMock.createPort(portPath, { echo: false, record: true });
let opts: SerialPortOpenOptions<AutoDetectTypes> = { path: portPath, autoOpen: false, baudRate: 9600 };
sp = new SerialPortMock(opts);
}
else {
this.mockPort = false;
sp = new SerialPort(this._cfg.rs485Port, this._cfg.portSettings);
let opts: SerialPortOpenOptions<AutoDetectTypes> = extend(true, { path: this._cfg.rs485Port }, this._cfg.portSettings);
sp = new SerialPort(opts);
}
return await new Promise<boolean>((resolve, _) => {
// The serial port open method calls the callback just once. Unfortunately that is not the case for
Expand All @@ -421,20 +426,23 @@ export class RS485Port {
// for a successul connect and false otherwise.
sp.on('open', () => {
if (typeof this._port !== 'undefined') logger.info(`Serial Port ${this.portId}: ${this._cfg.rs485Port} recovered from lost connection.`)
else logger.info(`Serial port: ${this._cfg.rs485Port} request to open successful ${this._cfg.portSettings.baudRate}b ${this._cfg.portSettings.dataBits}-${this._cfg.portSettings.parity}-${this._cfg.portSettings.stopBits}`);
else logger.info(`Serial port: ${sp.path} request to open successful ${sp.baudRate}b ${sp.port.openOptions.dataBits}-${sp.port.openOptions.parity}-${sp.port.openOptions.stopBits}`);
this._port = sp;
this.isOpen = true;
/// if just changing existing port, reset key flags
this.isRTS = true;
this.closing = false;
this._processing = false;
sp.on('data', (data) => { if (!this.mockPort && !this.isPaused) this.resetConnTimer(); this.pushIn(data); });
sp.on('data', (data) => {
if (!this.mockPort && !this.isPaused) this.resetConnTimer();
this.pushIn(data);
});
this.resetConnTimer();
this.emitPortStats();
});
sp.on('close', (err) => {
this.isOpen = false;
if (typeof err !== 'undefined' && err !== null && err.disconnected) {
if (err && err.disconnected) {
logger.info(`Serial Port ${this.portId} - ${this._cfg.rs485Port} has been disconnected and closed. ${JSON.stringify(err)}`)
}
else {
Expand All @@ -444,7 +452,7 @@ export class RS485Port {
sp.on('error', (err) => {
// an underlying streams error from a SP write may call the error event
// instead/in leiu of the error callback
if (typeof sp.writeTimer !== 'undefined') { clearTimeout(sp.writeTimer); sp.writeTimer = null; }
if (typeof this.writeTimer !== 'undefined') { clearTimeout(this.writeTimer); this.writeTimer = null; }
this.isOpen = false;
if (sp.isOpen) sp.close((err) => { }); // call this with the error callback so that it doesn't emit to the error again.
this.resetConnTimer();
Expand Down Expand Up @@ -494,7 +502,7 @@ export class RS485Port {
// just call destroy and forcibly close it.
this._port.destroy();
}
else if (typeof this._port.close === 'function') {
else if (!(this._port instanceof net.Socket) && typeof this._port.close === 'function') {
this._port.close((err) => {
if (err) {
logger.error(`Error closing ${this.portId} serial port ${this._cfg.rs485Port}: ${err}`);
Expand Down Expand Up @@ -542,52 +550,62 @@ export class RS485Port {
}, this._cfg.inactivityRetry * 1000);
}
// Data management functions
public drain(cb: Function) {
public drain(cb: (err?: Error) => void) {
if (typeof this._port === 'undefined') {
logger.debug(`Serial Port ${this.portId}: Cannot perform drain function on port that is not open.`);
cb();
}
if (typeof (this._port.drain) === 'function')
this._port.drain(cb);
if ((this._port instanceof SerialPort || this._port instanceof SerialPortMock) && typeof (this._port.drain) === 'function')
this._port.drain(cb as (err) => void);
else // Call the method immediately as the port doesn't wait to send.
cb();
}
public write(bytes: Buffer, cb: Function) {
public write(msg: Outbound, cb: (err?: Error) => void) {
let bytes = Buffer.from(msg.toPacket());
let _cb = cb;
if (this._cfg.netConnect) {
// SOCAT drops the connection and destroys the stream. Could be weeks or as little as a day.
if (typeof this._port === 'undefined' || this._port.destroyed !== false) {
this.openAsync().then(() => {
this._port.write(bytes, 'binary', cb);
(this._port as net.Socket).write(bytes, 'binary', cb);
});
}
else
this._port.write(bytes, 'binary', cb);
(this._port as net.Socket).write(bytes, 'binary', cb);
}
else {
this.writeTimer = setTimeout(() => {
// RSG - I ran into a scenario where the underlying stream
// processor was not retuning the CB and comms would
// completely stop. This timeout is a failsafe.
// Further, the underlying stream may throw an event error
// and not call the callback (per node docs) hence the
// public writeTimer.
if (typeof cb === 'function') {
cb = undefined;
_cb(new Error(`Serialport stream has not called the callback in 3s.`));
}
}, 3000);
this._port.write(bytes, (err) => {
if (typeof this.writeTimer !== 'undefined') {
clearTimeout(this.writeTimer);
this.writeTimer = null;
// resolve();
if (this._port instanceof SerialPortMock && this.mockPort === true) {
let m = messagesMock.process(msg);
this._port.port.emitData(Buffer.from(m));
cb();
}
else {

this.writeTimer = setTimeout(() => {
// RSG - I ran into a scenario where the underlying stream
// processor was not retuning the CB and comms would
// completely stop. This timeout is a failsafe.
// Further, the underlying stream may throw an event error
// and not call the callback (per node docs) hence the
// public writeTimer.
if (typeof cb === 'function') {
cb = undefined;
_cb(err);
_cb(new Error(`Serialport stream has not called the callback in 3s.`));
}
}
});
}, 3000);
this._port.write(bytes, (err) => {
if (typeof this.writeTimer !== 'undefined') {
clearTimeout(this.writeTimer);
this.writeTimer = null;
// resolve();
if (typeof cb === 'function') {
cb = undefined;
_cb(err);
}
}
});
}

}
}
private pushIn(pkt) { this._inBuffer.push.apply(this._inBuffer, pkt.toJSON().data); if (sys.isReady) setImmediate(() => { this.processPackets(); }); }
Expand Down Expand Up @@ -690,7 +708,7 @@ export class RS485Port {
// write only verifies that the buffer got ahold of it.
try {
let self = this;
if (!this.isRTS || this.mockPort || this.closing) return;
if (!this.isRTS || this.closing) return;
var bytes = msg.toPacket();
if (this.isOpen) {
this.isRTS = false; // only set if port is open, otherwise it won't be set back to true
Expand All @@ -715,7 +733,7 @@ export class RS485Port {
this.counter.bytesSent += bytes.length;
msg.timestamp = new Date();
logger.packet(msg);
this.write(Buffer.from(bytes), (err) => {
this.write(msg, (err) => {
clearTimeout(this.writeTimer);
this.writeTimer = null;
msg.tries++;
Expand Down
1 change: 0 additions & 1 deletion controller/comms/messages/Messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import { TouchScheduleCommands } from "controller/boards/EasyTouchBoard";
import { IntelliValveStateMessage } from "./status/IntelliValveStateMessage";
import { IntelliChemStateMessage } from "./status/IntelliChemStateMessage";
import { OutboundMessageError } from "../../Errors";
import { prototype } from "events";
import extend = require("extend");
export enum Direction {
In = 'in',
Expand Down
50 changes: 50 additions & 0 deletions controller/comms/messages/mock/MessagesMock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { sys } from "../../../../controller/Equipment";
import { logger } from "../../../../logger/Logger";
import { ControllerType } from "../../../../controller/Constants";
import { Outbound, Protocol } from "../Messages";
import { mockPump } from "./status/MockPump";


export class MessagesMock {
constructor() { }

public process(outboundMsg: Outbound) {


return this.generateInbound(outboundMsg);

}

private generateInbound(outboundMsg: Outbound): number[] {
switch (outboundMsg.protocol) {
/* case Protocol.Broadcast:
outboundMsg.processBroadcast();
break;
case Protocol.IntelliValve:
IntelliValveStateMessage.process(outboundMsg);
break;
case Protocol.IntelliChem:
IntelliChemStateMessage.process(outboundMsg);
break; */
case Protocol.Pump:
if ((outboundMsg.source >= 96 && outboundMsg.source <= 111) || (outboundMsg.dest >= 96 && outboundMsg.dest <= 111))
return mockPump.convertOutbound(outboundMsg);
// else
// outboundMsg.processBroadcast();
break;
/* case Protocol.Heater:
HeaterStateMessage.process(outboundMsg);
break;
case Protocol.Chlorinator:
ChlorinatorStateMessage.process(outboundMsg);
break;
case Protocol.Hayward:
PumpStateMessage.processHayward(outboundMsg);
break; */
default:
logger.debug(`Unprocessed Message ${outboundMsg.toPacket()}`)
break;
}
}
}
export var messagesMock = new MessagesMock();
82 changes: 82 additions & 0 deletions controller/comms/messages/mock/status/MockChlorinator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { sys } from "../../../../Equipment";
import { PumpState, state } from "../../../../State";
import { Outbound } from "../../Messages";

export class MockPump {
constructor(){}

public convertOutbound(outboundMsg: Outbound){
let response: Outbound = Outbound.create({
portId: outboundMsg.portId,
protocol: outboundMsg.protocol
});

switch (outboundMsg.action){
case 7:
return this.pumpStatus(outboundMsg, response);
default:
return this.pumpAck(outboundMsg, response);
}
}

public pumpStatus(outboundMsg: Outbound, response: Outbound){
let pState:PumpState = state.pumps.getItemById(outboundMsg.dest - 96);
let pt = sys.board.valueMaps.pumpTypes.get(pState.type);
response.action = 7;
response.source = outboundMsg.dest;
response.dest = outboundMsg.source;
response.setPayloadBytes(0, 15);
response.setPayloadByte(0, pState.command, 2);
response.setPayloadByte(1, pState.mode, 0);
response.setPayloadByte(2, pState.driveState, 2);
let watts = 0;
if (Math.max(pState.rpm, pState.flow) > 0){
if (pState.rpm > 0) watts = pState.rpm/pt.maxSpeed * 2000 + this.random(100);
else if (pState.flow > 0) watts = pState.flow/pt.maxFlow * 2000 + this.random(100);
else //ss, ds, etc
watts = 2000 + this.random(250);
}
response.setPayloadByte(3, Math.floor(watts / 256), 0);
response.setPayloadByte(4, watts % 256, 0);
response.setPayloadByte(5, Math.floor(pState.rpm / 256), 0);
response.setPayloadByte(6, pState.rpm % 256, 0);
response.setPayloadByte(7, pState.flow, 0);
response.setPayloadByte(8, pState.ppc, 0);
// 9, 10 = unknown
// 11, 12 = Status code;
response.setPayloadByte(11, Math.floor(pState.status / 256), 0);
response.setPayloadByte(12, pState.status % 256, 1);
let time = new Date();
response.setPayloadByte(13, time.getHours() * 60);
response.setPayloadByte(14, time.getMinutes());

return response.toPacket()
}

public pumpAck(outboundMsg: Outbound, response: Outbound){
response.action = outboundMsg.action;
response.source = outboundMsg.dest;
response.dest = outboundMsg.source;
switch (outboundMsg.action){
case 10: {
response.setPayloadByte(0, outboundMsg.payload[2]);
response.setPayloadByte(1, outboundMsg.payload[3]);
break;
}
default:
response.setPayloadByte(0, outboundMsg.payload[0]);
}
return response.toPacket();
}

private random(bounds: number, onlyPositive: boolean = false){
let rand = Math.random() * bounds;
if (!onlyPositive) {
if (Math.random()<=.5) rand = rand * -1;
}
return rand;
}

}

export var mockPump: MockPump = new MockPump();
Loading

0 comments on commit cb43ee8

Please sign in to comment.