Skip to content

Commit d0ab63b

Browse files
just prevent the crashing
1 parent 1c3a526 commit d0ab63b

2 files changed

Lines changed: 19 additions & 95 deletions

File tree

integration_tests/tests/test_dbt_artifacts/test_artifacts.py

Lines changed: 1 addition & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ def test_metrics_anomaly_score(dbt_project: DbtProject):
143143

144144

145145
@pytest.mark.requires_dbt_version("1.8.0")
146+
@pytest.mark.skip_for_dbt_fusion
146147
def test_source_freshness_results(test_id: str, dbt_project: DbtProject):
147148
database_property, schema_property = get_database_and_schema_properties(
148149
dbt_project.target
@@ -197,65 +198,6 @@ def test_source_freshness_results(test_id: str, dbt_project: DbtProject):
197198
)
198199

199200

200-
@pytest.mark.requires_dbt_version("1.8.0")
201-
def test_source_freshness_results_with_errored_source(
202-
test_id: str, dbt_project: DbtProject
203-
):
204-
# An errored source (bad loaded_at_field) produces a "runtime error" freshness
205-
# result. On dbt-fusion such results have a none `node`, which used to crash the
206-
# on_run_end upload hook for the whole batch. This guards that a healthy source
207-
# in the same run still gets uploaded even when an errored source is present.
208-
database_property, schema_property = get_database_and_schema_properties(
209-
dbt_project.target
210-
)
211-
healthy_loaded_at_field = (
212-
'"UPDATE_TIME"::timestamp'
213-
if dbt_project.target != "dremio"
214-
else "TO_TIMESTAMP(SUBSTRING(UPDATE_TIME, 0, 23), 'YYYY-MM-DD HH24:MI:SS.FFF')"
215-
)
216-
healthy_name = f"{test_id}_healthy"
217-
errored_name = f"{test_id}_errored"
218-
219-
def _table_def(name, loaded_at_field):
220-
return {
221-
"name": name,
222-
"config": {
223-
"loaded_at_field": loaded_at_field,
224-
"freshness": {"warn_after": {"count": 1, "period": "hour"}},
225-
},
226-
}
227-
228-
source_def = {
229-
"name": "test_source",
230-
"schema": f"{{{{ target.{schema_property} }}}}",
231-
"tables": [
232-
_table_def(healthy_name, healthy_loaded_at_field),
233-
# Non-existent column -> freshness query fails -> "runtime error" result.
234-
_table_def(errored_name, '"DOES_NOT_EXIST"::timestamp'),
235-
],
236-
}
237-
if database_property is not None:
238-
source_def["database"] = f"{{{{ target.{database_property} }}}}"
239-
source_config = {"version": 2, "sources": [source_def]}
240-
241-
seed_row = [{"UPDATE_TIME": datetime.now()}]
242-
dbt_project.seed(seed_row, healthy_name)
243-
dbt_project.seed(seed_row, errored_name)
244-
245-
dbt_project.dbt_runner.vars["disable_freshness_results"] = False
246-
with dbt_project.write_yaml(content=source_config), set_flags(
247-
dbt_project, {"source_freshness_run_project_hooks": True}
248-
):
249-
# Must not raise from the on_run_end hook even though one source errors.
250-
dbt_project.dbt_runner.source_freshness()
251-
# The healthy source is still uploaded despite the errored one in the batch.
252-
dbt_project.read_table(
253-
"dbt_source_freshness_results",
254-
where=f"unique_id = 'source.elementary_tests.test_source.{healthy_name}'",
255-
raise_if_empty=True,
256-
)
257-
258-
259201
def test_timings(dbt_project: DbtProject):
260202
dbt_project.dbt_runner.vars["disable_dbt_artifacts_autoupload"] = False
261203
dbt_project.dbt_runner.vars["disable_run_results"] = False

macros/edr/dbt_artifacts/upload_source_freshness.sql

Lines changed: 18 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,51 +20,33 @@
2020

2121
{% macro process_freshness_result(result) %}
2222
{% set result_dict = result.to_dict() %}
23-
2423
{#
25-
dbt-core nests the source identifiers under `node` (a full source node),
26-
while dbt-fusion returns a flat result that mirrors `sources.json`: `node`
27-
is none and `unique_id` / `criteria` live at the top level. Resolve from
28-
whichever is populated so both engines upload complete results.
24+
dbt-fusion returns a none `node` for some freshness results (e.g. errored
25+
sources), unlike dbt-core. Skip them so the on_run_end hook does not crash
26+
on `result_dict.node.unique_id`.
2927
#}
30-
{% set node = result_dict.get("node") %}
31-
{% set has_node = node is not none and node is not undefined %}
32-
{% set unique_id = (
33-
node.get("unique_id") if has_node else result_dict.get("unique_id")
34-
) %}
35-
36-
{% if unique_id is none %}
37-
{# Nothing identifiable to record (e.g. fusion error result with no node). #}
38-
{% do return(none) %}
39-
{% endif %}
40-
41-
{% if result_dict.get("status") == "runtime error" %}
28+
{% if result_dict.get("node") is none %} {% do return(none) %} {% endif %}
29+
{% if result_dict.status == "runtime error" %}
4230
{% do return(
4331
{
44-
"unique_id": unique_id,
45-
"status": result_dict.get("status"),
46-
"error": result_dict.get("message") or result_dict.get("error"),
32+
"unique_id": result_dict.node.unique_id,
33+
"status": result_dict.status,
34+
"error": result_dict.message,
4735
}
4836
) %}
4937
{% endif %}
50-
51-
{% set criteria = (
52-
node.get("freshness", {}) if has_node else result_dict.get("criteria", {})
53-
) %}
5438
{% do return(
5539
{
56-
"unique_id": unique_id,
57-
"status": result_dict.get("status"),
58-
"max_loaded_at": result_dict.get("max_loaded_at"),
59-
"snapshotted_at": result_dict.get("snapshotted_at"),
60-
"max_loaded_at_time_ago_in_s": result_dict.get("age")
61-
if result_dict.get("age") is not none
62-
else result_dict.get("max_loaded_at_time_ago_in_s"),
63-
"criteria": criteria or {},
64-
"adapter_response": result_dict.get("adapter_response"),
65-
"timing": result_dict.get("timing"),
66-
"thread_id": result_dict.get("thread_id"),
67-
"execution_time": result_dict.get("execution_time"),
40+
"unique_id": result_dict.node.unique_id,
41+
"status": result_dict.status,
42+
"max_loaded_at": result_dict.max_loaded_at,
43+
"snapshotted_at": result_dict.snapshotted_at,
44+
"max_loaded_at_time_ago_in_s": result_dict.age,
45+
"criteria": result_dict.node.get("freshness", {}),
46+
"adapter_response": result_dict.adapter_response,
47+
"timing": result_dict.timing,
48+
"thread_id": result_dict.thread_id,
49+
"execution_time": result_dict.execution_time,
6850
}
6951
) %}
7052
{% endmacro %}

0 commit comments

Comments
 (0)