Skip to content

Commit

Permalink
Add implementation of amqpPublisher
Browse files Browse the repository at this point in the history
  • Loading branch information
georg-schwarz committed Nov 23, 2020
1 parent f1bace2 commit 27a0c46
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 6 deletions.
122 changes: 117 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
"build": "tsc"
},
"dependencies": {
"@jvalue/node-dry-basics": "0.0.3"
"@jvalue/node-dry-basics": "0.0.3",
"@types/amqplib": "0.x",
"amqplib": "0.x"
},
"devDependencies": {
"@types/node": "^14.14.7",
Expand Down
61 changes: 61 additions & 0 deletions src/amqpPublisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import * as AMQP from 'amqplib'

import { sleep, stringifiers } from '@jvalue/node-dry-basics'

export class AmqpPublisher {
private channel?: AMQP.Channel

public async init (amqpUrl: string, exchange: string, retries: number, msBackoff: number): Promise<void> {
for (let i = 0; i <= retries; i++) {
try {
const connection = await this.connect(amqpUrl)
this.channel = await this.initChannel(connection, exchange)
return
} catch (error) {
console.info(`Error initializing the AMQP Client (${i}/${retries}):
${error}. Retrying in ${msBackoff}...`)
await sleep(msBackoff)
}
}
throw new Error(`Could not connect to AMQP broker at ${amqpUrl}`)
}

private async connect (amqpUrl: string): Promise<AMQP.Connection> {
try {
const connection = await AMQP.connect(amqpUrl)
console.info(`Connection to amqp host at ${amqpUrl} successful`)
return connection
} catch (error) {
console.error(`Error connecting to amqp host at ${amqpUrl}: ${error}`)
throw error
}
}

private async initChannel (connection: AMQP.Connection, exchange: string): Promise<AMQP.Channel> {
try {
const channel = await connection.createChannel()
await channel.assertExchange(exchange, 'topic')
console.info(`Exchange ${exchange} successfully initialized.`)
return channel
} catch (error) {
console.error(`Error creating exchange ${exchange}: ${error}`)
throw error
}
}

public publish (exchange: string, topic: string, content: object): boolean {
if (this.channel === undefined) {
console.error('Publish not possible, AMQP client not initialized.')
return false
} else {
try {
const success = this.channel.publish(exchange, topic, Buffer.from(JSON.stringify(content)))
console.debug(`[EventProduce] ${topic}: ${stringifiers.stringify(content)}`)
return success
} catch (error) {
console.error(`Error publishing to exchange ${exchange} under key ${topic}: ${error}`)
return false
}
}
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { AmqpPublisher } from './amqpPublisher'

0 comments on commit 27a0c46

Please sign in to comment.