Skip to content

Commit 78b6c4f

Browse files
committed
Update Doris dialect handling and adjust test cases for Doris.
1 parent cb5eb7f commit 78b6c4f

File tree

3 files changed

+48
-44
lines changed

3 files changed

+48
-44
lines changed

sqlmesh/core/model/meta.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -468,9 +468,9 @@ def physical_properties(self) -> t.Dict[str, exp.Expression]:
468468
if self.physical_properties_:
469469
properties = {e.this.name: e.expression for e in self.physical_properties_.expressions}
470470

471-
# For INCREMENTAL_BY_UNIQUE_KEY models, add the unique_key to physical_properties
471+
# For INCREMENTAL_BY_UNIQUE_KEY models for doris dialect, add the unique_key to physical_properties
472472
# so it gets passed to table_properties during table creation
473-
if isinstance(self.kind, IncrementalByUniqueKeyKind) and self.unique_key:
473+
if isinstance(self.kind, IncrementalByUniqueKeyKind) and self.unique_key and self.dialect == "doris":
474474
# Convert unique_key expressions to a format suitable for table_properties
475475
if len(self.unique_key) == 1:
476476
# Single column key

tests/core/engine_adapter/test_doris.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,9 @@ def test_create_table(make_mocked_engine_adapter: t.Callable[..., DorisEngineAda
112112
column_descriptions={"a": "test_column_description"},
113113
)
114114
assert to_sql_calls(adapter) == [
115-
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT COMMENT 'test_column_description') UNIQUE KEY (`a`) DISTRIBUTED BY HASH (`a`) BUCKETS 10 PROPERTIES ('replication_num'='1')",
116-
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT COMMENT 'test_column_description') DUPLICATE KEY (`a`) PROPERTIES ('replication_num'='1')",
117-
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT COMMENT 'test_column_description') COMMENT 'test_description' PROPERTIES ('replication_num'='1')",
115+
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT COMMENT 'test_column_description') UNIQUE KEY (`a`) DISTRIBUTED BY HASH (`a`) BUCKETS 10",
116+
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT COMMENT 'test_column_description') DUPLICATE KEY (`a`)",
117+
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT COMMENT 'test_column_description') COMMENT 'test_description'",
118118
]
119119

120120
adapter.cursor.reset_mock()
@@ -218,7 +218,7 @@ def test_create_table_with_distributed_by(
218218
)
219219

220220
assert to_sql_calls(adapter) == [
221-
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` INT) DISTRIBUTED BY HASH (`a`, `b`) BUCKETS 8 PROPERTIES ('replication_num'='1')",
221+
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` INT) DISTRIBUTED BY HASH (`a`, `b`) BUCKETS 8",
222222
]
223223

224224
adapter.cursor.execute.reset_mock()
@@ -235,7 +235,7 @@ def test_create_table_with_distributed_by(
235235
)
236236

237237
assert to_sql_calls(adapter) == [
238-
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` INT) DISTRIBUTED BY RANDOM PROPERTIES ('replication_num'='1')",
238+
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` INT) DISTRIBUTED BY RANDOM",
239239
]
240240

241241
adapter.cursor.execute.reset_mock()
@@ -252,7 +252,7 @@ def test_create_table_with_distributed_by(
252252
)
253253

254254
assert to_sql_calls(adapter) == [
255-
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` INT) DISTRIBUTED BY HASH (`a`) BUCKETS AUTO PROPERTIES ('replication_num'='1')",
255+
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` INT) DISTRIBUTED BY HASH (`a`) BUCKETS AUTO",
256256
]
257257

258258

@@ -269,7 +269,7 @@ def test_create_table_with_properties(
269269
)
270270

271271
assert to_sql_calls(adapter) == [
272-
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT) PROPERTIES ('refresh_interval'='86400', 'replication_num'='1')",
272+
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT) PROPERTIES ('refresh_interval'='86400')",
273273
]
274274

275275

@@ -289,7 +289,7 @@ def test_create_table_with_partitioned_by(
289289
)
290290

291291
assert to_sql_calls(adapter) == [
292-
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` DATE) PARTITION BY RANGE (`b`) (FROM ('2000-11-14') TO ('2021-11-14') INTERVAL 2 YEAR) PROPERTIES ('replication_num'='1')",
292+
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` DATE) PARTITION BY RANGE (`b`) (FROM ('2000-11-14') TO ('2021-11-14') INTERVAL 2 YEAR)",
293293
]
294294

295295
adapter.cursor.execute.reset_mock()
@@ -311,7 +311,7 @@ def test_create_table_with_partitioned_by(
311311
)
312312

313313
assert to_sql_calls(adapter) == [
314-
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` DATE) PARTITION BY RANGE (`b`) (PARTITION `p201701` VALUES [('2017-01-01'), ('2017-02-01')), PARTITION `other` VALUES LESS THAN (MAXVALUE)) PROPERTIES ('replication_num'='1')",
314+
"CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` DATE) PARTITION BY RANGE (`b`) (PARTITION `p201701` VALUES [('2017-01-01'), ('2017-02-01')), PARTITION `other` VALUES LESS THAN (MAXVALUE))",
315315
]
316316

317317

@@ -422,7 +422,7 @@ def test_create_table_with_single_string_distributed_by(
422422
)
423423

424424
assert to_sql_calls(adapter) == [
425-
"CREATE TABLE IF NOT EXISTS `test_table` (`recordid` INT, `name` VARCHAR) DISTRIBUTED BY HASH (`recordid`) BUCKETS 10 PROPERTIES ('replication_num'='1')",
425+
"CREATE TABLE IF NOT EXISTS `test_table` (`recordid` INT, `name` VARCHAR) DISTRIBUTED BY HASH (`recordid`) BUCKETS 10",
426426
]
427427

428428

tests/core/test_context.py

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
from tests.utils.test_helpers import use_terminal_console
6565
from tests.utils.test_filesystem import create_temp_file
6666

67+
import logging
68+
logger = logging.getLogger(__name__)
6769

6870
def test_global_config(copy_to_temp_path: t.Callable):
6971
context = Context(paths=copy_to_temp_path("examples/sushi"))
@@ -2419,7 +2421,8 @@ def test_plan_min_intervals(tmp_path: Path):
24192421
paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
24202422
)
24212423

2422-
current_time = to_datetime("2020-02-01 00:00:01")
2424+
current_year = datetime.now().year
2425+
current_time = to_datetime(f"{current_year}-02-01 00:00:01")
24232426

24242427
# initial state of example project
24252428
context.plan(auto_apply=True, execution_time=current_time)
@@ -2463,14 +2466,14 @@ def test_plan_min_intervals(tmp_path: Path):
24632466
select @start_ds as start_ds, @end_ds as end_ds, @start_dt as start_dt, @end_dt as end_dt;
24642467
""")
24652468

2466-
(tmp_path / "models" / "ended_daily_model.sql").write_text("""
2469+
(tmp_path / "models" / "ended_daily_model.sql").write_text(f"""
24672470
MODEL (
24682471
name sqlmesh_example.ended_daily_model,
24692472
kind INCREMENTAL_BY_TIME_RANGE (
24702473
time_column start_dt
24712474
),
24722475
start '2020-01-01',
2473-
end '2020-01-18',
2476+
end '{current_year}-01-18',
24742477
cron '@daily'
24752478
);
24762479
@@ -2483,8 +2486,8 @@ def test_plan_min_intervals(tmp_path: Path):
24832486
plan = context.plan(execution_time=current_time)
24842487

24852488
assert to_datetime(plan.start) == to_datetime("2020-01-01 00:00:00")
2486-
assert to_datetime(plan.end) == to_datetime("2020-02-01 00:00:00")
2487-
assert to_datetime(plan.execution_time) == to_datetime("2020-02-01 00:00:01")
2489+
assert to_datetime(plan.end) == to_datetime(f"{current_year}-02-01 00:00:00")
2490+
assert to_datetime(plan.execution_time) == to_datetime(f"{current_year}-02-01 00:00:01")
24882491

24892492
def _get_missing_intervals(plan: Plan, name: str) -> t.List[t.Tuple[datetime, datetime]]:
24902493
snapshot_id = context.get_snapshot(name, raise_if_missing=True).snapshot_id
@@ -2497,19 +2500,19 @@ def _get_missing_intervals(plan: Plan, name: str) -> t.List[t.Tuple[datetime, da
24972500
assert len(plan.missing_intervals) == 4
24982501

24992502
assert _get_missing_intervals(plan, "sqlmesh_example.daily_model") == [
2500-
(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2503+
(to_datetime("2020-01-01 00:00:00"), to_datetime(f"{current_year}-02-01 00:00:00"))
25012504
]
25022505
assert _get_missing_intervals(plan, "sqlmesh_example.weekly_model") == [
25032506
(
25042507
to_datetime("2020-01-01 00:00:00"),
2505-
to_datetime("2020-01-26 00:00:00"),
2508+
to_datetime(f"{current_year}-01-26 00:00:00"),
25062509
) # last week in 2020-01 hasn't fully elapsed yet
25072510
]
25082511
assert _get_missing_intervals(plan, "sqlmesh_example.monthly_model") == [
2509-
(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2512+
(to_datetime("2020-01-01 00:00:00"), to_datetime(f"{current_year}-02-01 00:00:00"))
25102513
]
25112514
assert _get_missing_intervals(plan, "sqlmesh_example.ended_daily_model") == [
2512-
(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-01-19 00:00:00"))
2515+
(to_datetime("2020-01-01 00:00:00"), to_datetime(f"{current_year}-01-19 00:00:00"))
25132516
]
25142517

25152518
# now, create a dev env for "1 day ago" with min_intervals=1
@@ -2524,24 +2527,24 @@ def _get_missing_intervals(plan: Plan, name: str) -> t.List[t.Tuple[datetime, da
25242527
assert len(plan.missing_intervals) == 4
25252528

25262529
assert _get_missing_intervals(plan, "sqlmesh_example.daily_model") == [
2527-
(to_datetime("2020-01-31 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2530+
(to_datetime(f"{current_year}-01-31 00:00:00"), to_datetime(f"{current_year}-02-01 00:00:00"))
25282531
]
25292532
assert _get_missing_intervals(plan, "sqlmesh_example.weekly_model") == [
25302533
(
2531-
to_datetime("2020-01-19 00:00:00"), # last completed week
2532-
to_datetime("2020-01-26 00:00:00"),
2534+
to_datetime(f"{current_year}-01-19 00:00:00"), # last completed week
2535+
to_datetime(f"{current_year}-01-26 00:00:00"),
25332536
)
25342537
]
25352538
assert _get_missing_intervals(plan, "sqlmesh_example.monthly_model") == [
25362539
(
2537-
to_datetime("2020-01-01 00:00:00"), # last completed month
2538-
to_datetime("2020-02-01 00:00:00"),
2540+
to_datetime(f"{current_year}-01-01 00:00:00"), # last completed month
2541+
to_datetime(f"{current_year}-02-01 00:00:00"),
25392542
)
25402543
]
25412544
assert _get_missing_intervals(plan, "sqlmesh_example.ended_daily_model") == [
25422545
(
2543-
to_datetime("2020-01-18 00:00:00"), # last day before the model end date
2544-
to_datetime("2020-01-19 00:00:00"),
2546+
to_datetime(f"{current_year}-01-18 00:00:00"), # last day before the model end date
2547+
to_datetime(f"{current_year}-01-19 00:00:00"),
25452548
)
25462549
]
25472550

@@ -2551,21 +2554,21 @@ def _get_missing_intervals(plan: Plan, name: str) -> t.List[t.Tuple[datetime, da
25512554
# show that the data was created (which shows that when the Plan became an EvaluatablePlan and eventually evaluated, the start date overrides didn't get dropped)
25522555
assert context.engine_adapter.fetchall(
25532556
"select start_dt, end_dt from sqlmesh_example__pr_env.daily_model"
2554-
) == [(to_datetime("2020-01-31 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))]
2557+
) == [(to_datetime(f"{current_year}-01-31 00:00:00"), to_datetime(f"{current_year}-01-31 23:59:59.999999"))]
25552558
assert context.engine_adapter.fetchall(
25562559
"select start_dt, end_dt from sqlmesh_example__pr_env.weekly_model"
25572560
) == [
2558-
(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-25 23:59:59.999999")),
2561+
(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-01-25 23:59:59.999999")),
25592562
]
25602563
assert context.engine_adapter.fetchall(
25612564
"select start_dt, end_dt from sqlmesh_example__pr_env.monthly_model"
25622565
) == [
2563-
(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-01-31 23:59:59.999999")),
2566+
(to_datetime(f"{current_year}-01-01 00:00:00"), to_datetime(f"{current_year}-01-31 23:59:59.999999")),
25642567
]
25652568
assert context.engine_adapter.fetchall(
25662569
"select start_dt, end_dt from sqlmesh_example__pr_env.ended_daily_model"
25672570
) == [
2568-
(to_datetime("2020-01-18 00:00:00"), to_datetime("2020-01-18 23:59:59.999999")),
2571+
(to_datetime(f"{current_year}-01-18 00:00:00"), to_datetime(f"{current_year}-01-18 23:59:59.999999")),
25692572
]
25702573

25712574

@@ -2593,7 +2596,8 @@ def test_plan_min_intervals_adjusted_for_downstream(tmp_path: Path):
25932596
paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
25942597
)
25952598

2596-
current_time = to_datetime("2020-02-01 00:00:01")
2599+
current_year = datetime.now().year
2600+
current_time = to_datetime(f"{current_year}-02-01 00:00:01")
25972601

25982602
# initial state of example project
25992603
context.plan(auto_apply=True, execution_time=current_time)
@@ -2683,54 +2687,54 @@ def _get_missing_intervals(name: str) -> t.List[t.Tuple[datetime, datetime]]:
26832687
return [(to_datetime(s), to_datetime(e)) for s, e in snapshot_intervals.merged_intervals]
26842688

26852689
# We only operate on completed intervals, so given the current_time this is the range of the last completed week
2686-
_get_missing_intervals("sqlmesh_example.weekly_model") == [
2687-
(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-26 00:00:00"))
2690+
assert _get_missing_intervals("sqlmesh_example.weekly_model") == [
2691+
(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-01-26 00:00:00"))
26882692
]
26892693

26902694
# The daily model needs to cover the week, so it gets its start date moved back to line up
26912695
_get_missing_intervals("sqlmesh_example.daily_model") == [
2692-
(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2696+
(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-02-01 00:00:00"))
26932697
]
26942698

26952699
# The hourly model needs to cover both the daily model and the weekly model, so it also gets its start date moved back to line up with the weekly model
26962700
assert _get_missing_intervals("sqlmesh_example.hourly_model") == [
2697-
(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2701+
(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-02-01 00:00:00"))
26982702
]
26992703

27002704
# The two-hourly model only needs to cover 2 hours and should be unaffected by the fact its sibling node has a weekly child node
27012705
# However it still gets backfilled for 24 hours because the plan start is 1 day and this satisfies min_intervals: 1
27022706
assert _get_missing_intervals("sqlmesh_example.two_hourly_model") == [
2703-
(to_datetime("2020-01-31 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2707+
(to_datetime(f"{current_year}-01-31 00:00:00"), to_datetime(f"{current_year}-02-01 00:00:00"))
27042708
]
27052709

27062710
# The unrelated model has no upstream constraints, so its start date doesn't get moved to line up with the weekly model
27072711
# However it still gets backfilled for 24 hours because the plan start is 1 day and this satisfies min_intervals: 1
27082712
_get_missing_intervals("sqlmesh_example.unrelated_monthly_model") == [
2709-
(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2713+
(to_datetime(f"{current_year}-01-01 00:00:00"), to_datetime(f"{current_year}-02-01 00:00:00"))
27102714
]
27112715

27122716
# Check that actually running the plan produces the correct result, since missing intervals are re-calculated in the evaluator
27132717
context.apply(plan)
27142718

27152719
assert context.engine_adapter.fetchall(
27162720
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.weekly_model"
2717-
) == [(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-25 23:59:59.999999"))]
2721+
) == [(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-01-25 23:59:59.999999"))]
27182722

27192723
assert context.engine_adapter.fetchall(
27202724
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.daily_model"
2721-
) == [(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))]
2725+
) == [(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-01-31 23:59:59.999999"))]
27222726

27232727
assert context.engine_adapter.fetchall(
27242728
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.hourly_model"
2725-
) == [(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))]
2729+
) == [(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-01-31 23:59:59.999999"))]
27262730

27272731
assert context.engine_adapter.fetchall(
27282732
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.two_hourly_model"
2729-
) == [(to_datetime("2020-01-31 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))]
2733+
) == [(to_datetime(f"{current_year}-01-31 00:00:00"), to_datetime(f"{current_year}-01-31 23:59:59.999999"))]
27302734

27312735
assert context.engine_adapter.fetchall(
27322736
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.unrelated_monthly_model"
2733-
) == [(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))]
2737+
) == [(to_datetime(f"{current_year}-01-01 00:00:00"), to_datetime(f"{current_year}-01-31 23:59:59.999999"))]
27342738

27352739

27362740
def test_defaults_pre_post_statements(tmp_path: Path):

0 commit comments

Comments
 (0)