# calculateAverage.py (forked from ifnesi/1brc)
# time python3 calculateAverage.py
import os
from gc import disable as gc_disable, enable as gc_enable
import multiprocessing as mp
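
# Approach: carve measurements.txt into one newline-aligned chunk per CPU,
# parse each chunk in its own process into {station: [min, max, sum, count]},
# then merge the partial results and print the aggregate.
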
def get_file_chunks(
    file_name: str,
    max_cpu: int = 8,
) -> tuple[int, list[tuple[str, int, int]]]:
    """Split file into chunks"""
    cpu_count = min(max_cpu, mp.cpu_count())

    file_size = os.path.getsize(file_name)
    chunk_size = file_size // cpu_count

    start_end = list()
    # Binary mode (no encoding argument) so seek()/tell() work on exact
    # byte offsets; open() rejects an encoding when "b" is in the mode.
    with open(file_name, mode="r+b") as f:

        def is_new_line(position):
            if position == 0:
                return True
            else:
                f.seek(position - 1)
                return f.read(1) == b"\n"

        def next_line(position):
            f.seek(position)
            f.readline()
            return f.tell()

        chunk_start = 0
        while chunk_start < file_size:
            chunk_end = min(file_size, chunk_start + chunk_size)

            # Walk the boundary back to the previous newline so no
            # record is split across two chunks.
            while not is_new_line(chunk_end):
                chunk_end -= 1

            # A single line longer than chunk_size: skip past it instead.
            if chunk_start == chunk_end:
                chunk_end = next_line(chunk_end)

            start_end.append(
                (
                    file_name,
                    chunk_start,
                    chunk_end,
                )
            )

            chunk_start = chunk_end

    return (
        cpu_count,
        start_end,
    )
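
# Each (file_name, chunk_start, chunk_end) tuple above is unpacked by
# Pool.starmap below as the three arguments of _process_file_chunk.
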
def _process_file_chunk(
    file_name: str,
    chunk_start: int,
    chunk_end: int,
) -> dict:
    """Process each file chunk in a different process"""
    result = dict()
    # Binary mode again: byte counts track the chunk boundary exactly,
    # and station names stay as bytes until printing.
    with open(file_name, mode="rb") as f:
        f.seek(chunk_start)
        # Disable the GC during the tight parse loop to avoid collection pauses
        gc_disable()
        for line in f:
            chunk_start += len(line)

            if chunk_start > chunk_end:
                break

            location, measurement = line.split(b";")
            measurement = float(measurement)

            _result = result.get(location)
            if _result:
                if measurement < _result[0]:
                    _result[0] = measurement
                if measurement > _result[1]:
                    _result[1] = measurement
                _result[2] += measurement
                _result[3] += 1
            else:
                result[location] = [
                    measurement,
                    measurement,
                    measurement,
                    1,
                ]  # min, max, sum, count
        gc_enable()
    return result
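
# Merging below is element-wise: take the smaller min, the larger max, and
# add sums and counts, so the final mean over all chunks is exact.
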
def process_file(
    cpu_count: int,
    start_end: list,
) -> None:
    """Process data file"""
    with mp.Pool(cpu_count) as p:
        # Run chunks in parallel
        chunk_results = p.starmap(
            _process_file_chunk,
            start_end,
        )

    # Combine all results from all chunks
    result = dict()
    for chunk_result in chunk_results:
        for location, measurements in chunk_result.items():
            _result = result.get(location)
            if _result:
                if measurements[0] < _result[0]:
                    _result[0] = measurements[0]
                if measurements[1] > _result[1]:
                    _result[1] = measurements[1]
                _result[2] += measurements[2]
                _result[3] += measurements[3]
            else:
                result[location] = measurements

    # Print final results as {station=min/mean/max, ...}, sorted by station
    print("{", end="")
    for location, measurements in sorted(result.items()):
        print(
            f"{location.decode('utf8')}={measurements[0]:.1f}/"
            f"{(measurements[2] / measurements[3]) if measurements[3] != 0 else 0:.1f}/"
            f"{measurements[1]:.1f}",
            end=", ",
        )
    # The two backspaces erase the trailing ", " left by the last station
    print("\b\b} ")
if __name__ == "__main__":
    cpu_count, start_end = get_file_chunks("measurements.txt")
    process_file(cpu_count, start_end)
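
# Usage sketch (assumes a measurements.txt in the working directory with
# "<station>;<temperature>" lines, as produced by the 1BRC data generator):
#   time python3 calculateAverage.py
# Output shape (station names and values illustrative):
#   {Abha=-23.0/18.0/59.2, Abidjan=-16.2/26.0/67.3, ...}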