From 6b7303eddbc3fbca30761d1dfa5873292ca0b2ca Mon Sep 17 00:00:00 2001 From: Bohdan Date: Mon, 18 Mar 2024 18:31:52 +0200 Subject: [PATCH 1/2] fix: Refactor consumer - Add missing type declarations - Remove autoBind usage - Update passing in-class methods to intervals using arrow functions --- src/bind.ts | 22 ---------------------- src/consumer.ts | 47 +++++++++++++++++++++++++---------------------- 2 files changed, 25 insertions(+), 44 deletions(-) delete mode 100644 src/bind.ts diff --git a/src/bind.ts b/src/bind.ts deleted file mode 100644 index 2fd897a..0000000 --- a/src/bind.ts +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Determines if the property is a method - * @param propertyName the name of the property - * @param value the value of the property - */ -function isMethod(propertyName: string, value: any): boolean { - return propertyName !== "constructor" && typeof value === "function"; -} - -/** - * Auto binds the provided properties - * @param obj an object containing the available properties - */ -export function autoBind(obj: object): void { - const propertyNames = Object.getOwnPropertyNames(obj.constructor.prototype); - propertyNames.forEach((propertyName) => { - const value = obj[propertyName]; - if (isMethod(propertyName, value)) { - obj[propertyName] = value.bind(obj); - } - }); -} diff --git a/src/consumer.ts b/src/consumer.ts index 772fdd6..283dae3 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -19,7 +19,6 @@ import { import { ConsumerOptions, StopOptions, UpdatableOptions } from "./types.js"; import { TypedEventEmitter } from "./emitter.js"; -import { autoBind } from "./bind.js"; import { SQSError, TimeoutError, @@ -89,7 +88,6 @@ export class Consumer extends TypedEventEmitter { useQueueUrlAsEndpoint: options.useQueueUrlAsEndpoint ?? true, region: options.region || process.env.AWS_REGION || "eu-west-1", }); - autoBind(this); } /** @@ -161,7 +159,7 @@ export class Consumer extends TypedEventEmitter { return; } - const exceededTimeout = + const exceededTimeout: boolean = Date.now() - this.stopRequestedAtTimestamp > this.pollingCompleteWaitTimeMs; if (exceededTimeout) { @@ -171,7 +169,7 @@ export class Consumer extends TypedEventEmitter { } this.emit("waiting_for_polling_to_complete"); - setTimeout(this.waitForPollingToComplete, 1000); + setTimeout(() => this.waitForPollingToComplete(), 1000); } /** @@ -196,7 +194,7 @@ export class Consumer extends TypedEventEmitter { public updateOption( option: UpdatableOptions, value: ConsumerOptions[UpdatableOptions], - ) { + ): void { validateOption(option, value, this, true); this[option] = value; @@ -237,7 +235,7 @@ export class Consumer extends TypedEventEmitter { this.isPolling = true; - let currentPollingTimeout = this.pollingWaitTimeMs; + let currentPollingTimeout: number = this.pollingWaitTimeMs; this.receiveMessage({ QueueUrl: this.queueUrl, AttributeNames: this.attributeNames, @@ -246,8 +244,10 @@ export class Consumer extends TypedEventEmitter { WaitTimeSeconds: this.waitTimeSeconds, VisibilityTimeout: this.visibilityTimeout, }) - .then(this.handleSqsResponse) - .catch((err) => { + .then((output: ReceiveMessageCommandOutput) => + this.handleSqsResponse(output), + ) + .catch((err): void => { this.emitError(err); if (isConnectionError(err)) { logger.debug("authentication_error", { @@ -258,16 +258,19 @@ export class Consumer extends TypedEventEmitter { } return; }) - .then(() => { + .then((): void => { if (this.pollingTimeoutId) { clearTimeout(this.pollingTimeoutId); } - this.pollingTimeoutId = setTimeout(this.poll, currentPollingTimeout); + this.pollingTimeoutId = setTimeout( + () => this.poll(), + currentPollingTimeout, + ); }) - .catch((err) => { + .catch((err): void => { this.emitError(err); }) - .finally(() => { + .finally((): void => { this.isPolling = false; }); } @@ -283,7 +286,7 @@ export class Consumer extends TypedEventEmitter { if (this.preReceiveMessageCallback) { await this.preReceiveMessageCallback(); } - const result = await this.sqs.send( + const result: ReceiveMessageCommandOutput = await this.sqs.send( new ReceiveMessageCommand(params), this.sqsSendOptions, ); @@ -333,7 +336,7 @@ export class Consumer extends TypedEventEmitter { heartbeatTimeoutId = this.startHeartbeat(message); } - const ackedMessage = await this.executeHandler(message); + const ackedMessage: Message = await this.executeHandler(message); if (ackedMessage?.MessageId === message.MessageId) { await this.deleteMessage(message); @@ -361,7 +364,7 @@ export class Consumer extends TypedEventEmitter { let heartbeatTimeoutId: NodeJS.Timeout | undefined = undefined; try { - messages.forEach((message) => { + messages.forEach((message: Message): void => { this.emit("message_received", message); }); @@ -369,12 +372,12 @@ export class Consumer extends TypedEventEmitter { heartbeatTimeoutId = this.startHeartbeat(null, messages); } - const ackedMessages = await this.executeBatchHandler(messages); + const ackedMessages: Message[] = await this.executeBatchHandler(messages); if (ackedMessages?.length > 0) { await this.deleteMessageBatch(ackedMessages); - ackedMessages.forEach((message) => { + ackedMessages.forEach((message: Message): void => { this.emit("message_processed", message); }); } @@ -448,7 +451,7 @@ export class Consumer extends TypedEventEmitter { ): Promise { const params: ChangeMessageVisibilityBatchCommandInput = { QueueUrl: this.queueUrl, - Entries: messages.map((message) => ({ + Entries: messages.map((message: Message) => ({ Id: message.MessageId, ReceiptHandle: message.ReceiptHandle, VisibilityTimeout: timeout, @@ -479,7 +482,7 @@ export class Consumer extends TypedEventEmitter { let result; if (this.handleMessageTimeout) { - const pending = new Promise((_, reject) => { + const pending: Promise = new Promise((_, reject): void => { handleMessageTimeoutId = setTimeout((): void => { reject(new TimeoutError()); }, this.handleMessageTimeout); @@ -518,7 +521,7 @@ export class Consumer extends TypedEventEmitter { */ private async executeBatchHandler(messages: Message[]): Promise { try { - const result = await this.handleMessageBatch(messages); + const result: void | Message[] = await this.handleMessageBatch(messages); return !this.alwaysAcknowledge && result instanceof Object ? result @@ -576,12 +579,12 @@ export class Consumer extends TypedEventEmitter { return; } logger.debug("deleting_messages", { - messageIds: messages.map((msg) => msg.MessageId), + messageIds: messages.map((msg: Message) => msg.MessageId), }); const deleteParams: DeleteMessageBatchCommandInput = { QueueUrl: this.queueUrl, - Entries: messages.map((message) => ({ + Entries: messages.map((message: Message) => ({ Id: message.MessageId, ReceiptHandle: message.ReceiptHandle, })), From 3a292f23b03f82756335194ad96cd61d80f0a26f Mon Sep 17 00:00:00 2001 From: Bohdan Date: Mon, 18 Mar 2024 18:38:23 +0200 Subject: [PATCH 2/2] fix: Fix calling processMessage in handleSqsResponse --- src/consumer.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/consumer.ts b/src/consumer.ts index 283dae3..c4bfc25 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -312,7 +312,11 @@ export class Consumer extends TypedEventEmitter { if (this.handleMessageBatch) { await this.processMessageBatch(response.Messages); } else { - await Promise.all(response.Messages.map(this.processMessage)); + await Promise.all( + response.Messages.map((message: Message) => + this.processMessage(message), + ), + ); } this.emit("response_processed");