diff --git a/Readme.md b/Readme.md index abfb4e7..9e57666 100644 --- a/Readme.md +++ b/Readme.md @@ -1,34 +1,37 @@ # Mikronode Full-Featured asynchronous Mikrotik API interface for [NodeJS](http://nodejs.org). - + ```javscript var MikroNode = require('mikronode'); var device = new MikroNode('192.168.0.1'); - device.connect('admin','password').then(function(connection) { - - var chan=conn.openChannel("addresses"); // open a named channel - var chan2=conn.openChannel("firewall_connections",true); // open a named channel, turn on "closeOnDone" + device.connect() + .then(([login])=>{ + return login('username','password'); + }) + .then(function(connection) { - chan.write('/ip/address/print'); + var chan=conn.openChannel("addresses"); // open a named channel + var chan2=conn.openChannel("firewall_connections",true); // open a named channel, turn on "closeOnDone" - chan.on('done',function(data) { + chan.write('/ip/address/print'); + chan.on('done',function(data) { + // data is all of the sentences in an array. data.forEach(function(item) { console.log('Interface/IP: '+item.data.interface+"/"+item.data.address); }); - chan.close(); // close the channel. + chan.close(); // close the channel. It is not autoclosed by default. conn.close(); // when closing connection, the socket is closed and program ends. - }); - }); + }); - chan.write('/ip/firewall/print'); + chan.write('/ip/firewall/print'); - chan.done.subscribe(function(data){ + chan.done.subscribe(function(data){ // data is all of the sentences in an array. data.forEach(function(item) { @@ -36,11 +39,10 @@ console.log('Interface/IP: '+data.interface+"/"+data.address); }); - }); - }); + }); }); - +``` ## Installation Clone this repository into your node_modules directory. @@ -78,13 +80,28 @@ ### Connection - Calling new MikroNode(host) returns an API object. - - * apiInstance.connect('user','pass',optionsObject) - Connect to the target device. The callback function is called after successful login with the current connection object as its parameter. - * conn.openChannel(id) - Open and return a new channel object. Each channel is a unique command line to the mikrotik, allowing simultaneous execution of commands. The ID parameter is optional. - * conn.connected() + Calling new MikroNode(host[,port,socketTimeout]) returns an object representing the device. +```javascript + var MikroNode = require('mikronode'); + var Device =new MikroNode(host,port); + Device.connect().then(([login])=>login('admin','password')).then(function(conn) { + var chan=conn.openChannel(); + }); +``` +With the above code, the following is API description. conn is Connection object, chan is Channel object. + * MikroNode.resultsToObj(dataObj) + Convert the sentence format of the mikrotik into a more easily readable + * Device.connect([cb]) + Connect to the target device. The optional callback function is called after successful connect with the function to call to login as the 2nd parameter, and any connection errors as the first. + the connect method returns a Promise that is resolved when connecting. + * Device.socketOpts (write-only property) + Optionally allows the setting of parameters that the socket connecting to the mikrotik will use. + * Device.TLS(tlsOpts) + Enable TLS and set it's options. Take note that you will need to be sure the port the API is trying to connect is an SSL/TLS port. For unauthenticated SSL connections (no signed certs) only ADH cipher is supported. This is a limitation of the RouterOS software + * Device.setDebug(level) + Set the default debug logging level for the device, and all subsequent created connections. + * conn.openChannel(id|name) + Open and return a new channel object. Each channel is a unique command line to the mikrotik, allowing simultaneous execution of commands. The ID parameter is optional. If not specified, the current timestamp is used. If too many channels are opened at one time without specifying a name, there could be duplicate names. * conn.connected() Returns true is currently connected to a mikrotik device. * conn.closeChannel(id) Closes an open channel. This will call the close method of the channel object. @@ -98,20 +115,22 @@ ### Channel - The following methods are available for channels: + The following property/methods are available for channels: - * channel.done + * channel.done "done" is the stream that contains events when the done sentence comes from the device. When subscribing, the stream's data contans an object with each line received in an array. - * channel.read + * channel.data For each sentence received, this has an observable event. Only sentences designated for this channel will pass through this sentence. This is handy when following trailing output from a listen command, where the data could be endless. + * channel.trap + Any traps that occur on a channel can be captured in this observable stream. * chanenl.sync(b) If b == true, each command is run synchronously. Otherwise commands are executed as they are passed. * channel.closeOnDone(b) If b == true, when a done event occurs, close the channel after all commands queued have been executed. * channel.getId() - * channel.write(lines[,optionsObject]) + * channel.write(lines[,optionsObject]) Returns a promise that is resolved when the command sent is complete and is "done" The promise is rejected if a trap or fatal error occurs. Lines can be a string, or an array of strings. If it is a string, then it is split on the EOL character and each resulting line is sent as a separate word (in API speak) @@ -124,11 +143,11 @@ ## Examples ### Connect to a Mikrotik, and add an address to ether1 - +```javascript var api = require('mikronode'); var device = new api('192.168.0.1'); - device.connect('admin','password').then(function(conn) { + device.connect().then(([login])=>login('admin','password')).then(function(conn) { var chan=conn.openChannel(); @@ -142,19 +161,18 @@ chan.close(); conn.close(); }); - +``` ### Writing the program for the example API conversation on the [Mikrotik Wiki](http://wiki.mikrotik.com/wiki/API#.2Fcancel.2C_simultaneous_commands) - +```javascript var MikroNode = require('mikronode'); var device = new MikroNode('192.168.0.1'); - device.connect('admin','password').then(function(conn) { - + device.connect().then(([login])=>login('admin','password')).then(function(conn) { conn.closeOnDone(true); // when all channels are "done" the connection should close. var chan1=conn.openChannel("interface_listener"); chan1.write('/interface/listen'); - chan1.read.subscribe(function(item) { + chan1.data.subscribe(function(item) { var packet=MikroNode.resultsToObj(item.data); console.log('Interface change: '+JSON.stringify(packet)); }); @@ -193,14 +211,14 @@ }); }); }); - +``` ### Simplifying the above by reducing the number of channels. Notice how the callback embedding is not needed using the syncronous capability. - +```javascript var MikroNode = require('mikronode'); var device = new MikroNode('192.168.0.1'); - device.connect('admin','password').then(function(conn) { + device.connect().then(([login])=>login('admin','password')).then(function(conn) { conn.closeOnDone(true); // All channels need to complete before the connection will close. var listenChannel=conn.openChannel(); listenChannel.write('/interface/listen'); @@ -227,13 +245,13 @@ }); actionChannel.close(); // The above commands will complete before this is closed. }); - +``` ### Promises add simplicity: - +```javascript var MikroNode = require('mikronode'); var device = new MikroNode('192.168.0.1'); - device.connect('admin','password').then(function(conn) { - console.log("Logged in.") + device.connect().then(([login])=>login('admin','password')).then(function(conn) { + console.log("Logged in."); conn.closeOnDone(true); // All channels need to complete before the connection will close. var listenChannel=conn.openChannel("listen"); @@ -255,25 +273,39 @@ .catch(error=>console.log("Listen channel rejection:",error)); // All our actions go through this. - var actionChannel=conn.openChannel("action"); + var actionChannel=conn.openChannel("action",false); // don't close on done... because we are running these using promises, the commands complete before each then is complete. // Do things async. This is to prove that promises work as expected along side streams. actionChannel.sync(false); + actionChannel.closeOnDone(false); // Turn off closeOnDone because the timeouts set to allow the mikrotik to reflect the changes takes too long. The channel would close. // These will run synchronsously (even though sync is not set to true) console.log("Disabling interface"); actionChannel.write('/interface/set',{'disabled':'yes','.id':'ether1'}).then(results=>{ console.log("Disable complete."); - // Delay 1 second before running next command so that the Interface change listener can report the change. - return new Promise((r,x)=>setTimeout(r,1000)).then(()=>actionChannel.write('/interface/set',{'disabled':'no','.id':'ether1'})); + // when the first item comes in from the listen channel, it should send the next command. + const {promise,resolve,reject}=MikroNode.getUnwrappedPromise(); + listenChannel.data + .take(1) + // This is just to prove that it grabbed the first one. + .do(d=>console.log("Data:",MikroNode.resultsToObj(d.data))) + .subscribe(d=>actionChannel.write('/interface/set',{'disabled':'no','.id':'ether1'}).then(resolve,reject)); + return promise; }) .then(results=>{ console.log("Enabled complete."); - // Delay 1 second before running next command so that the Interface change listener can report the change. - return new Promise((r,x)=>setTimeout(r,1000)).then(()=>actionChannel.write('/interface/getall')); + // return new Promise((r,x)=>setTimeout(r,1000)).then(()=>actionChannel.write('/interface/getall')); + const {promise,resolve,reject}=MikroNode.getUnwrappedPromise(); + // when the second item comes in from the listen channel, it should send the next command. + listenChannel.data + .take(1) + // This is just to prove that it grabbed the second one. + .do(d=>console.log("Data:",MikroNode.resultsToObj(d.data))) + .subscribe(d=>actionChannel.write('/interface/getall').then(resolve,reject)); + return promise; }) .then(results=>{ - var formatted=MikroNode.resultsToObj(results); + var formatted=MikroNode.resultsToObj(results.data); var columns=[".id","name","mac-address","comment"]; var filtered=formatted.map(line=>columns.reduce((p,c)=>{p[c]=line[c];return p},{})); console.log('Interface [ID,Name,MAC-Address]: ',JSON.stringify(filtered,true,4)); @@ -286,18 +318,9 @@ console.log("Closing everything."); listenChannel.close(true); // This should call the /cancel command to stop the listen. actionChannel.close(); - }); - - // This just watches for responses from the writes in the promises above. There are no results in the set commands, but there is a large result for the getall - actionChannel.done.subscribe(function(results) { - console.log('Interface (done): ',results); - },error=>{ - console.log("Error during done subscription",error) - },()=>{ - console.log("Action channel done."); }); }); - +``` ### The methods *decodeLength* and *encodeString* were written based on code [here on the Mikrotik Wiki](http://wiki.mikrotik.com/wiki/API_PHP_class#Class). @@ -305,7 +328,7 @@ (The MIT License) -Copyright (c) 2011,2012,2013,2014,2015,2016 Brandon Myers +Copyright (c) 2011,2012,2013,2014,2015,2016,2017 Brandon Myers Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the diff --git a/examples/getInterfacesAndRoutes.js b/examples/getInterfacesAndRoutes.js index 6fd0637..a572ced 100644 --- a/examples/getInterfacesAndRoutes.js +++ b/examples/getInterfacesAndRoutes.js @@ -1,10 +1,10 @@ var api=require('../dist/mikronode.js'); -var device=new api(/* Host */'10.10.10.1' /*, Port */ /*, Timeout */); +var device=new api(/* Host */'10.10.10.10' /*, Port */ /*, Timeout */); // device.setDebug(api.DEBUG); // connect: user, password. -device.connect('username','password').then(function(conn) { +device.connect().then(([login])=>login('username','password')).then(function(conn) { var c1=conn.openChannel(); var c2=conn.openChannel(); c1.closeOnDone(true); diff --git a/examples/ipCountOnly.js b/examples/ipCountOnly.js index 63beaf2..ffeb507 100644 --- a/examples/ipCountOnly.js +++ b/examples/ipCountOnly.js @@ -1,11 +1,11 @@ var MikroNode = require('../dist/mikronode.js'); // Create API instance to a host. -var device = new MikroNode('10.10.10.1'); +var device = new MikroNode('10.10.10.10'); // device.setDebug(MikroNode.DEBUG); // Connect to MikroTik device -device.connect('username','password').then( +device.connect(/* socketOpts */).then(([login])=>login('username','password')).then( function(conn) { // When all channels are marked done, close the connection. conn.closeOnDone(true); diff --git a/examples/promise.js b/examples/promise.js index a0291de..9104133 100644 --- a/examples/promise.js +++ b/examples/promise.js @@ -1,8 +1,9 @@ -var MikroNode = require('../src/index.js'); +require('babel-register');var MikroNode = require('../src'); var device = new MikroNode('10.10.10.10'); -device.connect('admin','password').then(function(conn) { - console.log("Logged in.") +// device.setDebug(MikroNode.DEBUG); +device.connect().then(([login])=>login('admin','password')).then(function(conn) { + console.log("Logged in."); conn.closeOnDone(true); // All channels need to complete before the connection will close. var listenChannel=conn.openChannel("listen"); @@ -24,25 +25,39 @@ device.connect('admin','password').then(function(conn) { .catch(error=>console.log("Listen channel rejection:",error)); // All our actions go through this. - var actionChannel=conn.openChannel("action"); + var actionChannel=conn.openChannel("action",false); // don't close on done... because we are running these using promises, the commands complete before each then is complete. // Do things async. This is to prove that promises work as expected along side streams. actionChannel.sync(false); + actionChannel.closeOnDone(false); // Turn off closeOnDone because the timeouts set to allow the mikrotik to reflect the changes takes too long. The channel would close. // These will run synchronsously (even though sync is not set to true) console.log("Disabling interface"); actionChannel.write('/interface/set',{'disabled':'yes','.id':'ether1'}).then(results=>{ console.log("Disable complete."); - // Delay 1 second before running next command so that the Interface change listener can report the change. - return new Promise((r,x)=>setTimeout(r,1000)).then(()=>actionChannel.write('/interface/set',{'disabled':'no','.id':'ether1'})); + // when the first item comes in from the listen channel, it should send the next command. + const {promise,resolve,reject}=MikroNode.getUnwrappedPromise(); + listenChannel.data + .take(1) + // This is just to prove that it grabbed the first one. + .do(d=>console.log("Data:",MikroNode.resultsToObj(d.data))) + .subscribe(d=>actionChannel.write('/interface/set',{'disabled':'no','.id':'ether1'}).then(resolve,reject)); + return promise; }) .then(results=>{ console.log("Enabled complete."); - // Delay 1 second before running next command so that the Interface change listener can report the change. - return new Promise((r,x)=>setTimeout(r,1000)).then(()=>actionChannel.write('/interface/getall')); + // return new Promise((r,x)=>setTimeout(r,1000)).then(()=>actionChannel.write('/interface/getall')); + const {promise,resolve,reject}=MikroNode.getUnwrappedPromise(); + // when the second item comes in from the listen channel, it should send the next command. + listenChannel.data + .take(1) + // This is just to prove that it grabbed the second one. + .do(d=>console.log("Data:",MikroNode.resultsToObj(d.data))) + .subscribe(d=>actionChannel.write('/interface/getall').then(resolve,reject)); + return promise; }) .then(results=>{ - var formatted=MikroNode.resultsToObj(results); + var formatted=MikroNode.resultsToObj(results.data); var columns=[".id","name","mac-address","comment"]; var filtered=formatted.map(line=>columns.reduce((p,c)=>{p[c]=line[c];return p},{})); console.log('Interface [ID,Name,MAC-Address]: ',JSON.stringify(filtered,true,4)); @@ -56,4 +71,4 @@ device.connect('admin','password').then(function(conn) { listenChannel.close(true); // This should call the /cancel command to stop the listen. actionChannel.close(); }); -}); +}).catch(e=>console.log("Error connecting: ",e)); diff --git a/examples/simpleWrite.js b/examples/simpleWrite.js index e54d787..0592cdb 100644 --- a/examples/simpleWrite.js +++ b/examples/simpleWrite.js @@ -1,11 +1,11 @@ -var MikroNode = require('../src'); +var MikroNode = require('../dist/mikronode.js'); // Create API instance to a host. var device = new MikroNode('10.10.10.10'); device.setDebug(MikroNode.DEBUG); // Connect to MikroTik device -device.connect('admin','password').then(conn=>{ +device.connect().then(([login])=>login('admin','password')).then(conn=>{ // When all channels are marked done, close the connection. console.log('connected'); diff --git a/examples/watchIpList.js b/examples/watchIpList.js index fb1a9d3..93b00c5 100644 --- a/examples/watchIpList.js +++ b/examples/watchIpList.js @@ -2,14 +2,14 @@ var MikroNode = require('../dist/mikronode.js'); // Create API link to host. No connection yet.. -var device = new MikroNode('10.10.10.1'); +var device = new MikroNode('10.10.10.10'); // Debug level is "DEBUG" // device.setDebug(MikroNode.DEBUG); var removeId=[]; // Connect to the MikroTik device. -device.connect('username','password').then(function(conn) { +device.connect().then(([login])=>login('username','password')).then(function(conn) { var channel=conn.openChannel('all_addresses'); channel.closeOnDone(true); // only use this channel for one command. diff --git a/src/Channel.js b/src/Channel.js index 1747089..53cc0df 100644 --- a/src/Channel.js +++ b/src/Channel.js @@ -187,28 +187,26 @@ export default class Channel extends events.EventEmitter { const cmd=this.getCommandId(data); return Observable.of({...data,tag:data.tag.substring(0,data.tag.lastIndexOf('-')),cmd:(this.getCommand(cmd)||{cmd:null}).cmd}); }).share(); + // Stream for sentences with data. this.data = this.createStream(this.read,[EVENT.DATA,EVENT.DONE_RET]).share(); + // Stream for signaling when done. this.done = this.createStream(this.read,[EVENT.DONE,EVENT.DONE_RET,EVENT.DONE_TAG]).share(); // Stream for all traps from device. - this.trap=this.read.filter(e=>e.type==EVENT.TRAP||e.type===EVENT.TRAP_TAG) - .do(e=>this.debug>=DEBUG.DEBUG&&console.log('Channel (%s)::TRAP ',id)) - .share(); - // this.trap.subscribe(e=>{ - // if (this.closeOnTrap||this.status&CHANNEL.CLOSING) { - // this.status=CHANNEL.CLOSING; - // this.debug>=DEBUG.INFO&&console.log('Channel (%s)::CLOSING',id); - // this.close(); - // } - // }); + this.trap=this.read + .filter(e=>e.type==EVENT.TRAP||e.type===EVENT.TRAP_TAG) + .do(e=>this.debug>=DEBUG.DEBUG&&console.log('Channel (%s)::TRAP ',id)) + .share(); + this.read.filter(e=>e.type==EVENT.FATAL) .subscribe(e=>{ this.debug>=DEBUG.DEBUG&&console.log('Channel (%s)::FATAL ',id); this.status=CHANNEL.CLOSING; this.close(); }); + this.bufferedStream=new Subject(); } @@ -220,7 +218,15 @@ export default class Channel extends events.EventEmitter { write(command,args=[]) { if (this.status&(CHANNEL.CLOSED|CHANNEL.CLOSING)) { this.debug>=DEBUG.WARN&&console.error("Cannot write on closed or closing channel"); - return this; + const p = new Promise((resolve,reject)=>reject({tag:this.id,data:{message:"Cannot write on closed or closing channel"},cmd:{command,args}})); + // p.catch(e=>{console.error(e.data.message)}); + return p; + } + if (command==='/cancel') { + Object.keys(this.cmd).forEach(id=>{ + this.stream.write(command,args,id); + }); + return Promise.resolve({tag:this.id,data:{message:"/cancel sent."}}); } const {promise,resolve,reject}=getUnwrappedPromise(); @@ -242,13 +248,7 @@ export default class Channel extends events.EventEmitter { if (this.sync) last.promise.then(()=>{ this.status=CHANNEL.RUNNING; this.stream.write(command,args,commandId); - },()=>{ - if (this.closeOnTrap) { - this.status=CHANNEL.CLOSING; - return; - } - if (this.status&CHANNEL.CLOSING) return; - this.status=CHANNEL.RUNNING; + }).catch(()=>{ this.stream.write(command,args,commandId); }); // Otherwise since the last command was sent, we can send this one now. @@ -258,21 +258,6 @@ export default class Channel extends events.EventEmitter { } } - promise.then((e)=>{ - // If we want to close on done, and there are no commands waiting to run - this.status=CHANNEL.DONE; - if (!Object.keys(this.cmd).length) { - if (this.closeOnDone) this.close(); - } - }); - // Collapsing on error... - promise.catch(e=>{ - this.status=CHANNEL.DONE; - if (this.closeOnTrap) { - this.status=CHANNEL.CLOSING; - this.debug>=DEBUG.DEBUG&&console.log('Channel (%s):: read-done catch CLOSING',this.id); - } - }); return promise; } @@ -289,12 +274,17 @@ export default class Channel extends events.EventEmitter { return this.clearCommand(commandId.id); return null; } + this.debug>=DEBUG.DEBUG&&console.log("Clearing command cache for #",commandId); const cmd = this.cmd[commandId]; if (!cmd) return; + delete cmd.promise.cmd; delete cmd.promise.resolve; delete cmd.promise.reject; delete cmd.promise; delete this.cmd[commandId]; + if (!Object.keys(this.cmd).length) { + if (this.closeOnDone) this.close(); + } } /** * Get the last command relative to the commandId @@ -347,22 +337,35 @@ export default class Channel extends events.EventEmitter { ).take(1); race.partition(data=>data.type==EVENT.TRAP||data.type===EVENT.TRAP_TAG) - .reduce((r,o,i)=>{ - if (i==0) { - o.subscribe(error=>{ - this.debug>=DEBUG.SILLY&&console.error("*** Register Command: trap",id,error); - p.reject(error); - }); - } else - return o; - },{}) - - const data=this.data + .reduce((r,o,i)=>{ + if (i==0) { + o.subscribe(error=>{ + this.debug>=DEBUG.DEBUG&&console.error("*** Register Command: trap",id,error); + this.status=CHANNEL.DONE; + if (this.closeOnTrap) { + this.status=CHANNEL.CLOSING; + this.debug>=DEBUG.DEBUG&&console.log('Channel (%s):: read-done catch CLOSING',this.id); + this.close(true); + } + p.reject(error); + this.emit('trap',error); + },null, + // this should happen for every command + ()=>{ + this.debug>=DEBUG.SILLY&&console.log("*** Register Command: complete from trap", commandId); + }); + } else + return o; + },{}) + + const isListen=command.split('/').indexOf('listen')>0; + const data= + this.data .filter(data=>data.cmd.id===id) .takeUntil(race) .do(d=>this.debug>=DEBUG.SILLY&&console.log("*** Data in %s:%s",d.cmd.id,id)) .reduce((acc,d)=>{ - if (d.data) acc.data=acc.data.concat([d.data]); + if (d.data&&!isListen) acc.data=acc.data.concat([d.data]); return acc; },{cmd:this.cmd[id].cmd,tag:this.id,data:[]}) .do(d=>this.debug>=DEBUG.SILLY&&console.log("*** Reduced Data in ",d)) @@ -372,13 +375,15 @@ export default class Channel extends events.EventEmitter { this.status=CHANNEL.DONE; this.bufferedStream.next(data); p.resolve(data); + this.emit('done',data); }, error=>{ this.debug>=DEBUG.SILLY&&console.error("*** Register Command: error",id,error); }, + // this should happen for every command ()=>{ - this.debug>=DEBUG.SILLY&&console.log("*** Register Command: complete"); - this.clearCommand(id); + this.debug>=DEBUG.SILLY&&console.log("*** Register Command: complete",commandId); + setTimeout(()=>this.clearCommand(id),50); // make sure all promises complete before running this. }); }.bind(this))(commandId,promise); return this.cmd[commandId].cmd; @@ -414,7 +419,10 @@ export default class Channel extends events.EventEmitter { // status() { return this.status } close(force) { if (this.status&CHANNEL.RUNNING) { - if (force) this.stream.write('/cancel'); + if (force) + Object.keys(this.cmd).forEach(id=>{ + this.stream.write('/cancel',[],id); + }); this.closeOnDone=true; this.sync=true; this.status=CHANNEL.CLOSING; diff --git a/src/Connection.js b/src/Connection.js index 86f2e19..5b7a9ab 100644 --- a/src/Connection.js +++ b/src/Connection.js @@ -72,11 +72,17 @@ export default class Connection extends events.EventEmitter { ); stream.read - .subscribe(null,null,e=>this.channels.forEach(c=>c.close(true))); + .subscribe(null,null,e=>{ + this.channels.forEach(c=>c.close(true)); + setTimeout(()=>{ + this.emit('close',this); + },50) + }); } close() { this.debug>=DEBUG.SILLY&&console.log("Closing connection through stream"); + this.emit('close',this); this.stream.close(); } @@ -108,7 +114,7 @@ export default class Connection extends events.EventEmitter { } get connected() { - return !!(this.status&(CONNECTION.CONNECTED|CONNECTION.WAITING)); + return !!(this.status&(CONNECTION.CONNECTED|CONNECTION.WAITING|CONNECTION.CLOSING)); } @autobind openChannel(id,closeOnDone) { @@ -136,6 +142,7 @@ export default class Connection extends events.EventEmitter { var channel=this.getChannel(id); if (channel) { this.debug>=DEBUG.DEBUG&&console.log("Closing channel ",id); + setTimeout(channel.emit.bind(channel,'close',channel),50); this.channels.splice(this.channels.indexOf(channel),1); if (this.channels.filter(c=>c.status&(CHANNEL.OPEN|CHANNEL.RUNNING)).length===0 && this.closeOnDone) this.close(); } else @@ -145,7 +152,6 @@ export default class Connection extends events.EventEmitter { // If Connection closeOnDone, then check if all channels are done. if (this.closeOnDone) { const cl=this.channels.filter(c=>c.status&(CHANNEL.OPEN|CHANNEL.RUNNING)); - console.log("Channel done (%s)",cl); if (cl.length) return false; this.debug>=DEBUG.DEBUG&&console.log("Channel done (%s)",id); this.channels.filter(c=>c.status&(CHANNEL.DONE)).forEach(c=>console.log("Closing...",c)); diff --git a/src/Util.js b/src/Util.js index a95ffdd..e7526cf 100644 --- a/src/Util.js +++ b/src/Util.js @@ -98,12 +98,58 @@ function hexDump(data) { // This is probably over done... // Uncomment if you want to detail trace your promises. function nullfunc(){} +const rejectionWatcher=new WeakMap(); + +// function clearRejectionTrack(catcher,reason) { +// const x=rejectionWatcher.get(this); +// x.splice(x.findIndex(catcher),1); +// return catcher.call(this,reason); +// } + +// function proxyThenCatch(promise) { +// const catchEx = promise.catch; +// const thenEx = promise.then; + +// console.log("Adding promise to watcher map"); +// // rejectionWatcher.set(promise,[]); + +// promise.then=function(handler,catcher) { +// if (catcher) { +// // rejectionWatcher.get(promise).push(catcher); +// console.log("tracking catcher"); +// } +// return proxyThenCatch(thenEx.call(promise,handler,clearRejectionTrack.bind(promise,catcher))); +// }.bind(promise); + +// promise.catch=function(catcher) { +// if (!catcher) return; +// // rejectionWatcher.get(promise).push(catcher); +// return proxyThenCatch(catchEx.call(promise,catcher)); +// }.bind(promise); +// return promise; +// } + +process.on('unhandledRejection',function(event,promise){ + if (event.cmd) return; + // console.log("caught unhandled rejection. Command still running..."); + // rejectionWatcher.set(promise,event); + // } else + console.error("Unhandled rejection: ",JSON.stringify(event,true,4),'\n',promise); +}); + +// process.on('rejectionHandled',function(p){ +// console.log('Rejection handled.'); +// rejectionWatcher.delete(p); +// }); + function getUnwrappedPromise() { let resolve,reject; + const e = new Error(); const promise = new Promise((res,rej)=>{ resolve=res; reject=rej; }); + promise.createdAt=e.stack.split('\n').slice(2,3).join('\n'); return { get promise() { return promise; diff --git a/src/index.js b/src/index.js index 38d88c9..4f6e2ff 100644 --- a/src/index.js +++ b/src/index.js @@ -1,19 +1,21 @@ import util from 'util'; import net from 'net'; +import TLS from 'tls'; import Promise from 'promise'; import {Subject, Observable} from 'rxjs'; import {autobind} from 'core-decorators'; import crypto from 'crypto'; +import dns from 'dns'; -import {hexDump, decodeLength, encodeString, objToAPIParams, resultsToObj} from './Util.js'; +import {hexDump, decodeLength, encodeString, objToAPIParams, resultsToObj, getUnwrappedPromise} from './Util.js'; import {STRING_TYPE, DEBUG, CONNECTION,CHANNEL,EVENT} from './constants.js'; import parser from './parser.js'; import Connection from './Connection'; const Socket=net.Socket; + const nullString=String.fromCharCode(0); -const emptyFunction=()=>{}; class MikroNode { @@ -39,6 +41,15 @@ class MikroNode { @Private status=CONNECTION.DISCONNECTED; + + @Private + tls=null; + + @Private + socketOpts={}; + + @Private + socketProto='tcp4'; /** * Creates a MikroNode API object. * @exports mikronode @@ -108,15 +119,22 @@ class MikroNode { this.port=port; } - /** set tls options for this connection */ - setTLS(options) { - if (options) { - this.tls=options + /** get/set tls options for this connection */ + TLS(opts={}) { + if (opts) { + this.tls=opts; + if (opts.host) this.host=opts.host; + if (opts.port) this.port=opts.port; return this; } return this.tls; } + set socketOpts(opts) { + this.socketOpts=opts; + if (opts.host) this.host=opts.host; + if (opts.port) this.port=opts.port; + } /** Set timeout for socket connecion */ setTimeout(timeout) { this.timeout=timeout; @@ -124,37 +142,75 @@ class MikroNode { } /** Connect to remote server using ID and password */ - connect(user,password) { + connect(arg1,arg2) { this.debug>=DEBUG.INFO&&console.log('Connecting to '+this.host); + let cb; this.debug>=DEBUG.SILLY&&console.log('Creating socket'); - this.sock = new SocketStream(this.timeout,this.debug); + this.sock = new SocketStream(this.timeout,this.debug,this.tls?typeof this.tls===typeof {}?this.tls:{}:false); const stream=this.sock.getStream(); - const login=challenge=>{ - const md5=crypto.createHash('md5'); - md5.update(Buffer.concat([Buffer.from(nullString+password),Buffer.from(challenge)])); - stream.write([ - "/login", - "=name="+user, - "=response=00"+md5.digest("hex") - ]); - }; + if (typeof arg1===typeof {}) { + this.socketOpts={...this.socketOpts,arg1}; + if (typeof arg1===typeof function(){}) + cb=arg2; + } else if (typeof arg1===typeof function(){}) cb=arg1; const close=()=>this.sock.getStream().sentence.complete(); + + const login=(user,password,cb)=>{ + this.debug>=DEBUG.DEBUG&&console.log('Logging in'); + stream.write('/login'); + const {promise,resolve,reject}=getUnwrappedPromise(); + // Create a connection handler + this.connection=new Connection( + {...stream,close}, + challenge=>{ + const md5=crypto.createHash('md5'); + md5.update(Buffer.concat([Buffer.from(nullString+password),Buffer.from(challenge)])); + stream.write([ + "/login", + "=name="+user, + "=response=00"+md5.digest("hex") + ]); + },{resolve,reject} + ); + this.connection.setDebug(this.debug); + promise.then(()=>{ + if (cb) cb(null,this.connection); + },err=>{ + if (cb) cb(err,null); + }); + return promise; + }; + this.debug>=DEBUG.SILLY&&console.log('Creating promise for socket connect'); const promise = new Promise((resolve,reject) => { - const connected = () => { - // Create a connection handler - this.debug>=DEBUG.DEBUG&&console.log('Logging in'); - stream.write('/login'); - this.connection=new Connection({...stream,close},login,{resolve,reject}); - this.connection.setDebug(this.debug); - /* Initiate Login */ - }; - this.debug>=DEBUG.SILLY&&console.log('Connecting to remote host'); - this.sock.connect(this.host,this.port,connected); - this.sock.getStream().sentence.subscribe(null,reject,null); // reject promise on error. + this.debug>=DEBUG.SILLY&&console.log('Connecting to remote host. Detected %s',net.isIPv6(this.host)?'ipv6':net.isIPv4(this.host)?'ipv4':'DNS lookup'); + const fn=((net.isIPv4(this.host)||net.isIPv6(this.host))?((this.socketOpts.family=net.isIPv6(this.host)?6:4),(a,b)=>b(null,[a])):((this.socketOpts.family==6)?dns.resolve4:dns.resolve6)); + fn(this.host,(err,data)=>{ + if (err) { + return reject("Host resolve error: ",err); + } + // this.debug>=DEBUG.DEBUG&&console.log('Socket connect: ',{...this.socketOpts,...this.tls,host:this.host,port:this.port}); + this.sock.connect({ + ...this.socketOpts, + ...this.tls, + host:data[0], + port:this.port + }).then(([socketOpts,...args])=>{ + this.debug>=DEBUG.DEBUG&&console.log('Connected. Waiting for login.'); + // initiate the login process + resolve([login,socketOpts,...args]); + if (cb) cb(null,login,socketOpts,...args); + /* Initiate Login */ + this.sock.getStream().sentence.take(1).subscribe(null,reject,null); + }).catch(err=>{ + if (cb) cb(err,null); + reject("Caught error in socket connect",err); + }); + // reject connect promise if the socket throws an error. + }); }); // Connect to the server. return promise; @@ -163,12 +219,15 @@ class MikroNode { // Object.keys(DEBUG).forEach(k=>MikroNode[k]=DEBUG[k]); const api=Object.assign(MikroNode,DEBUG); -export default Object.assign(api,{CONNECTION, CHANNEL, EVENT, resultsToObj}); +export default Object.assign(api,{CONNECTION, CHANNEL, EVENT, resultsToObj, getUnwrappedPromise}); /** Handles the socket connection and parsing of infcoming data. */ /* This entire class is private (not exported) */ class SocketStream { + @Private + rawSocket; + @Private socket; @@ -187,11 +246,13 @@ class SocketStream { @Private data$; - constructor(timeout,debug) { + constructor(timeout,debug,tls) { debug>=DEBUG.DEBUG&&console.log('SocketStream::new',[timeout,debug]); this.debug=debug; - this.socket = new Socket({type:'tcp4'}); + this.rawSocket = new Socket(); + + this.socket=tls?new TLS.TLSSocket(this.rawSocket,tls):this.rawSocket; this.sentence$=new Subject(); // Each raw sentence from the stream passes through this parser. @@ -274,11 +335,14 @@ class SocketStream { this.setTimeout(timeout); // This is the function handler for error or complete for the parsing functions. - const closeSocket=(e)=>{this.debug>=DEBUG.DEBUG&&console.log("Closing Socket ",e);e?this.socket.destroy(e):this.socket.destroy();} + const closeSocket=(e)=>{ + this.debug>=DEBUG.DEBUG&&console.log("Closing Socket ",e); + e?this.rawSocket.destroy(e):this.rawSocket.destroy(); + } /** Listen for complete on stream to dictate if socket will close */ this.sentence$ // .do(d=>console.log("Sentence: ",d)) - .subscribe(emptyFunction,closeSocket,closeSocket); + .subscribe(null,closeSocket,closeSocket); // This will be called if there is no activity to the server. // If this occurs before the login is successful, it could be @@ -305,15 +369,39 @@ class SocketStream { } }); } + /** Connect the socket */ - connect(host,port,cb) { - this.debug>=DEBUG.DEBUG&&console.log('SocketStream::Connect',[host,port,cb]); + connect(socketOpts) { + this.debug>=DEBUG.DEBUG&&console.log('SocketStream::Connect %s',this.tls?"(TLS)":"",socketOpts); this.status=CONNECTION.CONNECTING; - this.host = host; - this.socket.connect(port,host,(...args)=>{ - this.debug>=DEBUG.INFO&&console.log('Socket Connected'); - this.status=CONNECTION.CONNECTED; - cb(...args); + this.host = socketOpts.host||'localhost'; + return new Promise((res,rej)=>{ + // Connect to the socket. This works for both TLS and non TLS sockets. + try { + this.rawSocket.connect(socketOpts,(...args)=>{ + this.debug>=DEBUG.INFO&&console.log('SocketStream::Connected ',args,socketOpts); + this.status=CONNECTION.CONNECTED; + socketOpts={ + ...socketOpts, + localAddress:this.socket.localAddress, + localPort:this.socket.localPort + }; + if (this.socket.encrypted) + res([{ + ...socketOpts, + authorized:this.socket.authorized, + authorizationError:this.socket.authorizationError, + protocol: this.socket.getProtocol(), + alpnProtocol:this.socket.alpnProtocol, + npnProtocol:this.socket.npnProtocol, + cipher: this.socket.getCipher(), + cert: this.socket.getPeerCertificate(), + },...args]); + else res([socketOpts,...args]); + }); + } catch (e) { + rej("Caught exception while opening socket: ",e) + } }); } diff --git a/test/close.js b/test/close.js index a2bd620..cf09bbe 100644 --- a/test/close.js +++ b/test/close.js @@ -2,22 +2,28 @@ // This only verifies that all channels have been eliminated. // A more full-featured test is in the works. -var api=require('../lib/index.js') +var api=require('../dist/mikronode.js') -var config=require('./config.js'); -config.push({debug:2}); // Add debug options to see what's happening. -var connection=api.prototype.constructor.apply(api,config) +var device=new api('10.10.10.10'); +// device.setDebug(api.DEBUG); + +device.connect( + function(err,login) { + login('admin','password',runProgram); + } +); + +function runProgram(err,c) { -connection.connect(function(c) { console.log('Connection established'); - channel1 = c.openChannel(); - channel2 = c.openChannel(); - channel3 = c.openChannel(); - + const channel1 = c.openChannel(1); + const channel2 = c.openChannel(2); + const channel3 = c.openChannel(3); c.on('close',function(c2) { - id=channel1.getId(); + var id=channel1.getId(); + console.log("Channel closing...") try { c2.getChannel(id); console.log('Channel %s is still available. Error.',id); @@ -39,6 +45,6 @@ connection.connect(function(c) { console.log('Channel %s is gone!',id); } }); - channel1.write('/quit') -}); + channel1.write('/quit').catch(e=>{console.log("Error writing quit",e)}) +} diff --git a/test/ssl.js b/test/ssl.js new file mode 100644 index 0000000..6dbe832 --- /dev/null +++ b/test/ssl.js @@ -0,0 +1,41 @@ +var MikroNode = require('../dist/mikronode.js'); +var device = new MikroNode('10.10.10.10',8729); // We specify the SSL/TLS port of our Mikrotik here. +0 +// device.setDebug(MikroNode.DEBUG); + +// By setting TLS options, TLS connection is enabled. +device.TLS({ + rejectUnauthorized : false, + // If your mikrotik does not have a valid certificate, this cipher is the only one that will work. + ciphers:'ADH' +}); + +device.connect(/* socketOpts */).then(function([login,socketInfo]){ + // The ability to login or not depending on resolting socket info. + console.log("Connected.\nLogging in."); + return login('admin','password'); // must return result of login(); +}).then(function(conn) { + try { + console.log("Login complete. Ready for command."); + conn.closeOnDone(true); + var channel=conn.openChannel("address_export"); + channel.closeOnDone(true); + + console.log("Writing command to listen for DHCP lease changes."); + const p=channel.write('/ip/dhcp-server/lease/listen').catch(e=>{ + console.log("Cancel processed"); + }); + + // Cancel the listen in 60 seconds. Should cause stuff to complete. + setTimeout(()=>{channel.write('/cancel').then(()=>{console.log("Sent cancel.")})},10*1000); + // p.then(data=>console.log("Data received in promise: ",data)); + + channel.data.subscribe(e=>console.log("Data Sub: ",MikroNode.resultsToObj(e.data))); + // channel.done.subscribe(data=>console.log("Done Sub %s:",data.cmd.command,MikroNode.resultsToObj(data.data))); + + } catch (e) { + console.log("Error while running ",e); + } +}).catch(err=>{ + console.log("Error occured while connecting/logging in ",err); +}); diff --git a/test/sync.js b/test/sync.js index 943ecb7..ebf9ed0 100644 --- a/test/sync.js +++ b/test/sync.js @@ -1,8 +1,8 @@ -var MikroNode = require('../src'); +var MikroNode = require('../dist/mikronode.js'); var device = new MikroNode('10.10.10.10'); // device.setDebug(MikroNode.DEBUG); -device.connect('admin', 'password').then(function (conn) { +device.connect().then(([login])=>login('admin', 'password').then(function (conn) { console.log("Connected"); // When all channels are complete close the connection. conn.closeOnDone(true); diff --git a/test/test1.js b/test/test1.js index d5675ce..04d4979 100644 --- a/test/test1.js +++ b/test/test1.js @@ -1,7 +1,13 @@ -var MikroNode = require('../src'); +var MikroNode = require('../dist/mikronode.js'); var device = new MikroNode('10.10.10.10'); // device.setDebug(MikroNode.DEBUG); -device.connect('admin','password').then(conn=>{ +// By setting TLS options, TLS connection is enabled. + +device.connect(/* socketOpts */).then(function([login,socketInfo,...args]){ + // The ability to login or not depending on resolting socket info. + console.log("Connected: ",socketInfo); + return login('admin','password'); // must return result of login(); +}).then(conn=>{ try { console.log("Connected"); conn.closeOnDone(true); @@ -9,17 +15,18 @@ device.connect('admin','password').then(conn=>{ channel.closeOnDone(true); console.log("Writing command..."); - const p=channel.write('/ip/address/print'); + const p=channel.write('/ip/dhcp-server/lease/listen'); - p.then(()=>console.log("Command Written")); - p.done.then(data=>console.log("Data received in promise: ",data)); + // Cancel the listen in 60 seconds. Should cause stuff to complete. + setTimeout(()=>{channel.write('/cancel')},60*1000); + // p.then(data=>console.log("Data received in promise: ",data)); channel.data.subscribe(e=>console.log("Data Sub: ",e.data)); - channel.done.subscribe(data=>console.log("Done Sub:",data)); + // channel.done.subscribe(data=>console.log("Done Sub %s:",data.cmd.command,MikroNode.resultsToObj(data.data))); } catch (e) { console.log("Error while running ",e); } },err=>{ - console.log("Error occured while connecting ",err); + console.log("Error occured while connecting/logging in ",err); }); diff --git a/test/testAPI.js b/test/testAPI.js index 51703dd..c6443c2 100644 --- a/test/testAPI.js +++ b/test/testAPI.js @@ -4,8 +4,8 @@ var api=require('../dist/mikronode.js'); var device=new api(/* Host */'127.0.0.1' /*, Port */ /*, Timeout */); // device.setDebug(api.DEBUG); -// connect: user, password. -device.connect('usrename','password').then(function(conn) { +// connect: +device.connect().then(([login])=>login('usrename','password')).then(function(conn) { var c1=conn.openChannel(); console.log('Getting Packages'); c1.write('/system/package/getall');