diff --git a/.github/workflows/test-all-warehouses.yml b/.github/workflows/test-all-warehouses.yml index 09f6a616f..83e06b56c 100644 --- a/.github/workflows/test-all-warehouses.yml +++ b/.github/workflows/test-all-warehouses.yml @@ -48,7 +48,8 @@ jobs: dbt-version: ${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) || fromJSON('["latest_official", "latest_pre"]') }} - warehouse-type: [postgres, clickhouse, trino, dremio, spark, duckdb] + warehouse-type: + [postgres, clickhouse, trino, dremio, spark, duckdb, sqlserver] exclude: # latest_pre is only tested on postgres - dbt-version: latest_pre @@ -61,6 +62,8 @@ jobs: warehouse-type: spark - dbt-version: latest_pre warehouse-type: duckdb + - dbt-version: latest_pre + warehouse-type: sqlserver uses: ./.github/workflows/test-warehouse.yml with: warehouse-type: ${{ matrix.warehouse-type }} @@ -124,7 +127,7 @@ jobs: ${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) || fromJSON('["latest_official"]') }} warehouse-type: - [snowflake, bigquery, redshift, databricks_catalog, athena] + [snowflake, bigquery, redshift, databricks_catalog, athena, fabric] # Fusion includes: always run fusion alongside the base version for # supported warehouses. When inputs.dbt-version is already 'fusion' the # matrix deduplicates automatically. diff --git a/.github/workflows/test-warehouse.yml b/.github/workflows/test-warehouse.yml index 4fa5b1072..baff275a6 100644 --- a/.github/workflows/test-warehouse.yml +++ b/.github/workflows/test-warehouse.yml @@ -18,6 +18,8 @@ on: - clickhouse - dremio - duckdb + - sqlserver + - fabric elementary-ref: type: string required: false @@ -55,6 +57,7 @@ env: jobs: test: runs-on: ubuntu-latest + timeout-minutes: 60 concurrency: # Serialises runs for the same warehouse × dbt-version × branch. # The schema name is derived from a hash of this group (see "Write dbt profiles"). 
@@ -100,6 +103,23 @@ jobs: timeout 180 bash -c 'until [ "$(docker inspect -f {{.State.Health.Status}} dremio 2>/dev/null)" = "healthy" ]; do sleep 5; done' echo "Dremio is healthy." + - name: Start SQL Server + if: inputs.warehouse-type == 'sqlserver' + working-directory: ${{ env.TESTS_DIR }} + run: | + docker compose -f docker-compose-sqlserver.yml up -d + echo "Waiting for SQL Server to become healthy..." + timeout 120 bash -c 'until [ "$(docker inspect -f {{.State.Health.Status}} sqlserver 2>/dev/null)" = "healthy" ]; do sleep 5; done' + echo "SQL Server is healthy." + + - name: Install ODBC Driver + if: inputs.warehouse-type == 'sqlserver' || inputs.warehouse-type == 'fabric' + run: | + curl https://packages.microsoft.com/keys/microsoft.asc | sudo tee /etc/apt/trusted.gpg.d/microsoft.asc + curl https://packages.microsoft.com/config/ubuntu/$(lsb_release -rs)/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list + sudo apt-get update + sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18 unixodbc-dev + - name: Start Spark if: inputs.warehouse-type == 'spark' working-directory: ${{ env.TESTS_DIR }} @@ -206,7 +226,7 @@ jobs: - name: Drop test schemas if: >- always() && - contains(fromJSON('["snowflake","bigquery","redshift","databricks_catalog","athena"]'), inputs.warehouse-type) + contains(fromJSON('["snowflake","bigquery","redshift","databricks_catalog","athena","fabric"]'), inputs.warehouse-type) working-directory: ${{ env.TESTS_DIR }} continue-on-error: true run: | diff --git a/integration_tests/dbt_project/macros/schema_utils/list_schemas.sql b/integration_tests/dbt_project/macros/schema_utils/list_schemas.sql index ff8e8dd5e..e5986f810 100644 --- a/integration_tests/dbt_project/macros/schema_utils/list_schemas.sql +++ b/integration_tests/dbt_project/macros/schema_utils/list_schemas.sql @@ -33,6 +33,14 @@ {% do return(schemas) %} {% endmacro %} +{% macro fabric__edr_list_schemas(database) %} + {# Fabric does not support information_schema.schemata; use 
sys.schemas instead #} + {% set results = run_query("SELECT name FROM sys.schemas") %} + {% set schemas = [] %} + {% for row in results %} {% do schemas.append(row[0]) %} {% endfor %} + {% do return(schemas) %} +{% endmacro %} + {% macro clickhouse__edr_list_schemas(database) %} {% set results = run_query("SHOW DATABASES") %} {% set schemas = [] %} diff --git a/integration_tests/dbt_project/macros/schema_utils/schema_exists.sql b/integration_tests/dbt_project/macros/schema_utils/schema_exists.sql index e40d19c33..06fab2592 100644 --- a/integration_tests/dbt_project/macros/schema_utils/schema_exists.sql +++ b/integration_tests/dbt_project/macros/schema_utils/schema_exists.sql @@ -39,6 +39,16 @@ {% do return(result | length > 0) %} {% endmacro %} +{% macro fabric__edr_schema_exists(database, schema_name) %} + {% set safe_schema = schema_name | replace("'", "''") %} + {% set result = run_query( + "SELECT name FROM sys.schemas WHERE lower(name) = lower('" + ~ safe_schema + ~ "')" + ) %} + {% do return(result | length > 0) %} +{% endmacro %} + {% macro clickhouse__edr_schema_exists(database, schema_name) %} {% set safe_schema = schema_name | replace("'", "''") %} {% set result = run_query( diff --git a/integration_tests/docker-compose-sqlserver.yml b/integration_tests/docker-compose-sqlserver.yml new file mode 100644 index 000000000..b32863418 --- /dev/null +++ b/integration_tests/docker-compose-sqlserver.yml @@ -0,0 +1,16 @@ +version: "3.8" +services: + sqlserver: + image: mcr.microsoft.com/mssql/server:2022-latest + container_name: sqlserver + ports: + - "127.0.0.1:1433:1433" + environment: + ACCEPT_EULA: "Y" + MSSQL_SA_PASSWORD: "Elementary123!" + MSSQL_PID: "Developer" + healthcheck: + test: /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P "Elementary123!" 
-C -Q "SELECT 1" -b + interval: 10s + timeout: 5s + retries: 10 diff --git a/integration_tests/profiles/profiles.yml.j2 b/integration_tests/profiles/profiles.yml.j2 index 10fad988a..e2f387847 100644 --- a/integration_tests/profiles/profiles.yml.j2 +++ b/integration_tests/profiles/profiles.yml.j2 @@ -62,6 +62,19 @@ elementary_tests: schema: {{ schema_name }} threads: 8 + sqlserver: &sqlserver + type: sqlserver + driver: "ODBC Driver 18 for SQL Server" + server: 127.0.0.1 + port: 1433 + database: master + schema: {{ schema_name }} + user: sa + password: "Elementary123!" + encrypt: false + trust_cert: true + threads: 4 + # ── Cloud targets (secrets substituted at CI time) ───────────────── snowflake: &snowflake @@ -106,6 +119,21 @@ elementary_tests: client_secret: {{ databricks_client_secret | toyaml }} threads: 4 + fabric: &fabric + type: fabric + driver: "ODBC Driver 18 for SQL Server" + server: {{ fabric_server | toyaml }} + port: 1433 + database: {{ fabric_database | toyaml }} + schema: {{ schema_name }} + authentication: ServicePrincipal + tenant_id: {{ fabric_tenant_id | toyaml }} + client_id: {{ fabric_client_id | toyaml }} + client_secret: {{ fabric_client_secret | toyaml }} + encrypt: true + trust_cert: false + threads: 4 + athena: &athena type: athena s3_staging_dir: {{ athena_s3_staging_dir | toyaml }} @@ -122,7 +150,7 @@ elementary_tests: elementary: target: postgres outputs: -{%- set targets = ['postgres', 'clickhouse', 'trino', 'dremio', 'spark', 'duckdb', 'snowflake', 'bigquery', 'redshift', 'databricks_catalog', 'athena'] %} +{%- set targets = ['postgres', 'clickhouse', 'trino', 'dremio', 'spark', 'duckdb', 'sqlserver', 'snowflake', 'bigquery', 'redshift', 'databricks_catalog', 'athena', 'fabric'] %} {%- for t in targets %} {{ t }}: <<: *{{ t }} diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index 66ec5d30a..67c4373da 100644 --- a/integration_tests/tests/dbt_project.py +++ 
b/integration_tests/tests/dbt_project.py @@ -32,6 +32,23 @@ logger = get_logger(__name__) +class SelectLimit: + """Cross-adapter TOP / LIMIT helper. + + On T-SQL (Fabric / SQL Server) ``SELECT TOP n ...`` is used instead of + ``... LIMIT n``. Instances expose two string properties so that callers + can build dialect-agnostic queries:: + + sl = SelectLimit(1, is_tsql=True) + query = f"SELECT {sl.top}col FROM t ORDER BY x {sl.limit}" + # → "SELECT TOP 1 col FROM t ORDER BY x " + """ + + def __init__(self, n: int, is_tsql: bool) -> None: + self.top = f"TOP {n} " if is_tsql else "" + self.limit = "" if is_tsql else f"LIMIT {n}" + + def get_dbt_runner( target: str, project_dir: str, runner_method: Optional[RunnerMethod] = None ) -> BaseDbtRunner: @@ -95,8 +112,44 @@ def _run_query_with_run_operation(self, prerendered_query: str): ) return json.loads(run_operation_results[0]) - @staticmethod + @property + def is_tsql(self) -> bool: + """Return True when the target uses T-SQL dialect (Fabric / SQL Server).""" + return self.target in ("fabric", "sqlserver") + + def select_limit(self, n: int = 1) -> "SelectLimit": + """Return cross-adapter TOP/LIMIT helpers for use in raw SQL strings. + + Usage:: + + sl = dbt_project.select_limit(1) + query = f"SELECT {sl.top}col FROM t ORDER BY x {sl.limit}" + """ + return SelectLimit(n, self.is_tsql) + + def samples_query(self, test_id: str, order_by: str = "created_at desc") -> str: + """Build a cross-adapter query to fetch test result sample rows. + + This is the shared implementation of the ``SAMPLES_QUERY`` template + that was previously duplicated across multiple test files. 
+ """ + sl = self.select_limit(1) + return f""" + with latest_elementary_test_result as ( + select {sl.top}id + from {{{{ ref("elementary_test_results") }}}} + where lower(table_name) = lower('{test_id}') + order by {order_by} + {sl.limit} + ) + + select result_row + from {{{{ ref("test_result_rows") }}}} + where elementary_test_results_id in (select * from latest_elementary_test_result) + """ + def read_table_query( + self, table_name: str, where: Optional[str] = None, group_by: Optional[str] = None, @@ -104,13 +157,16 @@ def read_table_query( limit: Optional[int] = None, column_names: Optional[List[str]] = None, ): + columns = ", ".join(column_names) if column_names else "*" + top_clause = f"TOP {limit} " if limit and self.is_tsql else "" + limit_clause = f"LIMIT {limit}" if limit and not self.is_tsql else "" return f""" - SELECT {', '.join(column_names) if column_names else '*'} + SELECT {top_clause}{columns} FROM {{{{ ref('{table_name}') }}}} {f"WHERE {where}" if where else ""} {f"GROUP BY {group_by}" if group_by else ""} {f"ORDER BY {order_by}" if order_by else ""} - {f"LIMIT {limit}" if limit else ""} + {limit_clause} """ def read_table( diff --git a/integration_tests/tests/test_anomalies_backfill_logic.py b/integration_tests/tests/test_anomalies_backfill_logic.py index 88f6ad677..a4b181b11 100644 --- a/integration_tests/tests/test_anomalies_backfill_logic.py +++ b/integration_tests/tests/test_anomalies_backfill_logic.py @@ -29,11 +29,11 @@ # This returns data points used in the latest anomaly test ANOMALY_TEST_POINTS_QUERY = """ with latest_elementary_test_result as ( - select id + select {top_clause}id from {{{{ ref("elementary_test_results") }}}} where lower(table_name) = lower('{test_id}') order by created_at desc - limit 1 + {limit_clause} ) select result_row @@ -62,7 +62,13 @@ def get_daily_row_count_metrics(dbt_project: DbtProject, test_id: str): def get_latest_anomaly_test_metrics(dbt_project: DbtProject, test_id: str): - results = 
dbt_project.run_query(ANOMALY_TEST_POINTS_QUERY.format(test_id=test_id)) + sl = dbt_project.select_limit(1) + query = ANOMALY_TEST_POINTS_QUERY.format( + test_id=test_id, + top_clause=sl.top, + limit_clause=sl.limit, + ) + results = dbt_project.run_query(query) result_rows = [json.loads(result["result_row"]) for result in results] return { ( diff --git a/integration_tests/tests/test_anomalies_ranges.py b/integration_tests/tests/test_anomalies_ranges.py index 890f421a4..596c63748 100644 --- a/integration_tests/tests/test_anomalies_ranges.py +++ b/integration_tests/tests/test_anomalies_ranges.py @@ -15,11 +15,11 @@ ANOMALY_TEST_POINTS_QUERY = """ with latest_elementary_test_result as ( - select id + select {top_clause}id from {{{{ ref("elementary_test_results") }}}} where lower(table_name) = lower('{test_id}') order by created_at desc - limit 1 + {limit_clause} ) select result_row @@ -29,7 +29,13 @@ def get_latest_anomaly_test_points(dbt_project: DbtProject, test_id: str): - results = dbt_project.run_query(ANOMALY_TEST_POINTS_QUERY.format(test_id=test_id)) + sl = dbt_project.select_limit(1) + query = ANOMALY_TEST_POINTS_QUERY.format( + test_id=test_id, + top_clause=sl.top, + limit_clause=sl.limit, + ) + results = dbt_project.run_query(query) return [json.loads(result["result_row"]) for result in results] diff --git a/integration_tests/tests/test_anomaly_exclude_metrics.py b/integration_tests/tests/test_anomaly_exclude_metrics.py index 40892506f..13c6d050c 100644 --- a/integration_tests/tests/test_anomaly_exclude_metrics.py +++ b/integration_tests/tests/test_anomaly_exclude_metrics.py @@ -101,9 +101,11 @@ def test_exclude_specific_timestamps(test_id: str, dbt_project: DbtProject): ) assert test_result["status"] == "pass" + # T-SQL uses datetime2 instead of timestamp. 
+ ts_type = "datetime2" if dbt_project.is_tsql else "timestamp" excluded_buckets_str = ", ".join( [ - "cast('%s' as timestamp)" % cur_ts.strftime(DATE_FORMAT) + "cast('%s' as %s)" % (cur_ts.strftime(DATE_FORMAT), ts_type) for cur_ts in excluded_buckets ] ) diff --git a/integration_tests/tests/test_column_pii_sampling.py b/integration_tests/tests/test_column_pii_sampling.py index 9955c37d5..b0010b7f4 100644 --- a/integration_tests/tests/test_column_pii_sampling.py +++ b/integration_tests/tests/test_column_pii_sampling.py @@ -5,20 +5,6 @@ SENSITIVE_COLUMN = "email" SAFE_COLUMN = "order_count" -SAMPLES_QUERY = """ - with latest_elementary_test_result as ( - select id - from {{{{ ref("elementary_test_results") }}}} - where lower(table_name) = lower('{test_id}') - order by created_at desc - limit 1 - ) - - select result_row - from {{{{ ref("test_result_rows") }}}} - where elementary_test_results_id in (select * from latest_elementary_test_result) -""" - TEST_SAMPLE_ROW_COUNT = 5 @@ -47,7 +33,7 @@ def test_column_pii_sampling_enabled(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 0 @@ -77,7 +63,7 @@ def test_column_pii_sampling_disabled(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] # sample should be {'unique_field': 'user@example.com', 'n_records': 10} @@ -113,7 +99,7 @@ def test_column_pii_default_tag_override(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] # sample should be {'unique_field': 
'user@example.com', 'n_records': 10} @@ -150,7 +136,7 @@ def test_column_pii_sampling_tags_exist_but_flag_disabled( samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] # When flag is disabled, we get the full sample (not limited by PII filtering) @@ -188,7 +174,7 @@ def test_column_pii_sampling_all_columns_pii(test_id: str, dbt_project: DbtProje samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] # When all columns are PII, no samples should be collected @@ -219,7 +205,7 @@ def test_unique_test_custom_tag(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 0 @@ -249,7 +235,7 @@ def test_accepted_values_multi_tags(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 0 @@ -279,7 +265,7 @@ def test_not_null_test_multi_matched_tags(test_id: str, dbt_project: DbtProject) samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 0 @@ -312,7 +298,7 @@ def test_multiple_pii_columns_mapping(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 0 @@ -342,7 +328,7 @@ def 
test_custom_sql_test_with_pii_column_simple(test_id: str, dbt_project: DbtPr samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 0 @@ -372,7 +358,7 @@ def test_meta_tags_and_accepted_values(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 0 diff --git a/integration_tests/tests/test_dimension_anomalies.py b/integration_tests/tests/test_dimension_anomalies.py index fd240bf90..51d7a05b0 100644 --- a/integration_tests/tests/test_dimension_anomalies.py +++ b/integration_tests/tests/test_dimension_anomalies.py @@ -10,14 +10,15 @@ DBT_TEST_NAME = "elementary.dimension_anomalies" DBT_TEST_ARGS = {"timestamp_column": TIMESTAMP_COLUMN, "dimensions": ["superhero"]} -# This returns data points used in the latest anomaly test +# This returns data points used in the latest anomaly test. +# T-SQL does not support LIMIT; use TOP instead when target is fabric/sqlserver. 
ANOMALY_TEST_POINTS_QUERY = """ with latest_elementary_test_result as ( - select id + select {top_clause}id from {{{{ ref("elementary_test_results") }}}} where lower(table_name) = lower('{test_id}') order by created_at desc - limit 1 + {limit_clause} ) select result_row @@ -27,7 +28,13 @@ def get_latest_anomaly_test_points(dbt_project: DbtProject, test_id: str): - results = dbt_project.run_query(ANOMALY_TEST_POINTS_QUERY.format(test_id=test_id)) + sl = dbt_project.select_limit(1) + query = ANOMALY_TEST_POINTS_QUERY.format( + test_id=test_id, + top_clause=sl.top, + limit_clause=sl.limit, + ) + results = dbt_project.run_query(query) return [json.loads(result["result_row"]) for result in results] diff --git a/integration_tests/tests/test_disable_samples_config.py b/integration_tests/tests/test_disable_samples_config.py index 662a6c758..f1d6d23f6 100644 --- a/integration_tests/tests/test_disable_samples_config.py +++ b/integration_tests/tests/test_disable_samples_config.py @@ -4,20 +4,6 @@ COLUMN_NAME = "sensitive_data" -SAMPLES_QUERY = """ - with latest_elementary_test_result as ( - select id - from {{{{ ref("elementary_test_results") }}}} - where lower(table_name) = lower('{test_id}') - order by created_at desc - limit 1 - ) - - select result_row - from {{{{ ref("test_result_rows") }}}} - where elementary_test_results_id in (select * from latest_elementary_test_result) -""" - def test_disable_samples_config_prevents_sampling( test_id: str, dbt_project: DbtProject @@ -41,7 +27,7 @@ def test_disable_samples_config_prevents_sampling( samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 0 @@ -66,7 +52,7 @@ def test_disable_samples_false_allows_sampling(test_id: str, dbt_project: DbtPro samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in 
dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 5 for sample in samples: @@ -97,7 +83,7 @@ def test_disable_samples_config_overrides_pii_tags( samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 0 @@ -126,7 +112,7 @@ def test_disable_samples_and_pii_interaction(test_id: str, dbt_project: DbtProje samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 0 @@ -152,7 +138,7 @@ def test_disable_samples_with_multiple_columns(test_id: str, dbt_project: DbtPro samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 0 diff --git a/integration_tests/tests/test_exposure_schema_validity.py b/integration_tests/tests/test_exposure_schema_validity.py index dee5f03f0..466236928 100644 --- a/integration_tests/tests/test_exposure_schema_validity.py +++ b/integration_tests/tests/test_exposure_schema_validity.py @@ -5,21 +5,6 @@ DBT_TEST_NAME = "elementary.exposure_schema_validity" -INVALID_EXPOSURES_QUERY = """ - with latest_elementary_test_result as ( - select id - from {{{{ ref("elementary_test_results") }}}} - where lower(table_name) = lower('{test_id}') - order by created_at desc - limit 1 - ) - - select result_row - from {{{{ ref("test_result_rows") }}}} - where elementary_test_results_id in (select * from latest_elementary_test_result) -""" - - def seed(dbt_project: DbtProject): seed_result = dbt_project.dbt_runner.seed(full_refresh=True) assert seed_result is True @@ -120,9 +105,7 @@ def test_exposure_schema_validity_correct_columns_and_invalid_type( invalid_exposures = [ 
json.loads(row["result_row"]) - for row in dbt_project.run_query( - INVALID_EXPOSURES_QUERY.format(test_id=test_id) - ) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(invalid_exposures) == 1 assert invalid_exposures[0]["exposure"] == "ZOMG" @@ -170,9 +153,7 @@ def test_exposure_schema_validity_invalid_type_name_present_in_error( invalid_exposures = [ json.loads(row["result_row"]) - for row in dbt_project.run_query( - INVALID_EXPOSURES_QUERY.format(test_id=test_id) - ) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(invalid_exposures) == 1 assert invalid_exposures[0]["exposure"] == "ZOMG" @@ -231,9 +212,7 @@ def test_exposure_schema_validity_missing_columns( invalid_exposures = [ json.loads(row["result_row"]) - for row in dbt_project.run_query( - INVALID_EXPOSURES_QUERY.format(test_id=test_id) - ) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(invalid_exposures) == 1 assert invalid_exposures[0]["exposure"] == "ZOMG" diff --git a/integration_tests/tests/test_override_samples_config.py b/integration_tests/tests/test_override_samples_config.py index c5d2b70ec..f0bb92aac 100644 --- a/integration_tests/tests/test_override_samples_config.py +++ b/integration_tests/tests/test_override_samples_config.py @@ -4,20 +4,6 @@ COLUMN_NAME = "some_data" -SAMPLES_QUERY = """ - with latest_elementary_test_result as ( - select id - from {{{{ ref("elementary_test_results") }}}} - where lower(table_name) = lower('{test_id}') - order by created_at desc - limit 1 - ) - - select result_row - from {{{{ ref("test_result_rows") }}}} - where elementary_test_results_id in (select * from latest_elementary_test_result) -""" - def test_sample_count_unlimited(test_id: str, dbt_project: DbtProject): null_count = 20 @@ -39,7 +25,7 @@ def test_sample_count_unlimited(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in 
dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 20 for sample in samples: @@ -67,7 +53,7 @@ def test_sample_count_small(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == 3 for sample in samples: diff --git a/integration_tests/tests/test_sampling.py b/integration_tests/tests/test_sampling.py index 660bc03c8..60aa05b78 100644 --- a/integration_tests/tests/test_sampling.py +++ b/integration_tests/tests/test_sampling.py @@ -5,20 +5,6 @@ COLUMN_NAME = "some_column" -SAMPLES_QUERY = """ - with latest_elementary_test_result as ( - select id - from {{{{ ref("elementary_test_results") }}}} - where lower(table_name) = lower('{test_id}') - order by created_at desc - limit 1 - ) - - select result_row - from {{{{ ref("test_result_rows") }}}} - where elementary_test_results_id in (select * from latest_elementary_test_result) -""" - TEST_SAMPLE_ROW_COUNT = 7 @@ -39,7 +25,7 @@ def test_sampling(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query(dbt_project.samples_query(test_id)) ] assert len(samples) == TEST_SAMPLE_ROW_COUNT assert all([row == {COLUMN_NAME: None} for row in samples]) diff --git a/integration_tests/tests/test_sampling_pii.py b/integration_tests/tests/test_sampling_pii.py index beaad2fda..fabfad875 100644 --- a/integration_tests/tests/test_sampling_pii.py +++ b/integration_tests/tests/test_sampling_pii.py @@ -5,20 +5,6 @@ COLUMN_NAME = "some_column" -SAMPLES_QUERY = """ - with latest_elementary_test_result as ( - select id - from {{{{ ref("elementary_test_results") }}}} - where lower(table_name) = lower('{test_id}') - 
order by created_at desc, id desc - limit 1 - ) - - select result_row - from {{{{ ref("test_result_rows") }}}} - where elementary_test_results_id in (select * from latest_elementary_test_result) -""" - TEST_SAMPLE_ROW_COUNT = 7 @@ -45,7 +31,9 @@ def test_sampling_pii_disabled(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query( + dbt_project.samples_query(test_id, order_by="created_at desc, id desc") + ) ] assert len(samples) == 0 @@ -73,7 +61,9 @@ def test_sampling_pii_disabled_with_default_config_and_casing( samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query( + dbt_project.samples_query(test_id, order_by="created_at desc, id desc") + ) ] assert len(samples) == 0 @@ -101,7 +91,9 @@ def test_sampling_pii_enabled_with_default_config( samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query( + dbt_project.samples_query(test_id, order_by="created_at desc, id desc") + ) ] assert len(samples) == TEST_SAMPLE_ROW_COUNT @@ -129,7 +121,9 @@ def test_sampling_non_pii_enabled(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query( + dbt_project.samples_query(test_id, order_by="created_at desc, id desc") + ) ] assert len(samples) == TEST_SAMPLE_ROW_COUNT @@ -157,7 +151,9 @@ def test_sampling_pii_feature_disabled(test_id: str, dbt_project: DbtProject): samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query( + dbt_project.samples_query(test_id, order_by="created_at desc, id desc") + ) ] assert len(samples) == 
TEST_SAMPLE_ROW_COUNT @@ -187,7 +183,9 @@ def test_sampling_disable_samples_overrides_pii(test_id: str, dbt_project: DbtPr samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query( + dbt_project.samples_query(test_id, order_by="created_at desc, id desc") + ) ] assert len(samples) == 0 @@ -219,6 +217,8 @@ def test_sampling_disable_samples_false_allows_samples( samples = [ json.loads(row["result_row"]) - for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id)) + for row in dbt_project.run_query( + dbt_project.samples_query(test_id, order_by="created_at desc, id desc") + ) ] assert len(samples) == TEST_SAMPLE_ROW_COUNT diff --git a/macros/edr/alerts/anomaly_detection_description.sql b/macros/edr/alerts/anomaly_detection_description.sql index 9634eeedd..be0dae8cc 100644 --- a/macros/edr/alerts/anomaly_detection_description.sql +++ b/macros/edr/alerts/anomaly_detection_description.sql @@ -15,106 +15,121 @@ {% endmacro %} {% macro freshness_description() %} - 'Last update was at ' - || anomalous_value - || ', ' - || 'which is' - || {{ - elementary.edr_cast_as_string( - "abs(round(" - ~ elementary.edr_cast_as_numeric("metric_value/3600") - ~ ", 2))" + {%- set metric_hours -%} + {{ elementary.edr_cast_as_string("abs(round(" ~ elementary.edr_cast_as_numeric("metric_value/3600") ~ ", 2))") }} + {%- endset -%} + {%- set training_hours -%} + {{ elementary.edr_cast_as_string("abs(round(" ~ elementary.edr_cast_as_numeric("training_avg/3600") ~ ", 2))") }} + {%- endset -%} + {{ + dbt.concat( + [ + "'Last update was at '", + "anomalous_value", + "', which is'", + metric_hours | trim, + "' hours without updates (only full buckets are considered). Usually the table is updated within '", + training_hours | trim, + "' hours.'", + ] ) }} - || ' hours without updates (only full buckets are considered). 
Usually the table is updated within ' - || {{ - elementary.edr_cast_as_string( - "abs(round(" - ~ elementary.edr_cast_as_numeric("training_avg/3600") - ~ ", 2))" - ) - }} - || ' hours.' {% endmacro %} {% macro table_metric_description() %} - 'The last ' - || metric_name - || ' value is ' - || {{ - elementary.edr_cast_as_string( - "round(" ~ elementary.edr_cast_as_numeric("metric_value") ~ ", 3)" + {%- set metric_val -%} + {{ elementary.edr_cast_as_string("round(" ~ elementary.edr_cast_as_numeric("metric_value") ~ ", 3)") }} + {%- endset -%} + {%- set training_val -%} + {{ elementary.edr_cast_as_string("round(" ~ elementary.edr_cast_as_numeric("training_avg") ~ ", 3)") }} + {%- endset -%} + {{ + dbt.concat( + [ + "'The last '", + "metric_name", + "' value is '", + metric_val | trim, + "'. The average for this metric is '", + training_val | trim, + "'.'", + ] ) }} - || '. The average for this metric is ' - || {{ - elementary.edr_cast_as_string( - "round(" ~ elementary.edr_cast_as_numeric("training_avg") ~ ", 3)" - ) - }} - || '.' {% endmacro %} {% macro column_metric_description() %} - 'In column ' - || column_name - || ', the last ' - || metric_name - || ' value is ' - || {{ - elementary.edr_cast_as_string( - "round(" ~ elementary.edr_cast_as_numeric("metric_value") ~ ", 3)" - ) - }} - || '. The average for this metric is ' - || {{ - elementary.edr_cast_as_string( - "round(" ~ elementary.edr_cast_as_numeric("training_avg") ~ ", 3)" + {%- set metric_val -%} + {{ elementary.edr_cast_as_string("round(" ~ elementary.edr_cast_as_numeric("metric_value") ~ ", 3)") }} + {%- endset -%} + {%- set training_val -%} + {{ elementary.edr_cast_as_string("round(" ~ elementary.edr_cast_as_numeric("training_avg") ~ ", 3)") }} + {%- endset -%} + {{ + dbt.concat( + [ + "'In column '", + "column_name", + "', the last '", + "metric_name", + "' value is '", + metric_val | trim, + "'. The average for this metric is '", + training_val | trim, + "'.'", + ] ) }} - || '.' 
{% endmacro %} {% macro column_dimension_metric_description() %} - 'In column ' - || column_name - || ', the last ' - || metric_name - || ' value for dimension ' - || dimension - || ' is ' - || {{ - elementary.edr_cast_as_string( - "round(" ~ elementary.edr_cast_as_numeric("metric_value") ~ ", 3)" - ) - }} - || '. The average for this metric is ' - || {{ - elementary.edr_cast_as_string( - "round(" ~ elementary.edr_cast_as_numeric("training_avg") ~ ", 3)" + {%- set metric_val -%} + {{ elementary.edr_cast_as_string("round(" ~ elementary.edr_cast_as_numeric("metric_value") ~ ", 3)") }} + {%- endset -%} + {%- set training_val -%} + {{ elementary.edr_cast_as_string("round(" ~ elementary.edr_cast_as_numeric("training_avg") ~ ", 3)") }} + {%- endset -%} + {{ + dbt.concat( + [ + "'In column '", + "column_name", + "', the last '", + "metric_name", + "' value for dimension '", + "dimension", + "' is '", + metric_val | trim, + "'. The average for this metric is '", + training_val | trim, + "'.'", + ] ) }} - || '.' {% endmacro %} {% macro dimension_metric_description() %} - 'The last ' - || metric_name - || ' value for dimension ' - || dimension - || ' - ' - || case when dimension_value is null then 'NULL' else dimension_value end - || ' is ' - || {{ - elementary.edr_cast_as_string( - "round(" ~ elementary.edr_cast_as_numeric("metric_value") ~ ", 3)" - ) - }} - || '. 
The average for this metric is ' - || {{ - elementary.edr_cast_as_string( - "round(" ~ elementary.edr_cast_as_numeric("training_avg") ~ ", 3)" + {%- set metric_val -%} + {{ elementary.edr_cast_as_string("round(" ~ elementary.edr_cast_as_numeric("metric_value") ~ ", 3)") }} + {%- endset -%} + {%- set training_val -%} + {{ elementary.edr_cast_as_string("round(" ~ elementary.edr_cast_as_numeric("training_avg") ~ ", 3)") }} + {%- endset -%} + {{ + dbt.concat( + [ + "'The last '", + "metric_name", + "' value for dimension '", + "dimension", + "' - '", + "case when dimension_value is null then 'NULL' else dimension_value end", + "' is '", + metric_val | trim, + "'. The average for this metric is '", + training_val | trim, + "'.'", + ] ) }} - || '.' {% endmacro %} diff --git a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql index 7798d06b7..5c3eeacdd 100644 --- a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql +++ b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql @@ -93,6 +93,11 @@ bucket end (which is the actual time of the test) #} {%- set metric_time_bucket_expr = "case when bucket_start is not null then bucket_start else bucket_end end" %} + {%- set exclude_filter = ( + "should_exclude_from_training = " + ~ elementary.edr_boolean_literal(false) + ) -%} + {%- set anomaly_scores_query %} {% if test_configuration.timestamp_column %} with buckets as ( @@ -199,12 +204,12 @@ bucket_start, bucket_end, {{ bucket_seasonality_expr }} as bucket_seasonality, - {{ test_configuration.anomaly_exclude_metrics or 'FALSE' }} as is_excluded, + {{ elementary.edr_condition_as_boolean(test_configuration.anomaly_exclude_metrics) }} as is_excluded, {# Flag detection period metrics for exclusion from training #} {% if test_configuration.exclude_detection_period_from_training %} - bucket_end > {{ detection_period_start_expr }} + {{ 
elementary.edr_condition_as_boolean('bucket_end > ' ~ detection_period_start_expr) }} {% else %} - FALSE + {{ elementary.edr_boolean_literal(false) }} {% endif %} as should_exclude_from_training, bucket_duration_hours, updated_at @@ -229,14 +234,17 @@ bucket_duration_hours, updated_at, should_exclude_from_training, - avg(case when not should_exclude_from_training then metric_value end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_avg, - {{ elementary.standard_deviation('case when not should_exclude_from_training then metric_value end') }} over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_stddev, - count(case when not should_exclude_from_training then metric_value end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_set_size, - last_value(case when not should_exclude_from_training then bucket_end end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) training_end, - first_value(case when not should_exclude_from_training then bucket_end end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_start + avg(case when {{ exclude_filter }} then metric_value end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_avg, + {{ elementary.standard_deviation('case when ' ~ exclude_filter ~ ' then metric_value end') }} over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_stddev, + count(case when {{ exclude_filter }} then metric_value end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and 
current row) as training_set_size, + last_value(case when {{ exclude_filter }} then bucket_end end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) training_end, + first_value(case when {{ exclude_filter }} then bucket_end end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_start from grouped_metrics - where not is_excluded - {{ dbt_utils.group_by(14) }} + where {{ elementary.edr_is_false('is_excluded') }} + group by metric_id, full_table_name, column_name, dimension, + dimension_value, metric_name, metric_value, source_value, + bucket_start, bucket_end, bucket_seasonality, + bucket_duration_hours, updated_at, should_exclude_from_training ), anomaly_scores as ( diff --git a/macros/edr/data_monitoring/monitors/column_boolean_monitors.sql b/macros/edr/data_monitoring/monitors/column_boolean_monitors.sql index 1f97591b6..448f84980 100644 --- a/macros/edr/data_monitoring/monitors/column_boolean_monitors.sql +++ b/macros/edr/data_monitoring/monitors/column_boolean_monitors.sql @@ -1,25 +1,17 @@ {% macro count_true(column_name) -%} + {%- set bool_expr = ( + "cast(" ~ column_name ~ " as " ~ elementary.edr_type_bool() ~ ")" + ) -%} coalesce( - sum( - case - when cast({{ column_name }} as {{ elementary.edr_type_bool() }}) = true - then 1 - else 0 - end - ), - 0 + sum(case when {{ elementary.edr_is_true(bool_expr) }} then 1 else 0 end), 0 ) {%- endmacro %} {% macro count_false(column_name) -%} + {%- set bool_expr = ( + "cast(" ~ column_name ~ " as " ~ elementary.edr_type_bool() ~ ")" + ) -%} coalesce( - sum( - case - when cast({{ column_name }} as {{ elementary.edr_type_bool() }}) = true - then 0 - else 1 - end - ), - 0 + sum(case when {{ elementary.edr_is_false(bool_expr) }} then 1 else 0 end), 0 ) {%- endmacro %} diff --git a/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql 
b/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql index 432717fcb..1618bb205 100644 --- a/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql +++ b/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql @@ -61,6 +61,11 @@ stddev_pop(cast({{ column_name }} as {{ elementary.edr_type_float() }})) {%- endmacro %} +{# T-SQL uses STDEV instead of stddev #} +{% macro fabric__standard_deviation(column_name) -%} + stdev(cast({{ column_name }} as {{ elementary.edr_type_float() }})) +{%- endmacro %} + {% macro variance(column_name) -%} {{ return(adapter.dispatch("variance", "elementary")(column_name)) }} {%- endmacro %} @@ -73,6 +78,11 @@ varSamp(cast({{ column_name }} as Nullable({{ elementary.edr_type_float() }}))) {%- endmacro %} +{# T-SQL uses VAR instead of variance #} +{% macro fabric__variance(column_name) -%} + var(cast({{ column_name }} as {{ elementary.edr_type_float() }})) +{%- endmacro %} + {% macro sum(column_name) -%} sum(cast({{ column_name }} as {{ elementary.edr_type_float() }})) {%- endmacro %} diff --git a/macros/edr/data_monitoring/monitors/column_string_monitors.sql b/macros/edr/data_monitoring/monitors/column_string_monitors.sql index fc615e9f1..a06036ed3 100644 --- a/macros/edr/data_monitoring/monitors/column_string_monitors.sql +++ b/macros/edr/data_monitoring/monitors/column_string_monitors.sql @@ -1,8 +1,28 @@ -{% macro max_length(column_name) -%} max(length({{ column_name }})) {%- endmacro %} +{% macro max_length(column_name) -%} + {{ adapter.dispatch("max_length", "elementary")(column_name) }} +{%- endmacro %} +{% macro default__max_length(column_name) -%} + max(length({{ column_name }})) +{%- endmacro %} +{% macro fabric__max_length(column_name) -%} max(len({{ column_name }})) {%- endmacro %} -{% macro min_length(column_name) -%} min(length({{ column_name }})) {%- endmacro %} +{% macro min_length(column_name) -%} + {{ adapter.dispatch("min_length", "elementary")(column_name) }} +{%- endmacro %} +{% macro 
default__min_length(column_name) -%} + min(length({{ column_name }})) +{%- endmacro %} +{% macro fabric__min_length(column_name) -%} min(len({{ column_name }})) {%- endmacro %} -{% macro average_length(column_name) -%} avg(length({{ column_name }})) {%- endmacro %} +{% macro average_length(column_name) -%} + {{ adapter.dispatch("average_length", "elementary")(column_name) }} +{%- endmacro %} +{% macro default__average_length(column_name) -%} + avg(length({{ column_name }})) +{%- endmacro %} +{% macro fabric__average_length(column_name) -%} + avg(len({{ column_name }})) +{%- endmacro %} {% macro missing_count(column_name) %} coalesce( diff --git a/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql b/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql index 9a8823441..ecd1d3564 100644 --- a/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql +++ b/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql @@ -192,13 +192,11 @@ {%- if timestamp_column %} left join buckets on (edr_bucket_start = start_bucket_in_data) {%- endif %} - {% if dimensions | length > 0 %} - group by - 1, - 2, - {{ elementary.select_dimensions_columns(prefixed_dimensions) }} - {% else %} group by 1, 2 - {% endif %} + {{ + elementary.column_monitoring_group_by( + timestamp_column, dimensions, prefixed_dimensions + ) + }} {%- else %}{{ elementary.empty_column_monitors_cte() }} {%- endif %} @@ -328,6 +326,21 @@ {% endmacro %} +{% macro column_monitoring_group_by( + timestamp_column, dimensions, prefixed_dimensions +) %} + {% if timestamp_column %} + group by + edr_bucket_start, + edr_bucket_end + {% if dimensions | length > 0 %} + , {{ elementary.select_dimensions_columns(prefixed_dimensions) }} + {% endif %} + {% elif dimensions | length > 0 %} + group by {{ elementary.select_dimensions_columns(prefixed_dimensions) }} + {% endif %} +{% endmacro %} + {% macro select_dimensions_columns(dimension_columns, as_prefix="") %} {% set 
select_statements %} {%- for column in dimension_columns -%} diff --git a/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql b/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql index 9bda21f0f..3aa20fe19 100644 --- a/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql +++ b/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql @@ -80,9 +80,9 @@ dimension_value, 1 as joiner, min(bucket_end) as dimension_min_bucket_end, - sum(metric_value) + sum(metric_value) as total_metric_value from all_dimension_metrics - group by 1, 2 + group by dimension_value {# Remove outdated dimension values (dimensions with all metrics of 0 in the range of the test time) #} having sum(metric_value) > 0 ), @@ -117,7 +117,11 @@ left join time_filtered_monitored_table on (edr_bucket_start = start_bucket_in_data) - group by 1, 2, 3, 4 + group by + edr_bucket_start, + edr_bucket_end, + start_bucket_in_data, + dimension_value ), {# Merging between the row count and the dimensions buckets #} @@ -212,9 +216,9 @@ ), training_set_dimensions as ( - select distinct dimension_value, sum(metric_value) + select distinct dimension_value, sum(metric_value) as total_metric_value from all_dimension_metrics - group by 1 + group by dimension_value {# Remove outdated dimension values (dimensions with all metrics of 0 in the range of the test time) #} having sum(metric_value) > 0 ), @@ -233,7 +237,7 @@ {{ elementary.edr_cast_as_float(elementary.row_count()) }} as row_count_value from filtered_monitored_table - group by 1, 2 + group by dimension_value ), {# This way we make sure that if a dimension has no rows, it will get a metric with value 0 #} diff --git a/macros/edr/data_monitoring/monitors_query/get_latest_full_refresh.sql b/macros/edr/data_monitoring/monitors_query/get_latest_full_refresh.sql index e4ec96bc0..7c8e563aa 100644 --- a/macros/edr/data_monitoring/monitors_query/get_latest_full_refresh.sql +++ 
b/macros/edr/data_monitoring/monitors_query/get_latest_full_refresh.sql @@ -1,4 +1,8 @@ {% macro get_latest_full_refresh(model_node) %} + {{ return(adapter.dispatch("get_latest_full_refresh", "elementary")(model_node)) }} +{% endmacro %} + +{% macro default__get_latest_full_refresh(model_node) %} {%- set dbt_run_results_relation = elementary.get_elementary_relation( "dbt_run_results" ) %} @@ -12,3 +16,17 @@ {% endset %} {% do return(elementary.result_value(query)) %} {% endmacro %} + +{% macro fabric__get_latest_full_refresh(model_node) %} + {%- set dbt_run_results_relation = elementary.get_elementary_relation( + "dbt_run_results" + ) %} + {% set query %} + select top 1 generated_at from {{ dbt_run_results_relation }} + where + unique_id = '{{ model_node.unique_id }}' and + full_refresh = cast(1 as bit) + order by generated_at desc + {% endset %} + {% do return(elementary.result_value(query)) %} +{% endmacro %} diff --git a/macros/edr/data_monitoring/monitors_query/table_monitoring_query.sql b/macros/edr/data_monitoring/monitors_query/table_monitoring_query.sql index 3f6f494dc..a16169c61 100644 --- a/macros/edr/data_monitoring/monitors_query/table_monitoring_query.sql +++ b/macros/edr/data_monitoring/monitors_query/table_monitoring_query.sql @@ -255,13 +255,19 @@ {% endmacro %} {% macro get_unified_metrics_query(table_metrics, metric_properties) %} - {%- set metric_name_to_query = {} %} + {{ adapter.dispatch("get_unified_metrics_query", "elementary")(table_metrics, metric_properties) }} +{% endmacro %} + +{% macro default__get_unified_metrics_query(table_metrics, metric_properties) %} + {# This macro is embedded inside a parent CTE ("metrics as (...)"), + so we directly UNION ALL the metric queries without a WITH clause. 
#} + {%- set metric_queries = [] %} {%- for metric in table_metrics %} {% set metric_query = elementary.get_metric_query(metric, metric_properties) %} - {% do metric_name_to_query.update({metric.name: metric_query}) %} + {% do metric_queries.append(metric_query) %} {%- endfor %} - {% if not metric_name_to_query %} + {% if not metric_queries %} {% if metric_properties.timestamp_column %} {% do return( elementary.empty_table( @@ -288,14 +294,8 @@ {% endif %} {% endif %} - with - {%- for metric_name, metric_query in metric_name_to_query.items() %} - {{ metric_name }} as ({{ metric_query }}){% if not loop.last %},{% endif %} - {%- endfor %} - - {%- for metric_name in metric_name_to_query %} - select * - from {{ metric_name }} + {%- for metric_query in metric_queries %} + {{ metric_query }} {% if not loop.last %} union all {% endif %} @@ -334,35 +334,90 @@ {% endmacro %} {% macro row_count_metric_query(metric, metric_properties) %} - with - row_count_values as ( - select - edr_bucket_start, - edr_bucket_end, - start_bucket_in_data, - case - when start_bucket_in_data is null - then 0 - else {{ elementary.edr_cast_as_float(elementary.row_count()) }} - end as row_count_value - from buckets - left join - time_filtered_monitored_table - on (edr_bucket_start = start_bucket_in_data) - group by 1, 2, 3 - ) + {{ adapter.dispatch("row_count_metric_query", "elementary")(metric, metric_properties) }} +{% endmacro %} +{% macro default__row_count_metric_query(metric, metric_properties) %} + {# CTE-free so it works both at top level and when embedded inside a parent CTE + (required by T-SQL / Fabric where nested CTEs are not allowed). 
#} select edr_bucket_start, edr_bucket_end, {{ elementary.const_as_string(metric.name) }} as metric_name, {{ elementary.const_as_string("row_count") }} as metric_type, {{ elementary.null_string() }} as source_value, - row_count_value as metric_value - from row_count_values + case + when start_bucket_in_data is null + then 0 + else {{ elementary.edr_cast_as_float(elementary.row_count()) }} + end as metric_value + from buckets + left join + time_filtered_monitored_table + on (edr_bucket_start = start_bucket_in_data) + group by edr_bucket_start, edr_bucket_end, start_bucket_in_data {% endmacro %} {% macro freshness_metric_query(metric, metric_properties) %} + {{ adapter.dispatch("freshness_metric_query", "elementary")(metric, metric_properties) }} +{% endmacro %} + +{#-- Helper: SQL expression for computing freshness as time since last update --#} +{% macro _freshness_lag_expr() %} + {{ + elementary.timediff( + "second", + elementary.lag("timestamp_val") + ~ " over (order by timestamp_val)", + "timestamp_val", + ) + }} +{% endmacro %} + +{#-- Helper: SQL expression for bucket-end freshness (time from last update to bucket end). + Uses dispatched _bucket_end_cap to handle least() vs CASE WHEN across dialects. 
--#} +{% macro _bucket_end_freshness_expr() %} + {{ adapter.dispatch("_bucket_end_freshness_expr", "elementary")() }} +{% endmacro %} + +{% macro default___bucket_end_freshness_expr() %} + {{ + elementary.timediff( + "second", + elementary.edr_cast_as_timestamp("max(timestamp_val)"), + "least(edr_bucket_end, {})".format( + elementary.current_timestamp_column() + ), + ) + }} +{% endmacro %} + +{% macro fabric___bucket_end_freshness_expr() %} + {# T-SQL does not support least(); use CASE WHEN instead #} + {{ + elementary.timediff( + "second", + elementary.edr_cast_as_timestamp("max(timestamp_val)"), + "case when edr_bucket_end < {} then edr_bucket_end else {} end".format( + elementary.current_timestamp_column(), + elementary.current_timestamp_column(), + ), + ) + }} +{% endmacro %} + +{#-- Helper: final SELECT columns for freshness metric --#} +{% macro _freshness_final_select(metric) %} + select + edr_bucket_start, + edr_bucket_end, + {{ elementary.const_as_string(metric.name) }} as metric_name, + {{ elementary.const_as_string("freshness") }} as metric_type, + {{ elementary.edr_cast_as_string("update_timestamp") }} as source_value, + freshness as metric_value +{% endmacro %} + +{% macro default__freshness_metric_query(metric, metric_properties) %} -- get ordered consecutive update timestamps in the source data with unique_timestamps as ( @@ -376,14 +431,7 @@ consecutive_updates_freshness as ( select timestamp_val as update_timestamp, - {{ - elementary.timediff( - "second", - elementary.lag("timestamp_val") - ~ " over (order by timestamp_val)", - "timestamp_val", - ) - }} as freshness + {{ elementary._freshness_lag_expr() }} as freshness from unique_timestamps ), time_filtered_consecutive_updates_freshness as ( @@ -410,19 +458,11 @@ edr_bucket_start, edr_bucket_end, max(timestamp_val) as update_timestamp, - {{ - elementary.timediff( - "second", - elementary.edr_cast_as_timestamp("max(timestamp_val)"), - "least(edr_bucket_end, {})".format( - 
elementary.current_timestamp_column() - ), - ) - }} as freshness + {{ elementary._bucket_end_freshness_expr() }} as freshness from buckets cross join unique_timestamps where timestamp_val < edr_bucket_end - group by 1, 2 + group by edr_bucket_start, edr_bucket_end ), -- create a single table with all the freshness values @@ -447,17 +487,64 @@ from bucket_all_freshness_metrics ) - select - edr_bucket_start, - edr_bucket_end, - {{ elementary.const_as_string(metric.name) }} as metric_name, - {{ elementary.const_as_string("freshness") }} as metric_type, - {{ elementary.edr_cast_as_string("update_timestamp") }} as source_value, - freshness as metric_value + {{ elementary._freshness_final_select(metric) }} from bucket_freshness_ranked where row_num = 1 {% endmacro %} +{% macro fabric__freshness_metric_query(metric, metric_properties) %} + {# Fabric / T-SQL: rewrite without CTEs (nested CTEs are not allowed). + All intermediate results are expressed as inline subqueries. + Uses the same helper macros as default__ for shared SQL expressions. 
#} + {{ elementary._freshness_final_select(metric) }} + from ( + select + *, + row_number() over ( + partition by edr_bucket_end + order by case when freshness is null then 1 else 0 end, freshness desc + ) as row_num + from ( + {# bucketed consecutive updates freshness #} + select edr_bucket_start, edr_bucket_end, update_timestamp, freshness + from buckets + cross join ( + select * + from ( + select + timestamp_val as update_timestamp, + {{ elementary._freshness_lag_expr() }} as freshness + from ( + select distinct monitored_table_timestamp_column as timestamp_val + from partially_time_filtered_monitored_table + ) as unique_timestamps_inner + ) as consecutive_updates_freshness_inner + where update_timestamp >= (select min(edr_bucket_start) from buckets) + ) as time_filtered_cuf + where + update_timestamp >= edr_bucket_start + and update_timestamp < edr_bucket_end + + union all + + {# bucket end freshness #} + select + edr_bucket_start, + edr_bucket_end, + max(timestamp_val) as update_timestamp, + {{ elementary._bucket_end_freshness_expr() }} as freshness + from buckets + cross join ( + select distinct monitored_table_timestamp_column as timestamp_val + from partially_time_filtered_monitored_table + ) as unique_timestamps_end + where timestamp_val < edr_bucket_end + group by edr_bucket_start, edr_bucket_end + ) as bucket_all_freshness_metrics + ) as bucket_freshness_ranked + where row_num = 1 +{% endmacro %} + {% macro event_freshness_metric_query(metric, metric_properties) %} {{ return( @@ -495,7 +582,7 @@ }} as metric_value from buckets left join time_filtered_monitored_table on (edr_bucket_start = start_bucket_in_data) - group by 1, 2 + group by edr_bucket_start, edr_bucket_end {% endmacro %} {% macro clickhouse__event_freshness_metric_query(metric, metric_properties) %} @@ -530,7 +617,7 @@ end as metric_value from buckets left join time_filtered_monitored_table on (edr_bucket_start = start_bucket_in_data) - group by 1, 2 + group by edr_bucket_start, 
edr_bucket_end {% endmacro %} {% macro get_no_timestamp_event_freshness_query(metric, metric_properties) %} diff --git a/macros/edr/data_monitoring/schema_changes/get_columns_changes_query.sql b/macros/edr/data_monitoring/schema_changes/get_columns_changes_query.sql index 802be264f..4f3b9f49c 100644 --- a/macros/edr/data_monitoring/schema_changes/get_columns_changes_query.sql +++ b/macros/edr/data_monitoring/schema_changes/get_columns_changes_query.sql @@ -20,7 +20,6 @@ from {{ schema_columns_snapshot_relation }} where lower(full_table_name) = lower('{{ full_table_name }}') and detected_at = {{ previous_schema_time_query }} - order by detected_at desc {% endset %} {{ elementary.get_columns_changes_query_generic(full_table_name, cur, pre) }} @@ -33,22 +32,15 @@ include_added=False ) %} {% set cur %} - with baseline as ( - select lower(column_name) as column_name, data_type - from {{ model_baseline_relation }} - ) - - select - columns_snapshot.full_table_name, - lower(columns_snapshot.column_name) as column_name, - columns_snapshot.data_type, - (baseline.column_name IS NULL) as is_new, - {{ elementary.datetime_now_utc_as_timestamp_column() }} as detected_at - from ({{ elementary.get_columns_snapshot_query(model_relation, full_table_name) }}) columns_snapshot - left join baseline on ( - lower(columns_snapshot.column_name) = lower(baseline.column_name) - ) - where lower(columns_snapshot.full_table_name) = lower('{{ full_table_name }}') + {{ + adapter.dispatch( + "get_column_changes_from_baseline_cur", "elementary" + )( + model_relation, + full_table_name, + model_baseline_relation, + ) + }} {% endset %} {% set pre %} @@ -114,7 +106,7 @@ {{ elementary.null_string() }} as pre_data_type, detected_at as detected_at from cur - where is_new = true + where is_new = {{ elementary.edr_boolean_literal(true) }} ), {% endif %} @@ -191,22 +183,21 @@ column_name, 'schema_change' as test_type, change as test_sub_type, - case - when change = 'column_added' - then 'The column "' || 
column_name || '" was added' - when change = 'column_removed' - then 'The column "' || column_name || '" was removed' - when change = 'type_changed' - then - 'The type of "' - || column_name - || '" was changed from ' - || pre_data_type - || ' to ' - || data_type - else null - end as test_results_description - from all_column_changes {{ dbt_utils.group_by(9) }} + {{ elementary.schema_change_description_column() }} + from all_column_changes + {% if elementary.is_tsql() %} + {#- T-SQL does not support positional GROUP BY references. + Group by the 6 source columns from all_column_changes; + all 9 output columns are deterministic functions of these. -#} + group by + full_table_name, + change, + column_name, + data_type, + pre_data_type, + detected_at + {% else %} {{ dbt_utils.group_by(9) }} + {% endif %} ) @@ -223,3 +214,102 @@ from column_changes_test_results {%- endmacro %} + +{% macro default__get_column_changes_from_baseline_cur( + model_relation, full_table_name, model_baseline_relation +) %} + with + baseline as ( + select lower(column_name) as column_name, data_type + from {{ model_baseline_relation }} + ) + + select + columns_snapshot.full_table_name, + lower(columns_snapshot.column_name) as column_name, + columns_snapshot.data_type, + (baseline.column_name is null) as is_new, + {{ elementary.datetime_now_utc_as_timestamp_column() }} as detected_at + from + ( + {{ elementary.get_columns_snapshot_query(model_relation, full_table_name) }} + ) columns_snapshot + left join + baseline on (lower(columns_snapshot.column_name) = lower(baseline.column_name)) + where lower(columns_snapshot.full_table_name) = lower('{{ full_table_name }}') +{% endmacro %} + +{% macro schema_change_description_column() %} + case + when change = 'column_added' + then + {{ + elementary.edr_concat( + ["'The column \"'", "column_name", "'\" was added'"] + ) + }} + when change = 'column_removed' + then + {{ + elementary.edr_concat( + ["'The column \"'", "column_name", "'\" was removed'"] + ) + 
}} + when change = 'type_changed' + then + {{ + elementary.edr_concat( + [ + "'The type of \"'", + "column_name", + "'\" was changed from '", + "pre_data_type", + "' to '", + "data_type", + ] + ) + }} + else null + end as test_results_description +{% endmacro %} + +{% macro fabric__get_column_changes_from_baseline_cur( + model_relation, full_table_name, model_baseline_relation +) %} + {#- Fabric / T-SQL does not allow CTEs inside subqueries or derived tables. + get_columns_snapshot_query returns a CTE-based query, so we materialise + its result into a temp table first, then reference it with a plain SELECT. + We pass into_relation so the INTO clause is placed inside the CTE's final + SELECT (the only valid position in T-SQL). -#} + {% set tmp_snapshot = api.Relation.create( + database=model_relation.database, + schema=model_relation.schema, + identifier=model_relation.identifier ~ "__snap_tmp", + type="table", + ) %} + {% do run_query("drop table if exists " ~ tmp_snapshot) %} + {% do run_query( + elementary.get_columns_snapshot_query( + model_relation, full_table_name, into_relation=tmp_snapshot + ) + ) %} + + select + cs.full_table_name, + lower(cs.column_name) as column_name, + cs.data_type, + case + when bl.column_name is null + then {{ elementary.edr_boolean_literal(true) }} + else {{ elementary.edr_boolean_literal(false) }} + end as is_new, + {{ elementary.datetime_now_utc_as_timestamp_column() }} as detected_at + from {{ tmp_snapshot }} cs + left join + ( + select lower(column_name) as column_name, data_type + from {{ model_baseline_relation }} + ) bl + on (lower(cs.column_name) = lower(bl.column_name)) + where lower(cs.full_table_name) = lower('{{ full_table_name }}') +{% endmacro %} diff --git a/macros/edr/data_monitoring/schema_changes/get_columns_snapshot_query.sql b/macros/edr/data_monitoring/schema_changes/get_columns_snapshot_query.sql index 967b3a58e..aa18e8dbb 100644 --- a/macros/edr/data_monitoring/schema_changes/get_columns_snapshot_query.sql +++ 
b/macros/edr/data_monitoring/schema_changes/get_columns_snapshot_query.sql @@ -1,4 +1,6 @@ -{% macro get_columns_snapshot_query(model_relation, full_table_name) %} +{% macro get_columns_snapshot_query( + model_relation, full_table_name, into_relation=none +) %} {%- set schema_columns_snapshot_relation = elementary.get_elementary_relation( "schema_columns_snapshot" ) %} @@ -93,14 +95,16 @@ column_name, data_type, detected_at, - case - when - {{ elementary.full_column_name() }} - not in ({{ known_columns_query }}) - and full_table_name in ({{ known_tables_query }}) - then true - else false - end as is_new + {{ + elementary.edr_condition_as_boolean( + elementary.full_column_name() + ~ " not in (" + ~ known_columns_query + ~ ") and full_table_name in (" + ~ known_tables_query + ~ ")" + ) + }} as is_new from columns_info ), @@ -122,7 +126,18 @@ is_new, detected_at from columns_snapshot - group by 1, 2, 3, 4, 5, 6, 7 + group by + {{ + elementary.generate_surrogate_key( + ["full_table_name", "column_name", "data_type"] + ) + }}, + {{ elementary.full_column_name() }}, + full_table_name, + column_name, + data_type, + is_new, + detected_at ) select @@ -133,6 +148,7 @@ {{ elementary.edr_cast_as_string("data_type") }} as data_type, {{ elementary.edr_cast_as_bool("is_new") }} as is_new, {{ elementary.edr_cast_as_timestamp("detected_at") }} as detected_at + {% if into_relation %} into {{ into_relation }}{% endif %} from columns_snapshot_with_id {%- endmacro %} diff --git a/macros/edr/materializations/test/failed_row_count.sql b/macros/edr/materializations/test/failed_row_count.sql index fd4aa37ef..a7a23ca0d 100644 --- a/macros/edr/materializations/test/failed_row_count.sql +++ b/macros/edr/materializations/test/failed_row_count.sql @@ -42,9 +42,33 @@ {% endmacro %} {% macro get_failed_row_count_calc_query(failed_row_count_calc) %} + {{ + return( + adapter.dispatch("get_failed_row_count_calc_query", "elementary")( + failed_row_count_calc + ) + ) + }} +{% endmacro %} + +{% macro 
default__get_failed_row_count_calc_query(failed_row_count_calc) %} with results as ({{ sql }}) select {{ failed_row_count_calc }} as {{ elementary.escape_reserved_keywords("count") }} from results {% endmacro %} + +{% macro fabric__get_failed_row_count_calc_query(failed_row_count_calc) %} + {# Fabric / T-SQL does not support nested CTEs. + We create a temp table from the test SQL, then select from it. + The temp table is session-scoped and cleaned up by on_run_end. #} + {% set tmp_relation = elementary.edr_make_temp_relation(model) %} + {% do run_query( + "select * into " ~ tmp_relation ~ " from (" ~ sql ~ ") as __edr_inner" + ) %} + select + {{ failed_row_count_calc }} + as {{ elementary.escape_reserved_keywords("count") }} + from {{ tmp_relation }} +{% endmacro %} diff --git a/macros/edr/materializations/test/test.sql b/macros/edr/materializations/test/test.sql index 4b1747b44..dfa2e8e6f 100644 --- a/macros/edr/materializations/test/test.sql +++ b/macros/edr/materializations/test/test.sql @@ -21,7 +21,7 @@ {% do elementary.debug_log( test_unique_id ~ ": starting test materialization hook" ) %} - {% if elementary.get_config_var("tests_use_temp_tables") %} + {% if elementary.is_tsql() or elementary.get_config_var("tests_use_temp_tables") %} {% set temp_table_sql = elementary.create_test_result_temp_table() %} {% do context.update({"sql": temp_table_sql}) %} {% do elementary.debug_log(test_unique_id ~ ": created test temp table") %} @@ -167,6 +167,18 @@ {% endmacro %} {% macro query_test_result_rows(sample_limit=none, ignore_passed_tests=false) %} + {{ + return( + adapter.dispatch("query_test_result_rows", "elementary")( + sample_limit=sample_limit, ignore_passed_tests=ignore_passed_tests + ) + ) + }} +{% endmacro %} + +{% macro default__query_test_result_rows( + sample_limit=none, ignore_passed_tests=false +) %} {% if sample_limit == 0 %} {# performance: no need to run a sql query that we know returns an empty list #} {% do return([]) %} {% endif %} @@ -188,6 
+200,59 @@ {% do return(elementary.agate_to_dicts(elementary.run_query(query))) %} {% endmacro %} +{% macro fabric__query_test_result_rows(sample_limit=none, ignore_passed_tests=false) %} + {% if sample_limit == 0 %} {% do return([]) %} {% endif %} + + {# Allow setting -1 for unlimited, as none values are stripped from meta in dbt-fusion #} + {% if sample_limit == -1 %} {% set sample_limit = none %} {% endif %} + + {% if ignore_passed_tests and elementary.did_test_pass() %} + {% do elementary.debug_log("Skipping sample query because the test passed.") %} + {% do return([]) %} + {% endif %} + + {# + Fabric / T-SQL does not support LIMIT, and also does not allow CTEs nested + inside derived tables/subqueries. + + Many dbt generic tests (e.g. accepted_values) compile to CTE-based SQL. + To handle sampling efficiently, we materialise the compiled test SQL into a + temp view, then SELECT TOP from that view. This avoids fetching all rows into + Python memory. + + We use a regular view (not a #temp table) because EXEC-based run_query + isolates #temp table scope. The view is dropped after the SELECT. + #} + {% set view_name = ( + "edr_test_sample_" + ~ modules.datetime.datetime.now().strftime("%Y%m%d%H%M%S") + ~ "_" + ~ range(10000) + | random + ) %} + {# Use the elementary package schema (guaranteed to exist) rather than + model.schema (per-worker test audit schema) or target.schema (base schema), + which may not have been created yet in parallel CI runs. #} + {% set _edr_db, _edr_schema = elementary.get_package_database_and_schema() %} + {% set view_schema = _edr_schema if _edr_schema else target.schema %} + {% set full_view_name = view_schema ~ "." 
~ view_name %} + + {# Create view from the compiled test SQL #} + {% do run_query("create view " ~ full_view_name ~ " as " ~ sql) %} + + {% set query %} + select {% if sample_limit is not none %} top {{ sample_limit }} {% endif %} * + from {{ full_view_name }} + {% endset %} + + {% set result = elementary.agate_to_dicts(elementary.run_query(query)) %} + + {# Clean up the temp view #} + {% do run_query("drop view if exists " ~ full_view_name) %} + + {% do return(result) %} +{% endmacro %} + {% macro get_columns_to_exclude_from_sampling(flattened_test) %} {% set columns_to_exclude = [] %} diff --git a/macros/edr/system/system_utils/buckets_cte.sql b/macros/edr/system/system_utils/buckets_cte.sql index 693812169..b45b2b999 100644 --- a/macros/edr/system/system_utils/buckets_cte.sql +++ b/macros/edr/system/system_utils/buckets_cte.sql @@ -231,6 +231,32 @@ {{ return(complete_buckets_cte) }} {% endmacro %} +{% macro fabric__complete_buckets_cte( + time_bucket, + bucket_end_expr, + min_bucket_start_expr, + max_bucket_end_expr +) -%} + {# Fabric / T-SQL: use inline tally via VALUES cross-join instead of a recursive CTE. + This avoids a WITH clause so the result can safely be embedded in a subquery + or inside another CTE without triggering the T-SQL nested-CTE restriction. + Supports up to 10 000 buckets (10^4). 
#} +{%- set complete_buckets_cte %} + select + {{ elementary.edr_timeadd(time_bucket.period, "num * " ~ time_bucket.count, min_bucket_start_expr) }} as edr_bucket_start, + {{ elementary.edr_timeadd(time_bucket.period, "(num + 1) * " ~ time_bucket.count, min_bucket_start_expr) }} as edr_bucket_end + from ( + select (row_number() over (order by (select null))) - 1 as num + from (values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)) t1(val) + cross join (values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)) t2(val) + cross join (values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)) t3(val) + cross join (values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)) t4(val) + ) as integers + where {{ elementary.edr_timeadd(time_bucket.period, "(num + 1) * " ~ time_bucket.count, min_bucket_start_expr) }} <= {{ max_bucket_end_expr }} + {%- endset %} + {{ return(complete_buckets_cte) }} +{% endmacro %} + {% macro dremio__complete_buckets_cte( time_bucket, bucket_end_expr, min_bucket_start_expr, max_bucket_end_expr ) %} diff --git a/macros/edr/system/system_utils/empty_table.sql b/macros/edr/system/system_utils/empty_table.sql index ea67e0a53..9ddcb055f 100644 --- a/macros/edr/system/system_utils/empty_table.sql +++ b/macros/edr/system/system_utils/empty_table.sql @@ -223,7 +223,10 @@ {% macro dummy_values() %} + {{ return(adapter.dispatch("dummy_values", "elementary")()) }} +{% endmacro %} +{% macro default__dummy_values() %} {%- set dummy_values = { "string": "dummy_string", "long_string": "this_is_just_a_long_dummy_string", @@ -233,7 +236,21 @@ "float": 123456789.99, "timestamp": "2091-02-17", } %} {{ return(dummy_values) }} +{% endmacro %} +{# T-SQL does not have boolean literals True/False. + Use 1 which can be cast to bit. 
#} +{% macro fabric__dummy_values() %} + {%- set dummy_values = { + "string": "dummy_string", + "long_string": "this_is_just_a_long_dummy_string", + "boolean": 1, + "int": 123456789, + "bigint": 31474836478, + "float": 123456789.99, + "timestamp": "2091-02-17", + } %} + {{ return(dummy_values) }} {% endmacro %} diff --git a/macros/edr/system/system_utils/full_names.sql b/macros/edr/system/system_utils/full_names.sql index 3a2c27170..00e7cdc2a 100644 --- a/macros/edr/system/system_utils/full_names.sql +++ b/macros/edr/system/system_utils/full_names.sql @@ -5,11 +5,7 @@ {% macro default__full_table_name(alias) -%} {% if alias is defined %} {%- set alias_dot = alias ~ "." %} {% endif %} upper( - {{ alias_dot }}database_name - || '.' - || {{ alias_dot }}schema_name - || '.' - || {{ alias_dot }}table_name + {{ elementary.edr_concat([alias_dot ~ 'database_name', "'.'", alias_dot ~ 'schema_name', "'.'", alias_dot ~ 'table_name']) }} ) {%- endmacro %} @@ -25,7 +21,7 @@ {%- endmacro %} {% macro default__full_schema_name() -%} - upper(database_name || '.' || schema_name) + upper({{ elementary.edr_concat(["database_name", "'.'", "schema_name"]) }}) {%- endmacro %} {% macro clickhouse__full_schema_name() -%} @@ -40,7 +36,7 @@ {% macro default__full_column_name() -%} upper( - database_name || '.' || schema_name || '.' || table_name || '.' || column_name + {{ elementary.edr_concat(["database_name", "'.'", "schema_name", "'.'", "table_name", "'.'", "column_name"]) }} ) {%- endmacro %} @@ -79,6 +75,17 @@ {% endmacro %} +{% macro fabric__full_name_split(part_name) %} + {# T-SQL: use PARSENAME which splits dotted names (parts numbered right-to-left). + PARSENAME returns nvarchar which Fabric does not support, so cast to varchar. 
#} + {%- if part_name == "database_name" -%} {%- set part_index = 3 -%} + {%- elif part_name == "schema_name" -%} {%- set part_index = 2 -%} + {%- elif part_name == "table_name" -%} {%- set part_index = 1 -%} + {%- else -%} {{ return("") }} + {%- endif -%} + cast(replace(parsename(full_table_name, {{ part_index }}), '"', '') as varchar(256)) as {{ part_name }} +{% endmacro %} + {% macro bigquery__full_name_split(part_name) %} {%- if part_name == "database_name" -%} {%- set part_index = 0 %} {%- elif part_name == "schema_name" -%} {%- set part_index = 1 %} diff --git a/macros/edr/system/system_utils/get_config_var.sql b/macros/edr/system/system_utils/get_config_var.sql index 166185db8..10e9f7501 100644 --- a/macros/edr/system/system_utils/get_config_var.sql +++ b/macros/edr/system/system_utils/get_config_var.sql @@ -25,6 +25,69 @@ {% do return(var_value) %} {% endmacro %} +{# Render a boolean config var as a SQL-compatible boolean expression. + Standard SQL uses TRUE/FALSE but T-SQL does not support bare boolean literals. + This macro outputs '1=1' or '1=0' which works across all dialects. #} +{% macro render_bool_config_var(var_name, negate=false) %} + {% set val = elementary.get_config_var(var_name) %} + {% if negate %} {% if val %} (1 = 0) {% else %} (1 = 1){% endif %} + {% else %} {% if val %} (1 = 1) {% else %} (1 = 0){% endif %} + {% endif %} +{% endmacro %} + +{# Render a SQL boolean literal that works across all dialects. + Standard SQL uses TRUE/FALSE but T-SQL requires cast(1/0 as bit). #} +{% macro edr_boolean_literal(value) %} + {{ return(adapter.dispatch("edr_boolean_literal", "elementary")(value)) }} +{% endmacro %} + +{% macro default__edr_boolean_literal(value) %} + {% if value %} true {% else %} false{% endif %} +{% endmacro %} + +{% macro fabric__edr_boolean_literal(value) %} + {% if value %} cast(1 as bit) {% else %} cast(0 as bit){% endif %} +{% endmacro %} + +{# Render a SQL condition as a boolean column value. 
+ Produces: case when [condition] then TRUE else FALSE end + On T-SQL, TRUE/FALSE become cast(1/0 as bit). + If condition is none/empty, defaults to FALSE. #} +{% macro edr_condition_as_boolean(condition) %} + case + when {{ condition or "1=0" }} + then {{ elementary.edr_boolean_literal(true) }} + else {{ elementary.edr_boolean_literal(false) }} + end +{% endmacro %} + +{# Compare a SQL expression to TRUE. Works across all dialects including T-SQL (bit). #} +{% macro edr_is_true(expr) %} + {{ return(adapter.dispatch("edr_is_true", "elementary")(expr)) }} +{% endmacro %} + +{% macro default__edr_is_true(expr) %} {{ expr }} = true {% endmacro %} + +{% macro fabric__edr_is_true(expr) %} {{ expr }} = cast(1 as bit) {% endmacro %} + +{# Compare a SQL expression to FALSE. Works across all dialects including T-SQL (bit). #} +{% macro edr_is_false(expr) %} + {{ return(adapter.dispatch("edr_is_false", "elementary")(expr)) }} +{% endmacro %} + +{% macro default__edr_is_false(expr) %} {{ expr }} = false {% endmacro %} + +{% macro fabric__edr_is_false(expr) %} {{ expr }} = cast(0 as bit) {% endmacro %} + +{# Returns true if the current adapter uses T-SQL dialect (Fabric or SQL Server). 
#} +{% macro is_tsql() %} + {{ return(adapter.dispatch("is_tsql", "elementary")()) }} +{% endmacro %} + +{% macro default__is_tsql() %} {{ return(false) }} {% endmacro %} + +{% macro fabric__is_tsql() %} {{ return(true) }} {% endmacro %} + {% macro get_default_config(var_name) %} {{ return(adapter.dispatch("get_default_config", "elementary")()) }} {%- endmacro -%} diff --git a/macros/edr/tests/test_execution_sla.sql b/macros/edr/tests/test_execution_sla.sql index ae23fc25f..aaac4240e 100644 --- a/macros/edr/tests/test_execution_sla.sql +++ b/macros/edr/tests/test_execution_sla.sql @@ -291,12 +291,12 @@ successful_runs_today, case when sla_status = 'MET_SLA' - then false + then {{ elementary.edr_boolean_literal(false) }} {# If deadline hasn't passed, don't fail yet #} - {% if deadline_passed %} when not true then false - {% else %} when not false then false + {% if not deadline_passed %} + when 1 = 1 then {{ elementary.edr_boolean_literal(false) }} {% endif %} - else true + else {{ elementary.edr_boolean_literal(true) }} end as is_failure, case when sla_status = 'NOT_RUN' @@ -335,6 +335,6 @@ successful_runs_today, result_description from final_result - where is_failure = true + where is_failure = {{ elementary.edr_boolean_literal(true) }} {% endmacro %} diff --git a/macros/edr/tests/test_utils/get_anomaly_query.sql b/macros/edr/tests/test_utils/get_anomaly_query.sql index 7cf191b4a..314285f41 100644 --- a/macros/edr/tests/test_utils/get_anomaly_query.sql +++ b/macros/edr/tests/test_utils/get_anomaly_query.sql @@ -1,15 +1,42 @@ {%- macro get_anomaly_query(flattened_test=none) -%} + {{- return(adapter.dispatch("get_anomaly_query", "elementary")(flattened_test)) -}} +{%- endmacro -%} + +{%- macro default__get_anomaly_query(flattened_test=none) -%} {%- set query -%} select * from ({{ elementary.get_read_anomaly_scores_query(flattened_test) }}) results - where is_anomalous = true + where {{ elementary.edr_is_true('is_anomalous') }} {%- endset -%} {{- return(query) -}} 
{%- endmacro -%} +{%- macro fabric__get_anomaly_query(flattened_test=none) -%} + {#- T-SQL does not allow CTEs inside subqueries / derived tables. + Pass the filter directly into the final SELECT of the CTE chain. -#} + {{- + return( + elementary.get_read_anomaly_scores_query( + flattened_test, + additional_where=elementary.edr_is_true("is_anomalous"), + ) + ) + -}} +{%- endmacro -%} + {%- macro get_anomaly_query_for_dimension_anomalies(flattened_test=none) -%} + {{- + return( + adapter.dispatch( + "get_anomaly_query_for_dimension_anomalies", "elementary" + )(flattened_test) + ) + -}} +{%- endmacro -%} + +{%- macro default__get_anomaly_query_for_dimension_anomalies(flattened_test=none) -%} {%- set dimension_values_query -%} select distinct dimension_value from ({{ elementary.get_read_anomaly_scores_query(flattened_test) }}) results - where is_anomalous = true + where {{ elementary.edr_is_true('is_anomalous') }} {%- endset -%} {% set dimension_anomalies_query -%} @@ -20,7 +47,29 @@ {{- return(dimension_anomalies_query) -}} {%- endmacro -%} -{% macro get_read_anomaly_scores_query(flattened_test=none) %} +{%- macro fabric__get_anomaly_query_for_dimension_anomalies(flattened_test=none) -%} + {#- T-SQL: CTEs cannot appear inside subqueries or IN (...) clauses. + Build a single CTE chain that adds an extra CTE to identify anomalous + dimension values, then filter the final SELECT by that set. 
-#} + {{- + return( + elementary.get_read_anomaly_scores_query( + flattened_test, + extra_cte="anomalous_dimensions as (select distinct dimension_value from final_results where " + ~ elementary.edr_is_true("is_anomalous") + ~ ")", + additional_where="dimension_value in (select dimension_value from anomalous_dimensions)", + ) + ) + -}} +{%- endmacro -%} + +{% macro get_read_anomaly_scores_query( + flattened_test=none, + additional_where=none, + select_override=none, + extra_cte=none +) %} {% if not flattened_test %} {% set flattened_test = elementary.flatten_test(model) %} {% endif %} @@ -71,12 +120,9 @@ anomaly_scores_with_is_anomalous as ( select *, -case when - ( - {{ elementary.anomaly_score_condition(test_configuration) }} - ) - and bucket_end > {{ elementary.edr_timeadd('day', backfill_period, 'max_bucket_end') }} - then TRUE else FALSE end as is_anomalous +{{ elementary.edr_condition_as_boolean( + '(' ~ elementary.anomaly_score_condition(test_configuration) ~ ') and bucket_end > ' ~ elementary.edr_timeadd('day', backfill_period, 'max_bucket_end') + ) }} as is_anomalous from anomaly_scores ), @@ -86,17 +132,17 @@ case when training_avg as average, {# when there is an anomaly we would want to use the last value of the metric (lag), otherwise visually the expectations would look out of bounds #} case - when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' = 'spike' then + when {{ elementary.edr_is_true('is_anomalous') }} and '{{ test_configuration.anomaly_direction }}' = 'spike' then {{ elementary.lag('metric_value') }} over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end) - when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' != 'spike' then + when {{ elementary.edr_is_true('is_anomalous') }} and '{{ test_configuration.anomaly_direction }}' != 'spike' then {{ elementary.lag('min_metric_value') }} over (partition by full_table_name, column_name, 
metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end) when '{{ test_configuration.anomaly_direction }}' = 'spike' then metric_value else min_metric_value end as min_value, case - when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' = 'drop' then + when {{ elementary.edr_is_true('is_anomalous') }} and '{{ test_configuration.anomaly_direction }}' = 'drop' then {{ elementary.lag('metric_value') }} over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end) - when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' != 'drop' then + when {{ elementary.edr_is_true('is_anomalous') }} and '{{ test_configuration.anomaly_direction }}' != 'drop' then {{ elementary.lag('max_metric_value') }} over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end) when '{{ test_configuration.anomaly_direction }}' = 'drop' then metric_value else max_metric_value @@ -105,11 +151,12 @@ case when bucket_end as end_time, * from anomaly_scores_with_is_anomalous - order by bucket_end, dimension_value ) + {% if extra_cte %}, {{ extra_cte }} {% endif %} - select * from final_results + {{ select_override if select_override else 'select * ' }} from final_results where {{ test_configuration.exclude_final_results }} + {% if additional_where %} and {{ additional_where }} {% endif %} {%- endset -%} {{- return(anomaly_query) -}} {% endmacro %} @@ -124,16 +171,21 @@ case when {%- macro is_score_anomalous_condition(sensitivity, anomaly_direction) -%} {%- set spikes_only_metrics = ["freshness", "event_freshness"] -%} - case - when metric_name in {{ elementary.strings_list_to_tuple(spikes_only_metrics) }} - then anomaly_score > {{ sensitivity }} - else - {{ + ( + ( + metric_name in {{ elementary.strings_list_to_tuple(spikes_only_metrics) }} + and anomaly_score > {{ sensitivity }} + ) + or ( + metric_name + 
not in {{ elementary.strings_list_to_tuple(spikes_only_metrics) }} + and {{ elementary.set_directional_anomaly( anomaly_direction, anomaly_score, sensitivity ) }} - end + ) + ) {%- endmacro -%} {%- macro avg_percent_anomalous_condition( diff --git a/macros/utils/cross_db_utils/concat.sql b/macros/utils/cross_db_utils/concat.sql index c10432be1..625cea9ab 100644 --- a/macros/utils/cross_db_utils/concat.sql +++ b/macros/utils/cross_db_utils/concat.sql @@ -1,6 +1,15 @@ -{%- macro edr_concat(val1, val2) -%} - concat( - {{ elementary.edr_cast_as_string(val1) }}, - {{ elementary.edr_cast_as_string(val2) }} - ) -{%- endmacro -%} +{#- Cross-adapter concat that wraps dbt.concat(). + Accepts a list of SQL expressions to concatenate. + On Fabric the result is cast to varchar(4000) because CONCAT() returns nvarchar + which Fabric Warehouse does not support. + + NOTE: varchar(4000) is a known limitation and may truncate very long strings. -#} +{% macro edr_concat(fields) %} + {{ return(adapter.dispatch("edr_concat", "elementary")(fields)) }} +{% endmacro %} + +{% macro default__edr_concat(fields) %} {{ dbt.concat(fields) }} {% endmacro %} + +{% macro fabric__edr_concat(fields) %} + cast({{ dbt.concat(fields) }} as varchar(4000)) +{% endmacro %} diff --git a/macros/utils/cross_db_utils/current_timestamp.sql b/macros/utils/cross_db_utils/current_timestamp.sql index e441eb527..af9b8cd6c 100644 --- a/macros/utils/cross_db_utils/current_timestamp.sql +++ b/macros/utils/cross_db_utils/current_timestamp.sql @@ -84,6 +84,14 @@ cast(current_timestamp at time zone 'UTC' as timestamp(6)) {%- endmacro -%} +{% macro fabric__edr_current_timestamp() -%} + cast(getdate() as datetime2(6)) +{%- endmacro -%} + +{% macro fabric__edr_current_timestamp_in_utc() -%} + cast(sysutcdatetime() as datetime2(6)) +{%- endmacro -%} + {% macro dremio__edr_current_timestamp() -%} current_timestamp() {%- endmacro -%} {% macro dremio__edr_current_timestamp_in_utc() -%} diff --git 
a/macros/utils/cross_db_utils/date_trunc.sql b/macros/utils/cross_db_utils/date_trunc.sql index 674aba84e..74844e59b 100644 --- a/macros/utils/cross_db_utils/date_trunc.sql +++ b/macros/utils/cross_db_utils/date_trunc.sql @@ -20,3 +20,7 @@ {% macro bigquery__edr_date_trunc(date_part, date_expression) %} timestamp_trunc(cast({{ date_expression }} as timestamp), {{ date_part }}) {% endmacro %} + +{% macro fabric__edr_date_trunc(datepart, date_expression) %} + datetrunc({{ datepart }}, {{ date_expression }}) +{% endmacro %} diff --git a/macros/utils/cross_db_utils/dateadd.sql b/macros/utils/cross_db_utils/dateadd.sql index 209edea8e..b1226e258 100644 --- a/macros/utils/cross_db_utils/dateadd.sql +++ b/macros/utils/cross_db_utils/dateadd.sql @@ -24,6 +24,10 @@ This override outputs just TIMESTAMPADD(...) as an expression (no "select" prefix). #} +{% macro fabric__edr_dateadd(datepart, interval, from_date_or_timestamp) %} + dateadd({{ datepart }}, {{ interval }}, {{ from_date_or_timestamp }}) +{% endmacro %} + {% macro dremio__edr_dateadd(datepart, interval, from_date_or_timestamp) %} {% set datepart = datepart | lower %} {% if datepart == "year" %} diff --git a/macros/utils/cross_db_utils/datediff.sql b/macros/utils/cross_db_utils/datediff.sql index bc0ed16e8..c66b35778 100644 --- a/macros/utils/cross_db_utils/datediff.sql +++ b/macros/utils/cross_db_utils/datediff.sql @@ -272,6 +272,10 @@ }} {% endmacro %} +{% macro fabric__edr_datediff(first_date, second_date, date_part) %} + datediff({{ date_part }}, {{ first_date }}, {{ second_date }}) +{% endmacro %} + {% macro dremio__edr_datediff(first_date, second_date, date_part) %} {%- set seconds_diff_expr -%} cast(unix_timestamp(substr(cast(({{ second_date }}) as varchar), 1, 19)) - diff --git a/macros/utils/cross_db_utils/day_of_week.sql b/macros/utils/cross_db_utils/day_of_week.sql index c0dce168f..212857263 100644 --- a/macros/utils/cross_db_utils/day_of_week.sql +++ b/macros/utils/cross_db_utils/day_of_week.sql @@ -63,6 
+63,10 @@ to_char({{ date_expr }}, 'DAY') {% endmacro %} +{% macro fabric__edr_day_of_week_expression(date_expr) %} + cast(datename(weekday, {{ date_expr }}) as varchar(30)) +{% endmacro %} + -- fmt: off {% macro clickhouse__edr_day_of_week_expression(date_expr) %} formatDateTime({{ date_expr }}, '%W') diff --git a/macros/utils/cross_db_utils/generate_elementary_profile_args.sql b/macros/utils/cross_db_utils/generate_elementary_profile_args.sql index 52ba0a935..fdfb54156 100644 --- a/macros/utils/cross_db_utils/generate_elementary_profile_args.sql +++ b/macros/utils/cross_db_utils/generate_elementary_profile_args.sql @@ -256,6 +256,48 @@ ) %} {% endmacro %} +{% macro fabric__generate_elementary_profile_args( + method, elementary_database, elementary_schema +) %} + {% set parameters = [ + _parameter("type", "fabric"), + _parameter("driver", target.driver), + _parameter("server", target.server), + _parameter("port", target.port), + _parameter("database", elementary_database), + _parameter("schema", elementary_schema), + ] %} + {% if method == "cli" %} {% do parameters.append(_parameter("method", "cli")) %} + {% elif method == "service-principal" %} + {% do parameters.append(_parameter("method", "service-principal")) %} + {% do parameters.append(_parameter("tenant_id", "")) %} + {% do parameters.append(_parameter("client_id", "")) %} + {% do parameters.append(_parameter("client_secret", "")) %} + {% else %} + {% do parameters.append(_parameter("user", target.user)) %} + {% do parameters.append(_parameter("password", "")) %} + {% endif %} + {% do parameters.append(_parameter("threads", target.threads)) %} + {% do return(parameters) %} +{% endmacro %} + +{% macro sqlserver__generate_elementary_profile_args( + method, elementary_database, elementary_schema +) %} + {% set parameters = [ + _parameter("type", "sqlserver"), + _parameter("driver", target.driver), + _parameter("server", target.server), + _parameter("port", target.port), + _parameter("database", 
elementary_database), + _parameter("schema", elementary_schema), + _parameter("user", target.user), + _parameter("password", ""), + ] %} + {% do parameters.append(_parameter("threads", target.threads)) %} + {% do return(parameters) %} +{% endmacro %} + {% macro duckdb__generate_elementary_profile_args( method, elementary_database, elementary_schema ) %} diff --git a/macros/utils/cross_db_utils/generate_surrogate_key.sql b/macros/utils/cross_db_utils/generate_surrogate_key.sql index 83cf9f87e..ac77b5386 100644 --- a/macros/utils/cross_db_utils/generate_surrogate_key.sql +++ b/macros/utils/cross_db_utils/generate_surrogate_key.sql @@ -18,7 +18,6 @@ {%- endmacro -%} {%- macro default__generate_surrogate_key(fields) -%} - {% set concat_macro = dbt.concat or dbt_utils.concat %} {% set hash_macro = dbt.hash or dbt_utils.hash %} {% set default_null_value = "" %} @@ -35,7 +34,8 @@ ) -%} {%- if not loop.last %} {%- do field_sqls.append("'-'") -%} {%- endif -%} {%- endfor -%} - {{ hash_macro(concat_macro(field_sqls)) }} + {%- set concat_result -%}{{ elementary.edr_concat(field_sqls) }}{%- endset -%} + {{ hash_macro(concat_result | trim) }} {%- endmacro -%} {%- macro clickhouse__generate_surrogate_key(fields) -%} diff --git a/macros/utils/cross_db_utils/hour_of_day.sql b/macros/utils/cross_db_utils/hour_of_day.sql index de34d2a1e..395bfbd86 100644 --- a/macros/utils/cross_db_utils/hour_of_day.sql +++ b/macros/utils/cross_db_utils/hour_of_day.sql @@ -28,3 +28,7 @@ {% macro snowflake__edr_hour_of_day_expression(date_expr) %} hour({{ date_expr }}) {% endmacro %} + +{% macro fabric__edr_hour_of_day_expression(date_expr) %} + datepart(hour, {{ date_expr }}) +{% endmacro %} diff --git a/macros/utils/cross_db_utils/hour_of_week.sql b/macros/utils/cross_db_utils/hour_of_week.sql index a29c2d7a3..a4758f395 100644 --- a/macros/utils/cross_db_utils/hour_of_week.sql +++ b/macros/utils/cross_db_utils/hour_of_week.sql @@ -78,6 +78,13 @@ {% endmacro %} -- fmt: on +{% macro 
fabric__edr_hour_of_week_expression(date_expr) %} + concat( + cast(datename(weekday, {{ date_expr }}) as {{ elementary.edr_type_string() }}), + cast(datepart(hour, {{ date_expr }}) as {{ elementary.edr_type_string() }}) + ) +{% endmacro %} + {% macro duckdb__edr_hour_of_week_expression(date_expr) %} concat( cast(dayname({{ date_expr }}) as {{ elementary.edr_type_string() }}), diff --git a/macros/utils/cross_db_utils/incremental_strategy.sql b/macros/utils/cross_db_utils/incremental_strategy.sql index 4d5d11101..ed3391fa9 100644 --- a/macros/utils/cross_db_utils/incremental_strategy.sql +++ b/macros/utils/cross_db_utils/incremental_strategy.sql @@ -20,6 +20,10 @@ {% do return("merge") %} {% endmacro %} +{%- macro fabric__get_default_incremental_strategy() %} + {% do return("merge") %} +{% endmacro %} + {% macro default__get_default_incremental_strategy() %} {% do return(none) %} {% endmacro %} diff --git a/macros/utils/cross_db_utils/multi_value_in.sql b/macros/utils/cross_db_utils/multi_value_in.sql index a069c631e..cfb1cb0c5 100644 --- a/macros/utils/cross_db_utils/multi_value_in.sql +++ b/macros/utils/cross_db_utils/multi_value_in.sql @@ -39,6 +39,20 @@ ) {%- endmacro -%} +{%- macro fabric__edr_multi_value_in(source_cols, target_cols, target_table) -%} + {# T-SQL does not support tuple IN subqueries like (col1, col2) IN (SELECT ...). + Use EXISTS with a correlated subquery instead. 
#} + exists ( + select 1 + from {{ target_table }} as __edr_mvi + where + {%- for i in range(source_cols | length) %} + __edr_mvi.{{ target_cols[i] }} = {{ source_cols[i] }} + {%- if not loop.last %} and {% endif %} + {%- endfor %} + ) +{%- endmacro -%} + {%- macro redshift__edr_multi_value_in(source_cols, target_cols, target_table) -%} {# Redshift doesn't support multi-column IN subqueries (tuple IN) like: (col1, col2) IN (SELECT col1, col2 FROM table) diff --git a/macros/utils/cross_db_utils/target_database.sql b/macros/utils/cross_db_utils/target_database.sql index fef9f4d28..5361f5d86 100644 --- a/macros/utils/cross_db_utils/target_database.sql +++ b/macros/utils/cross_db_utils/target_database.sql @@ -22,3 +22,7 @@ {% macro dremio__target_database() %} {% do return(target.database) %} {% endmacro %} {% macro duckdb__target_database() %} {% do return(target.database) %} {% endmacro %} + +{% macro fabric__target_database() %} {% do return(target.database) %} {% endmacro %} + +{% macro sqlserver__target_database() %} {% do return(target.database) %} {% endmacro %} diff --git a/macros/utils/cross_db_utils/time_trunc.sql b/macros/utils/cross_db_utils/time_trunc.sql index 48b4e17e3..eb42a5fd5 100644 --- a/macros/utils/cross_db_utils/time_trunc.sql +++ b/macros/utils/cross_db_utils/time_trunc.sql @@ -19,3 +19,10 @@ {% macro bigquery__edr_time_trunc(date_part, date_expression) %} timestamp_trunc(cast({{ date_expression }} as timestamp), {{ date_part }}) {% endmacro %} + +{% macro fabric__edr_time_trunc(date_part, date_expression) %} + datetrunc( + {{ date_part }}, + cast({{ date_expression }} as {{ elementary.edr_type_timestamp() }}) + ) +{% endmacro %} diff --git a/macros/utils/cross_db_utils/timeadd.sql b/macros/utils/cross_db_utils/timeadd.sql index 0901b2f30..08e7465c0 100644 --- a/macros/utils/cross_db_utils/timeadd.sql +++ b/macros/utils/cross_db_utils/timeadd.sql @@ -72,6 +72,14 @@ + {{ elementary.edr_cast_as_int(number) }} * interval '1 {{ date_part }}' {% 
endmacro %} +{% macro fabric__edr_timeadd(date_part, number, timestamp_expression) %} + dateadd( + {{ date_part }}, + {{ elementary.edr_cast_as_int(number) }}, + {{ elementary.edr_cast_as_timestamp(timestamp_expression) }} + ) +{% endmacro %} + {% macro dremio__edr_timeadd(date_part, number, timestamp_expression) %} timestampadd( {{ date_part }}, diff --git a/macros/utils/cross_db_utils/to_char.sql b/macros/utils/cross_db_utils/to_char.sql index e53419c2f..ae9e40347 100644 --- a/macros/utils/cross_db_utils/to_char.sql +++ b/macros/utils/cross_db_utils/to_char.sql @@ -23,3 +23,10 @@ {%- else %}, 'YYYY-MM-DD HH:MI:SS') {%- endif %} {% endmacro %} + +{% macro fabric__edr_to_char(column, format) %} + convert(varchar, {{ column }} + {%- if format %}, {{ format }}) + {%- else %}, 120) + {%- endif %} +{% endmacro %} diff --git a/macros/utils/data_types/data_type.sql b/macros/utils/data_types/data_type.sql index 8317973ee..96a371b1d 100644 --- a/macros/utils/data_types/data_type.sql +++ b/macros/utils/data_types/data_type.sql @@ -10,6 +10,8 @@ {% macro bigquery__edr_type_bool() %} {% do return("BOOL") %} {% endmacro %} +{% macro fabric__edr_type_bool() %} {% do return("bit") %} {% endmacro %} + {%- macro edr_type_string() -%} {{ return(adapter.dispatch("edr_type_string", "elementary")()) }} @@ -44,6 +46,8 @@ {% macro trino__edr_type_string() %} {% do return("varchar") %} {% endmacro %} +{% macro fabric__edr_type_string() %} {% do return("varchar(4096)") %} {% endmacro %} + {%- macro edr_type_long_string() -%} {{ return(adapter.dispatch("edr_type_long_string", "elementary")()) }} @@ -65,6 +69,12 @@ {% set long_string = "text" %} {{ return(long_string) }} {%- endmacro -%} +{#- T-SQL: varchar(4096) is too small for compiled query text. + Use varchar(max) which supports up to 2 GB. 
-#} +{%- macro fabric__edr_type_long_string() -%} + {% do return("varchar(max)") %} +{%- endmacro -%} + {% macro edr_type_bigint() %} {% set macro = dbt.type_bigint or dbt_utils.type_bigint %} @@ -132,3 +142,5 @@ {% macro trino__edr_type_timestamp() %} timestamp(6) {% endmacro %} {% macro dremio__edr_type_timestamp() %} timestamp {% endmacro %} + +{% macro fabric__edr_type_timestamp() %} datetime2(6) {% endmacro %} diff --git a/macros/utils/data_types/data_type_list.sql b/macros/utils/data_types/data_type_list.sql index 3b21c8966..0b471fe2c 100644 --- a/macros/utils/data_types/data_type_list.sql +++ b/macros/utils/data_types/data_type_list.sql @@ -285,6 +285,48 @@ {% endmacro %} +{% macro fabric__data_type_list(data_type) %} + + {% set string_list = [ + "varchar", + "nvarchar", + "char", + "nchar", + "text", + "ntext", + "string", + ] | list %} + {% set numeric_list = [ + "int", + "bigint", + "smallint", + "tinyint", + "decimal", + "numeric", + "float", + "real", + "money", + "smallmoney", + ] | list %} + {% set timestamp_list = [ + "datetime", + "datetime2", + "date", + "smalldatetime", + "datetimeoffset", + "time", + ] | list %} + {% set boolean_list = ["bit"] | list %} + + {%- if data_type == "string" %} {{ return(string_list) }} + {%- elif data_type == "numeric" %} {{ return(numeric_list) }} + {%- elif data_type == "timestamp" %} {{ return(timestamp_list) }} + {%- elif data_type == "boolean" %} {{ return(boolean_list) }} + {%- else %} {{ return([]) }} + {%- endif %} + +{% endmacro %} + {% macro dremio__data_type_list(data_type) %} {% set string_list = ["varchar", "character varying"] | list %} {% set numeric_list = [ diff --git a/macros/utils/data_types/get_normalized_data_type.sql b/macros/utils/data_types/get_normalized_data_type.sql index fc5b14c80..c4a24fda7 100644 --- a/macros/utils/data_types/get_normalized_data_type.sql +++ b/macros/utils/data_types/get_normalized_data_type.sql @@ -150,6 +150,66 @@ {% endmacro %} +{% macro 
sqlserver__get_normalized_data_type(exact_data_type) %} + {# SQL Server data type synonyms (true synonyms only): + https://learn.microsoft.com/en-us/sql/t-sql/data-types/data-type-synonyms-transact-sql + Unlike Fabric, SQL Server supports DATETIME, SMALLDATETIME, MONEY, SMALLMONEY + as first-class types - INFORMATION_SCHEMA returns them as-is. #} + {% set exact_data_type_to_data_type_returned_by_the_info_schema = { + "NVARCHAR": "VARCHAR", + "NCHAR": "CHAR", + "NTEXT": "VARCHAR", + "TEXT": "VARCHAR", + "ROWVERSION": "TIMESTAMP", + "DOUBLE PRECISION": "FLOAT", + "REAL": "FLOAT", + "INTEGER": "INT", + "BOOLEAN": "BIT", + } %} + {%- if exact_data_type in exact_data_type_to_data_type_returned_by_the_info_schema %} + {{ + return( + exact_data_type_to_data_type_returned_by_the_info_schema[ + exact_data_type + ] + ) + }} + {%- else %} {{ return(exact_data_type) }} + {%- endif %} +{% endmacro %} + + +{% macro fabric__get_normalized_data_type(exact_data_type) %} + {# understanding Fabric / SQL Server data type synonyms: + https://learn.microsoft.com/en-us/sql/t-sql/data-types/data-type-synonyms-transact-sql #} + {% set exact_data_type_to_data_type_returned_by_the_info_schema = { + "NVARCHAR": "VARCHAR", + "NCHAR": "CHAR", + "NTEXT": "VARCHAR", + "TEXT": "VARCHAR", + "STRING": "VARCHAR", + "ROWVERSION": "TIMESTAMP", + "DATETIME": "DATETIME2", + "SMALLDATETIME": "DATETIME2", + "MONEY": "DECIMAL", + "SMALLMONEY": "DECIMAL", + "DOUBLE PRECISION": "FLOAT", + "REAL": "FLOAT", + "INTEGER": "INT", + "BOOLEAN": "BIT", + } %} + {%- if exact_data_type in exact_data_type_to_data_type_returned_by_the_info_schema %} + {{ + return( + exact_data_type_to_data_type_returned_by_the_info_schema[ + exact_data_type + ] + ) + }} + {%- else %} {{ return(exact_data_type) }} + {%- endif %} +{% endmacro %} + {% macro postgres__get_normalized_data_type(exact_data_type) %} {# understanding Postgres data type synonyms: https://www.postgresql.org/docs/current/datatype.html #} diff --git 
a/macros/utils/data_types/try_cast_column_to_timestamp.sql b/macros/utils/data_types/try_cast_column_to_timestamp.sql index 78b2e5162..cbd1d547d 100644 --- a/macros/utils/data_types/try_cast_column_to_timestamp.sql +++ b/macros/utils/data_types/try_cast_column_to_timestamp.sql @@ -26,3 +26,17 @@ {% macro postgres__try_cast_column_to_timestamp(table_relation, timestamp_column) %} {{ return(false) }} {% endmacro %} + +{% macro fabric__try_cast_column_to_timestamp(table_relation, timestamp_column) %} + {%- set query %} + select top 1 1 + from {{ table_relation }} + where {{ timestamp_column }} is not null + and try_cast({{ timestamp_column }} as {{ elementary.edr_type_timestamp() }}) is not null + {%- endset %} + + {%- set result = elementary.result_value(query) %} + {%- if result is not none %} {{ return(true) }} {%- endif %} + {{ return(false) }} + +{% endmacro %} diff --git a/macros/utils/sql_utils/list_concat_with_separator.sql b/macros/utils/sql_utils/list_concat_with_separator.sql index 8cb5f97cd..1bc833560 100644 --- a/macros/utils/sql_utils/list_concat_with_separator.sql +++ b/macros/utils/sql_utils/list_concat_with_separator.sql @@ -28,7 +28,8 @@ {% do new_list.append(elementary.edr_quote(separator)) %} {% endif %} {% endfor %} - {{ return(elementary.join_list(new_list, " || ")) }} + {%- set result -%}{{ elementary.edr_concat(new_list) }}{%- endset -%} + {{ return(result | trim) }} {% endmacro %} {% macro clickhouse__list_concat_with_separator( diff --git a/macros/utils/table_operations/create_temp_table.sql b/macros/utils/table_operations/create_temp_table.sql index cf6a20dc5..95f68093c 100644 --- a/macros/utils/table_operations/create_temp_table.sql +++ b/macros/utils/table_operations/create_temp_table.sql @@ -24,6 +24,54 @@ {{ return(temp_table_relation) }} {% endmacro %} +{% macro fabric__create_temp_table(database_name, schema_name, table_name, sql_query) %} + {# + T-SQL (Fabric + SQL Server) does not allow CTEs inside subqueries, so the usual + CREATE 
TABLE … AS (sql) pattern fails when sql contains a CTE + (e.g. the accepted_values test). + + Workaround: + 1. CREATE VIEW … AS — CTEs are valid in view definitions + 2. SELECT * INTO FROM — simple SELECT, no CTE + 3. DROP VIEW … + + We use a regular table (not #temp) because the EXEC scope isolation + of SQL Server makes #temp tables invisible to the caller. + + sqlserver__ is not needed here because dbt-sqlserver declares + dependencies=["fabric"], so this macro is found automatically + via the dispatch chain: sqlserver__ → fabric__ → default__. + #} + {% set table_exists, table_relation = dbt.get_or_create_relation( + database=database_name, + schema=schema_name, + identifier=table_name, + type="table", + ) -%} + + {% if table_exists %} {% do adapter.drop_relation(table_relation) %} {% endif %} + + {# Helper view — short suffix to stay within identifier-length limits #} + {% set vw_identifier = (table_name ~ "_vw")[:128] %} + {% set vw_relation = api.Relation.create( + database=database_name, + schema=schema_name, + identifier=vw_identifier, + type="view", + ) %} + + {# SQL Server does not allow database prefix on DROP VIEW / CREATE VIEW #} + {% set vw_ref = vw_relation.include(database=false) %} + {% set tbl_ref = table_relation.include(database=false) %} + + {% do elementary.run_query("DROP VIEW IF EXISTS " ~ vw_ref) %} + {% do elementary.run_query("CREATE VIEW " ~ vw_ref ~ " AS " ~ sql_query) %} + {% do elementary.run_query("SELECT * INTO " ~ tbl_ref ~ " FROM " ~ vw_ref) %} + {% do elementary.run_query("DROP VIEW " ~ vw_ref) %} + + {{ return(table_relation) }} +{% endmacro %} + {% macro snowflake__create_temp_table( database_name, schema_name, table_name, sql_query ) %} diff --git a/macros/utils/table_operations/get_relation_max_length.sql b/macros/utils/table_operations/get_relation_max_length.sql index 8c7964db9..7dfaf4a2c 100644 --- a/macros/utils/table_operations/get_relation_max_length.sql +++ b/macros/utils/table_operations/get_relation_max_length.sql 
@@ -38,3 +38,12 @@ {% macro dremio__get_relation_max_name_length(temporary, relation, sql_query) %} {{ return(128) }} {% endmacro %} + +{% macro fabric__get_relation_max_name_length(temporary, relation, sql_query) %} + {# SQL Server / Fabric limits identifiers to 128 chars. dbt-sqlserver + may prefix the schema name onto the table identifier when creating + relations, so we must reserve room for the full schema + separator. + Typical CI schema is ~60 chars; 128 - 60 - 1 = 67. We use 60 to + leave headroom for longer schemas and __dbt_tmp_vw suffixes. #} + {{ return(60) }} +{% endmacro %} diff --git a/macros/utils/table_operations/insert_as_select.sql b/macros/utils/table_operations/insert_as_select.sql index 7b4bc4ea8..2770a78d1 100644 --- a/macros/utils/table_operations/insert_as_select.sql +++ b/macros/utils/table_operations/insert_as_select.sql @@ -1,6 +1,16 @@ {% macro insert_as_select(table_relation, select_query) %} {# when calling this macro, you need to add depends on ref comment #} {# ref_model and select_query need to have the same columns #} + {{ + return( + adapter.dispatch("insert_as_select", "elementary")( + table_relation, select_query + ) + ) + }} +{% endmacro %} + +{% macro default__insert_as_select(table_relation, select_query) %} {%- set insert_query %} insert into {{ table_relation }} with tmp_table as ( @@ -9,7 +19,27 @@ select * from tmp_table {{ elementary.get_query_settings() }} {%- endset %} - {{ return(insert_query) }} +{% endmacro %} +{% macro fabric__insert_as_select(table_relation, select_query) %} + {#- Fabric does not support INSERT ... EXEC or CTEs after INSERT INTO. + Wrap the select_query in a temp view, then INSERT ... SELECT from it. + Fabric also forbids 3-part names in DROP VIEW, so use schema.identifier only. + + NOTE: The replace("'", "''") escaping is minimal — if select_query already + contains escaped quotes (e.g. from user-defined test configs), this could + double-escape and produce invalid SQL. 
In practice the queries passed here + are machine-generated and do not contain pre-escaped quotes. -#} + {%- set tmp_view_name = ( + table_relation.schema ~ "." ~ table_relation.identifier ~ "__ins_vw" + ) -%} + {%- set insert_query %} + IF OBJECT_ID('{{ tmp_view_name }}', 'V') IS NOT NULL DROP VIEW {{ tmp_view_name }}; + EXEC('CREATE VIEW {{ tmp_view_name }} AS {{ select_query | replace("'", "''") }}'); + INSERT INTO {{ table_relation }} + SELECT * FROM {{ tmp_view_name }}; + DROP VIEW {{ tmp_view_name }}; + {%- endset %} + {{ return(insert_query) }} {% endmacro %} diff --git a/macros/utils/table_operations/insert_rows.sql b/macros/utils/table_operations/insert_rows.sql index aa999b855..da23e9c5a 100644 --- a/macros/utils/table_operations/insert_rows.sql +++ b/macros/utils/table_operations/insert_rows.sql @@ -246,6 +246,10 @@ {{- return(string_value | replace("'", "''")) -}} {%- endmacro -%} +{%- macro fabric__escape_special_chars(string_value) -%} + {{- return(string_value | replace("'", "''")) -}} +{%- endmacro -%} + {%- macro dremio__escape_special_chars(string_value) -%} {{- return( @@ -279,8 +283,13 @@ {%- endmacro -%} {%- macro render_value(value, data_type) -%} + {{- adapter.dispatch("render_value", "elementary")(value, data_type) -}} +{%- endmacro -%} + +{%- macro default__render_value(value, data_type) -%} {%- if value is defined and value is not none -%} - {%- if value is number -%} {{- value -}} + {%- if value is boolean -%} {{- elementary.edr_boolean_literal(value) -}} + {%- elif value is number -%} {{- value -}} {%- elif value is string and data_type == "timestamp" -%} {{- elementary.edr_cast_as_timestamp( @@ -295,3 +304,7 @@ {%- else -%} null {%- endif -%} {%- endmacro -%} + +{# Note: Python booleans pass Jinja's "is number" test, so we check + "is boolean" first. edr_boolean_literal renders the correct SQL literal + per adapter (TRUE/FALSE for standard SQL, cast(1/0 as bit) for T-SQL). 
#} diff --git a/macros/utils/table_operations/make_temp_relation.sql b/macros/utils/table_operations/make_temp_relation.sql index b3fb1574e..42116f69f 100644 --- a/macros/utils/table_operations/make_temp_relation.sql +++ b/macros/utils/table_operations/make_temp_relation.sql @@ -10,6 +10,49 @@ {% do return(dbt.make_temp_relation(base_relation, suffix)) %} {% endmacro %} +{% macro fabric__edr_make_temp_relation(base_relation, suffix) %} + {# + In some contexts (notably test materializations), callers may pass the dbt node + (a dict) rather than a Relation. dbt.make_temp_relation expects a Relation and + will fail with "dict object has no attribute incorporate". + + For Fabric / T-SQL, we can safely create a regular table relation in the active + target schema and treat it as our "temp" relation. + #} + {% if not suffix %} + {% set suffix = elementary.get_timestamped_table_suffix() %} + {% endif %} + + {% if base_relation is mapping %} + {# Prefer the Elementary package schema, which we know exists in the project. 
#} + {% set package_database, package_schema = ( + elementary.get_package_database_and_schema() + ) %} + + {% set base_identifier = ( + base_relation.get("alias") + or base_relation.get("name") + or "edr_tmp" + ) %} + {% set tmp_identifier = elementary.table_name_with_suffix( + base_identifier, suffix + ) %} + {% set tmp_relation = api.Relation.create( + database=package_database + or base_relation.get("database") + or target.database, + schema=package_schema + or base_relation.get("schema") + or target.schema, + identifier=tmp_identifier, + type="table", + ) %} + {% do return(tmp_relation) %} + {% endif %} + + {% do return(dbt.make_temp_relation(base_relation, suffix)) %} +{% endmacro %} + {% macro spark__edr_make_temp_relation(base_relation, suffix) %} {% set tmp_identifier = elementary.table_name_with_suffix( base_relation.identifier, suffix diff --git a/models/edr/alerts/alerts_anomaly_detection.sql b/models/edr/alerts/alerts_anomaly_detection.sql index d91e1b1e6..82ef095ca 100644 --- a/models/edr/alerts/alerts_anomaly_detection.sql +++ b/models/edr/alerts/alerts_anomaly_detection.sql @@ -30,7 +30,7 @@ with result_rows from elementary_test_results where - {{ not elementary.get_config_var("disable_test_alerts") }} + {{ elementary.render_bool_config_var("disable_test_alerts", negate=true) }} and lower(status) != 'pass' {%- if elementary.get_config_var("disable_warn_alerts") -%} and lower(status) != 'warn' diff --git a/models/edr/alerts/alerts_dbt_models.sql b/models/edr/alerts/alerts_dbt_models.sql index a2b00a30e..deb90f0dc 100644 --- a/models/edr/alerts/alerts_dbt_models.sql +++ b/models/edr/alerts/alerts_dbt_models.sql @@ -75,7 +75,7 @@ select full_refresh from error_models where - {{ not elementary.get_config_var("disable_model_alerts") }} + {{ elementary.render_bool_config_var("disable_model_alerts", negate=true) }} and lower(status) != 'success' {%- if elementary.get_config_var("disable_skipped_model_alerts") -%} and lower(status) != 'skipped' diff --git 
a/models/edr/alerts/alerts_dbt_source_freshness.sql b/models/edr/alerts/alerts_dbt_source_freshness.sql index 97a656a9e..d48c019bf 100644 --- a/models/edr/alerts/alerts_dbt_source_freshness.sql +++ b/models/edr/alerts/alerts_dbt_source_freshness.sql @@ -38,5 +38,8 @@ select from results join sources on results.unique_id = sources.unique_id where - {{ not elementary.get_config_var("disable_source_freshness_alerts") }} - and lower(status) != 'pass' + {{ + elementary.render_bool_config_var( + "disable_source_freshness_alerts", negate=true + ) + }} and lower(status) != 'pass' diff --git a/models/edr/alerts/alerts_dbt_tests.sql b/models/edr/alerts/alerts_dbt_tests.sql index cb0610ff2..28dd1b20e 100644 --- a/models/edr/alerts/alerts_dbt_tests.sql +++ b/models/edr/alerts/alerts_dbt_tests.sql @@ -30,7 +30,7 @@ with result_rows from elementary_test_results where - {{ not elementary.get_config_var("disable_test_alerts") }} + {{ elementary.render_bool_config_var("disable_test_alerts", negate=true) }} and lower(status) != 'pass' {% if elementary.get_config_var("disable_warn_alerts") %} and lower(status) != 'warn' diff --git a/models/edr/alerts/alerts_schema_changes.sql b/models/edr/alerts/alerts_schema_changes.sql index 7c718542b..925059284 100644 --- a/models/edr/alerts/alerts_schema_changes.sql +++ b/models/edr/alerts/alerts_schema_changes.sql @@ -36,7 +36,7 @@ with result_rows from elementary_test_results where - {{ not elementary.get_config_var("disable_test_alerts") }} + {{ elementary.render_bool_config_var("disable_test_alerts", negate=true) }} and lower(status) != 'pass' {%- if elementary.get_config_var("disable_warn_alerts") -%} and lower(status) != 'warn' diff --git a/models/edr/data_monitoring/anomaly_detection/anomaly_threshold_sensitivity.sql b/models/edr/data_monitoring/anomaly_detection/anomaly_threshold_sensitivity.sql index d6cea9a50..090886da9 100644 --- a/models/edr/data_monitoring/anomaly_detection/anomaly_threshold_sensitivity.sql +++ 
b/models/edr/data_monitoring/anomaly_detection/anomaly_threshold_sensitivity.sql @@ -13,27 +13,20 @@ with training_avg as metric_avg, training_stddev as metric_stddev, anomaly_score, - case - when abs(anomaly_score) >= 1.5 then true else false - end as {{ elementary.edr_quote_column("is_anomaly_1_5") }}, - case - when abs(anomaly_score) >= 2 then true else false - end as {{ elementary.edr_quote_column("is_anomaly_2") }}, - case - when abs(anomaly_score) >= 2.5 then true else false - end as {{ elementary.edr_quote_column("is_anomaly_2_5") }}, - case - when abs(anomaly_score) >= 3 then true else false - end as {{ elementary.edr_quote_column("is_anomaly_3") }}, - case - when abs(anomaly_score) >= 3.5 then true else false - end as {{ elementary.edr_quote_column("is_anomaly_3_5") }}, - case - when abs(anomaly_score) >= 4 then true else false - end as {{ elementary.edr_quote_column("is_anomaly_4") }}, - case - when abs(anomaly_score) >= 4.5 then true else false - end as {{ elementary.edr_quote_column("is_anomaly_4_5") }} + {{ elementary.edr_condition_as_boolean("abs(anomaly_score) >= 1.5") }} + as {{ elementary.edr_quote_column("is_anomaly_1_5") }}, + {{ elementary.edr_condition_as_boolean("abs(anomaly_score) >= 2") }} + as {{ elementary.edr_quote_column("is_anomaly_2") }}, + {{ elementary.edr_condition_as_boolean("abs(anomaly_score) >= 2.5") }} + as {{ elementary.edr_quote_column("is_anomaly_2_5") }}, + {{ elementary.edr_condition_as_boolean("abs(anomaly_score) >= 3") }} + as {{ elementary.edr_quote_column("is_anomaly_3") }}, + {{ elementary.edr_condition_as_boolean("abs(anomaly_score) >= 3.5") }} + as {{ elementary.edr_quote_column("is_anomaly_3_5") }}, + {{ elementary.edr_condition_as_boolean("abs(anomaly_score) >= 4") }} + as {{ elementary.edr_quote_column("is_anomaly_4") }}, + {{ elementary.edr_condition_as_boolean("abs(anomaly_score) >= 4.5") }} + as {{ elementary.edr_quote_column("is_anomaly_4_5") }} from metrics_anomaly_score where abs(anomaly_score) >= 1.5 diff 
--git a/models/edr/data_monitoring/anomaly_detection/metrics_anomaly_score.sql b/models/edr/data_monitoring/anomaly_detection/metrics_anomaly_score.sql index 56672b5a6..8f9b8b42c 100644 --- a/models/edr/data_monitoring/anomaly_detection/metrics_anomaly_score.sql +++ b/models/edr/data_monitoring/anomaly_detection/metrics_anomaly_score.sql @@ -43,18 +43,26 @@ with order by bucket_start asc rows between unbounded preceding and current row ) as training_start - from data_monitoring_metrics {{ dbt_utils.group_by(12) }} - ), - - metrics_anomaly_score as ( - - select + from data_monitoring_metrics + group by id, full_table_name, column_name, dimension, dimension_value, metric_name, + metric_value, + source_value, + bucket_start, + bucket_end, + bucket_duration_hours, + updated_at + ), + + time_window_scored as ( + + select + *, case when training_stddev is null then null @@ -63,7 +71,21 @@ with when training_stddev = 0 then 0 -- Stationary data case - valid, all values are identical else (metric_value - training_avg) / (training_stddev) - end as anomaly_score, + end as anomaly_score + from time_window_aggregation + + ), + + metrics_anomaly_score as ( + + select + id, + full_table_name, + column_name, + dimension, + dimension_value, + metric_name, + anomaly_score, metric_value as latest_metric_value, bucket_start, bucket_end, @@ -73,7 +95,7 @@ with training_end, training_set_size, max(updated_at) as updated_at - from time_window_aggregation + from time_window_scored where metric_value is not null and training_avg is not null @@ -87,8 +109,22 @@ with ), ) }} - {{ dbt_utils.group_by(15) }} - order by bucket_end desc + group by + id, + full_table_name, + column_name, + dimension, + dimension_value, + metric_name, + anomaly_score, + metric_value, + bucket_start, + bucket_end, + training_avg, + training_stddev, + training_start, + training_end, + training_set_size ), @@ -111,13 +147,12 @@ with training_end, training_set_size, updated_at, - case - when - abs(anomaly_score) - > 
{{ elementary.get_config_var("anomaly_sensitivity") }} - then true - else false - end as is_anomaly + {{ + elementary.edr_condition_as_boolean( + "abs(anomaly_score) > " + ~ elementary.get_config_var("anomaly_sensitivity") + ) + }} as is_anomaly from metrics_anomaly_score ) diff --git a/models/edr/dbt_artifacts/dbt_artifacts_hashes.sql b/models/edr/dbt_artifacts/dbt_artifacts_hashes.sql index 629c4a921..88918224b 100644 --- a/models/edr/dbt_artifacts/dbt_artifacts_hashes.sql +++ b/models/edr/dbt_artifacts/dbt_artifacts_hashes.sql @@ -19,4 +19,3 @@ union all {% endif %} {% endfor %} -order by metadata_hash diff --git a/models/edr/run_results/model_run_results.sql b/models/edr/run_results/model_run_results.sql index 8b45c618b..27a5d29ee 100644 --- a/models/edr/run_results/model_run_results.sql +++ b/models/edr/run_results/model_run_results.sql @@ -1,5 +1,18 @@ {{ config(materialized="view", bind=False) }} +{%- set day_partition = elementary.edr_time_trunc("day", "run_results.generated_at") -%} +{%- set day_window = ( + "over (partition by " + ~ day_partition + ~ " order by run_results.generated_at asc rows between unbounded preceding and unbounded following)" +) -%} +{%- set first_inv_cond = ( + "first_value(invocation_id) " ~ day_window ~ " = invocation_id" +) -%} +{%- set last_inv_cond = ( + "last_value(invocation_id) " ~ day_window ~ " = invocation_id" +) -%} + with dbt_run_results as (select * from {{ ref("dbt_run_results") }}), @@ -36,30 +49,10 @@ select row_number() over ( partition by run_results.unique_id order by run_results.generated_at desc ) as model_invocation_reverse_index, - case - when - first_value(invocation_id) over ( - partition by - {{ elementary.edr_time_trunc("day", "run_results.generated_at") }} - order by run_results.generated_at asc - rows between unbounded preceding and unbounded following - ) - = invocation_id - then true - else false - end as is_the_first_invocation_of_the_day, - case - when - last_value(invocation_id) over ( - partition 
by - {{ elementary.edr_time_trunc("day", "run_results.generated_at") }} - order by run_results.generated_at asc - rows between unbounded preceding and unbounded following - ) - = invocation_id - then true - else false - end as is_the_last_invocation_of_the_day + {{ elementary.edr_condition_as_boolean(first_inv_cond) }} + as is_the_first_invocation_of_the_day, + {{ elementary.edr_condition_as_boolean(last_inv_cond) }} + as is_the_last_invocation_of_the_day from dbt_run_results run_results join dbt_models models on run_results.unique_id = models.unique_id diff --git a/models/edr/system/monitors_runs.sql b/models/edr/system/monitors_runs.sql index 5e19281ac..f1ea6799c 100644 --- a/models/edr/system/monitors_runs.sql +++ b/models/edr/system/monitors_runs.sql @@ -13,7 +13,7 @@ with max(bucket_end) as last_bucket_end, min(bucket_end) as first_bucket_end from data_monitoring_metrics - group by 1, 2, 3, 4 + group by full_table_name, column_name, metric_name, metric_properties )