-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.ts
92 lines (87 loc) · 2.63 KB
/
client.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import { MongoClient, ObjectId } from "mongodb";
import { TxOBEvent, TxOBProcessorClient } from "../processor";
import { getDate } from "../date";
const createReadyToProcessFilter = (maxErrors: number) => ({
processed_at: null,
$or: [{ backoff_until: null }, { backoff_until: { $lt: getDate() } }],
errors: { $lt: maxErrors },
});
export const createProcessorClient = <EventType extends string>(
mongo: MongoClient,
db: string,
collection: string = "events",
): TxOBProcessorClient<EventType> => ({
getEventsToProcess: async (opts) => {
const events = (await mongo
.db(db)
.collection(collection)
.find(createReadyToProcessFilter(opts.maxErrors))
.project({ id: 1, errors: 1 })
.toArray()) as Pick<TxOBEvent<EventType>, "id" | "errors">[];
return events;
},
transaction: async (fn) => {
await mongo.withSession(async (session): Promise<void> => {
await fn({
getEventByIdForUpdateSkipLocked: async (
eventId,
opts,
) => {
try {
// https://www.mongodb.com/blog/post/how-to-select--for-update-inside-mongodb-transactions
const event = (await mongo
.db(db)
.collection(collection)
.findOneAndUpdate(
{ id: eventId, ...createReadyToProcessFilter(opts.maxErrors) },
{
$set: {
lock: new ObjectId(),
},
},
{
session,
projection: {
id: 1,
timestamp: 1,
type: 1,
data: 1,
correlation_id: 1,
handler_results: 1,
errors: 1,
backoff_until: 1,
processed_at: 1,
},
},
)) as unknown;
if (!event) return null;
return event as TxOBEvent<EventType>;
} catch (error) {
return null;
}
},
updateEvent: async (event) => {
await mongo
.db(db)
.collection(collection)
.updateOne(
{
id: event.id,
},
{
$set: {
handler_results: event.handler_results,
errors: event.errors,
processed_at: event.processed_at,
backoff_until: event.backoff_until,
},
},
{
session,
},
);
},
});
});
},
});