From ac26249c40babffc7da7960345a1c1ea8fe6eb54 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Tue, 6 Jun 2017 14:29:58 -0400 Subject: [PATCH] Error handling for tx stream. --- lib/services/wallet-api/index.js | 40 +++++++++++++++++++++----------- regtest/utils.js | 29 ++++++++++++++--------- 2 files changed, 44 insertions(+), 25 deletions(-) diff --git a/lib/services/wallet-api/index.js b/lib/services/wallet-api/index.js index ba3baa6fa..6c61b8987 100644 --- a/lib/services/wallet-api/index.js +++ b/lib/services/wallet-api/index.js @@ -644,13 +644,13 @@ WalletService.prototype._endpointResyncAddresses = function() { if(!oldAddresses) { return res.status(404).send('Not found'); } - + self._removeWallet(walletId, function(err) { if(err) { return utils.sendError(err, res); } - + self._createWallet(walletId, function() { var jobId = utils.generateJobId(); @@ -713,48 +713,60 @@ WalletService.prototype._endpointGetTransactions = function() { walletId: walletId }; - var missingTxidCount = 0; - var transform = new Transform({ objectMode: true, highWaterMark: 1000000 }); + var missingTxidCount = 0; + var txStream = new Transform({ objectMode: true, highWaterMark: 1000000 }); //txids are sent in and the actual tx's are found here - transform._transform = function(chunk, enc, callback) { + txStream._transform = function(chunk, enc, callback) { var txid = self._encoding.decodeWalletTransactionKey(chunk).txid.toString('hex'); if (txid.length !== 64 || txid === '0000000000000000000000000000000000000000000000000000000000000000') { missingTxidCount++; - log.error('missingTxidCount: ', missingTxidCount); + txStream.emit('error', new Error('Chunk: ' + chunk.toString('hex') + ' did not contain a txid.')); return callback(); } self._getTransactionFromDb(options, txid, function(err, tx) { + err = new Error('this is a test error' + txid); + if(err) { log.error(err); - transform.unpipe(); + txStream.emit('error', err); return callback(); } var formattedTx = utils.toJSONL(self._formatTransaction(tx)); - transform.push(formattedTx); + txStream.push(formattedTx); callback(); }); }; - transform._flush = function(callback) { + txStream.on('error', function(err) { + log.error(err); + utils.sendError(err, res); + txStream.unpipe(); + }); + + txStream._flush = function(callback) { self.db.resumeSync(); callback(); }; var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding); - var stream = self.db.createKeyStream(self._getSearchParams(encodingFn, options)); + var dbStream = self.db.createKeyStream(self._getSearchParams(encodingFn, options)); - stream.on('close', function() { - stream.unpipe(); + dbStream.on('close', function() { + dbStream.unpipe(); }); - stream.pipe(transform).pipe(res); + dbStream.pipe(txStream).pipe(res); + + res.on('end', function() { + console.log('res has ended'); + }); }); }); }; @@ -1373,7 +1385,7 @@ WalletService.prototype._setupWriteRoutes = function(app) { v.checkAddresses, s._endpointPostAddresses() ); - app.put('/wallets/:walletId/addresses/resync', + app.put('/wallets/:walletId/addresses/resync', s._endpointResyncAddresses() ); }; diff --git a/regtest/utils.js b/regtest/utils.js index c7c3f2190..c794360c0 100644 --- a/regtest/utils.js +++ b/regtest/utils.js @@ -76,16 +76,6 @@ utils.waitForBitcoreNode = function(opts, callback) { var self = this; - opts.bitcore.process.stdout.on('data', function(data) { - if (opts.debug) { - console.log(data.toString()); - } - }); - - opts.bitcore.process.stderr.on('data', function(data) { - console.log(data.toString()); - }); - var errorFilter = function(err, res) { try { var info = JSON.parse(res); @@ -153,7 +143,22 @@ utils.initializeAndStartService = function(opts, callback) { }; utils.startBitcoreNode = function(opts, callback) { - this.initializeAndStartService(opts.bitcore, callback); + this.initializeAndStartService(opts.bitcore, function(err) { + if (err) { + return callback(err); + } + + opts.bitcore.process.stdout.on('data', function(data) { + if (opts.debug) { + process.stdout.write(data.toString()); + } + }); + + opts.bitcore.process.stderr.on('data', function(data) { + process.stderr.write(data.toString()); + }); + callback(); + }); }; utils.startBitcoind = function(opts, callback) { @@ -341,6 +346,8 @@ utils.getListOfTxs = function(opts, callback) { path: '/wallet-api/wallets/' + opts.walletId + '/transactions?start=0&end=' + end }); self.queryBitcoreNode(httpOpts, function(err, res) { + +console.log(err, res); if(err) { return callback(err); }