Skip to content

Commit cdbece3

Browse files
committed
fix(clickhouse): support multi-gateway projects with catalog-aware engines
ClickHouse's UNSUPPORTED catalog_support caused 2-level FQNs that broke sqlglot MappingSchema when mixed with 3-level FQNs from Trino or other catalog-aware gateways. Fix: auto-inject a virtual catalog (gateway name) for UNSUPPORTED adapters when catalog-aware peers exist, then strip it before any SQL reaches ClickHouse. Signed-off-by: mday-io <mdaytn@gmail.com>
1 parent 7c31c5c commit cdbece3

6 files changed

Lines changed: 226 additions & 1 deletion

File tree

sqlmesh/core/config/scheduler.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,25 @@ def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
138138

139139
def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
140140
default_catalogs_per_gateway: t.Dict[str, str] = {}
141+
unsupported_gateways = []
142+
141143
for gateway, adapter in context.engine_adapters.items():
142-
if catalog := adapter.default_catalog:
144+
if adapter.catalog_support.is_unsupported:
145+
unsupported_gateways.append((gateway, adapter))
146+
elif catalog := adapter.default_catalog:
143147
default_catalogs_per_gateway[gateway] = catalog
148+
149+
# When catalog-aware gateways exist, assign the gateway name as a virtual catalog for
150+
# catalog-unsupported gateways that opt in (e.g. ClickHouse) so that all models in the
151+
# project have a uniform 3-level FQN and the MappingSchema nesting level check passes.
152+
# Only adapters that explicitly return True from supports_virtual_catalog() are mutated;
153+
# other UNSUPPORTED adapters are left unchanged to avoid silent breakage.
154+
if default_catalogs_per_gateway and unsupported_gateways:
155+
for gateway, adapter in unsupported_gateways:
156+
if adapter.supports_virtual_catalog():
157+
adapter.inject_virtual_catalog(gateway)
158+
default_catalogs_per_gateway[gateway] = gateway
159+
144160
return default_catalogs_per_gateway
145161

146162

sqlmesh/core/context.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,12 @@ def engine_adapter(self) -> EngineAdapter:
487487
@property
488488
def snapshot_evaluator(self) -> SnapshotEvaluator:
489489
if not self._snapshot_evaluator:
490+
# Ensure virtual catalog injection (via default_catalog_per_gateway) has run before
491+
# cloning adapters with with_settings(). Adapters that support virtual catalogs (e.g.
492+
# ClickHouse alongside catalog-aware gateways) mutate _default_catalog during
493+
# get_default_catalog_per_gateway. with_settings() forwards _default_catalog to the
494+
# clone, so the mutation must happen first or the clones will miss the virtual catalog.
495+
self.default_catalog_per_gateway # noqa: B018
490496
self._snapshot_evaluator = SnapshotEvaluator(
491497
{
492498
gateway: adapter.with_settings(execute_log_level=logging.INFO)

sqlmesh/core/engine_adapter/base.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,29 @@ def comments_enabled(self) -> bool:
218218
def catalog_support(self) -> CatalogSupport:
219219
return CatalogSupport.UNSUPPORTED
220220

221+
def supports_virtual_catalog(self) -> bool:
222+
"""Return True if this adapter can accept a virtual catalog for multi-gateway nesting alignment.
223+
224+
When a project mixes catalog-aware gateways (e.g. DuckDB) with catalog-unsupported gateways
225+
(e.g. ClickHouse), all adapters need a uniform 3-level FQN so MappingSchema nesting stays
226+
consistent. Adapters that return True here opt in to receiving an injected virtual catalog
227+
via inject_virtual_catalog(), which causes the set_catalog decorator to strip the catalog
228+
from DDL expressions rather than raising UnsupportedCatalogOperationError.
229+
"""
230+
return False
231+
232+
def inject_virtual_catalog(self, catalog: str) -> None:
233+
"""Inject a virtual catalog name for multi-gateway nesting alignment.
234+
235+
Only call this on adapters that return True from supports_virtual_catalog(). After
236+
injection, catalog_support should return SINGLE_CATALOG_ONLY so the set_catalog decorator
237+
strips the virtual catalog from DDL expressions instead of raising an error.
238+
"""
239+
raise NotImplementedError(
240+
f"{self.dialect} does not support virtual catalog injection. "
241+
"Override supports_virtual_catalog() to return True and implement inject_virtual_catalog()."
242+
)
243+
221244
@cached_property
222245
def schema_differ(self) -> SchemaDiffer:
223246
return SchemaDiffer(

sqlmesh/core/engine_adapter/clickhouse.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin
99
from sqlmesh.core.engine_adapter.base import EngineAdapterWithIndexSupport
1010
from sqlmesh.core.engine_adapter.shared import (
11+
CatalogSupport,
1112
DataObject,
1213
DataObjectType,
1314
EngineRunMode,
@@ -42,6 +43,22 @@ class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin):
4243
DEFAULT_TABLE_ENGINE = "MergeTree"
4344
ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$"
4445

46+
@property
47+
def catalog_support(self) -> CatalogSupport:
48+
# When a virtual catalog has been injected via inject_virtual_catalog() (to align
49+
# nesting levels with catalog-aware gateways in the same project), treat ClickHouse as
50+
# SINGLE_CATALOG_ONLY so the set_catalog decorator strips the virtual catalog from DDL
51+
# expressions instead of raising UnsupportedCatalogOperationError.
52+
if self._default_catalog:
53+
return CatalogSupport.SINGLE_CATALOG_ONLY
54+
return CatalogSupport.UNSUPPORTED
55+
56+
def supports_virtual_catalog(self) -> bool:
57+
return True
58+
59+
def inject_virtual_catalog(self, catalog: str) -> None:
60+
self._default_catalog = catalog
61+
4562
@property
4663
def engine_run_mode(self) -> EngineRunMode:
4764
if self._extra_config.get("cloud_mode"):
@@ -172,10 +189,28 @@ def create_schema(
172189
173190
Clickhouse has a two-level naming scheme [database].[table].
174191
"""
192+
from sqlmesh.utils.errors import SQLMeshError
193+
175194
properties_copy = properties.copy()
176195
if self.engine_run_mode.is_cluster:
177196
properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
178197

198+
# ClickHouse does not support catalogs. When a virtual catalog has been injected
199+
# (self._default_catalog is set), strip it from the schema name. This mirrors the
200+
# SINGLE_CATALOG_ONLY branch in the set_catalog decorator, which does not apply here
201+
# because this override is not wrapped by @set_catalog().
202+
if self._default_catalog:
203+
schema_exp = to_schema(schema_name)
204+
catalog_name = schema_exp.catalog
205+
if catalog_name:
206+
if catalog_name != self._default_catalog:
207+
raise SQLMeshError(
208+
f"clickhouse requires that all catalog operations be against a single catalog: "
209+
f"{self._default_catalog}. Provided catalog: {catalog_name}"
210+
)
211+
schema_exp.set("catalog", None)
212+
schema_name = schema_exp
213+
179214
# can't call super() because it will try to set a catalog
180215
return self._create_schema(
181216
schema_name=schema_name,

tests/core/engine_adapter/test_clickhouse.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1407,3 +1407,40 @@ def test_exchange_tables(
14071407
'RENAME TABLE "table2" TO "table1"',
14081408
'DROP TABLE IF EXISTS "__temp_table1_abcd"',
14091409
]
1410+
1411+
1412+
def test_virtual_catalog_ddl_stripping(make_mocked_engine_adapter: t.Callable):
1413+
"""After inject_virtual_catalog(), create_schema() with the virtual catalog prefix must strip
1414+
the catalog and execute without raising, and with a wrong catalog must raise SQLMeshError."""
1415+
from sqlmesh.utils.errors import SQLMeshError
1416+
1417+
adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter)
1418+
1419+
assert adapter.supports_virtual_catalog() is True
1420+
adapter.inject_virtual_catalog("clickhouse_gw")
1421+
1422+
# catalog_support must switch to SINGLE_CATALOG_ONLY after injection
1423+
from sqlmesh.core.engine_adapter.shared import CatalogSupport
1424+
1425+
assert adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY
1426+
assert adapter._default_catalog == "clickhouse_gw"
1427+
1428+
# create_schema with the virtual catalog prefix must strip the catalog and not raise
1429+
adapter.create_schema("clickhouse_gw.mydb")
1430+
assert to_sql_calls(adapter) == ['CREATE DATABASE IF NOT EXISTS "mydb"']
1431+
1432+
# create_schema with a wrong catalog must raise SQLMeshError
1433+
with pytest.raises(SQLMeshError, match="clickhouse_gw"):
1434+
adapter.create_schema("wrong_catalog.mydb")
1435+
1436+
1437+
def test_supports_virtual_catalog_returns_true():
1438+
"""ClickhouseEngineAdapter.supports_virtual_catalog() must return True without any connection."""
1439+
from unittest.mock import MagicMock
1440+
1441+
adapter = ClickhouseEngineAdapter(
1442+
lambda *a, **k: MagicMock(),
1443+
dialect="clickhouse",
1444+
)
1445+
assert adapter.supports_virtual_catalog() is True
1446+
assert adapter._default_catalog is None

tests/core/test_context.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,114 @@ def test_multiple_gateways(tmp_path: Path):
399399
assert context.dag._sorted == ['"db"."staging"."stg_model"', '"db"."main"."final_model"']
400400

401401

402+
def test_multi_gateway_catalog_aware_and_unsupported(tmp_path: Path, mocker):
403+
"""ClickHouse (catalog UNSUPPORTED) alongside DuckDB (catalog FULL_SUPPORT) must not raise a
404+
nesting-level SchemaError when models are loaded.
405+
406+
Expected behaviour after the fix:
407+
- get_default_catalog_per_gateway assigns the gateway name as a virtual catalog for
408+
catalog-unsupported gateways when catalog-aware gateways are present.
409+
- ClickHouse models end up with a 3-level FQN so the MappingSchema nesting is uniform.
410+
- The virtual catalog is stripped from DDL expressions (not raised as an error) because the
411+
adapter's catalog_support flips to SINGLE_CATALOG_ONLY when _default_catalog is set.
412+
"""
413+
414+
from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig
415+
from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter
416+
from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter
417+
from sqlmesh.core.engine_adapter.shared import CatalogSupport
418+
419+
db_path = str(tmp_path / "db.db")
420+
421+
# Build a real DuckDB adapter for the primary gateway.
422+
duck_adapter = DuckDBEngineAdapter(
423+
lambda *a, **k: __import__("duckdb").connect(db_path),
424+
dialect="duckdb",
425+
)
426+
427+
# Build a minimal ClickHouse adapter stub — no real connection needed.
428+
ch_adapter = ClickhouseEngineAdapter(
429+
lambda *a, **k: mocker.NonCallableMock(),
430+
dialect="clickhouse",
431+
)
432+
433+
# Simulate the context's engine_adapters dict and call the scheduler directly.
434+
engine_adapters = {
435+
"duckdb_gw": duck_adapter,
436+
"clickhouse_gw": ch_adapter,
437+
}
438+
439+
ctx_mock = mocker.MagicMock()
440+
ctx_mock.engine_adapters = engine_adapters
441+
442+
scheduler = BuiltInSchedulerConfig()
443+
catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock)
444+
445+
# DuckDB gateway must have a real catalog entry.
446+
assert "duckdb_gw" in catalog_per_gw
447+
# DuckDB's default catalog is the database filename without extension.
448+
assert catalog_per_gw["duckdb_gw"] == "db"
449+
# ClickHouse gateway must now also have a virtual catalog equal to its gateway name.
450+
assert "clickhouse_gw" in catalog_per_gw
451+
assert catalog_per_gw["clickhouse_gw"] == "clickhouse_gw"
452+
453+
# The ClickHouse adapter's _default_catalog must be mutated to the virtual catalog name.
454+
assert ch_adapter._default_catalog == "clickhouse_gw"
455+
456+
# The adapter's catalog_support must now be SINGLE_CATALOG_ONLY (not UNSUPPORTED),
457+
# so that the set_catalog decorator strips the virtual catalog instead of raising.
458+
assert ch_adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY
459+
460+
# Loading models for both gateways must not raise a SchemaError.
461+
duckdb_model = load_sql_based_model(
462+
parse("MODEL(name main.duckdb_tbl, kind FULL, gateway duckdb_gw);\nSELECT 1 AS col"),
463+
default_catalog="db",
464+
)
465+
ch_model = load_sql_based_model(
466+
parse("MODEL(name mydb.ch_tbl, kind FULL, gateway clickhouse_gw);\nSELECT 1 AS col"),
467+
default_catalog="clickhouse_gw",
468+
)
469+
470+
# Both models must have 3-level FQNs so MappingSchema nesting is uniform.
471+
assert duckdb_model.fqn.count(".") == 2, (
472+
f"Expected 3-level FQN for duckdb model, got: {duckdb_model.fqn}"
473+
)
474+
assert ch_model.fqn.count(".") == 2, f"Expected 3-level FQN for ch model, got: {ch_model.fqn}"
475+
476+
# Both models loaded into the same MappingSchema must not raise a nesting SchemaError.
477+
from sqlglot.schema import MappingSchema
478+
479+
schema = MappingSchema(normalize=False)
480+
schema.add_table(duckdb_model.fqn, duckdb_model.columns_to_types or {})
481+
schema.add_table(ch_model.fqn, ch_model.columns_to_types or {})
482+
483+
484+
def test_single_gateway_clickhouse_no_virtual_catalog(mocker):
485+
"""When ClickHouse is the only gateway (no catalog-aware peer), it must NOT receive a virtual
486+
catalog. Models remain 2-level and catalog_support stays UNSUPPORTED."""
487+
from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig
488+
from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter
489+
from sqlmesh.core.engine_adapter.shared import CatalogSupport
490+
491+
ch_adapter = ClickhouseEngineAdapter(
492+
lambda *a, **k: mocker.NonCallableMock(),
493+
dialect="clickhouse",
494+
)
495+
496+
ctx_mock = mocker.MagicMock()
497+
ctx_mock.engine_adapters = {"clickhouse_gw": ch_adapter}
498+
499+
scheduler = BuiltInSchedulerConfig()
500+
catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock)
501+
502+
# With only a catalog-unsupported gateway there must be no entry at all.
503+
assert "clickhouse_gw" not in catalog_per_gw
504+
505+
# The adapter must remain unchanged — no virtual catalog injected.
506+
assert ch_adapter._default_catalog is None
507+
assert ch_adapter.catalog_support == CatalogSupport.UNSUPPORTED
508+
509+
402510
def test_plan_execution_time():
403511
context = Context(config=Config())
404512
context.upsert_model(

0 commit comments

Comments
 (0)