From 83195c19e4295a3ddc7dc39e460107784826396c Mon Sep 17 00:00:00 2001 From: dillonstreator Date: Thu, 21 Dec 2023 18:58:48 -0600 Subject: [PATCH] fix pg events table name injection --- package.json | 2 +- src/pg/client.test.ts | 15 +++--- src/pg/client.ts | 111 +++++++++++++++++++++--------------------- 3 files changed, 63 insertions(+), 65 deletions(-) diff --git a/package.json b/package.json index 4220e13..cec3bd5 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "type": "git", "url": "git://github.com/dillonstreator/txob.git" }, - "version": "0.0.12", + "version": "0.0.13", "license": "MIT", "files": [ "dist", diff --git a/src/pg/client.test.ts b/src/pg/client.test.ts index 0e29cfe..14c5cab 100644 --- a/src/pg/client.test.ts +++ b/src/pg/client.test.ts @@ -29,8 +29,8 @@ describe("getUnprocessedEvents", () => { const result = await client.getUnprocessedEvents(opts); expect(pgClient.query).toHaveBeenCalledOnce(); expect(pgClient.query).toHaveBeenCalledWith( - "SELECT id, errors FROM $1 WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2", - ["events", opts.maxErrors], + "SELECT id, errors FROM events WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1", + [opts.maxErrors], ); expect(result).toBe(rows); }); @@ -82,8 +82,8 @@ describe("transaction", () => { expect(pgClient.query).toHaveBeenCalledTimes(3); expect(pgClient.query).toHaveBeenCalledWith( - "SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM $1 WHERE id = $2 FOR UPDATE SKIP LOCKED", - ["events", eventId], + "SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM events WHERE id = $1 FOR UPDATE SKIP LOCKED", + [eventId], ); expect(result).toBe(1); }); @@ -107,8 +107,8 @@ describe("transaction", () => { expect(pgClient.query).toHaveBeenCalledTimes(3); expect(pgClient.query).toHaveBeenCalledWith( - "SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM $1 WHERE id = $2 FOR UPDATE SKIP LOCKED", - ["events", eventId], + "SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM events WHERE id = $1 FOR UPDATE SKIP LOCKED", + [eventId], ); expect(result).toBeNull(); }); @@ -144,9 +144,8 @@ describe("transaction", () => { expect(pgClient.query).toHaveBeenCalledTimes(3); expect(pgClient.query).toHaveBeenCalledWith( - "UPDATE $1 SET handler_results = $2, errors = $3, processed_at = $4, backoff_until = $5 WHERE id = $6", + "UPDATE events SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5", [ - "events", event.handler_results, event.errors, event.processed_at, diff --git a/src/pg/client.ts b/src/pg/client.ts index 67f9663..4352f8b 100644 --- a/src/pg/client.ts +++ b/src/pg/client.ts @@ -1,56 +1,55 @@ -import { Client } from "pg"; -import { TxOBEvent, TxOBProcessorClient } from "../processor"; - -interface Querier { - query: Client["query"]; -} - -export const createProcessorClient = ( - querier: Querier, - table: string = "events", -): TxOBProcessorClient => ({ - getUnprocessedEvents: async (opts) => { - const events = await querier.query< - Pick, "id" | "errors"> - >( - "SELECT id, errors FROM $1 WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2", - [table, opts.maxErrors], - ); - return events.rows; - }, - transaction: async (fn) => { - try { - await querier.query("BEGIN"); - await fn({ - getEventByIdForUpdateSkipLocked: async (eventId) => { - const event = await querier.query>( - `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM $1 WHERE id = $2 FOR UPDATE SKIP LOCKED`, - [table, eventId], - ); - if (event.rowCount === 0) { - return null; - } - - return event.rows[0]; - }, - updateEvent: async (event) => { - await querier.query( - `UPDATE $1 SET handler_results = $2, errors = $3, processed_at = $4, backoff_until = $5 WHERE id = $6`, - [ - table, - event.handler_results, - event.errors, - event.processed_at, - event.backoff_until, - event.id, - ], - ); - }, - }); - await querier.query("COMMIT"); - } catch (error) { - await querier.query("ROLLBACK").catch(() => {}); - throw error; - } - }, -}); +import { Client } from "pg"; +import { TxOBEvent, TxOBProcessorClient } from "../processor"; + +interface Querier { + query: Client["query"]; +} + +export const createProcessorClient = ( + querier: Querier, + table: string = "events", +): TxOBProcessorClient => ({ + getUnprocessedEvents: async (opts) => { + const events = await querier.query< + Pick, "id" | "errors"> + >( + `SELECT id, errors FROM ${table} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1`, + [opts.maxErrors], + ); + return events.rows; + }, + transaction: async (fn) => { + try { + await querier.query("BEGIN"); + await fn({ + getEventByIdForUpdateSkipLocked: async (eventId) => { + const event = await querier.query>( + `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 FOR UPDATE SKIP LOCKED`, + [eventId], + ); + if (event.rowCount === 0) { + return null; + } + + return event.rows[0]; + }, + updateEvent: async (event) => { + await querier.query( + `UPDATE ${table} SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5`, + [ + event.handler_results, + event.errors, + event.processed_at, + event.backoff_until, + event.id, + ], + ); + }, + }); + await querier.query("COMMIT"); + } catch (error) { + await querier.query("ROLLBACK").catch(() => {}); + throw error; + } + }, +});