Skip to content

Commit 3cf52fa

Browse files
authored
Merge branch 'master' into devin/1772278180-cleanup-stale-ci-schemas
2 parents a495427 + c3d0cf4 commit 3cf52fa

5 files changed

Lines changed: 169 additions & 2 deletions

File tree

.github/workflows/test-warehouse.yml

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -277,3 +277,10 @@ jobs:
277277

278278
- name: Run Python package e2e tests
279279
run: pytest -vv tests/e2e --warehouse-type ${{ inputs.warehouse-type }}
280+
281+
- name: Drop test schemas
282+
if: always()
283+
working-directory: ${{ env.E2E_DBT_PROJECT_DIR }}
284+
continue-on-error: true
285+
run: |
286+
dbt run-operation elementary_integration_tests.drop_test_schemas --target "${{ inputs.warehouse-type }}"

elementary/clients/dbt/api_dbt_runner.py

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -49,15 +49,24 @@ def collect_dbt_command_logs(event):
4949
with with_chdir(self.project_dir):
5050
res: dbtRunnerResult = dbt.invoke(dbt_command_args)
5151
output = "\n".join(dbt_logs) or None
52+
# Surface the exception text so that transient-error detection in
53+
# _inner_run_command_with_retries can match against it. The dbt
54+
# Python API doesn't write to stderr, so we repurpose that field
55+
# for the exception string (analogous to how SubprocessDbtRunner
56+
# captures subprocess stderr).
57+
exception_text = str(res.exception) if res.exception else None
5258
if self.raise_on_failure and not res.success:
5359
raise DbtCommandError(
5460
base_command_args=dbt_command_args,
55-
err_msg=(str(res.exception) if res.exception else output),
61+
err_msg=(exception_text or output),
5662
logs=[DbtLog.from_log_line(log) for log in dbt_logs],
5763
)
5864

5965
return APIDbtCommandResult(
60-
success=res.success, output=output, stderr=None, result_obj=res
66+
success=res.success,
67+
output=output,
68+
stderr=exception_text,
69+
result_obj=res,
6170
)
6271

6372
def _parse_ls_command_result(

tests/e2e_dbt_project/dbt_project.yml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,8 @@ vars:
2020
days_back: 30
2121
debug_logs: "{{ env_var('DBT_EDR_DEBUG', False) }}"
2222
custom_run_started_at: "{{ modules.datetime.datetime.utcfromtimestamp(0) }}"
23+
clean_elementary_temp_tables: false
24+
disable_dbt_artifacts_autoupload: true
2325

2426
seeds:
2527
+schema: test_seeds
Lines changed: 36 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,36 @@
1+
{% macro drop_test_schemas() %}
    {#
        Entry point for cleaning up after a test run: drops the schema named by
        the active target and its companion "<schema>_elementary" schema, then
        logs both names at info level.

        The elementary schema is dropped first, the main schema second.
    #}
    {% set main_schema = target.schema %}
    {% set elementary_schema = main_schema ~ '_elementary' %}

    {% for schema_to_drop in [elementary_schema, main_schema] %}
        {% do elementary_integration_tests.edr_drop_schema(schema_to_drop) %}
    {% endfor %}

    {# The log line lists the main schema first, then the elementary one. #}
    {% set dropped_names = [main_schema, elementary_schema] | join(", ") %}
    {% do log("Dropped schemas: " ~ dropped_names, info=true) %}
{% endmacro %}
11+
12+
{% macro edr_drop_schema(schema_name) %}
    {#
        Adapter dispatcher: resolves the `edr_drop_schema` implementation in
        the `elementary_integration_tests` namespace for the current adapter
        and returns whatever that implementation returns.

        schema_name: name of the schema to drop.
    #}
    {% set drop_impl = adapter.dispatch('edr_drop_schema', 'elementary_integration_tests') %}
    {% do return(drop_impl(schema_name)) %}
{% endmacro %}
15+
16+
{% macro default__edr_drop_schema(schema_name) %}
    {#
        Default implementation: builds a relation for `schema_name` in the
        target database, drops it through dbt's `drop_schema`, then commits on
        the adapter.

        schema_name: name of the schema to drop.
    #}
    {% do dbt.drop_schema(api.Relation.create(database=target.database, schema=schema_name)) %}
    {% do adapter.commit() %}
{% endmacro %}
21+
22+
{% macro bigquery__edr_drop_schema(schema_name) %}
    {#
        BigQuery implementation: drops `schema_name` in the target database
        through dbt's `drop_schema`. No explicit adapter commit is issued here.

        schema_name: name of the schema to drop.
    #}
    {% do dbt.drop_schema(api.Relation.create(database=target.database, schema=schema_name)) %}
{% endmacro %}
26+
27+
{% macro clickhouse__edr_drop_schema(schema_name) %}
    {#
        ClickHouse implementation: issues a raw `DROP DATABASE IF EXISTS`
        statement against the adapter-quoted schema name, then commits on the
        adapter.

        schema_name: name of the schema to drop.
    #}
    {% set drop_statement = "DROP DATABASE IF EXISTS " ~ adapter.quote(schema_name) %}
    {% do run_query(drop_statement) %}
    {% do adapter.commit() %}
{% endmacro %}
32+
33+
{% macro athena__edr_drop_schema(schema_name) %}
    {#
        Athena implementation: drops `schema_name` in the target database
        through dbt's `drop_schema`. No explicit adapter commit is issued here.

        schema_name: name of the schema to drop.
    #}
    {% do dbt.drop_schema(api.Relation.create(database=target.database, schema=schema_name)) %}
{% endmacro %}

tests/unit/clients/dbt_runner/test_retry_logic.py

Lines changed: 113 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -201,3 +201,116 @@ def test_successful_command_not_retried(self, mock_subprocess_run):
201201

202202
assert mock_subprocess_run.call_count == 1
203203
assert result is True
204+
205+
206+
def _make_api_runner(**kwargs):
    """Build an ``APIDbtRunner`` for tests without running its deps step.

    Keyword arguments override the fake constructor defaults below; anything
    not overridden keeps its default.  ``APIDbtRunner._run_deps_if_needed`` is
    patched out only while the constructor runs.
    """
    from elementary.clients.dbt.api_dbt_runner import APIDbtRunner

    constructor_kwargs = {
        "project_dir": "fake_project",
        "profiles_dir": "fake_profiles",
        "target": None,
        "raise_on_failure": False,
        "run_deps_if_needed": False,
        **kwargs,
    }

    with mock.patch.object(APIDbtRunner, "_run_deps_if_needed"):
        runner = APIDbtRunner(**constructor_kwargs)
    return runner
220+
221+
222+
@_ZERO_WAIT
class TestAPIDbtRunnerTransientDetection:
    """Test that APIDbtRunner surfaces exception text for transient error detection.

    The dbt Python API (APIDbtRunner) only captures JinjaLogInfo and
    RunningOperationCaughtError events into ``output``.  Transient errors
    like RemoteDisconnected appear as ``res.exception`` — not in the
    captured output.  Without surfacing this, the retry logic has nothing
    to match against and never fires.

    Each test replaces ``with_chdir`` with a factory returning a plain
    ``MagicMock``.  A default ``MagicMock`` already supports the context
    manager protocol and its ``__exit__`` returns ``False``, so an exception
    raised inside the ``with`` block propagates instead of being swallowed
    (an ``__exit__`` that returns a truthy mock would suppress it and hide
    the real failure).
    """

    @mock.patch(
        "elementary.clients.dbt.api_dbt_runner.with_chdir",
        return_value=mock.MagicMock(),
    )
    @mock.patch("elementary.clients.dbt.api_dbt_runner.dbtRunner")
    def test_transient_exception_triggers_retry(self, mock_dbt_runner_cls, _mock_chdir):
        """A transient exception in res.exception should be retried."""
        # Simulate dbtRunnerResult with a transient exception.
        fail_result = mock.MagicMock(
            success=False,
            exception=ConnectionError(
                "('Connection aborted.', "
                "RemoteDisconnected('Remote end closed connection without response'))"
            ),
        )
        success_result = mock.MagicMock(success=True, exception=None)

        # dbtRunner().invoke returns fail first, then success.
        mock_dbt_instance = mock.MagicMock()
        mock_dbt_instance.invoke.side_effect = [fail_result, success_result]
        mock_dbt_runner_cls.return_value = mock_dbt_instance

        runner = _make_api_runner(raise_on_failure=False)
        result = runner.seed()

        # One failed attempt plus the successful retry.
        assert mock_dbt_instance.invoke.call_count == 2
        assert result is True

    @mock.patch(
        "elementary.clients.dbt.api_dbt_runner.with_chdir",
        return_value=mock.MagicMock(),
    )
    @mock.patch("elementary.clients.dbt.api_dbt_runner.dbtRunner")
    def test_non_transient_exception_not_retried(
        self, mock_dbt_runner_cls, _mock_chdir
    ):
        """A non-transient exception should NOT be retried."""
        fail_result = mock.MagicMock(
            success=False,
            exception=Exception("Compilation Error in model foo"),
        )

        mock_dbt_instance = mock.MagicMock()
        mock_dbt_instance.invoke.return_value = fail_result
        mock_dbt_runner_cls.return_value = mock_dbt_instance

        runner = _make_api_runner(raise_on_failure=False)
        result = runner.seed()

        # A single invocation: the failure is reported without retrying.
        assert mock_dbt_instance.invoke.call_count == 1
        assert result is False

    @mock.patch(
        "elementary.clients.dbt.api_dbt_runner.with_chdir",
        return_value=mock.MagicMock(),
    )
    @mock.patch("elementary.clients.dbt.api_dbt_runner.dbtRunner")
    def test_transient_exception_exhausts_retries(
        self, mock_dbt_runner_cls, _mock_chdir
    ):
        """After exhausting retries, the last failed result is returned."""
        fail_result = mock.MagicMock(
            success=False,
            exception=ConnectionError(
                "('Connection aborted.', "
                "RemoteDisconnected('Remote end closed connection without response'))"
            ),
        )

        # Every invocation fails with the same transient error.
        mock_dbt_instance = mock.MagicMock()
        mock_dbt_instance.invoke.return_value = fail_result
        mock_dbt_runner_cls.return_value = mock_dbt_instance

        runner = _make_api_runner(raise_on_failure=False)
        result = runner.seed()

        assert mock_dbt_instance.invoke.call_count == _TRANSIENT_MAX_RETRIES
        assert result is False

0 commit comments

Comments
 (0)