Issues 50&51: validating input json & Issue 26: adding a dq_results dataclass (#56)

* Move delta-spark dependency

* Fix broken test_read_data_quality_rules_from_json_returns_json_string

* Raise ValueError instead of print statement

* Rename function to load_data_quality_rules

* Add setup for validate_data_quality_rules_dict

* Start implementing validate_data_quality_rules_dict, add test

* Fix dq_rules.json test data

* Remove some checks

* Add some more validations to validate_data_quality_rules_dict

* Remove TODO for issue#50

* Add validation addressing issue#51

* Remove TODO issue#51, simplify create_and_configure_expectations

* Update Rule dataclass to reflect new input json format

* Update dq_rules.json test data

* Formatting and cleanup

* Split validate_data_quality_rules_dict into various functions

* Simplify/split further

* Add test to TestValidateDataQualityRulesDict

* Fix type hint in DataQualityRulesDict

* Further split validate_lowest_level function by key

* Add TestValidateDataSet

* Add TestValidateTables

* Add TestValidateRulesDict

* Add url validation

* Add TestValidateTableSchema

* Remove TODO on validation of url

* Formatting

* Minor change

* Update dq_rules_example.json to reflect new format

* Add TestValidateRule

* Use fixtures

* Formatting

* Add validators to pyproject.toml

* Fix broken test_common.py

* Minor change to address issue #26

---------

Co-authored-by: bas <[email protected]>
SSchotten and bas authored Sep 23, 2024
1 parent 1198658 commit bdfdf9f
Showing 11 changed files with 558 additions and 154 deletions.
49 changes: 19 additions & 30 deletions dq_rules_example.json
@@ -9,23 +9,19 @@
       "table_name": "well",
       "rules": [
         {
-          "rule_name": "expect_column_values_to_be_between",
-          "parameters": [
-            {
+          "rule_name": "ExpectColumnValuesToBeBetween",
+          "parameters": {
             "column": "latitude",
             "min_value": 6,
             "max_value": 10000
           }
-          ]
         },
         {
-          "rule_name": "expect_column_distinct_values_to_equal_set",
-          "parameters": [
-            {
+          "rule_name": "ExpectColumnDistinctValuesToEqualSet",
+          "parameters": {
             "column": "latitude",
             "value_set": [1, 2]
           }
-          ]
         }
       ]
     },
@@ -34,12 +30,10 @@
       "table_name": "container",
       "rules": [
         {
-          "rule_name": "expect_column_values_to_not_be_null",
-          "parameters": [
-            {
+          "rule_name": "ExpectColumnValuesToNotBeNull",
+          "parameters": {
             "column": "containertype"
           }
-          ]
         }
       ]
     },
@@ -62,44 +56,39 @@
       "table_name": "containertype",
       "rules": [
         {
-          "rule_name": "expect_table_row_count_to_be_between",
-          "parameters": [
-            {
+          "rule_name": "ExpectTableRowCountToBeBetween",
+          "parameters": {
             "min_value": 1,
             "max_value": 1000
           }
-          ]
         },
         {
-          "rule_name": "expect_column_values_to_not_be_null",
-          "parameters": [
-            {
+          "rule_name": "ExpectColumnValuesToNotBeNull",
+          "parameters": {
             "column": "weight",
             "row_condition": "col(\\\"volume\\\") < 5"
-          },
-          {
+          }
+        },
+        {
+          "rule_name": "ExpectColumnValuesToNotBeNull",
+          "parameters": {
             "column": "volume"
           }
-          ]
         },
         {
-          "rule_name": "expect_column_values_to_be_between",
-          "parameters": [
-            {
+          "rule_name": "ExpectColumnValuesToBeBetween",
+          "parameters": {
             "column": "volume",
             "min_value": 0,
             "max_value": 10000
           }
-          ]
         },
        {
-          "rule_name": "expect_column_values_to_be_of_type",
-          "parameters": [
-            {
+          "rule_name": "ExpectColumnValuesToBeOfType",
+          "parameters": {
             "column": "volume",
             "type_": "DoubleType"
           }
-          ]
         }
       ]
     }
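The validator functions introduced for issues 50 and 51 (exercised by the TestValidateDataSet, TestValidateTables, TestValidateRulesDict, TestValidateTableSchema and TestValidateRule suites named in the commit message) live in files whose diffs did not load on this page. A minimal sketch of the shape such a check takes against the new format; the function name, keys and error messages here are hypothetical, not the commit's actual code:

from typing import Any

def validate_rules_dict(rules_dict: dict[str, Any]) -> None:
    # Hypothetical sketch of a per-table check on the new JSON format;
    # the real implementation in this commit is split across several
    # smaller validators.
    if not isinstance(rules_dict, dict):
        raise TypeError(f"{rules_dict} should be of type dict")
    for key in ("table_name", "rules"):
        if key not in rules_dict:
            raise KeyError(f"No '{key}' key found in {rules_dict}")
    if not isinstance(rules_dict["rules"], list):
        raise TypeError("'rules' should be of type list")
    for rule in rules_dict["rules"]:
        # Issue 51: 'parameters' is now a single dict per rule,
        # not a list of dicts
        if not isinstance(rule, dict) or not isinstance(
            rule.get("parameters"), dict
        ):
            raise TypeError("'parameters' should be of type Dict[str, Any]")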
13 changes: 7 additions & 6 deletions pyproject.toml
@@ -15,11 +15,13 @@ description = "Wrapper for Great Expectations to fit the requirements of the Gem
 readme = "README.md"
 requires-python = ">=3.10"
 dependencies = [
-    "great_expectations==1.0.3",
-    "pandas==2.1.4",
-    "pyspark==3.5.2",
-    "pyhumps==3.8.0",
-    "pyyaml==6.0.2",
+    "great_expectations==1.0.3",
+    "pandas==2.1.4",
+    "pyspark==3.5.2",
+    "pyhumps==3.8.0",
+    "pyyaml==6.0.2",
+    "delta-spark~=3.2.0",
+    "validators==0.34.0",
 ]


@@ -32,7 +34,6 @@ dev = [
     'pylint ~= 2.16',
     'autoflake ~= 2.0.1',
     'coverage ~= 7.6.1',
-    'delta-spark ~= 3.2.0',
 ]

 [tool.isort]
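The new validators pin supports the "Add url validation" step from the commit message. The diff that uses it did not load on this page; a minimal sketch of how a URL check can be built on this library (the function name and error message are hypothetical):

import validators

def validate_data_quality_url(url: str) -> None:
    # validators.url() returns True for a well-formed URL and a falsy
    # ValidationError object otherwise
    if not validators.url(url):
        raise ValueError(f"'{url}' is not a valid URL")

validate_data_quality_url("https://example.com/dq_rules.json")  # passes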
42 changes: 17 additions & 25 deletions src/dq_suite/common.py
@@ -1,12 +1,9 @@
 from dataclasses import dataclass
 from typing import Any, Dict, List, Literal
-import yaml

+from delta.tables import *
 from great_expectations import ExpectationSuite, get_context
-from great_expectations.data_context import (
-    AbstractDataContext,
-    EphemeralDataContext,
-)
+from great_expectations.data_context import AbstractDataContext
 from great_expectations.data_context.types.base import (
     DataContextConfig,
     InMemoryStoreBackendDefaults,
@@ -15,7 +12,6 @@
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.functions import col
 from pyspark.sql.types import StructType
-from delta.tables import *


 @dataclass()
@@ -26,19 +22,17 @@ class Rule:
     """

     rule_name: str  # Name of the GX expectation
-    parameters: List[Dict[str, Any]]  # Collection of parameters required for
+    parameters: Dict[str, Any]  # Collection of parameters required for
     # evaluating the expectation

     def __post_init__(self):
         if not isinstance(self.rule_name, str):
             raise TypeError("'rule_name' should be of type str")

-        if not isinstance(self.parameters, list):
-            raise TypeError(
-                "'parameters' should be of type List[Dict[str, Any]]"
-            )
+        if not isinstance(self.parameters, dict):
+            raise TypeError("'parameters' should be of type Dict[str, Any]")

-    def __getitem__(self, key) -> str | List[Dict[str, Any]] | None:
+    def __getitem__(self, key) -> str | Dict[str, Any] | None:
         if key == "rule_name":
             return self.rule_name
         elif key == "parameters":
@@ -98,7 +92,7 @@ def __post_init__(self):
         if not isinstance(self.layer, str):
             raise TypeError("'layer' should be of type str")

-    def __getitem__(self, key) -> str | RulesList | None:
+    def __getitem__(self, key) -> str | None:
         if key == "name":
             return self.name
         elif key == "layer":
@@ -126,7 +120,7 @@ def __post_init__(self):
         if not isinstance(self.tables, list):
             raise TypeError("'tables' should be RulesDictList")

-    def __getitem__(self, key) -> str | RulesDictList | None:
+    def __getitem__(self, key) -> DatasetDict | RulesDictList | None:
         if key == "dataset":
             return self.dataset
         elif key == "tables":
@@ -203,23 +197,21 @@ def merge_df_with_unity_table(
     full_table_name = get_full_table_name(
         catalog_name=catalog_name, table_name=table_name
     )
-    df_alias = f'{table_name}_df'
-    regelTabel = DeltaTable.forName(spark_session, full_table_name)
-    regelTabel.alias(table_name) \
-        .merge(
-            df.alias(df_alias),
-            f'{table_name}.{table_merge_id} = {df_alias}.{df_merge_id}'
-        ) \
-        .whenMatchedUpdate(set = merge_dict) \
-        .whenNotMatchedInsert(values = merge_dict) \
-        .execute()
+    df_alias = f"{table_name}_df"
+    regel_tabel = DeltaTable.forName(spark_session, full_table_name)
+    regel_tabel.alias(table_name).merge(
+        df.alias(df_alias),
+        f"{table_name}.{table_merge_id} = {df_alias}.{df_merge_id}",
+    ).whenMatchedUpdate(set=merge_dict).whenNotMatchedInsert(
+        values=merge_dict
+    ).execute()


 def get_data_context() -> AbstractDataContext:  # pragma: no cover - part of GX
     return get_context(
         project_config=DataContextConfig(
             store_backend_defaults=InMemoryStoreBackendDefaults(),
-            analytics_enabled=False
+            analytics_enabled=False,
         )
     )

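With the Rule dataclass updated above, one rule now carries a single parameters dict. A short usage sketch against the new shape, with values borrowed from dq_rules_example.json:

from dq_suite.common import Rule

# One rule now maps onto a single dict of parameters
rule = Rule(
    rule_name="ExpectColumnValuesToBeBetween",
    parameters={"column": "latitude", "min_value": 6, "max_value": 10000},
)
print(rule["parameters"])  # {'column': 'latitude', 'min_value': 6, 'max_value': 10000}

# The old list-of-dicts shape is rejected in __post_init__
try:
    Rule(
        rule_name="ExpectColumnValuesToBeBetween",
        parameters=[{"column": "latitude"}],
    )
except TypeError as e:
    print(e)  # 'parameters' should be of type Dict[str, Any]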
21 changes: 4 additions & 17 deletions src/dq_suite/df_checker.py
@@ -1,7 +1,6 @@
 from typing import List

 import great_expectations
-import humps
 from great_expectations import Checkpoint, ValidationDefinition
 from great_expectations.checkpoint.actions import CheckpointAction
 from great_expectations.checkpoint.checkpoint import CheckpointResult
@@ -146,25 +145,14 @@ def create_and_configure_expectations(
     for validation_rule in validation_rules_list:
         # Get the name of expectation as defined by GX
         gx_expectation_name = validation_rule["rule_name"]
+        gx_expectation_parameters: dict = validation_rule["parameters"]

         # Get the actual expectation as defined by GX
         gx_expectation = getattr(
             great_expectations.expectations.core,
-            humps.pascalize(gx_expectation_name),
+            gx_expectation_name,
         )
-        # Issue 50
-        # TODO: drop pascalization, and require this as input check
-        # when ingesting JSON? Could be done via humps.is_pascalcase()
-
-        for validation_parameter_dict in validation_rule["parameters"]:
-            kwargs = {}
-            # Issue 51
-            # TODO/check: is this loop really necessary? Intuitively, I added
-            # the same expectation for each column - I didn't consider using
-            # the same expectation with different parameters
-            for par_name, par_value in validation_parameter_dict.items():
-                kwargs[par_name] = par_value
-            suite.add_expectation(gx_expectation(**kwargs))
+        suite.add_expectation(gx_expectation(**gx_expectation_parameters))


 def validate(
@@ -215,12 +203,11 @@ def run(
         table_name=validation_settings_obj.table_name,
     )
     if rules_dict is None:
-        print(
+        raise ValueError(
             f"No validations found for table_name "
             f"'{validation_settings_obj.table_name}' in JSON file at '"
             f"{json_path}'."
         )
-        return

     # 2) perform the validation on the dataframe
     checkpoint_result = validate(
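Because rule names now arrive in PascalCase and match the GX class names directly, create_and_configure_expectations no longer needs humps.pascalize or the inner per-parameter-dict loop. A self-contained sketch of the simplified path under GX 1.0.x; the suite name and rule values here are illustrative:

import great_expectations
from great_expectations import ExpectationSuite

suite = ExpectationSuite(name="demo_suite")  # illustrative name
validation_rule = {
    "rule_name": "ExpectColumnValuesToBeBetween",
    "parameters": {"column": "latitude", "min_value": 6, "max_value": 10000},
}

# The PascalCase rule_name is the GX class name, so getattr resolves it directly
gx_expectation = getattr(
    great_expectations.expectations.core, validation_rule["rule_name"]
)
# One rule yields one expectation instance; the parameters dict unpacks as kwargs
suite.add_expectation(gx_expectation(**validation_rule["parameters"]))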