Skip to content

Commit

Permalink
change package type name prefixes and fix processEvents options
Browse files Browse the repository at this point in the history
  • Loading branch information
dillonstreator committed Dec 17, 2023
1 parent 1d7b203 commit 9539903
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 43 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<img src="https://codecov.io/gh/dillonstreator/txob/graph/badge.svg?token=E9M7G67VLL"/>
</a>
<a aria-label="NPM version" href="https://www.npmjs.com/package/txob">
<img alt="" src="https://badgen.net/npm/v/txob?t=1702842290">
<img alt="" src="https://badgen.net/npm/v/txob?t=1702843220">
</a>
<a aria-label="License" href="https://github.com/dillonstreator/txob/blob/main/LICENSE">
<img alt="" src="https://badgen.net/npm/license/txob">
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"type": "git",
"url": "git://github.com/dillonstreator/txob.git"
},
"version": "0.0.5",
"version": "0.0.6",
"license": "MIT",
"files": [
"dist",
Expand Down
8 changes: 4 additions & 4 deletions src/pg/client.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import { Client } from 'pg';
import { OutboxEvent, OutboxProcessorClient } from '../processor';
import { TxOBEvent, TxOBProcessorClient } from '../processor';

export const createEventProcessorClient = <EventType extends string>(
pgClient: Client
): OutboxProcessorClient<EventType> => ({
): TxOBProcessorClient<EventType> => ({
getUnprocessedEvents: async (opts) => {
const events = await pgClient.query<
Pick<OutboxEvent<EventType>, 'id' | 'errors'>
Pick<TxOBEvent<EventType>, 'id' | 'errors'>
>(
'SELECT id, errors FROM events WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1',
[opts.maxErrors]
);
return events.rows;
},
getEventByIdForUpdateSkipLocked: async (eventId) => {
const event = await pgClient.query<OutboxEvent<EventType>>(
const event = await pgClient.query<TxOBEvent<EventType>>(
`SELECT * FROM events WHERE id = $1 FOR UPDATE SKIP LOCKED`,
[eventId]
);
Expand Down
77 changes: 40 additions & 37 deletions src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,52 @@ import retry from 'retry';
import { retryable } from './retry';
import { getDate } from './date';

type OutboxEventHandlerResult = {
type TxOBEventHandlerResult = {
processed_at?: Date;
errors?: { error: any; timestamp: Date }[];
};

export type OutboxEvent<OutboxEventType extends string> = {
export type TxOBEvent<TxOBEventType extends string> = {
id: string;
timestamp: Date;
type: OutboxEventType;
type: TxOBEventType;
data: Record<string, any>;
correlation_id: string;
handler_results: Record<string, OutboxEventHandlerResult>;
handler_results: Record<string, TxOBEventHandlerResult>;
errors: number;
backoff_until?: Date | null;
processed_at?: Date;
};

type OutboxEventHandlerOpts = {
type TxOBEventHandlerOpts = {
signal?: AbortSignal;
};

export type OutboxEventHandler = <OutboxEventType extends string>(
event: OutboxEvent<OutboxEventType>,
opts: OutboxEventHandlerOpts
export type TxOBEventHandler = <TxOBEventType extends string>(
event: TxOBEvent<TxOBEventType>,
opts: TxOBEventHandlerOpts
) => Promise<void>;

export type OutboxEventHandlerMap<OutboxEventType extends string> = Record<
OutboxEventType,
export type TxOBEventHandlerMap<TxOBEventType extends string> = Record<
TxOBEventType,
{
[key: string]: OutboxEventHandler;
[key: string]: TxOBEventHandler;
}
>;

type OutboxProcessorClientOpts = {
type TxOBProcessorClientOpts = {
signal?: AbortSignal;
};

export interface OutboxProcessorClient<OutboxEventType extends string> {
export interface TxOBProcessorClient<TxOBEventType extends string> {
getUnprocessedEvents(
opts: OutboxProcessorClientOpts & { maxErrors: number }
): Promise<Pick<OutboxEvent<OutboxEventType>, 'id' | 'errors'>[]>;
opts: TxOBProcessorClientOpts & { maxErrors: number }
): Promise<Pick<TxOBEvent<TxOBEventType>, 'id' | 'errors'>[]>;
getEventByIdForUpdateSkipLocked(
eventId: OutboxEvent<OutboxEventType>['id'],
opts: OutboxProcessorClientOpts
): Promise<OutboxEvent<OutboxEventType> | null>;
updateEvent(event: OutboxEvent<OutboxEventType>): Promise<void>;
eventId: TxOBEvent<TxOBEventType>['id'],
opts: TxOBProcessorClientOpts
): Promise<TxOBEvent<TxOBEventType> | null>;
updateEvent(event: TxOBEvent<TxOBEventType>): Promise<void>;
}

const defaultBackoff = (errorCount: number): Date => {
Expand All @@ -60,38 +60,41 @@ const defaultBackoff = (errorCount: number): Date => {
};
const defaultMaxErrors = 5;

type OutboxProcessorOpts = {
signal?: AbortSignal;
type TxOBProcessEventsOpts = {
maxErrors: number;
backoff: (count: number) => Date;
retry?: retry.OperationOptions;
signal?: AbortSignal;
};

export const processEvents = async <OutboxEventType extends string>(
client: OutboxProcessorClient<OutboxEventType>,
handlerMap: OutboxEventHandlerMap<OutboxEventType>,
opts: OutboxProcessorOpts = {
export const processEvents = async <TxOBEventType extends string>(
client: TxOBProcessorClient<TxOBEventType>,
handlerMap: TxOBEventHandlerMap<TxOBEventType>,
opts?: Partial<TxOBProcessEventsOpts>
): Promise<void> => {
const _opts: TxOBProcessEventsOpts = {
maxErrors: defaultMaxErrors,
backoff: defaultBackoff,
}
): Promise<void> => {
const events = await client.getUnprocessedEvents(opts);
...opts,
};

const events = await client.getUnprocessedEvents(_opts);
if (events.length === 0) {
return;
}

for (const unlockedEvent of events) {
if (opts.signal?.aborted) {
if (_opts.signal?.aborted) {
return;
}
if (unlockedEvent.errors >= opts.maxErrors) {
if (unlockedEvent.errors >= _opts.maxErrors) {
// TODO: log potential issue with client configuration on finding unprocessed events
continue;
}

const lockedEvent = await client.getEventByIdForUpdateSkipLocked(
unlockedEvent.id,
opts
_opts
);
if (!lockedEvent) {
continue;
Expand All @@ -102,7 +105,7 @@ export const processEvents = async <OutboxEventType extends string>(
let eventHandlerMap = handlerMap[lockedEvent.type];
if (!eventHandlerMap) {
errored = true;
lockedEvent.errors = opts.maxErrors;
lockedEvent.errors = _opts.maxErrors;
eventHandlerMap = {};
}

Expand All @@ -118,7 +121,7 @@ export const processEvents = async <OutboxEventType extends string>(
handlerResults.errors ??= [];

try {
await handler(lockedEvent, { signal: opts.signal });
await handler(lockedEvent, { signal: _opts.signal });
handlerResults.processed_at = getDate();
} catch (error) {
errored = true;
Expand All @@ -136,10 +139,10 @@ export const processEvents = async <OutboxEventType extends string>(
if (errored) {
lockedEvent.errors = Math.min(
lockedEvent.errors + 1,
opts.maxErrors
_opts.maxErrors
);
lockedEvent.backoff_until = opts.backoff(lockedEvent.errors);
if (lockedEvent.errors === opts.maxErrors) {
lockedEvent.backoff_until = _opts.backoff(lockedEvent.errors);
if (lockedEvent.errors === _opts.maxErrors) {
lockedEvent.backoff_until = null;
}
} else {
Expand All @@ -156,7 +159,7 @@ export const processEvents = async <OutboxEventType extends string>(
minTimeout: 250,
maxTimeout: 2500,
randomize: true,
...(opts.retry ?? {}),
...(_opts.retry ?? {}),
});
}
};

0 comments on commit 9539903

Please sign in to comment.