Skip to content

Commit

Permalink
fix(server/mongo): support missing fields with isnull/notnull [TCTC-9…
Browse files Browse the repository at this point in the history
…655]

A join step can introduce missing fields: if the `$lookup` step does not return anything,
the right fields will be missing from the resulting documents. This causes
`{$eq: [$missing_field, null]}` to always evaluate to false since the field cannot be
looked up.

This PR fixes that behaviour by defaulting to null for missing expressions with the
`isnull`/`notnull` operators.

The same logic was applied to the `then` and `else` branches: an `$addFields` aggregation with a
`$cond` evaluating to an empty field just does NOT add a new field.

Signed-off-by: Luka Peschke <[email protected]>
  • Loading branch information
lukapeschke committed Nov 28, 2024
1 parent 4a1740b commit 6bcd4be
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 8 deletions.
4 changes: 4 additions & 0 deletions server/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Fixed

- Mongo: the `isnull` and `notnull` operators now behave correctly in case of a missing field in the `ifthenelse` step

## [0.48.3] - 2024-11-21

### Fixed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
from typing import Any, Literal

from weaverbird.backends.mongo_translator.steps.formula import build_mongo_formula_tree
from weaverbird.backends.mongo_translator.steps.types import MongoStep
from weaverbird.backends.mongo_translator.utils import build_cond_expression
from weaverbird.pipeline.formula_ast.eval import FormulaParser
from weaverbird.pipeline.steps.ifthenelse import IfThenElse, IfthenelseStep


def _default_to_null(expr: Any) -> dict[Literal["$ifNull"], list[Any]]:
"""Makes the passed expression default to NULL if not defined"""
return {"$ifNull": [expr, None]}


def transform_ifthenelse_step(step: IfThenElse) -> MongoStep:
else_expr: dict | str | int | float | bool
if isinstance(step.else_value, IfThenElse):
Expand All @@ -21,7 +28,7 @@ def transform_ifthenelse_step(step: IfThenElse) -> MongoStep:
except SyntaxError: # step is a badly formatted string
return step.then

return {"$cond": {"if": if_expr, "then": then_expr, "else": else_expr}}
return {"$cond": {"if": if_expr, "then": _default_to_null(then_expr), "else": _default_to_null(else_expr)}}


def translate_ifthenelse(step: IfthenelseStep) -> list[MongoStep]:
Expand Down
3 changes: 2 additions & 1 deletion server/src/weaverbird/backends/mongo_translator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def build_cond_expression(

else:
if cond.operator == "notnull" or cond.operator == "isnull":
return {operator_mapping[cond.operator]: [f"${cond.column}", None]}
# $ifNull allows to replace missing values with NULL
return {operator_mapping[cond.operator]: [{"$ifNull": [f"${cond.column}", None]}, None]}

else:
cond_expression = {operator_mapping[cond.operator]: [f"${cond.column}", cond.value]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,9 @@ def test_mongo_translator_pipeline(mongo_database, case_id, case_spec_file_path,
if "other_inputs" in spec and (
"join" in case_id or "append" in case_id
): # needed for join & append steps tests as we need a != collection
[
mongo_database[k].insert_many(
pd.read_json(StringIO(json.dumps(v)), orient="table").to_dict(orient="records")
)
for k, v in spec.get("other_inputs", {}).items()
]
for collection_name, raw_df in spec["other_inputs"].items():
df = pd.read_json(StringIO(json.dumps(raw_df)), orient="table")
mongo_database[collection_name].insert_many(df.to_dict(orient="records"))

# create query
steps = spec["step"]["pipeline"]
Expand Down

0 comments on commit 6bcd4be

Please sign in to comment.