diff --git a/integration_tests/dbt_project/macros/replace_empty_strings_with_nulls.sql b/integration_tests/dbt_project/macros/replace_empty_strings_with_nulls.sql index c7c0f20cc..cd2a54d38 100644 --- a/integration_tests/dbt_project/macros/replace_empty_strings_with_nulls.sql +++ b/integration_tests/dbt_project/macros/replace_empty_strings_with_nulls.sql @@ -1,3 +1,6 @@ +{# This macro is only used for BigQuery fusion seeds (see dbt_project.py _fix_seed_if_needed). + ClickHouse uses ClickHouseDirectSeeder (data_seeder.py) which creates Nullable(String) + columns directly, so no post-hoc repair is needed. #} {% macro replace_empty_strings_with_nulls(table_name) %} {% set relation = ref(table_name) %} {% set columns = adapter.get_columns_in_relation(relation) %} @@ -5,12 +8,13 @@ {% for col in columns %} {% set data_type = elementary.get_column_data_type(col) %} {% set normalized_data_type = elementary.normalize_data_type(data_type) %} - + {% if normalized_data_type == "string" %} + {% set quoted_col = adapter.quote(col["name"]) %} {% set update_query %} update {{ relation }} - set {{ col["name"] }} = NULL - where {{ col["name"] }} = '' + set {{ quoted_col }} = NULL + where {{ quoted_col }} = '' {% endset %} {% do elementary.run_query(update_query) %} {% endif %} diff --git a/integration_tests/docker-compose.yml b/integration_tests/docker-compose.yml index d8fc1c398..ee30414ec 100644 --- a/integration_tests/docker-compose.yml +++ b/integration_tests/docker-compose.yml @@ -20,6 +20,7 @@ services: - "9000:9000" volumes: - clickhouse:/var/lib/clickhouse + - ./docker/clickhouse/users.xml:/etc/clickhouse-server/users.d/elementary.xml environment: CLICKHOUSE_DB: default CLICKHOUSE_USER: default diff --git a/integration_tests/docker/clickhouse/users.xml b/integration_tests/docker/clickhouse/users.xml new file mode 100644 index 000000000..03f8fb2dc --- /dev/null +++ b/integration_tests/docker/clickhouse/users.xml @@ -0,0 +1,8 @@ + + + + 1 + 1 + + + diff --git a/integration_tests/tests/adapter_query_runner.py b/integration_tests/tests/adapter_query_runner.py index 7e924bda6..6ac9d96ff 100644 --- a/integration_tests/tests/adapter_query_runner.py +++ b/integration_tests/tests/adapter_query_runner.py @@ -239,6 +239,16 @@ def has_non_ref_jinja(query: str) -> bool: stripped = _SOURCE_PATTERN.sub("", stripped) return bool(_JINJA_EXPR_PATTERN.search(stripped)) + def execute_sql(self, sql: str) -> None: + """Execute a SQL statement that does not return results (DDL/DML).""" + with self._adapter.connection_named("execute_sql"): + self._adapter.execute(sql, fetch=False) + + @property + def schema_name(self) -> str: + """Return the base schema name from the adapter credentials.""" + return self._adapter.config.credentials.schema + def run_query(self, prerendered_query: str) -> List[Dict[str, Any]]: """Render Jinja refs/sources and execute a query, returning rows as dicts. diff --git a/integration_tests/tests/data_seeder.py b/integration_tests/tests/data_seeder.py index f89c0c5eb..a65d65c11 100644 --- a/integration_tests/tests/data_seeder.py +++ b/integration_tests/tests/data_seeder.py @@ -1,12 +1,13 @@ import csv from contextlib import contextmanager from pathlib import Path -from typing import Generator, List +from typing import TYPE_CHECKING, Generator, List from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner from logger import get_logger -# TODO: Write more performant data seeders per adapter. +if TYPE_CHECKING: + from adapter_query_runner import AdapterQueryRunner logger = get_logger(__name__) @@ -48,3 +49,126 @@ def seed(self, data: List[dict], table_name: str) -> Generator[None, None, None] yield finally: seed_path.unlink() + + +# Maximum number of rows per INSERT VALUES statement. +_INSERT_BATCH_SIZE = 500 + + +class ClickHouseDirectSeeder: + """Fast seeder for ClickHouse: executes CREATE TABLE + INSERT directly. + + Bypasses the ``dbt seed`` *subprocess* (and its post-hoc NULL repair), + but still writes a CSV file to the seeds directory so that dbt can + discover the seed node for ``{{ ref() }}`` resolution during + ``run_operation``. + + Column types are inferred from the Python values in the seed data and + wrapped in ``Nullable()`` so that NULL values are preserved correctly + (ClickHouse columns are non-Nullable by default). + """ + + def __init__( + self, + query_runner: "AdapterQueryRunner", + schema: str, + seeds_dir_path: Path, + ) -> None: + self._query_runner = query_runner + self._schema = schema + self._seeds_dir_path = seeds_dir_path + + @staticmethod + def _infer_column_type(values: List[object]) -> str: + """Infer a ClickHouse column type from a list of Python values. + + Examines non-None, non-empty-string values and returns a + ``Nullable(...)`` type string. Falls back to ``Nullable(String)`` + when all values are None/empty or when types are mixed. + """ + non_null = [v for v in values if v is not None and v != ""] + if not non_null: + return "Nullable(String)" + + # bool is a subclass of int in Python, so check it first. + # dbt seed infers "True"/"False" CSV values as boolean; dbt-clickhouse + # maps this to Bool (alias for UInt8). + if all(isinstance(v, bool) for v in non_null): + return "Nullable(Bool)" + if all(isinstance(v, int) and not isinstance(v, bool) for v in non_null): + return "Nullable(Int64)" + if all( + isinstance(v, (int, float)) and not isinstance(v, bool) for v in non_null + ): + return "Nullable(Float64)" + return "Nullable(String)" + + @staticmethod + def _escape(value: object) -> str: + """Escape a value for a ClickHouse SQL literal. + + Returns ``NULL`` for None / empty-string, unquoted literals for + numeric / boolean types, and a quoted+escaped string otherwise. + """ + if value is None or (isinstance(value, str) and value == ""): + return "NULL" + # Booleans → ClickHouse Bool literals (true/false). + if isinstance(value, bool): + return "true" if value else "false" + if isinstance(value, (int, float)): + return str(value) + text = str(value) + text = text.replace("\\", "\\\\") + text = text.replace("'", "\\'") + return f"'{text}'" + + @contextmanager + def seed(self, data: List[dict], table_name: str) -> Generator[None, None, None]: + """Create a table with correctly-typed Nullable columns and insert data. + + A CSV file is written to the seeds directory so that dbt can + discover the seed node for ``{{ ref() }}`` resolution. The file + is removed when the context manager exits. + """ + columns = list(data[0].keys()) + col_types = { + col: self._infer_column_type([row.get(col) for row in data]) + for col in columns + } + col_defs = ", ".join(f"`{col}` {col_types[col]}" for col in columns) + fq_table = f"`{self._schema}`.`{table_name}`" + + # Write a CSV so dbt discovers the seed node (needed for {{ ref() }}). + seed_path = self._seeds_dir_path / f"{table_name}.csv" + with seed_path.open("w") as f: + writer = csv.DictWriter(f, fieldnames=columns) + writer.writeheader() + writer.writerows(data) + + try: + self._query_runner.execute_sql(f"DROP TABLE IF EXISTS {fq_table}") + self._query_runner.execute_sql( + f"CREATE TABLE {fq_table} ({col_defs}) " + f"ENGINE = MergeTree() ORDER BY tuple()" + ) + + for batch_start in range(0, len(data), _INSERT_BATCH_SIZE): + batch = data[batch_start : batch_start + _INSERT_BATCH_SIZE] + rows_sql = ", ".join( + "(" + ", ".join(self._escape(row.get(c)) for c in columns) + ")" + for row in batch + ) + self._query_runner.execute_sql( + f"INSERT INTO {fq_table} VALUES {rows_sql}" + ) + + logger.info( + "ClickHouseDirectSeeder: loaded %d rows into %s (%s)", + len(data), + fq_table, + ", ".join(f"{c}: {t}" for c, t in col_types.items()), + ) + + yield + finally: + seed_path.unlink(missing_ok=True) diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index 44b9f3f41..41d64c1f2 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -7,29 +7,16 @@ from uuid import uuid4 from adapter_query_runner import AdapterQueryRunner, UnsupportedJinjaError -from data_seeder import DbtDataSeeder +from data_seeder import ClickHouseDirectSeeder, DbtDataSeeder from dbt_utils import get_database_and_schema_properties from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner from elementary.clients.dbt.factory import RunnerMethod, create_dbt_runner from logger import get_logger from ruamel.yaml import YAML -from tenacity import ( - RetryCallState, - retry, - retry_if_result, - stop_after_attempt, - wait_fixed, -) PYTEST_XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", None) SCHEMA_NAME_SUFFIX = f"_{PYTEST_XDIST_WORKER}" if PYTEST_XDIST_WORKER else "" -# Retry settings for the run_operation fallback path. run_operation() can -# intermittently return an empty list when the MACRO_RESULT_PATTERN log line -# is not captured from dbt's output. -_RUN_QUERY_MAX_RETRIES = 3 -_RUN_QUERY_RETRY_DELAY_SECONDS = 0.5 - _DEFAULT_VARS = { "disable_dbt_invocation_autoupload": True, "disable_dbt_artifacts_autoupload": True, @@ -92,51 +79,21 @@ def run_query(self, prerendered_query: str): except UnsupportedJinjaError: logger.debug("Query contains complex Jinja; falling back to run_operation") - # Slow path: full Jinja rendering via run_operation (with retry). + # Slow path: full Jinja rendering via run_operation. return self._run_query_with_run_operation(prerendered_query) - @staticmethod - def _log_retry(retry_state: RetryCallState) -> None: - """Tenacity before_sleep callback — logs each retry with attempt number.""" - logger.warning( - "run_operation('elementary.render_run_query') returned no output; " - "retry %d/%d in %.1fs", - retry_state.attempt_number, - _RUN_QUERY_MAX_RETRIES, - _RUN_QUERY_RETRY_DELAY_SECONDS, - ) - - @retry( - retry=retry_if_result(lambda r: r is None), - stop=stop_after_attempt(_RUN_QUERY_MAX_RETRIES), - wait=wait_fixed(_RUN_QUERY_RETRY_DELAY_SECONDS), - before_sleep=_log_retry.__func__, - reraise=True, - ) - def _run_operation_with_retry(self, prerendered_query: str) -> Optional[list]: - """Call run_operation and return the parsed result, or None to trigger retry.""" + def _run_query_with_run_operation(self, prerendered_query: str): + """Execute a query via run_operation.""" run_operation_results = self.dbt_runner.run_operation( "elementary.render_run_query", macro_args={"prerendered_query": prerendered_query}, ) - if run_operation_results: - return json.loads(run_operation_results[0]) - return None - - def _run_query_with_run_operation(self, prerendered_query: str): - """Execute a query via run_operation with retry on empty output. - - run_operation() can intermittently return an empty list when the - MACRO_RESULT_PATTERN log line is not captured from dbt's output. - """ - result = self._run_operation_with_retry(prerendered_query) - if result is None: + if not run_operation_results: raise RuntimeError( - f"run_operation('elementary.render_run_query') returned no output " - f"after {_RUN_QUERY_MAX_RETRIES} attempts. " + f"run_operation('elementary.render_run_query') returned no output. " f"Query: {prerendered_query!r}" ) - return result + return json.loads(run_operation_results[0]) @staticmethod def read_table_query( @@ -326,15 +283,25 @@ def test( } return [test_result] if multiple_results else test_result - def seed(self, data: List[dict], table_name: str): - with DbtDataSeeder( + def _create_seeder( + self, + ) -> Union[DbtDataSeeder, "ClickHouseDirectSeeder"]: + """Return the appropriate seeder for the current target.""" + if self.target == "clickhouse": + runner = self._get_query_runner() + schema = runner.schema_name + SCHEMA_NAME_SUFFIX + return ClickHouseDirectSeeder(runner, schema, self.seeds_dir_path) + return DbtDataSeeder( self.dbt_runner, self.project_dir_path, self.seeds_dir_path - ).seed(data, table_name): + ) + + def seed(self, data: List[dict], table_name: str): + with self._create_seeder().seed(data, table_name): self._fix_seed_if_needed(table_name) - def _fix_seed_if_needed(self, table_name: str): + def _fix_seed_if_needed(self, table_name: str) -> None: # Hack for BigQuery - seems like we get empty strings instead of nulls in seeds, so we - # fix them here + # fix them here. if self.runner_method == RunnerMethod.FUSION and self.target == "bigquery": self.dbt_runner.run_operation( "elementary_tests.replace_empty_strings_with_nulls", @@ -345,9 +312,8 @@ def _fix_seed_if_needed(self, table_name: str): def seed_context( self, data: List[dict], table_name: str ) -> Generator[None, None, None]: - with DbtDataSeeder( - self.dbt_runner, self.project_dir_path, self.seeds_dir_path - ).seed(data, table_name): + with self._create_seeder().seed(data, table_name): + self._fix_seed_if_needed(table_name) yield @contextmanager diff --git a/integration_tests/tests/test_all_columns_anomalies.py b/integration_tests/tests/test_all_columns_anomalies.py index cf645808b..0bf519942 100644 --- a/integration_tests/tests/test_all_columns_anomalies.py +++ b/integration_tests/tests/test_all_columns_anomalies.py @@ -13,8 +13,6 @@ } -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_all_columns_anomalies(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() data: List[Dict[str, Any]] = [ @@ -31,8 +29,6 @@ def test_anomalyless_all_columns_anomalies(test_id: str, dbt_project: DbtProject assert all([res["status"] == "pass" for res in test_results]) -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalous_all_columns_anomalies(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) @@ -57,8 +53,6 @@ def test_anomalous_all_columns_anomalies(test_id: str, dbt_project: DbtProject): assert col_to_status == {"superhero": "fail", TIMESTAMP_COLUMN: "pass"} -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_all_columns_anomalies_with_where_parameter( test_id: str, dbt_project: DbtProject ): @@ -128,8 +122,6 @@ def test_all_columns_anomalies_with_where_parameter( } -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_all_columns_anomalies_all_monitors_sanity( test_id: str, dbt_project: DbtProject ): @@ -155,8 +147,6 @@ def test_anomalyless_all_columns_anomalies_all_monitors_sanity( assert all([res["status"] == "pass" for res in test_results]) -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) @pytest.mark.parametrize( "exclude_detection,expected_status", [ diff --git a/integration_tests/tests/test_anomalies_backfill_logic.py b/integration_tests/tests/test_anomalies_backfill_logic.py index d9a4dea66..88f6ad677 100644 --- a/integration_tests/tests/test_anomalies_backfill_logic.py +++ b/integration_tests/tests/test_anomalies_backfill_logic.py @@ -2,7 +2,6 @@ from datetime import datetime, time, timedelta import dateutil.parser -import pytest from data_generator import DATE_FORMAT, generate_dates from dbt_project import DbtProject @@ -74,8 +73,6 @@ def get_latest_anomaly_test_metrics(dbt_project: DbtProject, test_id: str): } -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_full_backfill_for_non_incremental_model(dbt_project: DbtProject, test_id: str): utc_today = datetime.utcnow().date() data_dates = generate_dates(base_date=utc_today - timedelta(1)) @@ -109,8 +106,6 @@ def test_full_backfill_for_non_incremental_model(dbt_project: DbtProject, test_i } -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_partial_backfill_for_incremental_models(dbt_project: DbtProject, test_id: str): utc_today = datetime.utcnow().date() data_dates = generate_dates(base_date=utc_today - timedelta(1)) @@ -157,8 +152,6 @@ def test_partial_backfill_for_incremental_models(dbt_project: DbtProject, test_i } -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_longer_backfill_in_case_of_a_gap(dbt_project: DbtProject, test_id: str): date_gap_size = 5 utc_today = datetime.utcnow().date() @@ -211,8 +204,6 @@ def test_longer_backfill_in_case_of_a_gap(dbt_project: DbtProject, test_id: str) } -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_full_backfill_if_metric_not_updated_for_a_long_time( dbt_project: DbtProject, test_id: str ): @@ -272,8 +263,6 @@ def test_full_backfill_if_metric_not_updated_for_a_long_time( } -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_backfill_when_metric_doesnt_exist_back_enough( dbt_project: DbtProject, test_id: str ): @@ -318,8 +307,6 @@ def test_backfill_when_metric_doesnt_exist_back_enough( } -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_backfill_with_middle_buckets_gap(dbt_project: DbtProject, test_id: str): utc_today = datetime.utcnow().date() data_start = utc_today - timedelta(21) @@ -388,8 +375,6 @@ def test_backfill_with_middle_buckets_gap(dbt_project: DbtProject, test_id: str) } -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_bucket_size_not_aligned_with_days(dbt_project: DbtProject, test_id: str): """ In this test we choose a bucket size that is not aligned with one day - specifically 7 hours. diff --git a/integration_tests/tests/test_anomalies_ranges.py b/integration_tests/tests/test_anomalies_ranges.py index bd78ead71..890f421a4 100644 --- a/integration_tests/tests/test_anomalies_ranges.py +++ b/integration_tests/tests/test_anomalies_ranges.py @@ -2,7 +2,6 @@ from datetime import datetime, timedelta from typing import Any, Dict, List -import pytest from data_generator import DATE_FORMAT, generate_dates from dbt_project import DbtProject @@ -34,8 +33,6 @@ def get_latest_anomaly_test_points(dbt_project: DbtProject, test_id: str): return [json.loads(result["result_row"]) for result in results] -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomaly_ranges_are_valid(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) @@ -69,8 +66,6 @@ def test_anomaly_ranges_are_valid(test_id: str, dbt_project: DbtProject): assert all([row["min_value"] == row["max_value"] for row in anomaly_test_points]) -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomaly_ranges_are_valid_with_seasonality( test_id: str, dbt_project: DbtProject ): diff --git a/integration_tests/tests/test_anomaly_exclude_metrics.py b/integration_tests/tests/test_anomaly_exclude_metrics.py index 16d609730..40892506f 100644 --- a/integration_tests/tests/test_anomaly_exclude_metrics.py +++ b/integration_tests/tests/test_anomaly_exclude_metrics.py @@ -1,7 +1,6 @@ from datetime import datetime, timedelta from typing import Any, Dict, List -import pytest from data_generator import DATE_FORMAT, generate_dates from dbt_project import DbtProject from parametrization import Parametrization @@ -25,8 +24,6 @@ time_bucket={"period": "hour", "count": 6}, dates_step=timedelta(hours=6), ) -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_exclude_specific_dates( test_id: str, dbt_project: DbtProject, time_bucket: dict, dates_step: timedelta ): @@ -71,8 +68,6 @@ def test_exclude_specific_dates( assert test_result["status"] == "fail" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_exclude_specific_timestamps(test_id: str, dbt_project: DbtProject): # To avoid races, set the "custom_started_at" to the beginning of the hour test_started_at = datetime.utcnow().replace(minute=0, second=0) @@ -128,8 +123,6 @@ def test_exclude_specific_timestamps(test_id: str, dbt_project: DbtProject): assert test_result["status"] == "fail" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_exclude_date_range(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) @@ -165,8 +158,6 @@ def test_exclude_date_range(test_id: str, dbt_project: DbtProject): assert test_result["status"] == "fail" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_exclude_by_metric_value(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) diff --git a/integration_tests/tests/test_anomaly_test_configuration.py b/integration_tests/tests/test_anomaly_test_configuration.py index b91507fde..1e318ee59 100644 --- a/integration_tests/tests/test_anomaly_test_configuration.py +++ b/integration_tests/tests/test_anomaly_test_configuration.py @@ -114,7 +114,6 @@ def get_value(key: str): test_config={key: value.test for key, value in PARAM_VALUES.items()}, expected_config=_get_expected_adapted_config("test"), ) -@pytest.mark.skip_targets(["clickhouse"]) @pytest.mark.skip_for_dbt_fusion def test_anomaly_test_configuration( dbt_project: DbtProject, diff --git a/integration_tests/tests/test_collect_metrics.py b/integration_tests/tests/test_collect_metrics.py index 467e57fd2..336bef02b 100644 --- a/integration_tests/tests/test_collect_metrics.py +++ b/integration_tests/tests/test_collect_metrics.py @@ -2,7 +2,6 @@ from datetime import datetime, timedelta from typing import Any, Dict, List -import pytest from data_generator import DATE_FORMAT, generate_dates from dbt_project import DbtProject @@ -35,8 +34,6 @@ } -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_collect_metrics(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() data: List[Dict[str, Any]] = [ @@ -65,8 +62,6 @@ def test_collect_metrics(test_id: str, dbt_project: DbtProject): assert col_to_metric_names == EXPECTED_COL_TO_METRIC_NAMES -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_collect_no_timestamp_metrics(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() data: List[Dict[str, Any]] = [ @@ -96,8 +91,6 @@ def test_collect_no_timestamp_metrics(test_id: str, dbt_project: DbtProject): assert col_to_metric_names == EXPECTED_COL_TO_METRIC_NAMES -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_collect_group_by_metrics(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() data: List[Dict[str, Any]] = [ @@ -148,8 +141,6 @@ def test_collect_group_by_metrics(test_id: str, dbt_project: DbtProject): assert dim_to_col_to_metric_names == expected_dim_to_col_to_metric_names -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_collect_metrics_unique_metric_name(test_id: str, dbt_project: DbtProject): args = DBT_TEST_ARGS.copy() args["metrics"].append(args["metrics"][0]) diff --git a/integration_tests/tests/test_column_anomalies.py b/integration_tests/tests/test_column_anomalies.py index d4217009c..57c641f25 100644 --- a/integration_tests/tests/test_column_anomalies.py +++ b/integration_tests/tests/test_column_anomalies.py @@ -1,7 +1,6 @@ from datetime import datetime, timedelta from typing import Any, Dict, List -import pytest from data_generator import DATE_FORMAT, generate_dates from dbt_project import DbtProject from parametrization import Parametrization @@ -14,8 +13,6 @@ } -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_column_anomalies(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() data: List[Dict[str, Any]] = [ @@ -32,8 +29,6 @@ def test_anomalyless_column_anomalies(test_id: str, dbt_project: DbtProject): assert test_result["status"] == "pass" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_no_timestamp_column_anomalies( test_id: str, dbt_project: DbtProject ): @@ -54,8 +49,6 @@ def test_anomalyless_no_timestamp_column_anomalies( assert test_result["status"] == "pass" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalous_column_anomalies(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) @@ -79,8 +72,6 @@ def test_anomalous_column_anomalies(test_id: str, dbt_project: DbtProject): assert test_result["status"] == "fail" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_column_anomalies_with_where_parameter(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) @@ -138,8 +129,6 @@ def test_column_anomalies_with_where_parameter(test_id: str, dbt_project: DbtPro assert test_result["status"] == "fail" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_column_anomalies_with_timestamp_as_sql_expression( test_id: str, dbt_project: DbtProject ): @@ -182,8 +171,6 @@ def test_column_anomalies_with_timestamp_as_sql_expression( drop_failure_percent_threshold=5, metric_value=1, ) -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_volume_anomaly_static_data_drop( test_id: str, dbt_project: DbtProject, @@ -228,8 +215,6 @@ def test_volume_anomaly_static_data_drop( assert test_result["status"] == expected_result -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_column_anomalies_group(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() data: List[Dict[str, Any]] = [ @@ -246,8 +231,6 @@ def test_anomalyless_column_anomalies_group(test_id: str, dbt_project: DbtProjec assert test_result["status"] == "pass" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_column_anomalies_group_by(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) @@ -301,8 +284,6 @@ def test_column_anomalies_group_by(test_id: str, dbt_project: DbtProject): assert test_result["failures"] == 2 -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_column_anomalies_group_by_none_dimension( test_id: str, dbt_project: DbtProject ): @@ -347,8 +328,6 @@ def test_anomalyless_column_anomalies_group_by_none_dimension( assert test_result["failures"] == 2 -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_column_anomalies_group_by_multi( test_id: str, dbt_project: DbtProject ): @@ -400,8 +379,6 @@ def test_anomalyless_column_anomalies_group_by_multi( assert test_result["failures"] == 3 -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_column_anomalies_group_by_description( test_id: str, dbt_project: DbtProject ): @@ -436,8 +413,6 @@ def test_anomalyless_column_anomalies_group_by_description( assert "not enough data" not in test_result["test_results_description"].lower() -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalous_boolean_column_anomalies(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) @@ -478,8 +453,6 @@ def test_anomalous_boolean_column_anomalies(test_id: str, dbt_project: DbtProjec } -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_col_anom_excl_detect_train(test_id: str, dbt_project: DbtProject): """ Test the exclude_detection_period_from_training flag functionality for column anomalies. diff --git a/integration_tests/tests/test_column_pii_sampling.py b/integration_tests/tests/test_column_pii_sampling.py index fdabf37cc..9955c37d5 100644 --- a/integration_tests/tests/test_column_pii_sampling.py +++ b/integration_tests/tests/test_column_pii_sampling.py @@ -1,6 +1,5 @@ import json -import pytest from dbt_project import DbtProject SENSITIVE_COLUMN = "email" @@ -23,7 +22,6 @@ TEST_SAMPLE_ROW_COUNT = 5 -@pytest.mark.skip_targets(["clickhouse"]) def test_column_pii_sampling_enabled(test_id: str, dbt_project: DbtProject): """Test that PII columns are excluded when column-level PII protection is enabled""" data = [ @@ -55,7 +53,6 @@ def test_column_pii_sampling_enabled(test_id: str, dbt_project: DbtProject): assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_column_pii_sampling_disabled(test_id: str, dbt_project: DbtProject): """Test that all columns are included when column-level PII protection is disabled""" data = [ @@ -91,7 +88,6 @@ def test_column_pii_sampling_disabled(test_id: str, dbt_project: DbtProject): assert samples[0]["n_records"] == 10 -@pytest.mark.skip_targets(["clickhouse"]) def test_column_pii_default_tag_override(test_id: str, dbt_project: DbtProject): """Test that default PII tag can be overridden with a custom tag""" data = [ @@ -128,7 +124,6 @@ def test_column_pii_default_tag_override(test_id: str, dbt_project: DbtProject): assert samples[0]["n_records"] == 10 -@pytest.mark.skip_targets(["clickhouse"]) def test_column_pii_sampling_tags_exist_but_flag_disabled( test_id: str, dbt_project: DbtProject ): @@ -166,7 +161,6 @@ def test_column_pii_sampling_tags_exist_but_flag_disabled( assert samples[0]["n_records"] == 10 -@pytest.mark.skip_targets(["clickhouse"]) def test_column_pii_sampling_all_columns_pii(test_id: str, dbt_project: DbtProject): """Test behavior when all columns are tagged as PII""" data = [ @@ -201,7 +195,6 @@ def test_column_pii_sampling_all_columns_pii(test_id: str, dbt_project: DbtProje assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_unique_test_custom_tag(test_id: str, dbt_project: DbtProject): """Test that column mapping correctly maps unique test columns""" data = [{SENSITIVE_COLUMN: "user@example.com", SAFE_COLUMN: i} for i in range(10)] @@ -232,7 +225,6 @@ def test_unique_test_custom_tag(test_id: str, dbt_project: DbtProject): assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_accepted_values_multi_tags(test_id: str, dbt_project: DbtProject): """Test that column mapping correctly maps accepted_values test columns""" data = [{SENSITIVE_COLUMN: "invalid_value", SAFE_COLUMN: i} for i in range(10)] @@ -263,7 +255,6 @@ def test_accepted_values_multi_tags(test_id: str, dbt_project: DbtProject): assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_not_null_test_multi_matched_tags(test_id: str, dbt_project: DbtProject): """Test that column mapping correctly handles not_null test columns""" data = [{SENSITIVE_COLUMN: None, SAFE_COLUMN: i} for i in range(10)] @@ -294,7 +285,6 @@ def test_not_null_test_multi_matched_tags(test_id: str, dbt_project: DbtProject) assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_multiple_pii_columns_mapping(test_id: str, dbt_project: DbtProject): """Test that column mapping handles multiple PII columns correctly""" data = [ @@ -328,7 +318,6 @@ def test_multiple_pii_columns_mapping(test_id: str, dbt_project: DbtProject): assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_custom_sql_test_with_pii_column_simple(test_id: str, dbt_project: DbtProject): """Test that custom SQL tests with PII columns are handled correctly""" data = [{SENSITIVE_COLUMN: "user@example.com", SAFE_COLUMN: i} for i in range(10)] @@ -359,7 +348,6 @@ def test_custom_sql_test_with_pii_column_simple(test_id: str, dbt_project: DbtPr assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_meta_tags_and_accepted_values(test_id: str, dbt_project: DbtProject): data = [{SENSITIVE_COLUMN: "user@example.com", SAFE_COLUMN: i} for i in range(10)] diff --git a/integration_tests/tests/test_dbt_artifacts/test_artifacts.py b/integration_tests/tests/test_dbt_artifacts/test_artifacts.py index defb3a665..c6505afc5 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_artifacts.py +++ b/integration_tests/tests/test_dbt_artifacts/test_artifacts.py @@ -89,12 +89,10 @@ def test_dbt_artifacts_hashes(dbt_project: DbtProject): dbt_project.read_table("dbt_artifacts_hashes", raise_if_empty=False) -@pytest.mark.skip_targets(["clickhouse"]) def test_anomaly_threshold_sensitivity(dbt_project: DbtProject): dbt_project.read_table("anomaly_threshold_sensitivity", raise_if_empty=False) -@pytest.mark.skip_targets(["clickhouse"]) def test_metrics_anomaly_score(dbt_project: DbtProject): dbt_project.read_table("metrics_anomaly_score", raise_if_empty=False) diff --git a/integration_tests/tests/test_dimension_anomalies.py b/integration_tests/tests/test_dimension_anomalies.py index d55354a56..fd240bf90 100644 --- a/integration_tests/tests/test_dimension_anomalies.py +++ b/integration_tests/tests/test_dimension_anomalies.py @@ -31,8 +31,6 @@ def get_latest_anomaly_test_points(dbt_project: DbtProject, test_id: str): return [json.loads(result["result_row"]) for result in results] -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_dimension_anomalies(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() data: List[Dict[str, Any]] = [ @@ -51,8 +49,6 @@ def test_anomalyless_dimension_anomalies(test_id: str, dbt_project: DbtProject): assert len(anomaly_test_points) == 0 -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_dimension_anomalies_with_timestamp_as_sql_expression( test_id: str, dbt_project: DbtProject ): @@ -73,8 +69,6 @@ def test_dimension_anomalies_with_timestamp_as_sql_expression( assert test_result["status"] == "pass" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalous_dimension_anomalies(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) @@ -114,8 +108,6 @@ def test_anomalous_dimension_anomalies(test_id: str, dbt_project: DbtProject): assert any(x["is_anomalous"] for x in superman_anomaly_test_points) -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_dimensions_anomalies_with_where_parameter( test_id: str, dbt_project: DbtProject ): @@ -166,8 +158,6 @@ def test_dimensions_anomalies_with_where_parameter( assert test_result["status"] == "fail" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_dimension_anomalies_with_timestamp_exclude_final_results( test_id: str, dbt_project: DbtProject ): @@ -225,7 +215,6 @@ def test_dimension_anomalies_with_timestamp_exclude_final_results( # 1. Detection period contains anomalous distribution data that would normally be included in training # 2. With exclude_detection=False: anomaly is missed (test passes) because training includes the anomaly # 3. With exclude_detection=True: anomaly is detected (test fails) because training excludes the anomaly -@pytest.mark.skip_targets(["clickhouse"]) @pytest.mark.parametrize( "exclude_detection,expected_status", [ diff --git a/integration_tests/tests/test_disable_samples_config.py b/integration_tests/tests/test_disable_samples_config.py index 3ab2d97ec..662a6c758 100644 --- a/integration_tests/tests/test_disable_samples_config.py +++ b/integration_tests/tests/test_disable_samples_config.py @@ -1,6 +1,5 @@ import json -import pytest from dbt_project import DbtProject COLUMN_NAME = "sensitive_data" @@ -20,7 +19,6 @@ """ -@pytest.mark.skip_targets(["clickhouse"]) def test_disable_samples_config_prevents_sampling( test_id: str, dbt_project: DbtProject ): @@ -48,7 +46,6 @@ def test_disable_samples_config_prevents_sampling( assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_disable_samples_false_allows_sampling(test_id: str, dbt_project: DbtProject): null_count = 20 data = [{COLUMN_NAME: None} for _ in range(null_count)] @@ -77,7 +74,6 @@ def test_disable_samples_false_allows_sampling(test_id: str, dbt_project: DbtPro assert sample[COLUMN_NAME] is None -@pytest.mark.skip_targets(["clickhouse"]) def test_disable_samples_config_overrides_pii_tags( test_id: str, dbt_project: DbtProject ): @@ -106,7 +102,6 @@ def test_disable_samples_config_overrides_pii_tags( assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_disable_samples_and_pii_interaction(test_id: str, dbt_project: DbtProject): """Test that disable_test_samples and PII columns both get excluded""" data = [ @@ -137,7 +132,6 @@ def test_disable_samples_and_pii_interaction(test_id: str, dbt_project: DbtProje assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_disable_samples_with_multiple_columns(test_id: str, dbt_project: DbtProject): """Test that disable_test_samples excludes only the disabled column""" data = [{"col1": None, "col2": f"value{i}"} for i in range(10)] diff --git a/integration_tests/tests/test_event_freshness_anomalies.py b/integration_tests/tests/test_event_freshness_anomalies.py index c07f338e1..a52793cd9 100644 --- a/integration_tests/tests/test_event_freshness_anomalies.py +++ b/integration_tests/tests/test_event_freshness_anomalies.py @@ -1,6 +1,5 @@ from datetime import datetime, timedelta -import pytest from data_generator import DATE_FORMAT, generate_dates from dbt_project import DbtProject @@ -10,8 +9,6 @@ STEP = timedelta(hours=1) -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_event_freshness(test_id: str, dbt_project: DbtProject): data = [ { @@ -32,8 +29,6 @@ def test_anomalyless_event_freshness(test_id: str, dbt_project: DbtProject): assert result["status"] == "pass" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_stop_event_freshness(test_id: str, dbt_project: DbtProject): anomaly_date = datetime.now() - timedelta(days=2) data = [ @@ -55,8 +50,6 @@ def test_stop_event_freshness(test_id: str, dbt_project: DbtProject): assert result["status"] == "fail" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_slower_rate_event_freshness(test_id: str, dbt_project: DbtProject): # To avoid races, set the "custom_started_at" to the beginning of the day test_started_at = datetime.utcnow().replace(hour=0, minute=0, second=0) @@ -90,8 +83,6 @@ def test_slower_rate_event_freshness(test_id: str, dbt_project: DbtProject): assert result["status"] == "fail" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject): """ Test the exclude_detection_period_from_training flag functionality for event freshness anomalies. diff --git a/integration_tests/tests/test_exposure_schema_validity.py b/integration_tests/tests/test_exposure_schema_validity.py index 0a9d45251..79e504a58 100644 --- a/integration_tests/tests/test_exposure_schema_validity.py +++ b/integration_tests/tests/test_exposure_schema_validity.py @@ -65,8 +65,7 @@ def test_exposure_schema_validity_no_exposures(test_id: str, dbt_project: DbtPro assert test_result["status"] == "pass" -# Schema validity currently not supported on ClickHouse -@pytest.mark.skip_targets(["spark", "clickhouse"]) +@pytest.mark.skip_targets(["spark"]) def test_exposure_schema_validity_correct_columns_and_types( test_id: str, dbt_project: DbtProject ): @@ -138,8 +137,7 @@ def test_exposure_schema_validity_correct_columns_and_invalid_type( ) -# Schema validity currently not supported on ClickHouse -@pytest.mark.skip_targets(["spark", "clickhouse"]) +@pytest.mark.skip_targets(["spark"]) def test_exposure_schema_validity_invalid_type_name_present_in_error( test_id: str, dbt_project: DbtProject ): diff --git a/integration_tests/tests/test_failed_row_count.py b/integration_tests/tests/test_failed_row_count.py index 81943019b..27299fe27 100644 --- a/integration_tests/tests/test_failed_row_count.py +++ b/integration_tests/tests/test_failed_row_count.py @@ -1,11 +1,8 @@ -import pytest from dbt_project import DbtProject COLUMN_NAME = "some_column" -# Failed row count currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_count_failed_row_count(test_id: str, dbt_project: DbtProject): null_count = 50 data = [{COLUMN_NAME: None} for _ in range(null_count)] @@ -40,8 +37,6 @@ def test_sum_failed_row_count(test_id: str, dbt_project: DbtProject): ) # when the failed_row_count_calc is sum(), these should not be equal -# Failed row count currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_custom_failed_row_count(test_id: str, dbt_project: DbtProject): null_count = 50 overwrite_failed_row_count = 5 diff --git a/integration_tests/tests/test_freshness_anomalies.py b/integration_tests/tests/test_freshness_anomalies.py index 8a2c55db5..39cc6bd23 100644 --- a/integration_tests/tests/test_freshness_anomalies.py +++ b/integration_tests/tests/test_freshness_anomalies.py @@ -75,8 +75,6 @@ def _skip_redshift_monthly( if target == "redshift" and config.period == "month": pytest.skip("Redshift does not support monthly time buckets.") - # Anomalies currently not supported on ClickHouse - @pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_table( self, test_id: str, @@ -96,8 +94,6 @@ def test_anomalyless_table( ) assert result["status"] == "pass" - # Anomalies currently not supported on ClickHouse - @pytest.mark.skip_targets(["clickhouse"]) def test_stop( self, test_id: str, @@ -118,8 +114,6 @@ def test_stop( ) assert result["status"] == "fail" - # Anomalies currently not supported on ClickHouse - @pytest.mark.skip_targets(["clickhouse"]) def test_stop_with_delay( self, test_id: str, @@ -142,8 +136,6 @@ def test_stop_with_delay( ) assert result["status"] == "pass" - # Anomalies currently not supported on ClickHouse - @pytest.mark.skip_targets(["clickhouse"]) def test_slower_rate( self, test_id: str, @@ -171,8 +163,6 @@ def test_slower_rate( ) assert result["status"] == "fail" - # Anomalies currently not supported on ClickHouse - @pytest.mark.skip_targets(["clickhouse"]) def test_faster_rate( self, test_id: str, @@ -201,8 +191,6 @@ def test_faster_rate( assert result["status"] == "pass" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_first_metric_null(test_id, dbt_project: DbtProject): config = dict( timestamp_column=TIMESTAMP_COLUMN, @@ -235,7 +223,6 @@ def test_first_metric_null(test_id, dbt_project: DbtProject): assert result["status"] == "pass" -@pytest.mark.skip_targets(["clickhouse"]) def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject): """ Test exclude_detection_period_from_training flag for freshness anomalies. diff --git a/integration_tests/tests/test_override_samples_config.py b/integration_tests/tests/test_override_samples_config.py index 13f09169a..c5d2b70ec 100644 --- a/integration_tests/tests/test_override_samples_config.py +++ b/integration_tests/tests/test_override_samples_config.py @@ -1,6 +1,5 @@ import json -import pytest from dbt_project import DbtProject COLUMN_NAME = "some_data" @@ -20,7 +19,6 @@ """ -@pytest.mark.skip_targets(["clickhouse"]) def test_sample_count_unlimited(test_id: str, dbt_project: DbtProject): null_count = 20 data = [{COLUMN_NAME: None} for _ in range(null_count)] @@ -49,7 +47,6 @@ def test_sample_count_unlimited(test_id: str, dbt_project: DbtProject): assert sample[COLUMN_NAME] is None -@pytest.mark.skip_targets(["clickhouse"]) def test_sample_count_small(test_id: str, dbt_project: DbtProject): null_count = 20 data = [{COLUMN_NAME: None} for _ in range(null_count)] diff --git a/integration_tests/tests/test_sampling.py b/integration_tests/tests/test_sampling.py index 77391e664..660bc03c8 100644 --- a/integration_tests/tests/test_sampling.py +++ b/integration_tests/tests/test_sampling.py @@ -1,6 +1,5 @@ import json -import pytest from dbt_project import DbtProject COLUMN_NAME = "some_column" @@ -23,8 +22,6 @@ TEST_SAMPLE_ROW_COUNT = 7 -# Sampling currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_sampling(test_id: str, dbt_project: DbtProject): null_count = 50 data = [{COLUMN_NAME: None} for _ in range(null_count)] diff --git a/integration_tests/tests/test_sampling_pii.py b/integration_tests/tests/test_sampling_pii.py index 39fceaa7b..beaad2fda 100644 --- a/integration_tests/tests/test_sampling_pii.py +++ b/integration_tests/tests/test_sampling_pii.py @@ -1,6 +1,5 @@ import json -import pytest from dbt_project import DbtProject COLUMN_NAME = "some_column" @@ -23,7 +22,6 @@ TEST_SAMPLE_ROW_COUNT = 7 -@pytest.mark.skip_targets(["clickhouse"]) def test_sampling_pii_disabled(test_id: str, dbt_project: DbtProject): """Test that PII-tagged tables don't upload samples even when tests fail""" null_count = 50 @@ -52,7 +50,6 @@ def test_sampling_pii_disabled(test_id: str, dbt_project: DbtProject): assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_sampling_pii_disabled_with_default_config_and_casing( test_id: str, dbt_project: DbtProject ): @@ -81,7 +78,6 @@ def test_sampling_pii_disabled_with_default_config_and_casing( assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_sampling_pii_enabled_with_default_config( test_id: str, dbt_project: DbtProject ): @@ -110,7 +106,6 @@ def test_sampling_pii_enabled_with_default_config( assert len(samples) == TEST_SAMPLE_ROW_COUNT -@pytest.mark.skip_targets(["clickhouse"]) def test_sampling_non_pii_enabled(test_id: str, dbt_project: DbtProject): """Test that non-PII tables still collect samples normally""" null_count = 50 @@ -139,7 +134,6 @@ def test_sampling_non_pii_enabled(test_id: str, dbt_project: DbtProject): assert len(samples) == TEST_SAMPLE_ROW_COUNT -@pytest.mark.skip_targets(["clickhouse"]) def test_sampling_pii_feature_disabled(test_id: str, dbt_project: DbtProject): """Test that when PII feature is disabled, PII tables still collect samples""" null_count = 50 @@ -168,7 +162,6 @@ def test_sampling_pii_feature_disabled(test_id: str, dbt_project: DbtProject): assert len(samples) == TEST_SAMPLE_ROW_COUNT -@pytest.mark.skip_targets(["clickhouse"]) def test_sampling_disable_samples_overrides_pii(test_id: str, dbt_project: DbtProject): """Test that disable_test_samples flag overrides PII detection when both are present""" null_count = 50 @@ -199,7 +192,6 @@ def test_sampling_disable_samples_overrides_pii(test_id: str, dbt_project: DbtPr assert len(samples) == 0 -@pytest.mark.skip_targets(["clickhouse"]) def test_sampling_disable_samples_false_allows_samples( test_id: str, dbt_project: DbtProject ): diff --git a/integration_tests/tests/test_schema_changes.py b/integration_tests/tests/test_schema_changes.py index d246b5bf1..ca4eae114 100644 --- a/integration_tests/tests/test_schema_changes.py +++ b/integration_tests/tests/test_schema_changes.py @@ -22,7 +22,7 @@ ("name", "column_removed"), ] -STRING_JINJA = r"{{ 'STRING' if (target.type == 'bigquery' or target.type == 'databricks') else 'character varying' if (target.type == 'redshift' or target.type == 'dremio') else 'TEXT' }}" +STRING_JINJA = r"{{ 'STRING' if (target.type == 'bigquery' or target.type == 'databricks') else 'character varying' if (target.type == 'redshift' or target.type == 'dremio') else 'String' if target.type == 'clickhouse' else 'TEXT' }}" def assert_test_results(test_results: List[dict]): @@ -43,7 +43,7 @@ def assert_test_results(test_results: List[dict]): # Schema changes currently not supported on targets # dbt-fusion caches column information and doesn't refresh when tables are recreated -@pytest.mark.skip_targets(["databricks", "spark", "athena", "trino", "clickhouse"]) +@pytest.mark.skip_targets(["databricks", "spark", "athena", "trino"]) @pytest.mark.skip_for_dbt_fusion def test_schema_changes(test_id: str, dbt_project: DbtProject): dbt_test_name = "elementary.schema_changes" @@ -58,7 +58,7 @@ def test_schema_changes(test_id: str, dbt_project: DbtProject): # Schema changes currently not supported on targets -@pytest.mark.skip_targets(["databricks", "spark", "athena", "trino", "clickhouse"]) +@pytest.mark.skip_targets(["databricks", "spark", "athena", "trino"]) def test_schema_changes_from_baseline(test_id: str, dbt_project: DbtProject): dbt_test_name = "elementary.schema_changes_from_baseline" test_results = dbt_project.test( diff --git a/integration_tests/tests/test_volume_anomalies.py b/integration_tests/tests/test_volume_anomalies.py index 10015d038..c73c10d57 100644 --- a/integration_tests/tests/test_volume_anomalies.py +++ b/integration_tests/tests/test_volume_anomalies.py @@ -11,8 +11,6 @@ DBT_TEST_ARGS = {"timestamp_column": TIMESTAMP_COLUMN} -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_table_volume_anomalies(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() data = [ @@ -23,8 +21,6 @@ def test_anomalyless_table_volume_anomalies(test_id: str, dbt_project: DbtProjec assert test_result["status"] == "pass" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_table_volume_anomalies_with_timestamp_as_sql_expression( test_id: str, dbt_project: DbtProject ): @@ -40,8 +36,6 @@ def test_table_volume_anomalies_with_timestamp_as_sql_expression( assert test_result["status"] == "pass" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_full_drop_table_volume_anomalies(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() data = [ @@ -53,11 +47,9 @@ def test_full_drop_table_volume_anomalies(test_id: str, dbt_project: DbtProject) assert test_result["status"] == "fail" -# Anomalies currently not supported on ClickHouse @Parametrization.autodetect_parameters() @Parametrization.case(name="source", as_model=False) @Parametrization.case(name="model", as_model=True) -@pytest.mark.skip_targets(["clickhouse"]) def test_volume_anomalies_with_where_parameter( test_id: str, dbt_project: DbtProject, as_model: bool ): @@ -100,8 +92,6 @@ def test_volume_anomalies_with_where_parameter( assert test_result["status"] == "fail" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_volume_anomalies_with_time_buckets(test_id: str, dbt_project: DbtProject): now = datetime.utcnow() - timedelta(hours=2) data = [ @@ -133,8 +123,6 @@ def test_volume_anomalies_with_time_buckets(test_id: str, dbt_project: DbtProjec assert test_result["status"] == "fail" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_volume_anomalies_with_direction_spike(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() data = [ @@ -151,8 +139,6 @@ def test_volume_anomalies_with_direction_spike(test_id: str, dbt_project: DbtPro assert test_result["status"] == "pass" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_volume_anomalies_with_direction_drop(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() data = [ @@ -168,8 +154,6 @@ def test_volume_anomalies_with_direction_drop(test_id: str, dbt_project: DbtProj assert test_result["status"] == "pass" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_volume_anomalies_with_seasonality(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() dates = generate_dates( @@ -190,8 +174,6 @@ def test_volume_anomalies_with_seasonality(test_id: str, dbt_project: DbtProject assert test_result["status"] == "fail" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_volume_anomalies_with_sensitivity(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() data = [ @@ -209,8 +191,6 @@ def test_volume_anomalies_with_sensitivity(test_id: str, dbt_project: DbtProject assert test_result["status"] == "fail" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_volume_anomalies_no_timestamp(test_id: str, dbt_project: DbtProject): data = [{"hello": "world"}] min_training_set_size = 4 @@ -230,7 +210,6 @@ def test_volume_anomalies_no_timestamp(test_id: str, dbt_project: DbtProject): assert test_result["status"] == "fail" -# Anomalies currently not supported on ClickHouse @pytest.mark.only_on_targets(["bigquery"]) def test_wildcard_name_table_volume_anomalies(test_id: str, dbt_project: DbtProject): utc_today = datetime.utcnow().date() @@ -269,8 +248,6 @@ def test_wildcard_name_table_volume_anomalies(test_id: str, dbt_project: DbtProj drop_failure_percent_threshold=5, metric_value=29, ) -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_volume_anomaly_static_data_drop( test_id: str, dbt_project: DbtProject, @@ -322,8 +299,6 @@ def test_volume_anomaly_static_data_drop( spike_failure_percent_threshold=5, metric_value=31, ) -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_volume_anomaly_static_data_spike( test_id: str, dbt_project: DbtProject, @@ -356,8 +331,6 @@ def test_volume_anomaly_static_data_spike( assert test_result["status"] == expected_result -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_not_fail_on_zero(test_id: str, dbt_project: DbtProject): now = datetime.utcnow() data = [ @@ -371,8 +344,6 @@ def test_not_fail_on_zero(test_id: str, dbt_project: DbtProject): assert test_result["status"] == "pass" -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_fail_on_zero(test_id: str, dbt_project: DbtProject): now = datetime.utcnow() data = [ @@ -399,8 +370,6 @@ def test_fail_on_zero(test_id: str, dbt_project: DbtProject): fail_value=4 * 24, pass_value=24, ) -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_table_volume_anomalies_periods_params( test_id: str, dbt_project: DbtProject, period: str, fail_value: int, pass_value: int ): @@ -476,8 +445,6 @@ def test_anomalyless_table_volume_anomalies_periods_params( anomaly_sensitivity=3000, metric_value=44, ) -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_ignore_small_changes_both( test_id: str, dbt_project: DbtProject, @@ -513,8 +480,6 @@ def test_ignore_small_changes_both( assert test_result["status"] == expected_result -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) def test_anomalyless_vol_anomalies_with_test_materialization( test_id: str, dbt_project: DbtProject ): @@ -541,7 +506,6 @@ def test_anomalyless_vol_anomalies_with_test_materialization( # 1. Detection period contains anomalous data that would normally be included in training # 2. With exclude_detection_period_from_training=False: anomaly is missed (test passes) because training includes the anomaly # 3. With exclude_detection_period_from_training=True: anomaly is detected (test fails) because training excludes the anomaly -@pytest.mark.skip_targets(["clickhouse"]) def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject): """ Test the exclude_detection_period_from_training flag functionality. diff --git a/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql b/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql index ad0c631fd..95309b0c7 100644 --- a/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql +++ b/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql @@ -31,7 +31,7 @@ {%- endmacro %} {% macro clickhouse__standard_deviation(column_name) -%} - stddevPop(cast({{ column_name }} as {{ elementary.edr_type_float() }})) + stddevPop(cast({{ column_name }} as Nullable({{ elementary.edr_type_float() }}))) {%- endmacro %} {% macro dremio__standard_deviation(column_name) -%} @@ -50,7 +50,7 @@ {%- endmacro %} {% macro clickhouse__variance(column_name) -%} - varSamp(cast({{ column_name }} as {{ elementary.edr_type_float() }})) + varSamp(cast({{ column_name }} as Nullable({{ elementary.edr_type_float() }}))) {%- endmacro %} {% macro sum(column_name) -%} diff --git a/macros/edr/data_monitoring/monitors_query/table_monitoring_query.sql b/macros/edr/data_monitoring/monitors_query/table_monitoring_query.sql index 23ad73a55..adae1e651 100644 --- a/macros/edr/data_monitoring/monitors_query/table_monitoring_query.sql +++ b/macros/edr/data_monitoring/monitors_query/table_monitoring_query.sql @@ -287,6 +287,10 @@ {% endmacro %} {% macro event_freshness_metric_query(metric, metric_properties) %} + {{ return(adapter.dispatch('event_freshness_metric_query', 'elementary')(metric, metric_properties)) }} +{% endmacro %} + +{% macro default__event_freshness_metric_query(metric, metric_properties) %} select edr_bucket_start, edr_bucket_end, @@ -301,6 +305,24 @@ group by 1,2 {% endmacro %} +{% macro clickhouse__event_freshness_metric_query(metric, metric_properties) %} + select + edr_bucket_start, + edr_bucket_end, + {{ elementary.const_as_string(metric.name) }} as metric_name, + {{ elementary.const_as_string("event_freshness") }} as metric_type, + cast(max(monitored_table_event_timestamp_column) as Nullable({{ elementary.edr_type_string() }})) as source_value, + {# In ClickHouse, edr_cast_as_timestamp coalesces NULL to epoch and edr_datediff coalesces NULL to 0, + so we must explicitly check for NULL before computing timediff to preserve the coalesce fallback logic #} + case + when max(monitored_table_event_timestamp_column) is not null + then max(dateDiff('second', monitored_table_event_timestamp_column, monitored_table_timestamp_column)) + else {{ elementary.timediff('second', 'edr_bucket_start', 'edr_bucket_end') }} + end as metric_value + from buckets left join time_filtered_monitored_table on (edr_bucket_start = start_bucket_in_data) + group by 1,2 +{% endmacro %} + {% macro get_no_timestamp_event_freshness_query(metric, metric_properties) %} {{ return(adapter.dispatch('get_no_timestamp_event_freshness_query', 'elementary')(metric, metric_properties)) }} {% endmacro %} diff --git a/macros/edr/system/system_utils/buckets_cte.sql b/macros/edr/system/system_utils/buckets_cte.sql index ef1dcea99..528660bce 100644 --- a/macros/edr/system/system_utils/buckets_cte.sql +++ b/macros/edr/system/system_utils/buckets_cte.sql @@ -13,13 +13,13 @@ {% macro clickhouse__complete_buckets_cte(time_bucket, bucket_end_expr, min_bucket_start_expr, max_bucket_end_expr) %} with numbers as ( - select arrayJoin(range(0, toUInt32({{ elementary.edr_datediff(min_bucket_start_expr, max_bucket_end_expr, time_bucket.period) }}))) as n + select arrayJoin(range(0, toUInt32({{ elementary.edr_datediff(min_bucket_start_expr, max_bucket_end_expr, time_bucket.period) }} / {{ time_bucket.count }}) + 1)) as n ) select - {{ elementary.edr_timeadd(time_bucket.period, 'n', min_bucket_start_expr) }} as edr_bucket_start, - {{ elementary.edr_timeadd(time_bucket.period, 'n + 1', min_bucket_start_expr) }} as edr_bucket_end + {{ elementary.edr_timeadd(time_bucket.period, 'n * ' ~ time_bucket.count, min_bucket_start_expr) }} as edr_bucket_start, + {{ elementary.edr_timeadd(time_bucket.period, '(n + 1) * ' ~ time_bucket.count, min_bucket_start_expr) }} as edr_bucket_end from numbers - where {{ elementary.edr_timeadd(time_bucket.period, 'n + 1', min_bucket_start_expr) }} <= {{ max_bucket_end_expr }} + where {{ elementary.edr_timeadd(time_bucket.period, '(n + 1) * ' ~ time_bucket.count, min_bucket_start_expr) }} <= {{ max_bucket_end_expr }} {% endmacro %} {% macro spark__complete_buckets_cte(time_bucket, bucket_end_expr, min_bucket_start_expr, max_bucket_end_expr) %} diff --git a/macros/edr/system/system_utils/full_names.sql b/macros/edr/system/system_utils/full_names.sql index a50f74474..8a61805d3 100644 --- a/macros/edr/system/system_utils/full_names.sql +++ b/macros/edr/system/system_utils/full_names.sql @@ -1,18 +1,46 @@ {% macro full_table_name(alias) -%} + {{ adapter.dispatch('full_table_name', 'elementary')(alias) }} +{%- endmacro %} + +{% macro default__full_table_name(alias) -%} {% if alias is defined %}{%- set alias_dot = alias ~ '.' %}{% endif %} upper({{ alias_dot }}database_name || '.' || {{ alias_dot }}schema_name || '.' || {{ alias_dot }}table_name) {%- endmacro %} +{% macro clickhouse__full_table_name(alias) -%} + {# ClickHouse uses database=schema, so use 2-part names (schema.table) #} + {% if alias is defined %}{%- set alias_dot = alias ~ '.' %}{% endif %} + upper({{ alias_dot }}schema_name || '.' || {{ alias_dot }}table_name) +{%- endmacro %} + {% macro full_schema_name() -%} + {{ adapter.dispatch('full_schema_name', 'elementary')() }} +{%- endmacro %} + +{% macro default__full_schema_name() -%} upper(database_name || '.' || schema_name) {%- endmacro %} +{% macro clickhouse__full_schema_name() -%} + {# ClickHouse uses database=schema, so schema_name alone is the full schema name #} + upper(schema_name) +{%- endmacro %} + {% macro full_column_name() -%} + {{ adapter.dispatch('full_column_name', 'elementary')() }} +{%- endmacro %} + +{% macro default__full_column_name() -%} upper(database_name || '.' || schema_name || '.' || table_name || '.' || column_name) {%- endmacro %} +{% macro clickhouse__full_column_name() -%} + {# ClickHouse uses database=schema, so use schema.table.column #} + upper(schema_name || '.' || table_name || '.' || column_name) +{%- endmacro %} + {% macro full_name_split(part_name) %} {{ adapter.dispatch('full_name_split','elementary')(part_name) }} @@ -34,16 +62,17 @@ {% macro clickhouse__full_name_split(part_name) %} + {# ClickHouse full_table_name is 2-part: schema.table #} {%- if part_name == 'database_name' -%} - {%- set part_index = 1 -%} + {# database = schema in ClickHouse #} + trim(BOTH '"' FROM splitByChar('.', full_table_name)[1]) AS {{ part_name }} {%- elif part_name == 'schema_name' -%} - {%- set part_index = 2 -%} + trim(BOTH '"' FROM splitByChar('.', full_table_name)[1]) AS {{ part_name }} {%- elif part_name == 'table_name' -%} - {%- set part_index = 3 -%} + trim(BOTH '"' FROM splitByChar('.', full_table_name)[2]) AS {{ part_name }} {%- else -%} {{ return('') }} {%- endif -%} - trim(BOTH '"' FROM splitByChar('.', full_table_name)[{{ part_index }}]) AS {{ part_name }} {% endmacro %} @@ -131,6 +160,10 @@ {% macro relation_to_full_name(relation) %} + {{ return(adapter.dispatch('relation_to_full_name', 'elementary')(relation)) }} +{% endmacro %} + +{% macro default__relation_to_full_name(relation) %} {%- if relation.is_cte %} {# Ephemeral models don't have db and schema #} {%- set full_table_name = relation.identifier | upper %} @@ -143,8 +176,22 @@ {{ return(full_table_name) }} {% endmacro %} +{% macro clickhouse__relation_to_full_name(relation) %} + {# ClickHouse uses database=schema, so always use 2-part names (schema.table) #} + {%- if relation.is_cte %} + {%- set full_table_name = relation.identifier | upper %} + {%- else %} + {%- set full_table_name = relation.schema | upper ~'.'~ relation.identifier | upper %} + {%- endif %} + {{ return(full_table_name) }} +{% endmacro %} + {% macro model_node_to_full_name(model_node) %} + {{ return(adapter.dispatch('model_node_to_full_name', 'elementary')(model_node)) }} +{% endmacro %} + +{% macro default__model_node_to_full_name(model_node) %} {% set identifier = model_node.identifier or model_node.alias %} {%- if model_node.database %} {%- set full_table_name = model_node.database | upper ~'.'~ model_node.schema | upper ~'.'~ identifier | upper %} @@ -155,8 +202,19 @@ {{ return(full_table_name) }} {% endmacro %} +{% macro clickhouse__model_node_to_full_name(model_node) %} + {# ClickHouse uses database=schema, so always use 2-part names (schema.table) #} + {% set identifier = model_node.identifier or model_node.alias %} + {%- set full_table_name = model_node.schema | upper ~'.'~ identifier | upper %} + {{ return(full_table_name) }} +{% endmacro %} + {% macro configured_schemas_from_graph_as_tuple() %} + {{ return(adapter.dispatch('configured_schemas_from_graph_as_tuple', 'elementary')()) }} +{% endmacro %} + +{% macro default__configured_schemas_from_graph_as_tuple() %} {%- set configured_schema_tuples = elementary.get_configured_schemas_from_graph() %} {%- set schemas_list = [] %} @@ -171,3 +229,19 @@ {{ return(schemas_tuple) }} {% endmacro %} + +{% macro clickhouse__configured_schemas_from_graph_as_tuple() %} + {# ClickHouse uses database=schema, so use just schema_name #} + {%- set configured_schema_tuples = elementary.get_configured_schemas_from_graph() %} + {%- set schemas_list = [] %} + + {%- for configured_schema_tuple in configured_schema_tuples %} + {%- set database_name, schema_name = configured_schema_tuple %} + {%- set full_schema_name = schema_name | upper %} + {%- do schemas_list.append(full_schema_name) -%} + {%- endfor %} + + {% set schemas_tuple = elementary.strings_list_to_tuple(schemas_list) %} + {{ return(schemas_tuple) }} + +{% endmacro %} diff --git a/macros/utils/cross_db_utils/day_of_week.sql b/macros/utils/cross_db_utils/day_of_week.sql index da5f09226..57ab25e39 100644 --- a/macros/utils/cross_db_utils/day_of_week.sql +++ b/macros/utils/cross_db_utils/day_of_week.sql @@ -48,3 +48,7 @@ {% macro dremio__edr_day_of_week_expression(date_expr) %} TO_CHAR({{ date_expr }}, 'DAY') {% endmacro %} + +{% macro clickhouse__edr_day_of_week_expression(date_expr) %} + formatDateTime({{ date_expr }}, '%W') +{% endmacro %} diff --git a/macros/utils/cross_db_utils/hour_of_week.sql b/macros/utils/cross_db_utils/hour_of_week.sql index 36c8255e7..8733b98a8 100644 --- a/macros/utils/cross_db_utils/hour_of_week.sql +++ b/macros/utils/cross_db_utils/hour_of_week.sql @@ -39,6 +39,10 @@ date_format({{ date_expr }}, '%W%H') {% endmacro %} +{% macro clickhouse__edr_hour_of_week_expression(date_expr) %} + concat(formatDateTime({{ date_expr }}, '%W'), formatDateTime({{ date_expr }}, '%H')) +{% endmacro %} + {% macro duckdb__edr_hour_of_week_expression(date_expr) %} concat(cast(dayname({{ date_expr }}) as {{ elementary.edr_type_string() }}), cast(EXTRACT(hour from {{ date_expr }}) as {{ elementary.edr_type_string() }})) {% endmacro %} diff --git a/macros/utils/sql_utils/list_concat_with_separator.sql b/macros/utils/sql_utils/list_concat_with_separator.sql index 255482ec7..13f4719c8 100644 --- a/macros/utils/sql_utils/list_concat_with_separator.sql +++ b/macros/utils/sql_utils/list_concat_with_separator.sql @@ -1,4 +1,8 @@ {% macro list_concat_with_separator(item_list, separator, handle_nulls = true) %} + {{ return(adapter.dispatch('list_concat_with_separator', 'elementary')(item_list, separator, handle_nulls)) }} +{% endmacro %} + +{% macro default__list_concat_with_separator(item_list, separator, handle_nulls = true) %} {% set new_list = [] %} {% for item in item_list %} {% set new_item = elementary.edr_quote(item) %} @@ -11,4 +15,21 @@ {% endif %} {% endfor %} {{ return(elementary.join_list(new_list, " || ")) }} -{% endmacro %} \ No newline at end of file +{% endmacro %} + +{% macro clickhouse__list_concat_with_separator(item_list, separator, handle_nulls = true) %} + {% set new_list = [] %} + {% for item in item_list %} + {% set new_item = elementary.edr_quote(item) %} + {% if handle_nulls %} + {# In ClickHouse, CAST(NULL, 'String') fails because String is non-Nullable. + Check for NULL before casting to avoid the error. #} + {% set new_item = "case when " ~ item ~ " is null then 'NULL' else " ~ elementary.edr_cast_as_string(item) ~ " end" %} + {% endif %} + {% do new_list.append(new_item) %} + {% if not loop.last %} + {% do new_list.append(elementary.edr_quote(separator)) %} + {% endif %} + {% endfor %} + {{ return(elementary.join_list(new_list, " || ")) }} +{% endmacro %} diff --git a/macros/utils/table_operations/has_temp_table_support.sql b/macros/utils/table_operations/has_temp_table_support.sql index 940ceddf9..ca7b11021 100644 --- a/macros/utils/table_operations/has_temp_table_support.sql +++ b/macros/utils/table_operations/has_temp_table_support.sql @@ -23,6 +23,12 @@ {% endmacro %} {% macro clickhouse__has_temp_table_support() %} + {# ClickHouse CREATE TEMPORARY TABLE is session-scoped (Memory engine only, + no database qualification). The dbt-clickhouse adapter does not guarantee + session persistence across execute() calls, so a temp table created in one + statement may not be visible in the next. Elementary's intermediate + relations need cross-statement visibility and MergeTree engine, so we fall + back to regular tables with cleanup instead. #} {% do return(false) %} {% endmacro %}