Skip to content

Commit

Permalink
Add amqpConsumer
Browse files Browse the repository at this point in the history
Refactor options

Improve logging
  • Loading branch information
georg-schwarz committed Nov 23, 2020
1 parent 27a0c46 commit 99707b4
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 32 deletions.
28 changes: 28 additions & 0 deletions src/amqpConnector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import * as AMQP from 'amqplib'

export async function connect (amqpUrl: string): Promise<AMQP.Connection> {
try {
const connection = await AMQP.connect(amqpUrl)
console.info(`Connection to amqp host at ${amqpUrl} successful`)
return connection
} catch (error) {
console.error(`Error connecting to amqp host at ${amqpUrl}: ${error}`)
throw error
}
}

export async function initChannel (
connection: AMQP.Connection,
exchange: {name: string, type: string},
exchangeOptions: AMQP.Options.AssertExchange
): Promise<AMQP.Channel> {
try {
const channel = await connection.createChannel()
await channel.assertExchange(exchange.name, exchange.type, exchangeOptions)
console.info(`Exchange ${exchange.name} successfully initialized.`)
return channel
} catch (error) {
console.error(`Error creating exchange ${exchange.name}: ${error}`)
throw error
}
}
51 changes: 51 additions & 0 deletions src/amqpConsumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import * as AMQP from 'amqplib'
import { sleep, stringifiers } from '@jvalue/node-dry-basics'

import * as AmqpConnector from './amqpConnector'

const LOG_MAX_LENGTH = 30

export class AmqpConsumer {
private connection?: AMQP.Connection

public async init (amqpUrl: string, retries: number, msBackoff: number): Promise<void> {
for (let i = 1; i <= retries; i++) {
try {
this.connection = await AmqpConnector.connect(amqpUrl)
return
} catch (error) {
console.info(`Error initializing the AMQP Client (${i}/${retries}):
${error}. Retrying in ${msBackoff}...`)
}
await sleep(msBackoff)
}
throw new Error(`Could not connect to AMQP broker at ${amqpUrl}`)
}

public async registerConsumer (
exchange: { name: string, type: string },
exchangeOptions: AMQP.Options.AssertExchange,
queue: { name: string, routingKey: string },
queueOptions: AMQP.Options.AssertQueue,
consumeEvent: (msg: AMQP.ConsumeMessage | null) => Promise<void>
): Promise<void> {
if (this.connection === undefined) {
throw new Error('Consume not possible, AMQP client not initialized.')
}

try {
const channel =
await AmqpConnector.initChannel(this.connection, { name: exchange.name, type: exchange.type }, exchangeOptions)
const q = await channel.assertQueue(queue.name, queueOptions)
await channel.bindQueue(q.queue, exchange.name, queue.routingKey)
await channel.consume(q.queue, msg => {
console.debug("[AMQP][Consume] %s:'%s'",
msg?.fields.routingKey, stringifiers.stringify(msg?.content.toString(), LOG_MAX_LENGTH))
consumeEvent(msg)
.catch(error => console.error(`Failed to handle ${msg?.fields.routingKey ?? 'null'} event`, error))
})
} catch (error) {
throw new Error(`Error subscribing to exchange ${exchange.name} under key ${queue.routingKey}: ${error}`)
}
}
}
54 changes: 22 additions & 32 deletions src/amqpPublisher.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
import * as AMQP from 'amqplib'

import { sleep, stringifiers } from '@jvalue/node-dry-basics'

import * as AmqpConnector from './amqpConnector'

const LOG_MAX_LENGTH = 30

export class AmqpPublisher {
private channel?: AMQP.Channel

public async init (amqpUrl: string, exchange: string, retries: number, msBackoff: number): Promise<void> {
for (let i = 0; i <= retries; i++) {
public async init (
amqpUrl: string,
retries: number,
msBackoff: number,
exchange: { name: string, type: string },
exchangeOptions: AMQP.Options.AssertExchange
): Promise<void> {
for (let i = 1; i <= retries; i++) {
try {
const connection = await this.connect(amqpUrl)
this.channel = await this.initChannel(connection, exchange)
const connection = await AmqpConnector.connect(amqpUrl)
this.channel = await AmqpConnector.initChannel(connection, exchange, exchangeOptions)
return
} catch (error) {
console.info(`Error initializing the AMQP Client (${i}/${retries}):
Expand All @@ -20,40 +29,21 @@ export class AmqpPublisher {
throw new Error(`Could not connect to AMQP broker at ${amqpUrl}`)
}

private async connect (amqpUrl: string): Promise<AMQP.Connection> {
try {
const connection = await AMQP.connect(amqpUrl)
console.info(`Connection to amqp host at ${amqpUrl} successful`)
return connection
} catch (error) {
console.error(`Error connecting to amqp host at ${amqpUrl}: ${error}`)
throw error
}
}

private async initChannel (connection: AMQP.Connection, exchange: string): Promise<AMQP.Channel> {
try {
const channel = await connection.createChannel()
await channel.assertExchange(exchange, 'topic')
console.info(`Exchange ${exchange} successfully initialized.`)
return channel
} catch (error) {
console.error(`Error creating exchange ${exchange}: ${error}`)
throw error
}
}

public publish (exchange: string, topic: string, content: object): boolean {
public publish (exchangeName: string, routingKey: string, content: object): boolean {
if (this.channel === undefined) {
console.error('Publish not possible, AMQP client not initialized.')
return false
} else {
try {
const success = this.channel.publish(exchange, topic, Buffer.from(JSON.stringify(content)))
console.debug(`[EventProduce] ${topic}: ${stringifiers.stringify(content)}`)
const success = this.channel.publish(
exchangeName,
routingKey,
Buffer.from(JSON.stringify(content))
)
console.debug(`[AMQP][Produce] ${routingKey}: ${stringifiers.stringify(content, LOG_MAX_LENGTH)}`)
return success
} catch (error) {
console.error(`Error publishing to exchange ${exchange} under key ${topic}: ${error}`)
console.error(`Error publishing to exchange ${exchangeName} under key ${routingKey}: ${error}`)
return false
}
}
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export { AmqpPublisher } from './amqpPublisher'
export { AmqpConsumer } from './amqpConsumer'

0 comments on commit 99707b4

Please sign in to comment.