Skip to content

Commit 65ac801

Browse files
authored
feat: add airflow operator and hook for ClickHouse (#3699)
1 parent 8128354 commit 65ac801

4 files changed

Lines changed: 90 additions & 0 deletions

File tree

docs/integrations/engines/clickhouse.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,26 @@ If a model has many records in each partition, you may see additional performanc
394394
## Local/Built-in Scheduler
395395
**Engine Adapter Type**: `clickhouse`
396396

397+
## Airflow Scheduler
398+
**Engine Name:** `clickhouse`
399+
400+
To share a common implementation between the built-in and Airflow schedulers, SQLMesh implements its own ClickHouse hook and operator.
401+
402+
By default, the connection ID is set to `sqlmesh_clickhouse_default`, but can be overridden using the `engine_operator_args` parameter to the `SQLMeshAirflow` instance as in the example below:
403+
```python linenums="1"
404+
from sqlmesh.schedulers.airflow import NO_DEFAULT_CATALOG
405+
406+
sqlmesh_airflow = SQLMeshAirflow(
407+
"clickhouse",
408+
default_catalog=NO_DEFAULT_CATALOG,
409+
engine_operator_args={
410+
"sqlmesh_clickhouse_conn_id": "<Connection ID>"
411+
},
412+
)
413+
```
414+
415+
Note: `NO_DEFAULT_CATALOG` is required for ClickHouse since ClickHouse doesn't support catalogs.
416+
397417
### Connection options
398418

399419
| Option | Description | Type | Required |
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from __future__ import annotations
2+
3+
import typing as t
4+
5+
from airflow.providers.common.sql.hooks.sql import DbApiHook
6+
7+
if t.TYPE_CHECKING:
8+
from clickhouse_connect.dbapi.connection import Connection
9+
10+
11+
class SQLMeshClickHouseHook(DbApiHook):
    """Airflow hook that connects to a ClickHouse target using the
    ClickHouse Python DB API connector (``clickhouse-connect``).

    Connection details (host, port, login, password, schema, and any
    extra JSON options) are read from the Airflow connection identified
    by ``sqlmesh_clickhouse_conn_id``.
    """

    conn_name_attr = "sqlmesh_clickhouse_conn_id"
    default_conn_name = "sqlmesh_clickhouse_default"
    conn_type = "sqlmesh_clickhouse"
    hook_name = "SQLMesh ClickHouse"

    def get_conn(self) -> Connection:
        """Returns a ClickHouse DB API connection for the configured target."""
        # Imported lazily so this module can be imported even when
        # clickhouse-connect is not installed (e.g. another engine is in use).
        from clickhouse_connect.dbapi import connect

        # `conn_name_attr` is declared above as a plain `str`, so no
        # `t.cast` is needed to resolve the configured connection id.
        db = self.get_connection(getattr(self, self.conn_name_attr))

        return connect(
            host=db.host,
            port=db.port,
            username=db.login,
            password=db.password,
            database=db.schema,
            # Forward any extra JSON connection options verbatim.
            **db.extra_dejson,
        )
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from __future__ import annotations
2+
3+
import typing as t
4+
5+
6+
from sqlmesh.schedulers.airflow.hooks.clickhouse import SQLMeshClickHouseHook
7+
from sqlmesh.schedulers.airflow.operators.base import BaseDbApiOperator
8+
from sqlmesh.schedulers.airflow.operators.targets import BaseTarget
9+
10+
11+
class SQLMeshClickHouseOperator(BaseDbApiOperator):
    """The operator that evaluates a SQLMesh model snapshot on a ClickHouse target.

    Args:
        target: The target that will be executed by this operator instance.
        clickhouse_conn_id: The Airflow connection id for the ClickHouse target.
            Defaults to ``SQLMeshClickHouseHook.default_conn_name``.
    """

    def __init__(
        self,
        *,
        target: BaseTarget,
        clickhouse_conn_id: str = SQLMeshClickHouseHook.default_conn_name,
        **kwargs: t.Any,
    ) -> None:
        # Delegate to the shared DB API operator base, binding the
        # ClickHouse dialect and hook so execution uses clickhouse-connect.
        super().__init__(
            target=target,
            conn_id=clickhouse_conn_id,
            dialect="clickhouse",
            hook_type=SQLMeshClickHouseHook,
            **kwargs,
        )

sqlmesh/schedulers/airflow/util.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ def discover_engine_operator(name: str, sql_only: bool = False) -> t.Type[BaseOp
122122
name = name.lower()
123123

124124
try:
125+
if name == "clickhouse":
126+
from sqlmesh.schedulers.airflow.operators.clickhouse import SQLMeshClickHouseOperator
127+
128+
return SQLMeshClickHouseOperator
125129
if name == "spark":
126130
from sqlmesh.schedulers.airflow.operators.spark_submit import (
127131
SQLMeshSparkSubmitOperator,

0 commit comments

Comments
 (0)