-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathAsyncProcessor.cls
290 lines (250 loc) · 8.72 KB
/
AsyncProcessor.cls
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
public abstract without sharing class AsyncProcessor implements Database.AllowsCallouts, Database.Batchable<SObject>, Database.RaisesPlatformEvents, Process {
// Set to true by getProcess(); validate() throws if kickoff() runs while this is still false
private Boolean getWasCalled = false;
// Set by getCanEnqueue() when it approves an enqueue (true only in an async context);
// reset to false at the start of both the batch and the queueable execute() methods
private Boolean hasBeenEnqueuedAsync = false;
// SOQL query passed to get(String); null when records were passed to get(List<SObject>)
private String query;
// Queueable wrapper around this processor; assigned by the AsyncProcessorQueueable constructor
private AsyncProcessorQueueable queueable;
// Records handed to innerExecute(); passed in through get(List<SObject>) or queried in the queueable path
protected List<SObject> records;
// Overflow removed from "records" by the batch execute() when they exceed the batch chunk size
private final List<SObject> chunkRecords = new List<SObject>();
// hack to wrap what would have been a Queueable into a batch process:
private static final String FALLBACK_QUERY = 'SELECT Id FROM Organization';
// Last BatchApexErrorEvent that was published successfully; visible to tests only
@TestVisible
private static BatchApexErrorEvent firedErrorEvent;
/**
 * Handle returned by get(). Calling kickoff() starts the asynchronous work
 * and returns the Id of the job that was started, as a String.
 */
public interface Process {
String kickoff();
}
/**
* Process interface-related methods
*/
/**
 * Builds the Process for the records matched by a SOQL query.
 * The entire query text is lower-cased before it is stored - note that this
 * includes any string literals inside the query. A null query stays null.
 *
 * @param query the SOQL query selecting the records to process
 * @return the Process to call kickoff() on
 */
public Process get(String query) {
  String normalizedQuery = query?.toLowerCase();
  return this.getProcess(normalizedQuery, null);
}
/**
 * Builds the Process for an in-memory list of records (no query involved).
 *
 * @param records the records to hand to innerExecute
 * @return the Process to call kickoff() on
 */
public Process get(List<SObject> records) {
  String noQuery = null;
  return this.getProcess(noQuery, records);
}
/**
 * Starts processing: delegates to the queueable wrapper when one exists and a
 * Queueable can still be enqueued, otherwise starts this instance as a batch job.
 *
 * @return the Id of the job that was started
 * @throws AsyncException (from validate) when get() was never called
 */
public String kickoff() {
  this.validate();
  // getCanEnqueue() has side effects, so it must only run when a queueable exists
  Boolean useQueueable = this.queueable != null && this.getCanEnqueue();
  if (useQueueable == false) {
    return Database.executeBatch(this, this.getBatchChunkSize());
  }
  return this.queueable.kickoff();
}
/**
* Batchable implementation methods, including overrideable "finish" method
*/
/**
 * Batchable start: returns a locator for the stored query, or for the
 * fallback query when the processor was created from a list of records.
 */
public Database.QueryLocator start(Database.BatchableContext bc) {
  String locatorQuery = this.query == null ? FALLBACK_QUERY : this.query;
  return Database.getQueryLocator(locatorQuery);
}
/**
 * Batchable execute. For record-based processing (no query), anything beyond the
 * batch chunk size is moved off the end of "records" into "chunkRecords", the
 * remaining records are processed, and the overflow is then kicked off again.
 * For query-based processing the batch scope passed in by the platform is used.
 *
 * @param bc the batch context (unused)
 * @param localRecords the scope supplied by the batch framework
 */
public void execute(Database.BatchableContext bc, List<SObject> localRecords) {
  this.hasBeenEnqueuedAsync = false;
  Integer maxRecordsPerExecution = this.getBatchChunkSize();

  Boolean isRecordBased = this.query == null && this.records != null;
  if (isRecordBased) {
    // peel records off the end of the list until only one chunk's worth is left
    for (
      Integer lastIndex = this.records.size() - 1;
      lastIndex >= maxRecordsPerExecution;
      lastIndex--
    ) {
      this.chunkRecords.add(this.records.remove(lastIndex));
    }
  }

  List<SObject> scope = this.records == null ? localRecords : this.records;
  this.innerExecute(scope);

  if (!this.chunkRecords.isEmpty()) {
    // hand the overflow to the next run; validate() swaps to a Queueable while in batch context
    this.records = new List<SObject>(this.chunkRecords);
    this.kickoff();
  }
}
/**
 * Batchable finish hook; does nothing by default and can be overridden.
 * The queueable path also calls this, once no records remain and isFinished() returns true.
 */
public virtual void finish(Database.BatchableContext bc) {
}
/**
 * Subclasses define their async processing logic by overriding "innerExecute".
 * It is invoked from both the batch and the queueable execute paths.
 *
 * @param records the records to process in the current execution
 */
protected abstract void innerExecute(List<SObject> records);
/**
 * Subclasses can override this method to ensure finish is only called once if recursive queueables are involved.
 * The queueable path checks it before calling finish(); the default always reports finished.
 */
protected virtual Boolean isFinished() {
return true;
}
/**
 * Record-count threshold used by getProcess(): when the number of records
 * exceeds this value the batch path is chosen. Defaults to the query row limit.
 */
protected virtual Integer getLimitToBatch() {
return Limits.getLimitQueryRows();
}
/**
 * Scope size passed to Database.executeBatch; also the maximum number of
 * in-memory records processed per batch execute() before the rest is chunked off.
 */
protected virtual Integer getBatchChunkSize() {
return 2000;
}
/**
 * Maximum number of records a single Queueable execution should process.
 * By default this is the current size of "records" (so nothing is split off),
 * falling back to the batch chunk size when there are no records yet.
 */
protected virtual Integer getQueueableChunkSize() {
  if (this.records == null) {
    return this.getBatchChunkSize();
  }
  return this.records.size();
}
/**
 * Guards kickoff(): throws when get() was never called. Inside a batch execution
 * with overflow records and no queueable yet, it also creates the queueable wrapper
 * (so the overflow is enqueued rather than batched) and clears the overflow list.
 *
 * @throws AsyncException when get() has not been called
 */
private void validate() {
  if (this.getWasCalled == false) {
    throw new AsyncException(
      'Please call get() to retrieve the correct Process instance before calling kickoff'
    );
  }

  Boolean hasOverflowWithinBatch =
    System.isBatch() &&
    this.queueable == null &&
    !this.chunkRecords.isEmpty();
  if (hasOverflowWithinBatch) {
    // the constructor also registers the new instance as this.queueable
    this.queueable = new AsyncProcessorQueueable(this);
    this.chunkRecords.clear();
  }
}
/**
 * Stores the inputs and decides how the work will run: as a Queueable when the
 * record count is within getLimitToBatch() and a Queueable can be enqueued,
 * otherwise as this batch instance.
 *
 * @param query lower-cased SOQL query, or null when records are supplied
 * @param records records to process, or null when a query is supplied
 * @return the Process whose kickoff() starts the work
 */
private Process getProcess(String query, List<SObject> records) {
  this.getWasCalled = true;
  this.query = query;
  // With neither a query nor records there is nothing to count or process. Normalize to an
  // empty list so the size check below and the later execute paths never dereference null.
  this.records = (query == null && records == null)
    ? new List<SObject>()
    : records;

  Integer recordCount = query == null
    ? this.records.size()
    : this.getRecordCount(query);
  Boolean shouldBatch = recordCount > this.getLimitToBatch();
  Process process = this;
  if (shouldBatch == false && this.getCanEnqueue()) {
    // the wrapper registers itself on this processor via its constructor
    process = new AsyncProcessorQueueable(this);
  }
  return process;
}
/**
 * Counts the rows the given query would return by rewriting it as a count() query:
 * the select list is replaced with "select count()" and everything from the last
 * "order by" onwards is dropped.
 *
 * @param query the SOQL query to count rows for
 * @return the number of matching rows
 */
private Integer getRecordCount(String query) {
  String fromKeyword = ' from ';
  // Find the FROM that belongs to the outermost query: the first one that is not inside
  // parentheses. Taking the last FROM would pick the one inside a WHERE-clause subquery
  // ("... where id in (select accountid from contact)") and yield an invalid count query.
  Integer fromIndex = query.indexOfIgnoreCase(fromKeyword);
  while (fromIndex > -1) {
    String precedingText = query.substring(0, fromIndex);
    if (precedingText.countMatches('(') == precedingText.countMatches(')')) {
      break;
    }
    fromIndex = query.indexOfIgnoreCase(fromKeyword, fromIndex + 1);
  }
  if (fromIndex == -1) {
    // no recognizable top-level FROM clause: let the platform report the malformed query
    return Database.countQuery(query);
  }

  String countQuery = 'select count()' + query.substring(fromIndex);
  Integer possibleOrderStatement = countQuery.lastIndexOfIgnoreCase('order by');
  if (possibleOrderStatement > -1) {
    // ordering is not allowed in (and is irrelevant to) a count() query
    countQuery = countQuery.substring(0, possibleOrderStatement);
  }
  return Database.countQuery(countQuery);
}
/**
 * Reports whether a Queueable can still be enqueued in the current transaction.
 * Side effect: when an enqueue is approved, records whether that approval was
 * given in an async context (hasBeenEnqueuedAsync).
 *
 * @return true when enqueueing a Queueable is allowed, false when the caller should fall back to batch
 */
private Boolean getCanEnqueue() {
  Integer currentQueueableCount = Limits.getQueueableJobs();
  Boolean hasQueueableCapacity =
    currentQueueableCount < Limits.getLimitQueueableJobs();
  // Sync transactions can enqueue up to 50 processes
  // but only one Queueable can be started per async transaction
  if (this.hasBeenEnqueuedAsync == false && hasQueueableCapacity) {
    this.hasBeenEnqueuedAsync = this.isAsync();
    return true;
  } else if (this.isAsync()) {
    return currentQueueableCount < 1;
  }
  // Synchronous transaction: only allow the enqueue while the limit has not been reached,
  // so callers fall back to batch instead of failing on System.enqueueJob
  return hasQueueableCapacity;
}
/** Returns true when the current transaction is a Queueable, batch or future execution. */
private Boolean isAsync() {
  if (System.isQueueable()) {
    return true;
  }
  if (System.isBatch()) {
    return true;
  }
  return System.isFuture();
}
private class AsyncProcessorQueueable implements Database.AllowsCallouts, System.Queueable, System.Finalizer, Process {
// The processor whose innerExecute/finish logic this Queueable runs
private final AsyncProcessor processor;
// Ensures this instance is attached as a Finalizer only once per execution; reset in kickoff()
private Boolean hasFinalizerBeenAttached = false;
// Set by execute() when records were split off; the Finalizer re-enqueues the job when true
private Boolean shouldRequeue = false;
/**
 * Wraps the given processor and registers this wrapper on it,
 * so the processor's kickoff() can delegate to the Queueable.
 */
public AsyncProcessorQueueable(AsyncProcessor processor) {
this.processor = processor;
this.processor.queueable = this;
}
/**
 * Enqueues this Queueable when the processor allows it; otherwise hands control
 * back to the processor's own kickoff().
 *
 * @return the Id of the job that was started
 */
public String kickoff() {
  this.processor.validate();
  Boolean canEnqueue = this.processor.getCanEnqueue();
  if (canEnqueue) {
    // a fresh job needs the finalizer attached again in execute()
    this.hasFinalizerBeenAttached = false;
    return System.enqueueJob(this);
  }
  return this.processor.kickoff();
}
/**
 * Queueable execute: attaches the finalizer, loads records from the query when
 * none were supplied, processes at most one chunk and either flags the job for
 * requeueing (records left over) or calls finish() once isFinished() allows it.
 *
 * @param qc the queueable context, used to build the context passed to finish()
 */
public void execute(System.QueueableContext qc) {
  // once we've enqueued, it's fine to reset this flag
  this.processor.hasBeenEnqueuedAsync = false;
  if (this.hasFinalizerBeenAttached == false) {
    this.hasFinalizerBeenAttached = true;
    System.attachFinalizer(this);
  }

  Boolean needsQuery =
    this.processor.records == null && this.processor.query != null;
  if (needsQuery) {
    this.processor.records = Database.query(this.processor.query);
  }

  List<SObject> remainingRecords = this.splitProcessorRecords();
  this.processor.innerExecute(this.processor.records);

  if (!remainingRecords.isEmpty()) {
    // leave only the unprocessed records behind; the finalizer performs the requeue
    this.shouldRequeue = true;
    this.processor.records.clear();
    this.processor.records.addAll(remainingRecords);
    return;
  }
  if (this.processor.isFinished()) {
    this.processor.finish(new QueueableToBatchableContext(qc));
  }
}
/**
 * Finalizer execute: publishes a BatchApexErrorEvent when the Queueable ended with
 * an unhandled exception; otherwise re-enqueues the job if records are left over.
 *
 * @param fc the finalizer context describing the finished Queueable
 */
public void execute(System.FinalizerContext fc) {
  Boolean endedWithException =
    fc?.getResult() == System.ParentJobResult.UNHANDLED_EXCEPTION;
  if (endedWithException) {
    this.fireBatchApexErrorEvent(fc);
  } else if (this.shouldRequeue) {
    this.shouldRequeue = false;
    this.kickoff();
  }
}
/**
 * Removes records from the end of the processor's list until it fits within the
 * queueable chunk size, and returns the removed records.
 *
 * @return the records that were split off (empty when nothing exceeded the chunk size)
 */
private List<SObject> splitProcessorRecords() {
  List<SObject> overflow = new List<SObject>();
  // the chunk size is re-read on every pass: the default implementation depends on the list size
  while (
    this.processor.records.size() > this.processor.getQueueableChunkSize()
  ) {
    Integer lastIndex = this.processor.records.size() - 1;
    SObject removedRecord = this.processor.records.remove(lastIndex);
    overflow.add(removedRecord);
  }
  return overflow;
}
/**
 * Publishes a BatchApexErrorEvent describing the unhandled exception of the
 * finished Queueable. The job scope is the comma-separated record Ids; scope,
 * message and stack trace are truncated to the length limits defined below.
 * On a successful publish the event is remembered in firedErrorEvent.
 *
 * @param fc the finalizer context carrying the job Id and the exception
 */
private void fireBatchApexErrorEvent(System.FinalizerContext fc) {
  Integer jobScopeLengthLimit = 40000;
  Integer textFieldLengthLimit = 5000;

  String fullLengthJobScope = String.join(this.getRecordsInScope(), ',');
  Exception unhandledException = fc.getException();
  String truncatedJobScope = this.getSafeSubstring(
      fullLengthJobScope,
      jobScopeLengthLimit
    )
    .removeEnd(',');
  String truncatedMessage = this.getSafeSubstring(
    unhandledException.getMessage(),
    textFieldLengthLimit
  );
  String truncatedStackTrace = this.getSafeSubstring(
    unhandledException.getStacktraceString(),
    textFieldLengthLimit
  );

  // initializing a BatchApexErrorEvent works as of Spring 23
  // but we can't promise it always will - use accordingly!
  BatchApexErrorEvent errorEvent = new BatchApexErrorEvent(
    AsyncApexJobId = fc.getAsyncApexJobId(),
    DoesExceedJobScopeMaxLength = fullLengthJobScope.length() >
      jobScopeLengthLimit,
    ExceptionType = unhandledException.getTypeName(),
    JobScope = truncatedJobScope,
    Message = truncatedMessage,
    Phase = 'EXECUTE',
    StackTrace = truncatedStackTrace
  );

  Database.SaveResult publishResult = EventBus.publish(errorEvent);
  if (publishResult.isSuccess()) {
    firedErrorEvent = errorEvent;
  }
}
/**
 * Collects the distinct, non-null Ids of the processor's current records, in list order.
 * Runs while building the error event inside the finalizer, so it must not throw:
 * a missing record list yields an empty scope and records without an Id are skipped
 * (building a Map<Id, SObject> from the list would reject rows that have no Id).
 *
 * @return the record Ids as Strings; empty when there are no records or no Ids
 */
private List<String> getRecordsInScope() {
  List<String> scope = new List<String>();
  if (this.processor.records == null) {
    return scope;
  }
  Set<Id> seenIds = new Set<Id>();
  for (SObject currentRecord : this.processor.records) {
    Id recordId = currentRecord?.Id;
    // Set.add returns false for an Id that was already collected
    if (recordId != null && seenIds.add(recordId)) {
      scope.add(recordId);
    }
  }
  return scope;
}
/**
 * Truncates the target to at most maxLength characters.
 *
 * @param target the text to truncate
 * @param maxLength the maximum number of characters to keep
 * @return the target itself when it already fits, otherwise its first maxLength characters
 */
private String getSafeSubstring(String target, Integer maxLength) {
  if (target.length() > maxLength) {
    return target.substring(0, maxLength);
  }
  return target;
}
}
/**
 * Adapts a QueueableContext to the Database.BatchableContext interface so that
 * finish(Database.BatchableContext) can also be called from the queueable path.
 */
private class QueueableToBatchableContext implements Database.BatchableContext {
// Job Id captured from the QueueableContext at construction time
private final Id jobId;
public QueueableToBatchableContext(System.QueueableContext qc) {
this.jobId = qc.getJobId();
}
/** Returns the Id of the Queueable job this context was created from. */
public Id getJobId() {
return this.jobId;
}
/** Always null: no child job Id is tracked for the queueable path. */
public Id getChildJobId() {
return null;
}
}
}