forked from dbt-msft/dbt-sqlserver
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmerge.sql
More file actions
89 lines (80 loc) · 4.03 KB
/
Copy pathmerge.sql
File metadata and controls
89 lines (80 loc) · 4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
{% macro sqlserver__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) %}
{#--
    SQL Server flavour of the merge statement: renders the output of
    default__get_merge_sql, followed by whatever get_query_options(parse_options=True)
    returns, followed by a statement terminator.

    All arguments are passed through unchanged to default__get_merge_sql.

    T-SQL requires MERGE to be terminated by a semicolon, so one is emitted after the
    query options (a redundant terminator is harmless if the options already end with one).
--#}
{{ default__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) }}
{{ get_query_options(parse_options=True) }};
{% endmacro %}
{% macro sqlserver__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) %}
{#--
    SQL Server flavour of the insert-overwrite merge: renders the output of
    default__get_insert_overwrite_merge_sql (all arguments passed through unchanged)
    and terminates it with a semicolon.
--#}
{% set overwrite_statement = default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) %}
{{ overwrite_statement }};
{% endmacro %}
{% macro sqlserver__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) %}
{#--
    Renders the delete+insert pair for an incremental load.

    When unique_key is set, a delete is rendered first:
      - unique_key is a non-string sequence: target rows are deleted when a source row
        exists that matches on every listed key column;
      - otherwise: target rows are deleted when their unique_key value is found in source.
    Each entry of incremental_predicates (if any) is and-ed onto the delete.

    An insert of the dest_columns from source into target is always rendered.
    The result of get_query_options(parse_options=True) is appended to each statement.
--#}
{% set options_clause = get_query_options(parse_options=True) %}
{% set column_list = get_quoted_csv(dest_columns | map(attribute="name")) %}
{% set is_composite_key = unique_key is sequence and unique_key is not string %}
{% if unique_key %}
delete from {{ target }}
{% if is_composite_key %}
where exists (
select null
from {{ source }}
where
{% for key_column in unique_key %}
{% if not loop.first %}and {% endif %}{{ source }}.{{ key_column }} = {{ target }}.{{ key_column }}
{% endfor %}
)
{% else %}
where (
{{ unique_key }}) in (
select ({{ unique_key }})
from {{ source }}
)
{% endif %}
{#-- Extra filters apply to the delete in the same way for both key shapes --#}
{% if incremental_predicates %}
{% for extra_filter in incremental_predicates %}
and {{ extra_filter }}
{% endfor %}
{% endif %}
{{ options_clause }}
{% endif %}
insert into {{ target }} ({{ column_list }})
(
select {{ column_list }}
from {{ source }}
){{ options_clause }}
{% endmacro %}
{% macro sqlserver__get_incremental_microbatch_sql(arg_dict) %}
{#--
    Renders the SQL for the microbatch incremental strategy: delete the target rows
    matching the batch predicates, then insert every row of the temp relation.

    arg_dict keys read here:
      target_relation         relation that is deleted from and inserted into
      temp_relation           relation the new rows are selected from
      dest_columns            column objects; their name attributes form the column list
      incremental_predicates  optional list of extra filters for the delete

    The batch bounds are read from model.config
    (__dbt_internal_microbatch_event_time_start / _end) and compared against
    model.config.event_time on the target alias DBT_INTERNAL_TARGET.

    Side effect: arg_dict['incremental_predicates'] is replaced with the complete
    predicate list (caller-supplied filters plus the batch bounds).

    Raises a compiler error when no predicate is available at all, because the delete
    would otherwise be rendered with an empty where clause.
--#}
{%- set target = arg_dict["target_relation"] -%}
{%- set source = arg_dict["temp_relation"] -%}
{%- set dest_columns = arg_dict["dest_columns"] -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{#-- Copy the caller's list so appending the batch bounds never mutates it in place --#}
{%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else [] + arg_dict.get('incremental_predicates') -%}
{%- set query_label = get_query_options(parse_options=True) -%}
{%- set batch_start = model.config.get("__dbt_internal_microbatch_event_time_start") -%}
{%- set batch_end = model.config.get("__dbt_internal_microbatch_event_time_end") -%}
{#-- Add additional incremental_predicates to filter for batch --#}
{% if batch_start -%}
{%- set start_predicate = "DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= cast('" ~ batch_start ~ "' as datetimeoffset)" -%}
{{ log("incremental append event start time > " ~ start_predicate) }}
{% do incremental_predicates.append(start_predicate) %}
{% endif %}
{% if batch_end -%}
{%- set end_predicate = "DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < cast('" ~ batch_end ~ "' as datetimeoffset)" -%}
{{ log("incremental append event end time < " ~ end_predicate) }}
{% do incremental_predicates.append(end_predicate) %}
{% endif %}
{% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}
{#-- An empty list would render "where ( )", which is not valid SQL; fail with a clear message instead --#}
{% if not incremental_predicates %}
{{ exceptions.raise_compiler_error("microbatch strategy: no incremental predicates and no batch event-time bounds are available, so the delete statement cannot be filtered") }}
{% endif %}
delete DBT_INTERNAL_TARGET from {{ target }} AS DBT_INTERNAL_TARGET
where (
{% for predicate in incremental_predicates %}
{%- if not loop.first %}and {% endif -%} {{ predicate }}
{% endfor %}
)
{{ query_label }}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{{ query_label }}
{% endmacro %}