diff --git a/integration_tests/dbt_project/macros/insert_sentinel_row.sql b/integration_tests/dbt_project/macros/insert_sentinel_row.sql new file mode 100644 index 000000000..022480977 --- /dev/null +++ b/integration_tests/dbt_project/macros/insert_sentinel_row.sql @@ -0,0 +1,39 @@ +{% macro insert_sentinel_row(table_name, sentinel_alias) %} + {#- Insert a sentinel row into the given Elementary table. + Used by integration tests to verify that replace_table_data + actually truncates the whole table (sentinel disappears) rather + than doing a diff-based update (sentinel would survive). + + Dynamically reads the table's columns so that every column is + included in the INSERT (Spark / Delta Lake rejects partial + column lists). Columns not explicitly set get NULL. -#} + {% set relation = ref(table_name) %} + {% set columns = adapter.get_columns_in_relation(relation) %} + + {% set col_names = [] %} + {% set col_values = [] %} + {% for col in columns %} + {% do col_names.append(col.name) %} + {% if col.name | lower == "unique_id" %} + {% do col_values.append("'test.sentinel'") %} + {% elif col.name | lower == "alias" %} + {% do col_values.append("'" ~ sentinel_alias ~ "'") %} + {% elif col.name | lower == "name" %} {% do col_values.append("'sentinel'") %} + {% else %} {% do col_values.append("NULL") %} + {% endif %} + {% endfor %} + + {% do run_query( + "INSERT INTO " ~ relation ~ " (" ~ col_names + | join(", ") ~ ")" ~ " VALUES (" ~ col_values + | join(", ") ~ ")" + ) %} + + {#- Most SQL adapters need an explicit COMMIT because run_query DML is + not auto-committed. Spark-based and serverless engines do not + support bare COMMIT statements, so we skip them. -#} + {% set no_commit_adapters = ["spark", "databricks", "bigquery", "athena"] %} + {% if target.type not in no_commit_adapters %} + {% do run_query("COMMIT") %} + {% endif %} +{% endmacro %} diff --git a/integration_tests/tests/test_dbt_artifacts/test_artifacts.py b/integration_tests/tests/test_dbt_artifacts/test_artifacts.py index f8dd18989..99137c010 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_artifacts.py +++ b/integration_tests/tests/test_dbt_artifacts/test_artifacts.py @@ -57,6 +57,51 @@ def test_artifacts_collection_in_multiple_row_batches(dbt_project: DbtProject): assert len(existing_artifacts) == len(new_artifacts) +def test_replace_table_data(dbt_project: DbtProject): + """Validate that replace_table_data actually replaces (not diffs) data. + + Sets cache_artifacts=False so the upload path uses replace_table_data. + Inserts an unrelated sentinel row into dbt_models *before* the replace + run, then asserts it was removed — proving a full table replace happened + rather than a diff-based update (which would leave unrelated rows intact). + """ + dbt_project.dbt_runner.vars["disable_dbt_artifacts_autoupload"] = False + dbt_project.dbt_runner.vars["cache_artifacts"] = False + + SENTINEL_ALIAS = "__replace_test_sentinel__" + + # Populate the table with real artifacts first. + dbt_project.dbt_runner.run(select=TEST_MODEL) + + # Inject a sentinel row that no real dbt model would produce. + # Uses a dbt macro (run_operation) so the INSERT is committed properly + # across all adapters. + dbt_project.dbt_runner.run_operation( + "elementary_tests.insert_sentinel_row", + macro_args={"table_name": "dbt_models", "sentinel_alias": SENTINEL_ALIAS}, + ) + sentinel_rows = dbt_project.read_table( + "dbt_models", where=f"alias = '{SENTINEL_ALIAS}'", raise_if_empty=False + ) + assert len(sentinel_rows) == 1, "Sentinel row was not inserted" + + # Run again with cache_artifacts=False → triggers replace_table_data. + dbt_project.dbt_runner.run(select=TEST_MODEL) + + # The sentinel must be gone — replace_table_data wipes the whole table. + sentinel_after = dbt_project.read_table( + "dbt_models", where=f"alias = '{SENTINEL_ALIAS}'", raise_if_empty=False + ) + assert len(sentinel_after) == 0, ( + "replace_table_data did not remove unrelated rows — " + "sentinel row still present (diff mode would keep it, replace should not)" + ) + + # The real model row must still exist. + real_row = read_model_artifact_row(dbt_project) + assert real_row["alias"] == TEST_MODEL + + def test_dbt_invocations(dbt_project: DbtProject): dbt_project.dbt_runner.vars["disable_dbt_invocation_autoupload"] = False dbt_project.dbt_runner.run(selector="one") diff --git a/macros/utils/table_operations/replace_table_data.sql b/macros/utils/table_operations/replace_table_data.sql index 5ee153d50..c55dba998 100644 --- a/macros/utils/table_operations/replace_table_data.sql +++ b/macros/utils/table_operations/replace_table_data.sql @@ -53,20 +53,37 @@ ) %} {% endmacro %} -{# In Postgres / Redshift we do not want to replace the table, because that will cause views without - late binding to be deleted. So instead we atomically replace the data in a transaction #} +{# Postgres - atomically replace data without dropping the table (preserves views). + Each statement is executed separately for post_hook compatibility. #} {% macro postgres__replace_table_data(relation, rows) %} {% set intermediate_relation = elementary.create_intermediate_relation( relation, rows, temporary=True ) %} - {% set query %} - begin transaction; - delete from {{ relation }}; -- truncate supported in Redshift transactions, but causes an immediate commit - insert into {{ relation }} select * from {{ intermediate_relation }}; - commit; - {% endset %} - {% do elementary.run_query(query) %} + {% do elementary.run_query("begin") %} + {% do elementary.run_query("delete from " ~ relation) %} + {% do elementary.run_query( + "insert into " ~ relation ~ " select * from " ~ intermediate_relation + ) %} + {% do elementary.run_query("commit") %} + + {% do adapter.drop_relation(intermediate_relation) %} +{% endmacro %} + +{# Redshift - replace data without dropping the table (preserves late-binding views). + Separate statements without explicit transaction for post_hook compatibility + (Redshift cannot run multiple statements in a single prepared statement). + NOTE: Non-atomic - if insert fails after delete, data is lost until the next run. + Acceptable here because these are internal artifact tables that are regenerated. #} +{% macro redshift__replace_table_data(relation, rows) %} + {% set intermediate_relation = elementary.create_intermediate_relation( + relation, rows, temporary=True + ) %} + + {% do elementary.run_query("delete from " ~ relation) %} + {% do elementary.run_query( + "insert into " ~ relation ~ " select * from " ~ intermediate_relation + ) %} {% do adapter.drop_relation(intermediate_relation) %} {% endmacro %} @@ -81,17 +98,16 @@ ) %} {% endmacro %} +{# Trino - drop and recreate (Trino does not support CREATE OR REPLACE TABLE) #} {% macro trino__replace_table_data(relation, rows) %} {% set intermediate_relation = elementary.create_intermediate_relation( relation, rows, temporary=True ) %} - {% do elementary.run_query( - adapter.dispatch("create_table_as")( - False, - relation, - "select * from {}".format(intermediate_relation), - replace=true, - ) + {% do elementary.edr_create_table_as( + False, + relation, + "select * from {}".format(intermediate_relation), + drop_first=true, ) %} {% do adapter.drop_relation(intermediate_relation) %} {% endmacro %} @@ -106,3 +122,42 @@ chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"), ) %} {% endmacro %} + +{# ClickHouse - cluster-aware truncate and insert (non-atomic). + Uses explicit TRUNCATE with on_cluster_clause for distributed/replicated tables, + matching the pattern in delete_and_insert.sql and clean_elementary_test_tables.sql. #} +{% macro clickhouse__replace_table_data(relation, rows) %} + {% do elementary.run_query( + "truncate table " ~ relation ~ " " ~ on_cluster_clause(relation) + ) %} + {% do elementary.insert_rows( + relation, + rows, + should_commit=false, + chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"), + ) %} +{% endmacro %} + +{# Vertica - truncate and insert (non-atomic) #} +{% macro vertica__replace_table_data(relation, rows) %} + {% do dbt.truncate_relation(relation) %} + {% do elementary.insert_rows( + relation, + rows, + should_commit=false, + chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"), + ) %} +{% endmacro %} + +{# Fabric / SQL Server - truncate and insert (non-atomic). + sqlserver dispatches through fabric via the chain: sqlserver__ -> fabric__ -> default__, + so this covers both adapters. #} +{% macro fabric__replace_table_data(relation, rows) %} + {% do dbt.truncate_relation(relation) %} + {% do elementary.insert_rows( + relation, + rows, + should_commit=false, + chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"), + ) %} +{% endmacro %}