diff --git a/.github/workflows/test-all-warehouses.yml b/.github/workflows/test-all-warehouses.yml index 83e06b56c..89b077b59 100644 --- a/.github/workflows/test-all-warehouses.yml +++ b/.github/workflows/test-all-warehouses.yml @@ -49,7 +49,16 @@ jobs: ${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) || fromJSON('["latest_official", "latest_pre"]') }} warehouse-type: - [postgres, clickhouse, trino, dremio, spark, duckdb, sqlserver] + [ + postgres, + clickhouse, + trino, + dremio, + spark, + duckdb, + sqlserver, + vertica, + ] exclude: # latest_pre is only tested on postgres - dbt-version: latest_pre @@ -64,6 +73,8 @@ jobs: warehouse-type: duckdb - dbt-version: latest_pre warehouse-type: sqlserver + - dbt-version: latest_pre + warehouse-type: vertica uses: ./.github/workflows/test-warehouse.yml with: warehouse-type: ${{ matrix.warehouse-type }} diff --git a/.github/workflows/test-warehouse.yml b/.github/workflows/test-warehouse.yml index baff275a6..b90c243be 100644 --- a/.github/workflows/test-warehouse.yml +++ b/.github/workflows/test-warehouse.yml @@ -20,6 +20,7 @@ on: - duckdb - sqlserver - fabric + - vertica elementary-ref: type: string required: false @@ -151,8 +152,26 @@ jobs: if: startsWith(inputs.warehouse-type, 'databricks') && inputs.dbt-version < '1.7.0' run: pip install databricks-sql-connector==2.9.3 + - name: Reject unsupported Vertica + Fusion combination + if: inputs.warehouse-type == 'vertica' && inputs.dbt-version == 'fusion' + run: | + echo "::error::dbt Fusion does not support third-party adapters such as dbt-vertica." + exit 1 + + - name: Install dbt-vertica + if: inputs.warehouse-type == 'vertica' && inputs.dbt-version != 'fusion' + run: | + # dbt-vertica pins dbt-core~=1.8 which lacks native support for the + # "arguments" test property used by the integration-test framework. + # Install dbt-vertica without deps, then install the requested + # dbt-core version separately (dbt-vertica works fine with newer + # dbt-core versions). + pip install dbt-vertica --no-deps + pip install vertica-python \ + "dbt-core${{ (!startsWith(inputs.dbt-version, 'latest') && format('=={0}', inputs.dbt-version)) || '' }}" + - name: Install dbt - if: ${{ inputs.dbt-version != 'fusion' }} + if: ${{ inputs.dbt-version != 'fusion' && inputs.warehouse-type != 'vertica' }} run: pip install${{ (inputs.dbt-version == 'latest_pre' && ' --pre') || '' }} "dbt-core${{ (!startsWith(inputs.dbt-version, 'latest') && format('=={0}', inputs.dbt-version)) || '' }}" @@ -198,6 +217,18 @@ jobs: ln -sfn ${{ github.workspace }}/dbt-data-reliability dbt_project/dbt_packages/elementary pip install -r requirements.txt + - name: Start Vertica + if: inputs.warehouse-type == 'vertica' + working-directory: ${{ env.TESTS_DIR }} + run: docker compose -f docker-compose-vertica.yml up -d + + - name: Wait for Vertica to be ready + if: inputs.warehouse-type == 'vertica' + run: | + echo "Waiting for Vertica to be healthy..." + timeout 60 bash -c 'until [ "$(docker inspect --format="{{.State.Health.Status}}" vertica)" == "healthy" ]; do echo "Waiting..."; sleep 5; done' + echo "Vertica is ready!" + - name: Check DWH connection working-directory: ${{ env.TESTS_DIR }} run: | diff --git a/.gitignore b/.gitignore index d5c73ee4a..e11a772a8 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ dbt_internal_packages/ logs/ scripts/ +.github/fixtures/.user.yml .idea .DS_Store diff --git a/integration_tests/dbt_project/macros/ci_schemas_cleanup/test_drop_stale_ci_schemas.sql b/integration_tests/dbt_project/macros/ci_schemas_cleanup/test_drop_stale_ci_schemas.sql index 88006f3bb..b3165387d 100644 --- a/integration_tests/dbt_project/macros/ci_schemas_cleanup/test_drop_stale_ci_schemas.sql +++ b/integration_tests/dbt_project/macros/ci_schemas_cleanup/test_drop_stale_ci_schemas.sql @@ -98,3 +98,9 @@ {% set safe_schema = schema_name | replace("`", "``") %} {% do run_query("CREATE DATABASE IF NOT EXISTS `" ~ safe_schema ~ "`") %} {% endmacro %} + +{% macro vertica__edr_create_schema(database, schema_name) %} + {#- Vertica DDL is auto-committed; an explicit adapter.commit() would + fail with "no transaction in progress". -#} + {% do run_query("CREATE SCHEMA IF NOT EXISTS " ~ schema_name) %} +{% endmacro %} diff --git a/integration_tests/dbt_project/macros/clear_env.sql b/integration_tests/dbt_project/macros/clear_env.sql index de935417a..4ed014dd6 100644 --- a/integration_tests/dbt_project/macros/clear_env.sql +++ b/integration_tests/dbt_project/macros/clear_env.sql @@ -82,3 +82,9 @@ {% do run_query("DROP SCHEMA IF EXISTS " ~ schema_name ~ " CASCADE") %} {% do adapter.commit() %} {% endmacro %} + +{% macro vertica__edr_drop_schema(database_name, schema_name) %} + {#- Vertica DDL is auto-committed; an explicit adapter.commit() would + fail with "no transaction in progress". -#} + {% do run_query("DROP SCHEMA IF EXISTS " ~ schema_name ~ " CASCADE") %} +{% endmacro %} diff --git a/integration_tests/dbt_project/macros/schema_utils/list_schemas.sql b/integration_tests/dbt_project/macros/schema_utils/list_schemas.sql index e5986f810..7e1afe0c7 100644 --- a/integration_tests/dbt_project/macros/schema_utils/list_schemas.sql +++ b/integration_tests/dbt_project/macros/schema_utils/list_schemas.sql @@ -54,3 +54,12 @@ {% for row in results %} {% do schemas.append(row[0]) %} {% endfor %} {% do return(schemas) %} {% endmacro %} + +{% macro vertica__edr_list_schemas(database) %} + {#- Vertica's v_catalog.schemata is scoped to the current database and + does not have a database_name filter column. -#} + {% set results = run_query("SELECT schema_name FROM v_catalog.schemata") %} + {% set schemas = [] %} + {% for row in results %} {% do schemas.append(row[0]) %} {% endfor %} + {% do return(schemas) %} +{% endmacro %} diff --git a/integration_tests/dbt_project/macros/schema_utils/schema_exists.sql b/integration_tests/dbt_project/macros/schema_utils/schema_exists.sql index 06fab2592..eb65dba34 100644 --- a/integration_tests/dbt_project/macros/schema_utils/schema_exists.sql +++ b/integration_tests/dbt_project/macros/schema_utils/schema_exists.sql @@ -64,3 +64,14 @@ {% set result = run_query("SHOW DATABASES LIKE '" ~ safe_schema ~ "'") %} {% do return(result | length > 0) %} {% endmacro %} + +{% macro vertica__edr_schema_exists(database, schema_name) %} + {#- Vertica's v_catalog.schemata is scoped to the current database. -#} + {% set safe_schema = schema_name | replace("'", "''") %} + {% set result = run_query( + "SELECT schema_name FROM v_catalog.schemata WHERE lower(schema_name) = lower('" + ~ safe_schema + ~ "')" + ) %} + {% do return(result | length > 0) %} +{% endmacro %} diff --git a/integration_tests/dbt_project/macros/vertica_seed_override.sql b/integration_tests/dbt_project/macros/vertica_seed_override.sql new file mode 100644 index 000000000..ab9c3a25b --- /dev/null +++ b/integration_tests/dbt_project/macros/vertica_seed_override.sql @@ -0,0 +1,23 @@ +{#- Override the dbt-vertica seed helper so that each seed file uses a + unique reject-table name. The upstream macro hardcodes + ``seed_rejects`` for every seed, which causes "Object already exists" + errors when ``dbt seed`` processes more than one file. -#} +{% macro copy_local_load_csv_rows(model, agate_table) %} + {% set cols_sql = get_seed_column_quoted_csv(model, agate_table.column_names) %} + + {#- Build a per-seed reject table name so concurrent seeds don't clash. -#} + {% set reject_table = model["alias"] ~ "_rejects" %} + + {% set sql %} + copy {{ this.render() }} + ({{ cols_sql }}) + from local '{{ agate_table.original_abspath }}' + delimiter ',' + enclosed by '"' + skip 1 + abort on error + rejected data as table {{ this.without_identifier() }}.{{ reject_table }}; + {% endset %} + + {{ return(sql) }} +{% endmacro %} diff --git a/integration_tests/docker-compose-vertica.yml b/integration_tests/docker-compose-vertica.yml new file mode 100644 index 000000000..d418d667a --- /dev/null +++ b/integration_tests/docker-compose-vertica.yml @@ -0,0 +1,40 @@ +services: + vertica: + environment: + VERTICA_USER: dbadmin + VERTICA_PASS: vertica + VERTICA_HOST: localhost + VERTICA_PORT: 5433 + VERTICA_DATABASE: elementary_tests + VERTICA_SCHEMA: ${SCHEMA_NAME} + APP_DB_USER: dbadmin + APP_DB_PASSWORD: vertica + TZ: "America/Los_Angeles" + VERTICA_DB_NAME: elementary_tests + VMART_ETL_SCRIPT: "" + container_name: vertica + image: ghcr.io/ratiopbc/vertica-ce + ports: + - "5433:5433" + - "5444:5444" + deploy: + mode: global + ulimits: + nofile: + soft: 65536 + hard: 65536 + volumes: + - type: volume + source: vertica-data + target: /data + healthcheck: + test: + [ + "CMD-SHELL", + "/opt/vertica/bin/vsql -U dbadmin -w vertica -c 'SELECT 1;'", + ] + interval: 5s + timeout: 5s + retries: 10 +volumes: + vertica-data: diff --git a/integration_tests/profiles/profiles.yml.j2 b/integration_tests/profiles/profiles.yml.j2 index e2f387847..86cf5eae7 100644 --- a/integration_tests/profiles/profiles.yml.j2 +++ b/integration_tests/profiles/profiles.yml.j2 @@ -75,6 +75,18 @@ elementary_tests: trust_cert: true threads: 4 + vertica: &vertica + type: vertica + host: localhost + port: 5433 + username: dbadmin + password: vertica + database: elementary_tests + schema: {{ schema_name }} + connection_load_balance: false + retries: 2 + threads: 4 + # ── Cloud targets (secrets substituted at CI time) ───────────────── snowflake: &snowflake @@ -150,7 +162,7 @@ elementary_tests: elementary: target: postgres outputs: -{%- set targets = ['postgres', 'clickhouse', 'trino', 'dremio', 'spark', 'duckdb', 'sqlserver', 'snowflake', 'bigquery', 'redshift', 'databricks_catalog', 'athena', 'fabric'] %} +{%- set targets = ['postgres', 'clickhouse', 'trino', 'dremio', 'spark', 'duckdb', 'sqlserver', 'vertica', 'snowflake', 'bigquery', 'redshift', 'databricks_catalog', 'athena', 'fabric'] %} {%- for t in targets %} {{ t }}: <<: *{{ t }} diff --git a/integration_tests/tests/data_seeder.py b/integration_tests/tests/data_seeder.py index 383e6d9b8..232afb665 100644 --- a/integration_tests/tests/data_seeder.py +++ b/integration_tests/tests/data_seeder.py @@ -4,7 +4,7 @@ from contextlib import contextmanager from pathlib import Path from types import MappingProxyType -from typing import TYPE_CHECKING, ClassVar, Dict, Generator, List, Mapping +from typing import TYPE_CHECKING, ClassVar, Dict, Generator, List, Mapping, Optional from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner from logger import get_logger @@ -121,7 +121,7 @@ class BaseSqlInsertSeeder(ABC): def __init__( self, - query_runner: "AdapterQueryRunner", + query_runner: Optional["AdapterQueryRunner"], schema: str, seeds_dir_path: Path, ) -> None: @@ -454,3 +454,119 @@ def _create_table_sql(self, fq_table: str, col_defs: str) -> str: f"CREATE TABLE {fq_table} ({col_defs}) " f"ENGINE = MergeTree() ORDER BY tuple()" ) + + +class VerticaDirectSeeder(BaseSqlInsertSeeder): + """Fast seeder for Vertica: executes CREATE TABLE + INSERT directly. + + Bypasses ``dbt seed`` (which uses Vertica's COPY command) because COPY + rejects empty CSV fields for non-string columns instead of treating them + as NULL. Direct INSERT statements handle NULL correctly. + + Uses a *direct* ``vertica_python`` connection (rather than dbt's adapter + connection pool) so that all DDL + DML runs in a single session and can + be committed atomically. dbt's ``connection_named`` context manager + releases (and effectively rolls back) the connection after each + ``execute_sql`` call, which caused INSERT data to be invisible to + subsequent ``dbt test`` sessions. + + Vertica uses double-quote identifiers (not backticks), so this class + overrides the ``seed`` method to use ``"col"`` quoting. + """ + + def _type_string(self) -> str: + # Must match edr_type_string (varchar(16000)) so that schema-change + # detection sees a consistent type between seeded tables and + # elementary metadata columns. + return "VARCHAR(16000)" + + def _type_boolean(self) -> str: + return "BOOLEAN" + + def _type_integer(self) -> str: + return "INTEGER" + + def _type_float(self) -> str: + return "FLOAT" + + def _format_value(self, value: object, col_type: str) -> str: + if value is None or (isinstance(value, str) and value == ""): + return "NULL" + if isinstance(value, bool): + return "true" if value else "false" + if isinstance(value, (int, float)): + return str(value) + text = str(value) + text = text.replace("'", "''") + return f"'{text}'" + + def _create_table_sql(self, fq_table: str, col_defs: str) -> str: + return f"CREATE TABLE {fq_table} ({col_defs})" + + @staticmethod + def _vertica_connection(): + """Open a direct vertica_python connection from env / defaults.""" + import vertica_python # available in the test venv + + conn_info = { + "host": os.environ.get("VERTICA_HOST", "localhost"), + "port": int(os.environ.get("VERTICA_PORT", "5433")), + "user": os.environ.get("VERTICA_USER", "dbadmin"), + "password": os.environ.get("VERTICA_PASSWORD", "vertica"), + "database": os.environ.get("VERTICA_DATABASE", "elementary_tests"), + } + return vertica_python.connect(**conn_info) + + @contextmanager + def seed(self, data: List[dict], table_name: str) -> Generator[None, None, None]: + """Override base seed to use double-quote identifiers for Vertica.""" + if not data: + raise ValueError(f"Seed data for '{table_name}' must not be empty") + columns = list(data[0].keys()) + col_types: Dict[str, str] = { + col: self._infer_column_type([row.get(col) for row in data]) + for col in columns + } + # Vertica uses double-quote identifiers, not backticks. + col_defs = ", ".join(f'"{col}" {col_types[col]}' for col in columns) + fq_table = f'"{self._schema}"."{table_name}"' + + seed_path = self._write_csv(data, table_name) + + try: + # Use a direct connection so DDL + DML share the same session + # and the COMMIT is guaranteed to persist the data. + conn = self._vertica_connection() + try: + cur = conn.cursor() + cur.execute(f"DROP TABLE IF EXISTS {fq_table}") + cur.execute(self._create_table_sql(fq_table, col_defs)) + + for batch_start in range(0, len(data), _INSERT_BATCH_SIZE): + batch = data[batch_start : batch_start + _INSERT_BATCH_SIZE] + rows_sql = ", ".join( + "(" + + ", ".join( + self._format_value(row.get(c), col_types[c]) + for c in columns + ) + + ")" + for row in batch + ) + cur.execute(f"INSERT INTO {fq_table} VALUES {rows_sql}") + + conn.commit() + finally: + conn.close() + + logger.info( + "%s: loaded %d rows into %s (%s)", + type(self).__name__, + len(data), + fq_table, + ", ".join(f"{c}: {t}" for c, t in col_types.items()), + ) + + yield + finally: + seed_path.unlink(missing_ok=True) diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index 67c4373da..af32b9ca5 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -7,7 +7,12 @@ from uuid import uuid4 from adapter_query_runner import AdapterQueryRunner, UnsupportedJinjaError -from data_seeder import ClickHouseDirectSeeder, DbtDataSeeder, SparkS3CsvSeeder +from data_seeder import ( + ClickHouseDirectSeeder, + DbtDataSeeder, + SparkS3CsvSeeder, + VerticaDirectSeeder, +) from dbt_utils import get_database_and_schema_properties from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner from elementary.clients.dbt.factory import RunnerMethod, create_dbt_runner @@ -357,7 +362,9 @@ def _read_profile_schema(self) -> str: def _create_seeder( self, - ) -> Union[DbtDataSeeder, ClickHouseDirectSeeder, SparkS3CsvSeeder]: + ) -> Union[ + DbtDataSeeder, ClickHouseDirectSeeder, SparkS3CsvSeeder, VerticaDirectSeeder + ]: """Return the fastest available seeder for the current target.""" if self.target == "clickhouse": runner = self._get_query_runner() @@ -369,6 +376,14 @@ def _create_seeder( # set_from_args / reset_adapters). schema = self._read_profile_schema() + SCHEMA_NAME_SUFFIX return SparkS3CsvSeeder(schema, self.seeds_dir_path) + if self.target == "vertica": + # Vertica's COPY command (used by dbt seed) rejects empty CSV + # fields for non-string columns. Use direct INSERT instead. + # Read schema from profiles directly (like Spark) to avoid + # initialising an AdapterQueryRunner we don't need — Vertica + # uses a direct vertica_python connection, not the dbt adapter. + schema = self._read_profile_schema() + SCHEMA_NAME_SUFFIX + return VerticaDirectSeeder(None, schema, self.seeds_dir_path) return DbtDataSeeder( self.dbt_runner, self.project_dir_path, self.seeds_dir_path ) diff --git a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql index 5c3eeacdd..3e76e3dab 100644 --- a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql +++ b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql @@ -262,10 +262,10 @@ column_name, metric_name, case - when training_stddev is null then null + when {{ elementary.edr_normalize_stddev('training_stddev') }} is null then null when training_set_size = 1 then null -- Single value case - no historical context for anomaly detection - when training_stddev = 0 then 0 -- Stationary data case - valid, all values are identical - else (metric_value - training_avg) / (training_stddev) + when {{ elementary.edr_normalize_stddev('training_stddev') }} = 0 then 0 -- Stationary data case - valid, all values are identical + else (metric_value - training_avg) / ({{ elementary.edr_normalize_stddev('training_stddev') }}) end as anomaly_score, {{ test_configuration.anomaly_sensitivity }} as anomaly_score_threshold, source_value as anomalous_value, @@ -276,16 +276,16 @@ {% set limit_values = elementary.get_limit_metric_values(test_configuration) %} case - when training_stddev is null or training_set_size = 1 then null + when {{ elementary.edr_normalize_stddev('training_stddev') }} is null or training_set_size = 1 then null when {{ limit_values.min_metric_value }} > 0 or metric_name in {{ elementary.to_sql_list(elementary.get_negative_value_supported_metrics()) }} then {{ limit_values.min_metric_value }} else 0 end as min_metric_value, case - when training_stddev is null or training_set_size = 1 then null + when {{ elementary.edr_normalize_stddev('training_stddev') }} is null or training_set_size = 1 then null else {{ limit_values.max_metric_value }} end as max_metric_value, training_avg, - training_stddev, + {{ elementary.edr_normalize_stddev('training_stddev') }} as training_stddev, training_set_size, {{ elementary.edr_cast_as_timestamp('training_start') }} as training_start, {{ elementary.edr_cast_as_timestamp('training_end') }} as training_end, @@ -307,8 +307,9 @@ {% endmacro %} {% macro get_limit_metric_values(test_configuration) %} + {%- set normalized_stddev = elementary.edr_normalize_stddev("training_stddev") -%} {%- set min_val -%} - ((-1) * {{ test_configuration.anomaly_sensitivity }} * training_stddev + training_avg) + ((-1) * {{ test_configuration.anomaly_sensitivity }} * {{ normalized_stddev }} + training_avg) {%- endset -%} {% if test_configuration.ignore_small_changes.drop_failure_percent_threshold %} @@ -321,7 +322,7 @@ {% endif %} {%- set max_val -%} - ({{ test_configuration.anomaly_sensitivity }} * training_stddev + training_avg) + ({{ test_configuration.anomaly_sensitivity }} * {{ normalized_stddev }} + training_avg) {%- endset -%} {% if test_configuration.ignore_small_changes.spike_failure_percent_threshold %} diff --git a/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql b/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql index 1618bb205..7eb82c77c 100644 --- a/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql +++ b/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql @@ -86,3 +86,20 @@ {% macro sum(column_name) -%} sum(cast({{ column_name }} as {{ elementary.edr_type_float() }})) {%- endmacro %} + +{#- edr_normalize_stddev – post-process a stddev column reference so that + floating-point artefacts (tiny non-zero values for constant inputs) are + cleaned up. The default implementation is the identity function; Vertica + overrides it with round() because its STDDEV can return ~4e-08 for + perfectly identical values. -#} +{% macro edr_normalize_stddev(column_expr) -%} + {{ adapter.dispatch("edr_normalize_stddev", "elementary")(column_expr) }} +{%- endmacro %} + +{% macro default__edr_normalize_stddev(column_expr) -%} + {{ column_expr }} +{%- endmacro %} + +{% macro vertica__edr_normalize_stddev(column_expr) -%} + round({{ column_expr }}, 6) +{%- endmacro %} diff --git a/macros/edr/dbt_artifacts/upload_run_results.sql b/macros/edr/dbt_artifacts/upload_run_results.sql index bef9794c1..251f9737b 100644 --- a/macros/edr/dbt_artifacts/upload_run_results.sql +++ b/macros/edr/dbt_artifacts/upload_run_results.sql @@ -124,4 +124,12 @@ {% do flattened_node.update( {"compiled_code": elementary.get_compiled_code_too_long_err_msg()} ) %} + {#- On adapters with limited string-literal / varchar sizes (e.g. Vertica + 65 000 bytes) the error *message* can also embed the full compiled SQL, + making the INSERT statement exceed the adapter's limits. Truncate the + message so the row can still be persisted. -#} + {% set msg = flattened_node.get("message", "") %} + {% if msg is string and msg | length > 4096 %} + {% do flattened_node.update({"message": msg[:4096] ~ "... (truncated)"}) %} + {% endif %} {% endmacro %} diff --git a/macros/edr/system/system_utils/buckets_cte.sql b/macros/edr/system/system_utils/buckets_cte.sql index b45b2b999..a775b40f0 100644 --- a/macros/edr/system/system_utils/buckets_cte.sql +++ b/macros/edr/system/system_utils/buckets_cte.sql @@ -257,6 +257,29 @@ {{ return(complete_buckets_cte) }} {% endmacro %} +{% macro vertica__complete_buckets_cte( + time_bucket, + bucket_end_expr, + min_bucket_start_expr, + max_bucket_end_expr +) -%} + {%- set complete_buckets_cte %} + with integers as ( + select (row_number() over (order by t1.v, t2.v, t3.v, t4.v)) - 1 as num + from (select 1 union all select 2 union all select 3 union all select 4 union all select 5 union all select 6 union all select 7 union all select 8 union all select 9 union all select 10) t1(v) + cross join (select 1 union all select 2 union all select 3 union all select 4 union all select 5 union all select 6 union all select 7 union all select 8 union all select 9 union all select 10) t2(v) + cross join (select 1 union all select 2 union all select 3 union all select 4 union all select 5 union all select 6 union all select 7 union all select 8 union all select 9 union all select 10) t3(v) + cross join (select 1 union all select 2 union all select 3 union all select 4 union all select 5 union all select 6 union all select 7 union all select 8 union all select 9 union all select 10) t4(v) + ) + select + {{ elementary.edr_timeadd(time_bucket.period, 'num * ' ~ time_bucket.count, min_bucket_start_expr) }} as edr_bucket_start, + {{ elementary.edr_timeadd(time_bucket.period, '(num + 1) * ' ~ time_bucket.count, min_bucket_start_expr) }} as edr_bucket_end + from integers + where {{ elementary.edr_timeadd(time_bucket.period, '(num + 1) * ' ~ time_bucket.count, min_bucket_start_expr) }} <= {{ max_bucket_end_expr }} + {%- endset %} + {{ return(complete_buckets_cte) }} +{% endmacro %} + {% macro dremio__complete_buckets_cte( time_bucket, bucket_end_expr, min_bucket_start_expr, max_bucket_end_expr ) %} diff --git a/macros/edr/system/system_utils/empty_table.sql b/macros/edr/system/system_utils/empty_table.sql index 9ddcb055f..8a4deea37 100644 --- a/macros/edr/system/system_utils/empty_table.sql +++ b/macros/edr/system/system_utils/empty_table.sql @@ -150,7 +150,7 @@ {%- set empty_table_query -%} select * from ( select - {% for column in column_name_and_type_list %} + {%- for column in column_name_and_type_list -%} {{ elementary.empty_column(column[0], column[1]) }} {%- if not loop.last -%},{%- endif %} {%- endfor %} ) as empty_table diff --git a/macros/edr/system/system_utils/full_names.sql b/macros/edr/system/system_utils/full_names.sql index 00e7cdc2a..6cdd1e6b6 100644 --- a/macros/edr/system/system_utils/full_names.sql +++ b/macros/edr/system/system_utils/full_names.sql @@ -15,6 +15,14 @@ upper({{ alias_dot }}schema_name || '.' || {{ alias_dot }}table_name) {%- endmacro %} +{% macro vertica__full_table_name(alias) -%} + {# Vertica: upper() doubles varchar byte-length; cast to varchar(1000) first to stay under 65000 limit #} + {% if alias is defined %} {%- set alias_dot = alias ~ "." %} {% endif %} + upper(cast( + {{ alias_dot }}database_name || '.' || {{ alias_dot }}schema_name || '.' || {{ alias_dot }}table_name + as varchar(1000))) +{%- endmacro %} + {% macro full_schema_name() -%} {{ adapter.dispatch("full_schema_name", "elementary")() }} @@ -29,6 +37,11 @@ upper(schema_name) {%- endmacro %} +{% macro vertica__full_schema_name() -%} + {# Vertica: upper() doubles varchar byte-length; cast first to stay under 65000 limit #} + upper(cast(database_name || '.' || schema_name as varchar(1000))) +{%- endmacro %} + {% macro full_column_name() -%} {{ adapter.dispatch("full_column_name", "elementary")() }} @@ -45,11 +58,28 @@ upper(schema_name || '.' || table_name || '.' || column_name) {%- endmacro %} +{% macro vertica__full_column_name() -%} + {# Vertica: upper() doubles varchar byte-length; cast first to stay under 65000 limit #} + upper(cast( + database_name || '.' || schema_name || '.' || table_name || '.' || column_name + as varchar(1000))) +{%- endmacro %} + {% macro full_name_split(part_name) %} {{ adapter.dispatch("full_name_split", "elementary")(part_name) }} {% endmacro %} +{% macro vertica__full_name_split(part_name) %} + {# Vertica supports split_part (1-based index) but not array subscript syntax #} + {%- if part_name == "database_name" -%} {%- set part_index = 1 -%} + {%- elif part_name == "schema_name" -%} {%- set part_index = 2 -%} + {%- elif part_name == "table_name" -%} {%- set part_index = 3 -%} + {%- else -%} {{ return("") }} + {%- endif -%} + trim(both '"' from split_part(full_table_name, '.', {{ part_index }})) as {{ part_name }} +{% endmacro %} + {% macro default__full_name_split(part_name) %} {%- if part_name == "database_name" -%} {%- set part_index = 0 -%} diff --git a/macros/edr/system/system_utils/get_config_var.sql b/macros/edr/system/system_utils/get_config_var.sql index 10e9f7501..4d82741aa 100644 --- a/macros/edr/system/system_utils/get_config_var.sql +++ b/macros/edr/system/system_utils/get_config_var.sql @@ -171,6 +171,15 @@ {{- return(default_config) -}} {%- endmacro -%} +{%- macro vertica__get_default_config() -%} + {% set default_config = elementary.default__get_default_config() %} + {# Reduce batch INSERT query size from default 1,000,000 to avoid + overwhelming Vertica with very large single statements. Individual + column values are bounded by edr_type_long_string (varchar(32000)). #} + {% do default_config.update({"query_max_size": 250000}) %} + {{- return(default_config) -}} +{%- endmacro -%} + {%- macro dremio__get_default_config() -%} {% set default_config = elementary.default__get_default_config() %} {% do default_config.update({"dbt_artifacts_chunk_size": 100}) %} diff --git a/macros/utils/cross_db_utils/day_of_week.sql b/macros/utils/cross_db_utils/day_of_week.sql index 212857263..88bec3caa 100644 --- a/macros/utils/cross_db_utils/day_of_week.sql +++ b/macros/utils/cross_db_utils/day_of_week.sql @@ -63,6 +63,10 @@ to_char({{ date_expr }}, 'DAY') {% endmacro %} +{% macro vertica__edr_day_of_week_expression(date_expr) %} + trim(' ' from to_char({{ date_expr }}, 'Day')) +{% endmacro %} + {% macro fabric__edr_day_of_week_expression(date_expr) %} cast(datename(weekday, {{ date_expr }}) as varchar(30)) {% endmacro %} diff --git a/macros/utils/cross_db_utils/hour_of_week.sql b/macros/utils/cross_db_utils/hour_of_week.sql index a4758f395..632c503bf 100644 --- a/macros/utils/cross_db_utils/hour_of_week.sql +++ b/macros/utils/cross_db_utils/hour_of_week.sql @@ -78,6 +78,17 @@ {% endmacro %} -- fmt: on +{% macro vertica__edr_hour_of_week_expression(date_expr) %} + concat( + cast( + trim( + ' ' from to_char({{ date_expr }}, 'Day') + ) as {{ elementary.edr_type_string() }} + ), + cast(extract(hour from {{ date_expr }}) as {{ elementary.edr_type_string() }}) + ) +{% endmacro %} + {% macro fabric__edr_hour_of_week_expression(date_expr) %} concat( cast(datename(weekday, {{ date_expr }}) as {{ elementary.edr_type_string() }}), diff --git a/macros/utils/cross_db_utils/target_database.sql b/macros/utils/cross_db_utils/target_database.sql index 5361f5d86..e1653732b 100644 --- a/macros/utils/cross_db_utils/target_database.sql +++ b/macros/utils/cross_db_utils/target_database.sql @@ -26,3 +26,5 @@ {% macro fabric__target_database() %} {% do return(target.database) %} {% endmacro %} {% macro sqlserver__target_database() %} {% do return(target.database) %} {% endmacro %} + +{% macro vertica__target_database() %} {% do return(target.database) %} {% endmacro %} diff --git a/macros/utils/cross_db_utils/timeadd.sql b/macros/utils/cross_db_utils/timeadd.sql index 08e7465c0..87a1d9b18 100644 --- a/macros/utils/cross_db_utils/timeadd.sql +++ b/macros/utils/cross_db_utils/timeadd.sql @@ -43,6 +43,14 @@ + {{ elementary.edr_cast_as_int(number) }} * interval '1 {{ date_part }}' {% endmacro %} +{% macro vertica__edr_timeadd(date_part, number, timestamp_expression) %} + timestampadd( + {{ date_part | upper }}, + {{ elementary.edr_cast_as_int(number) }}, + {{ elementary.edr_cast_as_timestamp(timestamp_expression) }} + ) +{% endmacro %} + {% macro redshift__edr_timeadd(date_part, number, timestamp_expression) %} dateadd( {{ date_part }}, diff --git a/macros/utils/data_types/data_type.sql b/macros/utils/data_types/data_type.sql index 96a371b1d..0dc45d9b3 100644 --- a/macros/utils/data_types/data_type.sql +++ b/macros/utils/data_types/data_type.sql @@ -48,6 +48,8 @@ {% macro fabric__edr_type_string() %} {% do return("varchar(4096)") %} {% endmacro %} +{% macro vertica__edr_type_string() %} {% do return("varchar(16000)") %} {% endmacro %} + {%- macro edr_type_long_string() -%} {{ return(adapter.dispatch("edr_type_long_string", "elementary")()) }} @@ -69,6 +71,14 @@ {% set long_string = "text" %} {{ return(long_string) }} {%- endmacro -%} +{#- Vertica note: edr_type_string uses varchar(16000) because Vertica's + lower()/upper() double the byte-length. 16000 * 2 = 32000, safely + under the 65000 octet limit even when the function is applied twice + (e.g. lower(lower(col)) in nested subqueries). -#} +{%- macro vertica__edr_type_long_string() -%} + {% do return("varchar(32000)") %} +{%- endmacro -%} + {#- T-SQL: varchar(4096) is too small for compiled query text. Use varchar(max) which supports up to 2 GB. -#} {%- macro fabric__edr_type_long_string() -%} diff --git a/macros/utils/data_types/get_normalized_data_type.sql b/macros/utils/data_types/get_normalized_data_type.sql index c4a24fda7..aca2baa03 100644 --- a/macros/utils/data_types/get_normalized_data_type.sql +++ b/macros/utils/data_types/get_normalized_data_type.sql @@ -210,6 +210,19 @@ {%- endif %} {% endmacro %} +{% macro vertica__get_normalized_data_type(exact_data_type) %} + {# Vertica reports types like VARCHAR(16000), INT, BOOLEAN. + Normalize to match the canonical names used in test baselines and + other adapters. #} + {%- if exact_data_type.startswith("VARCHAR") or exact_data_type.startswith( + "CHAR" + ) or exact_data_type == "LONG VARCHAR" %} + {{ return("TEXT") }} + {%- elif exact_data_type == "INT" %} {{ return("INTEGER") }} + {%- else %} {{ return(exact_data_type) }} + {%- endif %} +{% endmacro %} + {% macro postgres__get_normalized_data_type(exact_data_type) %} {# understanding Postgres data type synonyms: https://www.postgresql.org/docs/current/datatype.html #} diff --git a/macros/utils/table_operations/get_relation_max_length.sql b/macros/utils/table_operations/get_relation_max_length.sql index 7dfaf4a2c..02e9577f1 100644 --- a/macros/utils/table_operations/get_relation_max_length.sql +++ b/macros/utils/table_operations/get_relation_max_length.sql @@ -39,6 +39,10 @@ {{ return(128) }} {% endmacro %} +{% macro vertica__get_relation_max_name_length(temporary, relation, sql_query) %} + {{ return(128) }} +{% endmacro %} + {% macro fabric__get_relation_max_name_length(temporary, relation, sql_query) %} {# SQL Server / Fabric limits identifiers to 128 chars. dbt-sqlserver may prefix the schema name onto the table identifier when creating diff --git a/macros/utils/table_operations/insert_rows.sql b/macros/utils/table_operations/insert_rows.sql index da23e9c5a..28ade72da 100644 --- a/macros/utils/table_operations/insert_rows.sql +++ b/macros/utils/table_operations/insert_rows.sql @@ -242,6 +242,10 @@ {{- return(string_value | replace("'", "''")) -}} {%- endmacro -%} +{%- macro vertica__escape_special_chars(string_value) -%} + {{- return(string_value | replace("'", "''")) -}} +{%- endmacro -%} + {%- macro athena__escape_special_chars(string_value) -%} {{- return(string_value | replace("'", "''")) -}} {%- endmacro -%} diff --git a/models/edr/data_monitoring/anomaly_detection/metrics_anomaly_score.sql b/models/edr/data_monitoring/anomaly_detection/metrics_anomaly_score.sql index 8f9b8b42c..6bdee2ce7 100644 --- a/models/edr/data_monitoring/anomaly_detection/metrics_anomaly_score.sql +++ b/models/edr/data_monitoring/anomaly_detection/metrics_anomaly_score.sql @@ -64,13 +64,15 @@ with select *, case - when training_stddev is null + when {{ elementary.edr_normalize_stddev("training_stddev") }} is null then null when training_set_size = 1 then null -- Single value case - no historical context for anomaly detection - when training_stddev = 0 + when {{ elementary.edr_normalize_stddev("training_stddev") }} = 0 then 0 -- Stationary data case - valid, all values are identical - else (metric_value - training_avg) / (training_stddev) + else + (metric_value - training_avg) + / ({{ elementary.edr_normalize_stddev("training_stddev") }}) end as anomaly_score from time_window_aggregation