diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index f86925fde..091a49c58 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -165,6 +165,9 @@ def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwar if not on_warning_callback: super().__init__(**kwargs) else: + # Make the following code independent of arguments being defaulted or not. + if "default_args" in kwargs.keys(): + kwargs.update(kwargs.pop("default_args")) self.on_warning_callback = on_warning_callback self.is_delete_operator_pod_original = kwargs.get("is_delete_operator_pod", None) if self.is_delete_operator_pod_original is not None: diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index e6ccdc4d7..a827ad2c0 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -186,6 +186,26 @@ def test_dbt_kubernetes_build_command(): ), ({"is_delete_operator_pod": None, "on_finish_action": "keep_pod"}, (1, 1, False, "keep_pod")), ({}, (1, 1, True, "delete_pod")), + ( + { + "default_args": { + "on_failure_callback": (lambda **kwargs: None), + "is_delete_operator_pod": None, + }, + "on_finish_action": "delete_pod", + }, + (1, 2, True, "delete_pod"), + ), + ( + { + "default_args": { + "on_failure_callback": [(lambda **kwargs: None), (lambda **kwargs: None)], + "is_delete_operator_pod": None, + }, + "on_finish_action": "delete_succeeded_pod", + }, + (1, 3, False, "delete_succeeded_pod"), + ), ], ) @pytest.mark.skipif(