Skip to content

Commit ebee7de

Browse files
committed
Feat(doris): Add support for Doris
1 parent 4fc3ba6 commit ebee7de

File tree

12 files changed

+1130
-6
lines changed

12 files changed

+1130
-6
lines changed

.circleci/continue_config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ workflows:
285285
matrix:
286286
parameters:
287287
engine:
288+
- doris
288289
- duckdb
289290
- postgres
290291
- mysql

.circleci/wait-for-db.sh

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,34 @@ clickhouse_ready() {
3434
probe_port 8123
3535
}
3636

37+
doris_ready() {
38+
probe_port 9030
39+
40+
echo "Checking for 3 alive Doris backends..."
41+
sleep 15
42+
43+
while true; do
44+
echo "Checking Doris backends..."
45+
ALIVE_BACKENDS=$(docker exec -i doris-fe-01 mysql -h127.0.0.1 -P9030 -uroot -e "show backends \G" | grep -c "^ *Alive: true$")
46+
47+
# Fall back to a safe value if we failed to parse the number
48+
if ! [[ "$ALIVE_BACKENDS" =~ ^[0-9]+$ ]]; then
49+
echo "WARN: Unable to parse number of alive backends, got: '$ALIVE_BACKENDS'"
50+
ALIVE_BACKENDS=0
51+
fi
52+
53+
echo "Found $ALIVE_BACKENDS alive backends"
54+
55+
if [ "$ALIVE_BACKENDS" -ge 3 ]; then
56+
echo "Doris has 3 or more alive backends"
57+
break
58+
fi
59+
60+
echo "Waiting for more backends to become alive..."
61+
sleep 5
62+
done
63+
}
64+
3765
postgres_ready() {
3866
probe_port 5432
3967
}

.readthedocs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ build:
66
python: "3.10"
77
jobs:
88
pre_build:
9-
- pip install -e ".[athena,azuresql,bigframes,bigquery,clickhouse,databricks,dbt,dlt,gcppostgres,github,llm,mssql,mysql,mwaa,postgres,redshift,slack,snowflake,trino,web,risingwave]"
9+
- pip install -e ".[athena,azuresql,bigframes,bigquery,clickhouse,databricks,dbt,dlt,doris,gcppostgres,github,llm,mssql,mysql,mwaa,postgres,redshift,slack,snowflake,trino,web,risingwave]"
1010
- make api-docs
1111

1212
mkdocs:

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,9 @@ trino-test: engine-trino-up
148148

149149
risingwave-test: engine-risingwave-up
150150
pytest -n auto -m "risingwave" --retries 3 --junitxml=test-results/junit-risingwave.xml
151-
151+
152+
doris-test: engine-doris-up
153+
pytest -n auto -m "doris" --retries 3 --junitxml=test-results/junit-doris.xml
152154
#################
153155
# Cloud Engines #
154156
#################

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ dev = [
102102
]
103103
dbt = ["dbt-core<2"]
104104
dlt = ["dlt"]
105+
doris = ["pymysql"]
105106
duckdb = []
106107
fabric = ["pyodbc>=5.0.0"]
107108
gcppostgres = ["cloud-sql-python-connector[pg8000]>=1.8.0"]
@@ -263,7 +264,8 @@ markers = [
263264
"snowflake: test for Snowflake",
264265
"spark: test for Spark",
265266
"trino: test for Trino (all connectors)",
266-
"risingwave: test for Risingwave"
267+
"risingwave: test for Risingwave",
268+
"doris: test for Doris",
267269
]
268270
addopts = "-n 0 --dist=loadgroup"
269271
asyncio_default_fixture_loop_scope = "session"

sqlmesh/cli/project_init.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ class ExampleObjects:
167167
python_macros: t.Dict[str, str]
168168

169169

170-
def _gen_example_objects(schema_name: str) -> ExampleObjects:
170+
def _gen_example_objects(schema_name: str, dialect: str) -> ExampleObjects:
171171
sql_models: t.Dict[str, str] = {}
172172
python_models: t.Dict[str, str] = {}
173173
seeds: t.Dict[str, str] = {}
@@ -200,6 +200,7 @@ def _gen_example_objects(schema_name: str) -> ExampleObjects:
200200
name {incremental_model_name},
201201
kind INCREMENTAL_BY_TIME_RANGE (
202202
time_column event_date
203+
{"partition_by_time_column false" if dialect == "doris" else ""}
203204
),
204205
start '2020-01-01',
205206
cron '@daily',
@@ -352,7 +353,10 @@ def init_example_project(
352353
)
353354
return config_path
354355

355-
example_objects = _gen_example_objects(schema_name=schema_name)
356+
example_objects = _gen_example_objects(
357+
schema_name=schema_name,
358+
dialect=dialect if dialect else DIALECT_TO_TYPE.get(engine_type, "duckdb"),
359+
)
356360

357361
if template != ProjectTemplate.EMPTY:
358362
_create_object_files(models_path, example_objects.sql_models, "sql")

sqlmesh/core/config/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
BigQueryConnectionConfig as BigQueryConnectionConfig,
1313
ConnectionConfig as ConnectionConfig,
1414
DatabricksConnectionConfig as DatabricksConnectionConfig,
15+
DorisConnectionConfig as DorisConnectionConfig,
1516
DuckDBConnectionConfig as DuckDBConnectionConfig,
1617
FabricConnectionConfig as FabricConnectionConfig,
1718
GCPPostgresConnectionConfig as GCPPostgresConnectionConfig,

sqlmesh/core/config/connection.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
"trino",
5757
# Nullable types are problematic
5858
"clickhouse",
59+
# Doris does not support table names starting with "_"
60+
"doris",
5961
}
6062
MOTHERDUCK_TOKEN_REGEX = re.compile(r"(\?|\&)(motherduck_token=)(\S*)")
6163

@@ -2274,6 +2276,80 @@ def init(cursor: t.Any) -> None:
22742276
return init
22752277

22762278

2279+
class DorisConnectionConfig(ConnectionConfig):
2280+
"""Configuration for the Apache Doris connection.
2281+
2282+
Apache Doris uses the MySQL network protocol and is compatible with MySQL ecosystem tools,
2283+
JDBC/ODBC drivers, and various visualization tools.
2284+
2285+
Args:
2286+
host: The hostname of the Doris FE (Frontend) node.
2287+
user: The Doris username.
2288+
password: The Doris password.
2289+
port: The port number of the Doris FE node. Default is 9030.
2290+
database: The optional database name.
2291+
charset: The optional character set.
2292+
collation: The optional collation.
2293+
ssl_disabled: Whether to disable SSL connection.
2294+
concurrent_tasks: The maximum number of tasks that can use this connection concurrently.
2295+
register_comments: Whether or not to register model comments with the SQL engine.
2296+
local_infile: Whether or not to allow local file access.
2297+
pre_ping: Whether or not to pre-ping the connection before starting a new transaction to ensure it is still alive.
2298+
"""
2299+
2300+
host: str
2301+
user: str
2302+
password: str
2303+
port: t.Optional[int] = 9030 # Default Doris FE port
2304+
database: t.Optional[str] = None
2305+
charset: t.Optional[str] = None
2306+
collation: t.Optional[str] = None
2307+
ssl_disabled: t.Optional[bool] = None
2308+
2309+
concurrent_tasks: int = 4
2310+
register_comments: bool = True
2311+
local_infile: bool = False
2312+
pre_ping: bool = True
2313+
2314+
type_: t.Literal["doris"] = Field(alias="type", default="doris")
2315+
DIALECT: t.ClassVar[t.Literal["doris"]] = "doris"
2316+
DISPLAY_NAME: t.ClassVar[t.Literal["Apache Doris"]] = "Apache Doris"
2317+
DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17
2318+
2319+
_engine_import_validator = _get_engine_import_validator("pymysql", "doris")
2320+
2321+
@property
2322+
def _connection_kwargs_keys(self) -> t.Set[str]:
2323+
connection_keys = {
2324+
"host",
2325+
"user",
2326+
"password",
2327+
}
2328+
if self.port is not None:
2329+
connection_keys.add("port")
2330+
if self.database is not None:
2331+
connection_keys.add("database")
2332+
if self.charset is not None:
2333+
connection_keys.add("charset")
2334+
if self.collation is not None:
2335+
connection_keys.add("collation")
2336+
if self.ssl_disabled is not None:
2337+
connection_keys.add("ssl_disabled")
2338+
if self.local_infile is not None:
2339+
connection_keys.add("local_infile")
2340+
return connection_keys
2341+
2342+
@property
2343+
def _engine_adapter(self) -> t.Type[EngineAdapter]:
2344+
return engine_adapter.DorisEngineAdapter
2345+
2346+
@property
2347+
def _connection_factory(self) -> t.Callable:
2348+
from pymysql import connect
2349+
2350+
return connect
2351+
2352+
22772353
CONNECTION_CONFIG_TO_TYPE = {
22782354
# Map all subclasses of ConnectionConfig to the value of their `type_` field.
22792355
tpe.all_field_infos()["type_"].default: tpe

sqlmesh/core/engine_adapter/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from sqlmesh.core.engine_adapter.bigquery import BigQueryEngineAdapter
1010
from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter
1111
from sqlmesh.core.engine_adapter.databricks import DatabricksEngineAdapter
12+
from sqlmesh.core.engine_adapter.doris import DorisEngineAdapter
1213
from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter
1314
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
1415
from sqlmesh.core.engine_adapter.mysql import MySQLEngineAdapter
@@ -37,6 +38,7 @@
3738
"athena": AthenaEngineAdapter,
3839
"risingwave": RisingwaveEngineAdapter,
3940
"fabric": FabricEngineAdapter,
41+
"doris": DorisEngineAdapter,
4042
}
4143

4244
DIALECT_ALIASES = {

sqlmesh/core/engine_adapter/base.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2586,6 +2586,13 @@ def _to_sql(self, expression: exp.Expression, quote: bool = True, **kwargs: t.An
25862586

25872587
return expression.sql(**sql_gen_kwargs, copy=False) # type: ignore
25882588

2589+
def _get_temp_table_name(self, table: TableName) -> str:
2590+
"""
2591+
Get the name of the temp table.
2592+
"""
2593+
table_obj = exp.to_table(table)
2594+
return f"__temp_{table_obj.name}_{random_id(short=True)}"
2595+
25892596
def _get_data_objects(
25902597
self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
25912598
) -> t.List[DataObject]:
@@ -2602,7 +2609,8 @@ def _get_temp_table(
26022609
"""
26032610
table = t.cast(exp.Table, exp.to_table(table).copy())
26042611
table.set(
2605-
"this", exp.to_identifier(f"__temp_{table.name}_{random_id(short=True)}", quoted=quoted)
2612+
"this",
2613+
exp.to_identifier(self._get_temp_table_name(table), quoted=quoted),
26062614
)
26072615

26082616
if table_only:

0 commit comments

Comments
 (0)