Skip to content

Commit

Permalink
Retry on 503 (#1408) (#1445)
Browse files Browse the repository at this point in the history
* Retry on 503 (#1408)

* add default retry on all client factories, which includes 502 and 503 errors
* update retries to use defaults and ensure that a timeout or deadline is set

(cherry picked from commit a219818)

* remove hatch.toml

---------

Co-authored-by: Mike Alfare <[email protected]>
  • Loading branch information
colin-rogers-dbt and mikealfare authored Jan 10, 2025
1 parent 069472d commit 655ff1a
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20241120-163101.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Fix issue where dbt-bigquery was not retrying in certain retryable scenarios,
e.g. 503's
time: 2024-11-20T16:31:01.60689-05:00
custom:
Author: mikealfare
Issue: "682"
12 changes: 6 additions & 6 deletions dbt/adapters/bigquery/clients.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from google.api_core.client_info import ClientInfo
from google.api_core.client_options import ClientOptions
from google.api_core.retry import Retry
from google.auth.exceptions import DefaultCredentialsError
from google.cloud.bigquery import Client as BigQueryClient
from google.cloud.bigquery import Client as BigQueryClient, DEFAULT_RETRY as BQ_DEFAULT_RETRY
from google.cloud.dataproc_v1 import BatchControllerClient, JobControllerClient
from google.cloud.storage import Client as StorageClient
from google.cloud.storage.retry import DEFAULT_RETRY as GCS_DEFAULT_RETRY

from dbt.adapters.events.logging import AdapterLogger

Expand All @@ -28,23 +28,23 @@ def create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
return _create_bigquery_client(credentials)


@Retry() # google decorator. retries on transient errors with exponential backoff
@GCS_DEFAULT_RETRY
def create_gcs_client(credentials: BigQueryCredentials) -> StorageClient:
return StorageClient(
project=credentials.execution_project,
credentials=create_google_credentials(credentials),
)


@Retry() # google decorator. retries on transient errors with exponential backoff
# dataproc does not appear to have a default retry like BQ and GCS
def create_dataproc_job_controller_client(credentials: BigQueryCredentials) -> JobControllerClient:
return JobControllerClient(
credentials=create_google_credentials(credentials),
client_options=ClientOptions(api_endpoint=_dataproc_endpoint(credentials)),
)


@Retry() # google decorator. retries on transient errors with exponential backoff
# dataproc does not appear to have a default retry like BQ and GCS
def create_dataproc_batch_controller_client(
credentials: BigQueryCredentials,
) -> BatchControllerClient:
Expand All @@ -54,7 +54,7 @@ def create_dataproc_batch_controller_client(
)


@Retry() # google decorator. retries on transient errors with exponential backoff
@BQ_DEFAULT_RETRY
def _create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
return BigQueryClient(
credentials.execution_project,
Expand Down
33 changes: 17 additions & 16 deletions dbt/adapters/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from google.api_core.future.polling import DEFAULT_POLLING
from google.api_core.retry import Retry
from google.cloud.bigquery.retry import DEFAULT_RETRY, _job_should_retry
from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY, _job_should_retry
from requests.exceptions import ConnectionError

from dbt.adapters.contracts.connection import Connection, ConnectionState
Expand All @@ -15,14 +15,8 @@

_logger = AdapterLogger("BigQuery")


_SECOND = 1.0
_MINUTE = 60 * _SECOND
_HOUR = 60 * _MINUTE
_DAY = 24 * _HOUR
_DEFAULT_INITIAL_DELAY = _SECOND
_DEFAULT_MAXIMUM_DELAY = 3 * _SECOND
_DEFAULT_POLLING_MAXIMUM_DELAY = 10 * _SECOND
_MINUTE = 60.0
_DAY = 24 * 60 * 60.0


class RetryFactory:
Expand All @@ -44,7 +38,7 @@ def create_job_execution_timeout(self, fallback: float = _DAY) -> float:
) # keep _DAY here so it's not overridden by passing fallback=None

def create_retry(self, fallback: Optional[float] = None) -> Retry:
return DEFAULT_RETRY.with_timeout(self._job_execution_timeout or fallback or _DAY)
return DEFAULT_JOB_RETRY.with_timeout(self._job_execution_timeout or fallback or _DAY)

def create_polling(self, model_timeout: Optional[float] = None) -> Retry:
return DEFAULT_POLLING.with_timeout(model_timeout or self._job_execution_timeout or _DAY)
Expand All @@ -53,14 +47,21 @@ def create_reopen_with_deadline(self, connection: Connection) -> Retry:
"""
This strategy mimics what was accomplished with _retry_and_handle
"""
return Retry(
predicate=_DeferredException(self._retries),
initial=_DEFAULT_INITIAL_DELAY,
maximum=_DEFAULT_MAXIMUM_DELAY,
deadline=self._job_deadline,
on_error=_create_reopen_on_error(connection),

retry = DEFAULT_JOB_RETRY.with_delay(maximum=3.0).with_predicate(
_DeferredException(self._retries)
)

# there is no `with_on_error` method, but we want to retain the defaults on `DEFAULT_JOB_RETRY
retry._on_error = _create_reopen_on_error(connection)

# don't override the default deadline to None if the user did not provide one,
# the process will never end
if deadline := self._job_deadline:
return retry.with_deadline(deadline)

return retry


class _DeferredException:
"""
Expand Down

0 comments on commit 655ff1a

Please sign in to comment.