Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a27063c
feat: add automatic retry for transient dbt command errors
devin-ai-integration[bot] Feb 27, 2026
07045ae
fix: guard _build_haystack against non-string arguments
devin-ai-integration[bot] Feb 27, 2026
bd25d9b
fix: address CodeRabbit review feedback
devin-ai-integration[bot] Feb 27, 2026
7594348
fix: address CodeRabbit round 2 feedback
devin-ai-integration[bot] Feb 27, 2026
74a436b
fix: address CodeRabbit round 3 feedback
devin-ai-integration[bot] Feb 27, 2026
2fb59ee
fix: remove unused imports and fix isort ordering in test_retry_logic
devin-ai-integration[bot] Feb 27, 2026
d2edbd0
style: fix black formatting in test_retry_logic imports
devin-ai-integration[bot] Feb 27, 2026
f6a91db
test: add early retry success test case
devin-ai-integration[bot] Feb 27, 2026
02521ed
fix: restore original capture_output passthrough to preserve streamin…
devin-ai-integration[bot] Feb 27, 2026
6ac8a9d
fix: always capture output for transient detection, print to terminal…
devin-ai-integration[bot] Feb 27, 2026
0c5f1e5
fix: guard sys.stdout/stderr.write with isinstance check
devin-ai-integration[bot] Feb 27, 2026
24e593c
refactor: always set --log-format and always capture output, make cap…
devin-ai-integration[bot] Feb 27, 2026
42b24da
fix: update test_alerts_fetcher positional indices for --log-format p…
devin-ai-integration[bot] Feb 27, 2026
0d2673d
fix: parse output regardless of log_format, not just json
devin-ai-integration[bot] Feb 27, 2026
05a2e45
fix: add BigQuery 409 duplicate job ID to transient error patterns
devin-ai-integration[bot] Feb 27, 2026
b5de019
fix: narrow BigQuery 409 pattern to 'error 409' instead of generic 'a…
devin-ai-integration[bot] Feb 28, 2026
dcd8a9f
refactor: simplify retry flow with _inner_run_command_with_retries
haritamar Feb 28, 2026
532e592
style: fix black formatting for is_transient_error call
devin-ai-integration[bot] Feb 28, 2026
54005bc
docs: fix docstring for target=None in is_transient_error (all patter…
devin-ai-integration[bot] Feb 28, 2026
3aeaf01
Merge remote-tracking branch 'origin/master' into devin/1772197495-ad…
devin-ai-integration[bot] Feb 28, 2026
91e41b3
feat: resolve adapter type from profiles.yml for transient error dete…
devin-ai-integration[bot] Feb 28, 2026
4866840
refactor: simplify _get_adapter_type — remove broad try/except, strea…
devin-ai-integration[bot] Feb 28, 2026
ff057ff
refactor: rename target→adapter_type in is_transient_error signature
devin-ai-integration[bot] Feb 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion elementary/clients/dbt/api_dbt_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class APIDbtRunner(CommandLineDbtRunner):
def _inner_run_command(
self,
dbt_command_args: List[str],
capture_output: bool,
quiet: bool,
log_output: bool,
log_format: str,
Expand Down
255 changes: 211 additions & 44 deletions elementary/clients/dbt/command_line_dbt_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,50 @@
from typing import Any, Dict, List, Optional

import yaml
from tenacity import (
RetryCallState,
retry,
retry_if_exception,
stop_after_attempt,
wait_exponential,
)

from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
from elementary.clients.dbt.dbt_log import parse_dbt_output
from elementary.clients.dbt.transient_errors import is_transient_error
from elementary.exceptions.exceptions import DbtCommandError, DbtLsCommandError
from elementary.monitor.dbt_project_utils import is_dbt_package_up_to_date
from elementary.utils.env_vars import is_debug
from elementary.utils.log import get_logger

logger = get_logger(__name__)

# Retry configuration for transient errors.
# NOTE: despite the name, this value is handed to stop_after_attempt(), so it
# caps the total number of attempts (the first run included).
_TRANSIENT_MAX_RETRIES = 3
# Handed to wait_exponential() as ``multiplier``.
_TRANSIENT_WAIT_MULTIPLIER = 10 # seconds
# Handed to wait_exponential() as ``max``.
_TRANSIENT_WAIT_MAX = 60 # seconds


class DbtTransientError(Exception):
    """Internal signal that a dbt command failed in a retryable (transient) way.

    The failed command's result is carried on ``result`` so that whoever
    catches the exception can still hand it back to the original caller.
    """

    def __init__(self, result: "DbtCommandResult", message: str) -> None:
        # Keep hold of the failed result before delegating the message to
        # the base Exception.
        self.result = result
        super().__init__(message)


def _before_retry_log(retry_state: RetryCallState) -> None:
    """Emit a warning before a retry is attempted.

    The command text comes from the ``log_command_args`` keyword argument of
    the retried call; an empty command is logged when that keyword is absent.
    """
    command_text = " ".join(retry_state.kwargs.get("log_command_args", []))
    logger.warning(
        "Transient error detected for dbt command '%s' (attempt %d/%d). Retrying...",
        command_text,
        retry_state.attempt_number,
        _TRANSIENT_MAX_RETRIES,
    )


# Recognises a log message that wraps an Elementary macro result; group 1 is
# the text found between the START and END markers.
MACRO_RESULT_PATTERN = re.compile(
"Elementary: --ELEMENTARY-MACRO-OUTPUT-START--(.*)--ELEMENTARY-MACRO-OUTPUT-END--"
)
Expand Down Expand Up @@ -50,17 +84,78 @@ def __init__(
secret_vars,
allow_macros_without_package_prefix,
)
self.adapter_type = self._get_adapter_type()
self.raise_on_failure = raise_on_failure
self.env_vars = env_vars
if force_dbt_deps:
self.deps()
elif run_deps_if_needed:
self._run_deps_if_needed()

def _get_adapter_type(self) -> Optional[str]:
    """Resolve the adapter type from ``profiles.yml``.

    Reads the profile name from ``dbt_project.yml``, then looks up the
    selected target (``self.target``, falling back to the profile's own
    ``target`` key) in ``profiles.yml`` and returns its ``type`` field
    (e.g. ``"bigquery"``, ``"snowflake"``).

    Returns ``None`` when either file is missing, or when any of the
    expected keys is missing or is not a mapping (for example an empty
    ``dbt_project.yml`` or ``outputs: null``). This method runs while the
    runner is being constructed, so malformed YAML structure is reported
    at debug level instead of surfacing as an ``AttributeError``.
    """
    profiles_dir = self.profiles_dir or os.path.join(
        os.path.expanduser("~"), ".dbt"
    )
    profiles_path = os.path.join(profiles_dir, "profiles.yml")
    if not os.path.exists(profiles_path):
        logger.debug("profiles.yml not found at %s", profiles_path)
        return None

    with open(profiles_path) as f:
        profiles = yaml.safe_load(f)

    # Read dbt_project.yml to get the profile name.
    dbt_project_path = os.path.join(self.project_dir, "dbt_project.yml")
    if not os.path.exists(dbt_project_path):
        logger.debug("dbt_project.yml not found at %s", dbt_project_path)
        return None

    with open(dbt_project_path) as f:
        dbt_project = yaml.safe_load(f)

    # safe_load() yields None for an empty file and a scalar/list for
    # unexpected content, so only call .get() on real mappings.
    profile_name = (
        dbt_project.get("profile") if isinstance(dbt_project, dict) else None
    )
    if not profile_name:
        logger.debug("No profile name found in dbt_project.yml")
        return None

    profile = profiles.get(profile_name) if isinstance(profiles, dict) else None
    if not isinstance(profile, dict) or not profile:
        logger.debug("Profile '%s' not found in profiles.yml", profile_name)
        return None

    # Determine which target to use.
    target_name = self.target or profile.get("target")
    if not target_name:
        logger.debug("No target specified and no default target in profile")
        return None

    outputs = profile.get("outputs")
    target_config = outputs.get(target_name) if isinstance(outputs, dict) else None
    if not isinstance(target_config, dict) or not target_config:
        logger.debug("Target '%s' not found in profile outputs", target_name)
        return None

    adapter_type = target_config.get("type")
    if adapter_type:
        logger.debug(
            "Resolved adapter type '%s' for target '%s'",
            adapter_type,
            target_name,
        )
    return adapter_type

def _inner_run_command(
self,
dbt_command_args: List[str],
capture_output: bool,
quiet: bool,
log_output: bool,
log_format: str,
Expand All @@ -75,15 +170,13 @@ def _parse_ls_command_result(
def _run_command(
self,
command_args: List[str],
capture_output: bool = False,
log_format: str = "json",
vars: Optional[dict] = None,
quiet: bool = False,
log_output: bool = True,
) -> DbtCommandResult:
dbt_command_args = []
if capture_output:
dbt_command_args.extend(["--log-format", log_format])
dbt_command_args.extend(["--log-format", log_format])
dbt_command_args.extend(command_args)
dbt_command_args.extend(["--project-dir", os.path.abspath(self.project_dir)])
if self.profiles_dir:
Expand Down Expand Up @@ -112,28 +205,108 @@ def _run_command(
else:
logger.debug(log_msg)

result = self._inner_run_command(
dbt_command_args,
capture_output=capture_output,
quiet=quiet,
log_output=log_output,
log_format=log_format,
)

if capture_output and result.output:
try:
return self._inner_run_command_with_retries(
dbt_command_args=dbt_command_args,
log_command_args=log_command_args,
quiet=quiet,
log_output=log_output,
log_format=log_format,
)
except DbtTransientError as exc:
logger.exception(
"dbt command '%s' failed after %d attempts due to transient errors.",
" ".join(log_command_args),
_TRANSIENT_MAX_RETRIES,
)
if isinstance(exc.__cause__, DbtCommandError):
raise exc.__cause__ from exc
return exc.result

@retry(
    retry=retry_if_exception(lambda exc: isinstance(exc, DbtTransientError)),
    stop=stop_after_attempt(_TRANSIENT_MAX_RETRIES),
    wait=wait_exponential(
        multiplier=_TRANSIENT_WAIT_MULTIPLIER,
        max=_TRANSIENT_WAIT_MAX,
    ),
    before_sleep=_before_retry_log,
    reraise=True,
)
def _inner_run_command_with_retries(
    self,
    dbt_command_args: List[str],
    log_command_args: List[str],
    quiet: bool,
    log_output: bool,
    log_format: str,
) -> DbtCommandResult:
    """Run one dbt command attempt, signalling transient failures for retry.

    Pass the arguments by keyword: the ``before_sleep`` hook reads
    ``log_command_args`` from the call's keyword arguments.

    Args:
        dbt_command_args: Full argument list handed to ``_inner_run_command``.
        log_command_args: Argument list used only in log and error messages.
        quiet: Forwarded to ``_inner_run_command``.
        log_output: When true (or in debug mode), parsed output messages
            are logged at info level.
        log_format: Forwarded to ``_inner_run_command`` and to the output
            parser.

    Returns:
        The command result when the attempt succeeded or failed with a
        non-transient error.

    Raises:
        DbtTransientError: The attempt failed and ``is_transient_error``
            classified it as transient. Carries the failed result; chained
            from the original ``DbtCommandError`` when there was one.
        DbtCommandError: Re-raised unchanged for non-transient failures.
    """

    def _as_text(stream: Any) -> str:
        # Undecodable bytes are replaced rather than raising, so a decoding
        # problem can never mask the DbtCommandError being handled.
        if isinstance(stream, bytes):
            return stream.decode(errors="replace")
        return str(stream)

    try:
        result = self._inner_run_command(
            dbt_command_args,
            quiet=quiet,
            log_output=log_output,
            log_format=log_format,
        )
    except DbtCommandError as exc:
        # Prefer the captured process streams for classification and fall
        # back to the exception text when none were captured.
        output_text = str(exc)
        stderr_text: Optional[str] = None
        proc_err = exc.proc_err
        if proc_err is not None:
            if proc_err.output:
                output_text = _as_text(proc_err.output)
            if proc_err.stderr:
                stderr_text = _as_text(proc_err.stderr)
        if is_transient_error(
            self.adapter_type, output=output_text, stderr=stderr_text
        ):
            raise DbtTransientError(
                result=DbtCommandResult(
                    success=False,
                    output=output_text,
                    stderr=stderr_text,
                ),
                message=f"Transient error during dbt command: {exc}",
            ) from exc
        raise

    if result.output:
        # A single lazily formatted message: an f-string placed next to this
        # format string would be concatenated with it and log the text twice.
        logger.debug(
            "Result bytes size for command '%s' is %d",
            " ".join(log_command_args),
            len(result.output),
        )
        if log_output or is_debug():
            for log in parse_dbt_output(result.output, log_format):
                logger.info(log.msg)

    if not result.success and is_transient_error(
        self.adapter_type, output=result.output, stderr=result.stderr
    ):
        raise DbtTransientError(
            result=result,
            message=(
                f"Transient error during dbt command: "
                f"{' '.join(log_command_args)}"
            ),
        )

    return result

def deps(
    self,
    quiet: bool = False,
    capture_output: bool = True,  # Deprecated: no-op, kept for backward compatibility.
) -> bool:
    """Run ``dbt deps`` and report whether it succeeded.

    Args:
        quiet: Forwarded to ``_run_command``.
        capture_output: Deprecated and ignored; accepted only so existing
            callers that still pass it keep working.

    Returns:
        The ``success`` flag of the command result.
    """
    result = self._run_command(command_args=["deps"], quiet=quiet)
    return result.success

def seed(self, select: Optional[str] = None, full_refresh: bool = False) -> bool:
Expand All @@ -152,7 +325,7 @@ def snapshot(self) -> bool:
def run_operation(
self,
macro_name: str,
capture_output: bool = True,
capture_output: bool = True, # Deprecated: no-op, kept for backward compatibility.
macro_args: Optional[dict] = None,
log_errors: bool = True,
vars: Optional[dict] = None,
Expand All @@ -177,7 +350,6 @@ def run_operation(
command_args.extend(["--args", json_args])
result = self._run_command(
command_args=command_args,
capture_output=capture_output,
vars=vars,
quiet=quiet,
log_output=log_output,
Expand All @@ -191,23 +363,22 @@ def run_operation(
log_pattern = (
RAW_EDR_LOGS_PATTERN if return_raw_edr_logs else MACRO_RESULT_PATTERN
)
if capture_output:
if result.output is not None:
for log in parse_dbt_output(result.output):
if log_errors and log.level == "error":
logger.error(log.msg)
continue

if log.msg:
match = log_pattern.match(log.msg)
if match:
run_operation_results.append(match.group(1))

if result.stderr is not None and log_errors:
for log in parse_dbt_output(result.stderr):
if log.level == "error":
logger.error(log.msg)
continue
if result.output is not None:
for log in parse_dbt_output(result.output):
if log_errors and log.level == "error":
logger.error(log.msg)
continue

if log.msg:
match = log_pattern.match(log.msg)
if match:
run_operation_results.append(match.group(1))

if result.stderr is not None and log_errors:
for log in parse_dbt_output(result.stderr):
if log.level == "error":
logger.error(log.msg)
continue

return run_operation_results

Expand All @@ -218,7 +389,7 @@ def run(
full_refresh: bool = False,
vars: Optional[dict] = None,
quiet: bool = False,
capture_output: bool = False,
capture_output: bool = False, # Deprecated: no-op, kept for backward compatibility.
) -> bool:
command_args = ["run"]
if full_refresh:
Expand All @@ -231,7 +402,6 @@ def run(
command_args=command_args,
vars=vars,
quiet=quiet,
capture_output=capture_output,
)
return result.success

Expand All @@ -240,7 +410,7 @@ def test(
select: Optional[str] = None,
vars: Optional[dict] = None,
quiet: bool = False,
capture_output: bool = False,
capture_output: bool = False, # Deprecated: no-op, kept for backward compatibility.
) -> bool:
command_args = ["test"]
if select:
Expand All @@ -249,7 +419,6 @@ def test(
command_args=command_args,
vars=vars,
quiet=quiet,
capture_output=capture_output,
)
return result.success

Expand All @@ -266,9 +435,7 @@ def ls(self, select: Optional[str] = None) -> list:
if select:
command_args.extend(["-s", select])
try:
result = self._run_command(
command_args=command_args, capture_output=True, log_format="text"
)
result = self._run_command(command_args=command_args, log_format="text")
return self._parse_ls_command_result(select, result)
except DbtCommandError:
raise DbtLsCommandError(select)
Expand Down
Loading
Loading