-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprocessify.py
135 lines (106 loc) · 3.63 KB
/
processify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# modified from https://gist.github.com/schlamar/2311116#file-processify-py-L17
# also see http://stackoverflow.com/questions/2046603/is-it-possible-to-run-function-in-a-subprocess-without-threading-or-writing-a-se
import inspect
import os
import sys
import traceback
from functools import wraps
from multiprocessing import Process, Queue
class Sentinel:
pass
def processify(func):
'''Decorator to run a function as a process.
Be sure that every argument and the return value
is *pickable*.
The created process is joined, so the code does not
run in parallel.
'''
def process_generator_func(q, *args, **kwargs):
result = None
error = None
it = iter(func())
while error is None and result != Sentinel:
try:
result = next(it)
error = None
except StopIteration:
result = Sentinel
error = None
except Exception:
ex_type, ex_value, tb = sys.exc_info()
error = ex_type, ex_value, ''.join(traceback.format_tb(tb))
result = None
q.put((result, error))
def process_func(q, *args, **kwargs):
try:
result = func(*args, **kwargs)
except Exception:
ex_type, ex_value, tb = sys.exc_info()
error = ex_type, ex_value, ''.join(traceback.format_tb(tb))
result = None
else:
error = None
q.put((result, error))
def wrap_func(*args, **kwargs):
# register original function with different name
# in sys.modules so it is pickable
process_func.__name__ = func.__name__ + 'processify_func'
setattr(sys.modules[__name__], process_func.__name__, process_func)
q = Queue()
p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs)
p.start()
result, error = q.get()
p.join()
if error:
ex_type, ex_value, tb_str = error
message = '%s (in subprocess)\n%s' % (str(ex_value), tb_str)
raise ex_type(message)
return result
def wrap_generator_func(*args, **kwargs):
# register original function with different name
# in sys.modules so it is pickable
process_generator_func.__name__ = func.__name__ + 'processify_generator_func'
setattr(sys.modules[__name__], process_generator_func.__name__, process_generator_func)
q = Queue()
p = Process(target=process_generator_func, args=[q] + list(args), kwargs=kwargs)
p.start()
result = None
error = None
while error is None:
result, error = q.get()
if result == Sentinel:
break
yield result
p.join()
if error:
ex_type, ex_value, tb_str = error
message = '%s (in subprocess)\n%s' % (str(ex_value), tb_str)
raise ex_type(message)
@wraps(func)
def wrapper(*args, **kwargs):
if inspect.isgeneratorfunction(func):
return wrap_generator_func(*args, **kwargs)
else:
return wrap_func(*args, **kwargs)
return wrapper
@processify
def test_function():
return os.getpid()
@processify
def test_generator_func():
for msg in ["generator", "function"]:
yield msg
@processify
def test_deadlock():
return range(30000)
@processify
def test_exception():
raise RuntimeError('xyz')
def test():
print(os.getpid())
print(test_function())
print(list(test_generator_func()))
print(len(test_deadlock()))
test_exception()
if __name__ == '__main__':
test()