Skip to content

Commit

Permalink
fix pg events table name injection
Browse files Browse the repository at this point in the history
  • Loading branch information
dillonstreator committed Dec 22, 2023
1 parent 60cb4a7 commit 83195c1
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 65 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"type": "git",
"url": "git://github.com/dillonstreator/txob.git"
},
"version": "0.0.12",
"version": "0.0.13",
"license": "MIT",
"files": [
"dist",
Expand Down
15 changes: 7 additions & 8 deletions src/pg/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ describe("getUnprocessedEvents", () => {
const result = await client.getUnprocessedEvents(opts);
expect(pgClient.query).toHaveBeenCalledOnce();
expect(pgClient.query).toHaveBeenCalledWith(
"SELECT id, errors FROM $1 WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2",
["events", opts.maxErrors],
"SELECT id, errors FROM events WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1",
[opts.maxErrors],
);
expect(result).toBe(rows);
});
Expand Down Expand Up @@ -82,8 +82,8 @@ describe("transaction", () => {

expect(pgClient.query).toHaveBeenCalledTimes(3);
expect(pgClient.query).toHaveBeenCalledWith(
"SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM $1 WHERE id = $2 FOR UPDATE SKIP LOCKED",
["events", eventId],
"SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM events WHERE id = $1 FOR UPDATE SKIP LOCKED",
[eventId],
);
expect(result).toBe(1);
});
Expand All @@ -107,8 +107,8 @@ describe("transaction", () => {

expect(pgClient.query).toHaveBeenCalledTimes(3);
expect(pgClient.query).toHaveBeenCalledWith(
"SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM $1 WHERE id = $2 FOR UPDATE SKIP LOCKED",
["events", eventId],
"SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM events WHERE id = $1 FOR UPDATE SKIP LOCKED",
[eventId],
);
expect(result).toBeNull();
});
Expand Down Expand Up @@ -144,9 +144,8 @@ describe("transaction", () => {

expect(pgClient.query).toHaveBeenCalledTimes(3);
expect(pgClient.query).toHaveBeenCalledWith(
"UPDATE $1 SET handler_results = $2, errors = $3, processed_at = $4, backoff_until = $5 WHERE id = $6",
"UPDATE events SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5",
[
"events",
event.handler_results,
event.errors,
event.processed_at,
Expand Down
111 changes: 55 additions & 56 deletions src/pg/client.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,55 @@
import { Client } from "pg";
import { TxOBEvent, TxOBProcessorClient } from "../processor";

interface Querier {
query: Client["query"];
}

export const createProcessorClient = <EventType extends string>(
querier: Querier,
table: string = "events",
): TxOBProcessorClient<EventType> => ({
getUnprocessedEvents: async (opts) => {
const events = await querier.query<
Pick<TxOBEvent<EventType>, "id" | "errors">
>(
"SELECT id, errors FROM $1 WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2",
[table, opts.maxErrors],
);
return events.rows;
},
transaction: async (fn) => {
try {
await querier.query("BEGIN");
await fn({
getEventByIdForUpdateSkipLocked: async (eventId) => {
const event = await querier.query<TxOBEvent<EventType>>(
`SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM $1 WHERE id = $2 FOR UPDATE SKIP LOCKED`,
[table, eventId],
);
if (event.rowCount === 0) {
return null;
}

return event.rows[0];
},
updateEvent: async (event) => {
await querier.query(
`UPDATE $1 SET handler_results = $2, errors = $3, processed_at = $4, backoff_until = $5 WHERE id = $6`,
[
table,
event.handler_results,
event.errors,
event.processed_at,
event.backoff_until,
event.id,
],
);
},
});
await querier.query("COMMIT");
} catch (error) {
await querier.query("ROLLBACK").catch(() => {});
throw error;
}
},
});
import { Client } from "pg";
import { TxOBEvent, TxOBProcessorClient } from "../processor";

interface Querier {
query: Client["query"];
}

export const createProcessorClient = <EventType extends string>(
querier: Querier,
table: string = "events",
): TxOBProcessorClient<EventType> => ({
getUnprocessedEvents: async (opts) => {
const events = await querier.query<
Pick<TxOBEvent<EventType>, "id" | "errors">
>(
`SELECT id, errors FROM ${table} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1`,
[opts.maxErrors],
);
return events.rows;
},
transaction: async (fn) => {
try {
await querier.query("BEGIN");
await fn({
getEventByIdForUpdateSkipLocked: async (eventId) => {
const event = await querier.query<TxOBEvent<EventType>>(
`SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 FOR UPDATE SKIP LOCKED`,
[eventId],
);
if (event.rowCount === 0) {
return null;
}

return event.rows[0];
},
updateEvent: async (event) => {
await querier.query(
`UPDATE ${table} SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5`,
[
event.handler_results,
event.errors,
event.processed_at,
event.backoff_until,
event.id,
],
);
},
});
await querier.query("COMMIT");
} catch (error) {
await querier.query("ROLLBACK").catch(() => {});
throw error;
}
},
});

0 comments on commit 83195c1

Please sign in to comment.