From b3538777f0ffd1d21537bb6a4ba6bf91f3e43be6 Mon Sep 17 00:00:00 2001
From: gbmarc1
Date: Wed, 6 Dec 2023 21:16:43 -0500
Subject: [PATCH 1/2] feat: adding pyspark_job `properties` and `archive_uris`
 options

---
 dbt/adapters/bigquery/python_submissions.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 6ea5a5564..31945a711 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -103,6 +103,14 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
             "placement": {"cluster_name": self._get_cluster_name()},
             "pyspark_job": {
                 "main_python_file_uri": self.gcs_location,
+                "archive_uris": self.parsed_model["config"].get(
+                    "dataproc_archive_uris",
+                    [],
+                ),
+                "properties": self.parsed_model["config"].get(
+                    "dataproc_properties",
+                    {},
+                ),
             },
         }
         operation = self.job_client.submit_job_as_operation(  # type: ignore

From 18bde3a2484985361cb13f2a5967269156c2b225 Mon Sep 17 00:00:00 2001
From: gbmarc1
Date: Wed, 6 Dec 2023 22:10:43 -0500
Subject: [PATCH 2/2] refactor to dataproc_pyspark_job to feed any parameter
 to pyspark_job

---
 dbt/adapters/bigquery/python_submissions.py | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 31945a711..b8c817534 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -102,17 +102,11 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
         job = {
             "placement": {"cluster_name": self._get_cluster_name()},
             "pyspark_job": {
-                "main_python_file_uri": self.gcs_location,
-                "archive_uris": self.parsed_model["config"].get(
-                    "dataproc_archive_uris",
-                    [],
-                ),
-                "properties": self.parsed_model["config"].get(
-                    "dataproc_properties",
-                    {},
-                ),
+                "main_python_file_uri": self.gcs_location
             },
         }
+        job["pyspark_job"].update(self.parsed_model["config"].get("dataproc_pyspark_job", {}))
+
         operation = self.job_client.submit_job_as_operation(  # type: ignore
             request={
                 "project_id": self.credential.execution_project,
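
Usage note (not part of the patches): after patch 2, everything under a model's
`dataproc_pyspark_job` config is merged wholesale into the Dataproc `pyspark_job`
payload via `dict.update`, so any PySparkJob field can be passed through. A minimal
sketch of a dbt Python model exercising this, assuming dbt's standard `dbt.config`
mechanism; the model file, cluster name, bucket URI, and property values below are
hypothetical:

    # models/my_pyspark_model.py -- hypothetical dbt Python model
    def model(dbt, session):
        dbt.config(
            submission_method="cluster",         # routes through _submit_dataproc_job
            dataproc_cluster_name="my-cluster",  # hypothetical cluster name
            # Merged into the pyspark_job dict via dict.update, so any
            # PySparkJob field (archive_uris, properties, jar_file_uris, ...)
            # is accepted, not just the two fields from patch 1.
            dataproc_pyspark_job={
                "archive_uris": ["gs://my-bucket/deps.tar.gz"],
                "properties": {"spark.executor.memory": "4g"},
            },
        )
        return dbt.ref("upstream_model")

Because the merge is a plain `dict.update`, a `main_python_file_uri` key supplied in
`dataproc_pyspark_job` would override the uploaded model location in `self.gcs_location`.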