From ff2292c66988796bf44a318c30a17d954e18adc3 Mon Sep 17 00:00:00 2001 From: dillonstreator Date: Mon, 20 Jan 2025 19:33:25 -0600 Subject: [PATCH] move fetch retry client to core and add better timeout and listener cleanup --- packages/{server => core}/src/fetchRetry.ts | 83 +++++++++++++++------ packages/core/src/index.ts | 1 + packages/core/src/sleep.ts | 29 +++++++ packages/server/src/index.ts | 3 +- packages/server/src/webhook.ts | 3 +- 5 files changed, 91 insertions(+), 28 deletions(-) rename packages/{server => core}/src/fetchRetry.ts (50%) diff --git a/packages/server/src/fetchRetry.ts b/packages/core/src/fetchRetry.ts similarity index 50% rename from packages/server/src/fetchRetry.ts rename to packages/core/src/fetchRetry.ts index d40ff10..fc9e1a9 100644 --- a/packages/server/src/fetchRetry.ts +++ b/packages/core/src/fetchRetry.ts @@ -1,29 +1,65 @@ -import { sleep } from '@crypt.fyi/core'; +import { sleepWithCancel } from './sleep'; +/** + * Creates a fetch client with retry capabilities and configurable timeout behavior. + * + * @remarks + * This client handles two distinct types of timeouts/cancellations: + * 1. Internal timeout: Controlled by `requestTimeoutMs`. When triggered, the request will be retried + * if within `maxAttempts`. Uses a custom `TimeoutError` to distinguish from other abort events. + * 2. External cancellation: Via caller-provided AbortSignal in fetch options. When triggered, + * immediately stops execution without retry attempts. + * + * The client uses exponential backoff between retry attempts, with configurable delay parameters. + * + * @param options - Configuration options for the retry client + * @param options.maxAttempts - Maximum number of attempts before giving up + * @param options.requestTimeoutMs - Timeout for each individual request attempt (default: 5000ms) + * @param options.fetchFn - Optional custom fetch implementation (default: global fetch) + * @param options.shouldRetry - Optional custom function to determine if an error is retryable (default: isRetryableFetchError) + * @param options.calculateBackoff - Optional custom function to calculate the backoff delay between retries (default: defaultCalculateBackoff) + * + * @returns A fetch-compatible function that implements the retry logic + * + * @throws {FetchRetryError} When all retry attempts fail or a non-retryable error occurs + * + * @example + * ```ts + * const client = createFetchRetryClient({ + * maxAttempts: 3, + * requestTimeoutMs: 2000 + * }); + * + * // With caller cancellation + * const controller = new AbortController(); + * const response = await client('https://api.example.com', { + * signal: controller.signal + * }); + * ``` + */ export const createFetchRetryClient = ({ maxAttempts, requestTimeoutMs = 5_000, - backoffDelayMs = 5_000, - backoffFactor = 2, - backoffMaxDelayMs = 60_000, fetchFn = fetch, + shouldRetry = isRetryableFetchError, + calculateBackoff = defaultCalculateBackoff, }: { maxAttempts: number; requestTimeoutMs?: number; - backoffDelayMs?: number; - backoffFactor?: number; - backoffMaxDelayMs?: number; + shouldRetry?: (error: unknown) => boolean; + calculateBackoff?: (attempt: number) => number; fetchFn?: typeof fetch; }): typeof fetch => { return async (url, options) => { let attempts = 0; - let errors: unknown[] = []; + const errors: unknown[] = []; do { const signals = [options?.signal]; let timeoutId: ReturnType | undefined; if (requestTimeoutMs > 0) { - // We intentionally don't use AbortSignal.timeout because we want to control the abort reason with our sentinel TimeoutError + // We intentionally don't use AbortSignal.timeout because we want to control the abort reason with our own sentinel `TimeoutError` + // This allows our internal timeout to be retryable by default and a cancellation from the caller to be respected and not trigger a retry // Maybe monkey patch AbortSignal.timeout to allow an optional sentinel error or create a helper function for this pattern const ab = new AbortController(); timeoutId = setTimeout(() => ab.abort(new TimeoutError()), requestTimeoutMs); @@ -41,7 +77,7 @@ export const createFetchRetryClient = ({ } catch (error) { errors.push(error); - if (!isRetryableFetchError(error)) { + if (!shouldRetry(error)) { throw new FetchRetryError(errors, 'Non-retryable error occurred'); } } finally { @@ -49,16 +85,18 @@ export const createFetchRetryClient = ({ clearTimeout(timeoutId); } } - const backoffMs = calculateBackoff(attempts, { - delayMs: backoffDelayMs, - factor: backoffFactor, - maxDelayMs: backoffMaxDelayMs, - }); + const backoffMs = calculateBackoff(attempts); + const sleeper = sleepWithCancel(backoffMs); + const eventListenerAC = new AbortController(); await Promise.race([ - sleep(backoffMs), - new Promise((resolve) => signal.addEventListener('abort', resolve)), + sleeper.promise, + new Promise((resolve) => + signal.addEventListener('abort', resolve, { signal: eventListenerAC.signal, once: true }), + ), ]); + sleeper.cancel(); + eventListenerAC.abort(); } while (++attempts < maxAttempts); throw new FetchRetryError(errors, 'Failed to fetch after max retries'); @@ -87,15 +125,12 @@ const isRetryableStatusCode = (status: number): boolean => { return [408, 423, 425].includes(status); }; -const calculateBackoff = ( - attempt: number, - config: { delayMs: number; factor: number; maxDelayMs: number }, -): number => { - const backoffMs = config.delayMs * Math.pow(config.factor, attempt - 1); - return Math.min(backoffMs, config.maxDelayMs); +const defaultCalculateBackoff = (attempt: number): number => { + const backoffMs = 5_000 * Math.pow(2, attempt - 1); + return Math.min(backoffMs, 60_000); }; -class HTTPError extends Error { +export class HTTPError extends Error { status: number; constructor(status: number, message: string) { super(`${status} ${message}`); diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index e40cc24..6b6a6e7 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -6,3 +6,4 @@ export * from './hash'; export * from './random'; export * from './encryption'; export * from './sleep'; +export * from './fetchRetry'; diff --git a/packages/core/src/sleep.ts b/packages/core/src/sleep.ts index d12c8b3..a78fb22 100644 --- a/packages/core/src/sleep.ts +++ b/packages/core/src/sleep.ts @@ -1,2 +1,31 @@ export const sleep = (ms: number, { enabled = true }: { enabled?: boolean } = {}): Promise => enabled ? new Promise((resolve) => setTimeout(resolve, ms)) : Promise.resolve(); + +export type CancellableSleep = { + promise: Promise; + cancel: () => void; +}; + +export const sleepWithCancel = ( + ms: number, + { enabled = true }: { enabled?: boolean } = {}, +): CancellableSleep => { + if (!enabled) { + return { + promise: Promise.resolve(), + cancel: () => {}, // No-op for disabled sleep + }; + } + + let timeoutId: ReturnType; + const promise = new Promise((resolve) => { + timeoutId = setTimeout(resolve, ms); + }); + + return { + promise, + cancel: () => { + clearTimeout(timeoutId); + }, + }; +}; diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 8611d5a..de4b6d8 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -7,7 +7,7 @@ import Redis from 'ioredis'; import { createRedisVault } from './vault/redis'; import { createTokenGenerator } from './vault/tokens'; import { createBullMQWebhookSender, createHTTPWebhookSender, WebhookSender } from './webhook'; -import { createFetchRetryClient } from './fetchRetry'; +import { createFetchRetryClient } from '@crypt.fyi/core'; const main = async () => { const logger = await initLogging(config); @@ -21,7 +21,6 @@ const main = async () => { let webhookSender: WebhookSender; const webhookSenderFetch = createFetchRetryClient({ maxAttempts: config.webhookMaxAttempts, - backoffDelayMs: config.webhookBackoffDelayMs, }); if (config.webhookSender === 'bullmq') { const bullmqRedis = new Redis(config.redisUrl, { maxRetriesPerRequest: null, family: 0 }); diff --git a/packages/server/src/webhook.ts b/packages/server/src/webhook.ts index 8e290af..0fb6411 100644 --- a/packages/server/src/webhook.ts +++ b/packages/server/src/webhook.ts @@ -1,9 +1,8 @@ import { Queue, Worker } from 'bullmq'; -import { gcm } from '@crypt.fyi/core'; +import { gcm, isRetryableFetchError } from '@crypt.fyi/core'; import { Logger } from './logging'; import Redis from 'ioredis'; import { z } from 'zod'; -import { isRetryableFetchError } from './fetchRetry'; const webhookEventSchema = z.enum(['READ', 'BURN', 'FAILURE_KEY_PASSWORD', 'FAILURE_IP_ADDRESS']);