Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions integration_tests/tests/test_dbt_artifacts/test_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,36 @@ def test_artifacts_collection_in_multiple_row_batches(dbt_project: DbtProject):
assert len(existing_artifacts) == len(new_artifacts)


def test_replace_table_data(dbt_project: DbtProject):
"""Validate that replace_table_data actually replaces data on the current adapter.

Sets cache_artifacts=False so the upload path uses replace_table_data
instead of the diff-based method. Changes the model owner between two
replace runs and asserts the new owner is present (proving the table was
actually replaced, not just appended to or left unchanged).
"""
dbt_project.dbt_runner.vars["disable_dbt_artifacts_autoupload"] = False
dbt_project.dbt_runner.vars["cache_artifacts"] = False

# First replace run — upload artifacts with default owner.
dbt_project.dbt_runner.run(select=TEST_MODEL)
first_row = read_model_artifact_row(dbt_project)

# Second replace run — change the owner so we can detect the replace.
dbt_project.dbt_runner.run(
select=TEST_MODEL, vars={"one_owner": "replace_test_owner"}
)
Comment thread
haritamar marked this conversation as resolved.
second_row = read_model_artifact_row(dbt_project)

# The row must reflect the new owner, proving an actual replace happened.
assert second_row["owner"] != first_row["owner"], (
"replace_table_data did not replace the data — "
"owner is still '{}' after a run with a different owner".format(
second_row["owner"]
)
)

Comment thread
haritamar marked this conversation as resolved.

def test_dbt_invocations(dbt_project: DbtProject):
dbt_project.dbt_runner.vars["disable_dbt_invocation_autoupload"] = False
dbt_project.dbt_runner.run(selector="one")
Expand Down
87 changes: 71 additions & 16 deletions macros/utils/table_operations/replace_table_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,37 @@
) %}
{% endmacro %}

{# In Postgres / Redshift we do not want to replace the table, because that will cause views without
late binding to be deleted. So instead we atomically replace the data in a transaction #}
{# Postgres - atomically replace data without dropping the table (preserves views).
Each statement is executed separately for post_hook compatibility. #}
{% macro postgres__replace_table_data(relation, rows) %}
{% set intermediate_relation = elementary.create_intermediate_relation(
relation, rows, temporary=True
) %}

{% set query %}
begin transaction;
delete from {{ relation }}; -- truncate supported in Redshift transactions, but causes an immediate commit
insert into {{ relation }} select * from {{ intermediate_relation }};
commit;
{% endset %}
{% do elementary.run_query(query) %}
{% do elementary.run_query("begin") %}
{% do elementary.run_query("delete from " ~ relation) %}
{% do elementary.run_query(
"insert into " ~ relation ~ " select * from " ~ intermediate_relation
) %}
{% do elementary.run_query("commit") %}

{% do adapter.drop_relation(intermediate_relation) %}
{% endmacro %}

{# Redshift - replace data without dropping the table (preserves late-binding views).
Separate statements without explicit transaction for post_hook compatibility
(Redshift cannot run multiple statements in a single prepared statement).
NOTE: Non-atomic - if insert fails after delete, data is lost until the next run.
Acceptable here because these are internal artifact tables that are regenerated. #}
{% macro redshift__replace_table_data(relation, rows) %}
{% set intermediate_relation = elementary.create_intermediate_relation(
relation, rows, temporary=True
) %}

{% do elementary.run_query("delete from " ~ relation) %}
{% do elementary.run_query(
"insert into " ~ relation ~ " select * from " ~ intermediate_relation
) %}

{% do adapter.drop_relation(intermediate_relation) %}
{% endmacro %}
Expand All @@ -81,17 +98,16 @@
) %}
{% endmacro %}

{# Trino - drop and recreate (Trino does not support CREATE OR REPLACE TABLE) #}
{% macro trino__replace_table_data(relation, rows) %}
{% set intermediate_relation = elementary.create_intermediate_relation(
relation, rows, temporary=True
) %}
{% do elementary.run_query(
adapter.dispatch("create_table_as")(
False,
relation,
"select * from {}".format(intermediate_relation),
replace=true,
)
{% do elementary.edr_create_table_as(
False,
relation,
"select * from {}".format(intermediate_relation),
drop_first=true,
) %}
{% do adapter.drop_relation(intermediate_relation) %}
{% endmacro %}
Expand All @@ -106,3 +122,42 @@
chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"),
) %}
{% endmacro %}

{# ClickHouse - cluster-aware truncate and insert (non-atomic).
Uses explicit TRUNCATE with on_cluster_clause for distributed/replicated tables,
matching the pattern in delete_and_insert.sql and clean_elementary_test_tables.sql. #}
{% macro clickhouse__replace_table_data(relation, rows) %}
{% do elementary.run_query(
"truncate table " ~ relation ~ " " ~ on_cluster_clause(relation)
) %}
{% do elementary.insert_rows(
relation,
rows,
should_commit=false,
chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"),
) %}
{% endmacro %}

{# Vertica - truncate and insert (non-atomic) #}
{% macro vertica__replace_table_data(relation, rows) %}
{% do dbt.truncate_relation(relation) %}
{% do elementary.insert_rows(
relation,
rows,
should_commit=false,
chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"),
) %}
{% endmacro %}

{# Fabric / SQL Server - truncate and insert (non-atomic).
sqlserver dispatches through fabric via the chain: sqlserver__ -> fabric__ -> default__,
so this covers both adapters. #}
{% macro fabric__replace_table_data(relation, rows) %}
{% do dbt.truncate_relation(relation) %}
{% do elementary.insert_rows(
relation,
rows,
should_commit=false,
chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"),
) %}
{% endmacro %}
Loading