diff --git a/app.ts b/app.ts index f8eb1f05..d99e631a 100755 --- a/app.ts +++ b/app.ts @@ -30,7 +30,7 @@ export async function initAsync() { try { await config.init(); await logger.init(); - await conn.init(); + await conn.initAsync(); await sys.init(); await state.init(); await webApp.init(); diff --git a/config/Config.ts b/config/Config.ts index 10292065..821f53f1 100755 --- a/config/Config.ts +++ b/config/Config.ts @@ -86,6 +86,20 @@ class Config { } } + public removeSection(section: string) { + let c = this._cfg; + if (section.indexOf('.') !== -1) { + let arr = section.split('.'); + for (let i = 0; i < arr.length - 1; i++) { + if (typeof c[arr[i]] === 'undefined') + c[arr[i]] = {}; + c = c[arr[i]]; + } + section = arr[arr.length - 1]; + } + if(typeof c[section] !== 'undefined') delete c[section]; + this.update(); + } public setSection(section: string, val) { let c = this._cfg; if (section.indexOf('.') !== -1) { diff --git a/controller/Equipment.ts b/controller/Equipment.ts index 9c2250ab..18f05ffa 100644 --- a/controller/Equipment.ts +++ b/controller/Equipment.ts @@ -70,6 +70,7 @@ interface IPoolSystem { export class PoolSystem implements IPoolSystem { public _hasChanged: boolean = false; + public isReady: boolean = false; constructor() { this.cfgPath = path.posix.join(process.cwd(), '/data/poolConfig.json'); } @@ -142,6 +143,7 @@ export class PoolSystem implements IPoolSystem { } else this.initNixieController(); + this.isReady = true; } public init() { let cfg = this.loadConfigFile(this.cfgPath, {}); @@ -202,7 +204,7 @@ export class PoolSystem implements IPoolSystem { } public resetSystem() { - conn.pause(); + conn.pauseAll(); this.resetData(); state.resetData(); this.data.controllerType === 'unknown'; @@ -210,7 +212,7 @@ export class PoolSystem implements IPoolSystem { this.controllerType = ControllerType.Unknown; state.status = 0; this.board = BoardFactory.fromControllerType(ControllerType.Unknown, this); - setTimeout(function () { state.status = 0; conn.resume(); }, 0); + setTimeout(function () { state.status = 0; conn.resumeAll(); }, 0); } public get controllerType(): ControllerType { return this.data.controllerType as ControllerType; } public set controllerType(val: ControllerType) { diff --git a/controller/boards/AquaLinkBoard.ts b/controller/boards/AquaLinkBoard.ts index 89cfc6fb..661bb62c 100644 --- a/controller/boards/AquaLinkBoard.ts +++ b/controller/boards/AquaLinkBoard.ts @@ -63,11 +63,7 @@ class AquaLinkConfigQueue extends ConfigQueue { protected queueItems(cat: number, items: number[] = [0]) { } public queueChanges() { this.reset(); - if (conn.mockPort) { - logger.info(`Skipping configuration request from OCP because MockPort enabled.`); - } else { - logger.info(`Requesting ${sys.controllerType} configuration`); - } + logger.info(`Requesting ${sys.controllerType} configuration`); if (this.remainingItems > 0) { var self = this; setTimeout(() => { self.processNext(); }, 50); diff --git a/controller/boards/IntelliCenterBoard.ts b/controller/boards/IntelliCenterBoard.ts index 0a50fb99..2c5dd8ad 100644 --- a/controller/boards/IntelliCenterBoard.ts +++ b/controller/boards/IntelliCenterBoard.ts @@ -669,14 +669,14 @@ class IntelliCenterConfigQueue extends ConfigQueue { response: Response.create({ dest:-1, action: 30, payload: [this.curr.category, itm], callback: () => { self.processNext(out); } }) }); logger.verbose(`Requesting config for: ${ConfigCategories[this.curr.category]} - Item: ${itm}`); - setTimeout(conn.queueSendMessage, 50, out); + setTimeout(() => { conn.queueSendMessage(out) }, 50); } else { // Now that we are done check the configuration a final time. If we have anything outstanding // it will get picked up. state.status = 1; this.curr = null; this._processing = false; - if (this._failed) setTimeout(function () { sys.checkConfiguration(); }, 100); + if (this._failed) setTimeout(() => { sys.checkConfiguration(); }, 100); logger.info(`Configuration Complete`); sys.board.heaters.updateHeaterServices(); state.cleanupState(); @@ -815,7 +815,7 @@ class IntelliCenterConfigQueue extends ConfigQueue { } this.maybeQueueItems(curr.systemState, ver.systemState, ConfigCategories.systemState, [0]); logger.info(`Queued ${this.remainingItems} configuration items`); - if (this.remainingItems > 0) setTimeout(function () { self.processNext(); }, 50); + if (this.remainingItems > 0) setTimeout(() => { self.processNext(); }, 50); else { this._processing = false; if (this._newRequest) { diff --git a/controller/comms/Comms.ts b/controller/comms/Comms.ts index 3f4d41f2..6a17ce82 100755 --- a/controller/comms/Comms.ts +++ b/controller/comms/Comms.ts @@ -22,39 +22,63 @@ import { logger } from '../../logger/Logger'; import * as net from 'net'; import { setTimeout, setInterval } from 'timers'; import { Message, Outbound, Inbound, Response } from './messages/Messages'; -import { InvalidOperationError, MessageError, OutboundMessageError } from '../Errors'; +import { InvalidEquipmentDataError, InvalidOperationError, MessageError, OutboundMessageError } from '../Errors'; import { utils } from "../Constants"; +import { sys } from "../Equipment"; import { webApp } from "../../web/Server"; const extend = require("extend"); export class Connection { - constructor() { - this.emitter = new EventEmitter(); + constructor() {} + public rs485Ports: RS485Port[] = []; + public get mockPort(): boolean { + let port = this.findPortById(0); + return typeof port !== 'undefined' && port.mockPort ? true : false; } - public isOpen: boolean = false; - private _closing: boolean = false; - private _cfg: any; - private _port: any; - public mockPort: boolean = false; - private isPaused: boolean = false; - public buffer: SendRecieveBuffer; - private connTimer: NodeJS.Timeout; - protected resetConnTimer(...args) { - //console.log(`resetting connection timer`); - if (conn.connTimer !== null) clearTimeout(conn.connTimer); - if (!conn._cfg.mockPort && conn._cfg.inactivityRetry > 0 && !conn._closing) conn.connTimer = setTimeout(async () => { - try { - await conn.openAsync(); - } - catch (err) { logger.error(`Error resetting RS485 port on inactivity: ${err.message}`); }; - }, conn._cfg.inactivityRetry * 1000); + public async deleteAuxPort(data: any): Promise { + try { + let portId = parseInt(data.portId, 10); + if (isNaN(portId)) return Promise.reject(new InvalidEquipmentDataError(`A valid port id was not provided to be deleted`, 'RS485Port', data.id)); + if (portId === 0) return Promise.reject(new InvalidEquipmentDataError(`You may not delete the primart RS485 Port`, 'RS485Port', data.id)); + let port = this.findPortById(portId); + this.removePortById(portId); + let section = `controller.comms` + (portId === 0 ? '' : portId); + let cfg = config.getSection(section, {}); + config.removeSection(section); + return cfg; + } catch (err) { logger.error(`Error deleting aux port`) } } - public isRTS: boolean = true; - public emitter: EventEmitter; - public get enabled(): boolean { return typeof this._cfg !== 'undefined' && this._cfg.enabled; } - public async setPortAsync(data: any) : Promise { + public async setPortAsync(data: any): Promise { try { + let ccfg = config.getSection('controller'); + let pConfig; + let portId; + let maxId = -1; + for (let sec in ccfg) { + if (sec.startsWith('comms')) { + let p = ccfg[sec]; + maxId = Math.max(p.portId, maxId); + if (p.portId === data.portId) pConfig = p; + } + } + if (typeof pConfig === 'undefined') { + // We are adding a new one. + if (data.portId === -1 || typeof data.portId === 'undefined') portId = maxId + 1; + else portId = data.portId; + } + else portId = pConfig.portId; + if (isNaN(portId) || portId < 0) return Promise.reject(new InvalidEquipmentDataError(`Invalid port id defined ${portId}`, 'RS485Port', data.portId)); + let section = `controller.comms` + (portId === 0 ? '' : portId); // Lets set the config data. - let pdata = config.getSection('controller.comms', {}); + let pdata = config.getSection(section, { + portId: portId, + rs485Port: "/dev/ttyUSB0", + portSettings: { baudRate: 9600, dataBits: 8, parity: 'none', stopBits: 1, flowControl: false, autoOpen: false, lock: false }, + mockPort: false, + netConnect: false, + netHost: "raspberrypi", + netPort: 9801, + inactivityRetry: 10 + }); pdata.enabled = typeof data.enabled !== 'undefined' ? utils.makeBool(data.enabled) : utils.makeBool(pdata.enabled); pdata.netConnect = typeof data.netConnect !== 'undefined' ? utils.makeBool(data.netConnect) : utils.makeBool(pdata.netConnect); pdata.rs485Port = typeof data.rs485Port !== 'undefined' ? data.rs485Port : pdata.rs485Port; @@ -63,11 +87,17 @@ export class Connection { pdata.netHost = typeof data.netHost !== 'undefined' ? data.netHost : pdata.netHost; pdata.netPort = typeof data.netPort === 'number' ? data.netPort : pdata.netPort; } - if (!await this.closeAsync()) { - return Promise.reject(new InvalidOperationError(`Unable to close the current RS485 port`, 'setPortAsync')); + if (typeof data.portSettings !== 'undefined') { + pdata.portSettings = extend(true, { baudRate: 9600, dataBits: 8, parity: 'none', stopBits: 1, flowControl: false, autoOpen: false, lock: false }, pdata.portSettings, data.portSettings); } - config.setSection('controller.comms', pdata); - this._cfg = config.getSection('controller.comms', { + let existing = this.findPortById(portId); + if (typeof existing !== 'undefined') { + if (!await existing.closeAsync()) { + return Promise.reject(new InvalidOperationError(`Unable to close the current RS485 port`, 'setPortAsync')); + } + } + config.setSection(section, pdata); + let cfg = config.getSection(section, { rs485Port: "/dev/ttyUSB0", portSettings: { baudRate: 9600, dataBits: 8, parity: 'none', stopBits: 1, flowControl: false, autoOpen: false, lock: false }, mockPort: false, @@ -76,165 +106,180 @@ export class Connection { netPort: 9801, inactivityRetry: 10 }); - if (!await this.openAsync()) { - return Promise.reject(new InvalidOperationError(`Unable to open RS485 port ${pdata.rs485Port}`, 'setPortAsync')); + existing = this.getPortById(cfg); + if (typeof existing !== 'undefined') { + existing.reconnects = 0; + if (!await existing.openAsync(cfg)) { + return Promise.reject(new InvalidOperationError(`Unable to open RS485 port ${pdata.rs485Port}`, 'setPortAsync')); + } } - return this._cfg; + return cfg; } catch (err) { return Promise.reject(err); } } - // RKS: 12-18-21 This is a bullpen method to work through the inconsistencies in the socket implementation for node. There - // are issues related to the event listeners and the construction of a socket. We want an implementation that awaits until the socket - // is completely open before continuing. - // We also need to be able to destroy the port without it restarting on its own when tearing the port down or deliberately closing it. This means - // that the listeners need to be removed before closing via method but re-open the port when the close is hit prematurely. - protected async openNetSerialPort(): Promise { + public async stopAsync() { try { - let opts: net.NetConnectOpts = { - host: this._cfg.netHost, - port: this._cfg.netPort - }; - if (typeof this.connTimer !== 'undefined' && this.connTimer) clearTimeout(this.connTimer); - this.connTimer = null; - let ret = await new Promise((resolve, _) => { - let nc = net.createConnection(opts, () => { - nc.on('connect', () => { logger.info(`Net connect (socat) connected to: ${this._cfg.netHost}:${this._cfg.netPort}`); }); // Socket is opened but not yet ready. - nc.on('ready', () => { - this.isOpen = true; - this.isRTS = true; - logger.info(`Net connect (socat) ready and communicating: ${this._cfg.netHost}:${this._cfg.netPort}`); - nc.on('data', (data) => { - if (data.length > 0 && !this.isPaused) this.emitter.emit('packetread', data); - }); - this._port = nc; - // After the port is fully opened, set an inactivity timeout to restart it when it stops communicating. - nc.setTimeout(Math.max(this._cfg.inactivityRetry, 10) * 1000, async () => { - logger.warn(`Net connect (socat) connection idle: ${this._cfg.netHost}:${this._cfg.netPort} retrying connection.`); - try { - await conn.endAsync(); - await conn.openAsync(); - } catch (err) { logger.error(`Net connect (socat) error retrying connection ${err.message}`); } - }); - resolve(true); - }); - nc.on('close', (hadError: boolean) => { - this.isOpen = false; - if (typeof this._port !== 'undefined') { - this._port.destroy(); - this._port.removeAllListeners(); - } - this._port = undefined; - this.buffer.clearOutbound(); - logger.info(`Net connect (socat) closed ${hadError === true ? 'due to error' : ''}: ${this._cfg.netHost}:${this._cfg.netPort}`); - if (!this._closing) { - // If we are closing manually this event should have been cleared already and should never be called. If this is fired out - // of sequence then we will check the closing flag to ensure we are not forcibly closing the socket. - if (typeof this.connTimer !== 'undefined' && this.connTimer) { - clearTimeout(this.connTimer); - this.connTimer = null; - } - this.connTimer = setTimeout(async () => { - try { - // We are already closed so give some inactivity retry and try again. - await conn.openAsync(); - } catch (err) { } - }, this._cfg.inactivityRetry * 1000); - } - }); - nc.on('end', () => { // Happens when the other end of the socket closes. - this.isOpen = false; - logger.info(`Net connect (socat) end event was fired`); - }); - }); - nc.once('error', (err) => { - // if the promise has already been fulfilled, but the error happens later, we don't want to call the promise again. - if (this._cfg.inactivityRetry > 0) { - logger.error(`Net connect (socat) connection error: ${err}. Retry in ${this._cfg.inactivityRetry} seconds`); - this.connTimer = setTimeout(async () => { - try { - await conn.closeAsync(); - await conn.openAsync(); - } catch (err) { } - }, this._cfg.inactivityRetry * 1000); - } - else logger.error(`Net connect (socat) connection error: ${err}. Never retrying -- No retry time set`); - resolve(false); - }); - }); - return ret; - } catch (err) { logger.error(`Error opening net serial port. ${this._cfg.netHost}:${this._cfg.netPort}`); } + for (let i = this.rs485Ports.length - 1; i >= 0; i--) { + let port = this.rs485Ports[i]; + await port.closeAsync(); + } + logger.info(`Closed all serial communications connection.`); + } catch (err) { logger.error(`Error closing comms connection: ${err.message} `); } } - protected async closeNetSerialPort(): Promise { + public async initAsync() { try { - this._closing = true; - if (this.connTimer) clearTimeout(this.connTimer); - this.connTimer = null; - if (typeof this._port !== 'undefined' && this.isOpen) { - let success = await new Promise((resolve, reject) => { - if (this._cfg.netConnect) { - this._port.removeAllListeners(); - this._port.once('error', (err) => { - if (err) { - logger.error(`Net connect (socat) error closing ${this._cfg.netHost}:${this._cfg.netPort}: ${err}`); - resolve(false); - } - else { - conn._port = undefined; - this.isOpen = false; - logger.info(`Successfully closed (socat) port ${this._cfg.netHost}:${this._cfg.netPort}`); - resolve(true); - } - }); - this._port.once('close', (p) => { - this.isOpen = false; - this._port = undefined; - logger.info(`Net connect (socat) successfully closed: ${this._cfg.netHost}:${this._cfg.netPort}`); - resolve(true); - }); - this._port.destroy(); - } - else { - resolve(true); - conn._port = undefined; - } - }); - if (success) { - if (typeof conn.buffer !== 'undefined') conn.buffer.close(); + // So now that we are now allowing multiple comm ports we need to initialize each one. We are keeping the comms section from the config.json + // simply because I have no idea what the Docker folks do with this. So the default comms will be the one with an OCP or if there are no aux ports. + let cfg = config.getSection('controller'); + for (let section in cfg) { + if (section.startsWith('comms')) { + let port = new RS485Port(cfg[section]); + this.rs485Ports.push(port); + await port.openAsync(); } - return success; } - return true; - } catch (err) { logger.error(`Error closing comms connection: ${err.message}`); return Promise.resolve(false); } - + } catch (err) { logger.error(`Error initializing RS485 ports ${err.message}`); } + } + public findPortById(portId?: number): RS485Port { return this.rs485Ports.find(elem => elem.portId === (portId || 0)); } + public async removePortById(portId: number) { + for (let i = this.rs485Ports.length - 1; i >= 0; i--) { + let port = this.rs485Ports[i]; + if (port.portId === portId) { + await port.closeAsync(); + // Don't remove the primary port. You cannot delete this one. + if(portId !== 0) this.rs485Ports.splice(i, 1); + } + } } - public async openAsync(): Promise { - if (typeof this.buffer === 'undefined') { - this.buffer = new SendRecieveBuffer(); - this.emitter.on('packetread', (pkt) => { this.buffer.pushIn(pkt); }); - this.emitter.on('messagewrite', (msg) => { this.buffer.pushOut(msg); }); + public getPortById(cfg: any) { + let port = this.findPortById(cfg.portId || 0); + if (typeof port === 'undefined') { + port = new RS485Port(cfg); + this.rs485Ports.push(port); } - if (!this._cfg.enabled) return; + return port; + } + public queueSendMessage(msg: Outbound) { + let port = this.findPortById(msg.portId); + if (typeof port !== 'undefined') + port.emitter.emit('messagewrite', msg); + else + logger.error(`queueSendMessage: Message was targeted for undefined port ${msg.portId || 0}`); + } + public pauseAll() { + for (let i = 0; i < this.rs485Ports.length; i++) { + let port = this.rs485Ports[i]; + port.pause(); + } + } + public resumeAll() { + for (let i = 0; i < this.rs485Ports.length; i++) { + let port = this.rs485Ports[i]; + port.resume(); + } + } +} +export class Counter { + constructor() { + this.bytesReceived = 0; + this.recSuccess = 0; + this.recFailed = 0; + this.recCollisions = 0; + this.bytesSent = 0; + this.sndAborted = 0; + this.sndRetries = 0; + this.sndSuccess = 0; + this.recFailureRate = 0; + this.sndFailureRate = 0; + this.recRewinds = 0; + } + public bytesReceived: number; + public bytesSent: number; + public recSuccess: number; + public recFailed: number; + public recCollisions: number; + public recFailureRate: number; + public sndSuccess: number; + public sndAborted: number; + public sndRetries: number; + public sndFailureRate: number; + public recRewinds: number; + public updatefailureRate(): void { + this.recFailureRate = (this.recFailed + this.recSuccess) !== 0 ? (this.recFailed / (this.recFailed + this.recSuccess) * 100) : 0; + this.sndFailureRate = (this.sndAborted + this.sndSuccess) !== 0 ? (this.sndAborted / (this.sndAborted + this.sndSuccess) * 100) : 0; + } + public toLog(): string { + return `{ "bytesReceived": ${this.bytesReceived} "success": ${this.recSuccess}, "failed": ${this.recFailed}, "bytesSent": ${this.bytesSent}, "collisions": ${this.recCollisions}, "failureRate": ${this.recFailureRate.toFixed(2)}% }`; + } +} +// The following class allows njsPC to have multiple RS485 buses. Each port has its own buffer and message processor +// so that devices on the bus can be isolated to a particular port. By doing this the communications are such that multiple +// ports can be used to accommodate differing port speeds and fixed port addresses. If an +export class RS485Port { + constructor(cfg: any) { + this._cfg = cfg; + + this.emitter = new EventEmitter(); + this._inBuffer = []; + this._outBuffer = []; + this.procTimer = null; + this.emitter.on('messagewrite', (msg) => { this.pushOut(msg); }); + } + public isRTS: boolean = true; + public reconnects:number = 0; + public emitter: EventEmitter; + public get portId() { return typeof this._cfg !== 'undefined' && typeof this._cfg.portId !== 'undefined' ? this._cfg.portId : 0; } + public isOpen: boolean = false; + private _closing: boolean = false; + private _cfg: any; + private _port: any; + public mockPort: boolean = false; + private isPaused: boolean = false; + private connTimer: NodeJS.Timeout; + //public buffer: SendRecieveBuffer; + public get enabled(): boolean { return typeof this._cfg !== 'undefined' && this._cfg.enabled; } + public counter: Counter = new Counter(); + private procTimer: NodeJS.Timeout; + private _processing: boolean = false; + private _inBytes: number[] = []; + private _inBuffer: number[] = []; + private _outBuffer: Outbound[] = []; + private _waitingPacket: Outbound; + private _msg: Inbound; + // Connection management functions + public async openAsync(cfg?: any): Promise { + if (this.isOpen) await this.closeAsync(); + if (typeof cfg !== 'undefined') this._cfg = cfg; + if (!this._cfg.enabled) return true; if (this._cfg.netConnect && !this._cfg.mockPort) { - if (typeof this._port !== 'undefined' && this._port.isOpen) { + let sock: net.Socket = this._port as net.Socket; + if (typeof this._port !== 'undefined' && this.isOpen) { // This used to try to reconnect and recreate events even though the socket was already connected. This resulted in - // instances where multiple event processors were present. - return Promise.resolve(true); + // instances where multiple event processors were present. Node doesn't give us any indication that the socket is + // still viable or if it is closing from either end. + return true; + } + else if (typeof this._port !== 'undefined') { + // We need to kill the existing connection by ending it. + this._port.end(); } let nc: net.Socket = new net.Socket(); - nc.on('connect', () => { logger.info(`Net connect (socat) connected to: ${this._cfg.netHost}:${this._cfg.netPort}`); }); // Socket is opened but not yet ready. - nc.on('ready', () => { + nc.once('connect', () => { logger.info(`Net connect (socat) ${this._cfg.portId} connected to: ${this._cfg.netHost}:${this._cfg.netPort}`); }); // Socket is opened but not yet ready. + nc.once('ready', () => { this.isOpen = true; this.isRTS = true; - logger.info(`Net connect (socat) ready and communicating: ${this._cfg.netHost}:${this._cfg.netPort}`); + logger.info(`Net connect (socat) ${this._cfg.portId} ready and communicating: ${this._cfg.netHost}:${this._cfg.netPort}`); nc.on('data', (data) => { //this.resetConnTimer(); - if (data.length > 0 && !this.isPaused) this.emitter.emit('packetread', data); + if (data.length > 0 && !this.isPaused) this.pushIn(data); }); + this.emitPortStats(); }); - nc.on('close', (p) => { + nc.once('close', (p) => { this.isOpen = false; - if (typeof this._port !== 'undefined') this._port.destroy(); + if (typeof this._port !== 'undefined' && !this._port.destroyed) this._port.destroy(); this._port = undefined; - this.buffer.clearOutbound(); + this.clearOutboundBuffer(); + this.emitPortStats(); if (!this._closing) { // If we are closing manually this event should have been cleared already and should never be called. If this is fired out // of sequence then we will check the closing flag to ensure we are not forcibly closing the socket. @@ -245,16 +290,15 @@ export class Connection { this.connTimer = setTimeout(async () => { try { // We are already closed so give some inactivity retry and try again. - await conn.openAsync(); + await this.openAsync(); } catch (err) { } }, this._cfg.inactivityRetry * 1000); } - logger.info(`Net connect (socat) closed ${p === true ? 'due to error' : ''}: ${this._cfg.netHost}:${this._cfg.netPort}`); + logger.info(`Net connect (socat) ${this._cfg.portId} closed ${p === true ? 'due to error' : ''}: ${this._cfg.netHost}:${this._cfg.netPort}`); }); nc.on('end', () => { // Happens when the other end of the socket closes. this.isOpen = false; - //this.resetConnTimer(); - logger.info(`Net connect (socat) end event was fired`); + logger.info(`Net connect (socat) ${this.portId} end event was fired`); }); //nc.on('drain', () => { logger.info(`The drain event was fired.`); }); //nc.on('lookup', (o) => { logger.info(`The lookup event was fired ${o}`); }); @@ -262,13 +306,15 @@ export class Connection { // left the connection in a weird state where the previous connection was processing events and the new connection was // doing so as well. This isn't an error it is a warning as the RS485 bus will most likely be communicating at all times. //nc.on('timeout', () => { logger.warn(`Net connect (socat) Connection Idle: ${this._cfg.netHost}:${this._cfg.netPort}`); }); - nc.setTimeout(Math.max(this._cfg.inactivityRetry, 10) * 1000, async () => { - logger.warn(`Net connect (socat) connection idle: ${this._cfg.netHost}:${this._cfg.netPort} retrying connection.`); - try { - await conn.endAsync(); - await conn.openAsync(); - } catch (err) { logger.error(`Net connect (socat) error retrying connection ${err.message}`); } - }); + if (this._cfg.inactivityRetry > 0) { + nc.setTimeout(Math.max(this._cfg.inactivityRetry, 10) * 1000, async () => { + logger.warn(`Net connect (socat) connection idle: ${this._cfg.netHost}:${this._cfg.netPort} retrying connection.`); + try { + await this.closeAsync(); + await this.openAsync(); + } catch (err) { logger.error(`Net connect (socat)$ {this.portId} error retrying connection ${err.message}`); } + }); + } return await new Promise((resolve, _) => { // We only connect an error once as we will destroy this connection on error then recreate a new socket on failure. @@ -276,19 +322,21 @@ export class Connection { //logger.error(`Net connect (socat) Connection: ${err}. ${this._cfg.inactivityRetry > 0 ? `Retry in ${this._cfg.inactivityRetry} seconds` : `Never retrying; inactivityRetry set to ${this._cfg.inactivityRetry}`}`); //this.resetConnTimer(); this.isOpen = false; + this.emitPortStats(); // if the promise has already been fulfilled, but the error happens later, we don't want to call the promise again. if (typeof resolve !== 'undefined') { resolve(false); } if (this._cfg.inactivityRetry > 0) { - logger.error(`Net connect (socat) connection error: ${err}. Retry in ${this._cfg.inactivityRetry} seconds`); - setTimeout(async () => { try { await conn.openAsync(); } catch (err) { } }, this._cfg.inactivityRetry * 1000); + logger.error(`Net connect (socat) connection ${this.portId} error: ${err}. Retry in ${this._cfg.inactivityRetry} seconds`); + setTimeout(async () => { try { await this.openAsync(); } catch (err) { } }, this._cfg.inactivityRetry * 1000); } - else logger.error(`Net connect (socat) connection error: ${err}. Never retrying -- No retry time set`); + else logger.error(`Net connect (socat) connection ${this.portId} error: ${err}. Never retrying -- No retry time set`); }); - nc.connect(conn._cfg.netPort, conn._cfg.netHost, () => { - if (typeof this._port !== 'undefined') logger.warn('Net connect (socat) recovered from lost connection.'); - logger.info(`Net connect (socat) Connection connected`); + nc.connect(this._cfg.netPort, this._cfg.netHost, () => { + if (typeof this._port !== 'undefined') logger.warn(`Net connect (socat) ${this.portId} recovered from lost connection.`); + logger.info(`Net connect (socat) Connection ${this.portId} connected`); this._port = nc; this.isOpen = true; + this.emitPortStats(); resolve(true); resolve = undefined; }); @@ -297,9 +345,9 @@ export class Connection { else { if (typeof this._port !== 'undefined' && this._port.isOpen) { // This used to try to reconnect even though the serial port was already connected. This resulted in - // instances where an access denied error was emitted. + // instances where an access denied error was emitted. So if the port is open we will simply return. this.resetConnTimer(); - return Promise.resolve(true); + return true; } let sp: SerialPort = null; if (this._cfg.mockPort) { @@ -311,9 +359,9 @@ export class Connection { } else { this.mockPort = false; - sp = new SerialPort(conn._cfg.rs485Port, conn._cfg.portSettings); + sp = new SerialPort(this._cfg.rs485Port, this._cfg.portSettings); } - return new Promise((resolve, _) => { + return await new Promise((resolve, _) => { // The serial port open method calls the callback just once. Unfortunately that is not the case for // network serial port connections. There really isn't a way to make it syncronous. The openAsync will truly // be open if a hardware interface is used and this method returns. @@ -321,7 +369,7 @@ export class Connection { if (err) { this.resetConnTimer(); this.isOpen = false; - logger.error(`Error opening port: ${err.message}. ${this._cfg.inactivityRetry > 0 ? `Retry in ${this._cfg.inactivityRetry} seconds` : `Never retrying; inactivityRetry set to ${this._cfg.inactivityRetry}`}`); + logger.error(`Error opening port ${this.portId}: ${err.message}. ${this._cfg.inactivityRetry > 0 ? `Retry in ${this._cfg.inactivityRetry} seconds` : `Never retrying; inactivityRetry set to ${this._cfg.inactivityRetry}`}`); resolve(false); } else resolve(true); @@ -331,28 +379,31 @@ export class Connection { // won't be called until long after the promise is resolved above. Yes we should never reject this promise. The resolution is true // for a successul connect and false otherwise. sp.on('open', () => { - if (typeof conn._port !== 'undefined') logger.info(`Serial Port: ${this._cfg.rs485Port} recovered from lost connection.`) + if (typeof this._port !== 'undefined') logger.info(`Serial Port ${this.portId}: ${this._cfg.rs485Port} recovered from lost connection.`) else logger.info(`Serial port: ${this._cfg.rs485Port} request to open successful`); this._port = sp; this.isOpen = true; - sp.on('data', (data) => { if (!this.mockPort && !this.isPaused) this.emitter.emit('packetread', data); this.resetConnTimer(); }); + sp.on('data', (data) => { if (!this.mockPort && !this.isPaused) this.resetConnTimer(); this.pushIn(data); }); this.resetConnTimer(); + this.emitPortStats(); }); sp.on('close', (err) => { this.isOpen = false; - logger.info(`Serial Port has been closed: ${err ? JSON.stringify(err) : ''}`); + logger.info(`Serial Port ${this.portId} has been closed ${this.portId}: ${err ? JSON.stringify(err) : ''}`); }); sp.on('error', (err) => { this.isOpen = false; if (sp.isOpen) sp.close((err) => { }); // call this with the error callback so that it doesn't emit to the error again. this.resetConnTimer(); - logger.error(`Serial Port: An error occurred : ${this._cfg.rs485Port}: ${JSON.stringify(err)}`); + logger.error(`Serial Port ${this.portId}: An error occurred : ${this._cfg.rs485Port}: ${JSON.stringify(err)}`); + this.emitPortStats(); }); }); } } public async closeAsync(): Promise { try { + if (this._closing) return false; this._closing = true; if (this.connTimer) clearTimeout(this.connTimer); if (typeof this._port !== 'undefined' && this.isOpen) { @@ -361,33 +412,39 @@ export class Connection { this._port.removeAllListeners(); this._port.once('error', (err) => { if (err) { - logger.error(`Error closing ${this._cfg.netHost}:${this._cfg.netPort}/${this._cfg.rs485Port}: ${err}`); + logger.error(`Error closing ${this.portId} ${ this._cfg.netHost }: ${ this._cfg.netPort } / ${ this._cfg.rs485Port }: ${ err }`); resolve(false); } else { - conn._port = undefined; + this._port = undefined; this.isOpen = false; - logger.info(`Successfully closed (socat) port ${this._cfg.netHost}:${this._cfg.netPort}/${this._cfg.rs485Port}`); + logger.info(`Successfully closed (socat) ${this.portId} port ${this._cfg.netHost}:${this._cfg.netPort} / ${this._cfg.rs485Port}`); resolve(true); } }); + this._port.once('end', () => { + logger.info(`Net connect (socat) ${this.portId} closing: ${this._cfg.netHost}:${this._cfg.netPort}`); + }); this._port.once('close', (p) => { this.isOpen = false; this._port = undefined; - logger.info(`Net connect (socat) successfully closed: ${this._cfg.netHost}:${this._cfg.netPort}`); + logger.info(`Net connect (socat) ${this.portId} successfully closed: ${this._cfg.netHost}:${this._cfg.netPort}`); resolve(true); }); + logger.info(`Net connect (socat) ${this.portId} request close: ${this._cfg.netHost}:${this._cfg.netPort}`); + // Unfortunately the end call does not actually work in node. It will simply not return anything so we are going to + // just call destroy and forcibly close it. this._port.destroy(); } - else if (typeof conn._port.close === 'function') { - conn._port.close((err) => { + else if (typeof this._port.close === 'function') { + this._port.close((err) => { if (err) { - logger.error(`Error closing ${this._cfg.rs485Port}: ${err}`); + logger.error(`Error closing ${this.portId} serial port ${this._cfg.rs485Port}: ${err}`); resolve(false); } else { - conn._port = undefined; - logger.info(`Successfully closed seral port ${this._cfg.rs485Port}`); + this._port = undefined; + logger.info(`Successfully closed ${this.portId} seral port ${this._cfg.rs485Port}`); resolve(true); this.isOpen = false; } @@ -395,179 +452,73 @@ export class Connection { } else { resolve(true); - conn._port = undefined; + this._port = undefined; } }); - if (success) { - if (typeof conn.buffer !== 'undefined') conn.buffer.close(); - } + if (success) { this.closeBuffer(); } return success; } return true; - } catch (err) { logger.error(`Error closing comms connection: ${err.message}`); return Promise.resolve(false); } + } catch (err) { logger.error(`Error closing comms connection ${this.portId}: ${err.message}`); return Promise.resolve(false); } + finally { this._closing = false; this.emitPortStats(); } } - public async endAsync(): Promise { - try { - this._closing = true; - if (this.connTimer) clearTimeout(this.connTimer); - if (typeof this._port !== 'undefined' && this.isOpen) { - let success = await new Promise((resolve, reject) => { - if (this._cfg.netConnect) { - this._port.removeAllListeners(); - this._port.once('error', (err) => { - if (err) { - logger.error(`Error closing ${this._cfg.netHost}:${this._cfg.netPort}/${this._cfg.rs485Port}: ${err}`); - resolve(false); - } - else { - conn._port = undefined; - this.isOpen = false; - logger.info(`Successfully closed (socat) port ${this._cfg.netHost}:${this._cfg.netPort}/${this._cfg.rs485Port}`); - resolve(true); - } - }); - this._port.once('close', (p) => { - this.isOpen = false; - this._port = undefined; - logger.info(`Net connect (socat) successfully closed: ${this._cfg.netHost}:${this._cfg.netPort}`); - resolve(true); - }); - this._port.destroy(); - } - else if (typeof conn._port.close === 'function') { - conn._port.close((err) => { - if (err) { - logger.error(`Error closing ${this._cfg.rs485Port}: ${err}`); - resolve(false); - } - else { - conn._port = undefined; - logger.info(`Successfully closed seral port ${this._cfg.rs485Port}`); - resolve(true); - this.isOpen = false; - } - }); - } - else { - resolve(true); - conn._port = undefined; - } - }); - if (success) { - if (typeof conn.buffer !== 'undefined') conn.buffer.close(); - } - return success; + public pause() { this.isPaused = true; this.clearBuffer(); this.drain(function (err) { }); } + // RKS: Resume is executed in a closure. This is because we want the current async process to complete + // before we resume. This way the messages are cleared right before we restart. + public resume() { if (this.isPaused) setTimeout(() => { this.clearBuffer(); this.isPaused = false; }, 0); } + protected resetConnTimer(...args) { + //console.log(`resetting connection timer`); + if (this.connTimer !== null) clearTimeout(this.connTimer); + if (!this._cfg.mockPort && this._cfg.inactivityRetry > 0 && !this._closing) this.connTimer = setTimeout(async () => { + try { + if (this._cfg.netConnect) + logger.warn(`Inactivity timeout for ${this.portId} serial port ${this._cfg.netHost}:${this._cfg.netPort}/${this._cfg.rs485Port} after ${this._cfg.inactivityRetry} seconds`); + else + logger.warn(`Inactivity timeout for ${this.portId} serial port ${this._cfg.rs485Port} after ${this._cfg.inactivityRetry} seconds`); + //await this.closeAsync(); + this.reconnects++; + await this.openAsync(); } - return true; - } catch (err) { logger.error(`Error closing comms connection: ${err.message}`); return Promise.resolve(false); } - finally { this._closing = false; } + catch (err) { logger.error(`Error resetting RS485 port on inactivity: ${err.message}`); }; + }, this._cfg.inactivityRetry * 1000); } + // Data management functions public drain(cb: Function) { - if (typeof (conn._port.drain) === 'function') - conn._port.drain(cb); + if (typeof this._port === 'undefined') { + logger.debug(`Serial Port ${this.portId}: Cannot perform drain function on port that is not open.`); + cb(); + } + if (typeof (this._port.drain) === 'function') + this._port.drain(cb); else // Call the method immediately as the port doesn't wait to send. cb(); } public write(bytes: Buffer, cb: Function) { - if (conn._cfg.netConnect) { + if (this._cfg.netConnect) { // SOCAT drops the connection and destroys the stream. Could be weeks or as little as a day. - if (typeof conn._port === 'undefined' || conn._port.destroyed !== false) { - conn.openAsync().then(() => { - conn._port.write(bytes, 'binary', cb); + if (typeof this._port === 'undefined' || this._port.destroyed !== false) { + this.openAsync().then(() => { + this._port.write(bytes, 'binary', cb); }); } else - conn._port.write(bytes, 'binary', cb); + this._port.write(bytes, 'binary', cb); } else - conn._port.write(bytes, cb); - } - public async stopAsync() { - try { - await conn.closeAsync(); - logger.info(`Closed serial communications connection.`); - } catch (err) { logger.error(`Error closing comms connection: ${err.message} `); } + this._port.write(bytes, cb); } - public init() { - conn._cfg = config.getSection('controller.comms', { - rs485Port: "/dev/ttyUSB0", - portSettings: { baudRate: 9600, dataBits: 8, parity: 'none', stopBits: 1, flowControl: false, autoOpen: false, lock: false }, - mockPort: false, - netConnect: false, - netHost: "raspberrypi", - netPort: 9801, - inactivityRetry: 10 - }); - conn.openAsync().then(() => { logger.debug(`Connection opened from init function;`); }).catch((err) => { logger.error(`Connection failed to open from init function. ${err}`); }); - config.emitter.on('reloaded', () => { - console.log('Config reloaded'); - this.reloadConfig(config.getSection('controller.comms', { - rs485Port: "/dev/ttyUSB0", - portSettings: { baudRate: 9600, dataBits: 8, parity: 'none', stopBits: 1, flowControl: false, autoOpen: false, lock: false }, - mockPort: false, - netConnect: false, - netHost: "raspberrypi", - netPort: 9801, - inactivityRetry: 10 - })); - }); - } - public reloadConfig(cfg) { - let c = extend({ - rs485Port: "/dev/ttyUSB0", - portSettings: { baudRate: 9600, dataBits: 8, parity: 'none', stopBits: 1, flowControl: false, autoOpen: false, lock: false }, - mockPort: false, - netConnect: false, - netHost: "raspberrypi", - netPort: 9801, - inactivityRetry: 10 - }, cfg); - if (JSON.stringify(c) !== JSON.stringify(this._cfg)) { - this.closeAsync(); - this._cfg = c; - if (this._cfg.enabled) this.openAsync(); - } - } - public queueSendMessage(msg: Outbound) { conn.emitter.emit('messagewrite', msg); } - public pause() { conn.isPaused = true; conn.buffer.clear(); conn.drain(function (err) { }); } - // RKS: Resume is executed in a closure. This is because we want the current async process to complete - // before we resume. This way the messages are cleared right before we restart. - public resume() { if (this.isPaused) setTimeout(function () { conn.buffer.clear(); conn.isPaused = false; }, 0); } - // RKS: This appears to not be used. - //public queueReceiveMessage(pkt: Inbound) { - // logger.info(`Receiving ${ pkt.action } `); - // conn.buffer.pushIn(pkt); - //} -} -export class SendRecieveBuffer { - constructor() { - this._inBuffer = []; - this._outBuffer = []; - this.procTimer = null;//setInterval(this.processPackets, 175); - } - public counter: Counter = new Counter(); - private procTimer: NodeJS.Timeout; - private _processing: boolean = false; - private _inBytes: number[] = []; - private _inBuffer: number[] = []; - private _outBuffer: Outbound[] = []; - private _waitingPacket: Outbound; - private _msg: Inbound; - public pushIn(pkt) { - let self = this; - conn.buffer._inBuffer.push.apply(conn.buffer._inBuffer, pkt.toJSON().data); setTimeout(() => { self.processPackets(); }, 0); - } - public pushOut(msg) { conn.buffer._outBuffer.push(msg); setTimeout(() => { this.processPackets(); }, 0); } - public clear() { conn.buffer._inBuffer.length = 0; this.clearOutbound(); } - public close() { clearTimeout(conn.buffer.procTimer); conn.buffer.clear(); this._msg = undefined; } - public clearOutbound() { - let processing = conn.buffer._processing; - clearTimeout(conn.buffer.procTimer); - conn.buffer.procTimer = null; - conn.buffer._processing = true; - conn.isRTS = false; - let msg: Outbound = typeof conn.buffer._waitingPacket !== 'undefined' ? conn.buffer._waitingPacket : conn.buffer._outBuffer.shift(); - conn.buffer._waitingPacket = null; + private pushIn(pkt) { this._inBuffer.push.apply(this._inBuffer, pkt.toJSON().data); if(sys.isReady) setImmediate(() => { this.processPackets(); }); } + private pushOut(msg) { this._outBuffer.push(msg); setTimeout(() => { this.processPackets(); }, 0); } + private clearBuffer() { this._inBuffer.length = 0; this.clearOutboundBuffer(); } + private closeBuffer() { clearTimeout(this.procTimer); this.clearBuffer(); this._msg = undefined; } + private clearOutboundBuffer() { + let processing = this._processing; + clearTimeout(this.procTimer); + this.procTimer = null; + this._processing = true; + this.isRTS = false; + let msg: Outbound = typeof this._waitingPacket !== 'undefined' ? this._waitingPacket : this._outBuffer.shift(); + this._waitingPacket = null; while (typeof msg !== 'undefined' && msg) { // Fail the message. msg.failed = true; @@ -579,114 +530,89 @@ export class SendRecieveBuffer { // Wait for this current process to complete then bombard all the processes with the callback. if (msg.response instanceof Response && typeof (msg.response.callback) === 'function') setImmediate(msg.response.callback, msg); } - conn.buffer.counter.sndAborted++; - msg = conn.buffer._outBuffer.shift(); + this.counter.sndAborted++; + msg = this._outBuffer.shift(); } - conn.buffer._processing = processing; - conn.isRTS = true; + this._processing = processing; + this.isRTS = true; } - /******************************************************************** - * RKS: 06-06-20 - * This used to process every 175ms. While the processing was light - * when there was nothing to process this should have always been - * event based so the processing timer has been reworked. - * - * Now this method gets called only during the following conditions. - * 1. A packetread event comes from the serial port and has data - * 2. A message is placed onto the outbound queue - * 3. The outbound queue has messages that are waiting to send. In - * this instance this method is called every 200ms until the queue - * is empty. If one of the above conditions are met then this method - * will be triggered earlier. - * - ****************************************************************** */ private processPackets() { - if (conn.buffer._processing) return; - if (conn.buffer.procTimer) { - clearTimeout(conn.buffer.procTimer); - conn.buffer.procTimer = null; + if (this._processing) return; + if (this.procTimer) { + clearTimeout(this.procTimer); + this.procTimer = null; } - conn.buffer._processing = true; - conn.buffer.processInbound(); - conn.buffer.processOutbound(); - conn.buffer._processing = false; + this._processing = true; + this.processInboundPackets(); + this.processOutboundPackets(); + this._processing = false; } private processWaitPacket(): boolean { - if (typeof conn.buffer._waitingPacket !== 'undefined' && conn.buffer._waitingPacket) { - let timeout = conn.buffer._waitingPacket.timeout || 1000; + if (typeof this._waitingPacket !== 'undefined' && this._waitingPacket) { + let timeout = this._waitingPacket.timeout || 1000; let dt = new Date(); - if (conn.buffer._waitingPacket.timestamp.getTime() + timeout < dt.getTime()) { - logger.silly(`Retrying outbound message after ${(dt.getTime() - conn.buffer._waitingPacket.timestamp.getTime()) / 1000} secs with ${conn.buffer._waitingPacket.remainingTries} attempt(s) left. - ${conn.buffer._waitingPacket.toShortPacket()} `); - conn.buffer.counter.sndRetries++; - conn.buffer.writeMessage(conn.buffer._waitingPacket); + if (this._waitingPacket.timestamp.getTime() + timeout < dt.getTime()) { + logger.silly(`Retrying outbound message after ${(dt.getTime() - this._waitingPacket.timestamp.getTime()) / 1000} secs with ${this._waitingPacket.remainingTries} attempt(s) left. - ${this._waitingPacket.toShortPacket()} `); + this.counter.sndRetries++; + this.writeMessage(this._waitingPacket); } return true; } return false; } - protected processOutbound() { + protected processOutboundPackets() { let msg: Outbound; - if (!conn.buffer.processWaitPacket() && conn.buffer._outBuffer.length > 0) { - if (conn.isOpen) { - if (conn.isRTS) { - msg = conn.buffer._outBuffer.shift(); + if (!this.processWaitPacket() && this._outBuffer.length > 0) { + if (this.isOpen) { + if (this.isRTS) { + msg = this._outBuffer.shift(); if (typeof msg === 'undefined' || !msg) return; // If the serial port is busy we don't want to process any outbound. However, this used to // not process the outbound even when the incoming bytes didn't mean anything. Now we only delay // the outbound when we actually have a message signatures to process. - conn.buffer.writeMessage(msg); + this.writeMessage(msg); } } else { // port is closed, reject message - msg = conn.buffer._outBuffer.shift(); + msg = this._outBuffer.shift(); msg.failed = true; - logger.warn(`Comms port is not open.Message aborted: ${msg.toShortPacket()} `); + logger.warn(`Comms port is not open. Message aborted: ${msg.toShortPacket()} `); // This is a hard fail. We don't have any more tries left and the message didn't // make it onto the wire. if (typeof msg.onAbort === 'function') msg.onAbort(); else logger.warn(`Message aborted after ${msg.tries} attempt(s): ${msg.toShortPacket()} `); - let error = new OutboundMessageError(msg, `Comms port is not open.Message aborted: ${msg.toShortPacket()} `); + let error = new OutboundMessageError(msg, `Comms port is not open. Message aborted: ${msg.toShortPacket()} `); if (typeof msg.onComplete === 'function') msg.onComplete(error, undefined); - conn.buffer._waitingPacket = null; - conn.buffer.counter.sndAborted++; - conn.buffer.counter.updatefailureRate(); - webApp.emitToChannel('rs485PortStats', 'rs485Stats', conn.buffer.counter); + this._waitingPacket = null; + this.counter.sndAborted++; + this.counter.updatefailureRate(); + this.emitPortStats(); } } // RG: added the last `|| typeof msg !== 'undef'` because virtual chem controller only sends a single packet - // but this condition would be eval'd before the callback of conn.write was calls and the outbound packet + // but this condition would be eval'd before the callback of port.write was calls and the outbound packet // would be sitting idle for eternity. - if (conn.buffer._outBuffer.length > 0 || typeof conn.buffer._waitingPacket !== 'undefined' || conn.buffer._waitingPacket || typeof msg !== 'undefined') { + if (this._outBuffer.length > 0 || typeof this._waitingPacket !== 'undefined' || this._waitingPacket || typeof msg !== 'undefined') { // Come back later as we still have items to send. let self = this; - conn.buffer.procTimer = setTimeout(() => self.processPackets(), 100); + this.procTimer = setTimeout(() => self.processPackets(), 100); } } - /* - * Writing messages on the queue is tricky to harden. The async nature of the serial port in node doesn't appropriately drain the port after each message - * so even though the callback is called for the .write method it doesn't guarantee that it has been written. Not such an issue when we are dealing with full-duplex - * but in this half-duplex environment we don't have an RTS. This is further complicated by the fact that no event is raised when the port finally gets around to - * dumping it's buffer on the wire. The only time we are notified is when there is a failure. Even then it does not point to a particular message since the - * port is unaware of our protocol. - * - * To that end we need to create a semaphore so that we don't place two messages back to back while we are waiting on the callback to return. - */ - private writeMessage(msg: Outbound) { // Make sure we are not re-entrant while the the port.write is going on. // This ends in goofiness as it can send more than one message at a time while it // waits for the command buffer to be flushed. NOTE: There is no success message and the callback to // write only verifies that the buffer got ahold of it. - if (!conn.isRTS || conn.mockPort) return; - conn.isRTS = false; + if (!this.isRTS || this.mockPort) return; + this.isRTS = false; var bytes = msg.toPacket(); - if (conn.isOpen) { + if (this.isOpen) { if (msg.remainingTries <= 0) { // It will almost never fall into here. The rare case where // we have an RTS semaphore and a waiting response might make it go here. msg.failed = true; - conn.buffer._waitingPacket = null; + this._waitingPacket = null; if (typeof msg.onAbort === 'function') msg.onAbort(); else logger.warn(`Message aborted after ${msg.tries} attempt(s): ${msg.toShortPacket()} `); let err = new OutboundMessageError(msg, `Message aborted after ${msg.tries} attempt(s): ${msg.toShortPacket()} `); @@ -696,20 +622,20 @@ export class SendRecieveBuffer { setTimeout(msg.response.callback, 100, msg); } } - conn.buffer.counter.sndAborted++; - conn.isRTS = true; + this.counter.sndAborted++; + this.isRTS = true; return; } - conn.buffer.counter.bytesSent += bytes.length; + this.counter.bytesSent += bytes.length; msg.timestamp = new Date(); logger.packet(msg); - conn.write(Buffer.from(bytes), function (err) { + this.write(Buffer.from(bytes), (err) => { msg.tries++; - conn.isRTS = true; + this.isRTS = true; if (err) { logger.error('Error writing packet %s', err); // We had an error so we need to set the waiting packet if there are retries - if (msg.remainingTries > 0) conn.buffer._waitingPacket = msg; + if (msg.remainingTries > 0) this._waitingPacket = msg; else { msg.failed = true; logger.warn(`Message aborted after ${msg.tries} attempt(s): ${bytes}: ${err} `); @@ -717,43 +643,44 @@ export class SendRecieveBuffer { // make it onto the wire. let error = new OutboundMessageError(msg, `Message aborted after ${msg.tries} attempt(s): ${err} `); if (typeof msg.onComplete === 'function') msg.onComplete(error, undefined); - conn.buffer._waitingPacket = null; - conn.buffer.counter.sndAborted++; + this._waitingPacket = null; + this.counter.sndAborted++; } } else { logger.verbose(`Wrote packet[${bytes}].Retries remaining: ${msg.remainingTries} `); // We have all the success we are going to get so if the call succeeded then // don't set the waiting packet when we aren't actually waiting for a response. - conn.buffer.counter.sndSuccess++; if (!msg.requiresResponse) { // As far as we know the message made it to OCP. - conn.buffer._waitingPacket = null; + this._waitingPacket = null; + this.counter.sndSuccess++; if (typeof msg.onComplete === 'function') msg.onComplete(err, undefined); } else if (msg.remainingTries >= 0) { - conn.buffer._waitingPacket = msg; + this._waitingPacket = msg; } } - conn.buffer.counter.updatefailureRate(); - webApp.emitToChannel('rs485PortStats', 'rs485Stats', conn.buffer.counter); + this.counter.updatefailureRate(); + this.emitPortStats(); }); } } private clearResponses(msgIn: Inbound) { - if (conn.buffer._outBuffer.length === 0 && typeof (conn.buffer._waitingPacket) !== 'object' && conn.buffer._waitingPacket) return; + if (this._outBuffer.length === 0 && typeof (this._waitingPacket) !== 'object' && this._waitingPacket) return; var callback; - let msgOut = conn.buffer._waitingPacket; - if (typeof (conn.buffer._waitingPacket) !== 'undefined' && conn.buffer._waitingPacket) { + let msgOut = this._waitingPacket; + if (typeof (this._waitingPacket) !== 'undefined' && this._waitingPacket) { var resp = msgOut.response; if (msgOut.requiresResponse) { if (resp instanceof Response && resp.isResponse(msgIn, msgOut)) { - conn.buffer._waitingPacket = null; + this._waitingPacket = null; if (typeof msgOut.onComplete === 'function') msgOut.onComplete(undefined, msgIn); callback = resp.callback; resp.message = msgIn; - if (resp.ack) conn.queueSendMessage(resp.ack); + this.counter.sndSuccess++; + if (resp.ack) this.pushOut(resp.ack); } } } @@ -761,9 +688,9 @@ export class SendRecieveBuffer { // RG - when would there be additional packets besides the first in the outbuffer that needs to be removed from a single incoming packet? // RKS: This occurs when two of the same message signature is thrown onto the queue. Most often when there is a queue full of configuration requests. The // triggers that cause the outbound message may come at the same time that another controller makes a call. - var i = conn.buffer._outBuffer.length - 1; + var i = this._outBuffer.length - 1; while (i >= 0) { - let out = conn.buffer._outBuffer[i--]; + let out = this._outBuffer[i--]; if (typeof out === 'undefined') continue; let resp = out.response; // RG - added check for msgOut because the *Touch chlor packet 153 adds an status packet 217 @@ -772,7 +699,7 @@ export class SendRecieveBuffer { if (resp instanceof Response && resp.isResponse(msgIn, out) && (typeof out.scope === 'undefined' || out.scope === msgOut.scope)) { resp.message = msgIn; if (typeof (resp.callback) === 'function' && resp.callback) callback = resp.callback; - conn.buffer._outBuffer.splice(i, 1); + this._outBuffer.splice(i, 1); } } } @@ -782,95 +709,72 @@ export class SendRecieveBuffer { // that we also need. This occurs when more than one panel on the bus requests a reconfig at the same time. if (typeof (callback) === 'function') { setTimeout(callback, 100, msgOut); } } + public get stats() { + let status = this.isOpen ? 'open' : this._cfg.enabled ? 'closed' : 'disabled'; + return extend(true, { portId: this.portId, status: status, reconnects: this.reconnects }, this.counter) + } + private emitPortStats() { + webApp.emitToChannel('rs485PortStats', 'rs485Stats', this.stats); + } private processCompletedMessage(msg: Inbound, ndx): number { msg.timestamp = new Date(); + msg.portId = this.portId; msg.id = Message.nextMessageId; - conn.buffer.counter.recCollisions += msg.collisions; + this.counter.recCollisions += msg.collisions; + this.counter.recRewinds += msg.rewinds; logger.packet(msg); - webApp.emitToChannel('rs485PortStats', 'rs485Stats', conn.buffer.counter); + this.emitPortStats(); if (msg.isValid) { - conn.buffer.counter.recSuccess++; - conn.buffer.counter.updatefailureRate(); + this.counter.recSuccess++; + this.counter.updatefailureRate(); msg.process(); - conn.buffer.clearResponses(msg); + this.clearResponses(msg); } else { - conn.buffer.counter.recFailed++; - conn.buffer.counter.updatefailureRate(); - console.log('RS485 Stats:' + conn.buffer.counter.toLog()); + this.counter.recFailed++; + this.counter.updatefailureRate(); + console.log('RS485 Stats:' + this.counter.toLog()); ndx = this.rewindFailedMessage(msg, ndx); } return ndx; } private rewindFailedMessage(msg: Inbound, ndx: number): number { + this.counter.recRewinds++; // Lets see if we can do a rewind to capture another message from the // crap on the bus. This will get us to the innermost message. While the outer message may have failed the inner message should // be able to buck up and make it happen. - conn.buffer._inBytes = conn.buffer._inBytes.slice(ndx); // Start by removing all of the bytes related to the original message. + this._inBytes = this._inBytes.slice(ndx); // Start by removing all of the bytes related to the original message. // Add all of the elements of the message back in reverse. - conn.buffer._inBytes.unshift(...msg.term); - conn.buffer._inBytes.unshift(...msg.payload); - conn.buffer._inBytes.unshift(...msg.header.slice(1)); // Trim off the first byte from the header. This means it won't find 16,2 or start with a 165. The + this._inBytes.unshift(...msg.term); + this._inBytes.unshift(...msg.payload); + this._inBytes.unshift(...msg.header.slice(1)); // Trim off the first byte from the header. This means it won't find 16,2 or start with a 165. The // algorithm looks for the header bytes to determine the protocol so the rewind shouldn't include the 16 in 16,2 otherwise it will just keep rewinding. - conn.buffer._msg = msg = new Inbound(); - ndx = msg.readPacket(conn.buffer._inBytes); + this._msg = msg = new Inbound(); + ndx = msg.readPacket(this._inBytes); if (msg.isComplete) { ndx = this.processCompletedMessage(msg, ndx); } return ndx; } - protected processInbound() { - conn.buffer.counter.bytesReceived += conn.buffer._inBuffer.length; - conn.buffer._inBytes.push.apply(conn.buffer._inBytes, conn.buffer._inBuffer.splice(0, conn.buffer._inBuffer.length)); - if (conn.buffer._inBytes.length >= 1) { // Wait until we have something to process. + protected processInboundPackets() { + this.counter.bytesReceived += this._inBuffer.length; + this._inBytes.push.apply(this._inBytes, this._inBuffer.splice(0, this._inBuffer.length)); + if (this._inBytes.length >= 1) { // Wait until we have something to process. let ndx: number = 0; - let msg: Inbound = conn.buffer._msg; + let msg: Inbound = this._msg; do { if (typeof (msg) === 'undefined' || msg === null || msg.isComplete || !msg.isValid) { - conn.buffer._msg = msg = new Inbound(); - ndx = msg.readPacket(conn.buffer._inBytes); + this._msg = msg = new Inbound(); + ndx = msg.readPacket(this._inBytes); } - else ndx = msg.mergeBytes(conn.buffer._inBytes); + else ndx = msg.mergeBytes(this._inBytes); if (msg.isComplete) ndx = this.processCompletedMessage(msg, ndx); if (ndx > 0) { - conn.buffer._inBytes = conn.buffer._inBytes.slice(ndx); + this._inBytes = this._inBytes.slice(ndx); ndx = 0; } else break; - } while (ndx < conn.buffer._inBytes.length); + } while (ndx < this._inBytes.length); } } } -export class Counter { - constructor() { - this.bytesReceived = 0; - this.recSuccess = 0; - this.recFailed = 0; - this.recCollisions = 0; - this.bytesSent = 0; - this.sndAborted = 0; - this.sndRetries = 0; - this.sndSuccess = 0; - this.recFailureRate = 0; - this.sndFailureRate = 0; - } - public bytesReceived: number; - public bytesSent: number; - public recSuccess: number; - public recFailed: number; - public recCollisions: number; - public recFailureRate: number; - public sndSuccess: number; - public sndAborted: number; - public sndRetries: number; - public sndFailureRate: number; - public updatefailureRate(): void { - conn.buffer.counter.recFailureRate = (this.recFailed + this.recSuccess) !== 0 ? (this.recFailed / (this.recFailed + this.recSuccess) * 100) : 0; - conn.buffer.counter.sndFailureRate = (this.sndAborted + this.sndSuccess) !== 0 ? (this.sndAborted / (this.sndAborted + this.sndSuccess) * 100) : 0; - //conn.buffer.counter.recFailureRate = `${(conn.buffer.counter.recFailed / (conn.buffer.counter.recFailed + conn.buffer.counter.recSuccess) * 100).toFixed(2)}% `; - //conn.buffer.counter.sndFailureRate = `${(conn.buffer.counter.sndAborted / (conn.buffer.counter.sndAborted + conn.buffer.counter.sndSuccess) * 100).toFixed(2)}% `; - } - public toLog(): string { - return `{ "bytesReceived": ${this.bytesReceived} "success": ${this.recSuccess}, "failed": ${this.recFailed}, "bytesSent": ${this.bytesSent}, "collisions": ${this.recCollisions}, "failureRate": ${this.recFailureRate.toFixed(2)}% }`; - } -} -export var conn: Connection = new Connection(); \ No newline at end of file +export var conn: Connection = new Connection(); diff --git a/controller/comms/messages/Messages.ts b/controller/comms/messages/Messages.ts index 5046c2fd..610e386c 100755 --- a/controller/comms/messages/Messages.ts +++ b/controller/comms/messages/Messages.ts @@ -71,7 +71,7 @@ export class Message { // Fields private static _messageId: number = 0; public static get nextMessageId(): number { return this._messageId < 80000 ? ++this._messageId : this._messageId = 0; } - + public portId = 0; // This will be the target or source port for the message. If this is from or to an Aux RS485 port the value will be > 0. public timestamp: Date = new Date(); public direction: Direction = Direction.In; public protocol: Protocol = Protocol.Unknown; @@ -186,6 +186,7 @@ export class Inbound extends Message { public responseFor: number[] = []; public isProcessed: boolean = false; public collisions: number = 0; + public rewinds: number = 0; // Private methods private isValidChecksum(): boolean { if (this.protocol === Protocol.Chlorinator || this.protocol === Protocol.AquaLink) return this.checksum % 256 === this.chkLo; @@ -193,8 +194,8 @@ export class Inbound extends Message { } public toLog() { if (this.responseFor.length > 0) - return `{"id":${this.id},"valid":${this.isValid},"dir":"${this.direction}","proto":"${this.protocol}","for":${JSON.stringify(this.responseFor)},"pkt":[${JSON.stringify(this.padding)},${JSON.stringify(this.preamble)},${JSON.stringify(this.header)},${JSON.stringify(this.payload)},${JSON.stringify(this.term)}],"ts": "${Timestamp.toISOLocal(this.timestamp)}"}`; - return `{"id":${this.id},"valid":${this.isValid},"dir":"${this.direction}","proto":"${this.protocol}","pkt":[${JSON.stringify(this.padding)},${JSON.stringify(this.preamble)},${JSON.stringify(this.header)},${JSON.stringify(this.payload)},${JSON.stringify(this.term)}],"ts": "${Timestamp.toISOLocal(this.timestamp)}"}`; + return `{"port:${this.portId || 0} id":${this.id},"valid":${this.isValid},"dir":"${ this.direction } ","proto":"${ this.protocol } ","for":${JSON.stringify(this.responseFor)},"pkt":[${JSON.stringify(this.padding)},${JSON.stringify(this.preamble)},${JSON.stringify(this.header)},${JSON.stringify(this.payload)},${JSON.stringify(this.term)}],"ts": "${ Timestamp.toISOLocal(this.timestamp) } "}`; + return `{"port: ${this.portId || 0} id":${this.id},"valid":${this.isValid},"dir":"${this.direction}","proto":"${this.protocol}","pkt":[${JSON.stringify(this.padding)},${JSON.stringify(this.preamble)},${JSON.stringify(this.header)},${JSON.stringify(this.payload)},${JSON.stringify(this.term)}],"ts": "${Timestamp.toISOLocal(this.timestamp)}"}`; } private testChlorHeader(bytes: number[], ndx: number): boolean { // if packets have 16,2 (eg status=16,2,29) in them and they come as partial packets, they would have @@ -247,6 +248,7 @@ export class Inbound extends Message { this.isValid = true; this.collisions++; + this.rewinds++; logger.info(`rewinding message collision ${this.collisions} ${ndx} ${bytes.length} ${JSON.stringify(buff)}`); this.readPacket(buff); return ndx; @@ -327,6 +329,7 @@ export class Inbound extends Message { this.preamble = []; this.header = []; this.collisions++; + this.rewinds++; return ndxHeader + 1; } break; diff --git a/defaultConfig.json b/defaultConfig.json index 7fc5f329..6cae6052 100755 --- a/defaultConfig.json +++ b/defaultConfig.json @@ -1,6 +1,7 @@ { "controller": { "comms": { + "portId": 0, "enabled": true, "rs485Port": "/dev/ttyUSB0", "mockPort": false, diff --git a/web/Server.ts b/web/Server.ts index 3e4311c8..24f238bf 100755 --- a/web/Server.ts +++ b/web/Server.ts @@ -618,42 +618,6 @@ export class HttpServer extends ProtoServer { self._sockets = await self.sockServer.fetchSockets(); }); sock.on('echo', (msg) => { sock.emit('echo', msg); }); -/* sock.on('receivePacketRaw', function (incomingPacket: any[]) { - //var str = 'Add packet(s) to incoming buffer: '; - logger.silly('User request (replay.html) to RECEIVE packet: %s', JSON.stringify(incomingPacket)); - for (var i = 0; i < incomingPacket.length; i++) { - conn.buffer.pushIn(Buffer.from(incomingPacket[i])); - // str += JSON.stringify(incomingPacket[i]) + ' '; - } - //logger.info(str); - }); - sock.on('replayPackets', function (inboundPkts: number[][]) { - // used for replay - logger.debug(`Received replayPackets: ${inboundPkts}`); - inboundPkts.forEach(inbound => { - conn.buffer.pushIn(Buffer.from([].concat.apply([], inbound))); - // conn.queueInboundMessage([].concat.apply([], inbound)); - }); - }); - sock.on('sendPackets', function (bytesToProcessArr: number[][]) { - // takes an input of bytes (src/dest/action/payload) and sends - if (!bytesToProcessArr.length) return; - logger.silly('User request (replay.html) to SEND packet: %s', JSON.stringify(bytesToProcessArr)); - - do { - let bytesToProcess: number[] = bytesToProcessArr.shift(); - - // todo: logic for chlor packets - let out = Outbound.create({ - source: bytesToProcess.shift(), - dest: bytesToProcess.shift(), - action: bytesToProcess.shift(), - payload: bytesToProcess.splice(1, bytesToProcess[0]) - }); - conn.queueSendMessage(out); - } while (bytesToProcessArr.length > 0); - - }); */ sock.on('sendOutboundMessage', (mdata) => { let msg: Outbound = Outbound.create({}); Object.assign(msg, mdata); diff --git a/web/services/config/Config.ts b/web/services/config/Config.ts index 29c57f04..33876610 100755 --- a/web/services/config/Config.ts +++ b/web/services/config/Config.ts @@ -63,10 +63,16 @@ export class ConfigRoute { return res.status(200).send(opts); }); app.get('/config/options/rs485', (req, res) => { - let opts = { - port: config.getSection('controller.comms', { enabled: false, netConnect: false }), - stats: conn.buffer.counter - }; + let opts = { ports: [] } + let cfg = config.getSection('controller'); + for (let section in cfg) { + if (section.startsWith('comms')) { + let cport = extend(true, { enabled: false, netConnect: false }, cfg[section]); + let port = conn.findPortById(cport.portId || 0); + if (typeof port !== 'undefined') cport.stats = port.stats; + opts.ports.push(cport); + } + } return res.status(200).send(opts); }); app.get('/config/options/circuits', async (req, res, next) => { @@ -756,6 +762,13 @@ export class ConfigRoute { } catch (err) { next(err); } }); + app.delete('/app/rs485Port', async (req, res, next) => { + try { + let port = await conn.deleteAuxPort(req.body); + return res.status(200).send(port); + } + catch (err) { next(err); } + }); app.get('/app/config/startPacketCapture', (req, res) => { startPacketCapture(true); return res.status(200).send('OK');