From 8c2d9bbca4ac42cd6f05d10cab42036bbd125efa Mon Sep 17 00:00:00 2001
From: axel_thevenot
Date: Tue, 26 Nov 2024 00:01:58 +0100
Subject: [PATCH] feat(incremental): copy multiple tables in parallel (#1237)

---
 .../unreleased/Features-20241126-000241.yaml   |  6 +++
 dbt/adapters/bigquery/connections.py           | 43 +++++++++++++------
 dbt/adapters/bigquery/impl.py                  |  4 +-
 .../incremental_strategy/insert_overwrite.sql  | 18 +++++---
 4 files changed, 48 insertions(+), 23 deletions(-)
 create mode 100644 .changes/unreleased/Features-20241126-000241.yaml

diff --git a/.changes/unreleased/Features-20241126-000241.yaml b/.changes/unreleased/Features-20241126-000241.yaml
new file mode 100644
index 000000000..41b659bc4
--- /dev/null
+++ b/.changes/unreleased/Features-20241126-000241.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: copy tables and partitions in parallel
+time: 2024-11-26T00:02:41.54479+01:00
+custom:
+  Author: AxelThevenot
+  Issue: "1237"
diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index 61fa87d40..9f51d6c53 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -402,17 +402,16 @@ def standard_to_legacy(table):
         _, iterator = self.raw_execute(sql, use_legacy_sql=True)
         return self.get_table_from_response(iterator)

-    def copy_bq_table(self, source, destination, write_disposition) -> None:
+    def copy_bq_table(self, source, destination, write_disposition, partition_ids=None) -> None:
         conn = self.get_thread_connection()
         client: Client = conn.handle

         # -------------------------------------------------------------------------------
-        # BigQuery allows to use copy API using two different formats:
-        # 1. client.copy_table(source_table_id, destination_table_id)
-        #    where source_table_id = "your-project.source_dataset.source_table"
-        # 2. client.copy_table(source_table_ids, destination_table_id)
-        #    where source_table_ids = ["your-project.your_dataset.your_table_name", ...]
-        # Let's use uniform function call and always pass list there
+        # The BigQuery copy API accepts concurrent jobs against the same destination
+        # table, so each source (and each partition of each source, when
+        # partition_ids is given) is copied into the destination table in parallel.
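+        # A single partition is addressed with BigQuery's partition decorator
+        # syntax, e.g. "your-project.your_dataset.your_table$20241126" for a day.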
         # -------------------------------------------------------------------------------
         if type(source) is not list:
             source = [source]
@@ -436,14 +433,32 @@ def copy_bq_table(self, source, destination, write_disposition) -> None:
             ", ".join(source_ref.path for source_ref in source_ref_array),
             destination_ref.path,
         )
+
         with self.exception_handler(msg):
-            copy_job = client.copy_table(
-                source_ref_array,
-                destination_ref,
-                job_config=CopyJobConfig(write_disposition=write_disposition),
-                retry=self._retry.create_reopen_with_deadline(conn),
-            )
-            copy_job.result(timeout=self._retry.create_job_execution_timeout(fallback=300))
+
+            copy_jobs = []
+
+            # Start all the copy jobs; BigQuery runs them in parallel
+            for source_ref in source_ref_array:
+
+                for partition_id in partition_ids or [None]:
+                    source_ref_partition = (
+                        f"{source_ref}${partition_id}" if partition_id else source_ref
+                    )
+                    destination_ref_partition = (
+                        f"{destination_ref}${partition_id}" if partition_id else destination_ref
+                    )
+                    copy_job = client.copy_table(
+                        source_ref_partition,
+                        destination_ref_partition,
+                        job_config=CopyJobConfig(write_disposition=write_disposition),
+                        retry=self._retry.create_reopen_with_deadline(conn),
+                    )
+                    copy_jobs.append(copy_job)
+
+            # Wait for every copy job to finish
+            for copy_job in copy_jobs:
+                copy_job.result(timeout=self._retry.create_job_execution_timeout(fallback=300))

     def write_dataframe_to_table(
         self,
diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py
index 51c457129..87cb1cb0d 100644
--- a/dbt/adapters/bigquery/impl.py
+++ b/dbt/adapters/bigquery/impl.py
@@ -409,7 +409,7 @@ def _agate_to_schema(
         return bq_schema

     @available.parse(lambda *a, **k: "")
-    def copy_table(self, source, destination, materialization):
+    def copy_table(self, source, destination, materialization, partition_ids=None):
         if materialization == "incremental":
             write_disposition = WRITE_APPEND
         elif materialization == "table":
@@ -421,7 +421,7 @@ def copy_table(self, source, destination, materialization):
             f"{materialization}"
         )

-        self.connections.copy_bq_table(source, destination, write_disposition)
+        self.connections.copy_bq_table(source, destination, write_disposition, partition_ids)

         return "COPY TABLE with materialization: {}".format(materialization)

diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql
index 3ba67931e..543a5f34f 100644
--- a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql
@@ -18,23 +18,28 @@

 {% macro bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}

+  {% set partition_ids = [] %}
+
   {% for partition in partitions %}
     {% if partition_by.data_type == 'int64' %}
       {% set partition = partition | as_text %}
     {% elif partition_by.granularity == 'hour' %}
-      {% set partition = partition.strftime("%Y%m%d%H") %}
+      {% set partition = partition.strftime('%Y%m%d%H') %}
     {% elif partition_by.granularity == 'day' %}
-      {% set partition = partition.strftime("%Y%m%d") %}
+      {% set partition = partition.strftime('%Y%m%d') %}
     {% elif partition_by.granularity == 'month' %}
-      {% set partition = partition.strftime("%Y%m") %}
+      {% set partition = partition.strftime('%Y%m') %}
     {% elif partition_by.granularity == 'year' %}
-      {% set partition = partition.strftime("%Y") %}
+      {% set partition = partition.strftime('%Y') %}
     {% endif %}
-    {% set tmp_relation_partitioned = api.Relation.create(database=tmp_relation.database, schema=tmp_relation.schema, identifier=tmp_relation.table ~ '$' ~ partition, type=tmp_relation.type) %}
-    {% set target_relation_partitioned = api.Relation.create(database=target_relation.database, schema=target_relation.schema, identifier=target_relation.table ~ '$' ~ partition, type=target_relation.type) %}
-    {% do adapter.copy_table(tmp_relation_partitioned, target_relation_partitioned, "table") %}
+
+    {% do partition_ids.append(partition) %}
+
   {% endfor %}

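+  {#-- all collected partitions go through a single adapter call, which runs one copy job per partition in parallel --#}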
+  {% do adapter.copy_table(tmp_relation, target_relation, 'table', partition_ids) %}
+
 {% endmacro %}

 {% macro bq_insert_overwrite_sql(