Updating to Dagster 1.9.7 results in DagsterInvalidDefinitionError on @asset_check definitions using additional_ins #27003

Open
zaneselvans opened this issue Jan 10, 2025 · 0 comments
Labels
type: bug Something isn't working

@zaneselvans

What's the issue?

After updating our environment to dagster 1.9.7, we now get a couple of instances of DagsterInvalidDefinitionError when starting the Dagster UI. This happens even when the only change is the dagster version bump. Previously, no such errors were flagged in our DAG.

Looking at the release notes for the 1.9.7 tag, the only change that seems possibly related is the first item, the addition of load_definitions_from_module. However, that new feature doesn't appear on the documentation page linked from the release notes, and I can't find it by searching the documentation, even though the version dropdown at the top of the docs says it pertains to dagster 1.9.7. 1.9.7 isn't showing up in the changelog yet either.

There seem to be several newly unresolvable inputs. The Dagster UI startup bails as soon as it hits one, and different ones get hit first on different attempts to start the UI. I have seen 3 different instances so far, and all of them are asset checks in which the supposedly unresolvable input is provided through additional_ins using AssetIn("some_asset_key_here").

Our code base also has a 4th @asset_check that takes additional_ins, but I haven't yet seen it be the one that makes the Dagster UI fail.

See the code snippets and stack traces under Additional information below.

What did you expect to happen?

I expected the Dagster UI to start up without any errors or warnings, as it does with v1.9.6.

How to reproduce?

I tried adding this simple example of the apparently problematic pattern to our project, but the other asset checks mentioned above keep being the ones that trigger the failure, so I don't know whether this example would fail in isolation:

import pandas as pd
from dagster import asset, asset_check, AssetIn, AssetCheckResult

@asset
def test_static_df1() -> pd.DataFrame:
    """A simple static dataframe asset for testing asset checks."""
    return pd.DataFrame(
        {
            "col1": [1, 2, 3, 4, 5],
            "col2": ["a", "b", "c", "d", "e"],
        }
    )


@asset
def test_static_df2() -> pd.DataFrame:
    """A simple static dataframe asset for testing asset checks."""
    return pd.DataFrame(
        {
            "col1": [1, 2, 3, 4, 5],
            "col2": ["a", "b", "c", "d", "e"],
        }
    )


@asset_check(
    name="check_test_static_dataframe_assets",
    asset="test_static_df1",
    additional_ins={"df2": AssetIn("test_static_df2")},
)
def check_test_static_dataframe_assets(
    test_static_df1: pd.DataFrame, df2: pd.DataFrame
) -> AssetCheckResult:
    """Check that the dataframes are identical"""
    if test_static_df1.equals(df2):
        return AssetCheckResult(passed=True)
    return AssetCheckResult(
        passed=False,
        metadata={"reason": "The two dataframes are NOT identical."},
    )

Dagster version

dagster, version 1.9.7

Deployment type

Local

Deployment details

  • Local deployment using a multi-process executor
  • Postgres for event logging
  • Persisted assets are written to Parquet files in the local filesystem
  • Python 3.12.8 on macOS

Additional information

One of the problematic asset checks is defined by an asset check factory, but the input it complains about (tags_df) does correspond to another asset (_out_ferc1__detailed_tags), which is passed in explicitly using additional_ins.

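import pandas as pd
from dagster import AssetCheckResult, AssetChecksDefinition, AssetIn, asset_check

# check_tag_propagation_compared_to_compiled_tags and
# check_for_correction_xbrl_factoids_with_tag are project helpers (not shown here).

check_specs_detailed_tables_tags = [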
    {
        "asset": "_out_ferc1__detailed_balance_sheet_assets",
        "tag_columns": ["in_rate_base", "aggregatable_utility_type"],
    },
    {
        "asset": "_out_ferc1__detailed_balance_sheet_liabilities",
        "tag_columns": ["in_rate_base"],
    },
]


def make_check_tag_propagation(spec) -> AssetChecksDefinition:
    """Check the propagation of tags."""

    @asset_check(
        name="check_tag_propagation",
        asset=spec["asset"],
        additional_ins={"tags_df": AssetIn("_out_ferc1__detailed_tags")},
    )
    def _check(df: pd.DataFrame, tags_df: pd.DataFrame):
        for tag in spec["tag_columns"]:
            check_tag_propagation_compared_to_compiled_tags(df, tag, tags_df)
        return AssetCheckResult(passed=True)

    return _check


def make_check_correction_tags(spec) -> AssetChecksDefinition:
    """Check the correction tags."""

    @asset_check(
        name="check_correction_tags",
        asset=spec["asset"],
    )
    def _check(df):
        for tag in spec["tag_columns"]:
            check_for_correction_xbrl_factoids_with_tag(df, tag)
        return AssetCheckResult(passed=True)

    return _check


_checks = [
    make_check_tag_propagation(spec) for spec in check_specs_detailed_tables_tags
] + [make_check_correction_tags(spec) for spec in check_specs_detailed_tables_tags]
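Because the factory stamps out one check per spec, it can help to map an op name from an error message back to the spec that produced it. Judging only from the op name in the traceback in this issue (`_out_ferc1__detailed_balance_sheet_liabilities_check_tag_propagation`), the generated op name appears to be the asset key followed by `_` and the check name; that pattern is an inference, not something taken from the Dagster docs. `CHECK_NAMES`, `expected_op_names`, and `spec_for_op` are hypothetical helpers, and the spec list is copied from the snippet above so the sketch runs on its own.

```python
# Hypothetical helpers: the "<asset>_<check name>" op naming pattern is
# inferred from the traceback in this issue, not from Dagster documentation.

# Copied from the factory snippet above.
check_specs_detailed_tables_tags = [
    {
        "asset": "_out_ferc1__detailed_balance_sheet_assets",
        "tag_columns": ["in_rate_base", "aggregatable_utility_type"],
    },
    {
        "asset": "_out_ferc1__detailed_balance_sheet_liabilities",
        "tag_columns": ["in_rate_base"],
    },
]

# Check names passed to @asset_check(name=...) by the two factories.
CHECK_NAMES = ["check_tag_propagation", "check_correction_tags"]


def expected_op_names(specs: list[dict], check_names: list[str]) -> dict[str, dict]:
    """Return {op_name: spec} for every (spec, check name) pair the factories build."""
    return {
        f"{spec['asset']}_{name}": spec
        for spec in specs
        for name in check_names
    }


def spec_for_op(op_name: str, specs: list[dict], check_names: list[str]) -> dict:
    """Look up the spec that produced the op named in an error message."""
    ops = expected_op_names(specs, check_names)
    if op_name not in ops:
        raise KeyError(f"No spec produces an op named {op_name!r}")
    return ops[op_name]
```

For example, `spec_for_op("_out_ferc1__detailed_balance_sheet_liabilities_check_tag_propagation", check_specs_detailed_tables_tags, CHECK_NAMES)` returns the liabilities spec, i.e. one of the two checks built with `additional_ins={"tags_df": ...}`.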

Another problematic @asset_check is not produced by a factory, but its "unresolvable" input (aggs) is also passed in using additional_ins:

import pandas as pd
from dagster import AssetCheckResult, AssetIn, asset_check


@asset_check(
    asset="out_gridpathratoolkit__hourly_available_capacity_factor",
    additional_ins={
        "aggs": AssetIn("core_gridpathratoolkit__assn_generator_aggregation_group")
    },
    blocking=True,
)
def check_valid_aggregation_groups(
    out_gridpathratoolkit__hourly_available_capacity_factor,
    aggs: pd.DataFrame,
) -> AssetCheckResult:
    """Check that every capacity factor aggregation key appears in the aggregations.

    This isn't a normal foreign-key relationship, since the aggregation group isn't the
    primary key in the aggregation tables, and is not unique in either of these tables,
    but if an aggregation group appears in the capacity factor time series and never
    appears in the aggregation table, then something is wrong.
    """
    missing_agg_groups = set(
        out_gridpathratoolkit__hourly_available_capacity_factor.aggregation_group.unique()
    ).difference(set(aggs.aggregation_group.unique()))
    return AssetCheckResult(passed=missing_agg_groups == set())
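The docstring's logic boils down to a set difference. A minimal pandas-free sketch of that step, assuming plain iterables of group labels in place of the DataFrame columns, is below; `find_missing_agg_groups` is a hypothetical name, and returning the set (rather than only a boolean) is a design choice so the missing groups could also be reported.

```python
from collections.abc import Hashable, Iterable


def find_missing_agg_groups(
    capacity_factor_groups: Iterable[Hashable],
    aggregation_groups: Iterable[Hashable],
) -> set[Hashable]:
    """Return groups present in the capacity factor data but absent from the aggregations.

    Duplicates are fine on either side: the aggregation group is not unique in
    either table, so both inputs are collapsed to sets before comparing.
    """
    return set(capacity_factor_groups) - set(aggregation_groups)
```

`passed=not missing` is equivalent to the `missing_agg_groups == set()` test in the check above, and the missing groups could be recorded through the same `metadata` argument used in the first example, for instance as a comma-joined string.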

Output from starting dagster-webserver

dagster-webserver
2025-01-09 23:15:49 -0600 - dagster.code_server - INFO - Starting Dagster code server for module pudl.etl in process 28731
2025-01-09 23:15:54 -0600 - dagster.code_server - ERROR - Error while importing code
Traceback (most recent call last):
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_grpc/server.py", line 425, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
                                                              ^^^^^^^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_grpc/server.py", line 283, in __init__
    repo_def.load_all_definitions()
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/repository_definition/repository_definition.py", line 188, in load_all_definitions
    self._repository_data.load_all_definitions()
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/repository_definition/repository_data.py", line 203, in load_all_definitions
    self.get_all_jobs()
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/repository_definition/repository_data.py", line 426, in get_all_jobs
    self._all_jobs = self._jobs.get_all_definitions()
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/repository_definition/caching_index.py", line 92, in get_all_definitions
    sorted(
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/repository_definition/caching_index.py", line 123, in get_definition
    definition = cast(Callable, definition_source)()
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/asset_job.py", line 75, in build_asset_job_lambda
    job_def = build_asset_job(
              ^^^^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/asset_job.py", line 205, in build_asset_job
    return graph.to_job(
           ^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/graph_definition.py", line 691, in to_job
    return JobDefinition.dagster_internal_init(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/job_definition.py", line 288, in dagster_internal_init
    return JobDefinition(
           ^^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/job_definition.py", line 154, in __init__
    self._graph_def.get_inputs_must_be_resolved_top_level(self._asset_layer)
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/graph_definition.py", line 326, in get_inputs_must_be_resolved_top_level
    raise DagsterInvalidDefinitionError(
dagster._core.errors.DagsterInvalidDefinitionError: Input 'tags_df' of op '_out_ferc1__detailed_balance_sheet_liabilities_check_tag_propagation' has no way of being resolved. Must provide a resolution to this input via another op/graph, or via a direct input value mapped from the top-level graph. To learn more, see the docs for unconnected inputs: https://docs.dagster.io/concepts/io-management/unconnected-inputs#unconnected-inputs.
2025-01-09 23:15:54 -0600 - dagster.code_server - INFO - Started Dagster code server for module pudl.etl in process 28731
2025-01-09 23:15:54 -0600 - dagster - WARNING - /Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/workspace/context.py:783: UserWarning: Error loading repository location pudl.etl:dagster._core.errors.DagsterInvalidDefinitionError: Input 'tags_df' of op '_out_ferc1__detailed_balance_sheet_liabilities_check_tag_propagation' has no way of being resolved. Must provide a resolution to this input via another op/graph, or via a direct input value mapped from the top-level graph. To learn more, see the docs for unconnected inputs: https://docs.dagster.io/concepts/io-management/unconnected-inputs#unconnected-inputs.

Stack Trace:
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_grpc/server.py", line 425, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
                                                              ^^^^^^^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_grpc/server.py", line 283, in __init__
    repo_def.load_all_definitions()
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/repository_definition/repository_definition.py", line 188, in load_all_definitions
    self._repository_data.load_all_definitions()
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/repository_definition/repository_data.py", line 203, in load_all_definitions
    self.get_all_jobs()
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/repository_definition/repository_data.py", line 426, in get_all_jobs
    self._all_jobs = self._jobs.get_all_definitions()
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/repository_definition/caching_index.py", line 92, in get_all_definitions
    sorted(
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/repository_definition/caching_index.py", line 123, in get_definition
    definition = cast(Callable, definition_source)()
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/asset_job.py", line 75, in build_asset_job_lambda
    job_def = build_asset_job(
              ^^^^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/asset_job.py", line 205, in build_asset_job
    return graph.to_job(
           ^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/graph_definition.py", line 691, in to_job
    return JobDefinition.dagster_internal_init(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/job_definition.py", line 288, in dagster_internal_init
    return JobDefinition(
           ^^^^^^^^^^^^^^
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/job_definition.py", line 154, in __init__
    self._graph_def.get_inputs_must_be_resolved_top_level(self._asset_layer)
  File "/Users/zane/miniforge3/envs/pudl-dev/lib/python3.12/site-packages/dagster/_core/definitions/graph_definition.py", line 326, in get_inputs_must_be_resolved_top_level
    raise DagsterInvalidDefinitionError(

  warnings.warn(f"Error loading repository location {location_name}:{error.to_string()}")

2025-01-09 23:15:54 -0600 - dagster.code_server - INFO - Starting Dagster code server for module pudl.ferc_to_sqlite in process 28748
2025-01-09 23:15:59 -0600 - dagster.code_server - INFO - Started Dagster code server for module pudl.ferc_to_sqlite in process 28748
2025-01-09 23:15:59 -0600 - dagster-webserver - INFO - Serving dagster-webserver on http://127.0.0.1:3000 in process 28715
2025-01-09 23:16:39 -0600 - dagster.code_server - INFO - Stopping server once all current RPC calls terminate or 60 seconds pass
2025-01-09 23:16:39 -0600 - dagster.code_server - INFO - Shutting down Dagster code server for module pudl.etl in process 28731
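Since startup stops at the first unresolvable input and a different one surfaces on each attempt, one way to enumerate all of them is to save the output of several startups and collect every distinct (input, op) pair. This is a hypothetical sketch: `UNRESOLVED_INPUT_RE` and `unresolved_inputs` are illustrative names, and the regex is written against the error text shown above, which other Dagster versions may word differently.

```python
import re
from collections.abc import Iterable

# Matches the DagsterInvalidDefinitionError text seen in the log above.
UNRESOLVED_INPUT_RE = re.compile(
    r"Input '(?P<input>[^']+)' of op '(?P<op>[^']+)' has no way of being resolved"
)


def unresolved_inputs(log_texts: Iterable[str]) -> set[tuple[str, str]]:
    """Return the distinct (input name, op name) pairs mentioned in the given logs.

    The same error is printed twice per startup (once as the ERROR traceback and
    once in the UserWarning), so results are collected in a set to deduplicate.
    """
    found: set[tuple[str, str]] = set()
    for text in log_texts:
        for match in UNRESOLVED_INPUT_RE.finditer(text):
            found.add((match.group("input"), match.group("op")))
    return found
```

Each string passed in would be the captured output of one `dagster-webserver` startup, for example read from a file the output was redirected to.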

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

zaneselvans added the "type: bug" label on Jan 10, 2025