Stable candidate v2.2.0 (#62)
* Improve output, save validation results to file, add more date validations (#61)

* validation exception on failure

* add more date validations

* add progress bar req

* tidier stdout, save big errors to file

* add date error checking

* update tests

* add info method, formatting

* always save validation results

* fix validation output

* add tqdm to dependencies

* remove errors from map json

* add new output file to readme

* put back errors and warnings

* fix bugs, change to validation in json

* update readme

* update validation results comments

* add missing test error

* update files for new release
mshadbolt authored Apr 9, 2024
1 parent 188f1e9 commit aa9967d
Showing 15 changed files with 206 additions and 73 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -8,4 +8,5 @@ __pycache__/*
_local
.idea
.~lock*
build/
build/
tests/raw_data_*
9 changes: 1 addition & 8 deletions README.md
@@ -162,14 +162,6 @@ A summarised example of the output is below:
"openapi_url": "https://raw.githubusercontent.com/CanDIG/katsu/develop/chord_metadata_service/mohpackets/docs/schema.yml",
"katsu_sha": < git sha of the katsu version used for the schema >,
"donors": < An array of JSON objects, each one representing a DonorWithClinicalData in katsu >,
"validation_warnings": [
< any validation warnings, e.g. >
"DONOR_5: cause_of_death required if is_deceased = Yes"
],
"validation_errors": [
< any validation errors, e.g. >
"DONOR_5 > PD_5 > TR_5 > Radiation 1: Only one radiation is allowed per treatment"
],
"statistics": {
"required_but_missing": {
< for each schema in the model, a list of required fields and how many cases are missing this value (out of the total number of occurrences) >
@@ -196,6 +188,7 @@ A summarised example of the output is below:
}
}
```
`<INPUT_DIR>_validation_results.json` contains all validation warnings and errors.

`<INPUT_DIR>_indexed.json` contains information about how the ETL is looking up the mappings and can be useful for debugging. It is only generated if the `--index` argument is specified when CSVConvert is run. Note: This file can be very large if the input data is large.
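As a minimal sketch (keys taken from the `validation_results` dict written in `CSVConvert.py`; the messages are illustrative examples reused from the previous README excerpt), the new validation results file looks like:

```json
{
    "validation_errors": [
        < any validation errors, e.g. >
        "DONOR_5 > PD_5 > TR_5 > Radiation 1: Only one radiation is allowed per treatment"
    ],
    "validation_warnings": [
        < any validation warnings, e.g. >
        "DONOR_5: cause_of_death required if is_deceased = Yes"
    ]
}
```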

Binary file removed dist/clinical_ETL-2.0.0-py3-none-any.whl
Binary file added dist/clinical_ETL-2.2.0-py3-none-any.whl
Binary file added dist/clinical_ETL-2.2.0.tar.gz
5 changes: 3 additions & 2 deletions pyproject.toml
@@ -3,7 +3,7 @@ requires = ["setuptools >= 61.0"]
build-backend = "setuptools.build_meta"

[project]
version = "2.0.0"
version = "2.2.0"
name = "clinical_ETL"
dependencies = [
"pandas>=2.1.0",
@@ -14,7 +14,8 @@ dependencies = [
"requests>=2.29.0",
"jsonschema~=4.19.2",
"openapi-spec-validator>=0.7.1",
"pdoc3>=0.10.0"
"pdoc3>=0.10.0",
"tqdm>=4.66.2"
]
requires-python = ">= 3.10"
description = "ETL module for transforming clinical CSV data into properly-formatted packets for ingest into Katsu"
1 change: 1 addition & 0 deletions requirements.txt
@@ -7,3 +7,4 @@ requests~=2.29
jsonschema~=4.19.1
openapi-spec-validator~=0.7.1
pdoc3>=0.10.0
tqdm>=4.66.2
12 changes: 3 additions & 9 deletions src/clinical_ETL.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: clinical_ETL
Version: 2.0.0
Version: 2.2.0
Summary: ETL module for transforming clinical CSV data into properly-formatted packets for ingest into Katsu
Project-URL: Repository, https://github.com/CanDIG/clinical_ETL_code
Requires-Python: >=3.10
@@ -15,6 +15,7 @@ Requires-Dist: requests>=2.29.0
Requires-Dist: jsonschema~=4.19.2
Requires-Dist: openapi-spec-validator>=0.7.1
Requires-Dist: pdoc3>=0.10.0
Requires-Dist: tqdm>=4.66.2

# clinical_ETL_code

@@ -180,14 +181,6 @@ A summarised example of the output is below:
"openapi_url": "https://raw.githubusercontent.com/CanDIG/katsu/develop/chord_metadata_service/mohpackets/docs/schema.yml",
"katsu_sha": < git sha of the katsu version used for the schema >,
"donors": < An array of JSON objects, each one representing a DonorWithClinicalData in katsu >,
"validation_warnings": [
< any validation warnings, e.g. >
"DONOR_5: cause_of_death required if is_deceased = Yes"
],
"validation_errors": [
< any validation errors, e.g. >
"DONOR_5 > PD_5 > TR_5 > Radiation 1: Only one radiation is allowed per treatment"
],
"statistics": {
"required_but_missing": {
< for each schema in the model, a list of required fields and how many cases are missing this value (out of the total number of occurrences) >
@@ -214,6 +207,7 @@ A summarised example of the output is below:
}
}
```
`<INPUT_DIR>_validation_results.json` contains all validation warnings and errors.

`<INPUT_DIR>_indexed.json` contains information about how the ETL is looking up the mappings and can be useful for debugging. It is only generated if the `--index` argument is specified when CSVConvert is run. Note: This file can be very large if the input data is large.

1 change: 1 addition & 0 deletions src/clinical_ETL.egg-info/requires.txt
@@ -7,3 +7,4 @@ requests>=2.29.0
jsonschema~=4.19.2
openapi-spec-validator>=0.7.1
pdoc3>=0.10.0
tqdm>=4.66.2
106 changes: 74 additions & 32 deletions src/clinical_etl/CSVConvert.py
@@ -11,11 +11,12 @@
import re
import yaml
import argparse
from tqdm import tqdm
from clinical_etl import mappings
# Include clinical_etl parent directory in the module search path.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
from clinical_etl import mappings


def verbose_print(message):
@@ -38,6 +39,18 @@ def parse_args():
return args


class Bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'


def map_data_to_scaffold(node, line, rownum):
"""
Given a particular individual's data, and a node in the schema, return the node with mapped data. Recursive.
@@ -311,14 +324,14 @@ def ingest_raw_data(input_path):
return raw_csv_dfs, output_file


def process_data(raw_csv_dfs):
def process_data(raw_csv_dfs, verbose):
"""Takes a set of raw dataframes with a common identifier and merges into a JSON data structure."""
final_merged = {}
cols_index = {}
individuals = []

print(f"\n{Bcolors.OKBLUE}Processing sheets: {Bcolors.ENDC}")
for page in raw_csv_dfs.keys():
print(f"Processing sheet {page}...")
print(f"{Bcolors.OKBLUE}{page} {Bcolors.ENDC}", end="")
df = raw_csv_dfs[page].dropna(axis='index', how='all') \
.dropna(axis='columns', how='all') \
.map(str) \
@@ -359,8 +372,8 @@ def process_data(raw_csv_dfs):
if val == 'nan':
val = None
merged_dict[i][k.strip()].append(val)
if len(row_to_merge) > 0:
mappings._warn(f"Duplicate row for {merged_dict[i][mappings.IDENTIFIER_FIELD][0]} in {page}")
if len(row_to_merge) > 0 and verbose:
mappings._info(f"Duplicate row for {merged_dict[i][mappings.IDENTIFIER_FIELD][0]} in {page}")

# Now we can clean up the dicts: index them by identifier instead of int
indexed_merged_dict = {}
@@ -548,9 +561,9 @@ def check_for_sheet_inconsistencies(template_sheets, csv_sheets):
csv_template_diff = csv_sheets.difference(template_sheets)
if len(template_csv_diff) > 0:
# Print a warning if verbose enabled, it is possible that the template sheet has more than is required
print("WARNING: The following csv/sheet names are in the mapping template but were not found in the input sheets"
"/csvs:" + nl + nl.join(template_csv_diff) + nl +
"If this is an error please correct it as it may result in errors with mapping your data." + nl)
print(nl + f"{Bcolors.WARNING}WARNING: The following csv/sheet names are in the mapping template but were not found in the input sheets/csvs:{Bcolors.ENDC}"
+ nl + nl.join(template_csv_diff) + nl +
f"{Bcolors.WARNING}If this is an error please correct it as it may result in errors with mapping your data.{Bcolors.ENDC}")
if len(csv_template_diff) > 0:
# Exit here because if we can't find a mapping for a field we can't properly map the inputs
sys.exit("The following sheet names are in the input csvs but not found in the mapping template:" + nl +
@@ -624,7 +637,7 @@ def load_manifest(manifest_file):
def csv_convert(input_path, manifest_file, minify=False, index_output=False, verbose=False):
mappings.VERBOSE = verbose
# read manifest data
print("Starting conversion")
print(f"{Bcolors.OKGREEN}Starting conversion...{Bcolors.ENDC}", end="")
manifest = load_manifest(manifest_file)
mappings.IDENTIFIER_FIELD = manifest["identifier"]
if mappings.IDENTIFIER_FIELD is None:
@@ -633,7 +646,7 @@ def csv_convert(input_path, manifest_file, minify=False, index_output=False, verbose=False):

# read the schema (from the url specified in the manifest) and generate
# a scaffold
print("Loading schema")
print(f"{Bcolors.OKGREEN}loading schema...{Bcolors.ENDC}", end="")
schema = manifest["schema"]
if schema.json_schema is None:
sys.exit(f"Could not read an openapi schema at {manifest['schema']};\n"
@@ -644,15 +657,15 @@ def csv_convert(input_path, manifest_file, minify=False, index_output=False, verbose=False):
template_lines = read_mapping_template(manifest["mapping"])

# read the raw data
print("Reading raw data")
print(f"{Bcolors.OKGREEN}reading raw data...{Bcolors.ENDC}", end="")
raw_csv_dfs, mappings.OUTPUT_FILE = ingest_raw_data(input_path)
if not raw_csv_dfs:
sys.exit(f"No ingestable files (csv or xlsx) were found at {input_path}. Check path and try again.")
check_for_sheet_inconsistencies(set([re.findall(r"\(([\w\" ]+)", x)[0].replace('"',"") for x in template_lines]),
set(raw_csv_dfs.keys()))

print("Indexing data")
mappings.INDEXED_DATA = process_data(raw_csv_dfs)
print(f"{Bcolors.OKGREEN}indexing data{Bcolors.ENDC}")
mappings.INDEXED_DATA = process_data(raw_csv_dfs, verbose)
if index_output:
with open(f"{mappings.OUTPUT_FILE}_indexed.json", 'w') as f:
if minify:
@@ -661,10 +674,11 @@ def csv_convert(input_path, manifest_file, minify=False, index_output=False, verbose=False):
json.dump(mappings.INDEXED_DATA, f, indent=4)

# if verbose flag is set, warn if column name is present in multiple sheets:
for col in mappings.INDEXED_DATA["columns"]:
if col != mappings.IDENTIFIER_FIELD and len(mappings.INDEXED_DATA["columns"][col]) > 1:
mappings._warn(
f"Column name {col} present in multiple sheets: {', '.join(mappings.INDEXED_DATA['columns'][col])}")
if verbose:
for col in mappings.INDEXED_DATA["columns"]:
if col != mappings.IDENTIFIER_FIELD and len(mappings.INDEXED_DATA["columns"][col]) > 1:
mappings._info(
f"Column name {col} present in multiple sheets: {', '.join(mappings.INDEXED_DATA['columns'][col])}")

# warn if any template lines map the same column to multiple lines:
scan_template_for_duplicate_mappings(template_lines)
@@ -676,8 +690,11 @@ def csv_convert(input_path, manifest_file, minify=False, index_output=False, verbose=False):

packets = []
# for each identifier's row, make a packet
for indiv in mappings.INDEXED_DATA["individuals"]:
print(f"Creating packet for {indiv}")
print(f"\n{Bcolors.OKGREEN}Creating packets: {Bcolors.ENDC}")
progress = tqdm(mappings.INDEXED_DATA["individuals"])
for indiv in progress:
progress.set_postfix_str(indiv)
# print(f"{Bcolors.OKGREEN}{indiv} {Bcolors.ENDC}", end="\r")
mappings.IDENTIFIER = indiv

# If there is a reference_date in the manifest, we need to calculate that and add CALCULATED.REFERENCE_DATE to the INDEXED_DATA
@@ -715,39 +732,64 @@ def csv_convert(input_path, manifest_file, minify=False, index_output=False, verbose=False):
}
if schema.katsu_sha is not None:
result["katsu_sha"] = schema.katsu_sha
print(f"{Bcolors.OKGREEN}Saving packets to file.{Bcolors.ENDC}")
with open(f"{mappings.OUTPUT_FILE}_map.json", 'w') as f: # write to json file for ingestion
if minify:
json.dump(result, f)
else:
json.dump(result, f, indent=4)

# add validation data:
print(f"\n{Bcolors.OKGREEN}Starting validation...{Bcolors.ENDC}")
schema.validate_ingest_map(result)
result["validation_errors"] = schema.validation_errors
result["validation_warnings"] = schema.validation_warnings
validation_results = {"validation_errors": schema.validation_errors,
"validation_warnings": schema.validation_warnings}
result["statistics"] = schema.statistics
with open(f"{mappings.OUTPUT_FILE}_map.json", 'w') as f: # write to json file for ingestion
if minify:
json.dump(result, f)
else:
json.dump(result, f, indent=4)
errors_present = False
with open(f"{input_path}_validation_results.json", 'w') as f:
json.dump(validation_results, f, indent=4)
print(f"Warnings written to {input_path}_validation_results.json.")
if len(validation_results["validation_warnings"]) > 0:
if len(validation_results["validation_warnings"]) > 20:
print(f"\n{Bcolors.WARNING}WARNING: There are {len(validation_results['validation_warnings'])} validation "
f"warnings in your data. It can be ingested but will not be considered complete until the warnings in "
f"{input_path}_validation_results.json are fixed.{Bcolors.ENDC}")
else:
print(f"\n{Bcolors.WARNING}WARNING: Your data is missing required fields from the MoHCCN data model! It can"
f" be ingested but will not be considered complete until the following problems are fixed:{Bcolors.ENDC}")
print("\n".join(validation_results["validation_warnings"]))

if len(validation_results["validation_errors"]) > 0:
if len(validation_results["validation_errors"]) > 20:
print(f"\n{Bcolors.FAIL}FAILURE: Your data has failed validation. There are "
f"{len(validation_results['validation_errors'])} validation errors in your data, it cannot be "
f"ingested until the errors in {input_path}_validation_results.json are corrected. {Bcolors.ENDC}")
else:
print(f"\n{Bcolors.FAIL}FAILURE: Your data has failed validation against the MoHCCN data model! It cannot "
f"be ingested until the following errors are fixed:{Bcolors.ENDC}")
print("\n".join(validation_results["validation_errors"]))

if len(result["validation_warnings"]) > 0:
print(
"\n\nWARNING: Your data is missing required data for the MoHCCN data model! The following problems were found:")
print("\n".join(result["validation_warnings"]))
if len(result["validation_errors"]) > 0:
print("\n\nWARNING: Your data is not valid for the MoHCCN data model! The following errors were found:")
print("\n".join(result["validation_errors"]))

return packets
errors_present = True
return packets, errors_present


def main():
args = parse_args()
input_path = args.input
manifest_file = args.manifest
csv_convert(input_path, manifest_file, minify=args.minify, index_output=args.index, verbose=args.verbose)
packets, errors = csv_convert(input_path, manifest_file, minify=args.minify, index_output=args.index,
verbose=args.verbose)
print(f"{Bcolors.OKGREEN}\nConverted file written to {mappings.OUTPUT_FILE}_map.json{Bcolors.ENDC}")
if errors:
print(f"{Bcolors.WARNING}WARNING: this file cannot be ingested until all errors are fixed.{Bcolors.ENDC}")
else:
print(f"{Bcolors.OKGREEN}INFO: this file can be ingested.{Bcolors.ENDC}")
sys.exit(0)


if __name__ == '__main__':
11 changes: 11 additions & 0 deletions src/clinical_etl/mappings.py
@@ -456,6 +456,17 @@ def _warn(message, input_values=None):
print(f"WARNING: {message}. Input data: {input_values}")


def _info(message, input_values=None):
"""Provides information to a user when there may be an issue, along with the IDENTIFIER and FIELD."""
global IDENTIFIER
if IDENTIFIER is not None and input_values is not None:
print(f"INFO for {IDENTIFIER_FIELD}={IDENTIFIER}: {message}. Input data: {input_values}")
else:
print(f"INFO: {message}")
if input_values is not None:
print(f"INFO: {message}. Input data: {input_values}")


def _push_to_stack(sheet, id, rownum):
INDEX_STACK.append(
{
Expand Down