diff --git a/.changes/unreleased/Fixes-20241120-163101.yaml b/.changes/unreleased/Fixes-20241120-163101.yaml new file mode 100644 index 000000000..ba1f4e937 --- /dev/null +++ b/.changes/unreleased/Fixes-20241120-163101.yaml @@ -0,0 +1,7 @@ +kind: Fixes +body: Fix issue where dbt-bigquery was not retrying in certain retryable scenarios, + e.g. 503's +time: 2024-11-20T16:31:01.60689-05:00 +custom: + Author: mikealfare + Issue: "682" diff --git a/dbt/adapters/bigquery/clients.py b/dbt/adapters/bigquery/clients.py index 18c59fc12..722266240 100644 --- a/dbt/adapters/bigquery/clients.py +++ b/dbt/adapters/bigquery/clients.py @@ -1,10 +1,10 @@ from google.api_core.client_info import ClientInfo from google.api_core.client_options import ClientOptions -from google.api_core.retry import Retry from google.auth.exceptions import DefaultCredentialsError -from google.cloud.bigquery import Client as BigQueryClient +from google.cloud.bigquery import Client as BigQueryClient, DEFAULT_RETRY as BQ_DEFAULT_RETRY from google.cloud.dataproc_v1 import BatchControllerClient, JobControllerClient from google.cloud.storage import Client as StorageClient +from google.cloud.storage.retry import DEFAULT_RETRY as GCS_DEFAULT_RETRY from dbt.adapters.events.logging import AdapterLogger @@ -28,7 +28,7 @@ def create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient: return _create_bigquery_client(credentials) -@Retry() # google decorator. retries on transient errors with exponential backoff +@GCS_DEFAULT_RETRY def create_gcs_client(credentials: BigQueryCredentials) -> StorageClient: return StorageClient( project=credentials.execution_project, @@ -36,7 +36,7 @@ def create_gcs_client(credentials: BigQueryCredentials) -> StorageClient: ) -@Retry() # google decorator. retries on transient errors with exponential backoff +# dataproc does not appear to have a default retry like BQ and GCS def create_dataproc_job_controller_client(credentials: BigQueryCredentials) -> JobControllerClient: return JobControllerClient( credentials=create_google_credentials(credentials), @@ -44,7 +44,7 @@ def create_dataproc_job_controller_client(credentials: BigQueryCredentials) -> J ) -@Retry() # google decorator. retries on transient errors with exponential backoff +# dataproc does not appear to have a default retry like BQ and GCS def create_dataproc_batch_controller_client( credentials: BigQueryCredentials, ) -> BatchControllerClient: @@ -54,7 +54,7 @@ def create_dataproc_batch_controller_client( ) -@Retry() # google decorator. retries on transient errors with exponential backoff +@BQ_DEFAULT_RETRY def _create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient: return BigQueryClient( credentials.execution_project, diff --git a/dbt/adapters/bigquery/retry.py b/dbt/adapters/bigquery/retry.py index 2cbdaa245..cc197a7d3 100644 --- a/dbt/adapters/bigquery/retry.py +++ b/dbt/adapters/bigquery/retry.py @@ -2,7 +2,7 @@ from google.api_core.future.polling import DEFAULT_POLLING from google.api_core.retry import Retry -from google.cloud.bigquery.retry import DEFAULT_RETRY, _job_should_retry +from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY, _job_should_retry from requests.exceptions import ConnectionError from dbt.adapters.contracts.connection import Connection, ConnectionState @@ -15,14 +15,8 @@ _logger = AdapterLogger("BigQuery") - -_SECOND = 1.0 -_MINUTE = 60 * _SECOND -_HOUR = 60 * _MINUTE -_DAY = 24 * _HOUR -_DEFAULT_INITIAL_DELAY = _SECOND -_DEFAULT_MAXIMUM_DELAY = 3 * _SECOND -_DEFAULT_POLLING_MAXIMUM_DELAY = 10 * _SECOND +_MINUTE = 60.0 +_DAY = 24 * 60 * 60.0 class RetryFactory: @@ -44,7 +38,7 @@ def create_job_execution_timeout(self, fallback: float = _DAY) -> float: ) # keep _DAY here so it's not overridden by passing fallback=None def create_retry(self, fallback: Optional[float] = None) -> Retry: - return DEFAULT_RETRY.with_timeout(self._job_execution_timeout or fallback or _DAY) + return DEFAULT_JOB_RETRY.with_timeout(self._job_execution_timeout or fallback or _DAY) def create_polling(self, model_timeout: Optional[float] = None) -> Retry: return DEFAULT_POLLING.with_timeout(model_timeout or self._job_execution_timeout or _DAY) @@ -53,14 +47,21 @@ def create_reopen_with_deadline(self, connection: Connection) -> Retry: """ This strategy mimics what was accomplished with _retry_and_handle """ - return Retry( - predicate=_DeferredException(self._retries), - initial=_DEFAULT_INITIAL_DELAY, - maximum=_DEFAULT_MAXIMUM_DELAY, - deadline=self._job_deadline, - on_error=_create_reopen_on_error(connection), + + retry = DEFAULT_JOB_RETRY.with_delay(maximum=3.0).with_predicate( + _DeferredException(self._retries) ) + # there is no `with_on_error` method, but we want to retain the defaults on `DEFAULT_JOB_RETRY + retry._on_error = _create_reopen_on_error(connection) + + # don't override the default deadline to None if the user did not provide one, + # the process will never end + if deadline := self._job_deadline: + return retry.with_deadline(deadline) + + return retry + class _DeferredException: """