Skip to content

Commit c3d0cf4

Browse files
fix: surface exception text in APIDbtRunner for transient error detection (#2129)
* fix: surface exception text in APIDbtRunner for transient error detection APIDbtRunner only captures JinjaLogInfo and RunningOperationCaughtError events into the output field. When a command fails with a transient error (e.g. RemoteDisconnected), the error text lives in res.exception — not in the captured output. This means _inner_run_command_with_retries has nothing to match against and never fires the retry. Fix: extract res.exception text and put it into the stderr field of APIDbtCommandResult (the dbt Python API doesn't use stderr). This allows the transient error detection in _inner_run_command_with_retries to examine the exception text, analogous to how SubprocessDbtRunner captures subprocess stderr. Co-Authored-By: Itamar Hartstein <haritamar@gmail.com> * style: address CodeRabbit review - remove /tmp paths and prefix unused args Co-Authored-By: Itamar Hartstein <haritamar@gmail.com> * style: fix black formatting - wrap long function signatures Co-Authored-By: Itamar Hartstein <haritamar@gmail.com> * style: fix black formatting - keep short signature on one line Co-Authored-By: Itamar Hartstein <haritamar@gmail.com> --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Itamar Hartstein <haritamar@gmail.com>
1 parent 465d023 commit c3d0cf4

2 files changed

Lines changed: 124 additions & 2 deletions

File tree

elementary/clients/dbt/api_dbt_runner.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,24 @@ def collect_dbt_command_logs(event):
4949
with with_chdir(self.project_dir):
5050
res: dbtRunnerResult = dbt.invoke(dbt_command_args)
5151
output = "\n".join(dbt_logs) or None
52+
# Surface the exception text so that transient-error detection in
53+
# _inner_run_command_with_retries can match against it. The dbt
54+
# Python API doesn't write to stderr, so we repurpose that field
55+
# for the exception string (analogous to how SubprocessDbtRunner
56+
# captures subprocess stderr).
57+
exception_text = str(res.exception) if res.exception else None
5258
if self.raise_on_failure and not res.success:
5359
raise DbtCommandError(
5460
base_command_args=dbt_command_args,
55-
err_msg=(str(res.exception) if res.exception else output),
61+
err_msg=(exception_text or output),
5662
logs=[DbtLog.from_log_line(log) for log in dbt_logs],
5763
)
5864

5965
return APIDbtCommandResult(
60-
success=res.success, output=output, stderr=None, result_obj=res
66+
success=res.success,
67+
output=output,
68+
stderr=exception_text,
69+
result_obj=res,
6170
)
6271

6372
def _parse_ls_command_result(

tests/unit/clients/dbt_runner/test_retry_logic.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,3 +201,116 @@ def test_successful_command_not_retried(self, mock_subprocess_run):
201201

202202
assert mock_subprocess_run.call_count == 1
203203
assert result is True
204+
205+
206+
def _make_api_runner(**kwargs):
207+
"""Create an APIDbtRunner with deps/packages stubbed out."""
208+
defaults = dict(
209+
project_dir="fake_project",
210+
profiles_dir="fake_profiles",
211+
target=None,
212+
raise_on_failure=False,
213+
run_deps_if_needed=False,
214+
)
215+
defaults.update(kwargs)
216+
from elementary.clients.dbt.api_dbt_runner import APIDbtRunner
217+
218+
with mock.patch.object(APIDbtRunner, "_run_deps_if_needed"):
219+
return APIDbtRunner(**defaults)
220+
221+
222+
@_ZERO_WAIT
223+
class TestAPIDbtRunnerTransientDetection:
224+
"""Test that APIDbtRunner surfaces exception text for transient error detection.
225+
226+
The dbt Python API (APIDbtRunner) only captures JinjaLogInfo and
227+
RunningOperationCaughtError events into ``output``. Transient errors
228+
like RemoteDisconnected appear as ``res.exception`` — not in the
229+
captured output. Without surfacing this, the retry logic has nothing
230+
to match against and never fires.
231+
"""
232+
233+
@mock.patch(
234+
"elementary.clients.dbt.api_dbt_runner.with_chdir",
235+
return_value=mock.MagicMock(
236+
__enter__=mock.MagicMock(), __exit__=mock.MagicMock()
237+
),
238+
)
239+
@mock.patch("elementary.clients.dbt.api_dbt_runner.dbtRunner")
240+
def test_transient_exception_triggers_retry(self, mock_dbt_runner_cls, _mock_chdir):
241+
"""A transient exception in res.exception should be retried."""
242+
# Simulate dbtRunnerResult with a transient exception.
243+
fail_result = mock.MagicMock()
244+
fail_result.success = False
245+
fail_result.exception = ConnectionError(
246+
"('Connection aborted.', "
247+
"RemoteDisconnected('Remote end closed connection without response'))"
248+
)
249+
250+
success_result = mock.MagicMock()
251+
success_result.success = True
252+
success_result.exception = None
253+
254+
# dbtRunner().invoke returns fail first, then success.
255+
mock_dbt_instance = mock.MagicMock()
256+
mock_dbt_instance.invoke.side_effect = [fail_result, success_result]
257+
mock_dbt_runner_cls.return_value = mock_dbt_instance
258+
259+
runner = _make_api_runner(raise_on_failure=False)
260+
result = runner.seed()
261+
262+
assert mock_dbt_instance.invoke.call_count == 2
263+
assert result is True
264+
265+
@mock.patch(
266+
"elementary.clients.dbt.api_dbt_runner.with_chdir",
267+
return_value=mock.MagicMock(
268+
__enter__=mock.MagicMock(), __exit__=mock.MagicMock()
269+
),
270+
)
271+
@mock.patch("elementary.clients.dbt.api_dbt_runner.dbtRunner")
272+
def test_non_transient_exception_not_retried(
273+
self, mock_dbt_runner_cls, _mock_chdir
274+
):
275+
"""A non-transient exception should NOT be retried."""
276+
fail_result = mock.MagicMock()
277+
fail_result.success = False
278+
fail_result.exception = Exception("Compilation Error in model foo")
279+
280+
mock_dbt_instance = mock.MagicMock()
281+
mock_dbt_instance.invoke.return_value = fail_result
282+
mock_dbt_runner_cls.return_value = mock_dbt_instance
283+
284+
runner = _make_api_runner(raise_on_failure=False)
285+
result = runner.seed()
286+
287+
assert mock_dbt_instance.invoke.call_count == 1
288+
assert result is False
289+
290+
@mock.patch(
291+
"elementary.clients.dbt.api_dbt_runner.with_chdir",
292+
return_value=mock.MagicMock(
293+
__enter__=mock.MagicMock(), __exit__=mock.MagicMock()
294+
),
295+
)
296+
@mock.patch("elementary.clients.dbt.api_dbt_runner.dbtRunner")
297+
def test_transient_exception_exhausts_retries(
298+
self, mock_dbt_runner_cls, _mock_chdir
299+
):
300+
"""After exhausting retries, the last failed result is returned."""
301+
fail_result = mock.MagicMock()
302+
fail_result.success = False
303+
fail_result.exception = ConnectionError(
304+
"('Connection aborted.', "
305+
"RemoteDisconnected('Remote end closed connection without response'))"
306+
)
307+
308+
mock_dbt_instance = mock.MagicMock()
309+
mock_dbt_instance.invoke.return_value = fail_result
310+
mock_dbt_runner_cls.return_value = mock_dbt_instance
311+
312+
runner = _make_api_runner(raise_on_failure=False)
313+
result = runner.seed()
314+
315+
assert mock_dbt_instance.invoke.call_count == _TRANSIENT_MAX_RETRIES
316+
assert result is False

0 commit comments

Comments
 (0)