diff --git a/examples/pg/index.ts b/examples/pg/index.ts index 9a47cb7..6ea2d72 100644 --- a/examples/pg/index.ts +++ b/examples/pg/index.ts @@ -1,8 +1,6 @@ import http from "node:http"; import { randomUUID } from "node:crypto"; import { Client } from "pg"; -import { EventProcessor } from "../../src/processor"; -import { createProcessorClient } from "../../src/pg/client"; import dotenv from "dotenv"; import gracefulShutdown from "http-graceful-shutdown"; dotenv.config(); @@ -11,7 +9,7 @@ const eventTypes = { ResourceSaved: "ResourceSaved", } as const; -type EventType = keyof typeof eventTypes; +export type EventType = keyof typeof eventTypes; const main = async () => { const client = new Client({ @@ -22,34 +20,6 @@ const main = async () => { await client.connect(); await migrate(client); - const processor = EventProcessor( - createProcessorClient(client), - { - ResourceSaved: { - thing1: async (event) => { - console.log(`${event.id} thing1 ${event.correlation_id}`); - if (Math.random() > 0.9) throw new Error("some issue"); - - return; - }, - thing2: async (event) => { - console.log(`${event.id} thing2 ${event.correlation_id}`); - if (Math.random() > 0.9) throw new Error("some issue"); - - return; - }, - thing3: async (event) => { - console.log(`${event.id} thing3 ${event.correlation_id}`); - if (Math.random() > 0.75) throw new Error("some issue"); - - return; - }, - }, - }, - { sleepTimeMs: 5000, logger: console }, - ); - processor.start(); - const server = http.createServer(async (req, res) => { const correlationId = randomUUID(); try { @@ -97,11 +67,7 @@ const main = async () => { const port = process.env.PORT || 3000; server.listen(port, () => console.log(`listening on ${port}`)); - gracefulShutdown(server, { - onShutdown: async () => { - await processor.stop(); - }, - }); + gracefulShutdown(server); }; if (require.main === module) { @@ -111,7 +77,7 @@ if (require.main === module) { }); } -const migrate = async (client: Client): Promise<void> => { +export const 
migrate = async (client: Client): Promise<void> => { await client.query(`CREATE TABLE IF NOT EXISTS events ( id UUID, timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, diff --git a/examples/pg/package.json b/examples/pg/package.json index 5af636c..350658e 100644 --- a/examples/pg/package.json +++ b/examples/pg/package.json @@ -11,7 +11,7 @@ "pg": "^8.11.3" }, "devDependencies": { - "@types/node": "^20.10.4", + "@types/node": "^20.10.5", "@types/pg": "^8.10.9", "dotenv": "^16.3.1", "nodemon": "^3.0.2", diff --git a/examples/pg/processor.ts b/examples/pg/processor.ts new file mode 100644 index 0000000..bb4ec4f --- /dev/null +++ b/examples/pg/processor.ts @@ -0,0 +1,67 @@ +import { Client } from "pg"; +import { EventProcessor } from "../../src/processor"; +import { createProcessorClient } from "../../src/pg/client"; +import { migrate, type EventType } from "./index"; +import dotenv from "dotenv"; +dotenv.config(); + +let processor: ReturnType<typeof EventProcessor> | undefined = undefined; + +(async () => { + const client = new Client({ + user: process.env.POSTGRES_USER, + password: process.env.POSTGRES_PASSWORD, + database: process.env.POSTGRES_DB, + }); + await client.connect(); + await migrate(client); + + processor = EventProcessor( + createProcessorClient(client), + { + ResourceSaved: { + thing1: async (event) => { + console.log(`${event.id} thing1 ${event.correlation_id}`); + if (Math.random() > 0.9) throw new Error("some issue"); + + return; + }, + thing2: async (event) => { + console.log(`${event.id} thing2 ${event.correlation_id}`); + if (Math.random() > 0.9) throw new Error("some issue"); + + return; + }, + thing3: async (event) => { + console.log(`${event.id} thing3 ${event.correlation_id}`); + if (Math.random() > 0.75) throw new Error("some issue"); + + return; + }, + }, + }, + { sleepTimeMs: 5000, logger: console }, + ); + processor.start(); +})(); + +const shutdown = (() => { + let shutdownStarted = false; + return () => { + if (shutdownStarted) return; + + shutdownStarted = true; + 
+ processor + ?.stop() + .then(() => { + process.exit(0); + }) + .catch((err) => { + console.error(err); + process.exit(1); + }); + }; +})(); +process.once("SIGTERM", shutdown); +process.once("SIGINT", shutdown); diff --git a/examples/pg/yarn.lock b/examples/pg/yarn.lock index 94f1b6f..4da29f6 100644 --- a/examples/pg/yarn.lock +++ b/examples/pg/yarn.lock @@ -47,13 +47,20 @@ resolved "https://registry.yarnpkg.com/@tsconfig/node16/-/node16-1.0.4.tgz#0b92dcc0cc1c81f6f306a381f28e31b1a56536e9" integrity sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA== -"@types/node@*", "@types/node@^20.10.4": +"@types/node@*": version "20.10.4" resolved "https://registry.yarnpkg.com/@types/node/-/node-20.10.4.tgz#b246fd84d55d5b1b71bf51f964bd514409347198" integrity sha512-D08YG6rr8X90YB56tSIuBaddy/UXAA9RKJoFvrsnogAum/0pmjkgi4+2nx96A330FmioegBWmEYQ+syqCFaveg== dependencies: undici-types "~5.26.4" +"@types/node@^20.10.5": + version "20.10.5" + resolved "https://registry.yarnpkg.com/@types/node/-/node-20.10.5.tgz#47ad460b514096b7ed63a1dae26fad0914ed3ab2" + integrity sha512-nNPsNE65wjMxEKI93yOP+NPGGBJz/PoN3kZsVLee0XMiJolxSekEVD8wRwBUBqkwc7UWop0edW50yrCQW4CyRw== + dependencies: + undici-types "~5.26.4" + "@types/pg@^8.10.9": version "8.10.9" resolved "https://registry.yarnpkg.com/@types/pg/-/pg-8.10.9.tgz#d20bb948c6268c5bd847e2bf968f1194c5a2355a" diff --git a/package.json b/package.json index 25d6f63..0c75471 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "type": "git", "url": "git://github.com/dillonstreator/txob.git" }, - "version": "0.0.15", + "version": "0.0.16", "license": "MIT", "files": [ "dist", diff --git a/src/mongodb/client.ts b/src/mongodb/client.ts index 51ea462..7e00e93 100644 --- a/src/mongodb/client.ts +++ b/src/mongodb/client.ts @@ -2,20 +2,22 @@ import { MongoClient, ObjectId } from "mongodb"; import { TxOBEvent, TxOBProcessorClient } from "../processor"; import { getDate } from "../date"; +const 
createReadyToProcessFilter = (maxErrors: number) => ({ + processed_at: null, + $or: [{ backoff_until: null }, { backoff_until: { $lt: getDate() } }], + errors: { $lt: maxErrors }, +}); + export const createProcessorClient = ( mongo: MongoClient, db: string, collection: string = "events", ): TxOBProcessorClient => ({ - getUnprocessedEvents: async (opts) => { + getReadyToProcessEvents: async (opts) => { const events = (await mongo .db(db) .collection(collection) - .find({ - processed_at: null, - $or: [{ backoff_until: null }, { backoff_until: { $lt: getDate() } }], - errors: { $lt: opts.maxErrors }, - }) + .find(createReadyToProcessFilter(opts.maxErrors)) .project({ id: 1, errors: 1 }) .toArray()) as Pick, "id" | "errors">[]; @@ -24,14 +26,17 @@ export const createProcessorClient = ( transaction: async (fn) => { await mongo.withSession(async (session): Promise => { await fn({ - getEventByIdForUpdateSkipLocked: async (eventId) => { + getReadyToProcessEventByIdForUpdateSkipLocked: async ( + eventId, + opts, + ) => { try { // https://www.mongodb.com/blog/post/how-to-select--for-update-inside-mongodb-transactions const event = (await mongo .db(db) .collection(collection) .findOneAndUpdate( - { id: eventId }, + { id: eventId, ...createReadyToProcessFilter(opts.maxErrors) }, { $set: { lock: new ObjectId(), @@ -53,6 +58,8 @@ export const createProcessorClient = ( }, )) as unknown; + if (!event) return null; + return event as TxOBEvent; } catch (error) { return null; diff --git a/src/pg/client.test.ts b/src/pg/client.test.ts index 14c5cab..e4c4d0f 100644 --- a/src/pg/client.test.ts +++ b/src/pg/client.test.ts @@ -7,12 +7,12 @@ describe("createProcessorClient", () => { query: vi.fn(), }; const client = createProcessorClient(pgClient); - expect(typeof client.getUnprocessedEvents).toBe("function"); + expect(typeof client.getReadyToProcessEvents).toBe("function"); expect(typeof client.transaction).toBe("function"); }); }); -describe("getUnprocessedEvents", () => { 
+describe("getReadyToProcessEvents", () => { it("should execute the correct query", async () => { const rows = [1, 2, 3]; const pgClient = { @@ -26,7 +26,7 @@ describe("getUnprocessedEvents", () => { maxErrors: 10, }; const client = createProcessorClient(pgClient); - const result = await client.getUnprocessedEvents(opts); + const result = await client.getReadyToProcessEvents(opts); expect(pgClient.query).toHaveBeenCalledOnce(); expect(pgClient.query).toHaveBeenCalledWith( "SELECT id, errors FROM events WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1", @@ -62,7 +62,7 @@ describe("transaction", () => { expect(pgClient.query).toHaveBeenNthCalledWith(2, "ROLLBACK"); }); - describe("getEventByIdForUpdateSkipLocked", () => { + describe("getReadyToProcessEventByIdForUpdateSkipLocked", () => { it("should execute the correct query", async () => { const rows = [1, 2, 3]; const pgClient = { @@ -77,13 +77,13 @@ describe("transaction", () => { const client = createProcessorClient(pgClient); let result: any; await client.transaction(async (txClient) => { - result = await txClient.getEventByIdForUpdateSkipLocked(eventId, {}); + result = await txClient.getReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 6 }); }); expect(pgClient.query).toHaveBeenCalledTimes(3); expect(pgClient.query).toHaveBeenCalledWith( - "SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM events WHERE id = $1 FOR UPDATE SKIP LOCKED", - [eventId], + "SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM events WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED", + [eventId, 6], ); expect(result).toBe(1); }); @@ -102,13 +102,13 @@ describe("transaction", () => { const client = createProcessorClient(pgClient); let result: any; await client.transaction(async 
(txClient) => { - result = await txClient.getEventByIdForUpdateSkipLocked(eventId, {}); + result = await txClient.getReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 5 }); }); expect(pgClient.query).toHaveBeenCalledTimes(3); expect(pgClient.query).toHaveBeenCalledWith( - "SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM events WHERE id = $1 FOR UPDATE SKIP LOCKED", - [eventId], + "SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM events WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED", + [eventId, 5], ); expect(result).toBeNull(); }); diff --git a/src/pg/client.ts b/src/pg/client.ts index ddd9411..2b79664 100644 --- a/src/pg/client.ts +++ b/src/pg/client.ts @@ -5,14 +5,14 @@ interface Querier { query: Client["query"]; } -// TODO: leverage the signal option that comes in on options for `getUnprocessedEvents` and `getEventByIdForUpdateSkipLocked` +// TODO: leverage the signal option that comes in on options for `getReadyToProcessEvents` and `getReadyToProcessEventByIdForUpdateSkipLocked` // to cancel queries if/when supported by `pg` https://github.com/brianc/node-postgres/issues/2774 export const createProcessorClient = ( querier: Querier, table: string = "events", ): TxOBProcessorClient => ({ - getUnprocessedEvents: async (opts) => { + getReadyToProcessEvents: async (opts) => { const events = await querier.query< Pick, "id" | "errors"> >( @@ -25,10 +25,10 @@ export const createProcessorClient = ( try { await querier.query("BEGIN"); await fn({ - getEventByIdForUpdateSkipLocked: async (eventId) => { + getReadyToProcessEventByIdForUpdateSkipLocked: async (eventId, opts) => { const event = await querier.query>( - `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 FOR UPDATE 
SKIP LOCKED`, - [eventId], + `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED`, + [eventId, opts.maxErrors], ); if (event.rowCount === 0) { return null; diff --git a/src/processor.test.ts b/src/processor.test.ts index 36f4ecc..437a258 100644 --- a/src/processor.test.ts +++ b/src/processor.test.ts @@ -9,11 +9,11 @@ import { import { sleep } from "./sleep"; const mockTxClient = { - getEventByIdForUpdateSkipLocked: vi.fn(), + getReadyToProcessEventByIdForUpdateSkipLocked: vi.fn(), updateEvent: vi.fn(), }; const mockClient = { - getUnprocessedEvents: vi.fn(), + getReadyToProcessEvents: vi.fn(), transaction: vi.fn(async (fn) => fn(mockTxClient)), }; @@ -37,12 +37,12 @@ describe("processEvents", () => { backoff: () => now, }; const handlerMap = {}; - mockClient.getUnprocessedEvents.mockImplementation(() => []); + mockClient.getReadyToProcessEvents.mockImplementation(() => []); processEvents(mockClient, handlerMap, opts); - expect(mockClient.getUnprocessedEvents).toHaveBeenCalledOnce(); - expect(mockClient.getUnprocessedEvents).toHaveBeenCalledWith(opts); + expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledOnce(); + expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledWith(opts); expect(mockClient.transaction).not.toHaveBeenCalled(); - expect(mockTxClient.getEventByIdForUpdateSkipLocked).not.toHaveBeenCalled(); + expect(mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled(); expect(mockTxClient.updateEvent).not.toHaveBeenCalled(); }); @@ -113,8 +113,8 @@ describe("processEvents", () => { processed_at: now, }; const events = [evt1, evt2, evt3, evt4]; - mockClient.getUnprocessedEvents.mockImplementation(() => events); - mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => { + 
mockClient.getReadyToProcessEvents.mockImplementation(() => events); + mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => { if (id === evt3.id) return null; return events.find((e) => e.id === id); @@ -128,8 +128,8 @@ describe("processEvents", () => { await processEvents(mockClient, handlerMap, opts); - expect(mockClient.getUnprocessedEvents).toHaveBeenCalledOnce(); - expect(mockClient.getUnprocessedEvents).toHaveBeenCalledWith(opts); + expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledOnce(); + expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledWith(opts); expect(mockClient.transaction).toHaveBeenCalledTimes(3); @@ -143,7 +143,7 @@ describe("processEvents", () => { }); expect(handlerMap.evtType1.handler3).not.toHaveBeenCalled(); - expect(mockTxClient.getEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( + expect(mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( 3, ); @@ -213,8 +213,8 @@ describe("processEvents", () => { errors: 1, }; const events = [evt1]; - mockClient.getUnprocessedEvents.mockImplementation(() => events); - mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => { + mockClient.getReadyToProcessEvents.mockImplementation(() => events); + mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => { return events.find((e) => e.id === id); }); mockTxClient.updateEvent.mockImplementation(() => { @@ -223,8 +223,8 @@ describe("processEvents", () => { await processEvents(mockClient, handlerMap, opts); - expect(mockClient.getUnprocessedEvents).toHaveBeenCalledOnce(); - expect(mockClient.getUnprocessedEvents).toHaveBeenCalledWith(opts); + expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledOnce(); + expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledWith(opts); expect(mockClient.transaction).toHaveBeenCalledTimes(1); @@ -232,7 +232,7 @@ describe("processEvents", () => { 
expect(handlerMap.evtType1.handler1).toHaveBeenCalledWith(evt1, { signal: undefined, }); - expect(mockTxClient.getEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( + expect(mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( 1, ); @@ -320,15 +320,15 @@ describe("EventProcessor", () => { backoff: () => now, }; const handlerMap = {}; - mockClient.getUnprocessedEvents.mockImplementation(() => []); + mockClient.getReadyToProcessEvents.mockImplementation(() => []); const processor = EventProcessor(mockClient, handlerMap, opts); processor.start(); await processor.stop(); - expect(mockClient.getUnprocessedEvents).toHaveBeenCalledOnce(); + expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledOnce(); expect(mockClient.transaction).not.toHaveBeenCalled(); - expect(mockTxClient.getEventByIdForUpdateSkipLocked).not.toHaveBeenCalled(); + expect(mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled(); expect(mockTxClient.updateEvent).not.toHaveBeenCalled(); }); }); diff --git a/src/processor.ts b/src/processor.ts index 20840aa..e23349a 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -38,11 +38,12 @@ export type TxOBEventHandlerMap = Record< type TxOBProcessorClientOpts = { signal?: AbortSignal; + maxErrors: number }; export interface TxOBProcessorClient { - getUnprocessedEvents( - opts: TxOBProcessorClientOpts & { maxErrors: number }, + getReadyToProcessEvents( + opts: TxOBProcessorClientOpts, ): Promise, "id" | "errors">[]>; transaction( fn: ( @@ -52,7 +53,7 @@ export interface TxOBProcessorClient { } export interface TxOBTransactionProcessorClient { - getEventByIdForUpdateSkipLocked( + getReadyToProcessEventByIdForUpdateSkipLocked( eventId: TxOBEvent["id"], opts: TxOBProcessorClientOpts, ): Promise | null>; @@ -88,7 +89,7 @@ export const processEvents = async ( ...opts, }; - const events = await client.getUnprocessedEvents(_opts); + const events = await client.getReadyToProcessEvents(_opts); 
_opts.logger?.debug(`found ${events.length} events to process`); // TODO: consider concurrently processing events with max concurrency configuration @@ -98,9 +99,9 @@ export const processEvents = async ( } if (unlockedEvent.errors >= _opts.maxErrors) { // Potential issue with client configuration on finding unprocessed events - // Events with maximum allowed errors should not be returned from `getUnprocessedEvents` + // Events with maximum allowed errors should not be returned from `getReadyToProcessEvents` _opts.logger?.warn( - "unexpected event with max errors returned from `getUnprocessedEvents`", + "unexpected event with max errors returned from `getReadyToProcessEvents`", { eventId: unlockedEvent.id, errors: unlockedEvent.errors, @@ -112,19 +113,20 @@ export const processEvents = async ( try { await client.transaction(async (txClient) => { - const lockedEvent = await txClient.getEventByIdForUpdateSkipLocked( + const lockedEvent = await txClient.getReadyToProcessEventByIdForUpdateSkipLocked( unlockedEvent.id, - { signal: _opts.signal }, + { signal: _opts.signal, maxErrors: _opts.maxErrors }, ); if (!lockedEvent) { - _opts.logger?.debug("skipping locked event", { + _opts.logger?.debug("skipping locked or already processed event", { eventId: unlockedEvent.id, }); return; } if (lockedEvent.processed_at) { // While unlikely, this is possible if a concurrent processor finished processing this event between the time - // that this processor found the event with `getUnprocessedEvents` and called `getEventByIdForUpdateSkipLocked` + // that this processor found the event with `getReadyToProcessEvents` and called `getReadyToProcessEventByIdForUpdateSkipLocked` + // `getReadyToProcessEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources _opts.logger?.debug("skipping already processed event", { eventId: lockedEvent.id, });