diff --git a/.changes/unreleased/Fixes-20231206-172009.yaml b/.changes/unreleased/Fixes-20231206-172009.yaml new file mode 100644 index 000000000..f8dad46bf --- /dev/null +++ b/.changes/unreleased/Fixes-20231206-172009.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: retry wait for result independently from job creation +time: 2023-12-06T17:20:09.264805+01:00 +custom: + Author: Kayrnt + Issue: "1045" diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index de84e4bf8..cdfc3bbe5 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -492,17 +492,23 @@ def raw_execute( job_creation_timeout = self.get_job_creation_timeout_seconds(conn) job_execution_timeout = self.get_job_execution_timeout_seconds(conn) - def fn(): + def job_creation_fn(): return self._query_and_results( - client, - sql, - job_params, - job_creation_timeout=job_creation_timeout, + client, sql, job_params, job_creation_timeout=job_creation_timeout + ) + + query_job = self._retry_and_handle(msg=sql, conn=conn, fn=job_creation_fn) + + def wait_for_job_results_fn(): + return self._wait_for_results( + query_job=query_job, job_execution_timeout=job_execution_timeout, limit=limit, ) - query_job, iterator = self._retry_and_handle(msg=sql, conn=conn, fn=fn) + query_job, iterator = self._retry_and_handle( + msg=sql, conn=conn, fn=wait_for_job_results_fn + ) return query_job, iterator @@ -729,15 +735,7 @@ def query_schemas(): return self._retry_and_handle(msg="list dataset", conn=conn, fn=query_schemas) - def _query_and_results( - self, - client, - sql, - job_params, - job_creation_timeout=None, - job_execution_timeout=None, - limit: Optional[int] = None, - ): + def _query_and_results(self, client, sql, job_params, job_creation_timeout=None): """Query the client and wait for results.""" # Cannot reuse job_config if destination is set and ddl is used job_config = google.cloud.bigquery.QueryJobConfig(**job_params) @@ -750,7 +748,11 @@ def _query_and_results( logger.debug( self._bq_job_link(query_job.location, query_job.project, query_job.job_id) ) + return query_job + def _wait_for_results( + self, query_job, job_execution_timeout=None, limit: Optional[int] = None + ): # only use async logic if user specifies a timeout if job_execution_timeout: loop = asyncio.new_event_loop() @@ -787,7 +789,7 @@ def reopen_conn_on_error(error): target=fn, predicate=_ErrorCounter(self.get_job_retries(conn)).count_error, sleep_generator=self._retry_generator(), - deadline=self.get_job_retry_deadline_seconds(conn), + timeout=self.get_job_retry_deadline_seconds(conn), on_error=reopen_conn_on_error, ) diff --git a/tests/unit/test_bigquery_connection_manager.py b/tests/unit/test_bigquery_connection_manager.py index d6c3f64fc..93f7fe3ff 100644 --- a/tests/unit/test_bigquery_connection_manager.py +++ b/tests/unit/test_bigquery_connection_manager.py @@ -22,6 +22,7 @@ def setUp(self): self.connections = BigQueryConnectionManager(profile=profile) self.mock_client = Mock(dbt.adapters.bigquery.impl.google.cloud.bigquery.Client) + self.query_job = Mock(dbt.adapters.bigquery.impl.google.cloud.bigquery.QueryJob) self.mock_connection = MagicMock() self.mock_connection.handle = self.mock_client @@ -114,7 +115,6 @@ def test_query_and_results(self, mock_bq): "sql", {"job_param_1": "blah"}, job_creation_timeout=15, - job_execution_timeout=3, ) mock_bq.QueryJobConfig.assert_called_once() @@ -123,23 +123,17 @@ def test_query_and_results(self, mock_bq): ) @patch("dbt.adapters.bigquery.impl.google.cloud.bigquery") - def test_query_and_results_timeout(self, mock_bq): - self.mock_client.query = Mock( - return_value=Mock(result=lambda *args, **kwargs: time.sleep(4)) - ) + def test_wait_for_results_timeout(self, mock_bq): + def mock_result(max_results): + time.sleep(3) + return None + + self.query_job.result = MagicMock(side_effect=mock_result) + with pytest.raises(dbt.exceptions.DbtRuntimeError) as exc: - self.connections._query_and_results( - self.mock_client, - "sql", - {"job_param_1": "blah"}, - job_creation_timeout=15, - job_execution_timeout=1, - ) + self.connections._wait_for_results(self.query_job, job_execution_timeout=1, limit=100) - mock_bq.QueryJobConfig.assert_called_once() - self.mock_client.query.assert_called_once_with( - query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15 - ) + self.query_job.result.assert_called_once_with(max_results=100) assert "Query exceeded configured timeout of 1s" in str(exc.value) def test_copy_bq_table_appends(self):