Commit 8045ade

feat: add full ClickHouse support - remove all skip_targets markers (CORE-397) (#934)
* chore: remove all skip_targets(["clickhouse"]) markers from test files

  Remove ClickHouse from skip_targets in all integration test files to enable full
  ClickHouse support testing. For multi-target skip lists (e.g. in test_schema_changes.py
  and test_exposure_schema_validity.py), only 'clickhouse' was removed while keeping the
  other targets. Also remove now-unused 'import pytest' statements in files where pytest
  was only imported for the skip_targets decorator. (CORE-397)

* ci: temporarily limit CI matrix to clickhouse-only for iteration

  Reduce the warehouse-type matrix to only [clickhouse] to enable fast iteration on
  ClickHouse test fixes. Will be restored to the full matrix once all ClickHouse tests
  pass. (CORE-397)

* fix: use NOT IN instead of LEFT JOIN IS NULL for ClickHouse compatibility

  ClickHouse LEFT OUTER JOIN produces default values (e.g. 1970-01-01 for DateTime)
  instead of NULL for unmatched rows, causing the anti-join pattern to fail. Changed the
  missing_bucket_starts CTE to use NOT IN, which works correctly on all databases.

* fix: ClickHouse Nullable(Float32) cast + HTTP API seed null fix

  - Use Nullable(Float32) in clickhouse__standard_deviation and clickhouse__variance to
    handle CASE expressions that return NULL
  - Add _fix_clickhouse_seed_nulls() to rebuild seed tables with proper Nullable types
    using the ClickHouse HTTP API with the nullIf() function
  - Configure ClickHouse Docker with join_use_nulls=1 and mutations_sync=1
  - Fix an unused-variable lint warning in dbt_project.py

* fix: address CodeRabbit review + revert NOT IN back to LEFT JOIN

  - Revert the NOT IN subquery back to LEFT JOIN IS NULL (join_use_nulls=1 handles NULLs)
  - Add _fix_seed_if_needed to seed_context for the ClickHouse NULL fix
  - Add try/finally for cleanup in the table-rebuild sequence
  - Handle Nullable wrapping to avoid Nullable(Nullable(...))
  - Handle FixedString/LowCardinality string variants
  - Add a warning when cols_result is empty
  - Backtick-quote column names in ClickHouse ALTER statements

* fix: address CodeRabbit review round 2 - env vars, timeout, SQL injection guard,
  mutations_sync

* fix: ClickHouse full_names adapter.dispatch, seasonality macros, event freshness
  Nullable cast

* fix: ClickHouse event_freshness timediff NULL handling + list_concat Nullable
  dimension cast

* fix: dynamically resolve the ClickHouse schema from dbt profiles.yml instead of
  hardcoding 'default'

* ci: restore full CI matrix with all warehouse types

* refactor: extract ClickHouse seed repair utils + dispatch empty-string NULL macro

* refactor: remove unused clickhouse__ dispatch from replace_empty_strings_with_nulls

  The macro is only called for BigQuery fusion seeds. ClickHouse seed NULL repair is
  handled by fix_clickhouse_seed_nulls() in clickhouse_utils.py via the HTTP API
  (covers all column types, not just strings).

* ci: retrigger CI to verify flaky test_seed_group_attribute failure

* refactor: replace clickhouse_utils.py with ClickHouseDirectSeeder

  - Add ClickHouseDirectSeeder to data_seeder.py: creates tables with Nullable(String)
    columns directly via the dbt adapter, bypassing dbt seed and eliminating the need
    for post-hoc NULL repair
  - Add execute_sql() and a schema_name property to AdapterQueryRunner
  - DbtProject._create_seeder() auto-selects ClickHouseDirectSeeder when the target is
    'clickhouse'
  - Delete clickhouse_utils.py (the HTTP API is no longer needed for seeding)
  - Update the replace_empty_strings_with_nulls.sql comment

* fix: add type inference to ClickHouseDirectSeeder

  Infer ClickHouse column types from Python values instead of using Nullable(String)
  for all columns. This preserves proper numeric types (Int64, Float64) so that
  Elementary's numeric monitors (average, zero_count) and schema change detection
  (type_changed) work correctly.

  - _infer_column_type(): examines Python types (bool → UInt8, int → Int64,
    float → Float64, str → String), all wrapped in Nullable()
  - _escape(): returns unquoted literals for numeric/boolean types
  - seed(): logs the inferred column types for debugging

* fix: treat booleans as strings in ClickHouseDirectSeeder

  dbt seed writes Python True/False as 'True'/'False' strings in CSV, so ClickHouse
  stores them as String columns. Match this behavior in the direct seeder so
  count_true/count_false monitors work correctly.

  - Remove Nullable(UInt8) inference for booleans (fall through to String)
  - Escape True/False as quoted strings 'True'/'False'

* fix: use Nullable(Bool) for boolean columns in ClickHouseDirectSeeder

  dbt seed infers True/False CSV values as boolean; dbt-clickhouse maps this to Bool
  (an alias for UInt8). Match this behavior so count_true and count_false monitors
  work correctly.

  - Infer Nullable(Bool) for all-boolean columns
  - Escape True/False as ClickHouse Bool literals (true/false)

* fix: write CSV for dbt node discovery in ClickHouseDirectSeeder

  The direct seeder bypasses dbt seed but still needs a CSV file on disk so that dbt
  can discover the seed node for {{ ref() }} resolution during run_operation. Without
  it, queries referencing the seed table fail with 'node not found'.

  - Add seeds_dir_path to ClickHouseDirectSeeder.__init__
  - Write the CSV before creating the table; delete it in a finally block
  - Pass seeds_dir_path from DbtProject._create_seeder()

* refactor: remove run_operation retry logic from the run_query path

  The retry masked non-transient errors (e.g. 'node not found') by retrying them
  pointlessly. Since most queries now use the direct adapter path (AdapterQueryRunner),
  the retry is no longer needed. If the log-capture issue resurfaces, we can add a
  proper fix that distinguishes transient from non-transient failures.

* docs: add a comment explaining why clickhouse__has_temp_table_support returns false

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Itamar Hartstein <haritamar@gmail.com>
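The boolean-handling commits above hinge on a Python quirk: `bool` is a subclass of `int`, so a naive `isinstance(v, int)` check classifies `True` and `False` as integers. A minimal sketch of the check ordering (hypothetical helper name, mirroring the logic the commit message describes, not the real seeder):

```python
def infer_type(values):
    """Classify a column's values (hypothetical helper, illustrating why
    booleans must be checked before integers)."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return "String"
    # bool is a subclass of int in Python, so check it first; otherwise
    # True/False would be classified as integers.
    if all(isinstance(v, bool) for v in non_null):
        return "Bool"
    if all(isinstance(v, int) and not isinstance(v, bool) for v in non_null):
        return "Int64"
    return "String"

# isinstance(True, int) is True, which is exactly the trap being avoided.
print(infer_type([True, False, None]))  # Bool, not Int64
```

Reversing the first two checks would make every all-boolean column come out as `Int64`, which is why the diff below orders them this way.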
1 parent 3111418 commit 8045ade

34 files changed

Lines changed: 323 additions & 265 deletions

integration_tests/dbt_project/macros/replace_empty_strings_with_nulls.sql

Lines changed: 7 additions & 3 deletions
@@ -1,16 +1,20 @@
+{# This macro is only used for BigQuery fusion seeds (see dbt_project.py _fix_seed_if_needed).
+   ClickHouse uses ClickHouseDirectSeeder (data_seeder.py) which creates Nullable(String)
+   columns directly, so no post-hoc repair is needed. #}
 {% macro replace_empty_strings_with_nulls(table_name) %}
   {% set relation = ref(table_name) %}
   {% set columns = adapter.get_columns_in_relation(relation) %}

   {% for col in columns %}
     {% set data_type = elementary.get_column_data_type(col) %}
     {% set normalized_data_type = elementary.normalize_data_type(data_type) %}
-
+
     {% if normalized_data_type == "string" %}
+      {% set quoted_col = adapter.quote(col["name"]) %}
       {% set update_query %}
         update {{ relation }}
-        set {{ col["name"] }} = NULL
-        where {{ col["name"] }} = ''
+        set {{ quoted_col }} = NULL
+        where {{ quoted_col }} = ''
       {% endset %}
       {% do elementary.run_query(update_query) %}
     {% endif %}

integration_tests/docker-compose.yml

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ services:
       - "9000:9000"
     volumes:
       - clickhouse:/var/lib/clickhouse
+      - ./docker/clickhouse/users.xml:/etc/clickhouse-server/users.d/elementary.xml
     environment:
      CLICKHOUSE_DB: default
      CLICKHOUSE_USER: default
integration_tests/docker/clickhouse/users.xml

Lines changed: 8 additions & 0 deletions

@@ -0,0 +1,8 @@
+<clickhouse>
+    <profiles>
+        <default>
+            <join_use_nulls>1</join_use_nulls>
+            <mutations_sync>1</mutations_sync>
+        </default>
+    </profiles>
+</clickhouse>

integration_tests/tests/adapter_query_runner.py

Lines changed: 10 additions & 0 deletions
@@ -239,6 +239,16 @@ def has_non_ref_jinja(query: str) -> bool:
         stripped = _SOURCE_PATTERN.sub("", stripped)
         return bool(_JINJA_EXPR_PATTERN.search(stripped))

+    def execute_sql(self, sql: str) -> None:
+        """Execute a SQL statement that does not return results (DDL/DML)."""
+        with self._adapter.connection_named("execute_sql"):
+            self._adapter.execute(sql, fetch=False)
+
+    @property
+    def schema_name(self) -> str:
+        """Return the base schema name from the adapter credentials."""
+        return self._adapter.config.credentials.schema
+
     def run_query(self, prerendered_query: str) -> List[Dict[str, Any]]:
         """Render Jinja refs/sources and execute a query, returning rows as dicts.

integration_tests/tests/data_seeder.py

Lines changed: 126 additions & 2 deletions
@@ -1,12 +1,13 @@
 import csv
 from contextlib import contextmanager
 from pathlib import Path
-from typing import Generator, List
+from typing import TYPE_CHECKING, Generator, List

 from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
 from logger import get_logger

-# TODO: Write more performant data seeders per adapter.
+if TYPE_CHECKING:
+    from adapter_query_runner import AdapterQueryRunner

 logger = get_logger(__name__)

@@ -48,3 +49,126 @@ def seed(self, data: List[dict], table_name: str) -> Generator[None, None, None]
             yield
         finally:
             seed_path.unlink()
+
+
+# Maximum number of rows per INSERT VALUES statement.
+_INSERT_BATCH_SIZE = 500
+
+
+class ClickHouseDirectSeeder:
+    """Fast seeder for ClickHouse: executes CREATE TABLE + INSERT directly.
+
+    Bypasses the ``dbt seed`` *subprocess* (and its post-hoc NULL repair),
+    but still writes a CSV file to the seeds directory so that dbt can
+    discover the seed node for ``{{ ref() }}`` resolution during
+    ``run_operation``.
+
+    Column types are inferred from the Python values in the seed data and
+    wrapped in ``Nullable()`` so that NULL values are preserved correctly
+    (ClickHouse columns are non-Nullable by default).
+    """
+
+    def __init__(
+        self,
+        query_runner: "AdapterQueryRunner",
+        schema: str,
+        seeds_dir_path: Path,
+    ) -> None:
+        self._query_runner = query_runner
+        self._schema = schema
+        self._seeds_dir_path = seeds_dir_path
+
+    @staticmethod
+    def _infer_column_type(values: List[object]) -> str:
+        """Infer a ClickHouse column type from a list of Python values.
+
+        Examines non-None, non-empty-string values and returns a
+        ``Nullable(...)`` type string. Falls back to ``Nullable(String)``
+        when all values are None/empty or when types are mixed.
+        """
+        non_null = [v for v in values if v is not None and v != ""]
+        if not non_null:
+            return "Nullable(String)"
+
+        # bool is a subclass of int in Python, so check it first.
+        # dbt seed infers "True"/"False" CSV values as boolean; dbt-clickhouse
+        # maps this to Bool (alias for UInt8).
+        if all(isinstance(v, bool) for v in non_null):
+            return "Nullable(Bool)"
+        if all(isinstance(v, int) and not isinstance(v, bool) for v in non_null):
+            return "Nullable(Int64)"
+        if all(
+            isinstance(v, (int, float)) and not isinstance(v, bool) for v in non_null
+        ):
+            return "Nullable(Float64)"
+        return "Nullable(String)"
+
+    @staticmethod
+    def _escape(value: object) -> str:
+        """Escape a value for a ClickHouse SQL literal.
+
+        Returns ``NULL`` for None / empty-string, unquoted literals for
+        numeric / boolean types, and a quoted+escaped string otherwise.
+        """
+        if value is None or (isinstance(value, str) and value == ""):
+            return "NULL"
+        # Booleans → ClickHouse Bool literals (true/false).
+        if isinstance(value, bool):
+            return "true" if value else "false"
+        if isinstance(value, (int, float)):
+            return str(value)
+        text = str(value)
+        text = text.replace("\\", "\\\\")
+        text = text.replace("'", "\\'")
+        return f"'{text}'"
+
+    @contextmanager
+    def seed(self, data: List[dict], table_name: str) -> Generator[None, None, None]:
+        """Create a table with correctly-typed Nullable columns and insert data.
+
+        A CSV file is written to the seeds directory so that dbt can
+        discover the seed node for ``{{ ref() }}`` resolution. The file
+        is removed when the context manager exits.
+        """
+        columns = list(data[0].keys())
+        col_types = {
+            col: self._infer_column_type([row.get(col) for row in data])
+            for col in columns
+        }
+        col_defs = ", ".join(f"`{col}` {col_types[col]}" for col in columns)
+        fq_table = f"`{self._schema}`.`{table_name}`"
+
+        # Write a CSV so dbt discovers the seed node (needed for {{ ref() }}).
+        seed_path = self._seeds_dir_path / f"{table_name}.csv"
+        with seed_path.open("w") as f:
+            writer = csv.DictWriter(f, fieldnames=columns)
+            writer.writeheader()
+            writer.writerows(data)
+
+        try:
+            self._query_runner.execute_sql(f"DROP TABLE IF EXISTS {fq_table}")
+            self._query_runner.execute_sql(
+                f"CREATE TABLE {fq_table} ({col_defs}) "
+                f"ENGINE = MergeTree() ORDER BY tuple()"
+            )
+
+            for batch_start in range(0, len(data), _INSERT_BATCH_SIZE):
+                batch = data[batch_start : batch_start + _INSERT_BATCH_SIZE]
+                rows_sql = ", ".join(
+                    "(" + ", ".join(self._escape(row.get(c)) for c in columns) + ")"
+                    for row in batch
+                )
+                self._query_runner.execute_sql(
+                    f"INSERT INTO {fq_table} VALUES {rows_sql}"
+                )
+
+            logger.info(
+                "ClickHouseDirectSeeder: loaded %d rows into %s (%s)",
+                len(data),
+                fq_table,
+                ", ".join(f"{c}: {t}" for c, t in col_types.items()),
+            )
+
+            yield
+        finally:
+            seed_path.unlink(missing_ok=True)
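The `_escape` helper in the diff above replaces backslashes before single quotes. That ordering matters: escaping quotes first would leave the backslash introduced by the quote-escaping exposed to the backslash-doubling pass. A standalone sketch of the same two-step escaping (re-stated here for illustration, not imported from the test suite):

```python
def escape_string_literal(text: str) -> str:
    """Escape a string for use as a single-quoted SQL literal.

    Backslashes are doubled first, then quotes are escaped; reversing
    the order would also double the backslash that the quote-escaping
    itself inserts, corrupting the literal.
    """
    text = text.replace("\\", "\\\\")
    text = text.replace("'", "\\'")
    return f"'{text}'"

print(escape_string_literal("it's"))  # 'it\'s'
```

Numeric and boolean values skip this path entirely in the seeder, since they are emitted as unquoted literals.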

integration_tests/tests/dbt_project.py

Lines changed: 24 additions & 58 deletions
@@ -7,29 +7,16 @@
 from uuid import uuid4

 from adapter_query_runner import AdapterQueryRunner, UnsupportedJinjaError
-from data_seeder import DbtDataSeeder
+from data_seeder import ClickHouseDirectSeeder, DbtDataSeeder
 from dbt_utils import get_database_and_schema_properties
 from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
 from elementary.clients.dbt.factory import RunnerMethod, create_dbt_runner
 from logger import get_logger
 from ruamel.yaml import YAML
-from tenacity import (
-    RetryCallState,
-    retry,
-    retry_if_result,
-    stop_after_attempt,
-    wait_fixed,
-)

 PYTEST_XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", None)
 SCHEMA_NAME_SUFFIX = f"_{PYTEST_XDIST_WORKER}" if PYTEST_XDIST_WORKER else ""

-# Retry settings for the run_operation fallback path. run_operation() can
-# intermittently return an empty list when the MACRO_RESULT_PATTERN log line
-# is not captured from dbt's output.
-_RUN_QUERY_MAX_RETRIES = 3
-_RUN_QUERY_RETRY_DELAY_SECONDS = 0.5
-
 _DEFAULT_VARS = {
     "disable_dbt_invocation_autoupload": True,
     "disable_dbt_artifacts_autoupload": True,

@@ -92,51 +79,21 @@ def run_query(self, prerendered_query: str):
         except UnsupportedJinjaError:
             logger.debug("Query contains complex Jinja; falling back to run_operation")

-        # Slow path: full Jinja rendering via run_operation (with retry).
+        # Slow path: full Jinja rendering via run_operation.
         return self._run_query_with_run_operation(prerendered_query)

-    @staticmethod
-    def _log_retry(retry_state: RetryCallState) -> None:
-        """Tenacity before_sleep callback — logs each retry with attempt number."""
-        logger.warning(
-            "run_operation('elementary.render_run_query') returned no output; "
-            "retry %d/%d in %.1fs",
-            retry_state.attempt_number,
-            _RUN_QUERY_MAX_RETRIES,
-            _RUN_QUERY_RETRY_DELAY_SECONDS,
-        )
-
-    @retry(
-        retry=retry_if_result(lambda r: r is None),
-        stop=stop_after_attempt(_RUN_QUERY_MAX_RETRIES),
-        wait=wait_fixed(_RUN_QUERY_RETRY_DELAY_SECONDS),
-        before_sleep=_log_retry.__func__,
-        reraise=True,
-    )
-    def _run_operation_with_retry(self, prerendered_query: str) -> Optional[list]:
-        """Call run_operation and return the parsed result, or None to trigger retry."""
+    def _run_query_with_run_operation(self, prerendered_query: str):
+        """Execute a query via run_operation."""
         run_operation_results = self.dbt_runner.run_operation(
             "elementary.render_run_query",
             macro_args={"prerendered_query": prerendered_query},
         )
-        if run_operation_results:
-            return json.loads(run_operation_results[0])
-        return None
-
-    def _run_query_with_run_operation(self, prerendered_query: str):
-        """Execute a query via run_operation with retry on empty output.
-
-        run_operation() can intermittently return an empty list when the
-        MACRO_RESULT_PATTERN log line is not captured from dbt's output.
-        """
-        result = self._run_operation_with_retry(prerendered_query)
-        if result is None:
+        if not run_operation_results:
             raise RuntimeError(
-                f"run_operation('elementary.render_run_query') returned no output "
-                f"after {_RUN_QUERY_MAX_RETRIES} attempts. "
+                f"run_operation('elementary.render_run_query') returned no output. "
                 f"Query: {prerendered_query!r}"
             )
-        return result
+        return json.loads(run_operation_results[0])

     @staticmethod
     def read_table_query(

@@ -326,15 +283,25 @@ def test(
         }
         return [test_result] if multiple_results else test_result

-    def seed(self, data: List[dict], table_name: str):
-        with DbtDataSeeder(
+    def _create_seeder(
+        self,
+    ) -> Union[DbtDataSeeder, "ClickHouseDirectSeeder"]:
+        """Return the appropriate seeder for the current target."""
+        if self.target == "clickhouse":
+            runner = self._get_query_runner()
+            schema = runner.schema_name + SCHEMA_NAME_SUFFIX
+            return ClickHouseDirectSeeder(runner, schema, self.seeds_dir_path)
+        return DbtDataSeeder(
             self.dbt_runner, self.project_dir_path, self.seeds_dir_path
-        ).seed(data, table_name):
+        )
+
+    def seed(self, data: List[dict], table_name: str):
+        with self._create_seeder().seed(data, table_name):
             self._fix_seed_if_needed(table_name)

-    def _fix_seed_if_needed(self, table_name: str):
+    def _fix_seed_if_needed(self, table_name: str) -> None:
         # Hack for BigQuery - seems like we get empty strings instead of nulls in seeds, so we
-        # fix them here
+        # fix them here.
         if self.runner_method == RunnerMethod.FUSION and self.target == "bigquery":
             self.dbt_runner.run_operation(
                 "elementary_tests.replace_empty_strings_with_nulls",

@@ -345,9 +312,8 @@ def _fix_seed_if_needed(self, table_name: str):
     def seed_context(
         self, data: List[dict], table_name: str
     ) -> Generator[None, None, None]:
-        with DbtDataSeeder(
-            self.dbt_runner, self.project_dir_path, self.seeds_dir_path
-        ).seed(data, table_name):
+        with self._create_seeder().seed(data, table_name):
+            self._fix_seed_if_needed(table_name)
             yield

     @contextmanager
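The `_create_seeder` change above routes seeding by target name: ClickHouse gets the direct-adapter seeder, everything else keeps the dbt-seed path. A minimal sketch of this dispatch pattern (class bodies are stand-ins, not the real test harness):

```python
class DbtDataSeeder:
    """Stand-in for the generic dbt-seed path."""
    name = "dbt"


class ClickHouseDirectSeeder:
    """Stand-in for the direct-adapter path."""
    name = "clickhouse-direct"


def create_seeder(target: str):
    # Only ClickHouse needs the direct seeder (to get Nullable columns
    # and typed inserts); every other target falls back to dbt seed.
    if target == "clickhouse":
        return ClickHouseDirectSeeder()
    return DbtDataSeeder()
```

Keeping the selection in one factory method means both `seed()` and `seed_context()` pick up the right seeder without duplicating the target check.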

integration_tests/tests/test_all_columns_anomalies.py

Lines changed: 0 additions & 10 deletions
@@ -13,8 +13,6 @@
 }


-# Anomalies currently not supported on ClickHouse
-@pytest.mark.skip_targets(["clickhouse"])
 def test_anomalyless_all_columns_anomalies(test_id: str, dbt_project: DbtProject):
     utc_today = datetime.utcnow().date()
     data: List[Dict[str, Any]] = [

@@ -31,8 +29,6 @@ def test_anomalyless_all_columns_anomalies(test_id: str, dbt_project: DbtProject
     assert all([res["status"] == "pass" for res in test_results])


-# Anomalies currently not supported on ClickHouse
-@pytest.mark.skip_targets(["clickhouse"])
 def test_anomalous_all_columns_anomalies(test_id: str, dbt_project: DbtProject):
     utc_today = datetime.utcnow().date()
     test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))

@@ -57,8 +53,6 @@ def test_anomalous_all_columns_anomalies(test_id: str, dbt_project: DbtProject):
     assert col_to_status == {"superhero": "fail", TIMESTAMP_COLUMN: "pass"}


-# Anomalies currently not supported on ClickHouse
-@pytest.mark.skip_targets(["clickhouse"])
 def test_all_columns_anomalies_with_where_parameter(
     test_id: str, dbt_project: DbtProject
 ):

@@ -128,8 +122,6 @@ def test_all_columns_anomalies_with_where_parameter(
 }


-# Anomalies currently not supported on ClickHouse
-@pytest.mark.skip_targets(["clickhouse"])
 def test_anomalyless_all_columns_anomalies_all_monitors_sanity(
     test_id: str, dbt_project: DbtProject
 ):

@@ -155,8 +147,6 @@ def test_anomalyless_all_columns_anomalies_all_monitors_sanity(
     assert all([res["status"] == "pass" for res in test_results])


-# Anomalies currently not supported on ClickHouse
-@pytest.mark.skip_targets(["clickhouse"])
 @pytest.mark.parametrize(
     "exclude_detection,expected_status",
     [
