Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions integration_tests/dbt_project/macros/insert_sentinel_row.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{% macro insert_sentinel_row(table_name, sentinel_alias) %}
    {#- Insert a sentinel row into the given Elementary table.
        Used by integration tests to verify that replace_table_data
        actually truncates the whole table (sentinel disappears) rather
        than doing a diff-based update (sentinel would survive).

        Dynamically reads the table's columns so that every column is
        included in the INSERT (Spark / Delta Lake rejects partial
        column lists). Columns not explicitly set get NULL. -#}
    {% set relation = ref(table_name) %}
    {% set columns = adapter.get_columns_in_relation(relation) %}

    {#- Escape single quotes so an alias containing ' cannot produce
        invalid SQL when embedded in the literal below. -#}
    {% set escaped_alias = sentinel_alias | replace("'", "''") %}

    {% set col_names = [] %}
    {% set col_values = [] %}
    {% for col in columns %}
        {% do col_names.append(col.name) %}
        {% if col.name | lower == "unique_id" %}
            {% do col_values.append("'test.sentinel'") %}
        {% elif col.name | lower == "alias" %}
            {% do col_values.append("'" ~ escaped_alias ~ "'") %}
        {% elif col.name | lower == "name" %}
            {% do col_values.append("'sentinel'") %}
        {% else %}
            {% do col_values.append("NULL") %}
        {% endif %}
    {% endfor %}

    {% do run_query(
        "INSERT INTO " ~ relation ~ " (" ~ col_names | join(", ") ~ ")"
        ~ " VALUES (" ~ col_values | join(", ") ~ ")"
    ) %}

    {#- Most SQL adapters need an explicit COMMIT because run_query DML is
        not auto-committed. Spark-based and serverless engines do not
        support bare COMMIT statements, so we skip them. -#}
    {% set no_commit_adapters = ["spark", "databricks", "bigquery", "athena"] %}
    {% if target.type not in no_commit_adapters %}
        {% do run_query("COMMIT") %}
    {% endif %}
{% endmacro %}
45 changes: 45 additions & 0 deletions integration_tests/tests/test_dbt_artifacts/test_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,51 @@ def test_artifacts_collection_in_multiple_row_batches(dbt_project: DbtProject):
assert len(existing_artifacts) == len(new_artifacts)


def test_replace_table_data(dbt_project: DbtProject):
    """Validate that replace_table_data actually replaces (not diffs) data.

    Sets cache_artifacts=False so the upload path uses replace_table_data.
    Inserts an unrelated sentinel row into dbt_models *before* the replace
    run, then asserts it was removed — proving a full table replace happened
    rather than a diff-based update (which would leave unrelated rows intact).
    """
    dbt_project.dbt_runner.vars["disable_dbt_artifacts_autoupload"] = False
    dbt_project.dbt_runner.vars["cache_artifacts"] = False

    sentinel_alias = "__replace_test_sentinel__"

    # Populate the table with real artifacts first.
    dbt_project.dbt_runner.run(select=TEST_MODEL)

    # Inject a sentinel row that no real dbt model would produce.
    # Uses a dbt macro (run_operation) so the INSERT is committed properly
    # across all adapters.
    dbt_project.dbt_runner.run_operation(
        "elementary_tests.insert_sentinel_row",
        macro_args={"table_name": "dbt_models", "sentinel_alias": sentinel_alias},
    )
    sentinel_rows = dbt_project.read_table(
        "dbt_models", where=f"alias = '{sentinel_alias}'", raise_if_empty=False
    )
    assert len(sentinel_rows) == 1, "Sentinel row was not inserted"

    # Run again with cache_artifacts=False → triggers replace_table_data.
    dbt_project.dbt_runner.run(select=TEST_MODEL)

    # The sentinel must be gone — replace_table_data wipes the whole table.
    sentinel_after = dbt_project.read_table(
        "dbt_models", where=f"alias = '{sentinel_alias}'", raise_if_empty=False
    )
    assert len(sentinel_after) == 0, (
        "replace_table_data did not remove unrelated rows — "
        "sentinel row still present (diff mode would keep it, replace should not)"
    )

    # The real model row must still exist.
    real_row = read_model_artifact_row(dbt_project)
    assert real_row["alias"] == TEST_MODEL

Comment thread
haritamar marked this conversation as resolved.

def test_dbt_invocations(dbt_project: DbtProject):
dbt_project.dbt_runner.vars["disable_dbt_invocation_autoupload"] = False
dbt_project.dbt_runner.run(selector="one")
Expand Down
87 changes: 71 additions & 16 deletions macros/utils/table_operations/replace_table_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,37 @@
) %}
{% endmacro %}

{# Postgres - atomically replace data without dropping the table (preserves views).
   Each statement is executed separately for post_hook compatibility. #}
{% macro postgres__replace_table_data(relation, rows) %}
    {% set intermediate_relation = elementary.create_intermediate_relation(
        relation, rows, temporary=True
    ) %}

    {#- Wrap delete + insert in an explicit transaction so readers never
        observe an empty table. -#}
    {% do elementary.run_query("begin") %}
    {% do elementary.run_query("delete from " ~ relation) %}
    {% do elementary.run_query(
        "insert into " ~ relation ~ " select * from " ~ intermediate_relation
    ) %}
    {% do elementary.run_query("commit") %}

    {% do adapter.drop_relation(intermediate_relation) %}
{% endmacro %}

{# Redshift - swap out the table's data without dropping the relation, so
   late-binding views stay valid. Statements run individually (no explicit
   transaction) for post_hook compatibility: Redshift rejects multiple
   statements inside a single prepared statement.
   NOTE: Non-atomic - a failed insert after the delete leaves the table empty
   until the next run. Acceptable: these internal artifact tables are
   regenerated on every invocation. #}
{% macro redshift__replace_table_data(relation, rows) %}
    {% set staging_relation = elementary.create_intermediate_relation(relation, rows, temporary=True) %}

    {% set wipe_sql = "delete from " ~ relation %}
    {% set reload_sql = "insert into " ~ relation ~ " select * from " ~ staging_relation %}
    {% do elementary.run_query(wipe_sql) %}
    {% do elementary.run_query(reload_sql) %}

    {% do adapter.drop_relation(staging_relation) %}
{% endmacro %}
Expand All @@ -81,17 +98,16 @@
) %}
{% endmacro %}

{# Trino - drop and recreate (Trino does not support CREATE OR REPLACE TABLE) #}
{% macro trino__replace_table_data(relation, rows) %}
    {% set intermediate_relation = elementary.create_intermediate_relation(
        relation, rows, temporary=True
    ) %}
    {#- edr_create_table_as with drop_first handles the drop + recreate,
        since Trino has no atomic CREATE OR REPLACE TABLE. -#}
    {% do elementary.edr_create_table_as(
        False,
        relation,
        "select * from {}".format(intermediate_relation),
        drop_first=true,
    ) %}
    {% do adapter.drop_relation(intermediate_relation) %}
{% endmacro %}
Expand All @@ -106,3 +122,42 @@
chunk_size=elementary.get_config_var("dbt_artifacts_chunk_size"),
) %}
{% endmacro %}

{# ClickHouse - truncate then reload (non-atomic), cluster-aware.
   The TRUNCATE carries on_cluster_clause so distributed / replicated tables
   are covered, matching the approach used by delete_and_insert.sql and
   clean_elementary_test_tables.sql. #}
{% macro clickhouse__replace_table_data(relation, rows) %}
    {% set truncate_sql = "truncate table " ~ relation ~ " " ~ on_cluster_clause(relation) %}
    {% do elementary.run_query(truncate_sql) %}

    {% set batch_size = elementary.get_config_var("dbt_artifacts_chunk_size") %}
    {% do elementary.insert_rows(relation, rows, should_commit=false, chunk_size=batch_size) %}
{% endmacro %}

{# Vertica - empty the table in place, then reload it (non-atomic) #}
{% macro vertica__replace_table_data(relation, rows) %}
    {% do dbt.truncate_relation(relation) %}
    {% set batch_size = elementary.get_config_var("dbt_artifacts_chunk_size") %}
    {% do elementary.insert_rows(relation, rows, should_commit=false, chunk_size=batch_size) %}
{% endmacro %}

{# Fabric / SQL Server - empty the table in place, then reload it (non-atomic).
   The sqlserver adapter dispatches through fabric (sqlserver__ -> fabric__ ->
   default__), so this single implementation serves both adapters. #}
{% macro fabric__replace_table_data(relation, rows) %}
    {% do dbt.truncate_relation(relation) %}
    {% set batch_size = elementary.get_config_var("dbt_artifacts_chunk_size") %}
    {% do elementary.insert_rows(relation, rows, should_commit=false, chunk_size=batch_size) %}
{% endmacro %}
Loading