Issue 36: increase test coverage of validation.py (#68)
* Rename to validation.py

* Move filter_validation_dict_by_table_name to input_helpers.py, modify/add related tests

* Minor change

* Formatting

* Add placeholder test classes

* Move a bunch of GX data_context method calls to ValidationSettings.create_batch_definition

* Start implementing TestGetOrAddValidationDefinition, mock create_batch_definition call

* Make sure create_batch_definition only calls initialise_or_update_attributes when necessary

* Stop printing of expectation suite

* Simplify get_or_add_validation_definition, move parts to ValidationSettings

* Add some todos

* Remove dataclass decorator

* Fix initialisation of ValidationSettings

* Add another todo

* Refactor more logic into ValidationSettings.create_validation_definition

* Formatting

* Remove get_or_add_validation_definition and test_get_or_add_validation_definition

* Separate into ValidationSettings dataclass and ValidationRunner class (see the sketch below the changed-files summary)

* Formatting

* Refactor some functions to only use ValidationRunner instance

* Separate call to _set_data_context during init

* Fix some imports

* Add TestValidationRunner

* Formatting

* Pull action_list into ValidationRunner

* Re-add __post_init__ to ValidationSettings

* Add post-init checks

* Make name derived variables private

* Fix post-init checks

* Fix broken tests

* Add tests for create_action_list

* Create empty list upon each call to create_action_list

* Make _initialise_or_update_name_parameters private, call upon init

* Pull add_expectations_to_suite into ValidationRunner

* Pull _get_or_add_checkpoint and run_validation into ValidationRunner

* Add docstring and type hint, rename to run

* Formatting

* Fix broken tests

* Remove validation_settings_obj as an argument

* Update README.md

* Rename check_name to validation_name

* Fix broken tests

* Add docstring and todo

* Add test_get_gx_expectation_object

* Simplify code by removing boolean send_..._notification arguments

* Formatting

* Add todo

* Fix broken tests and imports

* Add test_rules_dict_without_rules_field_results_in_table_schema_validation

* Modify README.md

* Add docstrings

* Rename to validation_name

* Remove test_get_or_add_checkpoint

* Formatting

* Add TODO + comments

* Add docstrings

* Update README.md

* Modify docstrings, update README.md

* Increase version

* Update README.md

* Fix broken test_extract_table_data_returns_correct_list

* Fix some tests in test_common.py

* Modify README.md

* Add test for _get_or_add_expectation_suite

* Add another test for _get_gx_expectation_object

* Add test for add_expectations_to_suite

* Formatting

* Remove test

* Add some comments and todos

* Make action_list tests more explicit

* Remove some 'pragma: no cover' statements

* Fix use of MicrosoftTeamsNotificationAction

* Minor changes

* Minor changes

* Modify docstring

---------

Co-authored-by: bas <[email protected]>
SSchotten and bas authored Nov 14, 2024
1 parent 727dc9d commit d5e12e3
Showing 12 changed files with 840 additions and 334 deletions.
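
To make the new structure easier to follow, here is a rough, hypothetical skeleton of the two classes described in the commit messages above. It is reconstructed only from those messages and the common.py diff below; the constructor and method signatures are guesses, and the actual implementation in src/dq_suite/validation.py and src/dq_suite/common.py may differ.

```python
# Hypothetical skeleton only: reconstructed from the commit messages above and
# the common.py diff below, not copied from src/dq_suite/validation.py.
from dataclasses import dataclass

from pyspark.sql import DataFrame, SparkSession


@dataclass()
class ValidationSettings:
    # Plain user input; all GX-related names (expectation suite, checkpoint,
    # run, data source, validation/batch definitions) are derived from
    # validation_name in __post_init__ (see the common.py diff below).
    spark_session: SparkSession
    catalog_name: str
    table_name: str
    validation_name: str


class ValidationRunner:
    # Wraps the Great Expectations calls. The method names come from the
    # commit messages; their signatures here are guesses.
    def __init__(self, validation_settings_obj: ValidationSettings) -> None:
        self.validation_settings_obj = validation_settings_obj
        self.action_list: list = []

    def create_action_list(self) -> list:
        """Rebuild the notification action list on every call (sketch only)."""
        self.action_list = []
        return self.action_list

    def add_expectations_to_suite(self, validation_rules_list: list) -> None:
        """Attach the configured expectations to the GX suite (sketch only)."""

    def run(self, json_path: str, df: DataFrame) -> None:
        """Validate df against the rules at json_path (sketch only)."""
```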
27 changes: 15 additions & 12 deletions README.md
@@ -26,28 +26,31 @@ pip install dq-suite-amsterdam


3. Get ready to validate your first table. To do so, define
- `catalog_name` as the name of your catalog
- `table_name` as the name of the table for which a data quality check is required. This name should also occur in the JSON file
- `dq_rule_json_path` as a path to a JSON file, formatted in [this](dq_rules_example.json) way
- `df` as a Spark dataframe containing the table that needs to be validated (e.g. via `spark.read.csv` or `spark.read.table`)
- `spark` as a SparkSession object (in Databricks notebooks, this is by default called `spark`)
- `catalog_name` as the name of your catalog ('dpxx_dev' or 'dpxx_prd')
- `table_name` as the name of the table for which a data quality check is required. This name should also occur in the JSON file at `dq_rule_json_path`



4. Finally, perform the validation by running
```python
import dq_suite

validation_settings_obj = dq_suite.ValidationSettings(spark_session=spark,
catalog_name=catalog_name,
table_name=table_name,
check_name="name_of_check_goes_here")
dq_suite.run(json_path=dq_rule_json_path, df=df, validation_settings_obj=validation_settings_obj)
dq_suite.validation.run(
json_path=dq_rule_json_path,
df=df,
spark_session=spark,
catalog_name=catalog_name,
table_name=table_name,
validation_name="my_validation_name",
)
```
Note: Looping over multiple data frames may require a redefinition of the `json_path` and `validation_settings` variables.
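A minimal sketch of such a loop, assuming the variables from step 3 are already defined; the table names, schema and rules-file paths below are made up for illustration:
```python
import dq_suite

# Hypothetical example: validate several tables in one go. The rules-file
# path and dataframe are redefined on every iteration.
for table_name in ["table_a", "table_b"]:
    dq_rule_json_path = f"/path/to/rules/{table_name}.json"  # one JSON file per table
    df = spark.read.table(f"{catalog_name}.my_schema.{table_name}")

    dq_suite.validation.run(
        json_path=dq_rule_json_path,
        df=df,
        spark_session=spark,
        catalog_name=catalog_name,
        table_name=table_name,
        validation_name=f"validation_{table_name}",
    )
```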

See the documentation of `ValidationSettings` for what other parameters can be passed upon initialisation.
See the documentation of `dq_suite.validation.run` for what other parameters can be passed.


# Known exceptions
# Known exceptions / issues
- The functions can run on Databricks using a Personal Compute Cluster or using a Job Cluster.
Using a Shared Compute Cluster will result in an error, as it does not have the permissions that Great Expectations requires.

@@ -57,7 +60,7 @@ Older versions of DBR will result in errors upon install of the `dq-suite-amster

- At time of writing (late Aug 2024), Great Expectations v1.0.0 has just been released, and is not (yet) compatible with Python 3.12. Hence, make sure you are using the correct version of Python as interpreter for your project.

- The run_time is defined separately from Great Expectations in df_checker. We plan on fixing it when Great Expectations has documented how to access it from the RunIdentifier object.
- The `run_time` value is defined separately from Great Expectations in `validation.py`. We plan on fixing this when Great Expectations has documented how to access it from the RunIdentifier object.


# Updates
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dq-suite-amsterdam"
version = "0.11.2"
version = "0.11.3"
authors = [
{ name="Arthur Kordes", email="[email protected]" },
{ name="Aysegul Cayir Aydar", email="[email protected]" },
9 changes: 5 additions & 4 deletions src/dq_suite/__init__.py
@@ -1,7 +1,8 @@
"""DQ API."""

# from .common import ValidationSettings
# from .other import schema_to_json_string
# from .validation import run
#
# # Use __all__ to let developers know what is part of the public API.
# __all__ = ["export_schema_to_json_string", "run", "ValidationSettings"]
#from .common import ValidationSettings
#from .df_checker import run
#from .other import schema_to_json_string
# __all__ = ["schema_to_json_string", "run", "ValidationSettings"]
102 changes: 44 additions & 58 deletions src/dq_suite/common.py
@@ -1,14 +1,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Literal
from typing import Literal

from delta.tables import *
from great_expectations import ExpectationSuite, get_context
from great_expectations.data_context import AbstractDataContext
from great_expectations.data_context.types.base import (
DataContextConfig,
InMemoryStoreBackendDefaults,
)
from great_expectations.exceptions import DataContextError
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType
@@ -114,7 +107,7 @@ class DataQualityRulesDict:
tables: RulesDictList

def __post_init__(self):
if not isinstance(self.dataset, dict):
if not isinstance(self.dataset, DatasetDict):
raise TypeError("'dataset' should be DatasetDict")

if not isinstance(self.tables, list):
@@ -128,6 +121,7 @@ def __getitem__(self, key) -> DatasetDict | RulesDictList | None:
raise KeyError(key)


# TODO: replace by df.isEmpty()
def is_empty_dataframe(df: DataFrame) -> bool:
return len(df.take(1)) == 0

@@ -207,50 +201,32 @@ def merge_df_with_unity_table(
).execute()


def get_data_context() -> AbstractDataContext: # pragma: no cover - part of GX
return get_context(
project_config=DataContextConfig(
store_backend_defaults=InMemoryStoreBackendDefaults(),
analytics_enabled=False,
)
)


@dataclass()
class ValidationSettings:
"""
Contains all user input required for running a validation. Typically,
this means catalog, table and validation names and a SparkSession object.
spark_session: SparkSession object
catalog_name: name of unity catalog
table_name: name of table in unity catalog
check_name: name of data quality check
validation_name: name of data quality check
data_context_root_dir: path to write GX data
context - default "/dbfs/great_expectations/"
data_context: a data context object
expectation_suite_name: name of the GX expectation suite
checkpoint_name: name of the GX checkpoint
run_name: name of the data quality run
send_slack_notification: indicator to use GX's built-in Slack
notification action
slack_webhook: webhook, recommended to store in key vault
send_ms_teams_notification: indicator to use GX's built-in Microsoft
Teams notification action
ms_teams_webhook: webhook, recommended to store in key vault
slack_webhook: webhook, recommended to store in key vault. If not None,
a Slack notification will be sent
ms_teams_webhook: webhook, recommended to store in key vault. If not None,
an MS Teams notification will be sent
notify_on: when to send notifications, can be equal to "all",
"success" or "failure"
"success" or "failure"
"""

spark_session: SparkSession
catalog_name: str
table_name: str
check_name: str
validation_name: str
data_context_root_dir: str = "/dbfs/great_expectations/"
data_context: AbstractDataContext | None = None
expectation_suite_name: str | None = None
checkpoint_name: str | None = None
run_name: str | None = None
send_slack_notification: bool = False
slack_webhook: str | None = None
send_ms_teams_notification: bool = False
ms_teams_webhook: str | None = None
notify_on: Literal["all", "success", "failure"] = "failure"

@@ -261,40 +237,50 @@ def __post_init__(self):
raise TypeError("'catalog_name' should be of type str")
if not isinstance(self.table_name, str):
raise TypeError("'table_name' should be of type str")
if not isinstance(self.check_name, str):
raise TypeError("'check_name' should be of type str")
if not isinstance(self.validation_name, str):
raise TypeError("'validation_name' should be of type str")
if not isinstance(self.data_context_root_dir, str):
raise TypeError("'data_context_root_dir' should be of type str")
if not isinstance(self.slack_webhook, str):
if self.slack_webhook is not None:
raise TypeError("'slack_webhook' should be of type str")
if not isinstance(self.ms_teams_webhook, str):
if self.ms_teams_webhook is not None:
raise TypeError("'ms_teams_webhook' should be of type str")
if self.notify_on not in ["all", "success", "failure"]:
raise ValueError(
"'notify_on' should be equal to 'all', 'success' or 'failure'"
)
self._initialise_or_update_name_parameters()

def initialise_or_update_attributes(self): # pragma: no cover - complex
# function
self._set_data_context()

# TODO/check: do we want to allow for custom names via parameters?
def _initialise_or_update_name_parameters(self):
# TODO/check: nearly all names are related to 'validation_name' - do we want
# to allow for custom names via parameters?
self._set_expectation_suite_name()
self._set_checkpoint_name()
self._set_run_name()

# Finally, add/retrieve the suite to/from the data context
try:
self.data_context.suites.get(name=self.expectation_suite_name)
except DataContextError:
self.data_context.suites.add(
suite=ExpectationSuite(name=self.expectation_suite_name)
)

def _set_data_context(self): # pragma: no cover - uses part of GX
self.data_context = get_data_context()
self._set_data_source_name()
self._set_validation_definition_name()
self._set_batch_definition_name()

def _set_expectation_suite_name(self):
self.expectation_suite_name = f"{self.check_name}_expectation_suite"
self._expectation_suite_name = (
f"{self.validation_name}_expectation_suite"
)

def _set_checkpoint_name(self):
self.checkpoint_name = f"{self.check_name}_checkpoint"
self._checkpoint_name = f"{self.validation_name}_checkpoint"

def _set_run_name(self):
self.run_name = f"%Y%m%d-%H%M%S-{self.check_name}"
self._run_name = f"%Y%m%d-%H%M%S-{self.validation_name}"

def _set_data_source_name(self):
self._data_source_name = f"spark_data_source_{self.validation_name}"

def _set_validation_definition_name(self):
self._validation_definition_name = (
f"{self.validation_name}_validation_definition"
)

def _set_batch_definition_name(self):
self._batch_definition_name = f"{self.validation_name}_batch_definition"
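
For reference, a hedged usage sketch (not part of the diff) showing how the private attributes are derived from `validation_name`. The catalog and table names below are made up, and the import path assumes `ValidationSettings` stays importable from `dq_suite.common` as in the diff above:

```python
from pyspark.sql import SparkSession

from dq_suite.common import ValidationSettings

spark = SparkSession.builder.getOrCreate()

settings = ValidationSettings(
    spark_session=spark,
    catalog_name="dpxx_dev",   # made-up catalog
    table_name="my_table",     # made-up table
    validation_name="my_validation",
)

# __post_init__ derives every GX-related name from validation_name:
#   settings._expectation_suite_name     == "my_validation_expectation_suite"
#   settings._checkpoint_name            == "my_validation_checkpoint"
#   settings._run_name                   == "%Y%m%d-%H%M%S-my_validation"
#   settings._data_source_name           == "spark_data_source_my_validation"
#   settings._validation_definition_name == "my_validation_validation_definition"
#   settings._batch_definition_name      == "my_validation_batch_definition"
```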
