Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
dd75799
chore: remove all skip_targets(["clickhouse"]) markers from test files
devin-ai-integration[bot] Feb 25, 2026
0a2dd80
ci: temporarily limit CI matrix to clickhouse-only for iteration
devin-ai-integration[bot] Feb 25, 2026
41701e9
fix: use NOT IN instead of LEFT JOIN IS NULL for ClickHouse compatibi…
devin-ai-integration[bot] Feb 25, 2026
1d2fe6c
fix: ClickHouse Nullable(Float32) cast + HTTP API seed null fix
devin-ai-integration[bot] Feb 25, 2026
d43adc6
fix: address CodeRabbit review + revert NOT IN back to LEFT JOIN
devin-ai-integration[bot] Feb 25, 2026
e36e33e
fix: address CodeRabbit review round 2 - env vars, timeout, SQL injec…
devin-ai-integration[bot] Feb 25, 2026
ca0db37
fix: ClickHouse full_names adapter.dispatch, seasonality macros, even…
devin-ai-integration[bot] Feb 25, 2026
749d582
fix: ClickHouse event_freshness timediff NULL handling + list_concat …
devin-ai-integration[bot] Feb 25, 2026
4cd2e2a
fix: dynamically resolve ClickHouse schema from dbt profiles.yml inst…
devin-ai-integration[bot] Feb 25, 2026
956c061
ci: restore full CI matrix with all warehouse types
devin-ai-integration[bot] Feb 25, 2026
ad08596
refactor: extract ClickHouse seed repair utils + dispatch empty-strin…
devin-ai-integration[bot] Feb 25, 2026
92d0e89
refactor: remove unused clickhouse__ dispatch from replace_empty_stri…
devin-ai-integration[bot] Feb 25, 2026
48ea275
ci: retrigger CI to verify flaky test_seed_group_attribute failure
devin-ai-integration[bot] Feb 25, 2026
81219e3
Merge origin/master into core-397-clickhouse-support
devin-ai-integration[bot] Feb 27, 2026
8893238
Merge origin/master into core-397-clickhouse-support
devin-ai-integration[bot] Feb 28, 2026
1915c34
refactor: replace clickhouse_utils.py with ClickHouseDirectSeeder
devin-ai-integration[bot] Mar 1, 2026
5b38158
fix: add type inference to ClickHouseDirectSeeder
devin-ai-integration[bot] Mar 1, 2026
4d6e7c2
fix: treat booleans as strings in ClickHouseDirectSeeder
devin-ai-integration[bot] Mar 1, 2026
601db95
fix: use Nullable(Bool) for boolean columns in ClickHouseDirectSeeder
devin-ai-integration[bot] Mar 1, 2026
207d9bd
fix: write CSV for dbt node discovery in ClickHouseDirectSeeder
devin-ai-integration[bot] Mar 1, 2026
335fbf3
refactor: remove run_operation retry logic from run_query path
devin-ai-integration[bot] Mar 1, 2026
6f886e7
Merge origin/master into core-397-clickhouse-support
devin-ai-integration[bot] Mar 1, 2026
8823952
docs: add comment explaining why clickhouse__has_temp_table_support r…
devin-ai-integration[bot] Mar 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,36 @@
{% set relation = ref(table_name) %}
{% set columns = adapter.get_columns_in_relation(relation) %}

{% for col in columns %}
{% set data_type = elementary.get_column_data_type(col) %}
{% set normalized_data_type = elementary.normalize_data_type(data_type) %}

{% if normalized_data_type == "string" %}
{% set update_query %}
update {{ relation }}
set {{ col["name"] }} = NULL
where {{ col["name"] }} = ''
{% endset %}
{% do elementary.run_query(update_query) %}
{% endif %}
{% endfor %}
{% if target.type == "clickhouse" %}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We normally solve this by using a macro with a "default__" and "clickhouse__" implementation, could we do the same here?

{# On ClickHouse, columns are non-Nullable by default so NULLs in CSV seeds become
empty strings. We first ALTER each string column to Nullable(String), then use
ALTER TABLE UPDATE to convert empty strings to NULLs.
We use statement blocks for DDL since dbt.run_query may not handle DDL on ClickHouse. #}
{% for col in columns %}
{% set data_type = elementary.get_column_data_type(col) %}
{% set normalized_data_type = elementary.normalize_data_type(data_type) %}
{% if normalized_data_type == "string" %}
{% call statement('alter_nullable_' ~ col['name'], fetch_result=False) %}
alter table {{ relation }} modify column `{{ col['name'] }}` Nullable(String)
{% endcall %}
{% call statement('update_nulls_' ~ col['name'], fetch_result=False) %}
alter table {{ relation }} update `{{ col['name'] }}` = NULL where `{{ col['name'] }}` = '' settings mutations_sync = 1
{% endcall %}
{% endif %}
{% endfor %}
{% else %}
{% for col in columns %}
{% set data_type = elementary.get_column_data_type(col) %}
{% set normalized_data_type = elementary.normalize_data_type(data_type) %}

{% if normalized_data_type == "string" %}
{% set update_query %}
update {{ relation }}
set {{ col["name"] }} = NULL
where {{ col["name"] }} = ''
{% endset %}
{% do elementary.run_query(update_query) %}
{% endif %}
{% endfor %}
{% endif %}
{% endmacro %}
1 change: 1 addition & 0 deletions integration_tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ services:
- "9000:9000"
volumes:
- clickhouse:/var/lib/clickhouse
- ./docker/clickhouse/users.xml:/etc/clickhouse-server/users.d/elementary.xml
environment:
CLICKHOUSE_DB: default
CLICKHOUSE_USER: default
Expand Down
8 changes: 8 additions & 0 deletions integration_tests/docker/clickhouse/users.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<clickhouse>
<profiles>
<default>
Comment thread
haritamar marked this conversation as resolved.
<join_use_nulls>1</join_use_nulls>
Comment thread
haritamar marked this conversation as resolved.
<mutations_sync>1</mutations_sync>
</default>
</profiles>
</clickhouse>
153 changes: 150 additions & 3 deletions integration_tests/tests/dbt_project.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import json
import os
import re
import urllib.parse
import urllib.request
from contextlib import contextmanager, nullcontext
from pathlib import Path
from tempfile import NamedTemporaryFile
Expand Down Expand Up @@ -260,16 +263,159 @@ def seed(self, data: List[dict], table_name: str):
with DbtDataSeeder(
self.dbt_runner, self.project_dir_path, self.seeds_dir_path
).seed(data, table_name):
self._fix_seed_if_needed(table_name)
self._fix_seed_if_needed(table_name, data)

def _fix_seed_if_needed(self, table_name: str, data: Optional[List[dict]] = None):
    """Repair a freshly-seeded table on targets that mangle NULL seed values.

    Args:
        table_name: Name of the table that was just seeded.
        data: The original seed rows. Used on ClickHouse to detect which
            columns contained NULLs; optional for backward compatibility
            with callers that do not pass it.
    """
    # Hack for BigQuery - seems like we get empty strings instead of nulls in seeds, so we
    # fix them here.
    if self.runner_method == RunnerMethod.FUSION and self.target == "bigquery":
        self.dbt_runner.run_operation(
            "elementary_tests.replace_empty_strings_with_nulls",
            macro_args={"table_name": table_name},
        )
    # On ClickHouse, columns are non-Nullable by default, so NULL values in CSVs become
    # default values (0 for Int, '' for String, etc.). We fix this by altering columns to
    # Nullable and updating default values back to NULLs directly via the ClickHouse HTTP
    # API, since dbt's run_query/statement don't reliably execute DDL on ClickHouse.
    # An empty `data` list is skipped deliberately: no rows means no NULLs to restore.
    elif self.target == "clickhouse" and data:
        self._fix_clickhouse_seed_nulls(table_name, data)

def _get_clickhouse_schema(self) -> str:
    """Get the ClickHouse database (schema) name from dbt profiles.yml.

    In ClickHouse, database and schema are the same concept. The schema
    name comes from the dbt profile's 'schema' property, with the
    SCHEMA_NAME_SUFFIX appended for parallel test workers.

    Returns:
        The suffixed schema name; falls back to "default" when the
        profile entry is missing.
    """
    # Honor DBT_PROFILES_DIR — the same override dbt itself supports — so
    # the lookup also works in CI setups with a non-default profiles
    # location; otherwise fall back to the conventional ~/.dbt directory.
    profiles_dir = Path(os.environ.get("DBT_PROFILES_DIR", str(Path.home() / ".dbt")))
    profiles_path = profiles_dir / "profiles.yml"
    yaml = YAML()
    with open(profiles_path) as f:
        profiles = yaml.load(f)
    # Navigate to the clickhouse target's schema entry.
    base_schema = (
        profiles.get("elementary_tests", {})
        .get("outputs", {})
        .get("clickhouse", {})
        .get("schema", "default")
    )
    return f"{base_schema}{SCHEMA_NAME_SUFFIX}"

def _fix_clickhouse_seed_nulls(self, table_name: str, data: List[dict]):
    """Fix ClickHouse seed tables where NULL values became default values.

    ClickHouse columns are non-Nullable by default, so NULL values in CSV seeds
    become default values (0 for Int, '' for String, etc.). This method:
    1. Determines which columns had NULL values in the original data
    2. Rebuilds the table via CREATE ... AS SELECT, casting those columns to
       Nullable with nullIf() to restore NULLs
    3. Swaps the rebuilt table in atomically with EXCHANGE TABLES

    Uses the ClickHouse HTTP API directly because dbt's run_query/statement
    don't reliably execute DDL on ClickHouse.

    Args:
        table_name: Name of the seeded table to repair.
        data: The original seed rows, scanned for columns containing NULLs.

    Raises:
        ValueError: If the schema or table name contains characters outside
            [A-Za-z0-9_] (guards the string-built SQL against injection).
    """
    # Find columns that contain at least one NULL in the original data.
    nullable_columns = {
        col_name
        for row in data
        for col_name, value in row.items()
        if value is None
    }
    if not nullable_columns:
        return

    schema = self._get_clickhouse_schema()
    ch_host = os.environ.get("CLICKHOUSE_HOST", "localhost")
    ch_port = os.environ.get("CLICKHOUSE_PORT", "8123")
    ch_user = os.environ.get("CLICKHOUSE_USER", "default")
    ch_password = os.environ.get("CLICKHOUSE_PASSWORD", "default")
    ch_url = f"http://{ch_host}:{ch_port}"

    def ch_query(query: str) -> str:
        # Run one statement over the ClickHouse HTTP interface and return
        # its raw TSV response. Credentials travel in the documented
        # X-ClickHouse-User / X-ClickHouse-Key headers rather than the URL,
        # so the password cannot leak into URL-capturing access logs.
        # mutations_sync=1 makes ALTER-style mutations block until applied.
        query_string = urllib.parse.urlencode({"mutations_sync": 1})
        req = urllib.request.Request(
            f"{ch_url}/?{query_string}",
            data=query.encode("utf-8"),
            headers={
                "X-ClickHouse-User": ch_user,
                "X-ClickHouse-Key": ch_password,
            },
        )
        with urllib.request.urlopen(req, timeout=60) as resp:  # noqa: S310
            return resp.read().decode("utf-8")

    # Validate identifiers to prevent SQL injection before they are ever
    # interpolated into a statement.
    if not re.fullmatch(r"[A-Za-z0-9_]+", schema):
        raise ValueError(f"Invalid schema name: {schema!r}")
    if not re.fullmatch(r"[A-Za-z0-9_]+", table_name):
        raise ValueError(f"Invalid table name: {table_name!r}")

    # Get all columns and their types from the system catalog.
    cols_result = ch_query(
        f"SELECT name, type FROM system.columns "
        f"WHERE database = '{schema}' AND table = '{table_name}'"
    ).strip()
    if not cols_result:
        logger.warning(
            "ClickHouse fix: no columns found for %s.%s – "
            "schema may be wrong (using '%s'). NULLs will not be repaired.",
            schema,
            table_name,
            schema,
        )
        return

    # Parse the TSV response into (name, type) pairs.
    columns = []
    for line in cols_result.split("\n"):
        parts = line.strip().split("\t")
        if len(parts) == 2:
            columns.append((parts[0], parts[1]))

    # Build SELECT expressions: use nullIf() for nullable columns.
    select_exprs = []
    for col_name, col_type in columns:
        if col_name not in nullable_columns:
            select_exprs.append(f"`{col_name}`")
            continue
        # Strip a Nullable(...) wrapper left by a prior run to avoid
        # Nullable(Nullable(...)) nesting.
        base_type = col_type
        if base_type.startswith("Nullable(") and base_type.endswith(")"):
            base_type = base_type[len("Nullable(") : -1]
        # The value ClickHouse substituted for NULL in this column, to be
        # mapped back to NULL via nullIf().
        if base_type == "String" or base_type.startswith(
            ("FixedString", "LowCardinality")
        ):
            default_val = "''"
        elif base_type.startswith(("Int", "UInt", "Float")):
            default_val = "0"
        else:
            default_val = f"defaultValueOfTypeName('{base_type}')"
        select_exprs.append(
            f"nullIf(`{col_name}`, {default_val})::Nullable({base_type}) as `{col_name}`"
        )

    # Rebuild the table: CREATE temp AS SELECT with nullIf, EXCHANGE, DROP.
    tmp_name = f"{table_name}_tmp_nullable"
    select_sql = ", ".join(select_exprs)

    logger.info(
        "ClickHouse fix: rebuilding %s.%s with Nullable columns: %s",
        schema,
        table_name,
        nullable_columns,
    )

    ch_query(f"DROP TABLE IF EXISTS {schema}.{tmp_name}")
    try:
        ch_query(
            f"CREATE TABLE {schema}.{tmp_name} "
            f"ENGINE = MergeTree() ORDER BY tuple() "
            f"AS SELECT {select_sql} FROM {schema}.{table_name}"
        )
        ch_query(f"EXCHANGE TABLES {schema}.{table_name} AND {schema}.{tmp_name}")
    finally:
        # Best-effort cleanup: after a successful swap the temp name holds
        # the old (broken) data; after a failed CREATE it may not exist.
        ch_query(f"DROP TABLE IF EXISTS {schema}.{tmp_name}")

@contextmanager
def seed_context(
Expand All @@ -278,6 +424,7 @@ def seed_context(
with DbtDataSeeder(
self.dbt_runner, self.project_dir_path, self.seeds_dir_path
).seed(data, table_name):
self._fix_seed_if_needed(table_name, data)
yield

@contextmanager
Expand Down
10 changes: 0 additions & 10 deletions integration_tests/tests/test_all_columns_anomalies.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
}


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_anomalyless_all_columns_anomalies(test_id: str, dbt_project: DbtProject):
utc_today = datetime.utcnow().date()
data: List[Dict[str, Any]] = [
Expand All @@ -31,8 +29,6 @@ def test_anomalyless_all_columns_anomalies(test_id: str, dbt_project: DbtProject
assert all([res["status"] == "pass" for res in test_results])


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_anomalous_all_columns_anomalies(test_id: str, dbt_project: DbtProject):
utc_today = datetime.utcnow().date()
test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))
Expand All @@ -57,8 +53,6 @@ def test_anomalous_all_columns_anomalies(test_id: str, dbt_project: DbtProject):
assert col_to_status == {"superhero": "fail", TIMESTAMP_COLUMN: "pass"}


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_all_columns_anomalies_with_where_parameter(
test_id: str, dbt_project: DbtProject
):
Expand Down Expand Up @@ -128,8 +122,6 @@ def test_all_columns_anomalies_with_where_parameter(
}


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_anomalyless_all_columns_anomalies_all_monitors_sanity(
test_id: str, dbt_project: DbtProject
):
Expand All @@ -155,8 +147,6 @@ def test_anomalyless_all_columns_anomalies_all_monitors_sanity(
assert all([res["status"] == "pass" for res in test_results])


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
@pytest.mark.parametrize(
"exclude_detection,expected_status",
[
Expand Down
15 changes: 0 additions & 15 deletions integration_tests/tests/test_anomalies_backfill_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from datetime import datetime, time, timedelta

import dateutil.parser
import pytest
from data_generator import DATE_FORMAT, generate_dates
from dbt_project import DbtProject

Expand Down Expand Up @@ -74,8 +73,6 @@ def get_latest_anomaly_test_metrics(dbt_project: DbtProject, test_id: str):
}


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_full_backfill_for_non_incremental_model(dbt_project: DbtProject, test_id: str):
utc_today = datetime.utcnow().date()
data_dates = generate_dates(base_date=utc_today - timedelta(1))
Expand Down Expand Up @@ -109,8 +106,6 @@ def test_full_backfill_for_non_incremental_model(dbt_project: DbtProject, test_i
}


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_partial_backfill_for_incremental_models(dbt_project: DbtProject, test_id: str):
utc_today = datetime.utcnow().date()
data_dates = generate_dates(base_date=utc_today - timedelta(1))
Expand Down Expand Up @@ -157,8 +152,6 @@ def test_partial_backfill_for_incremental_models(dbt_project: DbtProject, test_i
}


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_longer_backfill_in_case_of_a_gap(dbt_project: DbtProject, test_id: str):
date_gap_size = 5
utc_today = datetime.utcnow().date()
Expand Down Expand Up @@ -211,8 +204,6 @@ def test_longer_backfill_in_case_of_a_gap(dbt_project: DbtProject, test_id: str)
}


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_full_backfill_if_metric_not_updated_for_a_long_time(
dbt_project: DbtProject, test_id: str
):
Expand Down Expand Up @@ -272,8 +263,6 @@ def test_full_backfill_if_metric_not_updated_for_a_long_time(
}


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_backfill_when_metric_doesnt_exist_back_enough(
dbt_project: DbtProject, test_id: str
):
Expand Down Expand Up @@ -318,8 +307,6 @@ def test_backfill_when_metric_doesnt_exist_back_enough(
}


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_backfill_with_middle_buckets_gap(dbt_project: DbtProject, test_id: str):
utc_today = datetime.utcnow().date()
data_start = utc_today - timedelta(21)
Expand Down Expand Up @@ -388,8 +375,6 @@ def test_backfill_with_middle_buckets_gap(dbt_project: DbtProject, test_id: str)
}


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_bucket_size_not_aligned_with_days(dbt_project: DbtProject, test_id: str):
"""
In this test we choose a bucket size that is not aligned with one day - specifically 7 hours.
Expand Down
5 changes: 0 additions & 5 deletions integration_tests/tests/test_anomalies_ranges.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from datetime import datetime, timedelta
from typing import Any, Dict, List

import pytest
from data_generator import DATE_FORMAT, generate_dates
from dbt_project import DbtProject

Expand Down Expand Up @@ -34,8 +33,6 @@ def get_latest_anomaly_test_points(dbt_project: DbtProject, test_id: str):
return [json.loads(result["result_row"]) for result in results]


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_anomaly_ranges_are_valid(test_id: str, dbt_project: DbtProject):
utc_today = datetime.utcnow().date()
test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))
Expand Down Expand Up @@ -69,8 +66,6 @@ def test_anomaly_ranges_are_valid(test_id: str, dbt_project: DbtProject):
assert all([row["min_value"] == row["max_value"] for row in anomaly_test_points])


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_anomaly_ranges_are_valid_with_seasonality(
test_id: str, dbt_project: DbtProject
):
Expand Down
Loading