From 042908ceefd562ae29843a08914c9c2c026de377 Mon Sep 17 00:00:00 2001
From: sydneynotthecity
Date: Mon, 9 Dec 2024 09:55:41 -0600
Subject: [PATCH] Patch/update partition expiration sandbox (#549)

* HUBBLE-623 Update Canvas retention window

* None schedule interval creates NoneType object errors

* cast string as date
---
 dags/queries/create_table.sql | 6 +++---
 dags/sandbox_create_dag.py    | 8 ++++++++
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/dags/queries/create_table.sql b/dags/queries/create_table.sql
index cf4880b3..12826f5e 100644
--- a/dags/queries/create_table.sql
+++ b/dags/queries/create_table.sql
@@ -1,9 +1,9 @@
 create or replace table `{target_project}.{target_dataset}.{table_id}`
 partition by date_trunc(batch_run_date, day)
-options (partition_expiration_days = 180) as (
+options (partition_expiration_days = 450) as (
     select *
     from `{project_id}.{dataset_id}.{table_id}`
     where
-        batch_run_date >= date_trunc(date_sub(current_date(), interval 6 month), day)
-        and batch_run_date < date_trunc(current_date(), day)
+        batch_run_date >= date_trunc(date_sub(date('{batch_run_date}'), interval 15 month), day)
+        and batch_run_date < date_trunc(date('{batch_run_date}'), day)
 )
diff --git a/dags/sandbox_create_dag.py b/dags/sandbox_create_dag.py
index 5b6b76da..9f75b684 100644
--- a/dags/sandbox_create_dag.py
+++ b/dags/sandbox_create_dag.py
@@ -9,6 +9,7 @@
 from airflow.models.variable import Variable
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
+from stellar_etl_airflow import macros
 from stellar_etl_airflow.build_bq_insert_job_task import (
     file_to_string,
     get_query_filepath,
@@ -32,6 +33,10 @@
         "fromjson": lambda s: loads(s),
     },
     catchup=False,
+    user_defined_macros={
+        "subtract_data_interval": macros.subtract_data_interval,
+        "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
+    },
     sla_miss_callback=alert_sla_miss,
 ) as dag:
     PROJECT = Variable.get("public_project")
@@ -42,6 +47,8 @@
     TABLES_ID = Variable.get("table_ids", deserialize_json=True)
     DBT_TABLES = Variable.get("dbt_tables", deserialize_json=True)
 
+    batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}"
+
     start_tables_task = EmptyOperator(task_id="start_tables_task")
     start_views_task = EmptyOperator(task_id="start_views_task")
 
@@ -54,6 +61,7 @@
                 "table_id": TABLES_ID[table_id],
                 "target_project": SANDBOX_PROJECT,
                 "target_dataset": SANDBOX_DATASET,
+                "batch_run_date": batch_run_date,
             }
             query = query.format(**sql_params)
             tables_create_task = BigQueryInsertJobOperator(
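
The 450-day partition expiration lines up with the widened filter window: the query now retains 15 months of partitions (15 x 30 = 450 days) instead of 6 months at 180 days. Anchoring the window to the templated batch_run_date rather than current_date() also makes the query deterministic per logical date, so a rerun for an old run selects the same slice instead of a window relative to the wall-clock date.

The two names registered under user_defined_macros come from stellar_etl_airflow.macros; this patch only wires them in. Below is a minimal sketch of the behavior the DAG relies on, assuming the macros step the run date back by one scheduling interval and format it as a string. The function bodies are illustrative, not the library's actual code:

    # Hypothetical sketch of the stellar_etl_airflow.macros helpers used
    # above; the real implementations may differ.
    from datetime import datetime, timedelta


    def subtract_data_interval(dag, time: datetime) -> datetime:
        # Step one scheduling interval back from `time`. A DAG built with
        # schedule_interval=None has no interval to subtract -- the source
        # of the NoneType errors the commit message mentions -- so this
        # sketch falls back to a zero delta.
        interval = dag.schedule_interval
        if not isinstance(interval, timedelta):
            interval = timedelta(0)
        return time - interval


    def batch_run_date_as_datetime_string(dag, start_time: datetime) -> str:
        # Format the interval-adjusted run date so it can be substituted
        # into templated SQL and cast back with date() on the BigQuery side.
        return subtract_data_interval(dag, start_time).strftime("%Y-%m-%dT%H:%M:%S")


    if __name__ == "__main__":
        from types import SimpleNamespace

        daily_dag = SimpleNamespace(schedule_interval=timedelta(days=1))
        print(batch_run_date_as_datetime_string(daily_dag, datetime(2024, 12, 9)))
        # -> 2024-12-08T00:00:00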
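
One subtlety in the DAG change: batch_run_date is a Jinja template string, not a concrete date. query.format(**sql_params) fills the {batch_run_date} placeholder immediately, but the {{ ... }} expression it injects is only rendered by Airflow at task runtime, because configuration is among BigQueryInsertJobOperator's template_fields. A condensed, runnable sketch of that two-stage substitution (parameter values marked "illustrative" do not appear in the patch):

    # Condensed from sandbox_create_dag.py above.
    query = """
    create or replace table `{target_project}.{target_dataset}.{table_id}`
    partition by date_trunc(batch_run_date, day)
    options (partition_expiration_days = 450) as (
        select *
        from `{project_id}.{dataset_id}.{table_id}`
        where
            batch_run_date >= date_trunc(date_sub(date('{batch_run_date}'), interval 15 month), day)
            and batch_run_date < date_trunc(date('{batch_run_date}'), day)
    )
    """

    sql_params = {
        "project_id": "source-project",       # illustrative
        "dataset_id": "source_dataset",       # illustrative
        "table_id": "history_ledgers",        # illustrative
        "target_project": "sandbox-project",  # illustrative
        "target_dataset": "sandbox_dataset",  # illustrative
        "batch_run_date": "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}",
    }

    # Stage 1: str.format fills the {placeholder}s now. The Jinja expression
    # arrives as a *value*, so it passes through unmodified.
    query = query.format(**sql_params)
    assert "{{ batch_run_date_as_datetime_string" in query

    # Stage 2 happens inside Airflow: the surviving {{ ... }} expression is
    # rendered against the DAG's user_defined_macros when the task runs,
    # which is why the macros must be registered on the DAG object.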