Skip to content

Commit f820e11

Browse files
fix: fix replace_table_data broken on multiple adapters (Trino, Redshift, etc.) (#969)
1 parent 87b8dbc commit f820e11

File tree

3 files changed

+155
-16
lines changed

3 files changed

+155
-16
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{% macro insert_sentinel_row(table_name, sentinel_alias) %}
  {#- Insert a sentinel row into the given Elementary table.

      Integration tests use this to prove that replace_table_data truly
      truncates the whole table (the sentinel disappears) instead of doing
      a diff-based update (the sentinel would survive).

      The table's column list is read dynamically so every column appears
      in the INSERT — Spark / Delta Lake rejects partial column lists.
      Columns without an explicit value are set to NULL. -#}
  {% set relation = ref(table_name) %}
  {% set table_columns = adapter.get_columns_in_relation(relation) %}

  {% set column_names = [] %}
  {% set column_values = [] %}
  {% for column in table_columns %}
    {% do column_names.append(column.name) %}
    {% set lowered = column.name | lower %}
    {% if lowered == "unique_id" %}
      {% do column_values.append("'test.sentinel'") %}
    {% elif lowered == "alias" %}
      {% do column_values.append("'" ~ sentinel_alias ~ "'") %}
    {% elif lowered == "name" %}
      {% do column_values.append("'sentinel'") %}
    {% else %}
      {% do column_values.append("NULL") %}
    {% endif %}
  {% endfor %}

  {% set insert_sql =
      "INSERT INTO " ~ relation
      ~ " (" ~ column_names | join(", ") ~ ")"
      ~ " VALUES (" ~ column_values | join(", ") ~ ")"
  %}
  {% do run_query(insert_sql) %}

  {#- DML issued through run_query is not auto-committed on most SQL
      adapters, so commit explicitly.  Spark-based and serverless engines
      reject a bare COMMIT statement and are skipped. -#}
  {% set no_commit_adapters = ["spark", "databricks", "bigquery", "athena"] %}
  {% if target.type not in no_commit_adapters %}
    {% do run_query("COMMIT") %}
  {% endif %}
{% endmacro %}

integration_tests/tests/test_dbt_artifacts/test_artifacts.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,51 @@ def test_artifacts_collection_in_multiple_row_batches(dbt_project: DbtProject):
5757
assert len(existing_artifacts) == len(new_artifacts)
5858

5959

60+
def test_replace_table_data(dbt_project: DbtProject):
    """Validate that replace_table_data actually replaces (not diffs) data.

    Sets cache_artifacts=False so the upload path uses replace_table_data.
    Inserts an unrelated sentinel row into dbt_models *before* the replace
    run, then asserts it was removed — proving a full table replace happened
    rather than a diff-based update (which would leave unrelated rows intact).
    """
    dbt_project.dbt_runner.vars["disable_dbt_artifacts_autoupload"] = False
    dbt_project.dbt_runner.vars["cache_artifacts"] = False

    sentinel_alias = "__replace_test_sentinel__"

    # First run populates dbt_models with genuine artifact rows.
    dbt_project.dbt_runner.run(select=TEST_MODEL)

    # Plant a sentinel row that no real dbt model would ever produce.
    # Going through a dbt macro (run_operation) guarantees the INSERT is
    # committed correctly on every adapter.
    dbt_project.dbt_runner.run_operation(
        "elementary_tests.insert_sentinel_row",
        macro_args={"table_name": "dbt_models", "sentinel_alias": sentinel_alias},
    )
    inserted_rows = dbt_project.read_table(
        "dbt_models", where=f"alias = '{sentinel_alias}'", raise_if_empty=False
    )
    assert len(inserted_rows) == 1, "Sentinel row was not inserted"

    # Second run with cache_artifacts=False exercises replace_table_data.
    dbt_project.dbt_runner.run(select=TEST_MODEL)

    # replace_table_data wipes the whole table, so the sentinel must be gone.
    remaining_rows = dbt_project.read_table(
        "dbt_models", where=f"alias = '{sentinel_alias}'", raise_if_empty=False
    )
    assert len(remaining_rows) == 0, (
        "replace_table_data did not remove unrelated rows — "
        "sentinel row still present (diff mode would keep it, replace should not)"
    )

    # The genuine model row must survive the replace.
    real_row = read_model_artifact_row(dbt_project)
    assert real_row["alias"] == TEST_MODEL
103+
104+
60105
def test_dbt_invocations(dbt_project: DbtProject):
61106
dbt_project.dbt_runner.vars["disable_dbt_invocation_autoupload"] = False
62107
dbt_project.dbt_runner.run(selector="one")

macros/utils/table_operations/replace_table_data.sql

Lines changed: 71 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -53,20 +53,37 @@
5353
) %}
5454
{% endmacro %}
5555

{# Postgres - atomically replace data without dropping the table (preserves views).
   Each statement is executed separately for post_hook compatibility. #}
{% macro postgres__replace_table_data(relation, rows) %}
  {% set intermediate_relation = elementary.create_intermediate_relation(
      relation, rows, temporary=True
  ) %}

  {# Wrap delete + insert in an explicit transaction so readers never
     observe an empty table. #}
  {% do elementary.run_query("begin") %}
  {% do elementary.run_query("delete from " ~ relation) %}
  {% do elementary.run_query("insert into " ~ relation ~ " select * from " ~ intermediate_relation) %}
  {% do elementary.run_query("commit") %}

  {% do adapter.drop_relation(intermediate_relation) %}
{% endmacro %}
72+
73+
{# Redshift - replace data without dropping the table (preserves late-binding views).
   Separate statements without explicit transaction for post_hook compatibility
   (Redshift cannot run multiple statements in a single prepared statement).
   NOTE: Non-atomic - if insert fails after delete, data is lost until the next run.
   Acceptable here because these are internal artifact tables that are regenerated. #}
{% macro redshift__replace_table_data(relation, rows) %}
  {% set intermediate_relation = elementary.create_intermediate_relation(
      relation, rows, temporary=True
  ) %}

  {% do elementary.run_query("delete from " ~ relation) %}
  {% do elementary.run_query("insert into " ~ relation ~ " select * from " ~ intermediate_relation) %}

  {% do adapter.drop_relation(intermediate_relation) %}
{% endmacro %}
@@ -81,17 +98,16 @@
8198
) %}
8299
{% endmacro %}
83100

101+
{# Trino - drop and recreate (Trino does not support CREATE OR REPLACE TABLE) #}
{% macro trino__replace_table_data(relation, rows) %}
  {% set intermediate_relation = elementary.create_intermediate_relation(
      relation, rows, temporary=True
  ) %}
  {# drop_first=true makes edr_create_table_as drop the target before
     recreating it, since Trino lacks CREATE OR REPLACE TABLE. #}
  {% do elementary.edr_create_table_as(
      False,
      relation,
      "select * from {}".format(intermediate_relation),
      drop_first=true,
  ) %}
  {% do adapter.drop_relation(intermediate_relation) %}
{% endmacro %}
@@ -106,3 +122,42 @@
106122
chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"),
107123
) %}
108124
{% endmacro %}
125+
126+
{# ClickHouse - cluster-aware truncate and insert (non-atomic).
   Uses explicit TRUNCATE with on_cluster_clause for distributed/replicated tables,
   matching the pattern in delete_and_insert.sql and clean_elementary_test_tables.sql. #}
{% macro clickhouse__replace_table_data(relation, rows) %}
  {% do elementary.run_query("truncate table " ~ relation ~ " " ~ on_cluster_clause(relation)) %}
  {% do elementary.insert_rows(
      relation,
      rows,
      should_commit=false,
      chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"),
  ) %}
{% endmacro %}
140+
141+
{# Vertica - truncate and insert (non-atomic) #}
{% macro vertica__replace_table_data(relation, rows) %}
  {% do dbt.truncate_relation(relation) %}
  {% do elementary.insert_rows(
      relation,
      rows,
      should_commit=false,
      chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"),
  ) %}
{% endmacro %}
151+
152+
{# Fabric / SQL Server - truncate and insert (non-atomic).
   sqlserver dispatches through fabric via the chain: sqlserver__ -> fabric__ -> default__,
   so this covers both adapters. #}
{% macro fabric__replace_table_data(relation, rows) %}
  {% do dbt.truncate_relation(relation) %}
  {% do elementary.insert_rows(
      relation,
      rows,
      should_commit=false,
      chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"),
  ) %}
{% endmacro %}

0 commit comments

Comments
 (0)