Skip to content

Commit dcd8a9f

Browse files
committed
refactor: simplify retry flow with _inner_run_command_with_retries
- Replace _execute_inner_command + nested _attempt() with a single _inner_run_command_with_retries method decorated with tenacity @Retry - Move exhausted-retry handling (log, re-raise or return exc.result) into _run_command try/except - Add module-level _before_retry_log(retry_state) for retry logging; log_command_args read from retry_state.kwargs - Call chain: _run_command -> _inner_run_command_with_retries -> _inner_run_command - Update test docstring to reference new method name Made-with: Cursor
1 parent b5de019 commit dcd8a9f

2 files changed

Lines changed: 93 additions & 114 deletions

File tree

elementary/clients/dbt/command_line_dbt_runner.py

Lines changed: 92 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,18 @@ def __init__(self, result: "DbtCommandResult", message: str) -> None:
3737
self.result = result
3838

3939

40+
def _before_retry_log(retry_state: RetryCallState) -> None:
41+
"""Log before each retry. Reads log_command_args from the retried call."""
42+
log_command_args = retry_state.kwargs.get("log_command_args", [])
43+
attempt = retry_state.attempt_number
44+
logger.warning(
45+
"Transient error detected for dbt command '%s' (attempt %d/%d). Retrying...",
46+
" ".join(log_command_args),
47+
attempt,
48+
_TRANSIENT_MAX_RETRIES,
49+
)
50+
51+
4052
MACRO_RESULT_PATTERN = re.compile(
4153
"Elementary: --ELEMENTARY-MACRO-OUTPUT-START--(.*)--ELEMENTARY-MACRO-OUTPUT-END--"
4254
)
@@ -131,134 +143,101 @@ def _run_command(
131143
else:
132144
logger.debug(log_msg)
133145

134-
result = self._execute_inner_command(
135-
dbt_command_args=dbt_command_args,
136-
log_command_args=log_command_args,
137-
quiet=quiet,
138-
log_output=log_output,
139-
log_format=log_format,
140-
)
141-
142-
return result
146+
try:
147+
return self._inner_run_command_with_retries(
148+
dbt_command_args=dbt_command_args,
149+
log_command_args=log_command_args,
150+
quiet=quiet,
151+
log_output=log_output,
152+
log_format=log_format,
153+
)
154+
except DbtTransientError as exc:
155+
logger.exception(
156+
"dbt command '%s' failed after %d attempts due to transient errors.",
157+
" ".join(log_command_args),
158+
_TRANSIENT_MAX_RETRIES,
159+
)
160+
if isinstance(exc.__cause__, DbtCommandError):
161+
raise exc.__cause__ from exc
162+
return exc.result
143163

144-
def _execute_inner_command(
164+
@retry(
165+
retry=retry_if_exception(lambda exc: isinstance(exc, DbtTransientError)),
166+
stop=stop_after_attempt(_TRANSIENT_MAX_RETRIES),
167+
wait=wait_exponential(
168+
multiplier=_TRANSIENT_WAIT_MULTIPLIER,
169+
max=_TRANSIENT_WAIT_MAX,
170+
),
171+
before_sleep=_before_retry_log,
172+
reraise=True,
173+
)
174+
def _inner_run_command_with_retries(
145175
self,
146176
dbt_command_args: List[str],
147177
log_command_args: List[str],
148178
quiet: bool,
149179
log_output: bool,
150180
log_format: str,
151181
) -> DbtCommandResult:
152-
"""Execute ``_inner_run_command`` with automatic retries for transient errors.
153-
154-
This method wraps the actual command execution, checks the result
155-
for known transient error patterns (per adapter), and retries using
156-
``tenacity`` when appropriate. Non-transient failures are returned
157-
immediately without retrying.
158-
"""
159-
160-
def _before_retry(retry_state: RetryCallState) -> None:
161-
attempt = retry_state.attempt_number
162-
logger.warning(
163-
"Transient error detected for dbt command '%s' "
164-
"(attempt %d/%d). Retrying...",
165-
" ".join(log_command_args),
166-
attempt,
167-
_TRANSIENT_MAX_RETRIES,
182+
"""Run one dbt command attempt. Raises DbtTransientError for transient failures so tenacity can retry."""
183+
try:
184+
result = self._inner_run_command(
185+
dbt_command_args,
186+
quiet=quiet,
187+
log_output=log_output,
188+
log_format=log_format,
168189
)
169-
170-
@retry(
171-
retry=retry_if_exception(lambda exc: isinstance(exc, DbtTransientError)),
172-
stop=stop_after_attempt(_TRANSIENT_MAX_RETRIES),
173-
wait=wait_exponential(
174-
multiplier=_TRANSIENT_WAIT_MULTIPLIER, max=_TRANSIENT_WAIT_MAX
175-
),
176-
before_sleep=_before_retry,
177-
reraise=True,
178-
)
179-
def _attempt() -> DbtCommandResult:
180-
try:
181-
result = self._inner_run_command(
182-
dbt_command_args,
183-
quiet=quiet,
184-
log_output=log_output,
185-
log_format=log_format,
186-
)
187-
except DbtCommandError as exc:
188-
# DbtCommandError is raised when raise_on_failure=True and
189-
# the command exits with a non-zero return code. Extract
190-
# actual output/stderr from the underlying process error
191-
# for more accurate transient-error detection.
192-
output_text = str(exc)
193-
stderr_text: Optional[str] = None
194-
if exc.proc_err is not None:
195-
if exc.proc_err.output:
196-
output_text = (
197-
exc.proc_err.output.decode()
198-
if isinstance(exc.proc_err.output, bytes)
199-
else str(exc.proc_err.output)
200-
)
201-
if exc.proc_err.stderr:
202-
stderr_text = (
203-
exc.proc_err.stderr.decode()
204-
if isinstance(exc.proc_err.stderr, bytes)
205-
else str(exc.proc_err.stderr)
206-
)
207-
if is_transient_error(
208-
self.target, output=output_text, stderr=stderr_text
209-
):
210-
raise DbtTransientError(
211-
result=DbtCommandResult(
212-
success=False,
213-
output=output_text,
214-
stderr=stderr_text,
215-
),
216-
message=(f"Transient error during dbt command: {exc}"),
217-
) from exc
218-
raise
219-
220-
if result.output:
221-
logger.debug(
222-
"Result bytes size for command '%s' is %d",
223-
log_command_args,
224-
len(result.output),
225-
)
226-
if log_output or is_debug():
227-
for log in parse_dbt_output(result.output, log_format):
228-
logger.info(log.msg)
229-
230-
# Command completed but may have failed (raise_on_failure=False).
231-
if not result.success and is_transient_error(
232-
self.target, output=result.output, stderr=result.stderr
190+
except DbtCommandError as exc:
191+
output_text = str(exc)
192+
stderr_text: Optional[str] = None
193+
if exc.proc_err is not None:
194+
if exc.proc_err.output:
195+
output_text = (
196+
exc.proc_err.output.decode()
197+
if isinstance(exc.proc_err.output, bytes)
198+
else str(exc.proc_err.output)
199+
)
200+
if exc.proc_err.stderr:
201+
stderr_text = (
202+
exc.proc_err.stderr.decode()
203+
if isinstance(exc.proc_err.stderr, bytes)
204+
else str(exc.proc_err.stderr)
205+
)
206+
if is_transient_error(
207+
self.target, output=output_text, stderr=stderr_text
233208
):
234209
raise DbtTransientError(
235-
result=result,
236-
message=(
237-
f"Transient error during dbt command: "
238-
f"{' '.join(log_command_args)}"
210+
result=DbtCommandResult(
211+
success=False,
212+
output=output_text,
213+
stderr=stderr_text,
239214
),
240-
)
241-
242-
return result
215+
message=f"Transient error during dbt command: {exc}",
216+
) from exc
217+
raise
243218

244-
try:
245-
return _attempt()
246-
except DbtTransientError as exc:
247-
# All retry attempts exhausted.
248-
logger.exception(
249-
"dbt command '%s' failed after %d attempts due to " "transient errors.",
219+
if result.output:
220+
logger.debug(
221+
"Result bytes size for command '%s' is %d",
250222
" ".join(log_command_args),
251-
_TRANSIENT_MAX_RETRIES,
223+
len(result.output),
252224
)
253-
# Preserve the raise_on_failure contract: if the original
254-
# failure was a DbtCommandError (i.e. raise_on_failure=True),
255-
# re-raise it so callers relying on exception handling still
256-
# see the expected exception type.
257-
if isinstance(exc.__cause__, DbtCommandError):
258-
raise exc.__cause__ from exc
259-
# Otherwise (raise_on_failure=False path), return the last
260-
# failed result so callers that check result.success work.
261-
return exc.result
225+
if log_output or is_debug():
226+
for log in parse_dbt_output(result.output, log_format):
227+
logger.info(log.msg)
228+
229+
if not result.success and is_transient_error(
230+
self.target, output=result.output, stderr=result.stderr
231+
):
232+
raise DbtTransientError(
233+
result=result,
234+
message=(
235+
f"Transient error during dbt command: "
236+
f"{' '.join(log_command_args)}"
237+
),
238+
)
239+
240+
return result
262241

263242
def deps(
264243
self,

tests/unit/clients/dbt_runner/test_retry_logic.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
"""Unit tests for transient-error retry logic in _execute_inner_command."""
1+
"""Unit tests for transient-error retry logic in _inner_run_command_with_retries."""
22

33
import subprocess
44
from unittest import mock

0 commit comments

Comments
 (0)