# metadata_package_validator.py - Metadata package validator.
import argparse
import logging
import json
import myutils
import sys
import collections
import re
# ALL-MQ-18: characters accepted in the code of an option.
_OPTION_CODE_PATTERN = re.compile(r"^([0-9A-Z_\|\-\.])+$")
# ALL-MQ-18: characters accepted in the code of every other checked resource type.
_CODE_PATTERN = re.compile(r"^([0-9A-Z_])+$")
# PRV-MQ-2: characters accepted in the name of a program rule variable.
_PRV_NAME_PATTERN = re.compile(r"^[a-zA-Z\d_\-\.\ ]+$")
# ALL-MQ-10: the "digit - digit" expression that names should not contain.
_DIGIT_RANGE_PATTERN = re.compile(r"\d - \d")
# PRV-MQ-2: words that must not appear as separate words in a PRV name (dhis version >= 2.34).
_FORBIDDEN_PRV_WORDS = ("and", "or", "not")


class _ErrorReporter:
    """Callable that logs a message at ERROR level and counts how many it logged."""

    def __init__(self, logger):
        self._logger = logger
        self.count = 0

    def __call__(self, message):
        self._logger.error(message)
        self.count += 1


def _repeated(items):
    """Return the items that occur more than once, in first-seen order."""
    return [item for item, count in collections.Counter(items).items() if count > 1]


def _is_clean_locale(locale):
    """Return True when every character of the locale is alphabetic, '-' or '_'."""
    return all(char.isalpha() or char in ("-", "_") for char in locale)


def _configure_logging():
    """Attach a file handler and a console handler to the root logger and return it."""
    logger = logging.getLogger()
    # INFO and above are emitted; the same format is used for the file and the console.
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter('* %(levelname)s - %(message)s')
    handlers = (
        logging.FileHandler('package_metadata_validator.log', encoding="utf-8"),
        logging.StreamHandler(),
    )
    for handler in handlers:
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


def _release_handlers(logger):
    """Close and detach every handler of the logger so the log file handle is released."""
    # See https://stackoverflow.com/questions/15435652/python-does-not-release-filehandles-to-logfile
    for handler in logger.handlers[:]:
        handler.close()
        logger.removeHandler(handler)


def _load_package(filename):
    """Return the parsed JSON package; print a hint and exit with -1 if it cannot be opened."""
    try:
        # filename is None when -f/--file was omitted, in which case open() raises TypeError.
        json_file = open(filename, mode='r', encoding="utf-8")
    except (OSError, TypeError):
        print("Please provide a valid filename")
        sys.exit(-1)
    with json_file:
        return json.load(json_file)


def _check_options(package, report_error):
    """Run O-MQ-2 (sortOrder of options) and OG-MQ-1 (options of optionGroups)."""
    # Group the sortOrder values by optionSet (for O-MQ-2).
    sort_orders_by_option_set = collections.defaultdict(list)
    for option in package.setdefault("options", []):
        sort_orders_by_option_set[option["optionSet"]["id"]].append(option["sortOrder"])

    # O-MQ-2: the sorted sortOrders must be exactly 1..N. Comparing the whole sequence
    # (not only its first and last element) also catches duplicates and gaps such as [1, 1, 3].
    for option_set_uid, sort_orders in sort_orders_by_option_set.items():
        sort_orders.sort()
        if sort_orders == list(range(1, len(sort_orders) + 1)):
            continue
        option_set_name = myutils.get_name_by_type_and_uid(package=package, resource_type="optionSets", uid=option_set_uid)
        report_error("O-MQ-2 - The optionSet '" + option_set_name + "' (" + option_set_uid + ") has errors in the sortOrder. Current sortOrder: "+", ".join([str(i) for i in sort_orders]))

    # OG-MQ-1. All options in optionGroups must belong to an optionSet
    option_uids_in_option_groups = myutils.json_extract_nested_ids(package.setdefault("optionGroups", []), "options")
    option_uids_in_optionset = myutils.json_extract_nested_ids(package.setdefault("optionSets", []), "options")
    for option_uid in option_uids_in_option_groups:
        if option_uid not in option_uids_in_optionset:
            report_error(f"OG-MQ-1 - Option in OptionGroup but not in OptionSet. Option '{myutils.get_name_by_type_and_uid(package, 'options', option_uid)}' ({option_uid})")


def _check_sharing_and_favorites(package, report_error):
    """Run SHST-MQ-1 (external access) and ALL-MQ-16 (favorites) over the whole package."""
    def check_external(k, v):
        if k == "externalAccess" and v is True:
            report_error("SHST-MQ-1 - There is a resource with external access. Suggestion: use grep command for finding '\"externalAccess\": true'")

    myutils.iterate_complex(package, check_external)

    def check_favorites(k, v):
        if k == "favorites" and v:
            report_error("ALL-MQ-16. There is a reference to user ("+','.join(v)+") that saved the resource as favourite. Suggestion: use grep command for finding")

    myutils.iterate_complex(package, check_favorites)


def _check_translations(package, report_error):
    """Run ALL-MQ-21 (malformed locale) and ALL-MQ-19 (duplicated translation)."""
    def check_nested_translations(k, v):
        # Callback for translations found below the resource level; the owning
        # resource is not known here, so the messages only show the translation.
        if k != "translations":
            return
        seen = []
        for translation in v:
            if "locale" not in translation:
                report_error(f"ALL-MQ-21: Unexpected translation. Missing locale in translation. Double check translation {translation}")
                continue
            seen.append((translation["locale"], translation["property"]))
            if not _is_clean_locale(translation["locale"]):
                report_error(f"ALL-MQ-21: Unexpected translation. Unexpected symbol in locale. Translation {translation}")
        for locale, prop in _repeated(seen):
            translation_values = [
                translation["value"]
                for translation in v
                if "locale" in translation and translation["locale"] == locale and translation["property"] == prop
            ]
            report_error(f"ALL-MQ-19. Translation duplicated. Translation property={prop} locale={locale} values={translation_values}")

    for resource_type, resource_list in package.items():
        # "package" and "system" are dictionaries, not lists; any other top-level
        # value that is not a list of resources is skipped as well.
        if resource_type in ("package", "system") or not isinstance(resource_list, list):
            continue
        for resource in resource_list:
            # Review translations of the package that are placed under the 2 hierarchy level (not directly under package).
            for k, v in resource.items():
                if k != "translations":
                    myutils.iterate_complex(v, check_nested_translations)
            if "translations" not in resource:
                continue
            seen = []
            for translation in resource["translations"]:
                if "locale" not in translation:
                    report_error(f"ALL-MQ-21: Unexpected translation. Missing locale in translation. Resource {resource_type} with UID {resource['id']}. Translation {translation}")
                    continue
                # for locale-property duplicates
                seen.append((translation["locale"], translation["property"]))
                if not _is_clean_locale(translation["locale"]):
                    report_error(f"ALL-MQ-21: Unexpected translation. Unexpected symbol in locale. Resource {resource_type} with UID {resource['id']}. Translation {translation}")
            for locale, prop in _repeated(seen):
                report_error(f"ALL-MQ-19. Translation duplicated. Resource {resource_type} with UID {resource['id']}. Translation property='{prop}' locale='{locale}'")


def _check_program_rules(package, report_error):
    """Run PR-ST-3, PRV-MQ-1, PRV-MQ-2, PR-ST-4 and PR-ST-5."""
    program_rules = package.setdefault("programRules", [])
    for pr in program_rules:
        # PR-ST-3: Program Rule without action
        if not pr["programRuleActions"]:
            report_error(f"PR-ST-3 Program Rule '{pr['name']}' ({pr['id']}) without Program Rule Action")

    # PRV-MQ-1 More than one PRV with the same name for the same program
    program_rule_variables = package.setdefault("programRuleVariables", [])
    prv_names_by_program = collections.defaultdict(list)
    for prv in program_rule_variables:
        prv_names_by_program[prv["program"]["id"]].append(prv["name"])
    for program, prv_names in prv_names_by_program.items():
        repeated_names = _repeated(prv_names)
        if repeated_names:
            report_error(f"PRV-MQ-1 - In program '{myutils.get_name_by_type_and_uid(package, 'programs', program)}' ({program}), more than one PRV with the same name: {repeated_names}")

    # PRV-MQ-2: forbidden words and unexpected characters in the PRV name.
    for prv in program_rule_variables:
        name = prv["name"]
        if any(
            " " + word + " " in name or name.startswith(word + " ") or name.endswith(" " + word)
            for word in _FORBIDDEN_PRV_WORDS
        ):
            report_error(f"PRV-MQ-2: The PRV '{prv['name']}' ({prv['id']}) contains 'and/or/not'")
        if not _PRV_NAME_PATTERN.match(name):
            report_error(f"PRV-MQ-2: The PRV '{prv['name']}' ({prv['id']}) contains unexpected characters")

    # PR-ST-4: Data element associated to a program rule action MUST belong to the program that the program rule is associated to.
    # NOTE(review): the data elements of ALL program stages in the package are pooled,
    # so a DE of another program is accepted — confirm whether a per-program check is wanted.
    program_stages = package.setdefault("programStages", [])
    de_in_program = {
        psde["dataElement"]["id"]
        for ps in program_stages
        for psde in ps["programStageDataElements"]
    }
    program_rule_actions = package.setdefault("programRuleActions", [])
    for pra in program_rule_actions:
        if "dataElement" in pra and pra["dataElement"]["id"] not in de_in_program:
            pr_uid = pra['programRule']['id']
            pr_name = myutils.get_name_by_type_and_uid(package, 'programRules', pr_uid)
            de_uid = pra['dataElement']['id']
            de_name = myutils.get_name_by_type_and_uid(package, 'dataElements', de_uid)
            report_error(f"PR-ST-4 Program Rule '{pr_name}' ({pr_uid}) in the PR Action uses a DE '{de_name}' ({de_uid}) that does not belong to the associated program.")

    # PR-ST-5: Tracked Entity Attribute associated to a program rule action MUST belong to the program/TET that the program rule is associated to.
    tracked_entity_types = package.get("trackedEntityTypes", [])
    for program in package.get("programs", []):
        program_id = program["id"]
        # Attributes of the program itself plus those of its tracked entity type.
        teas = list(program.get("programTrackedEntityAttributes", []))
        if "trackedEntityType" in program:
            tracked_entity_type_uid = program["trackedEntityType"]["id"]
            for tet in tracked_entity_types:
                if tet["id"] == tracked_entity_type_uid:
                    teas += tet.get("trackedEntityTypeAttributes", [])
        # Rebuilt for every program: attributes of previously visited programs
        # must not make an action of this program look valid.
        teas_program = {tea["trackedEntityAttribute"]["id"] for tea in teas}
        pr_in_this_program = {pr["id"] for pr in program_rules if pr["program"]["id"] == program_id}
        for pra in program_rule_actions:
            if pra["programRule"]["id"] not in pr_in_this_program:
                continue
            if "trackedEntityAttribute" in pra and pra["trackedEntityAttribute"]["id"] not in teas_program:
                pr_uid = pra['programRule']['id']
                pr_name = myutils.get_name_by_type_and_uid(package, 'programRules', pr_uid)
                tea_uid = pra['trackedEntityAttribute']['id']
                tea_name = myutils.get_name_by_type_and_uid(package, 'trackedEntityAttributes', tea_uid)
                report_error(f"PR-ST-5 Program Rule '{pr_name}' ({pr_uid}) in the PR Action uses a TEA '{tea_name}' ({tea_uid}) that does not belong to the associated program.")


def _check_codes(package, logger, report_error):
    """Run ALL-MQ-17 (missing code, warning) and ALL-MQ-18 (invalid code, error)."""
    resources_with_code = ['dashboards', 'dataSets', 'programs', 'indicatorGroups', 'dataElementGroups', 'predictorGroups', 'validationRuleGroups', 'userGroups', 'options']
    for resource_type in resources_with_code:
        if resource_type not in package:
            continue
        # Options accept a wider set of symbols than the other resource types.
        pattern = _OPTION_CODE_PATTERN if resource_type == "options" else _CODE_PATTERN
        for resource in package[resource_type]:
            if "code" not in resource:
                logger.warning(f"ALL-MQ-17- Missed code field in {resource_type} (name='{resource['name']}' uid={resource['id']})")
                continue
            # ALL-MQ-18: Codes MUST be upper case ASCII (alphabetic A-Z), and the symbols '_' (underscore),'-' (hyphen),'.' (dot),'|' (Bar o Pipe)
            if "\t" in resource["code"]:
                report_error(f"ALL-MQ-18- Tab character in code='{resource['code']}' (resource type='{resource_type}' name='{resource['name']}' uid={resource['id']})")
                # The tabs are stripped in place so the pattern check below only
                # reports what is still wrong once they are gone.
                resource["code"] = resource["code"].replace("\t", "")
            if not pattern.search(resource["code"]):
                report_error(f"ALL-MQ-18- Invalid code='{resource['code']}' (resource type='{resource_type}' name='{resource['name']}' uid={resource['id']})")


def _check_wording(package, logger):
    """Run the warning-only checks DE-MQ-2, I-MQ-3 and PI-MQ-3 on name and shortName."""
    keys_to_validate = ("name", "shortName")

    # DE-MQ-2: The name/shortName SHOULD not contains "Number of" or "number of"
    for de in package.setdefault("dataElements", []):
        for n in keys_to_validate:
            if n in de and "NUMBER OF" in de[n].upper():
                logger.warning(f"DE-MQ-2 - DataElement contains the words 'number of' ({de['id']}) {n}='{de[n]}'")

    # I-MQ-3. Indicators should not contain "proportion" or "percentage" in the name, shortName
    for indicator in package.setdefault("indicators", []):
        for n in keys_to_validate:
            if n in indicator and any(t in indicator[n].upper() for t in ["PROPORTION", "PERCENTAGE"]):
                logger.warning(
                    "I-MQ-3 - Indicator contains the word 'proportion' or 'percentage'. Resource Indicator with UID " + indicator['id'] + ". " + n + "='" + indicator[n] + "'")

    # PI-MQ-3. Program Indicators should not contain "proportion" or "percentage" in the name, shortName
    for pi in package.setdefault("programIndicators", []):
        for n in keys_to_validate:
            if n in pi and any(t in pi[n].upper() for t in ["PROPORTION", "PERCENTAGE"]):
                logger.warning(
                    "PI-MQ-3 - Program Indicator contains the word 'proportion' or 'percentage'. Resource Program Indicator with UID " + pi['id'] + ". " + n + "='" + pi[n] + "'")


def _check_program_stage_name(package, report_error):
    """Run ALL-MQ-20: expressions must not contain 'program_stage_name'."""
    # Program Indicators
    for pi in package.setdefault("programIndicators", []):
        for n in ("filter", "expression"):
            if n in pi and "program_stage_name" in pi[n]:
                report_error(f"ALL-MQ-20 From program '{myutils.get_name_by_type_and_uid(package, 'programs', pi['program']['id'])}' ({pi['program']['id']}), the PI '{pi['name']}' ({pi['id']}) contains 'program_stage_name' in the {n}.")

    # Program Rules
    for pr in package.setdefault("programRules", []):
        for n in ("condition",):
            if n in pr and "program_stage_name" in pr[n]:
                report_error(f"ALL-MQ-20 From program '{myutils.get_name_by_type_and_uid(package, 'programs', pr['program']['id'])}' ({pr['program']['id']}), the PR '{pr['name']}' ({pr['id']}) contains 'program_stage_name' in the {n}.")

    # Program Rule Actions
    for pra in package.setdefault("programRuleActions", []):
        for n in ("data",):
            if n in pra and "program_stage_name" in pra[n]:
                pr_uid = pra["programRule"]["id"]
                program_uid = myutils.get_program_referenced_by_type_and_uid(package, "programRules", pr_uid)
                report_error(f"ALL-MQ-20 From program '{myutils.get_name_by_type_and_uid(package, 'programs', program_uid)}' ({program_uid}), the PR '{myutils.get_name_by_type_and_uid(package, 'programRules', pr_uid)}' ({pr_uid}) contains 'program_stage_name' in a PRAction ({pra['id']}).")


def _check_custom_forms(package, logger, report_error):
    """Warn about custom forms (*-MQ-1) and report empty ones (*-MQ-2) as errors."""
    program_stages = package.setdefault("programStages", [])
    package.setdefault("dataEntryForms", [])

    def review_form(resource, rule, subject):
        # A referenced data entry form with an "htmlCode" field is a custom form
        # (warning); without that field it is reported as an empty custom form (error).
        form_uid = resource["dataEntryForm"]["id"]
        if myutils.is_field_in_resource(package, "dataEntryForms", form_uid, "htmlCode"):
            logger.warning(f"{rule}-MQ-1 {subject} has a custom form")
        else:
            report_error(f"{rule}-MQ-2 {subject} has an empty custom form")

    # Custom forms in programStages
    for ps in program_stages:
        program_uid = ps["program"]["id"]
        program_name = myutils.get_name_by_type_and_uid(package, "programs", program_uid)
        if "dataEntryForm" in ps:
            review_form(ps, "PS", f"In program '{program_name}' ({program_uid}), the program stage '{ps['name']}' ({ps['id']})")

    # Custom forms in programs
    for p in package.setdefault("programs", []):
        if "dataEntryForm" in p:
            review_form(p, "P", f"The program '{p['name']}' ({p['id']})")

    # Custom forms in dataSets
    for ds in package.setdefault("dataSets", []):
        if "dataEntryForm" in ds:
            review_form(ds, "DS", f"The dataSet '{ds['name']}' ({ds['id']})")


def _check_naming_and_descriptions(package, logger):
    """Run the warning-only checks ALL-MQ-9, ALL-MQ-10 and ALL-MQ-8."""
    # Review only Data Element, Indicator, Program Indicator, Categories, Category Options, Category Combos, Maps, Visualizations
    resources_to_review_naming = ["dataElements", "indicators", "programIndicators", "categories", "categoryOptions", "categoryCombos", "maps", "visualizations"]
    for resource_type in resources_to_review_naming:
        if resource_type not in package:
            continue
        for resource in package[resource_type]:
            for n in ("name", "shortName"):
                if n not in resource:
                    continue
                # ALL-MQ-9 validation. Name and shortName SHOULD NOT contain >,<, ≥, ≤.
                if any(symbol in resource[n] for symbol in ('>', '<', '≤', '≥')):
                    logger.warning(f"ALL-MQ-9 {resource_type} ({resource['id']}) contains any of this characters '>', '<', '≤', '≥' in {n}: '{resource[n]}'")
                # ALL-MQ-10 validation. Name and shortName SHOULD NOT contain the pattern "digit - digit"
                if _DIGIT_RANGE_PATTERN.search(resource[n]):
                    logger.warning(f"ALL-MQ-10 {resource_type} ({resource['id']}) contains the expression 'digit(0-9) - digit(0-9) in {n}: '{resource[n]}'")

    # Check existence of a description
    # Review only: programs, dataSets, dataElements, trackedEntityAttributes, trackedEntityTypes, indicators, programIndicators, validationRules, predictors, programRules, visualizations (event chart, event report, map, data visualizer), dashboards
    resources_to_review_description = ["programs", "dataSets", "dataElements", "trackedEntityAttributes", "trackedEntityTypes", "indicators", "programIndicators", "validationRules", "predictors", "programRules", "visualizations", "dashboards"]
    for resource_type in resources_to_review_description:
        if resource_type not in package:
            continue
        for resource in package[resource_type]:
            if "description" not in resource:
                logger.warning(f"ALL-MQ-8 No description in {resource_type} ({resource['id']})")


def main():
    """Validate the metadata package given with -f/--file and return the error count.

    The package is a JSON file. Every finding is written both to
    'package_metadata_validator.log' and to the console, as an ERROR or as a
    WARNING. Resource lists missing from the package are added to the loaded
    dictionary as empty lists while the checks run.

    Returns:
        int: how many ERROR records were logged (warnings are not counted).

    Raises:
        SystemExit: with code -1 when no filename was given or the file cannot
            be opened.
    """
    my_parser = argparse.ArgumentParser(description='Metadata package validator')
    my_parser.add_argument('-f', '--file', action="store", dest="input_filename", type=str, help='input filename')
    args = my_parser.parse_args()

    logger = _configure_logging()
    try:
        logger.info('-------------------------------------Starting validation-------------------------------------')
        package = _load_package(args.input_filename)

        # Every error goes through this reporter so that the returned count
        # always matches the number of ERROR lines in the log.
        report_error = _ErrorReporter(logger)

        # The order of the calls fixes the order of the messages in the log.
        _check_options(package, report_error)
        _check_sharing_and_favorites(package, report_error)
        _check_translations(package, report_error)
        _check_program_rules(package, report_error)
        _check_codes(package, logger, report_error)
        _check_wording(package, logger)
        _check_program_stage_name(package, report_error)
        _check_custom_forms(package, logger, report_error)
        _check_naming_and_descriptions(package, logger)

        logger.info('-------------------------------------Finished validation-------------------------------------')
    finally:
        # Runs on success, on sys.exit() and on unexpected exceptions alike.
        _release_handlers(logger)
    return report_error.count
if __name__ == '__main__':
    # Script entry point: when main() reports at least one error, the process
    # terminates through sys.exit(-1) so the caller can detect the failure.
    if main():
        sys.exit(-1)