From a2b3b1fc9954a02a88da4c77f56955084a6538c3 Mon Sep 17 00:00:00 2001 From: Ben Knight Date: Wed, 20 May 2026 12:49:31 +0000 Subject: [PATCH] Add DML table refresh method for table materializations (#641). Adds an opt-in `table_refresh_method` model config: - `'rename'` (default): unchanged rename-swap behaviour. - `'dml'`: builds new data into a scratch table, then DELETE + INSERT into the target inside an explicit BEGIN/COMMIT TRANSACTION. Under RCSI, the DML path keeps the target name resolvable throughout the swap so concurrent readers see the old data until COMMIT. The scratch table is built via a temp view so CTEs in the model SQL work, and the INSERT uses the target's physical column order so a reordered SELECT projection cannot misalign columns. Schema changes fall back to rename-swap for the affected run; the DML path also falls back when the existing relation is a view (DML cannot operate on views). Adds functional coverage for first run, subsequent run, schema-change fallback, default vs explicit method config, invalid method validation, clustered columnstore preservation, contract enforcement, models with CTEs, view-to-table migration, column-order divergence, empty models, and leftover-scratch cleanup. 
--- .../materializations/models/table/table.sql | 49 +- .../models/table/table_dml_refresh.sql | 95 +++ .../mssql/test_table_refresh_method.py | 644 ++++++++++++++++++ 3 files changed, 773 insertions(+), 15 deletions(-) create mode 100644 dbt/include/sqlserver/macros/materializations/models/table/table_dml_refresh.sql create mode 100644 tests/functional/adapter/mssql/test_table_refresh_method.py diff --git a/dbt/include/sqlserver/macros/materializations/models/table/table.sql b/dbt/include/sqlserver/macros/materializations/models/table/table.sql index c347701bd..466ec372b 100644 --- a/dbt/include/sqlserver/macros/materializations/models/table/table.sql +++ b/dbt/include/sqlserver/macros/materializations/models/table/table.sql @@ -17,6 +17,19 @@ -- grab current tables grants config for comparision later on {% set grant_config = config.get('grants') %} + {%- set table_refresh_method = config.get('table_refresh_method', 'rename') -%} + {%- if table_refresh_method not in ['rename', 'dml'] -%} + {{ exceptions.raise_compiler_error( + "Invalid table_refresh_method '" ~ table_refresh_method ~ "'. " + "Valid values are: 'rename' (default), 'dml'." 
+ ) }} + {%- endif -%} + {%- set use_dml_refresh = ( + table_refresh_method == 'dml' + and existing_relation is not none + and existing_relation.type == 'table' + ) -%} + -- drop the temp relations if they exist already in the database {{ drop_relation_if_exists(preexisting_intermediate_relation) }} {{ drop_relation_if_exists(preexisting_backup_relation) }} @@ -26,24 +39,28 @@ -- `BEGIN` happens here: {{ run_hooks(pre_hooks, inside_transaction=True) }} - -- build model - {% call statement('main') -%} - {{ get_create_table_as_sql(False, intermediate_relation, sql) }} - {%- endcall %} + {% if use_dml_refresh %} + {{ sqlserver__table_dml_refresh(target_relation, sql) }} + {% else %} + -- build model + {% call statement('main') -%} + {{ get_create_table_as_sql(False, intermediate_relation, sql) }} + {%- endcall %} - -- cleanup - {% if existing_relation is not none %} - /* Do the equivalent of rename_if_exists. 'existing_relation' could have been dropped - since the variable was first set. */ - {% set existing_relation = load_cached_relation(existing_relation) %} + -- cleanup {% if existing_relation is not none %} - {{ adapter.rename_relation(existing_relation, backup_relation) }} + /* Do the equivalent of rename_if_exists. 'existing_relation' could have been dropped + since the variable was first set. 
*/ + {% set existing_relation = load_cached_relation(existing_relation) %} + {% if existing_relation is not none %} + {{ adapter.rename_relation(existing_relation, backup_relation) }} + {% endif %} {% endif %} - {% endif %} - {{ adapter.rename_relation(intermediate_relation, target_relation) }} + {{ adapter.rename_relation(intermediate_relation, target_relation) }} - {% do create_indexes(target_relation) %} + {% do create_indexes(target_relation) %} + {% endif %} {{ run_hooks(post_hooks, inside_transaction=True) }} @@ -55,8 +72,10 @@ -- `COMMIT` happens here {{ adapter.commit() }} - -- finally, drop the existing/backup relation after the commit - {{ drop_relation_if_exists(backup_relation) }} + {% if not use_dml_refresh %} + -- finally, drop the existing/backup relation after the commit + {{ drop_relation_if_exists(backup_relation) }} + {% endif %} {{ run_hooks(post_hooks, inside_transaction=False) }}
diff --git a/dbt/include/sqlserver/macros/materializations/models/table/table_dml_refresh.sql b/dbt/include/sqlserver/macros/materializations/models/table/table_dml_refresh.sql
new file mode 100644
index 000000000..4a5d095c4
--- /dev/null
+++ b/dbt/include/sqlserver/macros/materializations/models/table/table_dml_refresh.sql
@@ -0,0 +1,105 @@
+{% macro sqlserver__table_dml_refresh(target_relation, sql) %}
+  {#
+    DML-only table refresh for use under RCSI.
+
+    Instead of rename-swap (which uses DDL and creates a window where the
+    table name doesn't resolve), this macro:
+    1. Builds new data into a scratch table via SELECT INTO (minimally logged)
+    2. Compares schemas — if columns changed, falls back to rename-swap
+    3. Swaps data via DELETE + INSERT inside an explicit transaction
+       (RCSI ensures concurrent readers see old data until COMMIT)
+    4. Cleans up the scratch table
+
+    The scratch table is a regular table with a __dbt_refresh suffix,
+    not a global temp table. This avoids cross-session visibility issues
+    and ensures cleanup on failure (DROP IF EXISTS at the start of each run).
+
+    Args:
+      target_relation: relation whose rows are replaced (or, on schema change, swapped out).
+      sql: model SQL; wrapped in a temporary view that feeds the scratch table.
+  #}
+
+  {%- set refresh_relation = target_relation.incorporate(
+    path={"identifier": target_relation.identifier ~ '__dbt_refresh'}
+  ) -%}
+  {%- set tmp_vw_relation = refresh_relation.incorporate(
+    path={"identifier": refresh_relation.identifier ~ '__dbt_tmp_vw'}
+  ) -%}
+
+  {# Clean up any leftovers from a prior failed run #}
+  {% call statement('dml_refresh_cleanup_pre') -%}
+    DROP VIEW IF EXISTS {{ tmp_vw_relation.include(database=False) }};
+    DROP TABLE IF EXISTS {{ refresh_relation }};
+  {%- endcall %}
+
+  {# Build new data into scratch table via temp view (handles CTEs in model SQL) #}
+  {% call statement('dml_refresh_create_view') -%}
+    {{ get_create_view_as_sql(tmp_vw_relation, sql) }}
+  {%- endcall %}
+
+  {# Named 'main' because dbt requires a statement('main') call in every materialization #}
+  {% call statement('main') -%}
+    SELECT * INTO {{ refresh_relation }} FROM {{ tmp_vw_relation }};
+  {%- endcall %}
+
+  {% call statement('dml_refresh_drop_view') -%}
+    DROP VIEW IF EXISTS {{ tmp_vw_relation.include(database=False) }};
+  {%- endcall %}
+
+  {# Compare schemas: if columns differ, fall back to rename-swap #}
+  {%- set schema_changes = check_for_schema_changes(refresh_relation, target_relation) -%}
+  {%- set schema_match = not schema_changes['schema_changed'] -%}
+
+  {% if schema_match %}
+    {# Use the target's physical column order for both INSERT and SELECT. #}
+    {# The scratch table has the same columns but possibly in a different order, #}
+    {# so naming columns explicitly makes the swap order-independent. #}
+    {%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
+    {%- set column_list = target_columns | map(attribute='quoted') | join(', ') -%}
+
+    {# Atomic DML swap — RCSI protects concurrent readers #}
+    {# dbt-sqlserver uses autocommit=True and add_begin_query/add_commit_query #}
+    {# are no-ops, so this creates a simple (non-nested) transaction. #}
+    {# SET XACT_ABORT ON: with it OFF, a runtime error in the INSERT (e.g. a constraint #}
+    {# violation) only rolls back that statement and the batch continues to COMMIT, #}
+    {# leaving the target empty. With it ON the whole transaction is rolled back. #}
+    {% call statement('dml_refresh_swap') -%}
+      SET XACT_ABORT ON;
+      BEGIN TRANSACTION;
+      DELETE FROM {{ target_relation }};
+      INSERT INTO {{ target_relation }} ({{ column_list }})
+      SELECT {{ column_list }} FROM {{ refresh_relation }};
+      COMMIT TRANSACTION;
+    {%- endcall %}
+
+    {# Cleanup scratch table #}
+    {% call statement('dml_refresh_cleanup_post') -%}
+      DROP TABLE IF EXISTS {{ refresh_relation }};
+    {%- endcall %}
+
+  {% else %}
+    {# Schema changed — fall back to rename-swap for this run #}
+    {# NOTE(review): the scratch table built by SELECT INTO becomes the target here; #}
+    {# confirm nothing from get_create_table_as_sql (columnstore, contract) is lost. #}
+    {{ log("Schema change detected for " ~ target_relation ~ " — falling back to rename-swap", info=true) }}
+
+    {%- set backup_relation_type = target_relation.type -%}
+    {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
+    {{ drop_relation_if_exists(backup_relation) }}
+
+    {# Rename scratch table into position #}
+    {% set existing_relation = load_cached_relation(target_relation) %}
+    {% if existing_relation is not none %}
+      {{ adapter.rename_relation(existing_relation, backup_relation) }}
+    {% endif %}
+
+    {{ adapter.rename_relation(refresh_relation, target_relation) }}
+
+    {% do create_indexes(target_relation) %}
+
+    {{ drop_relation_if_exists(backup_relation) }}
+
+    {# scratch table is now the target, nothing to drop #}
+  {% endif %}
+
+{% endmacro %}
diff --git a/tests/functional/adapter/mssql/test_table_refresh_method.py b/tests/functional/adapter/mssql/test_table_refresh_method.py new file mode 100644 index 000000000..d1dc79c7b --- /dev/null +++ b/tests/functional/adapter/mssql/test_table_refresh_method.py @@ -0,0 +1,644 @@ +import os + +import pytest + +from dbt.tests.util import
get_connection, run_dbt + +# -- Model fixtures -- + +dml_model_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "dml", + "as_columnstore": False + }) +}} +select 1 as id, 'hello' as val +""" + +dml_model_v2_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "dml", + "as_columnstore": False + }) +}} +select 2 as id, 'world' as val +""" + +dml_model_schema_change_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "dml", + "as_columnstore": False + }) +}} +select 1 as id, 'hello' as val, 42 as new_col +""" + +rename_model_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "rename", + "as_columnstore": False + }) +}} +select 1 as id, 'hello' as val +""" + +default_model_sql = """ +{{ + config({ + "materialized": "table", + "as_columnstore": False + }) +}} +select 1 as id, 'hello' as val +""" + +invalid_method_model_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "invalid", + "as_columnstore": False + }) +}} +select 1 as id +""" + +dml_with_columnstore_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "dml" + }) +}} +select 1 as id, 'hello' as val +""" + +dml_with_columnstore_v2_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "dml" + }) +}} +select 2 as id, 'world' as val +""" + +view_then_dml_table_sql = """ +{{ + config({ + "materialized": "view" + }) +}} +select 1 as id, 'hello' as val +""" + +view_then_dml_table_v2_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "dml", + "as_columnstore": False + }) +}} +select 2 as id, 'world' as val +""" + +dml_contract_model_sql = """ +{{ config(materialized="table", table_refresh_method="dml", as_columnstore=False) }} +select 1 as id, 'hello' as val +""" + +dml_contract_model_v2_sql = """ +{{ config(materialized="table", table_refresh_method="dml", as_columnstore=False) }} +select 2 as id, 'world' as val 
+""" + +dml_cte_model_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "dml", + "as_columnstore": False + }) +}} +with cte as ( + select 1 as id, 'hello' as val +) +select * from cte +""" + +dml_cte_model_v2_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "dml", + "as_columnstore": False + }) +}} +with cte as ( + select 2 as id, 'world' as val +) +select * from cte +""" + +dml_contract_schema_yml = """ +version: 2 +models: + - name: dml_contract_model + config: + contract: + enforced: true + columns: + - name: id + data_type: int + - name: val + data_type: varchar(5) +""" + + +def write_model(project, filename, contents): + """Write a model file into the project's models directory.""" + path = os.path.join(project.project_root, "models", filename) + with open(path, "w") as f: + f.write(contents) + + +def query_table(project, table_name): + """Query all rows from a table, return as list of tuples.""" + sql = f"SELECT * FROM {project.test_schema}.{table_name} ORDER BY id" + with get_connection(project.adapter): + _, table = project.adapter.execute(sql, fetch=True) + return table.rows + + +def table_exists(project, table_name): + """Check if a table exists in the test schema.""" + sql = ( + f"SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES " + f"WHERE TABLE_SCHEMA = '{project.test_schema}' " + f"AND TABLE_NAME = '{table_name}'" + ) + with get_connection(project.adapter): + _, table = project.adapter.execute(sql, fetch=True) + return table.rows[0][0] == 1 + + +def get_column_names(project, table_name): + """Get column names for a table in order.""" + sql = ( + f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS " + f"WHERE TABLE_SCHEMA = '{project.test_schema}' " + f"AND TABLE_NAME = '{table_name}' " + f"ORDER BY ORDINAL_POSITION" + ) + with get_connection(project.adapter): + _, table = project.adapter.execute(sql, fetch=True) + return [row[0] for row in table.rows] + + +def has_columnstore_index(project, 
table_name): + """Check if a table has a clustered columnstore index.""" + sql = ( + f"SELECT COUNT(*) FROM sys.indexes i " + f"JOIN sys.tables t ON i.object_id = t.object_id " + f"JOIN sys.schemas s ON t.schema_id = s.schema_id " + f"WHERE s.name = '{project.test_schema}' " + f"AND t.name = '{table_name}' " + f"AND i.type = 5" + ) + with get_connection(project.adapter): + _, table = project.adapter.execute(sql, fetch=True) + return table.rows[0][0] > 0 + + +# -- Test: First run uses standard CREATE path (table doesn't exist yet) -- + + +class TestDmlRefreshFirstRun: + @pytest.fixture(scope="class") + def models(self): + return {"dml_model.sql": dml_model_sql} + + def test_first_run_creates_table(self, project): + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + rows = query_table(project, "dml_model") + assert len(rows) == 1 + assert rows[0][0] == 1 + assert rows[0][1] == "hello" + + +# -- Test: Second run with same schema uses DML refresh (DELETE + INSERT) -- + + +class TestDmlRefreshSubsequentRun: + @pytest.fixture(scope="class") + def models(self): + return {"dml_model.sql": dml_model_sql} + + def test_dml_refresh_updates_data(self, project): + # First run — creates the table + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + rows = query_table(project, "dml_model") + assert len(rows) == 1 + assert rows[0][0] == 1 + + # Swap in the v2 model with different data but same schema + write_model(project, "dml_model.sql", dml_model_v2_sql) + + # Second run — should use DML refresh + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + rows = query_table(project, "dml_model") + assert len(rows) == 1 + assert rows[0][0] == 2 + assert rows[0][1] == "world" + + # Scratch table should be cleaned up + assert not table_exists(project, "dml_model__dbt_refresh") + + +# -- Test: Schema change triggers rename-swap fallback -- + + +class 
TestDmlRefreshSchemaChange: + @pytest.fixture(scope="class") + def models(self): + return {"dml_model.sql": dml_model_sql} + + def test_schema_change_falls_back_to_rename(self, project): + # First run — creates the table + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + cols = get_column_names(project, "dml_model") + assert cols == ["id", "val"] + + # Swap in model with an extra column + write_model(project, "dml_model.sql", dml_model_schema_change_sql) + + # Second run — schema changed, should fall back to rename-swap + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + cols = get_column_names(project, "dml_model") + assert "new_col" in cols + + rows = query_table(project, "dml_model") + assert len(rows) == 1 + + # Scratch table should be cleaned up + assert not table_exists(project, "dml_model__dbt_refresh") + + +# -- Test: Default config uses rename-swap (backwards compatible) -- + + +class TestDefaultMethodUsesRename: + @pytest.fixture(scope="class") + def models(self): + return {"default_model.sql": default_model_sql} + + def test_default_uses_rename(self, project): + # First run + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + # Second run — should use rename-swap (no scratch table created) + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + rows = query_table(project, "default_model") + assert len(rows) == 1 + + +# -- Test: Explicit rename method works -- + + +class TestExplicitRenameMethod: + @pytest.fixture(scope="class") + def models(self): + return {"rename_model.sql": rename_model_sql} + + def test_explicit_rename(self, project): + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + # Second run + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + rows = 
query_table(project, "rename_model") + assert len(rows) == 1 + + +# -- Test: Invalid config value raises compiler error -- + + +class TestInvalidRefreshMethod: + @pytest.fixture(scope="class") + def models(self): + return {"invalid_model.sql": invalid_method_model_sql} + + def test_invalid_method_raises_error(self, project): + results = run_dbt(["run"], expect_pass=False) + assert len(results) == 1 + assert results[0].status == "error" + + +# -- Test: DML refresh with as_columnstore (CCI survives DML) -- + + +class TestDmlRefreshWithColumnstore: + @pytest.fixture(scope="class") + def models(self): + return {"dml_cci_model.sql": dml_with_columnstore_sql} + + def test_cci_survives_dml_refresh(self, project): + # First run — creates table with CCI + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + assert has_columnstore_index(project, "dml_cci_model") + + # Swap in v2 data + write_model(project, "dml_cci_model.sql", dml_with_columnstore_v2_sql) + + # Second run — DML refresh, CCI should survive + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + assert has_columnstore_index(project, "dml_cci_model") + + rows = query_table(project, "dml_cci_model") + assert len(rows) == 1 + assert rows[0][0] == 2 + + +# -- Test: DML refresh with contract enforced -- + + +class TestDmlRefreshWithContract: + @pytest.fixture(scope="class") + def models(self): + return { + "dml_contract_model.sql": dml_contract_model_sql, + "schema.yml": dml_contract_schema_yml, + } + + def test_contract_with_dml_refresh(self, project): + # First run — contract creates table with explicit types + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + rows = query_table(project, "dml_contract_model") + assert len(rows) == 1 + assert rows[0][0] == 1 + + # Swap in v2 data (same schema) + write_model(project, "dml_contract_model.sql", dml_contract_model_v2_sql) + + # Second 
run — should use DML refresh since schema matches + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + rows = query_table(project, "dml_contract_model") + assert len(rows) == 1 + assert rows[0][0] == 2 + + +# -- Test: DML refresh works with CTEs in model SQL -- + + +class TestDmlRefreshWithCTE: + @pytest.fixture(scope="class") + def models(self): + return {"dml_cte_model.sql": dml_cte_model_sql} + + def test_cte_model_dml_refresh(self, project): + # First run — creates the table (uses CREATE path, no DML refresh) + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + rows = query_table(project, "dml_cte_model") + assert len(rows) == 1 + assert rows[0][0] == 1 + assert rows[0][1] == "hello" + + # Swap in v2 model with CTE but different data + write_model(project, "dml_cte_model.sql", dml_cte_model_v2_sql) + + # Second run — DML refresh with CTE-based SQL + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + rows = query_table(project, "dml_cte_model") + assert len(rows) == 1 + assert rows[0][0] == 2 + assert rows[0][1] == "world" + + # Scratch table should be cleaned up + assert not table_exists(project, "dml_cte_model__dbt_refresh") + + +# -- Test: Existing view with DML refresh falls back to rename-swap -- + + +class TestDmlRefreshExistingViewFallback: + @pytest.fixture(scope="class") + def models(self): + return {"dml_view_model.sql": view_then_dml_table_sql} + + def test_view_to_table_with_dml_config(self, project): + # First run — creates a view + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + # Verify it's a view + sql = ( + f"SELECT COUNT(*) FROM INFORMATION_SCHEMA.VIEWS " + f"WHERE TABLE_SCHEMA = '{project.test_schema}' " + f"AND TABLE_NAME = 'dml_view_model'" + ) + with get_connection(project.adapter): + _, result = project.adapter.execute(sql, fetch=True) + assert 
result.rows[0][0] == 1 + + # Swap in v2 model that materializes as table with dml refresh + write_model(project, "dml_view_model.sql", view_then_dml_table_v2_sql) + + # Second run — existing relation is a view, DML refresh should + # skip the DELETE+INSERT path and fall back to rename-swap + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + # Verify it's now a table, not a view + assert table_exists(project, "dml_view_model") + + rows = query_table(project, "dml_view_model") + assert len(rows) == 1 + assert rows[0][0] == 2 + assert rows[0][1] == "world" + + +# -- Test: DML refresh preserves target column order when scratch SELECT reorders -- + +dml_model_reorder_v1_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "dml", + "as_columnstore": False + }) +}} +select 1 as id, 'hello' as val +""" + +# Same columns, reversed projection order — SELECT INTO scratch will have +# (val, id) physical order while target keeps (id, val). +dml_model_reorder_v2_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "dml", + "as_columnstore": False + }) +}} +select 'world' as val, 2 as id +""" + + +class TestDmlRefreshColumnOrderMismatch: + @pytest.fixture(scope="class") + def models(self): + return {"dml_reorder_model.sql": dml_model_reorder_v1_sql} + + def test_column_order_mismatch_inserts_by_name(self, project): + # First run — creates target with physical order (id, val) + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + # Swap in a model that selects the same columns in reversed order + write_model(project, "dml_reorder_model.sql", dml_model_reorder_v2_sql) + + # Second run — DML refresh path. Without the explicit column list, + # `INSERT INTO target SELECT * FROM scratch` would map scratch.val -> + # target.id and scratch.id -> target.val, producing wrong values. 
+ results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + rows = query_table(project, "dml_reorder_model") + assert len(rows) == 1 + # id column still holds an int, val column still holds the string — + # the named INSERT is what makes this work. + assert rows[0][0] == 2 + assert rows[0][1] == "world" + + # Scratch cleaned up + assert not table_exists(project, "dml_reorder_model__dbt_refresh") + + +# -- Test: DML refresh handles a model that produces zero rows -- + +dml_model_empty_sql = """ +{{ + config({ + "materialized": "table", + "table_refresh_method": "dml", + "as_columnstore": False + }) +}} +select 1 as id, 'hello' as val where 1 = 0 +""" + + +class TestDmlRefreshEmptyModel: + @pytest.fixture(scope="class") + def models(self): + return {"dml_model.sql": dml_model_sql} + + def test_empty_model_swap(self, project): + # First run — one row + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + assert len(query_table(project, "dml_model")) == 1 + + # Swap in an empty-projection version + write_model(project, "dml_model.sql", dml_model_empty_sql) + + # Second run — DELETE removes the original row, INSERT inserts nothing + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + assert query_table(project, "dml_model") == [] + assert not table_exists(project, "dml_model__dbt_refresh") + + +# -- Test: leftover scratch table from a prior failed run is cleaned up -- + + +class TestDmlRefreshLeftoverScratchCleanup: + @pytest.fixture(scope="class") + def models(self): + return {"dml_model.sql": dml_model_sql} + + def test_leftover_scratch_does_not_block_refresh(self, project): + # First run — establishes the target + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + # Simulate a prior failed run by manually creating a leftover scratch table + with get_connection(project.adapter): + 
project.adapter.execute( + f"SELECT CAST(99 AS INT) AS id, CAST('stale' AS VARCHAR(5)) AS val " + f"INTO {project.test_schema}.dml_model__dbt_refresh" + ) + assert table_exists(project, "dml_model__dbt_refresh") + + # Second run — the macro's pre-DROP TABLE IF EXISTS should clean up the + # leftover before re-creating it; the run should succeed normally. + results = run_dbt(["run"]) + assert len(results) == 1 + assert results[0].status == "success" + + # Target has the live row (not the stale one) + rows = query_table(project, "dml_model") + assert len(rows) == 1 + assert rows[0][0] == 1 + assert rows[0][1] == "hello" + + # Scratch fully cleaned up after the refresh + assert not table_exists(project, "dml_model__dbt_refresh")