From 4d657c43ea7be2147fedac9f6371aea0f810fe7c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 25 Feb 2026 17:20:52 +0000 Subject: [PATCH 01/13] fix: retry run_query on empty run_operation result (flaky fusion test) DbtProject.run_query() blindly indexed [0] on the result of run_operation('elementary.render_run_query'), which intermittently returns an empty list on dbt-fusion when the macro output log line is not captured. This caused IndexError in tests like test_seed_group_attribute on fusion/bigquery. Add a retry loop (up to 3 attempts) with warning logs, and raise a clear RuntimeError with the query if all retries are exhausted. Co-Authored-By: unknown <> --- integration_tests/tests/dbt_project.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index ca45d2a09..4ca1904e3 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -59,14 +59,27 @@ def __init__( self.tmp_models_dir_path = self.models_dir_path / "tmp" self.seeds_dir_path = self.project_dir_path / "data" + _RUN_QUERY_MAX_RETRIES = 3 + def run_query(self, prerendered_query: str): - results = json.loads( - self.dbt_runner.run_operation( + for attempt in range(1, self._RUN_QUERY_MAX_RETRIES + 1): + run_operation_results = self.dbt_runner.run_operation( "elementary.render_run_query", macro_args={"prerendered_query": prerendered_query}, - )[0] + ) + if run_operation_results: + return json.loads(run_operation_results[0]) + logger.warning( + "run_operation('elementary.render_run_query') returned no output " + "(attempt %d/%d)", + attempt, + self._RUN_QUERY_MAX_RETRIES, + ) + raise RuntimeError( + f"run_operation('elementary.render_run_query') returned no output " + f"after {self._RUN_QUERY_MAX_RETRIES} attempts. " + f"Query: {prerendered_query!r}" ) - return results @staticmethod def read_table_query( From c975eefe906e538dad8e28ed1c801f523943e877 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 25 Feb 2026 17:25:16 +0000 Subject: [PATCH 02/13] fix: add short delay between run_query retries Co-Authored-By: unknown <> --- integration_tests/tests/dbt_project.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index 4ca1904e3..e1875ff4b 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -1,5 +1,6 @@ import json import os +import time from contextlib import contextmanager, nullcontext from pathlib import Path from tempfile import NamedTemporaryFile @@ -60,6 +61,7 @@ def __init__( self.seeds_dir_path = self.project_dir_path / "data" _RUN_QUERY_MAX_RETRIES = 3 + _RUN_QUERY_RETRY_DELAY_SECONDS = 0.5 def run_query(self, prerendered_query: str): for attempt in range(1, self._RUN_QUERY_MAX_RETRIES + 1): @@ -75,6 +77,8 @@ def run_query(self, prerendered_query: str): attempt, self._RUN_QUERY_MAX_RETRIES, ) + if attempt < self._RUN_QUERY_MAX_RETRIES: + time.sleep(self._RUN_QUERY_RETRY_DELAY_SECONDS) raise RuntimeError( f"run_operation('elementary.render_run_query') returned no output " f"after {self._RUN_QUERY_MAX_RETRIES} attempts. " From f1f1ca5c8cef01ff72a98f7e8ff799664ed0fe19 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 01:46:22 +0000 Subject: [PATCH 03/13] fix: scope run_query retry to fusion runner only Co-Authored-By: unknown <> --- integration_tests/tests/dbt_project.py | 32 ++++++++++++++++---------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index e1875ff4b..312f5cbc6 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -60,28 +60,36 @@ def __init__( self.tmp_models_dir_path = self.models_dir_path / "tmp" self.seeds_dir_path = self.project_dir_path / "data" - _RUN_QUERY_MAX_RETRIES = 3 - _RUN_QUERY_RETRY_DELAY_SECONDS = 0.5 + # dbt-fusion occasionally drops the MACRO_RESULT_PATTERN log line, + # causing run_operation() to return an empty list. Retry only when + # running under fusion to work around this log-capture race. + _FUSION_RUN_QUERY_MAX_RETRIES = 3 + _FUSION_RUN_QUERY_RETRY_DELAY_SECONDS = 0.5 def run_query(self, prerendered_query: str): - for attempt in range(1, self._RUN_QUERY_MAX_RETRIES + 1): + max_attempts = ( + self._FUSION_RUN_QUERY_MAX_RETRIES + if self.runner_method == RunnerMethod.FUSION + else 1 + ) + for attempt in range(1, max_attempts + 1): run_operation_results = self.dbt_runner.run_operation( "elementary.render_run_query", macro_args={"prerendered_query": prerendered_query}, ) if run_operation_results: return json.loads(run_operation_results[0]) - logger.warning( - "run_operation('elementary.render_run_query') returned no output " - "(attempt %d/%d)", - attempt, - self._RUN_QUERY_MAX_RETRIES, - ) - if attempt < self._RUN_QUERY_MAX_RETRIES: - time.sleep(self._RUN_QUERY_RETRY_DELAY_SECONDS) + if attempt < max_attempts: + logger.warning( + "run_operation('elementary.render_run_query') returned no " + "output on fusion runner (attempt %d/%d, retrying)", + attempt, + max_attempts, + ) + time.sleep(self._FUSION_RUN_QUERY_RETRY_DELAY_SECONDS) raise RuntimeError( f"run_operation('elementary.render_run_query') returned no output " - f"after {self._RUN_QUERY_MAX_RETRIES} attempts. " + f"after {max_attempts} attempt(s). " f"Query: {prerendered_query!r}" ) From 43848978de44ed6adf5b9e65b5e38e81cddf087a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 02:24:18 +0000 Subject: [PATCH 04/13] fix: apply run_query retry to all runners (not fusion-only) CI evidence shows the empty run_operation issue also affects dbt-core on databricks_catalog (latest_official), not just dbt-fusion. Revert fusion-only scoping and apply retry universally. Co-Authored-By: unknown <> --- integration_tests/tests/dbt_project.py | 27 +++++++++++--------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index 312f5cbc6..f829a5ec6 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -60,36 +60,31 @@ def __init__( self.tmp_models_dir_path = self.models_dir_path / "tmp" self.seeds_dir_path = self.project_dir_path / "data" - # dbt-fusion occasionally drops the MACRO_RESULT_PATTERN log line, - # causing run_operation() to return an empty list. Retry only when - # running under fusion to work around this log-capture race. - _FUSION_RUN_QUERY_MAX_RETRIES = 3 - _FUSION_RUN_QUERY_RETRY_DELAY_SECONDS = 0.5 + # run_operation() can intermittently return an empty list when the + # MACRO_RESULT_PATTERN log line is not captured from dbt's output. + # Observed on both dbt-fusion (bigquery) and dbt-core (databricks). + _RUN_QUERY_MAX_RETRIES = 3 + _RUN_QUERY_RETRY_DELAY_SECONDS = 0.5 def run_query(self, prerendered_query: str): - max_attempts = ( - self._FUSION_RUN_QUERY_MAX_RETRIES - if self.runner_method == RunnerMethod.FUSION - else 1 - ) - for attempt in range(1, max_attempts + 1): + for attempt in range(1, self._RUN_QUERY_MAX_RETRIES + 1): run_operation_results = self.dbt_runner.run_operation( "elementary.render_run_query", macro_args={"prerendered_query": prerendered_query}, ) if run_operation_results: return json.loads(run_operation_results[0]) - if attempt < max_attempts: + if attempt < self._RUN_QUERY_MAX_RETRIES: logger.warning( "run_operation('elementary.render_run_query') returned no " - "output on fusion runner (attempt %d/%d, retrying)", + "output (attempt %d/%d, retrying)", attempt, - max_attempts, + self._RUN_QUERY_MAX_RETRIES, ) - time.sleep(self._FUSION_RUN_QUERY_RETRY_DELAY_SECONDS) + time.sleep(self._RUN_QUERY_RETRY_DELAY_SECONDS) raise RuntimeError( f"run_operation('elementary.render_run_query') returned no output " - f"after {max_attempts} attempt(s). " + f"after {self._RUN_QUERY_MAX_RETRIES} attempts. " f"Query: {prerendered_query!r}" ) From 268d3d09ec06965dfddb9fb7142d059263caafc6 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 09:09:44 +0000 Subject: [PATCH 05/13] fix: bypass run_operation log parsing with direct adapter connection Replace the retry-based workaround for flaky run_operation() results with a direct dbt adapter connection that executes SQL queries without going through CLI log parsing. New AdapterQueryRunner helper class: - Instantiates a dbt adapter from project config - Resolves {{ ref('...') }} Jinja by reading the dbt manifest - Executes SQL directly via adapter.execute(sql, fetch=True) - Converts agate results to dicts matching agate_to_dicts behaviour This eliminates the root cause of flakiness (intermittent log capture failures in run_operation) rather than retrying around it. Co-Authored-By: unknown <> --- .../tests/adapter_query_runner.py | 156 ++++++++++++++++++ integration_tests/tests/dbt_project.py | 30 +--- 2 files changed, 159 insertions(+), 27 deletions(-) create mode 100644 integration_tests/tests/adapter_query_runner.py diff --git a/integration_tests/tests/adapter_query_runner.py b/integration_tests/tests/adapter_query_runner.py new file mode 100644 index 000000000..c83396d2a --- /dev/null +++ b/integration_tests/tests/adapter_query_runner.py @@ -0,0 +1,156 @@ +"""Direct database query execution via dbt adapter connection. + +Bypasses ``run_operation`` log-parsing entirely so that query results are +never lost due to intermittent log-capture issues in the CLI / fusion +runners. +""" + +import json +import multiprocessing +import os +import re +from datetime import date, datetime, time +from decimal import Decimal +from pathlib import Path +from typing import Any, Dict, List, Optional + +from logger import get_logger + +logger = get_logger(__name__) + +# Pattern that matches {{ ref('name') }} or {{ ref("name") }} with optional whitespace +_REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}") + + +def _serialize_value(val: Any) -> Any: + """Mimic elementary's ``agate_to_dicts`` serialisation. + + * ``Decimal`` → ``int`` (no fractional part) or ``float`` + * ``datetime`` / ``date`` / ``time`` → ISO-format string + * Everything else is returned unchanged. + """ + if isinstance(val, Decimal): + # Match the Jinja macro: normalize, then int or float + normalized = val.normalize() + if normalized.as_tuple().exponent >= 0: + return int(normalized) + return float(normalized) + if isinstance(val, (datetime, date, time)): + return val.isoformat() + return val + + +class AdapterQueryRunner: + """Execute SQL directly through a dbt adapter connection. + + Parameters + ---------- + project_dir : str + Path to the dbt project directory. + target : str + Name of the dbt target / profile output to use. + """ + + def __init__(self, project_dir: str, target: str) -> None: + self._project_dir = project_dir + self._target = target + self._adapter = self._create_adapter(project_dir, target) + self._ref_map: Optional[Dict[str, str]] = None + + # ------------------------------------------------------------------ + # Adapter bootstrap + # ------------------------------------------------------------------ + + @staticmethod + def _create_adapter(project_dir: str, target: str) -> Any: + from argparse import Namespace + + from dbt.adapters.factory import get_adapter, register_adapter, reset_adapters + from dbt.config.runtime import RuntimeConfig + from dbt.flags import set_from_args + + args = Namespace( + project_dir=project_dir, + profiles_dir=os.path.expanduser("~/.dbt"), + target=target, + threads=1, + vars={}, + profile=None, + PROFILES_DIR=os.path.expanduser("~/.dbt"), + PROJECT_DIR=project_dir, + ) + set_from_args(args, None) + config = RuntimeConfig.from_args(args) + + reset_adapters() + mp_context = multiprocessing.get_context("spawn") + register_adapter(config, mp_context) + return get_adapter(config) + + # ------------------------------------------------------------------ + # Ref resolution + # ------------------------------------------------------------------ + + def _load_ref_map(self) -> Dict[str, str]: + """Build a ``{model_name: relation_name}`` map from the dbt manifest.""" + manifest_path = Path(self._project_dir) / "target" / "manifest.json" + if not manifest_path.exists(): + raise FileNotFoundError( + f"Manifest not found at {manifest_path}. " + "Run `dbt run` or `dbt compile` first." + ) + with open(manifest_path) as fh: + manifest = json.load(fh) + + ref_map: Dict[str, str] = {} + for node in manifest.get("nodes", {}).values(): + relation_name = node.get("relation_name") + name = node.get("name") + if relation_name and name: + ref_map[name] = relation_name + + # Also include sources (some queries reference source tables) + for source in manifest.get("sources", {}).values(): + relation_name = source.get("relation_name") + name = source.get("name") + if relation_name and name: + ref_map[name] = relation_name + + return ref_map + + def resolve_refs(self, query: str) -> str: + """Replace ``{{ ref('name') }}`` with the fully-qualified relation name.""" + if self._ref_map is None: + self._ref_map = self._load_ref_map() + + def _replace(match: re.Match) -> str: # type: ignore[type-arg] + name = match.group(1) + if name not in self._ref_map: + raise ValueError( + f"Cannot resolve ref('{name}'): not found in dbt manifest. " + f"Known models: {sorted(self._ref_map)!r}" + ) + return self._ref_map[name] + + return _REF_PATTERN.sub(_replace, query) + + # ------------------------------------------------------------------ + # Query execution + # ------------------------------------------------------------------ + + def run_query(self, prerendered_query: str) -> List[Dict[str, Any]]: + """Render Jinja refs and execute a query, returning rows as dicts. + + Column names are lower-cased and values are serialised to match the + behaviour of ``elementary.agate_to_dicts``. + """ + sql = self.resolve_refs(prerendered_query) + with self._adapter.connection_named("run_query"): + _response, table = self._adapter.execute(sql, fetch=True) + + # Convert agate Table → list[dict] matching agate_to_dicts behaviour + columns = [c.lower() for c in table.column_names] + return [ + {col: _serialize_value(val) for col, val in zip(columns, row)} + for row in table + ] diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index f829a5ec6..481f9b96e 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -1,12 +1,11 @@ -import json import os -import time from contextlib import contextmanager, nullcontext from pathlib import Path from tempfile import NamedTemporaryFile from typing import Any, Dict, Generator, List, Literal, Optional, Union, overload from uuid import uuid4 +from adapter_query_runner import AdapterQueryRunner from data_seeder import DbtDataSeeder from dbt_utils import get_database_and_schema_properties from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner @@ -60,33 +59,10 @@ def __init__( self.tmp_models_dir_path = self.models_dir_path / "tmp" self.seeds_dir_path = self.project_dir_path / "data" - # run_operation() can intermittently return an empty list when the - # MACRO_RESULT_PATTERN log line is not captured from dbt's output. - # Observed on both dbt-fusion (bigquery) and dbt-core (databricks). - _RUN_QUERY_MAX_RETRIES = 3 - _RUN_QUERY_RETRY_DELAY_SECONDS = 0.5 + self._query_runner = AdapterQueryRunner(project_dir, target) def run_query(self, prerendered_query: str): - for attempt in range(1, self._RUN_QUERY_MAX_RETRIES + 1): - run_operation_results = self.dbt_runner.run_operation( - "elementary.render_run_query", - macro_args={"prerendered_query": prerendered_query}, - ) - if run_operation_results: - return json.loads(run_operation_results[0]) - if attempt < self._RUN_QUERY_MAX_RETRIES: - logger.warning( - "run_operation('elementary.render_run_query') returned no " - "output (attempt %d/%d, retrying)", - attempt, - self._RUN_QUERY_MAX_RETRIES, - ) - time.sleep(self._RUN_QUERY_RETRY_DELAY_SECONDS) - raise RuntimeError( - f"run_operation('elementary.render_run_query') returned no output " - f"after {self._RUN_QUERY_MAX_RETRIES} attempts. " - f"Query: {prerendered_query!r}" - ) + return self._query_runner.run_query(prerendered_query) @staticmethod def read_table_query( From ae53f254d689dbcf137803ecf11664ab7840e301 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 09:16:19 +0000 Subject: [PATCH 06/13] fix: fall back to run_operation for queries with non-ref Jinja Queries containing Jinja macros beyond {{ ref() }} (e.g. {{ elementary.missing_count(...) }}) cannot be resolved from the manifest alone. Detect these and fall back to the run_operation path with retry, while ref-only queries still use the fast direct adapter connection. Co-Authored-By: unknown <> --- .../tests/adapter_query_runner.py | 20 ++++++++++ integration_tests/tests/dbt_project.py | 38 ++++++++++++++++++- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/integration_tests/tests/adapter_query_runner.py b/integration_tests/tests/adapter_query_runner.py index c83396d2a..fe3012434 100644 --- a/integration_tests/tests/adapter_query_runner.py +++ b/integration_tests/tests/adapter_query_runner.py @@ -21,6 +21,9 @@ # Pattern that matches {{ ref('name') }} or {{ ref("name") }} with optional whitespace _REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}") +# Pattern that matches any Jinja expression {{ ... }} +_JINJA_EXPR_PATTERN = re.compile(r"\{\{.*?\}\}") + def _serialize_value(val: Any) -> Any: """Mimic elementary's ``agate_to_dicts`` serialisation. @@ -138,12 +141,29 @@ def _replace(match: re.Match) -> str: # type: ignore[type-arg] # Query execution # ------------------------------------------------------------------ + @staticmethod + def has_non_ref_jinja(query: str) -> bool: + """Return True if *query* contains Jinja expressions other than ``{{ ref(...) }}``.""" + stripped = _REF_PATTERN.sub("", query) + return bool(_JINJA_EXPR_PATTERN.search(stripped)) + def run_query(self, prerendered_query: str) -> List[Dict[str, Any]]: """Render Jinja refs and execute a query, returning rows as dicts. Column names are lower-cased and values are serialised to match the behaviour of ``elementary.agate_to_dicts``. + + If the query contains Jinja expressions beyond simple ``{{ ref() }}`` + calls (e.g. ``{{ elementary.missing_count(...) }}``), this method + raises ``ValueError`` so the caller can fall back to + ``run_operation`` which handles full Jinja rendering. """ + if self.has_non_ref_jinja(prerendered_query): + raise ValueError( + "Query contains Jinja expressions that cannot be resolved " + "from the manifest alone (only {{ ref() }} is supported)." + ) + sql = self.resolve_refs(prerendered_query) with self._adapter.connection_named("run_query"): _response, table = self._adapter.execute(sql, fetch=True) diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index 481f9b96e..6515de19e 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -1,4 +1,6 @@ +import json import os +import time from contextlib import contextmanager, nullcontext from pathlib import Path from tempfile import NamedTemporaryFile @@ -61,8 +63,42 @@ def __init__( self._query_runner = AdapterQueryRunner(project_dir, target) + # Retry constants for the run_operation fallback path. run_operation() + # can intermittently return an empty list when the MACRO_RESULT_PATTERN + # log line is not captured from dbt's output. + _RUN_QUERY_MAX_RETRIES = 3 + _RUN_QUERY_RETRY_DELAY_SECONDS = 0.5 + def run_query(self, prerendered_query: str): - return self._query_runner.run_query(prerendered_query) + # Fast path: queries that only contain {{ ref() }} can be executed + # directly through the adapter, bypassing run_operation log parsing. + try: + return self._query_runner.run_query(prerendered_query) + except ValueError: + # Query contains Jinja beyond ref(); fall back to run_operation. + pass + + # Slow path: full Jinja rendering via run_operation (with retry). + for attempt in range(1, self._RUN_QUERY_MAX_RETRIES + 1): + run_operation_results = self.dbt_runner.run_operation( + "elementary.render_run_query", + macro_args={"prerendered_query": prerendered_query}, + ) + if run_operation_results: + return json.loads(run_operation_results[0]) + if attempt < self._RUN_QUERY_MAX_RETRIES: + logger.warning( + "run_operation('elementary.render_run_query') returned no " + "output (attempt %d/%d, retrying)", + attempt, + self._RUN_QUERY_MAX_RETRIES, + ) + time.sleep(self._RUN_QUERY_RETRY_DELAY_SECONDS) + raise RuntimeError( + f"run_operation('elementary.render_run_query') returned no output " + f"after {self._RUN_QUERY_MAX_RETRIES} attempts. " + f"Query: {prerendered_query!r}" + ) @staticmethod def read_table_query( From 69ac36eb93c62d81d2bb821e5dbb3dfb87329d97 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 09:59:40 +0000 Subject: [PATCH 07/13] refactor: address review comments - lazy init, source resolution, manifest reload, explicit jinja check Co-Authored-By: unknown <> --- .../tests/adapter_query_runner.py | 87 ++++++++++++------- integration_tests/tests/dbt_project.py | 50 +++++++---- 2 files changed, 90 insertions(+), 47 deletions(-) diff --git a/integration_tests/tests/adapter_query_runner.py b/integration_tests/tests/adapter_query_runner.py index fe3012434..988014d4b 100644 --- a/integration_tests/tests/adapter_query_runner.py +++ b/integration_tests/tests/adapter_query_runner.py @@ -21,6 +21,11 @@ # Pattern that matches {{ ref('name') }} or {{ ref("name") }} with optional whitespace _REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}") +# Pattern that matches {{ source('source_name', 'table_name') }} +_SOURCE_PATTERN = re.compile( + r"\{\{\s*source\(\s*['\"]([^'\"]+)['\"]\s*,\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}" +) + # Pattern that matches any Jinja expression {{ ... }} _JINJA_EXPR_PATTERN = re.compile(r"\{\{.*?\}\}") @@ -59,6 +64,7 @@ def __init__(self, project_dir: str, target: str) -> None: self._target = target self._adapter = self._create_adapter(project_dir, target) self._ref_map: Optional[Dict[str, str]] = None + self._source_map: Optional[Dict[tuple, str]] = None # ------------------------------------------------------------------ # Adapter bootstrap @@ -72,14 +78,15 @@ def _create_adapter(project_dir: str, target: str) -> Any: from dbt.config.runtime import RuntimeConfig from dbt.flags import set_from_args + profiles_dir = os.environ.get("DBT_PROFILES_DIR", os.path.expanduser("~/.dbt")) args = Namespace( project_dir=project_dir, - profiles_dir=os.path.expanduser("~/.dbt"), + profiles_dir=profiles_dir, target=target, threads=1, vars={}, profile=None, - PROFILES_DIR=os.path.expanduser("~/.dbt"), + PROFILES_DIR=profiles_dir, PROJECT_DIR=project_dir, ) set_from_args(args, None) @@ -94,8 +101,8 @@ def _create_adapter(project_dir: str, target: str) -> Any: # Ref resolution # ------------------------------------------------------------------ - def _load_ref_map(self) -> Dict[str, str]: - """Build a ``{model_name: relation_name}`` map from the dbt manifest.""" + def _load_manifest_maps(self) -> None: + """Load ref and source maps from the dbt manifest.""" manifest_path = Path(self._project_dir) / "target" / "manifest.json" if not manifest_path.exists(): raise FileNotFoundError( @@ -112,30 +119,58 @@ def _load_ref_map(self) -> Dict[str, str]: if relation_name and name: ref_map[name] = relation_name - # Also include sources (some queries reference source tables) + source_map: Dict[tuple, str] = {} for source in manifest.get("sources", {}).values(): relation_name = source.get("relation_name") name = source.get("name") - if relation_name and name: - ref_map[name] = relation_name + source_name = source.get("source_name") + if relation_name and source_name and name: + source_map[(source_name, name)] = relation_name + # Also register source tables by name for simple ref() lookups + ref_map.setdefault(name, relation_name) - return ref_map + self._ref_map = ref_map + self._source_map = source_map - def resolve_refs(self, query: str) -> str: - """Replace ``{{ ref('name') }}`` with the fully-qualified relation name.""" + def _ensure_maps_loaded(self) -> None: + """Lazily load manifest maps on first use.""" if self._ref_map is None: - self._ref_map = self._load_ref_map() + self._load_manifest_maps() - def _replace(match: re.Match) -> str: # type: ignore[type-arg] + def resolve_refs(self, query: str) -> str: + """Replace ``{{ ref('name') }}`` and ``{{ source('x','y') }}`` with relation names.""" + self._ensure_maps_loaded() + assert self._ref_map is not None + assert self._source_map is not None + + def _replace_ref(match: re.Match) -> str: # type: ignore[type-arg] name = match.group(1) if name not in self._ref_map: - raise ValueError( - f"Cannot resolve ref('{name}'): not found in dbt manifest. " - f"Known models: {sorted(self._ref_map)!r}" - ) + # Manifest may have changed (temp models/seeds); reload once. + self._load_manifest_maps() + assert self._ref_map is not None + if name not in self._ref_map: + raise ValueError( + f"Cannot resolve ref('{name}'): not found in dbt manifest." + ) return self._ref_map[name] - return _REF_PATTERN.sub(_replace, query) + def _replace_source(match: re.Match) -> str: # type: ignore[type-arg] + source_name, table_name = match.group(1), match.group(2) + key = (source_name, table_name) + if self._source_map is None or key not in self._source_map: + self._load_manifest_maps() + assert self._source_map is not None + if key not in self._source_map: + raise ValueError( + f"Cannot resolve source('{source_name}', '{table_name}'): " + "not found in dbt manifest." + ) + return self._source_map[key] + + query = _REF_PATTERN.sub(_replace_ref, query) + query = _SOURCE_PATTERN.sub(_replace_source, query) + return query # ------------------------------------------------------------------ # Query execution @@ -143,27 +178,21 @@ def _replace(match: re.Match) -> str: # type: ignore[type-arg] @staticmethod def has_non_ref_jinja(query: str) -> bool: - """Return True if *query* contains Jinja expressions other than ``{{ ref(...) }}``.""" + """Return True if *query* contains Jinja beyond ``{{ ref() }}`` / ``{{ source() }}``.""" stripped = _REF_PATTERN.sub("", query) + stripped = _SOURCE_PATTERN.sub("", stripped) return bool(_JINJA_EXPR_PATTERN.search(stripped)) def run_query(self, prerendered_query: str) -> List[Dict[str, Any]]: - """Render Jinja refs and execute a query, returning rows as dicts. + """Render Jinja refs/sources and execute a query, returning rows as dicts. Column names are lower-cased and values are serialised to match the behaviour of ``elementary.agate_to_dicts``. - If the query contains Jinja expressions beyond simple ``{{ ref() }}`` - calls (e.g. ``{{ elementary.missing_count(...) }}``), this method - raises ``ValueError`` so the caller can fall back to - ``run_operation`` which handles full Jinja rendering. + Only ``{{ ref() }}`` and ``{{ source() }}`` Jinja expressions are + supported. The caller should check ``has_non_ref_jinja()`` first and + use a different execution path for complex Jinja. """ - if self.has_non_ref_jinja(prerendered_query): - raise ValueError( - "Query contains Jinja expressions that cannot be resolved " - "from the manifest alone (only {{ ref() }} is supported)." - ) - sql = self.resolve_refs(prerendered_query) with self._adapter.connection_named("run_query"): _response, table = self._adapter.execute(sql, fetch=True) diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index 6515de19e..5699be903 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -18,6 +18,12 @@ PYTEST_XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", None) SCHEMA_NAME_SUFFIX = f"_{PYTEST_XDIST_WORKER}" if PYTEST_XDIST_WORKER else "" +# Retry constants for the run_operation fallback path. run_operation() can +# intermittently return an empty list when the MACRO_RESULT_PATTERN log line +# is not captured from dbt's output. +_RUN_QUERY_MAX_RETRIES = 3 +_RUN_QUERY_RETRY_DELAY_SECONDS = 0.5 + _DEFAULT_VARS = { "disable_dbt_invocation_autoupload": True, "disable_dbt_artifacts_autoupload": True, @@ -61,42 +67,50 @@ def __init__( self.tmp_models_dir_path = self.models_dir_path / "tmp" self.seeds_dir_path = self.project_dir_path / "data" - self._query_runner = AdapterQueryRunner(project_dir, target) + self._query_runner: Optional[AdapterQueryRunner] = None - # Retry constants for the run_operation fallback path. run_operation() - # can intermittently return an empty list when the MACRO_RESULT_PATTERN - # log line is not captured from dbt's output. - _RUN_QUERY_MAX_RETRIES = 3 - _RUN_QUERY_RETRY_DELAY_SECONDS = 0.5 + def _get_query_runner(self) -> AdapterQueryRunner: + """Lazily initialize the direct adapter query runner.""" + if self._query_runner is None: + self._query_runner = AdapterQueryRunner( + str(self.project_dir_path), self.target + ) + return self._query_runner def run_query(self, prerendered_query: str): - # Fast path: queries that only contain {{ ref() }} can be executed - # directly through the adapter, bypassing run_operation log parsing. - try: - return self._query_runner.run_query(prerendered_query) - except ValueError: - # Query contains Jinja beyond ref(); fall back to run_operation. - pass + # Fast path: queries that only contain {{ ref() }} / {{ source() }} + # can be executed directly through the adapter, bypassing + # run_operation log parsing entirely. + if not AdapterQueryRunner.has_non_ref_jinja(prerendered_query): + return self._get_query_runner().run_query(prerendered_query) # Slow path: full Jinja rendering via run_operation (with retry). - for attempt in range(1, self._RUN_QUERY_MAX_RETRIES + 1): + return self._run_query_with_run_operation(prerendered_query) + + def _run_query_with_run_operation(self, prerendered_query: str): + """Execute a query via run_operation with retry on empty output. + + run_operation() can intermittently return an empty list when the + MACRO_RESULT_PATTERN log line is not captured from dbt's output. + """ + for attempt in range(1, _RUN_QUERY_MAX_RETRIES + 1): run_operation_results = self.dbt_runner.run_operation( "elementary.render_run_query", macro_args={"prerendered_query": prerendered_query}, ) if run_operation_results: return json.loads(run_operation_results[0]) - if attempt < self._RUN_QUERY_MAX_RETRIES: + if attempt < _RUN_QUERY_MAX_RETRIES: logger.warning( "run_operation('elementary.render_run_query') returned no " "output (attempt %d/%d, retrying)", attempt, - self._RUN_QUERY_MAX_RETRIES, + _RUN_QUERY_MAX_RETRIES, ) - time.sleep(self._RUN_QUERY_RETRY_DELAY_SECONDS) + time.sleep(_RUN_QUERY_RETRY_DELAY_SECONDS) raise RuntimeError( f"run_operation('elementary.render_run_query') returned no output " - f"after {self._RUN_QUERY_MAX_RETRIES} attempts. " + f"after {_RUN_QUERY_MAX_RETRIES} attempts. " f"Query: {prerendered_query!r}" ) From 0ca8ca39feae63f81c9570d0ad5b47eb2a6457ad Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 10:06:04 +0000 Subject: [PATCH 08/13] refactor: use tenacity for run_operation retry Co-Authored-By: unknown <> --- integration_tests/tests/dbt_project.py | 49 +++++++++++++++----------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index 5699be903..557bfe8a0 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -1,6 +1,5 @@ import json import os -import time from contextlib import contextmanager, nullcontext from pathlib import Path from tempfile import NamedTemporaryFile @@ -14,11 +13,12 @@ from elementary.clients.dbt.factory import RunnerMethod, create_dbt_runner from logger import get_logger from ruamel.yaml import YAML +from tenacity import retry, retry_if_result, stop_after_attempt, wait_fixed PYTEST_XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", None) SCHEMA_NAME_SUFFIX = f"_{PYTEST_XDIST_WORKER}" if PYTEST_XDIST_WORKER else "" -# Retry constants for the run_operation fallback path. run_operation() can +# Retry settings for the run_operation fallback path. run_operation() can # intermittently return an empty list when the MACRO_RESULT_PATTERN log line # is not captured from dbt's output. _RUN_QUERY_MAX_RETRIES = 3 @@ -87,32 +87,39 @@ def run_query(self, prerendered_query: str): # Slow path: full Jinja rendering via run_operation (with retry). return self._run_query_with_run_operation(prerendered_query) + @retry( + retry=retry_if_result(lambda r: r is None), + stop=stop_after_attempt(_RUN_QUERY_MAX_RETRIES), + wait=wait_fixed(_RUN_QUERY_RETRY_DELAY_SECONDS), + reraise=True, + ) + def _run_operation_with_retry(self, prerendered_query: str) -> Optional[list]: + """Call run_operation and return the parsed result, or None to trigger retry.""" + run_operation_results = self.dbt_runner.run_operation( + "elementary.render_run_query", + macro_args={"prerendered_query": prerendered_query}, + ) + if run_operation_results: + return json.loads(run_operation_results[0]) + logger.warning( + "run_operation('elementary.render_run_query') returned no output, retrying" + ) + return None + def _run_query_with_run_operation(self, prerendered_query: str): """Execute a query via run_operation with retry on empty output. run_operation() can intermittently return an empty list when the MACRO_RESULT_PATTERN log line is not captured from dbt's output. """ - for attempt in range(1, _RUN_QUERY_MAX_RETRIES + 1): - run_operation_results = self.dbt_runner.run_operation( - "elementary.render_run_query", - macro_args={"prerendered_query": prerendered_query}, + result = self._run_operation_with_retry(prerendered_query) + if result is None: + raise RuntimeError( + f"run_operation('elementary.render_run_query') returned no output " + f"after {_RUN_QUERY_MAX_RETRIES} attempts. " + f"Query: {prerendered_query!r}" ) - if run_operation_results: - return json.loads(run_operation_results[0]) - if attempt < _RUN_QUERY_MAX_RETRIES: - logger.warning( - "run_operation('elementary.render_run_query') returned no " - "output (attempt %d/%d, retrying)", - attempt, - _RUN_QUERY_MAX_RETRIES, - ) - time.sleep(_RUN_QUERY_RETRY_DELAY_SECONDS) - raise RuntimeError( - f"run_operation('elementary.render_run_query') returned no output " - f"after {_RUN_QUERY_MAX_RETRIES} attempts. " - f"Query: {prerendered_query!r}" - ) + return result @staticmethod def read_table_query( From 77f5ddcf5c5c77921e08cf0df7a459f3019ecca7 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 10:07:08 +0000 Subject: [PATCH 09/13] chore: add tenacity as direct test dependency Co-Authored-By: unknown <> --- integration_tests/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/integration_tests/requirements.txt b/integration_tests/requirements.txt index 4bc90f57f..d43fc7f3e 100644 --- a/integration_tests/requirements.txt +++ b/integration_tests/requirements.txt @@ -3,6 +3,7 @@ pytest-xdist pytest-parametrization pytest-html filelock +tenacity # urllib3>=2.2.2 fixes CVE-2023-45803 and CVE-2024-37891 # Upper bound <3.0.0 prevents breaking changes from future major versions urllib3>=2.2.2,<3.0.0 From 08a6ebbe7b1ddf8a522d4d03e50be8f7895e5618 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 11:34:43 +0000 Subject: [PATCH 10/13] fix: surface seed and init failures instead of swallowing them - DbtDataSeeder.seed(): check return value and raise RuntimeError on failure (previously ignored the bool return, causing confusing TABLE_OR_VIEW_NOT_FOUND errors downstream when the real cause was SCHEMA_NOT_FOUND during seed) - Environment.init(): check return values of dbt run commands and raise RuntimeError on failure (previously silently continued even if schema creation failed) - conftest.py: log worker ID and schema_suffix during init for debugging parallel test runs (pytest-xdist workers gw0-gw7) Co-Authored-By: unknown <> --- integration_tests/tests/conftest.py | 13 ++++++++++--- integration_tests/tests/data_seeder.py | 16 +++++++++++++++- integration_tests/tests/env.py | 24 ++++++++++++++++++++++-- 3 files changed, 47 insertions(+), 6 deletions(-) diff --git a/integration_tests/tests/conftest.py b/integration_tests/tests/conftest.py index 257854ccf..579c5f70e 100644 --- a/integration_tests/tests/conftest.py +++ b/integration_tests/tests/conftest.py @@ -6,7 +6,7 @@ import pytest import yaml from dbt.version import __version__ as dbt_version -from dbt_project import DbtProject +from dbt_project import PYTEST_XDIST_WORKER, SCHEMA_NAME_SUFFIX, DbtProject from elementary.clients.dbt.factory import RunnerMethod from env import Environment from logger import get_logger @@ -90,10 +90,17 @@ def init_tests_env( ): env = Environment(target, project_dir_copy, runner_method) if not skip_init: - logger.info("Initializing test environment") + logger.info( + "Initializing test environment (worker=%s, schema_suffix='%s')", + PYTEST_XDIST_WORKER or "main", + SCHEMA_NAME_SUFFIX, + ) env.clear() env.init() - logger.info("Initialization complete") + logger.info( + "Initialization complete (worker=%s)", + PYTEST_XDIST_WORKER or "main", + ) yield diff --git a/integration_tests/tests/data_seeder.py b/integration_tests/tests/data_seeder.py index dfba10100..f89c0c5eb 100644 --- a/integration_tests/tests/data_seeder.py +++ b/integration_tests/tests/data_seeder.py @@ -29,7 +29,21 @@ def seed(self, data: List[dict], table_name: str) -> Generator[None, None, None] writer.writeheader() writer.writerows(data) seed_file.flush() - self.dbt_runner.seed(select=str(relative_seed_path), full_refresh=True) + success = self.dbt_runner.seed( + select=str(relative_seed_path), full_refresh=True + ) + if not success: + logger.error( + "dbt seed failed for '%s'. This usually means the " + "target schema does not exist or could not be created. " + "Downstream queries will fail with " + "TABLE_OR_VIEW_NOT_FOUND.", + table_name, + ) + raise RuntimeError( + f"dbt seed failed for '{table_name}'. Check the dbt " + f"output above for the root cause (e.g. SCHEMA_NOT_FOUND)." + ) yield finally: diff --git a/integration_tests/tests/env.py b/integration_tests/tests/env.py index b5d108d25..12e5a47e2 100644 --- a/integration_tests/tests/env.py +++ b/integration_tests/tests/env.py @@ -2,6 +2,9 @@ import dbt_project from elementary.clients.dbt.factory import RunnerMethod +from logger import get_logger + +logger = get_logger(__name__) class Environment: @@ -22,5 +25,22 @@ def clear(self): self.dbt_runner.run_operation("elementary_tests.clear_env") def init(self): - self.dbt_runner.run(selector="init") - self.dbt_runner.run(select="elementary") + init_success = self.dbt_runner.run(selector="init") + if not init_success: + logger.error( + "Environment init failed: 'dbt run --selector init' returned " + "failure. The target schema may not have been created. " + "Subsequent seeds and queries will likely fail with " + "SCHEMA_NOT_FOUND or TABLE_OR_VIEW_NOT_FOUND." + ) + elementary_success = self.dbt_runner.run(select="elementary") + if not elementary_success: + logger.error( + "Environment init failed: 'dbt run --select elementary' " + "returned failure. Elementary models may not be available." + ) + if not init_success or not elementary_success: + raise RuntimeError( + "Test environment initialization failed. Check the dbt " + "output above for the root cause." + ) From 6b4acf9e96a70fd1eabc38d75ea52b70e453e741 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 11:39:21 +0000 Subject: [PATCH 11/13] refactor: early exit on init failure to reduce log noise When 'dbt run --selector init' fails, skip the dependent 'dbt run --select elementary' step since it will almost certainly fail too, producing additional noise that obscures the root cause. Addresses CodeRabbit nitpick. Co-Authored-By: unknown <> --- integration_tests/tests/env.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/integration_tests/tests/env.py b/integration_tests/tests/env.py index 12e5a47e2..0ed9f7c5e 100644 --- a/integration_tests/tests/env.py +++ b/integration_tests/tests/env.py @@ -33,14 +33,19 @@ def init(self): "Subsequent seeds and queries will likely fail with " "SCHEMA_NOT_FOUND or TABLE_OR_VIEW_NOT_FOUND." ) + raise RuntimeError( + "Test environment initialization failed during " + "'dbt run --selector init'. Check the dbt output above " + "for the root cause." + ) elementary_success = self.dbt_runner.run(select="elementary") if not elementary_success: logger.error( "Environment init failed: 'dbt run --select elementary' " "returned failure. Elementary models may not be available." ) - if not init_success or not elementary_success: raise RuntimeError( - "Test environment initialization failed. Check the dbt " - "output above for the root cause." + "Test environment initialization failed during " + "'dbt run --select elementary'. Check the dbt output " + "above for the root cause." ) From 4869797d9bc065914c9165203d4a8cae4452b536 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 18:49:19 +0000 Subject: [PATCH 12/13] refactor: address review comments - BaseAdapter type, custom UnsupportedJinjaError, merge master Co-Authored-By: unknown <> --- .../tests/adapter_query_runner.py | 24 +++++++++++++++---- integration_tests/tests/dbt_project.py | 6 +++-- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/integration_tests/tests/adapter_query_runner.py b/integration_tests/tests/adapter_query_runner.py index 988014d4b..580d600cc 100644 --- a/integration_tests/tests/adapter_query_runner.py +++ b/integration_tests/tests/adapter_query_runner.py @@ -14,10 +14,24 @@ from pathlib import Path from typing import Any, Dict, List, Optional +from dbt.adapters.base import BaseAdapter from logger import get_logger logger = get_logger(__name__) + +class UnsupportedJinjaError(Exception): + """Raised when a query contains Jinja expressions beyond ref()/source().""" + + def __init__(self, query: str) -> None: + self.query = query + super().__init__( + "Query contains Jinja expressions beyond {{ ref() }} / {{ source() }} " + "which cannot be executed via the direct adapter path. " + "Use the run_operation fallback instead." + ) + + # Pattern that matches {{ ref('name') }} or {{ ref("name") }} with optional whitespace _REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}") @@ -62,7 +76,7 @@ class AdapterQueryRunner: def __init__(self, project_dir: str, target: str) -> None: self._project_dir = project_dir self._target = target - self._adapter = self._create_adapter(project_dir, target) + self._adapter: BaseAdapter = self._create_adapter(project_dir, target) self._ref_map: Optional[Dict[str, str]] = None self._source_map: Optional[Dict[tuple, str]] = None @@ -71,7 +85,7 @@ def __init__(self, project_dir: str, target: str) -> None: # ------------------------------------------------------------------ @staticmethod - def _create_adapter(project_dir: str, target: str) -> Any: + def _create_adapter(project_dir: str, target: str) -> BaseAdapter: from argparse import Namespace from dbt.adapters.factory import get_adapter, register_adapter, reset_adapters @@ -190,9 +204,11 @@ def run_query(self, prerendered_query: str) -> List[Dict[str, Any]]: behaviour of ``elementary.agate_to_dicts``. Only ``{{ ref() }}`` and ``{{ source() }}`` Jinja expressions are - supported. The caller should check ``has_non_ref_jinja()`` first and - use a different execution path for complex Jinja. + supported. Raises ``UnsupportedJinjaError`` if the query contains + other Jinja expressions. """ + if self.has_non_ref_jinja(prerendered_query): + raise UnsupportedJinjaError(prerendered_query) sql = self.resolve_refs(prerendered_query) with self._adapter.connection_named("run_query"): _response, table = self._adapter.execute(sql, fetch=True) diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index 557bfe8a0..1d8bb0e85 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -6,7 +6,7 @@ from typing import Any, Dict, Generator, List, Literal, Optional, Union, overload from uuid import uuid4 -from adapter_query_runner import AdapterQueryRunner +from adapter_query_runner import AdapterQueryRunner, UnsupportedJinjaError from data_seeder import DbtDataSeeder from dbt_utils import get_database_and_schema_properties from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner @@ -81,8 +81,10 @@ def run_query(self, prerendered_query: str): # Fast path: queries that only contain {{ ref() }} / {{ source() }} # can be executed directly through the adapter, bypassing # run_operation log parsing entirely. - if not AdapterQueryRunner.has_non_ref_jinja(prerendered_query): + try: return self._get_query_runner().run_query(prerendered_query) + except UnsupportedJinjaError: + pass # Slow path: full Jinja rendering via run_operation (with retry). return self._run_query_with_run_operation(prerendered_query) From b7229059b6d847d8be4c7b3f5909f1e48b637632 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 20:26:03 +0000 Subject: [PATCH 13/13] refactor: log fallback to run_operation and use tenacity before_sleep for retry logging Co-Authored-By: unknown <> --- integration_tests/tests/dbt_project.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index 1d8bb0e85..44b9f3f41 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -13,7 +13,13 @@ from elementary.clients.dbt.factory import RunnerMethod, create_dbt_runner from logger import get_logger from ruamel.yaml import YAML -from tenacity import retry, retry_if_result, stop_after_attempt, wait_fixed +from tenacity import ( + RetryCallState, + retry, + retry_if_result, + stop_after_attempt, + wait_fixed, +) PYTEST_XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", None) SCHEMA_NAME_SUFFIX = f"_{PYTEST_XDIST_WORKER}" if PYTEST_XDIST_WORKER else "" @@ -84,15 +90,27 @@ def run_query(self, prerendered_query: str): try: return self._get_query_runner().run_query(prerendered_query) except UnsupportedJinjaError: - pass + logger.debug("Query contains complex Jinja; falling back to run_operation") # Slow path: full Jinja rendering via run_operation (with retry). return self._run_query_with_run_operation(prerendered_query) + @staticmethod + def _log_retry(retry_state: RetryCallState) -> None: + """Tenacity before_sleep callback — logs each retry with attempt number.""" + logger.warning( + "run_operation('elementary.render_run_query') returned no output; " + "retry %d/%d in %.1fs", + retry_state.attempt_number, + _RUN_QUERY_MAX_RETRIES, + _RUN_QUERY_RETRY_DELAY_SECONDS, + ) + @retry( retry=retry_if_result(lambda r: r is None), stop=stop_after_attempt(_RUN_QUERY_MAX_RETRIES), wait=wait_fixed(_RUN_QUERY_RETRY_DELAY_SECONDS), + before_sleep=_log_retry.__func__, reraise=True, ) def _run_operation_with_retry(self, prerendered_query: str) -> Optional[list]: @@ -103,9 +121,6 @@ def _run_operation_with_retry(self, prerendered_query: str) -> Optional[list]: ) if run_operation_results: return json.loads(run_operation_results[0]) - logger.warning( - "run_operation('elementary.render_run_query') returned no output, retrying" - ) return None def _run_query_with_run_operation(self, prerendered_query: str):