-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathWaitingTaskList.h
163 lines (138 loc) · 5.42 KB
/
WaitingTaskList.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#ifndef FWCore_Concurrency_WaitingTaskList_h
#define FWCore_Concurrency_WaitingTaskList_h
// -*- C++ -*-
//
// Package: Concurrency
// Class : WaitingTaskList
//
/**\class WaitingTaskList WaitingTaskList.h FWCore/Concurrency/interface/WaitingTaskList.h
Description: Handles starting tasks once some resource becomes available.
Usage:
This class can be used to have tasks wait to be spawned until a resource is available.
Tasks that want to use the resource are added to the list by calling
add(tbb::task_group*, WaitingTask*) or add(WaitingTaskHolder).
When the resource becomes available one calls doneWaiting() and then any waiting tasks will
be spawned. If a call to add() is made after doneWaiting() the newly added task will
immediately be spawned.
The class can be reused by calling reset(). However, reset() is not thread-safe so one
must be certain neither add(...) nor doneWaiting() is called while reset() is running.
An example usage would be if you had a task doing a long calculation (the resource) and
then several other tasks have been created in a different thread and before running those
new tasks you need the result of the long calculation.
\code
class CalcTask : public edm::WaitingTask {
public:
CalcTask(edm::WaitingTaskList* iWL, Value* v):
m_waitList(iWL), m_output(v) {}
tbb::task* execute() {
std::exception_ptr ptr;
try {
*m_output = doCalculation();
} catch(...) {
ptr = std::current_exception();
}
m_waitList->doneWaiting(ptr);
return nullptr;
}
private:
edm::WaitingTaskList* m_waitList;
Value* m_output;
};
\endcode
In one part of the code we can set up the shared resource
\code
WaitingTaskList waitList;
Value v;
\endcode
In another part we can start the calculation
\code
tbb::task* calc = new(tbb::task::allocate_root()) CalcTask(&waitList,&v);
tbb::task::spawn(calc);
\endcode
Finally in some unrelated part of the code we can create tasks that need the calculation
\code
tbb::task* t1 = makeTask1(v);
waitList.add(t1);
tbb::task* t2 = makeTask2(v);
waitList.add(t2);
\endcode
*/
//
// Original Author: Chris Jones
// Created: Thu Feb 21 13:46:31 CST 2013
// $Id$
//
// system include files
#include <atomic>
#include <exception>
#include <memory>
// user include files
#include "Framework/WaitingTask.h"
#include "Framework/WaitingTaskHolder.h"
// forward declarations
namespace edm {
class WaitingTaskList {
public:
  /// Constructor
  /**
   * A freshly built WaitingTaskList starts out in the waiting state.
   *
   * \param[in] iInitialSize initial size of the cache that holds waiting tasks.
   *            This is purely an optimization knob: the object can resize itself.
   */
  explicit WaitingTaskList(unsigned int iInitialSize = 2);

  // Copy construction and copy assignment are disabled.
  WaitingTaskList(const WaitingTaskList&) = delete;
  const WaitingTaskList& operator=(const WaitingTaskList&) = delete;

  ~WaitingTaskList() = default;

  // ---------- member functions ---------------------------

  /// Registers a failure ahead of the eventual doneWaiting() call
  /**
   * Meant for the situation where the parent task has to learn about a failure
   * before some other child task, which may run later, reports a different but
   * related failure.
   * The caller must subsequently call doneWaiting() with the same exception,
   * from the same thread.
   */
  void presetTaskAsFailed(std::exception_ptr iExcept);

  /// Adds a task to the waiting list
  /**
   * When doneWaiting() has already been called, the task is spawned immediately.
   * Otherwise it is held back until doneWaiting() is called, and spawned then.
   *
   * add() and doneWaiting() may safely be called concurrently.
   */
  void add(tbb::task_group*, WaitingTask*);

  /// Adds a task to the waiting list
  /**
   * add() and doneWaiting() may safely be called concurrently.
   */
  void add(WaitingTaskHolder);

  /// Signals that the resource is now available and that tasks should be spawned
  /**
   * Called by the owner of the resource to let the waiting tasks start
   * accessing it.
   *
   * \param[in] iPtr pass a non-null std::exception_ptr if the task failed.
   *
   * Making tasks wait again requires a call to reset().
   * add() and doneWaiting() may safely be called concurrently.
   */
  void doneWaiting(std::exception_ptr iPtr);

  /// Resets access to the resource so that added tasks will wait
  /**
   * Called by the owner of the resource to make tasks wait.
   *
   * reset() is NOT thread safe: the system must guarantee that no task is
   * using the resource when reset() is called, and that neither add() nor
   * doneWaiting() runs concurrently with reset().
   */
  void reset();

private:
  /**
   * Takes care of spawning the tasks.
   * Safe to call from multiple threads.
   */
  void announce();

  /// One entry of the waiting list: a task, its task group, and a link to the next entry.
  struct WaitNode {
    WaitingTask* m_task;
    tbb::task_group* m_group;
    std::atomic<WaitNode*> m_next;
    // NOTE(review): presumably marks nodes that live in m_nodeCache rather than
    // being allocated separately -- confirm against the implementation file.
    bool m_fromCache;

    /// Atomically stores iNext as the successor of this node.
    void setNextNode(WaitNode* iNext) { m_next.store(iNext); }

    /// Atomically reads the successor of this node.
    WaitNode* nextNode() const { return m_next.load(); }
  };

  /// Produces a node for the given group/task pair (defined in the implementation file).
  WaitNode* createNode(tbb::task_group* iGroup, WaitingTask* iTask);

  // ---------- member data --------------------------------
  // NOTE: declaration order is kept as is; it fixes the member initialization order.
  std::atomic<WaitNode*> m_head;
  std::unique_ptr<WaitNode[]> m_nodeCache;
  std::exception_ptr m_exceptionPtr;
  unsigned int m_nodeCacheSize;
  std::atomic<unsigned int> m_lastAssignedCacheIndex;
  std::atomic<bool> m_waiting;
};
} // namespace edm
#endif