-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathDVSorder.py
executable file
·275 lines (239 loc) · 10.1 KB
/
DVSorder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""This program illustrates and tests for the DVSorder vulnerability.
Given a CSV- or zipped-JSON-format CVR file, it attempts to unshuffle
each batch of ballots. It outputs an estimate of the number of
vulnerable ballots in the file.
"""
import os
import ntpath
import sys
import csv
import json
import zipfile
def icp_get_nth(n):
    """Return element number n of the ICP's PRNG sequence.

    The value is built in three steps: n is multiplied by 864803 modulo
    1000000, every decimal digit of that product is replaced through a
    fixed substitution table, and the six substituted digits are placed
    into a fixed rearrangement of positions.
    """
    state = (864803 * n) % 1000000
    substitution = (5, 0, 8, 3, 2, 6, 1, 9, 4, 7)
    # Decimal digits of the state, least-significant digit first.
    digits = [(state // 10 ** k) % 10 for k in range(6)]
    # source_positions[p] is the state digit that lands in output place 10**p.
    source_positions = (2, 3, 1, 5, 0, 4)
    return sum(
        substitution[digits[src]] * 10 ** place
        for place, src in enumerate(source_positions)
    )
def ice_get_nth(n):
    """Return element number n of the ICE's PRNG sequence.

    The value is built in three steps: n is multiplied by 864803 modulo
    1000000, every decimal digit of that product is replaced through a
    fixed substitution table, and the six substituted digits are placed
    into a fixed rearrangement of positions.
    """
    state = (864803 * n) % 1000000
    substitution = (5, 0, 8, 3, 2, 6, 1, 9, 4, 7)
    # Decimal digits of the state, least-significant digit first.
    digits = [(state // 10 ** k) % 10 for k in range(6)]
    # source_positions[p] is the state digit that lands in output place 10**p.
    source_positions = (1, 5, 0, 4, 2, 3)
    return sum(
        substitution[digits[src]] * 10 ** place
        for place, src in enumerate(source_positions)
    )
# Reverse lookup tables: each maps a record_id to its position in the
# corresponding sequence (positions 0 through 999999).
icp_inverse = {
    record_id: position
    for position, record_id in enumerate(map(icp_get_nth, range(1000000)))
}
ice_inverse = {
    record_id: position
    for position, record_id in enumerate(map(ice_get_nth, range(1000000)))
}
class AttackFailed(Exception):
    """Signals that a batch of record_ids could not be unshuffled."""
def unshuffle(record_ids, scanner_model=None):
    """
    Unshuffles a batch of ballots.

    Parameters:
    record_ids (list): the record_ids from a batch of ballots; must not
        contain duplicates. Only each id's value modulo 1000000 is used.
    scanner_model (string): "ImagecastPrecinct" or "ImagecastEvolution".
        If None, both sequences are tried and the one with fewer missing
        ballots is used (ties go to the ImagecastEvolution sequence).

    Returns a list of (index, record_id) tuples, ordered by index,
    and the minimum number of ballots that are missing from the sequence.
    The indices are relative: the lowest one is 0. An empty batch yields
    ([], 0).

    Raises AttackFailed if scanner_model is any other value, or if the
    recovered indices are spread too widely to be a plausible batch.
    """
    assert isinstance(record_ids, list)
    assert len(record_ids) == len(set(record_ids))
    if not record_ids:
        # Nothing to unshuffle, so nothing can be missing either. Return an
        # integer count so the result has the same shape as every other case.
        return [], 0
    ids = [r % 1000000 for r in record_ids]

    def reduce_indices(indices):
        """
        Given a list of possible record_id indices, shift as necessary
        to account for wrapping around the modulus. Returns
        the relative indices (with the lowest equal to zero).
        """
        lowest, highest = min(indices), max(indices)
        if lowest < 100 and highest > 999900:
            # Indices sit at both ends of the range, so the batch straddles
            # the wrap-around point; rotate by half the modulus so that the
            # batch becomes contiguous before measuring it.
            indices = [(i + 500000) % 1000000 for i in indices]
            lowest = min(indices)
        return [(i - lowest) for i in indices]

    def count_missing(indices):
        """
        Given a list of relative indices, returns the minimum
        number of ballots that are missing from the list. We use
        this to test whether the unshuffling is plausible. If it
        is not, the function returns 1000000.
        """
        span = max(indices) - min(indices) + 1
        if span > len(indices) * 10:
            return 1000000
        return span - len(indices)

    if scanner_model == "ImagecastPrecinct":
        indices = reduce_indices([icp_inverse[rid] for rid in ids])
        missing = count_missing(indices)
    elif scanner_model == "ImagecastEvolution":
        indices = reduce_indices([ice_inverse[rid] for rid in ids])
        missing = count_missing(indices)
    elif scanner_model is None:
        # Model unknown: try both sequences and keep the tighter fit.
        icp_indices = reduce_indices([icp_inverse[rid] for rid in ids])
        icp_missing = count_missing(icp_indices)
        ice_indices = reduce_indices([ice_inverse[rid] for rid in ids])
        ice_missing = count_missing(ice_indices)
        if icp_missing < ice_missing:
            indices, missing = icp_indices, icp_missing
        else:
            indices, missing = ice_indices, ice_missing
    else:
        # Any other scanner model is not handled by this attack.
        raise AttackFailed
    if missing == 1000000:
        # Sentinel from count_missing: the fit is implausible.
        raise AttackFailed
    # sorted() is stable, so entries with equal indices keep input order.
    results = sorted(zip(indices, ids), key=lambda t: t[0])
    return results, missing
def read_csv_batches(input_filename):
    """
    Generator that yields batches from a CSV-format CVR export.

    Parameters:
    input_filename(string): CSV-format CVR file to read

    Yields a single dict that maps (tabulator_id, batch_id, None) to the
    list of record_ids of that batch, in the order they appear in the file.

    The file must start with four header rows (event, contest, choice,
    ballot). The ballot header must name the tabulator, batch and record
    columns as Tabulator/Batch/Record or TabulatorNum/BatchId/RecordId.

    Raises ValueError if fewer than four header rows are present, if an
    expected column is missing, or if an id field is not an integer.
    """
    def csv_int(string):
        """Extracts integers from various CSV quoting styles."""
        # Accepts both a plain 123 and the ="123" wrapped form.
        if string.startswith('="') and string.endswith('"'):
            return int(string[2:-1])
        return int(string)

    def column_index(header, short_name, long_name):
        """Return the position of short_name in header, else of long_name."""
        if short_name in header:
            return header.index(short_name)
        return header.index(long_name)

    batches = {}
    # newline='' is what the csv module expects for files it reads.
    with open(input_filename, mode='r', newline='') as infile:
        reader = csv.reader(infile)
        try:
            header_event = next(reader)
            next(reader)  # contest header row (not used)
            next(reader)  # choice header row (not used)
            header_ballot = next(reader)
        except StopIteration:
            raise ValueError(
                "CSV CVR file ended before all four header rows were read"
            ) from None
        print(f'event_name: {header_event[0]!r}, rtr_version: {header_event[1]!r}')
        tab_n = column_index(header_ballot, 'Tabulator', 'TabulatorNum')
        # Each column is looked up by its own name; the batch column must not
        # be chosen based on how the tabulator column happens to be named.
        bat_n = column_index(header_ballot, 'Batch', 'BatchId')
        rec_n = column_index(header_ballot, 'Record', 'RecordId')
        assert tab_n == 1 and bat_n == 2 and rec_n == 3
        for row in reader:
            if not row:
                # csv.reader returns an empty list for a blank line.
                continue
            tab_id = csv_int(row[tab_n])
            bat_id = csv_int(row[bat_n])
            rec_id = csv_int(row[rec_n])
            batches.setdefault((tab_id, bat_id, None), []).append(rec_id)
    yield batches
def read_json_zip_batches(input_filename):
    """
    Generator that yields batches from a zipped JSON-format CVR export.

    Parameters:
    input_filename(string): Zipped JSON-format CVR data to read.

    Yields one dict per archive member, in archive order. For members
    named CvrExport*.json the dict maps (tabulator_id, batch_id, model)
    to the list of record_ids of that batch; for every other member the
    dict is empty. The model is taken from TabulatorManifest.json.

    A record without a usable RecordId falls back to the third
    underscore-separated field of its ImageMask file name. Records that
    still have no id are reported on stderr and skipped; a batch made up
    only of such records does not appear in the yielded dict.
    """
    vulnerable_models = ('ImagecastPrecinct', 'ImagecastEvolution')
    # The context manager closes the archive once the generator finishes
    # or is closed.
    with zipfile.ZipFile(input_filename) as source_zip:
        event_manifest = json.loads(source_zip.read('ElectionEventManifest.json'))
        print(f'description: {event_manifest["List"][0]["Description"]}, version: {event_manifest["Version"]}')
        tab_manifest = json.loads(source_zip.read('TabulatorManifest.json'))
        tab_models = {
            int(t['Id']): t['Type']
            for t in tab_manifest['List']
        }
        vulnerable = [m for m in tab_models.values() if m in vulnerable_models]
        print(vulnerable)
        print(f'{len(vulnerable)} of {len(tab_manifest["List"])} tabulators are vulnerable models')
        members = source_zip.namelist()
        for i, name in enumerate(members):
            batches = {}
            print(f'reading file {i+1} of {len(members)}, {name}')
            if name.startswith('CvrExport') and name.endswith('.json'):
                obj = json.loads(source_zip.read(name))
                for cvr in obj['Sessions']:
                    tab_id, bat_id = cvr['TabulatorId'], cvr['BatchId']
                    model = tab_models[tab_id]
                    # 'X' stands for "no record id available".
                    rec_id = cvr.get('RecordId', 'X')
                    if rec_id == 'X' and 'ImageMask' in cvr:
                        # ntpath is used because the mask is split on
                        # Windows-style separators.
                        image_name = ntpath.basename(cvr['ImageMask'])
                        if image_name.endswith('*.*'):
                            image_name = image_name[:-3]
                        rec_id = int(image_name.split("_")[2])
                    if rec_id == 'X':
                        print('skipping sanitized record', file=sys.stderr)
                        continue
                    # The batch entry is created only once a real record
                    # exists, so fully sanitized batches are not emitted
                    # as empty lists.
                    batches.setdefault((tab_id, bat_id, model), []).append(rec_id)
            yield batches
def read_image_zip_batches(input_filename):
    """
    Generator that yields batches from a zipfile containing ballot images.

    Parameters:
    input_filename(string): Zip archive containing TIF ballot images.

    Yields a single dict that maps (tabulator_id, batch_id, None) to the
    list of record_ids of that batch, in archive order. The three ids are
    the first three underscore-separated fields of each ".tif" member's
    base name. Members with another extension are ignored; ".tif" members
    whose name does not start with three integers are reported on stderr
    and skipped.
    """
    # Only the member names are needed, so the archive can be closed early.
    with zipfile.ZipFile(input_filename) as zf:
        names = zf.namelist()
    batches = {}
    for name in names:
        base, ext = os.path.splitext(os.path.basename(name))
        if ext != ".tif":
            continue
        ids = base.split("_")
        try:
            tab_id, bat_id, rec_id = int(ids[0]), int(ids[1]), int(ids[2])
        except (ValueError, IndexError):
            # Non-numeric field or fewer than three fields. Anything else
            # (e.g. KeyboardInterrupt) is deliberately left to propagate.
            print("skipping", name, file=sys.stderr)
            continue
        batches.setdefault((tab_id, bat_id, None), []).append(rec_id)
    yield batches
def multi_file_reader(filenames, use_images):
    """
    Generator that chains the batch dicts read from several input files.

    Each file name selects its reader by extension: ".csv" files are read
    as CSV CVR exports; ".zip" files are read as ballot-image archives when
    use_images is true and as zipped JSON CVR exports otherwise.

    Raises Exception for a ".csv" file when use_images is true, and for
    any file name that ends in neither ".csv" nor ".zip".
    """
    for path in filenames:
        is_csv = path.endswith(".csv")
        if not is_csv and not path.endswith(".zip"):
            raise Exception("Doesn't look like a .csv or .zip file")
        if is_csv and use_images:
            raise Exception("Can't read images from a CSV file")
        if is_csv:
            read_batches = read_csv_batches
        elif use_images:
            read_batches = read_image_zip_batches
        else:
            read_batches = read_json_zip_batches
        yield from read_batches(path)
def process_files(filenames, use_images=False, show_unshuffled=False):
    """
    Processes ballots from the specified files and prints a report.

    Parameters:
    filenames: the input files, passed on to multi_file_reader.
    use_images (bool): passed on to multi_file_reader to select how
        ".zip" inputs are read.
    show_unshuffled (bool): if True, prints the unshuffled record_ids
        from each vulnerable batch.

    For every batch one line is printed saying whether it appears safe
    (unshuffle raised AttackFailed) or vulnerable, followed by a final
    summary line with the overall counts and percentage.
    """
    count_ballots = 0
    count_vulnerable = 0
    for batch_dict in multi_file_reader(filenames, use_images):
        for (tab_id, bat_id, model), batch in batch_dict.items():
            count_ballots += len(batch)
            try:
                results, fit = unshuffle(batch, scanner_model=model)
            except AttackFailed:
                print(f"tabulator {tab_id} batch {bat_id} appears safe ({len(batch)} ballots)")
                continue
            print(f"tabulator {tab_id} batch {bat_id} appears vulnerable ({len(batch)} ballots, missing {fit})")
            if show_unshuffled:
                print("unshuffled ballots:", results)
            count_vulnerable += len(batch)
    # With no ballots at all the percentage is undefined; report 0% rather
    # than failing with ZeroDivisionError.
    percent = int(100 * count_vulnerable / count_ballots) if count_ballots else 0
    print(f"approximately {count_vulnerable} of {count_ballots} ballots ({percent}%) appear to be vulnerable")
if __name__ == "__main__":
    # A mode flag followed by at least one file name is required.
    mode = sys.argv[1] if len(sys.argv) >= 3 else None
    if mode == "--cvrs":
        process_files(sys.argv[2:], use_images=False)
    elif mode == "--images":
        process_files(sys.argv[2:], use_images=True)
    else:
        # Anything else is a usage error: show the synopsis and exit non-zero.
        print(f"Usage: {sys.argv[0]} --images|cvrs FILE [FILE ...]")
        sys.exit(1)