Always emit warning when microbatch models lack any filtered input node #11196

Merged
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20250107-173719.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Ensure warning about microbatch lacking filter inputs is always fired
time: 2025-01-07T17:37:19.373261-06:00
custom:
  Author: QMalcolm
  Issue: "11159"
38 changes: 23 additions & 15 deletions core/dbt/parser/manifest.py
@@ -513,6 +513,7 @@
        self.check_for_spaces_in_resource_names()
        self.check_for_microbatch_deprecations()
        self.check_forcing_batch_concurrency()
        self.check_microbatch_model_has_a_filtered_input()

        return self.manifest

@@ -1472,21 +1473,6 @@
f"Microbatch model '{node.name}' optional 'concurrent_batches' config must be of type `bool` if specified, but got: {type(concurrent_batches)})."
)

# Validate upstream node event_time (if configured)
has_input_with_event_time_config = False
for input_unique_id in node.depends_on.nodes:
input_node = self.manifest.expect(unique_id=input_unique_id)
input_event_time = input_node.config.event_time
if input_event_time:
if not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
has_input_with_event_time_config = True

if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

def check_forcing_batch_concurrency(self) -> None:
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
adapter = get_adapter(self.root_project)
@@ -1508,6 +1494,28 @@
                    )
                )

    def check_microbatch_model_has_a_filtered_input(self):
        if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
            for node in self.manifest.nodes.values():
                if (
                    node.config.materialized == "incremental"
                    and node.config.incremental_strategy == "microbatch"
                ):
                    # Validate upstream node event_time (if configured)
                    has_input_with_event_time_config = False
                    for input_unique_id in node.depends_on.nodes:
                        input_node = self.manifest.expect(unique_id=input_unique_id)
                        input_event_time = input_node.config.event_time
                        if input_event_time:
                            if not isinstance(input_event_time, str):
                                raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
has_input_with_event_time_config = True

if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

    def write_perf_info(self, target_path: str):
        path = os.path.join(target_path, PERF_INFO_FILE_NAME)
        write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
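Note on the change above: the upstream event_time validation moves out of the existing microbatch config check into a dedicated pass, check_microbatch_model_has_a_filtered_input, invoked from the manifest load path (first hunk), so the MicrobatchModelNoEventTimeInputs warning is always fired when a microbatch model has no input with an event_time config. The snippet below is a rough standalone sketch of that logic; the FakeConfig and FakeNode classes are illustrative stand-ins, not dbt-core's real manifest API.

from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class FakeConfig:
    # Illustrative stand-in for a node's config (not dbt-core's real config object).
    materialized: str = "table"
    incremental_strategy: Optional[str] = None
    event_time: Optional[str] = None


@dataclass
class FakeNode:
    # Illustrative stand-in for a manifest node and its upstream dependencies.
    name: str
    config: FakeConfig
    depends_on: List[str] = field(default_factory=list)


def microbatch_models_missing_filtered_inputs(nodes: Dict[str, FakeNode]) -> List[str]:
    """Return the names of microbatch models with no event_time-configured input."""
    missing: List[str] = []
    for node in nodes.values():
        is_microbatch = (
            node.config.materialized == "incremental"
            and node.config.incremental_strategy == "microbatch"
        )
        if not is_microbatch:
            continue
        has_filtered_input = any(
            nodes[upstream].config.event_time
            for upstream in node.depends_on
            if upstream in nodes
        )
        if not has_filtered_input:
            # dbt-core fires MicrobatchModelNoEventTimeInputs at this point.
            missing.append(node.name)
    return missing


nodes = {
    "model.proj.events": FakeNode("events", FakeConfig()),
    "model.proj.daily_agg": FakeNode(
        "daily_agg",
        FakeConfig(materialized="incremental", incremental_strategy="microbatch"),
        depends_on=["model.proj.events"],
    ),
}
print(microbatch_models_missing_filtered_inputs(nodes))  # -> ['daily_agg']

Running the sketch prints ['daily_agg'], the model that would trigger the warning because its only input lacks an event_time config.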
4 changes: 3 additions & 1 deletion tests/functional/microbatch/test_microbatch.py
@@ -464,9 +464,11 @@ def test_run_with_event_time(self, project):
        assert len(catcher.caught_events) == 1

        # our partition grain is "day" so running the same day without new data should produce the same results
        catcher.caught_events = []
        with patch_microbatch_end_time("2020-01-03 14:57:00"):
            run_dbt(["run"])
            run_dbt(["run"], callbacks=[catcher.catch])
        self.assert_row_count(project, "microbatch_model", 3)
        assert len(catcher.caught_events) == 1

        # add next two days of data
        test_schema_relation = project.adapter.Relation.create(
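The updated test clears the catcher and re-runs the same day with no new data, asserting the warning fires again on the second invocation via the run_dbt callbacks hook. Below is a hypothetical sketch of that catcher pattern; dbt's functional tests use their own EventCatcher helper, so this simplified class (which matches on the event's class name) is only an illustration.

from typing import Any, List


class SimpleEventCatcher:
    """Hypothetical illustration of the catcher pattern; not dbt's EventCatcher helper."""

    def __init__(self, event_name: str) -> None:
        self.event_name = event_name
        self.caught_events: List[Any] = []

    def catch(self, event: Any) -> None:
        # Match on the fired event's class name, e.g. "MicrobatchModelNoEventTimeInputs".
        if type(event).__name__ == self.event_name:
            self.caught_events.append(event)


# Usage in the spirit of the test above (illustrative only):
#   catcher = SimpleEventCatcher("MicrobatchModelNoEventTimeInputs")
#   run_dbt(["run"], callbacks=[catcher.catch])
#   assert len(catcher.caught_events) == 1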