Skip to content

Commit

Permalink
improve clarity of client methods and improve pg & mongodb clients pe…
Browse files Browse the repository at this point in the history
…rformance by skipping already processed or 'unready' events
  • Loading branch information
dillonstreator committed Dec 22, 2023
1 parent a99d56e commit 49c5922
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 92 deletions.
40 changes: 3 additions & 37 deletions examples/pg/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import http from "node:http";
import { randomUUID } from "node:crypto";
import { Client } from "pg";
import { EventProcessor } from "../../src/processor";
import { createProcessorClient } from "../../src/pg/client";
import dotenv from "dotenv";
import gracefulShutdown from "http-graceful-shutdown";
dotenv.config();
Expand All @@ -11,7 +9,7 @@ const eventTypes = {
ResourceSaved: "ResourceSaved",
} as const;

type EventType = keyof typeof eventTypes;
export type EventType = keyof typeof eventTypes;

const main = async () => {
const client = new Client({
Expand All @@ -22,34 +20,6 @@ const main = async () => {
await client.connect();
await migrate(client);

const processor = EventProcessor(
createProcessorClient<EventType>(client),
{
ResourceSaved: {
thing1: async (event) => {
console.log(`${event.id} thing1 ${event.correlation_id}`);
if (Math.random() > 0.9) throw new Error("some issue");

return;
},
thing2: async (event) => {
console.log(`${event.id} thing2 ${event.correlation_id}`);
if (Math.random() > 0.9) throw new Error("some issue");

return;
},
thing3: async (event) => {
console.log(`${event.id} thing3 ${event.correlation_id}`);
if (Math.random() > 0.75) throw new Error("some issue");

return;
},
},
},
{ sleepTimeMs: 5000, logger: console },
);
processor.start();

const server = http.createServer(async (req, res) => {
const correlationId = randomUUID();
try {
Expand Down Expand Up @@ -97,11 +67,7 @@ const main = async () => {
const port = process.env.PORT || 3000;
server.listen(port, () => console.log(`listening on ${port}`));

gracefulShutdown(server, {
onShutdown: async () => {
await processor.stop();
},
});
gracefulShutdown(server);
};

if (require.main === module) {
Expand All @@ -111,7 +77,7 @@ if (require.main === module) {
});
}

const migrate = async (client: Client): Promise<void> => {
export const migrate = async (client: Client): Promise<void> => {
await client.query(`CREATE TABLE IF NOT EXISTS events (
id UUID,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
Expand Down
2 changes: 1 addition & 1 deletion examples/pg/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"pg": "^8.11.3"
},
"devDependencies": {
"@types/node": "^20.10.4",
"@types/node": "^20.10.5",
"@types/pg": "^8.10.9",
"dotenv": "^16.3.1",
"nodemon": "^3.0.2",
Expand Down
67 changes: 67 additions & 0 deletions examples/pg/processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { Client } from "pg";
import { EventProcessor } from "../../src/processor";
import { createProcessorClient } from "../../src/pg/client";
import { migrate, type EventType } from "./index";
import dotenv from "dotenv";
dotenv.config();

let processor: ReturnType<typeof EventProcessor> | undefined = undefined;

(async () => {
const client = new Client({
user: process.env.POSTGRES_USER,
password: process.env.POSTGRES_PASSWORD,
database: process.env.POSTGRES_DB,
});
await client.connect();
await migrate(client);

processor = EventProcessor(
createProcessorClient<EventType>(client),
{
ResourceSaved: {
thing1: async (event) => {
console.log(`${event.id} thing1 ${event.correlation_id}`);
if (Math.random() > 0.9) throw new Error("some issue");

return;
},
thing2: async (event) => {
console.log(`${event.id} thing2 ${event.correlation_id}`);
if (Math.random() > 0.9) throw new Error("some issue");

return;
},
thing3: async (event) => {
console.log(`${event.id} thing3 ${event.correlation_id}`);
if (Math.random() > 0.75) throw new Error("some issue");

return;
},
},
},
{ sleepTimeMs: 5000, logger: console },
);
processor.start();
})();

const shutdown = (() => {
let shutdownStarted = false;
return () => {
if (shutdownStarted) return;

shutdownStarted = true;

processor
?.stop()
.then(() => {
process.exit(0);
})
.catch((err) => {
console.error(err);
process.exit(1);
});
};
})();
process.once("SIGTERM", shutdown);
process.once("SIGINT", shutdown);
9 changes: 8 additions & 1 deletion examples/pg/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,20 @@
resolved "https://registry.yarnpkg.com/@tsconfig/node16/-/node16-1.0.4.tgz#0b92dcc0cc1c81f6f306a381f28e31b1a56536e9"
integrity sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==

"@types/node@*", "@types/node@^20.10.4":
"@types/node@*":
version "20.10.4"
resolved "https://registry.yarnpkg.com/@types/node/-/node-20.10.4.tgz#b246fd84d55d5b1b71bf51f964bd514409347198"
integrity sha512-D08YG6rr8X90YB56tSIuBaddy/UXAA9RKJoFvrsnogAum/0pmjkgi4+2nx96A330FmioegBWmEYQ+syqCFaveg==
dependencies:
undici-types "~5.26.4"

"@types/node@^20.10.5":
version "20.10.5"
resolved "https://registry.yarnpkg.com/@types/node/-/node-20.10.5.tgz#47ad460b514096b7ed63a1dae26fad0914ed3ab2"
integrity sha512-nNPsNE65wjMxEKI93yOP+NPGGBJz/PoN3kZsVLee0XMiJolxSekEVD8wRwBUBqkwc7UWop0edW50yrCQW4CyRw==
dependencies:
undici-types "~5.26.4"

"@types/pg@^8.10.9":
version "8.10.9"
resolved "https://registry.yarnpkg.com/@types/pg/-/pg-8.10.9.tgz#d20bb948c6268c5bd847e2bf968f1194c5a2355a"
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"type": "git",
"url": "git://github.com/dillonstreator/txob.git"
},
"version": "0.0.15",
"version": "0.0.16",
"license": "MIT",
"files": [
"dist",
Expand Down
23 changes: 15 additions & 8 deletions src/mongodb/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ import { MongoClient, ObjectId } from "mongodb";
import { TxOBEvent, TxOBProcessorClient } from "../processor";
import { getDate } from "../date";

const createReadyToProcessFilter = (maxErrors: number) => ({
processed_at: null,
$or: [{ backoff_until: null }, { backoff_until: { $lt: getDate() } }],
errors: { $lt: maxErrors },
});

export const createProcessorClient = <EventType extends string>(
mongo: MongoClient,
db: string,
collection: string = "events",
): TxOBProcessorClient<EventType> => ({
getUnprocessedEvents: async (opts) => {
getReadyToProcessEvents: async (opts) => {
const events = (await mongo
.db(db)
.collection(collection)
.find({
processed_at: null,
$or: [{ backoff_until: null }, { backoff_until: { $lt: getDate() } }],
errors: { $lt: opts.maxErrors },
})
.find(createReadyToProcessFilter(opts.maxErrors))
.project({ id: 1, errors: 1 })
.toArray()) as Pick<TxOBEvent<EventType>, "id" | "errors">[];

Expand All @@ -24,14 +26,17 @@ export const createProcessorClient = <EventType extends string>(
transaction: async (fn) => {
await mongo.withSession(async (session): Promise<void> => {
await fn({
getEventByIdForUpdateSkipLocked: async (eventId) => {
getReadyToProcessEventByIdForUpdateSkipLocked: async (
eventId,
opts,
) => {
try {
// https://www.mongodb.com/blog/post/how-to-select--for-update-inside-mongodb-transactions
const event = (await mongo
.db(db)
.collection(collection)
.findOneAndUpdate(
{ id: eventId },
{ id: eventId, ...createReadyToProcessFilter(opts.maxErrors) },
{
$set: {
lock: new ObjectId(),
Expand All @@ -53,6 +58,8 @@ export const createProcessorClient = <EventType extends string>(
},
)) as unknown;

if (!event) return null;

return event as TxOBEvent<EventType>;
} catch (error) {
return null;
Expand Down
20 changes: 10 additions & 10 deletions src/pg/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ describe("createProcessorClient", () => {
query: vi.fn(),
};
const client = createProcessorClient(pgClient);
expect(typeof client.getUnprocessedEvents).toBe("function");
expect(typeof client.getReadyToProcessEvents).toBe("function");
expect(typeof client.transaction).toBe("function");
});
});

describe("getUnprocessedEvents", () => {
describe("getReadyToProcessEvents", () => {
it("should execute the correct query", async () => {
const rows = [1, 2, 3];
const pgClient = {
Expand All @@ -26,7 +26,7 @@ describe("getUnprocessedEvents", () => {
maxErrors: 10,
};
const client = createProcessorClient(pgClient);
const result = await client.getUnprocessedEvents(opts);
const result = await client.getReadyToProcessEvents(opts);
expect(pgClient.query).toHaveBeenCalledOnce();
expect(pgClient.query).toHaveBeenCalledWith(
"SELECT id, errors FROM events WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1",
Expand Down Expand Up @@ -62,7 +62,7 @@ describe("transaction", () => {
expect(pgClient.query).toHaveBeenNthCalledWith(2, "ROLLBACK");
});

describe("getEventByIdForUpdateSkipLocked", () => {
describe("getReadyToProcessEventByIdForUpdateSkipLocked", () => {
it("should execute the correct query", async () => {
const rows = [1, 2, 3];
const pgClient = {
Expand All @@ -77,13 +77,13 @@ describe("transaction", () => {
const client = createProcessorClient(pgClient);
let result: any;
await client.transaction(async (txClient) => {
result = await txClient.getEventByIdForUpdateSkipLocked(eventId, {});
result = await txClient.getReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 6 });
});

expect(pgClient.query).toHaveBeenCalledTimes(3);
expect(pgClient.query).toHaveBeenCalledWith(
"SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM events WHERE id = $1 FOR UPDATE SKIP LOCKED",
[eventId],
"SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM events WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED",
[eventId, 6],
);
expect(result).toBe(1);
});
Expand All @@ -102,13 +102,13 @@ describe("transaction", () => {
const client = createProcessorClient(pgClient);
let result: any;
await client.transaction(async (txClient) => {
result = await txClient.getEventByIdForUpdateSkipLocked(eventId, {});
result = await txClient.getReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 5 });
});

expect(pgClient.query).toHaveBeenCalledTimes(3);
expect(pgClient.query).toHaveBeenCalledWith(
"SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM events WHERE id = $1 FOR UPDATE SKIP LOCKED",
[eventId],
"SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM events WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED",
[eventId, 5],
);
expect(result).toBeNull();
});
Expand Down
10 changes: 5 additions & 5 deletions src/pg/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ interface Querier {
query: Client["query"];
}

// TODO: leverage the signal option that comes in on options for `getUnprocessedEvents` and `getEventByIdForUpdateSkipLocked`
// TODO: leverage the signal option that comes in on options for `getReadyToProcessEvents` and `getReadyToProcessEventByIdForUpdateSkipLocked`
// to cancel queries if/when supported by `pg` https://github.com/brianc/node-postgres/issues/2774

export const createProcessorClient = <EventType extends string>(
querier: Querier,
table: string = "events",
): TxOBProcessorClient<EventType> => ({
getUnprocessedEvents: async (opts) => {
getReadyToProcessEvents: async (opts) => {
const events = await querier.query<
Pick<TxOBEvent<EventType>, "id" | "errors">
>(
Expand All @@ -25,10 +25,10 @@ export const createProcessorClient = <EventType extends string>(
try {
await querier.query("BEGIN");
await fn({
getEventByIdForUpdateSkipLocked: async (eventId) => {
getReadyToProcessEventByIdForUpdateSkipLocked: async (eventId, opts) => {
const event = await querier.query<TxOBEvent<EventType>>(
`SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 FOR UPDATE SKIP LOCKED`,
[eventId],
`SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED`,
[eventId, opts.maxErrors],
);
if (event.rowCount === 0) {
return null;
Expand Down
Loading

0 comments on commit 49c5922

Please sign in to comment.