-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelpers.py
184 lines (145 loc) · 4.95 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import multiprocessing
import os
import signal
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Manager, Process
import psutil
class PredictionProcessingError(Exception):
    """
    Raised when processing a single prediction fails.

    Attributes
    ----------
    prediction
        The prediction that was being processed when the failure occurred.
    error
        The underlying exception that was raised during processing.
    """

    def __init__(self, prediction, error):
        # Forward the values to ``Exception`` so that ``args`` is populated
        # even when the caller passes keyword arguments. Without this the
        # exception cannot be reconstructed when it is unpickled, because
        # unpickling calls the class with ``args``.
        super().__init__(prediction, error)
        self.prediction = prediction
        self.error = error

    def __str__(self):
        return f"Error for prediction {self.prediction}: {self.error}"
def get_max_workers():
    """
    Returns the maximum number of concurrent workers

    The optimal number of workers ultimately depends on how many resources
    each process will call upon.

    To limit this, update the Dockerfile GRAND_CHALLENGE_MAX_WORKERS

    Returns
    -------
    int
        The configured limit, capped at the number of CPUs reported by
        ``multiprocessing.cpu_count()`` and never lower than 1.

    Raises
    ------
    ValueError
        If GRAND_CHALLENGE_MAX_WORKERS is set to a non-integer value.
    """
    cpu_count = multiprocessing.cpu_count()
    environ_cpu_limit = os.getenv("GRAND_CHALLENGE_MAX_WORKERS")

    # An unset or empty variable falls back to the CPU count
    requested = int(environ_cpu_limit or cpu_count)

    # A process pool needs at least one worker, so a limit of zero or a
    # negative limit is clamped to 1 rather than handed on as-is.
    return max(1, min(requested, cpu_count))
def run_prediction_processing(*, fn, predictions):
    """
    Processes predictions one by one, in the calling process.

    Every prediction is attempted, even after an earlier one has failed.
    Once all have been attempted, the first recorded failure (if any) is
    raised.

    Parameters
    ----------
    fn : function
        Function to execute that will process each prediction
    predictions : iterable
        The predictions to process.

    Returns
    -------
    A list of results, in the order of the predictions

    Raises
    ------
    PredictionProcessingError
        For the first prediction for which ``fn`` raised an exception.
    """
    outcomes = []
    failures = []

    for item in predictions:
        try:
            outcomes.append(fn(item))
        except Exception as exc:
            # Remember the failure and carry on with the remaining items
            failures.append((item, exc))

    if failures:
        failed_item, cause = failures[0]
        raise PredictionProcessingError(
            prediction=failed_item,
            error=cause,
        ) from cause

    return outcomes
def run_prediction_processing_parallel(*, fn, predictions):
    """
    Processes predictions in a dedicated worker process.

    The worker process is started, joined and then always terminated. The
    results and errors it reports are collected through lists shared via a
    ``multiprocessing.Manager``.

    This takes child processes into account:
    - if any child process is terminated, all prediction processing will abort
    - after prediction processing is done, all child processes are terminated

    Note that the results are returned in completing order.

    Parameters
    ----------
    fn : function
        Function to execute that will process each prediction
    predictions : list
        List of predictions.

    Returns
    -------
    A list of results

    Raises
    ------
    PredictionProcessingError
        For the first error reported by the worker process, if any.
    """
    with Manager() as manager:
        shared_results = manager.list()
        shared_errors = manager.list()

        worker = _start_pool_worker(
            fn=fn,
            predictions=predictions,
            max_workers=get_max_workers(),
            results=shared_results,
            errors=shared_errors,
        )
        try:
            worker.join()
        finally:
            # Make sure the worker is gone, also when the join is interrupted
            worker.terminate()

        # Only the first reported error is retrieved and raised
        first_failure = next(iter(shared_errors), None)
        if first_failure is not None:
            failed_prediction, cause = first_failure
            raise PredictionProcessingError(
                prediction=failed_prediction,
                error=cause,
            ) from cause

        # Copy into a plain list while the manager is still running
        return list(shared_results)
def _start_pool_worker(fn, predictions, max_workers, results, errors):
    """
    Starts a process named "PredictionProcessing" that runs ``_pool_worker``.

    All arguments are passed on to ``_pool_worker`` as keyword arguments.

    Returns
    -------
    The started ``multiprocessing.Process``
    """
    worker_kwargs = {
        "fn": fn,
        "predictions": predictions,
        "max_workers": max_workers,
        "results": results,
        "errors": errors,
    }
    worker = Process(
        target=_pool_worker,
        name="PredictionProcessing",
        kwargs=worker_kwargs,
    )
    worker.start()
    return worker
def _pool_worker(*, fn, predictions, max_workers, results, errors):
    """
    Runs ``fn`` on every prediction using a process pool.

    Results are appended to ``results`` in completing order. Failures are
    appended to ``errors`` as ``(prediction, exception)`` tuples. On the first
    failure the pool is shut down, pending work is cancelled and all child
    processes are terminated.

    Parameters
    ----------
    fn : function
        Function to execute that will process each prediction
    predictions : iterable
        The predictions to process. It is iterated exactly once, so one-shot
        iterables are supported.
    max_workers : int
        Maximum number of worker processes in the pool
    results : list-like
        Receives the result of each successfully processed prediction
    errors : list-like
        Receives a ``(prediction, exception)`` tuple for each failure
    """
    caught_exception = False
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        try:
            # Submit the processing tasks of the predictions. The mapping is
            # built in the same pass as the submission: iterating the
            # predictions a second time would come up empty for a one-shot
            # iterable, silently dropping every result.
            future_to_prediction = {
                executor.submit(fn, prediction): prediction
                for prediction in predictions
            }

            for future in as_completed(future_to_prediction):
                try:
                    results.append(future.result())
                except Exception as e:
                    errors.append((future_to_prediction[future], e))

                    if not caught_exception:  # Hard stop
                        caught_exception = True

                        executor.shutdown(wait=False, cancel_futures=True)
                        _terminate_child_processes()
        finally:
            # Be aggressive in cleaning up any left-over processes
            _terminate_child_processes()
def _terminate_child_processes():
    """
    Terminates all descendant processes of the current process.

    Descendants (children, recursively) are first asked to terminate. Those
    still alive after 5 seconds are killed and given up to another 5 seconds
    to exit. Finally, every child that has already exited is reaped, without
    blocking on children that are still running.
    """
    process = psutil.Process(os.getpid())
    children = process.children(recursive=True)
    for child in children:
        try:
            child.terminate()
        except psutil.NoSuchProcess:
            pass  # Not a problem

    # Wait for processes to terminate
    _, still_alive = psutil.wait_procs(children, timeout=5)

    # Forcefully kill any remaining processes
    for p in still_alive:
        try:
            p.kill()
        except psutil.NoSuchProcess:
            pass  # That is fine

    # Give the killed processes a bounded amount of time to actually exit
    if still_alive:
        psutil.wait_procs(still_alive, timeout=5)

    # Finally, prevent zombies by reaping every exited child. A single
    # waitpid call collects at most one child, hence the loop. WNOHANG
    # keeps this from blocking on a child that is still running.
    while True:
        try:
            pid, _ = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            break  # No child processes (left), that is fine
        if pid == 0:
            break  # Remaining children have not exited yet
def listen_to_children_errors():
    """
    Installs a SIGCHLD handler that terminates all child processes.

    When the signal arrives the handler prints a message and calls
    ``_terminate_child_processes``.

    NOTE(review): SIGCHLD is presumably also delivered when a child exits
    normally, not only when it fails -- confirm this is intended.
    """

    def _on_child_signal(*_args, **_kwargs):
        print("A child failed, terminating all other children")
        _terminate_child_processes()

    signal.signal(signal.SIGCHLD, _on_child_signal)