Skip to content

Commit 534afc6

Browse files
feat: add Microsoft Fabric DWH integration (ELE-5282) (#962)
1 parent d88dc70 commit 534afc6

72 files changed

Lines changed: 1513 additions & 477 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.github/workflows/test-all-warehouses.yml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ jobs:
4848
dbt-version:
4949
${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) ||
5050
fromJSON('["latest_official", "latest_pre"]') }}
51-
warehouse-type: [postgres, clickhouse, trino, dremio, spark, duckdb]
51+
warehouse-type:
52+
[postgres, clickhouse, trino, dremio, spark, duckdb, sqlserver]
5253
exclude:
5354
# latest_pre is only tested on postgres
5455
- dbt-version: latest_pre
@@ -61,6 +62,8 @@ jobs:
6162
warehouse-type: spark
6263
- dbt-version: latest_pre
6364
warehouse-type: duckdb
65+
- dbt-version: latest_pre
66+
warehouse-type: sqlserver
6467
uses: ./.github/workflows/test-warehouse.yml
6568
with:
6669
warehouse-type: ${{ matrix.warehouse-type }}
@@ -124,7 +127,7 @@ jobs:
124127
${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) ||
125128
fromJSON('["latest_official"]') }}
126129
warehouse-type:
127-
[snowflake, bigquery, redshift, databricks_catalog, athena]
130+
[snowflake, bigquery, redshift, databricks_catalog, athena, fabric]
128131
# Fusion includes: always run fusion alongside the base version for
129132
# supported warehouses. When inputs.dbt-version is already 'fusion' the
130133
# matrix deduplicates automatically.

.github/workflows/test-warehouse.yml

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ on:
1818
- clickhouse
1919
- dremio
2020
- duckdb
21+
- sqlserver
22+
- fabric
2123
elementary-ref:
2224
type: string
2325
required: false
@@ -55,6 +57,7 @@ env:
5557
jobs:
5658
test:
5759
runs-on: ubuntu-latest
60+
timeout-minutes: 60
5861
concurrency:
5962
# Serialises runs for the same warehouse × dbt-version × branch.
6063
# The schema name is derived from a hash of this group (see "Write dbt profiles").
@@ -100,6 +103,23 @@ jobs:
100103
timeout 180 bash -c 'until [ "$(docker inspect -f {{.State.Health.Status}} dremio 2>/dev/null)" = "healthy" ]; do sleep 5; done'
101104
echo "Dremio is healthy."
102105
106+
- name: Start SQL Server
107+
if: inputs.warehouse-type == 'sqlserver'
108+
working-directory: ${{ env.TESTS_DIR }}
109+
run: |
110+
docker compose -f docker-compose-sqlserver.yml up -d
111+
echo "Waiting for SQL Server to become healthy..."
112+
timeout 120 bash -c 'until [ "$(docker inspect -f {{.State.Health.Status}} sqlserver 2>/dev/null)" = "healthy" ]; do sleep 5; done'
113+
echo "SQL Server is healthy."
114+
115+
- name: Install ODBC Driver
116+
if: inputs.warehouse-type == 'sqlserver' || inputs.warehouse-type == 'fabric'
117+
run: |
118+
curl https://packages.microsoft.com/keys/microsoft.asc | sudo tee /etc/apt/trusted.gpg.d/microsoft.asc
119+
curl https://packages.microsoft.com/config/ubuntu/$(lsb_release -rs)/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list
120+
sudo apt-get update
121+
sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18 unixodbc-dev
122+
103123
- name: Start Spark
104124
if: inputs.warehouse-type == 'spark'
105125
working-directory: ${{ env.TESTS_DIR }}
@@ -206,7 +226,7 @@ jobs:
206226
- name: Drop test schemas
207227
if: >-
208228
always() &&
209-
contains(fromJSON('["snowflake","bigquery","redshift","databricks_catalog","athena"]'), inputs.warehouse-type)
229+
contains(fromJSON('["snowflake","bigquery","redshift","databricks_catalog","athena","fabric"]'), inputs.warehouse-type)
210230
working-directory: ${{ env.TESTS_DIR }}
211231
continue-on-error: true
212232
run: |

integration_tests/dbt_project/macros/schema_utils/list_schemas.sql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@
3333
{% do return(schemas) %}
3434
{% endmacro %}
3535
36+
{% macro fabric__edr_list_schemas(database) %}
37+
{# Fabric does not support information_schema.schemata; use sys.schemas instead #}
38+
{% set results = run_query("SELECT name FROM sys.schemas") %}
39+
{% set schemas = [] %}
40+
{% for row in results %} {% do schemas.append(row[0]) %} {% endfor %}
41+
{% do return(schemas) %}
42+
{% endmacro %}
43+
3644
{% macro clickhouse__edr_list_schemas(database) %}
3745
{% set results = run_query("SHOW DATABASES") %}
3846
{% set schemas = [] %}

integration_tests/dbt_project/macros/schema_utils/schema_exists.sql

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@
3939
{% do return(result | length > 0) %}
4040
{% endmacro %}
4141
42+
{% macro fabric__edr_schema_exists(database, schema_name) %}
43+
{% set safe_schema = schema_name | replace("'", "''") %}
44+
{% set result = run_query(
45+
"SELECT name FROM sys.schemas WHERE lower(name) = lower('"
46+
~ safe_schema
47+
~ "')"
48+
) %}
49+
{% do return(result | length > 0) %}
50+
{% endmacro %}
51+
4252
{% macro clickhouse__edr_schema_exists(database, schema_name) %}
4353
{% set safe_schema = schema_name | replace("'", "''") %}
4454
{% set result = run_query(
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
version: "3.8"
2+
services:
3+
sqlserver:
4+
image: mcr.microsoft.com/mssql/server:2022-latest
5+
container_name: sqlserver
6+
ports:
7+
- "127.0.0.1:1433:1433"
8+
environment:
9+
ACCEPT_EULA: "Y"
10+
MSSQL_SA_PASSWORD: "Elementary123!"
11+
MSSQL_PID: "Developer"
12+
healthcheck:
13+
test: /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P "Elementary123!" -C -Q "SELECT 1" -b
14+
interval: 10s
15+
timeout: 5s
16+
retries: 10

integration_tests/profiles/profiles.yml.j2

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,19 @@ elementary_tests:
6262
schema: {{ schema_name }}
6363
threads: 8
6464

65+
sqlserver: &sqlserver
66+
type: sqlserver
67+
driver: "ODBC Driver 18 for SQL Server"
68+
server: 127.0.0.1
69+
port: 1433
70+
database: master
71+
schema: {{ schema_name }}
72+
user: sa
73+
password: "Elementary123!"
74+
encrypt: false
75+
trust_cert: true
76+
threads: 4
77+
6578
# ── Cloud targets (secrets substituted at CI time) ─────────────────
6679

6780
snowflake: &snowflake
@@ -106,6 +119,21 @@ elementary_tests:
106119
client_secret: {{ databricks_client_secret | toyaml }}
107120
threads: 4
108121

122+
fabric: &fabric
123+
type: fabric
124+
driver: "ODBC Driver 18 for SQL Server"
125+
server: {{ fabric_server | toyaml }}
126+
port: 1433
127+
database: {{ fabric_database | toyaml }}
128+
schema: {{ schema_name }}
129+
authentication: ServicePrincipal
130+
tenant_id: {{ fabric_tenant_id | toyaml }}
131+
client_id: {{ fabric_client_id | toyaml }}
132+
client_secret: {{ fabric_client_secret | toyaml }}
133+
encrypt: true
134+
trust_cert: false
135+
threads: 4
136+
109137
athena: &athena
110138
type: athena
111139
s3_staging_dir: {{ athena_s3_staging_dir | toyaml }}
@@ -122,7 +150,7 @@ elementary_tests:
122150
elementary:
123151
target: postgres
124152
outputs:
125-
{%- set targets = ['postgres', 'clickhouse', 'trino', 'dremio', 'spark', 'duckdb', 'snowflake', 'bigquery', 'redshift', 'databricks_catalog', 'athena'] %}
153+
{%- set targets = ['postgres', 'clickhouse', 'trino', 'dremio', 'spark', 'duckdb', 'sqlserver', 'snowflake', 'bigquery', 'redshift', 'databricks_catalog', 'athena', 'fabric'] %}
126154
{%- for t in targets %}
127155
{{ t }}:
128156
<<: *{{ t }}

integration_tests/tests/dbt_project.py

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,23 @@
3232
logger = get_logger(__name__)
3333

3434

35+
class SelectLimit:
36+
"""Cross-adapter TOP / LIMIT helper.
37+
38+
On T-SQL (Fabric / SQL Server) ``SELECT TOP n ...`` is used instead of
39+
``... LIMIT n``. Instances expose two string properties so that callers
40+
can build dialect-agnostic queries::
41+
42+
sl = SelectLimit(1, is_tsql=True)
43+
query = f"SELECT {sl.top}col FROM t ORDER BY x {sl.limit}"
44+
# → "SELECT TOP 1 col FROM t ORDER BY x "
45+
"""
46+
47+
def __init__(self, n: int, is_tsql: bool) -> None:
48+
self.top = f"TOP {n} " if is_tsql else ""
49+
self.limit = "" if is_tsql else f"LIMIT {n}"
50+
51+
3552
def get_dbt_runner(
3653
target: str, project_dir: str, runner_method: Optional[RunnerMethod] = None
3754
) -> BaseDbtRunner:
@@ -95,22 +112,61 @@ def _run_query_with_run_operation(self, prerendered_query: str):
95112
)
96113
return json.loads(run_operation_results[0])
97114

98-
@staticmethod
115+
@property
116+
def is_tsql(self) -> bool:
117+
"""Return True when the target uses T-SQL dialect (Fabric / SQL Server)."""
118+
return self.target in ("fabric", "sqlserver")
119+
120+
def select_limit(self, n: int = 1) -> "SelectLimit":
121+
"""Return cross-adapter TOP/LIMIT helpers for use in raw SQL strings.
122+
123+
Usage::
124+
125+
sl = dbt_project.select_limit(1)
126+
query = f"SELECT {sl.top}col FROM t ORDER BY x {sl.limit}"
127+
"""
128+
return SelectLimit(n, self.is_tsql)
129+
130+
def samples_query(self, test_id: str, order_by: str = "created_at desc") -> str:
131+
"""Build a cross-adapter query to fetch test result sample rows.
132+
133+
This is the shared implementation of the ``SAMPLES_QUERY`` template
134+
that was previously duplicated across multiple test files.
135+
"""
136+
sl = self.select_limit(1)
137+
return f"""
138+
with latest_elementary_test_result as (
139+
select {sl.top}id
140+
from {{{{ ref("elementary_test_results") }}}}
141+
where lower(table_name) = lower('{test_id}')
142+
order by {order_by}
143+
{sl.limit}
144+
)
145+
146+
select result_row
147+
from {{{{ ref("test_result_rows") }}}}
148+
where elementary_test_results_id in (select * from latest_elementary_test_result)
149+
"""
150+
99151
def read_table_query(
152+
self,
100153
table_name: str,
101154
where: Optional[str] = None,
102155
group_by: Optional[str] = None,
103156
order_by: Optional[str] = None,
104157
limit: Optional[int] = None,
105158
column_names: Optional[List[str]] = None,
106159
):
160+
columns = ", ".join(column_names) if column_names else "*"
161+
top_clause = f"TOP {limit} " if limit and self.is_tsql else ""
162+
limit_clause = f"LIMIT {limit}" if limit and not self.is_tsql else ""
107163
return f"""
108-
SELECT {', '.join(column_names) if column_names else '*'}
164+
SELECT {top_clause}{columns}
109165
FROM {{{{ ref('{table_name}') }}}}
110166
{f"WHERE {where}" if where else ""}
111167
{f"GROUP BY {group_by}" if group_by else ""}
112168
{f"ORDER BY {order_by}" if order_by else ""}
113-
{f"LIMIT {limit}" if limit else ""}
169+
{limit_clause}
114170
"""
115171

116172
def read_table(

integration_tests/tests/test_anomalies_backfill_logic.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@
2929
# This returns data points used in the latest anomaly test
3030
ANOMALY_TEST_POINTS_QUERY = """
3131
with latest_elementary_test_result as (
32-
select id
32+
select {top_clause}id
3333
from {{{{ ref("elementary_test_results") }}}}
3434
where lower(table_name) = lower('{test_id}')
3535
order by created_at desc
36-
limit 1
36+
{limit_clause}
3737
)
3838
3939
select result_row
@@ -62,7 +62,13 @@ def get_daily_row_count_metrics(dbt_project: DbtProject, test_id: str):
6262

6363

6464
def get_latest_anomaly_test_metrics(dbt_project: DbtProject, test_id: str):
65-
results = dbt_project.run_query(ANOMALY_TEST_POINTS_QUERY.format(test_id=test_id))
65+
sl = dbt_project.select_limit(1)
66+
query = ANOMALY_TEST_POINTS_QUERY.format(
67+
test_id=test_id,
68+
top_clause=sl.top,
69+
limit_clause=sl.limit,
70+
)
71+
results = dbt_project.run_query(query)
6672
result_rows = [json.loads(result["result_row"]) for result in results]
6773
return {
6874
(

integration_tests/tests/test_anomalies_ranges.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515

1616
ANOMALY_TEST_POINTS_QUERY = """
1717
with latest_elementary_test_result as (
18-
select id
18+
select {top_clause}id
1919
from {{{{ ref("elementary_test_results") }}}}
2020
where lower(table_name) = lower('{test_id}')
2121
order by created_at desc
22-
limit 1
22+
{limit_clause}
2323
)
2424
2525
select result_row
@@ -29,7 +29,13 @@
2929

3030

3131
def get_latest_anomaly_test_points(dbt_project: DbtProject, test_id: str):
32-
results = dbt_project.run_query(ANOMALY_TEST_POINTS_QUERY.format(test_id=test_id))
32+
sl = dbt_project.select_limit(1)
33+
query = ANOMALY_TEST_POINTS_QUERY.format(
34+
test_id=test_id,
35+
top_clause=sl.top,
36+
limit_clause=sl.limit,
37+
)
38+
results = dbt_project.run_query(query)
3339
return [json.loads(result["result_row"]) for result in results]
3440

3541

integration_tests/tests/test_anomaly_exclude_metrics.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,11 @@ def test_exclude_specific_timestamps(test_id: str, dbt_project: DbtProject):
101101
)
102102
assert test_result["status"] == "pass"
103103

104+
# T-SQL uses datetime2 instead of timestamp.
105+
ts_type = "datetime2" if dbt_project.is_tsql else "timestamp"
104106
excluded_buckets_str = ", ".join(
105107
[
106-
"cast('%s' as timestamp)" % cur_ts.strftime(DATE_FORMAT)
108+
"cast('%s' as %s)" % (cur_ts.strftime(DATE_FORMAT), ts_type)
107109
for cur_ts in excluded_buckets
108110
]
109111
)

0 commit comments

Comments (0)