Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a27063c
feat: add automatic retry for transient dbt command errors
devin-ai-integration[bot] Feb 27, 2026
07045ae
fix: guard _build_haystack against non-string arguments
devin-ai-integration[bot] Feb 27, 2026
bd25d9b
fix: address CodeRabbit review feedback
devin-ai-integration[bot] Feb 27, 2026
7594348
fix: address CodeRabbit round 2 feedback
devin-ai-integration[bot] Feb 27, 2026
74a436b
fix: address CodeRabbit round 3 feedback
devin-ai-integration[bot] Feb 27, 2026
2fb59ee
fix: remove unused imports and fix isort ordering in test_retry_logic
devin-ai-integration[bot] Feb 27, 2026
d2edbd0
style: fix black formatting in test_retry_logic imports
devin-ai-integration[bot] Feb 27, 2026
f6a91db
test: add early retry success test case
devin-ai-integration[bot] Feb 27, 2026
02521ed
fix: restore original capture_output passthrough to preserve streamin…
devin-ai-integration[bot] Feb 27, 2026
6ac8a9d
fix: always capture output for transient detection, print to terminal…
devin-ai-integration[bot] Feb 27, 2026
0c5f1e5
fix: guard sys.stdout/stderr.write with isinstance check
devin-ai-integration[bot] Feb 27, 2026
24e593c
refactor: always set --log-format and always capture output, make cap…
devin-ai-integration[bot] Feb 27, 2026
42b24da
fix: update test_alerts_fetcher positional indices for --log-format p…
devin-ai-integration[bot] Feb 27, 2026
0d2673d
fix: parse output regardless of log_format, not just json
devin-ai-integration[bot] Feb 27, 2026
05a2e45
fix: add BigQuery 409 duplicate job ID to transient error patterns
devin-ai-integration[bot] Feb 27, 2026
b5de019
fix: narrow BigQuery 409 pattern to 'error 409' instead of generic 'a…
devin-ai-integration[bot] Feb 28, 2026
dcd8a9f
refactor: simplify retry flow with _inner_run_command_with_retries
haritamar Feb 28, 2026
532e592
style: fix black formatting for is_transient_error call
devin-ai-integration[bot] Feb 28, 2026
54005bc
docs: fix docstring for target=None in is_transient_error (all patter…
devin-ai-integration[bot] Feb 28, 2026
3aeaf01
Merge remote-tracking branch 'origin/master' into devin/1772197495-ad…
devin-ai-integration[bot] Feb 28, 2026
91e41b3
feat: resolve adapter type from profiles.yml for transient error dete…
devin-ai-integration[bot] Feb 28, 2026
4866840
refactor: simplify _get_adapter_type — remove broad try/except, strea…
devin-ai-integration[bot] Feb 28, 2026
ff057ff
refactor: rename target→adapter_type in is_transient_error signature
devin-ai-integration[bot] Feb 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 148 additions & 9 deletions elementary/clients/dbt/command_line_dbt_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,38 @@
from typing import Any, Dict, List, Optional

import yaml
from tenacity import (
RetryCallState,
retry,
retry_if_exception,
stop_after_attempt,
wait_exponential,
)

from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
from elementary.clients.dbt.dbt_log import parse_dbt_output
from elementary.clients.dbt.transient_errors import is_transient_error
from elementary.exceptions.exceptions import DbtCommandError, DbtLsCommandError
from elementary.monitor.dbt_project_utils import is_dbt_package_up_to_date
from elementary.utils.env_vars import is_debug
from elementary.utils.log import get_logger

logger = get_logger(__name__)

# Retry configuration for transient dbt command failures
# (consumed by the tenacity decorator in _execute_inner_command).
_TRANSIENT_MAX_RETRIES = 3  # total attempts (stop_after_attempt), including the first run
_TRANSIENT_WAIT_MULTIPLIER = 10 # seconds; base multiplier for wait_exponential backoff
_TRANSIENT_WAIT_MAX = 60 # seconds; upper bound on a single backoff wait


class DbtTransientError(Exception):
    """Internal marker for a dbt failure that is safe to retry.

    Carries the failed ``DbtCommandResult`` so the retry wrapper can hand
    it back to callers once all retry attempts are exhausted.
    """

    def __init__(self, result: "DbtCommandResult", message: str) -> None:
        # Keep the failed result available for the raise_on_failure=False path.
        self.result = result
        super().__init__(message)


MACRO_RESULT_PATTERN = re.compile(
"Elementary: --ELEMENTARY-MACRO-OUTPUT-START--(.*)--ELEMENTARY-MACRO-OUTPUT-END--"
)
Expand Down Expand Up @@ -112,23 +134,140 @@ def _run_command(
else:
logger.debug(log_msg)

result = self._inner_run_command(
dbt_command_args,
result = self._execute_inner_command(
dbt_command_args=dbt_command_args,
log_command_args=log_command_args,
capture_output=capture_output,
quiet=quiet,
log_output=log_output,
log_format=log_format,
)

if capture_output and result.output:
logger.debug(
f"Result bytes size for command '{log_command_args}' is {len(result.output)}"
return result

def _execute_inner_command(
    self,
    dbt_command_args: List[str],
    log_command_args: List[str],
    capture_output: bool,
    quiet: bool,
    log_output: bool,
    log_format: str,
) -> DbtCommandResult:
    """Execute ``_inner_run_command`` with automatic retries for transient errors.

    Wraps the actual command execution, inspects failures for known
    per-adapter transient error patterns (see ``is_transient_error``),
    and retries via ``tenacity`` when appropriate. Non-transient
    failures propagate (or are returned) immediately without retrying.
    """

    def _before_retry(retry_state: RetryCallState) -> None:
        # Runs as tenacity's ``before_sleep`` hook, i.e. only when
        # another attempt will actually follow.
        logger.warning(
            "Transient error detected for dbt command '%s' "
            "(attempt %d/%d). Retrying...",
            " ".join(log_command_args),
            retry_state.attempt_number,
            _TRANSIENT_MAX_RETRIES,
        )

    @retry(
        retry=retry_if_exception(lambda exc: isinstance(exc, DbtTransientError)),
        stop=stop_after_attempt(_TRANSIENT_MAX_RETRIES),
        wait=wait_exponential(
            multiplier=_TRANSIENT_WAIT_MULTIPLIER, max=_TRANSIENT_WAIT_MAX
        ),
        before_sleep=_before_retry,
        reraise=True,
    )
    def _attempt() -> DbtCommandResult:
        # Always capture output so transient-error detection can inspect
        # stdout/stderr. The caller's ``capture_output`` flag only
        # controls the logging behaviour below.
        try:
            result = self._inner_run_command(
                dbt_command_args,
                capture_output=True,
                quiet=quiet,
                log_output=log_output,
                log_format=log_format,
            )
        except DbtCommandError as exc:
            # DbtCommandError is raised when raise_on_failure=True and
            # the command exits with a non-zero return code. Extract
            # actual output/stderr from the underlying process error
            # for more accurate transient-error detection.
            output_text = str(exc)
            stderr_text: Optional[str] = None
            if exc.proc_err is not None:
                if exc.proc_err.output:
                    output_text = (
                        exc.proc_err.output.decode()
                        if isinstance(exc.proc_err.output, bytes)
                        else str(exc.proc_err.output)
                    )
                if exc.proc_err.stderr:
                    stderr_text = (
                        exc.proc_err.stderr.decode()
                        if isinstance(exc.proc_err.stderr, bytes)
                        else str(exc.proc_err.stderr)
                    )
            if is_transient_error(
                self.target, output=output_text, stderr=stderr_text
            ):
                raise DbtTransientError(
                    result=DbtCommandResult(
                        success=False,
                        output=output_text,
                        stderr=stderr_text,
                    ),
                    message=f"Transient error during dbt command: {exc}",
                ) from exc
            raise

        if capture_output and result.output:
            logger.debug(
                "Result bytes size for command '%s' is %d",
                log_command_args,
                len(result.output),
            )
        # NOTE(review): kept as a sibling of the capture_output check,
        # matching the pre-refactor _run_command structure — confirm.
        if log_output or is_debug():
            for log in parse_dbt_output(result.output, log_format):
                logger.info(log.msg)

        # Command completed but may have failed (raise_on_failure=False).
        if not result.success and is_transient_error(
            self.target, output=result.output, stderr=result.stderr
        ):
            raise DbtTransientError(
                result=result,
                message=(
                    f"Transient error during dbt command: "
                    f"{' '.join(log_command_args)}"
                ),
            )

        return result

    try:
        return _attempt()
    except DbtTransientError as exc:
        # All retry attempts exhausted.
        logger.exception(
            "dbt command '%s' failed after %d attempts due to transient errors.",
            " ".join(log_command_args),
            _TRANSIENT_MAX_RETRIES,
        )
        # Preserve the raise_on_failure contract: if the original
        # failure was a DbtCommandError (i.e. raise_on_failure=True),
        # re-raise it so callers relying on exception handling still
        # see the expected exception type.
        if isinstance(exc.__cause__, DbtCommandError):
            raise exc.__cause__ from exc
        # Otherwise (raise_on_failure=False path), return the last
        # failed result so callers that check result.success work.
        return exc.result

def deps(self, quiet: bool = False, capture_output: bool = True) -> bool:
result = self._run_command(
Expand Down
159 changes: 159 additions & 0 deletions elementary/clients/dbt/transient_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""Per-adapter transient error patterns for automatic retry.

Each adapter may produce transient errors that are safe to retry. This
module centralises those patterns so that the runner can decide whether a
failed dbt command should be retried transparently.

To add patterns for a new adapter, append a new entry to
``_ADAPTER_PATTERNS`` with the **adapter type** as key (e.g.
``"bigquery"``, ``"snowflake"``) and a tuple of **plain, lowercase**
substrings that appear in the error output. Matching is
case-insensitive substring search so regex is not needed.

Note: The ``target`` argument accepted by :func:`is_transient_error` may
be either the dbt adapter type *or* the profile target name (e.g.
``"dev"``, ``"prod"``). When it does not match any known adapter key,
**all** adapter patterns are checked defensively. This is safe because
adapter-specific error messages only appear in output from that adapter.
"""

from typing import Dict, Optional, Sequence, Tuple

# ---------------------------------------------------------------------------
# Per-adapter transient error substrings (all lowercase).
#
# A command failure is considered *transient* when the dbt output
# (stdout + stderr, lowercased) contains **any** of the substrings
# listed for the active adapter **or** in the ``_COMMON`` list.
# ---------------------------------------------------------------------------

_COMMON: Tuple[str, ...] = (
# Generic connection / HTTP errors that any adapter can surface.
"connection reset by peer",
"connection was closed",
"remotedisconnected",
"connectionerror",
"brokenpipeerror",
"connection aborted",
"read timed out",
)

_DATABRICKS_PATTERNS: Tuple[str, ...] = (
"temporarily_unavailable",
"504 gateway timeout",
"502 bad gateway",
"service unavailable",
)

_ADAPTER_PATTERNS: Dict[str, Tuple[str, ...]] = {
"bigquery": (
# Streaming-buffer delay after a streaming insert.
"streaming data from",
"is temporarily unavailable",
# Generic transient backend error (500).
"retrying may solve the problem",
"backenderror",
# Rate-limit / quota errors.
"exceeded rate limits",
"rateLimitExceeded".lower(),
"quota exceeded",
# Internal errors surfaced as 503 / "internal error".
"internal error encountered",
"503 service unavailable",
"http 503",
),
"snowflake": (
"could not connect to snowflake backend",
"authentication token has expired",
"incident id:",
"service is unavailable",
),
"redshift": (
"connection timed out",
"could not connect to the server",
"ssl syscall error",
),
"databricks": _DATABRICKS_PATTERNS,
"databricks_catalog": _DATABRICKS_PATTERNS,
"athena": (
"throttlingexception",
"toomanyrequestsexception",
"service unavailable",
),
"dremio": (
# Common patterns (remotedisconnected, connection was closed) already
# cover the most frequent Dremio transient errors. Add Dremio-specific
# patterns here as they are identified.
),
"postgres": (
"could not connect to server",
"connection timed out",
"server closed the connection unexpectedly",
"ssl syscall error",
),
"trino": (
"service unavailable",
"server returned http response code: 503",
),
"clickhouse": (
"connection timed out",
"broken pipe",
),
}

# Pre-computed union of all adapter-specific patterns for the unknown-target
# fallback path. Built once at import time to avoid repeated iteration.
_ALL_ADAPTER_PATTERNS: Tuple[str, ...] = tuple(
pattern for patterns in _ADAPTER_PATTERNS.values() for pattern in patterns
)


def is_transient_error(
target: Optional[str],
Comment thread
haritamar marked this conversation as resolved.
Outdated
output: Optional[str] = None,
stderr: Optional[str] = None,
) -> bool:
"""Return ``True`` if *output*/*stderr* contain a known transient error.

Parameters
----------
target:
The dbt adapter type (e.g. ``"bigquery"``, ``"snowflake"``) **or**
the dbt profile target name (e.g. ``"dev"``, ``"prod"``).
When the value matches a key in ``_ADAPTER_PATTERNS``, only that
adapter's patterns (plus ``_COMMON``) are used. When it does
**not** match any known adapter, **all** adapter patterns are
checked defensively to avoid missing transient errors.
When ``None`` only the common patterns are checked.
Comment thread
haritamar marked this conversation as resolved.
Outdated
output:
The captured stdout of the dbt command (may be ``None``).
stderr:
The captured stderr of the dbt command (may be ``None``).
"""
haystack = _build_haystack(output, stderr)
if not haystack:
return False

if isinstance(target, str):
adapter_patterns = _ADAPTER_PATTERNS.get(target.lower())
if adapter_patterns is not None:
# Known adapter — use common + adapter-specific patterns.
patterns: Sequence[str] = (*_COMMON, *adapter_patterns)
else:
# Unknown target key (e.g. profile target name). Check all adapters.
patterns = (*_COMMON, *_ALL_ADAPTER_PATTERNS)
else:
# No target provided; still check all adapters defensively.
patterns = (*_COMMON, *_ALL_ADAPTER_PATTERNS)

return any(pattern in haystack for pattern in patterns)


def _build_haystack(output: Optional[str] = None, stderr: Optional[str] = None) -> str:
"""Concatenate and lowercase *output* + *stderr* for matching."""
parts = []
if output and isinstance(output, str):
parts.append(output)
if stderr and isinstance(stderr, str):
parts.append(stderr)
return "\n".join(parts).lower()
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ packaging = ">=20.9"
azure-storage-blob = ">=12.11.0"
pymsteams = ">=0.2.2,<1.0.0"
tabulate = ">= 0.9.0"
tenacity = ">=8.0,<10.0"
pytz = ">= 2025.1"

dbt-snowflake = {version = ">=0.20,<2.0.0", optional = true}
Expand Down
Loading
Loading