Skip to content

Commit 2c08c87

Browse files
feat: add Microsoft Fabric DWH integration (ELE-5282)
Add support for Microsoft Fabric Data Warehouse as a new adapter in the elementary dbt package. Fabric uses T-SQL syntax and is supported via the dbt-fabric adapter maintained by Microsoft. Changes: - Add fabric__ macro overrides for date/time functions (datediff, date_trunc, time_trunc, to_char, timeadd, dateadd, current_timestamp) - Add fabric__ data type overrides (data_type_list, type_timestamp, type_string, type_bool, get_normalized_data_type, try_cast_column_to_timestamp) - Add fabric__ table operations (complete_buckets_cte with recursive CTE, escape_special_chars) - Add fabric__ cross-DB utilities (target_database, incremental_strategy, generate_elementary_profile_args, day_of_week, hour_of_day, hour_of_week) - Add fabric__ nested CTE workarounds (query_test_result_rows, get_failed_row_count_calc_query) using temp tables to avoid nested CTEs and TOP instead of LIMIT - Add docker-compose-sqlserver.yml for local testing with env var credentials - Add fabric target to profiles.yml.j2 - Update CI workflows (test-warehouse.yml, test-all-warehouses.yml) with SQL Server Docker setup, ODBC driver installation, and fabric target Co-Authored-By: Itamar Hartstein <haritamar@gmail.com>
1 parent d88dc70 commit 2c08c87

25 files changed

Lines changed: 306 additions & 4 deletions

.github/workflows/test-all-warehouses.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ jobs:
4848
dbt-version:
4949
${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) ||
5050
fromJSON('["latest_official", "latest_pre"]') }}
51-
warehouse-type: [postgres, clickhouse, trino, dremio, spark, duckdb]
51+
warehouse-type:
52+
[postgres, clickhouse, trino, dremio, spark, duckdb, fabric]
5253
exclude:
5354
# latest_pre is only tested on postgres
5455
- dbt-version: latest_pre
@@ -61,6 +62,8 @@ jobs:
6162
warehouse-type: spark
6263
- dbt-version: latest_pre
6364
warehouse-type: duckdb
65+
- dbt-version: latest_pre
66+
warehouse-type: fabric
6467
uses: ./.github/workflows/test-warehouse.yml
6568
with:
6669
warehouse-type: ${{ matrix.warehouse-type }}

.github/workflows/test-warehouse.yml

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ on:
1818
- clickhouse
1919
- dremio
2020
- duckdb
21+
- fabric
2122
elementary-ref:
2223
type: string
2324
required: false
@@ -51,6 +52,7 @@ on:
5152
env:
5253
BRANCH_NAME: ${{ github.head_ref || github.ref_name }}
5354
TESTS_DIR: ${{ github.workspace }}/dbt-data-reliability/integration_tests
55+
MSSQL_SA_PASSWORD: ${{ secrets.MSSQL_SA_PASSWORD || 'Elementary123!' }}
5456

5557
jobs:
5658
test:
@@ -100,6 +102,25 @@ jobs:
100102
timeout 180 bash -c 'until [ "$(docker inspect -f {{.State.Health.Status}} dremio 2>/dev/null)" = "healthy" ]; do sleep 5; done'
101103
echo "Dremio is healthy."
102104
105+
# Spins up the local SQL Server container that stands in for Fabric DWH in CI
# (see docker-compose-sqlserver.yml), then waits for its healthcheck.
- name: Start SQL Server
  if: inputs.warehouse-type == 'fabric'
  working-directory: ${{ env.TESTS_DIR }}
  env:
    # SA password is forwarded to the compose file's ${MSSQL_SA_PASSWORD}.
    MSSQL_SA_PASSWORD: ${{ env.MSSQL_SA_PASSWORD }}
  run: |
    docker compose -f docker-compose-sqlserver.yml up -d
    echo "Waiting for SQL Server to become healthy..."
    # Poll the container's healthcheck status; `timeout` fails the step if the
    # container has not reported "healthy" within 120 seconds.
    timeout 120 bash -c 'until [ "$(docker inspect -f {{.State.Health.Status}} sqlserver 2>/dev/null)" = "healthy" ]; do sleep 5; done'
    echo "SQL Server is healthy."
115+
116+
# Installs the Microsoft ODBC driver required by dbt-fabric on the runner.
- name: Install ODBC Driver
  if: inputs.warehouse-type == 'fabric'
  run: |
    # pipefail so a failed curl in a pipeline fails the step; -f makes curl
    # exit non-zero on HTTP errors instead of piping an error page into the
    # apt key / source files.
    set -euo pipefail
    curl -fsSL https://packages.microsoft.com/keys/microsoft.asc | sudo tee /etc/apt/trusted.gpg.d/microsoft.asc
    curl -fsSL https://packages.microsoft.com/config/ubuntu/$(lsb_release -rs)/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list
    sudo apt-get update
    sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18 unixodbc-dev
123+
103124
- name: Start Spark
104125
if: inputs.warehouse-type == 'spark'
105126
working-directory: ${{ env.TESTS_DIR }}
@@ -136,19 +157,20 @@ jobs:
136157
run:
137158
pip install${{ (inputs.dbt-version == 'latest_pre' && ' --pre') || '' }}
138159
"dbt-core${{ (!startsWith(inputs.dbt-version, 'latest') && format('=={0}', inputs.dbt-version)) || '' }}"
139-
"dbt-${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || (inputs.warehouse-type == 'spark' && 'spark[PyHive]') || (inputs.warehouse-type == 'athena' && 'athena-community') || inputs.warehouse-type }}${{ (!startsWith(inputs.dbt-version, 'latest') && format('~={0}', inputs.dbt-version)) || '' }}"
160+
"dbt-${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || (inputs.warehouse-type == 'spark' && 'spark[PyHive]') || (inputs.warehouse-type == 'athena' && 'athena-community') || (inputs.warehouse-type == 'fabric' && 'fabric') || inputs.warehouse-type }}${{ (!startsWith(inputs.dbt-version, 'latest') && format('~={0}', inputs.dbt-version)) || '' }}"
140161

141162
- name: Install dbt-fusion
142163
if: inputs.dbt-version == 'fusion'
143164
run: |
144165
curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s --
145166
146167
- name: Install Elementary
147-
run: pip install "./elementary[${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || inputs.warehouse-type }}]"
168+
run: pip install "./elementary[${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || (inputs.warehouse-type == 'fabric' && 'fabric') || inputs.warehouse-type }}]"
148169

149170
- name: Write dbt profiles
150171
env:
151172
CI_WAREHOUSE_SECRETS: ${{ secrets.CI_WAREHOUSE_SECRETS || '' }}
173+
MSSQL_SA_PASSWORD: ${{ env.MSSQL_SA_PASSWORD }}
152174
run: |
153175
# Schema name = dbt_<YYMMDD_HHMMSS>_<branch≤18>_<8-char hash>
154176
# The hash prevents collisions across concurrent jobs; the branch
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Local SQL Server instance used as a stand-in for Microsoft Fabric DWH in CI.
# NOTE: the top-level `version` key is obsolete and ignored by Compose v2;
# kept for compatibility with older docker-compose clients.
version: "3.8"
services:
  sqlserver:
    image: mcr.microsoft.com/mssql/server:2022-latest
    container_name: sqlserver
    ports:
      # Bind to loopback only so the instance is not exposed on the host network.
      - "127.0.0.1:1433:1433"
    environment:
      ACCEPT_EULA: "Y"
      # Must satisfy SQL Server password complexity rules (the CI default does).
      MSSQL_SA_PASSWORD: "${MSSQL_SA_PASSWORD}"
      MSSQL_PID: "Developer"
    healthcheck:
      test: /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P "${MSSQL_SA_PASSWORD}" -C -Q "SELECT 1" -b
      interval: 10s
      timeout: 5s
      retries: 10
      # SQL Server takes tens of seconds to initialize; without a start_period
      # the early failing probes consume the retry budget before startup ends.
      start_period: 30s

integration_tests/profiles/profiles.yml.j2

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,18 @@ elementary_tests:
6262
schema: {{ schema_name }}
6363
threads: 8
6464

65+
# Fabric target for the integration tests. Points at the local SQL Server
# container (docker-compose-sqlserver.yml) that stands in for Fabric DWH in CI;
# the SA password is injected via the MSSQL_SA_PASSWORD environment variable.
fabric: &fabric
  type: fabric
  driver: "ODBC Driver 18 for SQL Server"
  server: 127.0.0.1
  port: 1433
  database: master
  schema: {{ schema_name }}
  user: sa
  password: "{{ env_var('MSSQL_SA_PASSWORD') }}"
  # Container uses a self-signed certificate, so trust it explicitly.
  trust_cert: true
  threads: 4
76+
6577
# ── Cloud targets (secrets substituted at CI time) ─────────────────
6678

6779
snowflake: &snowflake
@@ -122,7 +134,7 @@ elementary_tests:
122134
elementary:
123135
target: postgres
124136
outputs:
125-
{%- set targets = ['postgres', 'clickhouse', 'trino', 'dremio', 'spark', 'duckdb', 'snowflake', 'bigquery', 'redshift', 'databricks_catalog', 'athena'] %}
137+
{%- set targets = ['postgres', 'clickhouse', 'trino', 'dremio', 'spark', 'duckdb', 'fabric', 'snowflake', 'bigquery', 'redshift', 'databricks_catalog', 'athena'] %}
126138
{%- for t in targets %}
127139
{{ t }}:
128140
<<: *{{ t }}

macros/edr/materializations/test/failed_row_count.sql

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,32 @@
4242
{% endmacro %}
4343

4444
{% macro get_failed_row_count_calc_query(failed_row_count_calc) %}
45+
{{
46+
return(
47+
adapter.dispatch("get_failed_row_count_calc_query", "elementary")(
48+
failed_row_count_calc
49+
)
50+
)
51+
}}
52+
{% endmacro %}
53+
54+
{# Default implementation: wraps the test's compiled SQL (the `sql` context
   variable available in test materializations) in a CTE and evaluates the
   failed-row-count expression over it. #}
{% macro default__get_failed_row_count_calc_query(failed_row_count_calc) %}
with results as ({{ sql }})
select
    {{ failed_row_count_calc }}
    as {{ elementary.escape_reserved_keywords("count") }}
from results
{% endmacro %}
61+
62+
{% macro fabric__get_failed_row_count_calc_query(failed_row_count_calc) %}
  {# Fabric / T-SQL does not support nested CTEs, so the test SQL is
     materialized into a temp table first and the count is computed over it.
     NOTE: the temp table is intentionally NOT dropped here - the caller still
     has to execute the query string this macro renders. #}
  {% set tmp_relation = elementary.make_temp_relation(model) %}
  {# Drop any leftover from a previous failed run so "select into" (which
     requires the target not to exist) cannot collide. #}
  {% do elementary.run_query("drop table if exists " ~ tmp_relation) %}
  {% do elementary.run_query(
      "select * into " ~ tmp_relation ~ " from (" ~ sql ~ ") as __edr_inner"
  ) %}
  select
      {{ failed_row_count_calc }}
      as {{ elementary.escape_reserved_keywords("count") }}
  from {{ tmp_relation }}
{% endmacro %}

macros/edr/materializations/test/test.sql

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,31 @@
188188
{% do return(elementary.agate_to_dicts(elementary.run_query(query))) %}
189189
{% endmacro %}
190190

191+
{% macro fabric__query_test_result_rows(sample_limit=none, ignore_passed_tests=false) %}
  {% if sample_limit == 0 %} {% do return([]) %} {% endif %}

  {# Allow setting -1 for unlimited, as none values are stripped from meta in dbt-fusion #}
  {% if sample_limit == -1 %} {% set sample_limit = none %} {% endif %}

  {% if ignore_passed_tests and elementary.did_test_pass() %}
    {% do elementary.debug_log("Skipping sample query because the test passed.") %}
    {% do return([]) %}
  {% endif %}

  {# Fabric / T-SQL supports neither nested CTEs nor LIMIT: materialize the
     test SQL into a temp table, then sample from it with TOP. #}
  {% set tmp_relation = elementary.make_temp_relation(model) %}
  {# Drop any leftover from a previous failed run so "select into" (which
     requires the target not to exist) cannot collide. #}
  {% do elementary.run_query("drop table if exists " ~ tmp_relation) %}
  {% do elementary.run_query(
      "select * into " ~ tmp_relation ~ " from (" ~ sql ~ ") as __edr_inner"
  ) %}
  {% set query %}
    select {% if sample_limit is not none %} top {{ sample_limit }} {% endif %} * from {{ tmp_relation }}
  {% endset %}
  {% set result = elementary.agate_to_dicts(elementary.run_query(query)) %}
  {# Clean up eagerly - temp relations may outlive this macro in the session. #}
  {% do elementary.run_query("drop table if exists " ~ tmp_relation) %}
  {% do return(result) %}
{% endmacro %}
215+
191216
{% macro get_columns_to_exclude_from_sampling(flattened_test) %}
192217
{% set columns_to_exclude = [] %}
193218

macros/edr/system/system_utils/buckets_cte.sql

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,30 @@
231231
{{ return(complete_buckets_cte) }}
232232
{% endmacro %}
233233

234+
{% macro fabric__complete_buckets_cte(
    time_bucket,
    bucket_end_expr,
    min_bucket_start_expr,
    max_bucket_end_expr
) -%}
    {# Renders a recursive T-SQL CTE producing one row per time bucket between
       min_bucket_start_expr and max_bucket_end_expr.
       BUG FIX: T-SQL does not allow a select-list alias to be referenced in
       the WHERE clause of the same query (logical processing evaluates WHERE
       before SELECT), so the original filter `where next_bucket < ...` would
       fail with "Invalid column name". The bucket-end expression is repeated
       in the filter instead. #}
    {%- set complete_buckets_cte %}
        with timestamps as (
            select {{ min_bucket_start_expr }} as edr_bucket_start
            union all
            select {{ bucket_end_expr }} as edr_bucket_start
            from timestamps
            where {{ bucket_end_expr }} < {{ max_bucket_end_expr }}
        )
        select
            edr_bucket_start,
            {{ bucket_end_expr }} as edr_bucket_end
        from timestamps
        where {{ bucket_end_expr }} <= {{ max_bucket_end_expr }}
        {# NOTE(review): OPTION (MAXRECURSION) is only legal at the very end of
           a complete statement - confirm callers do not embed this rendered
           text as a subquery of a larger query. #}
        option (maxrecursion 10000)
    {%- endset %}
    {{ return(complete_buckets_cte) }}
{% endmacro %}
257+
234258
{% macro dremio__complete_buckets_cte(
235259
time_bucket, bucket_end_expr, min_bucket_start_expr, max_bucket_end_expr
236260
) %}

macros/utils/cross_db_utils/current_timestamp.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@
8484
cast(current_timestamp at time zone 'UTC' as timestamp(6))
8585
{%- endmacro -%}
8686

87+
{# Fabric / T-SQL current-timestamp expressions.
   NOTE(review): getdate() returns datetime (~3.33 ms precision) whereas other
   adapters here cast to microsecond precision - confirm this is acceptable. #}
{% macro fabric__edr_current_timestamp() -%} getdate() {%- endmacro -%}

{% macro fabric__edr_current_timestamp_in_utc() -%} sysutcdatetime() {%- endmacro -%}
90+
8791
{% macro dremio__edr_current_timestamp() -%} current_timestamp() {%- endmacro -%}
8892

8993
{% macro dremio__edr_current_timestamp_in_utc() -%}

macros/utils/cross_db_utils/date_trunc.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,7 @@
2020
{% macro bigquery__edr_date_trunc(date_part, date_expression) %}
2121
timestamp_trunc(cast({{ date_expression }} as timestamp), {{ date_part }})
2222
{% endmacro %}
23+
24+
{# Fabric / T-SQL date truncation via native datetrunc()
   (available on SQL Server 2022+ / Fabric).
   Parameter renamed datepart -> date_part for consistency with the sibling
   implementations (e.g. bigquery__edr_date_trunc uses date_part), so a
   keyword-style dispatch call cannot break one adapter but not the other. #}
{% macro fabric__edr_date_trunc(date_part, date_expression) %}
    datetrunc({{ date_part }}, {{ date_expression }})
{% endmacro %}

macros/utils/cross_db_utils/dateadd.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
2525
This override outputs just TIMESTAMPADD(...) as an expression (no "select" prefix).
2626
#}
27+
{# Fabric / T-SQL date arithmetic - maps directly onto native dateadd().
   Parameter name `datepart` matches the sibling dremio__edr_dateadd. #}
{% macro fabric__edr_dateadd(datepart, interval, from_date_or_timestamp) %}
    dateadd({{ datepart }}, {{ interval }}, {{ from_date_or_timestamp }})
{% endmacro %}
30+
2731
{% macro dremio__edr_dateadd(datepart, interval, from_date_or_timestamp) %}
2832
{% set datepart = datepart | lower %}
2933
{% if datepart == "year" %}

0 commit comments

Comments
 (0)