From 27a0c4619c635abf6e6f1396172bed620a4317bc Mon Sep 17 00:00:00 2001 From: Georg Schwarz Date: Mon, 23 Nov 2020 13:31:04 +0100 Subject: [PATCH] Add implementation of amqpPublisher --- package-lock.json | 122 +++++++++++++++++++++++++++++++++++++++++-- package.json | 4 +- src/amqpPublisher.ts | 61 ++++++++++++++++++++++ src/index.ts | 1 + 4 files changed, 182 insertions(+), 6 deletions(-) create mode 100644 src/amqpPublisher.ts diff --git a/package-lock.json b/package-lock.json index 55da9ad..aa54fcc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "@jvalue/node-dry-amqp", - "version": "0.0.3", + "version": "0.0.1", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -100,6 +100,20 @@ "fastq": "^1.6.0" } }, + "@types/amqplib": { + "version": "0.5.16", + "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.5.16.tgz", + "integrity": "sha512-M/D13gVboXAA07A6GX8df3H/Wew1CjfIceVasnVK5SiSB9PfRyPl8TKppVb/DeR1zIffToxOU5otDcoIsn2C6g==", + "requires": { + "@types/bluebird": "*", + "@types/node": "*" + } + }, + "@types/bluebird": { + "version": "3.5.33", + "resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.33.tgz", + "integrity": "sha512-ndEo1xvnYeHxm7I/5sF6tBvnsA4Tdi3zj1keRKRs12SP+2ye2A27NDJ1B6PqkfMbGAcT+mqQVqbZRIrhfOp5PQ==" + }, "@types/json-schema": { "version": "7.0.6", "resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.6.tgz", @@ -115,8 +129,7 @@ "@types/node": { "version": "14.14.7", "resolved": "https://registry.npmjs.org/@types/node/-/node-14.14.7.tgz", - "integrity": "sha512-Zw1vhUSQZYw+7u5dAwNbIA9TuTotpzY/OF7sJM9FqPOF3SPjKnxrjoTktXDZgUjybf4cWVBP7O8wvKdSaGHweg==", - "dev": true + "integrity": "sha512-Zw1vhUSQZYw+7u5dAwNbIA9TuTotpzY/OF7sJM9FqPOF3SPjKnxrjoTktXDZgUjybf4cWVBP7O8wvKdSaGHweg==" }, "@typescript-eslint/eslint-plugin": { "version": "4.7.0", @@ -225,6 +238,19 @@ "uri-js": "^4.2.2" } }, + "amqplib": { + "version": "0.6.0", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.6.0.tgz", + "integrity": "sha512-zXCh4jQ77TBZe1YtvZ1n7sUxnTjnNagpy8MVi2yc1ive239pS3iLwm4e4d5o4XZGx1BdTKQ/U0ZmaDU3c8MxYQ==", + "requires": { + "bitsyntax": "~0.1.0", + "bluebird": "^3.5.2", + "buffer-more-ints": "~1.0.0", + "readable-stream": "1.x >=1.1.9", + "safe-buffer": "~5.1.2", + "url-parse": "~1.4.3" + } + }, "ansi-colors": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.1.tgz", @@ -294,6 +320,36 @@ "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=", "dev": true }, + "bitsyntax": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/bitsyntax/-/bitsyntax-0.1.0.tgz", + "integrity": "sha512-ikAdCnrloKmFOugAfxWws89/fPc+nw0OOG1IzIE72uSOg/A3cYptKCjSUhDTuj7fhsJtzkzlv7l3b8PzRHLN0Q==", + "requires": { + "buffer-more-ints": "~1.0.0", + "debug": "~2.6.9", + "safe-buffer": "~5.1.2" + }, + "dependencies": { + "debug": { + "version": "2.6.9", + "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", + "integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==", + "requires": { + "ms": "2.0.0" + } + }, + "ms": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", + "integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=" + } + } + }, + "bluebird": { + "version": "3.7.2", + "resolved": "https://registry.npmjs.org/bluebird/-/bluebird-3.7.2.tgz", + "integrity": "sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg==" + }, "brace-expansion": { "version": "1.1.11", "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.11.tgz", @@ -313,6 +369,11 @@ "fill-range": "^7.0.1" } }, + "buffer-more-ints": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", + "integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==" + }, "call-bind": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/call-bind/-/call-bind-1.0.0.tgz", @@ -407,6 +468,11 @@ "integrity": "sha1-/ozxhP9mcLa67wGp1IYaXL7EEgo=", "dev": true }, + "core-util-is": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz", + "integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac=" + }, "cross-spawn": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", @@ -1067,8 +1133,7 @@ "inherits": { "version": "2.0.4", "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", - "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", - "dev": true + "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==" }, "is-arrayish": { "version": "0.2.1", @@ -1483,6 +1548,11 @@ "integrity": "sha512-XRsRjdf+j5ml+y/6GKHPZbrF/8p2Yga0JPtdqTIY2Xe5ohJPD9saDJJLPvp9+NSBprVvevdXZybnj2cv8OEd0A==", "dev": true }, + "querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" + }, "read-pkg": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/read-pkg/-/read-pkg-2.0.0.tgz", @@ -1515,12 +1585,35 @@ "read-pkg": "^2.0.0" } }, + "readable-stream": { + "version": "1.1.14", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", + "integrity": "sha1-fPTFTvZI44EwhMY23SB54WbAgdk=", + "requires": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.1", + "isarray": "0.0.1", + "string_decoder": "~0.10.x" + }, + "dependencies": { + "isarray": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", + "integrity": "sha1-ihis/Kmo9Bd+Cav8YDiTmwXR7t8=" + } + } + }, "regexpp": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/regexpp/-/regexpp-3.1.0.tgz", "integrity": "sha512-ZOIzd8yVsQQA7j8GCSlPGXwg5PfmA1mrq0JP4nGhh54LaKN3xdai/vHUDu74pKwV8OxseMS65u2NImosQcSD0Q==", "dev": true }, + "requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha1-kl0mAdOaxIXgkc8NpcbmlNw9yv8=" + }, "resolve": { "version": "1.19.0", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.19.0.tgz", @@ -1558,6 +1651,11 @@ "integrity": "sha512-zb/1OuZ6flOlH6tQyMPUrE3x3Ulxjlo9WIVXR4yVYi4H9UXQaeIsPbLn2R3O3vQCnDKkAl2qHiuocKKX4Tz/Sw==", "dev": true }, + "safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + }, "semver": { "version": "7.3.2", "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.2.tgz", @@ -1726,6 +1824,11 @@ } } }, + "string_decoder": { + "version": "0.10.31", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=" + }, "strip-ansi": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-6.0.0.tgz", @@ -1840,6 +1943,15 @@ "punycode": "^2.1.0" } }, + "url-parse": { + "version": "1.4.7", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.4.7.tgz", + "integrity": "sha512-d3uaVyzDB9tQoSXFvuSUNFibTd9zxd2bkVrDRvF5TmvWWQwqE4lgYJ5m+x1DbecWkw+LK4RNl2CU1hHuOKPVlg==", + "requires": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "v8-compile-cache": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/v8-compile-cache/-/v8-compile-cache-2.2.0.tgz", diff --git a/package.json b/package.json index de36943..c8a54af 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,9 @@ "build": "tsc" }, "dependencies": { - "@jvalue/node-dry-basics": "0.0.3" + "@jvalue/node-dry-basics": "0.0.3", + "@types/amqplib": "0.x", + "amqplib": "0.x" }, "devDependencies": { "@types/node": "^14.14.7", diff --git a/src/amqpPublisher.ts b/src/amqpPublisher.ts new file mode 100644 index 0000000..e3b5aae --- /dev/null +++ b/src/amqpPublisher.ts @@ -0,0 +1,61 @@ +import * as AMQP from 'amqplib' + +import { sleep, stringifiers } from '@jvalue/node-dry-basics' + +export class AmqpPublisher { + private channel?: AMQP.Channel + + public async init (amqpUrl: string, exchange: string, retries: number, msBackoff: number): Promise { + for (let i = 0; i <= retries; i++) { + try { + const connection = await this.connect(amqpUrl) + this.channel = await this.initChannel(connection, exchange) + return + } catch (error) { + console.info(`Error initializing the AMQP Client (${i}/${retries}): + ${error}. Retrying in ${msBackoff}...`) + await sleep(msBackoff) + } + } + throw new Error(`Could not connect to AMQP broker at ${amqpUrl}`) + } + + private async connect (amqpUrl: string): Promise { + try { + const connection = await AMQP.connect(amqpUrl) + console.info(`Connection to amqp host at ${amqpUrl} successful`) + return connection + } catch (error) { + console.error(`Error connecting to amqp host at ${amqpUrl}: ${error}`) + throw error + } + } + + private async initChannel (connection: AMQP.Connection, exchange: string): Promise { + try { + const channel = await connection.createChannel() + await channel.assertExchange(exchange, 'topic') + console.info(`Exchange ${exchange} successfully initialized.`) + return channel + } catch (error) { + console.error(`Error creating exchange ${exchange}: ${error}`) + throw error + } + } + + public publish (exchange: string, topic: string, content: object): boolean { + if (this.channel === undefined) { + console.error('Publish not possible, AMQP client not initialized.') + return false + } else { + try { + const success = this.channel.publish(exchange, topic, Buffer.from(JSON.stringify(content))) + console.debug(`[EventProduce] ${topic}: ${stringifiers.stringify(content)}`) + return success + } catch (error) { + console.error(`Error publishing to exchange ${exchange} under key ${topic}: ${error}`) + return false + } + } + } +} diff --git a/src/index.ts b/src/index.ts index e69de29..428f4e8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -0,0 +1 @@ +export { AmqpPublisher } from './amqpPublisher'