diff --git a/src/consumer.ts b/src/consumer.ts index c4bfc25..5049b49 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -51,7 +51,7 @@ export class Consumer extends TypedEventEmitter { private visibilityTimeout: number; private terminateVisibilityTimeout: boolean; private waitTimeSeconds: number; - private authenticationErrorTimeout: number; + private connectionErrorTimeout: number; private pollingWaitTimeMs: number; private pollingCompleteWaitTimeMs: number; private heartbeatInterval: number; @@ -76,8 +76,8 @@ export class Consumer extends TypedEventEmitter { options.terminateVisibilityTimeout || false; this.heartbeatInterval = options.heartbeatInterval; this.waitTimeSeconds = options.waitTimeSeconds ?? 20; - this.authenticationErrorTimeout = - options.authenticationErrorTimeout ?? 10000; + this.connectionErrorTimeout = + options.connectionErrorTimeout ?? 10000; this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0; this.pollingCompleteWaitTimeMs = options.pollingCompleteWaitTimeMs ?? 0; this.shouldDeleteMessages = options.shouldDeleteMessages ?? true; @@ -249,14 +249,14 @@ export class Consumer extends TypedEventEmitter { ) .catch((err): void => { this.emitError(err); + if (isConnectionError(err)) { - logger.debug("authentication_error", { + logger.debug("connection_error", { detail: - "There was an authentication error. Pausing before retrying.", + `${err.code}: There was an connection error. Pausing before retrying.`, }); - currentPollingTimeout = this.authenticationErrorTimeout; + currentPollingTimeout = this.connectionErrorTimeout; } - return; }) .then((): void => { if (this.pollingTimeoutId) { diff --git a/src/errors.ts b/src/errors.ts index 71a2580..201671d 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -44,10 +44,13 @@ function isConnectionError(err: Error): boolean { if (err instanceof SQSError) { return ( err.statusCode === 403 || + err.name === "SQSError" || err.code === "CredentialsError" || err.code === "UnknownEndpoint" || err.code === "AWS.SimpleQueueService.NonExistentQueue" || - err.code === "CredentialsProviderError" + err.code === "CredentialsProviderError" || + err.code === "QueueDoesNotExist" || + err.code === "InvalidAddress" ); } return false; diff --git a/src/types.ts b/src/types.ts index 2f01b0b..c178b1c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -40,10 +40,10 @@ export interface ConsumerOptions { */ waitTimeSeconds?: number; /** - * The duration (in milliseconds) to wait before retrying after an authentication error. + * The duration (in milliseconds) to wait before retrying after a connection or authentication error. * @defaultvalue `10000` */ - authenticationErrorTimeout?: number; + connectionErrorTimeout?: number; /** * The duration (in milliseconds) to wait before repolling the queue. * @defaultvalue `0` diff --git a/test/tests/consumer.test.ts b/test/tests/consumer.test.ts index ab5300a..925b405 100644 --- a/test/tests/consumer.test.ts +++ b/test/tests/consumer.test.ts @@ -17,7 +17,7 @@ import { logger } from "../../src/logger.js"; const sandbox = sinon.createSandbox(); -const AUTHENTICATION_ERROR_TIMEOUT = 20; +const CONNECTION_ERROR_TIMEOUT = 20; const POLLING_TIMEOUT = 100; const QUEUE_URL = "some-queue-url"; const REGION = "some-region"; @@ -86,7 +86,7 @@ describe("Consumer", () => { region: REGION, handleMessage, sqs, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, }); }); @@ -256,7 +256,7 @@ describe("Consumer", () => { new Promise((resolve) => setTimeout(resolve, 1000)), handleMessageTimeout, sqs, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, }); consumer.start(); @@ -281,7 +281,7 @@ describe("Consumer", () => { throw new Error("unexpected parsing error"); }, sqs, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, }); consumer.start(); @@ -315,7 +315,7 @@ describe("Consumer", () => { throw new CustomError("unexpected parsing error"); }, sqs, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, }); consumer.start(); @@ -339,7 +339,7 @@ describe("Consumer", () => { throw customError; }, sqs, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, }); consumer.start(); @@ -418,7 +418,7 @@ describe("Consumer", () => { consumer.on("error", errorListener); consumer.start(); - await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); + await clock.tickAsync(CONNECTION_ERROR_TIMEOUT); consumer.stop(); sandbox.assert.calledTwice(errorListener); @@ -438,7 +438,7 @@ describe("Consumer", () => { consumer.on("error", errorListener); consumer.start(); - await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); + await clock.tickAsync(CONNECTION_ERROR_TIMEOUT); consumer.stop(); sandbox.assert.calledTwice(errorListener); @@ -457,7 +457,28 @@ describe("Consumer", () => { consumer.on("error", errorListener); consumer.start(); - await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); + await clock.tickAsync(CONNECTION_ERROR_TIMEOUT); + consumer.stop(); + + sandbox.assert.calledTwice(errorListener); + sandbox.assert.calledTwice(sqs.send); + sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); + sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage); + }); + + it("waits before repolling when a connection error occurs", async () => { + const unknownEndpointErr = { + name: "SQSError", + code: "SQSError", + message: + "SQS receive message failed: getaddrinfo ENOTFOUND sqs.eu-west-1.amazonaws.com", + }; + sqs.send.withArgs(mockReceiveMessage).rejects(unknownEndpointErr); + const errorListener = sandbox.stub(); + consumer.on("error", errorListener); + + consumer.start(); + await clock.tickAsync(CONNECTION_ERROR_TIMEOUT); consumer.stop(); sandbox.assert.calledTwice(errorListener); @@ -476,7 +497,7 @@ describe("Consumer", () => { consumer.on("error", errorListener); consumer.start(); - await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); + await clock.tickAsync(CONNECTION_ERROR_TIMEOUT); consumer.stop(); sandbox.assert.calledTwice(errorListener); @@ -495,7 +516,7 @@ describe("Consumer", () => { consumer.on("error", errorListener); consumer.start(); - await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); + await clock.tickAsync(CONNECTION_ERROR_TIMEOUT); consumer.stop(); sandbox.assert.calledTwice(errorListener); @@ -510,7 +531,7 @@ describe("Consumer", () => { region: REGION, handleMessage, sqs, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, pollingWaitTimeMs: POLLING_TIMEOUT, }); @@ -560,7 +581,7 @@ describe("Consumer", () => { region: REGION, handleMessage, sqs, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, preReceiveMessageCallback: preReceiveMessageCallbackStub, postReceiveMessageCallback: postReceiveMessageCallbackStub, }); @@ -596,7 +617,7 @@ describe("Consumer", () => { region: REGION, handleMessage, sqs, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, shouldDeleteMessages: false, }); @@ -701,7 +722,7 @@ describe("Consumer", () => { AttributeNames: [], MessageAttributeNames: ["attribute-1", "attribute-2"], MaxNumberOfMessages: 3, - WaitTimeSeconds: AUTHENTICATION_ERROR_TIMEOUT, + WaitTimeSeconds: CONNECTION_ERROR_TIMEOUT, VisibilityTimeout: undefined, }), ); @@ -745,7 +766,7 @@ describe("Consumer", () => { AttributeNames: ["ApproximateReceiveCount"], MessageAttributeNames: [], MaxNumberOfMessages: 1, - WaitTimeSeconds: AUTHENTICATION_ERROR_TIMEOUT, + WaitTimeSeconds: CONNECTION_ERROR_TIMEOUT, VisibilityTimeout: undefined, }), ); @@ -881,7 +902,7 @@ describe("Consumer", () => { }, batchSize: 2, sqs, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, }); consumer.start(); @@ -917,7 +938,7 @@ describe("Consumer", () => { }, batchSize: 2, sqs, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, }); consumer.start(); @@ -943,7 +964,7 @@ describe("Consumer", () => { }, batchSize: 2, sqs, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, }); consumer.start(); @@ -1525,7 +1546,7 @@ describe("Consumer", () => { handleMessage, sqs, pollingCompleteWaitTimeMs: 5000, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, }); consumer.on("stopped", handleStop); @@ -1579,7 +1600,7 @@ describe("Consumer", () => { handleMessage, sqs, pollingCompleteWaitTimeMs: 500, - authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT, }); consumer.on("stopped", handleStop);