Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,19 @@
-- grab current tables grants config for comparision later on
{% set grant_config = config.get('grants') %}

{%- set table_refresh_method = config.get('table_refresh_method', 'rename') -%}
{%- if table_refresh_method not in ['rename', 'dml'] -%}
{{ exceptions.raise_compiler_error(
"Invalid table_refresh_method '" ~ table_refresh_method ~ "'. "
"Valid values are: 'rename' (default), 'dml'."
) }}
{%- endif -%}
{%- set use_dml_refresh = (
table_refresh_method == 'dml'
and existing_relation is not none
and existing_relation.type == 'table'
) -%}

-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}
Expand All @@ -26,24 +39,28 @@
-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% call statement('main') -%}
{{ get_create_table_as_sql(False, intermediate_relation, sql) }}
{%- endcall %}
{% if use_dml_refresh %}
{{ sqlserver__table_dml_refresh(target_relation, sql) }}
{% else %}
-- build model
{% call statement('main') -%}
{{ get_create_table_as_sql(False, intermediate_relation, sql) }}
{%- endcall %}

-- cleanup
{% if existing_relation is not none %}
/* Do the equivalent of rename_if_exists. 'existing_relation' could have been dropped
since the variable was first set. */
{% set existing_relation = load_cached_relation(existing_relation) %}
-- cleanup
{% if existing_relation is not none %}
{{ adapter.rename_relation(existing_relation, backup_relation) }}
/* Do the equivalent of rename_if_exists. 'existing_relation' could have been dropped
since the variable was first set. */
{% set existing_relation = load_cached_relation(existing_relation) %}
{% if existing_relation is not none %}
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{% endif %}
{% endif %}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{{ adapter.rename_relation(intermediate_relation, target_relation) }}

{% do create_indexes(target_relation) %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

Expand All @@ -55,8 +72,10 @@
-- `COMMIT` happens here
{{ adapter.commit() }}

-- finally, drop the existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}
{% if not use_dml_refresh %}
-- finally, drop the existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
{% macro sqlserver__table_dml_refresh(target_relation, sql) %}
{#
DML-only table refresh for use under RCSI.

Instead of rename-swap (which uses DDL and creates a window where the
table name doesnt resolve), this macro:
1. Builds new data into a scratch table via SELECT INTO (minimally logged)
2. Compares schemas — if columns changed, falls back to rename-swap
3. Swaps data via DELETE + INSERT inside an explicit transaction
(RCSI ensures concurrent readers see old data until COMMIT)
4. Cleans up the scratch table

The scratch table is a regular table with a __dbt_refresh suffix,
not a global temp table. This avoids cross-session visibility issues
and ensures cleanup on failure (DROP IF EXISTS at the start of each run).
#}

{%- set refresh_relation = target_relation.incorporate(
path={"identifier": target_relation.identifier ~ '__dbt_refresh'}
) -%}
{%- set tmp_vw_relation = refresh_relation.incorporate(
path={"identifier": refresh_relation.identifier ~ '__dbt_tmp_vw'}
) -%}

{# Clean up any leftovers from a prior failed run #}
{% call statement('dml_refresh_cleanup_pre') -%}
DROP VIEW IF EXISTS {{ tmp_vw_relation.include(database=False) }};
DROP TABLE IF EXISTS {{ refresh_relation }};
{%- endcall %}

{# Build new data into scratch table via temp view (handles CTEs in model SQL) #}
{# Named 'main' because dbt requires a statement('main') call in every materialization #}
{% call statement('dml_refresh_create_view') -%}
{{ get_create_view_as_sql(tmp_vw_relation, sql) }}
{%- endcall %}

{% call statement('main') -%}
SELECT * INTO {{ refresh_relation }} FROM {{ tmp_vw_relation }};
{%- endcall %}

{% call statement('dml_refresh_drop_view') -%}
DROP VIEW IF EXISTS {{ tmp_vw_relation.include(database=False) }};
{%- endcall %}

{# Compare schemas: if columns differ, fall back to rename-swap #}
{%- set schema_changes = check_for_schema_changes(refresh_relation, target_relation) -%}
{%- set schema_match = not schema_changes['schema_changed'] -%}

{% if schema_match %}
{# Use the target's physical column order for both INSERT and SELECT. #}
{# The scratch table has the same columns but possibly in a different order, #}
{# so naming columns explicitly makes the swap order-independent. #}
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set column_list = target_columns | map(attribute='quoted') | join(', ') -%}

{# Atomic DML swap — RCSI protects concurrent readers #}
{# dbt-sqlserver uses autocommit=True and add_begin_query/add_commit_query #}
{# are no-ops, so this creates a simple (non-nested) transaction. #}
{% call statement('dml_refresh_swap') -%}
BEGIN TRANSACTION;
DELETE FROM {{ target_relation }};
INSERT INTO {{ target_relation }} ({{ column_list }})
SELECT {{ column_list }} FROM {{ refresh_relation }};
COMMIT TRANSACTION;
{%- endcall %}

{# Cleanup scratch table #}
{% call statement('dml_refresh_cleanup_post') -%}
DROP TABLE IF EXISTS {{ refresh_relation }};
{%- endcall %}

{% else %}
{# Schema changed — fall back to rename-swap for this run #}
{{ log("Schema change detected for " ~ target_relation ~ " — falling back to rename-swap", info=true) }}

{%- set backup_relation_type = target_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
{{ drop_relation_if_exists(backup_relation) }}

{# Rename scratch table into position #}
{% set existing_relation = load_cached_relation(target_relation) %}
{% if existing_relation is not none %}
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{% endif %}

{{ adapter.rename_relation(refresh_relation, target_relation) }}

{% do create_indexes(target_relation) %}

{{ drop_relation_if_exists(backup_relation) }}

{# scratch table is now the target, nothing to drop #}
{% endif %}

{% endmacro %}
Loading