Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry wait for result independently from job creation #1042

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20231206-172009.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: retry wait for result independently from job creation
time: 2023-12-06T17:20:09.264805+01:00
custom:
Author: Kayrnt
Issue: "1045"
34 changes: 18 additions & 16 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,17 +492,23 @@ def raw_execute(
job_creation_timeout = self.get_job_creation_timeout_seconds(conn)
job_execution_timeout = self.get_job_execution_timeout_seconds(conn)

def fn():
def job_creation_fn():
return self._query_and_results(
client,
sql,
job_params,
job_creation_timeout=job_creation_timeout,
client, sql, job_params, job_creation_timeout=job_creation_timeout
)

query_job = self._retry_and_handle(msg=sql, conn=conn, fn=job_creation_fn)

def wait_for_job_results_fn():
return self._wait_for_results(
query_job=query_job,
job_execution_timeout=job_execution_timeout,
limit=limit,
)

query_job, iterator = self._retry_and_handle(msg=sql, conn=conn, fn=fn)
query_job, iterator = self._retry_and_handle(
msg=sql, conn=conn, fn=wait_for_job_results_fn
)

return query_job, iterator

Expand Down Expand Up @@ -729,15 +735,7 @@ def query_schemas():

return self._retry_and_handle(msg="list dataset", conn=conn, fn=query_schemas)

def _query_and_results(
self,
client,
sql,
job_params,
job_creation_timeout=None,
job_execution_timeout=None,
limit: Optional[int] = None,
):
def _query_and_results(self, client, sql, job_params, job_creation_timeout=None):
"""Query the client and wait for results."""
# Cannot reuse job_config if destination is set and ddl is used
job_config = google.cloud.bigquery.QueryJobConfig(**job_params)
Expand All @@ -750,7 +748,11 @@ def _query_and_results(
logger.debug(
self._bq_job_link(query_job.location, query_job.project, query_job.job_id)
)
return query_job

def _wait_for_results(
self, query_job, job_execution_timeout=None, limit: Optional[int] = None
):
# only use async logic if user specifies a timeout
if job_execution_timeout:
loop = asyncio.new_event_loop()
Expand Down Expand Up @@ -787,7 +789,7 @@ def reopen_conn_on_error(error):
target=fn,
predicate=_ErrorCounter(self.get_job_retries(conn)).count_error,
sleep_generator=self._retry_generator(),
deadline=self.get_job_retry_deadline_seconds(conn),
timeout=self.get_job_retry_deadline_seconds(conn),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only replaces the `deadline` argument with `timeout`, because `deadline` is deprecated in the driver; the behavior is the same.

on_error=reopen_conn_on_error,
)

Expand Down
26 changes: 10 additions & 16 deletions tests/unit/test_bigquery_connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def setUp(self):
self.connections = BigQueryConnectionManager(profile=profile)

self.mock_client = Mock(dbt.adapters.bigquery.impl.google.cloud.bigquery.Client)
self.query_job = Mock(dbt.adapters.bigquery.impl.google.cloud.bigquery.QueryJob)
self.mock_connection = MagicMock()

self.mock_connection.handle = self.mock_client
Expand Down Expand Up @@ -114,7 +115,6 @@ def test_query_and_results(self, mock_bq):
"sql",
{"job_param_1": "blah"},
job_creation_timeout=15,
job_execution_timeout=3,
)

mock_bq.QueryJobConfig.assert_called_once()
Expand All @@ -123,23 +123,17 @@ def test_query_and_results(self, mock_bq):
)

@patch("dbt.adapters.bigquery.impl.google.cloud.bigquery")
def test_query_and_results_timeout(self, mock_bq):
self.mock_client.query = Mock(
return_value=Mock(result=lambda *args, **kwargs: time.sleep(4))
)
def test_wait_for_results_timeout(self, mock_bq):
def mock_result(max_results):
time.sleep(3)
return None

self.query_job.result = MagicMock(side_effect=mock_result)

with pytest.raises(dbt.exceptions.DbtRuntimeError) as exc:
self.connections._query_and_results(
self.mock_client,
"sql",
{"job_param_1": "blah"},
job_creation_timeout=15,
job_execution_timeout=1,
)
self.connections._wait_for_results(self.query_job, job_execution_timeout=1, limit=100)

mock_bq.QueryJobConfig.assert_called_once()
self.mock_client.query.assert_called_once_with(
query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15
)
self.query_job.result.assert_called_once_with(max_results=100)
assert "Query exceeded configured timeout of 1s" in str(exc.value)

def test_copy_bq_table_appends(self):
Expand Down