-
Notifications
You must be signed in to change notification settings - Fork 88
/
Copy pathcalculateAveragePypy.py
161 lines (133 loc) · 4.44 KB
/
calculateAveragePypy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# time pypy3 calculateAveragePypy.py
import os
import multiprocessing as mp
from gc import disable as gc_disable, enable as gc_enable
def get_file_chunks(
file_name: str,
max_cpu: int = 8,
) -> list:
"""Split flie into chunks"""
cpu_count = min(max_cpu, mp.cpu_count())
file_size = os.path.getsize(file_name)
chunk_size = file_size // cpu_count
start_end = list()
with open(file_name, encoding="utf-8", mode="r+b") as f:
def is_new_line(position):
if position == 0:
return True
else:
f.seek(position - 1)
return f.read(1) == b"\n"
def next_line(position):
f.seek(position)
f.readline()
return f.tell()
chunk_start = 0
while chunk_start < file_size:
chunk_end = min(file_size, chunk_start + chunk_size)
while not is_new_line(chunk_end):
chunk_end -= 1
if chunk_start == chunk_end:
chunk_end = next_line(chunk_end)
start_end.append(
(
file_name,
chunk_start,
chunk_end,
)
)
chunk_start = chunk_end
return (
cpu_count,
start_end,
)
def _process_file_chunk(
file_name: str,
chunk_start: int,
chunk_end: int,
blocksize: int = 1024 * 1024,
) -> dict:
"""Process each file chunk in a different process"""
result = dict()
with open(file_name, encoding="utf-8", mode="r+b") as fh:
fh.seek(chunk_start)
gc_disable()
tail = b""
location = None
byte_count = chunk_end - chunk_start
while byte_count > 0:
if blocksize > byte_count:
blocksize = byte_count
byte_count -= blocksize
index = 0
data = tail + fh.read(blocksize)
while data:
if location is None:
try:
semicolon = data.index(b";", index)
except ValueError:
tail = data[index:]
break
location = data[index:semicolon]
index = semicolon + 1
try:
newline = data.index(b"\n", index)
except ValueError:
tail = data[index:]
break
value = float(data[index:newline])
index = newline + 1
try:
_result = result[location]
if value < _result[0]:
_result[0] = value
if value > _result[1]:
_result[1] = value
_result[2] += value
_result[3] += 1
except KeyError:
result[location] = [
value,
value,
value,
1,
] # min, max, sum, count
location = None
gc_enable()
return result
def process_file(
cpu_count: int,
start_end: list,
) -> dict:
"""Process data file"""
with mp.Pool(cpu_count) as p:
# Run chunks in parallel
chunk_results = p.starmap(
_process_file_chunk,
start_end,
)
# Combine all results from all chunks
result = dict()
for chunk_result in chunk_results:
for location, measurements in chunk_result.items():
if location not in result:
result[location] = measurements
else:
_result = result[location]
if measurements[0] < _result[0]:
_result[0] = measurements[0]
if measurements[1] > _result[1]:
_result[1] = measurements[1]
_result[2] += measurements[2]
_result[3] += measurements[3]
# Print final results
print("{", end="")
for location, measurements in sorted(result.items()):
print(
f"{location.decode('utf-8')}={measurements[0]:.1f}/{(measurements[2] / measurements[3]) if measurements[3] !=0 else 0:.1f}/{measurements[1]:.1f}",
end=", ",
)
print("\b\b} ")
if __name__ == "__main__":
cpu_count, *start_end = get_file_chunks("measurements.txt")
process_file(cpu_count, start_end[0])