-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathWorker.h
112 lines (95 loc) · 3.51 KB
/
Worker.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#ifndef Worker_h
#define Worker_h
#include <atomic>
#include <vector>
//#include <iostream>
#include "Framework/WaitingTask.h"
#include "Framework/WaitingTaskHolder.h"
#include "Framework/WaitingTaskList.h"
#include "Framework/WaitingTaskWithArenaHolder.h"
namespace edm {
class Event;
class EventSetup;
class ProductRegistry;
class Worker {
public:
virtual ~Worker() = default;
// not thread safe
void setItemsToGet(std::vector<Worker*> workers) { itemsToGet_ = std::move(workers); }
// thread safe
void prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder iTask);
// not thread safe
virtual void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder iTask) = 0;
// not thread safe
virtual void doEndJob() = 0;
// not thread safe
void reset() {
prefetchRequested_ = false;
doReset();
}
protected:
virtual void doReset() = 0;
private:
std::vector<Worker*> itemsToGet_;
std::atomic<bool> prefetchRequested_ = false;
};
template <typename T>
class WorkerT : public Worker {
public:
explicit WorkerT(ProductRegistry& reg) : producer_(reg) {}
void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder task) override {
waitingTasksWork_.add(task);
//std::cout << "doWorkAsync for " << this << " with iTask " << iTask << std::endl;
bool expected = false;
if (workStarted_.compare_exchange_strong(expected, true)) {
//std::cout << "first doWorkAsync call" << std::endl;
WaitingTask* moduleTask =
make_waiting_task([this, &event, &eventSetup](std::exception_ptr const* iPtr) mutable {
if (iPtr) {
waitingTasksWork_.doneWaiting(*iPtr);
} else {
std::exception_ptr exceptionPtr;
try {
//std::cout << "calling doProduce " << this << std::endl;
producer_.doProduce(event, eventSetup);
} catch (...) {
exceptionPtr = std::current_exception();
}
//std::cout << "waitingTasksWork_.doneWaiting " << this << std::endl;
waitingTasksWork_.doneWaiting(exceptionPtr);
}
});
auto* group = task.group();
if (producer_.hasAcquire()) {
WaitingTaskWithArenaHolder runProduceHolder{*group, moduleTask};
moduleTask = make_waiting_task([this, &event, &eventSetup, runProduceHolder = std::move(runProduceHolder)](
std::exception_ptr const* iPtr) mutable {
if (iPtr) {
runProduceHolder.doneWaiting(*iPtr);
} else {
std::exception_ptr exceptionPtr;
try {
producer_.doAcquire(event, eventSetup, runProduceHolder);
} catch (...) {
exceptionPtr = std::current_exception();
}
runProduceHolder.doneWaiting(exceptionPtr);
}
});
}
//std::cout << "calling prefetchAsync " << this << " with moduleTask " << moduleTask << std::endl;
prefetchAsync(event, eventSetup, WaitingTaskHolder(*group, moduleTask));
}
}
void doEndJob() override { producer_.doEndJob(); }
private:
void doReset() override {
waitingTasksWork_.reset();
workStarted_ = false;
}
T producer_;
WaitingTaskList waitingTasksWork_;
std::atomic<bool> workStarted_ = false;
};
} // namespace edm
#endif