Skip to content

Commit ff7c02b

Browse files
committed
Add readiness check for Doris backends
1 parent 78b6c4f commit ff7c02b

File tree

3 files changed

+94
-18
lines changed

3 files changed

+94
-18
lines changed

.circleci/wait-for-db.sh

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,23 @@ clickhouse_ready() {
3636

3737
doris_ready() {
3838
probe_port 9030
39+
40+
# Check that we have 3 alive backends
41+
echo "Checking for 3 alive Doris backends..."
42+
43+
while true; do
44+
echo "Checking Doris backends..."
45+
# Use docker compose exec to run mysql command inside the fe-01 container
46+
# Use timeout to prevent hanging, and handle connection errors gracefully
47+
ALIVE_BACKENDS=$(timeout 10 docker compose -f tests/core/engine_adapter/integration/docker/compose.doris.yaml exec -T fe-01 mysql -uroot -e "show backends \G" 2>/dev/null | grep -c "Alive: true" || echo "0")
48+
echo "Found $ALIVE_BACKENDS alive backends"
49+
if [ "$ALIVE_BACKENDS" -ge 3 ]; then
50+
echo "Doris has 3 or more alive backends"
51+
break
52+
fi
53+
echo "Waiting for more backends to become alive..."
54+
sleep 5
55+
done
3956
}
4057

4158
postgres_ready() {

sqlmesh/core/model/meta.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,11 @@ def physical_properties(self) -> t.Dict[str, exp.Expression]:
470470

471471
# For INCREMENTAL_BY_UNIQUE_KEY models for doris dialect, add the unique_key to physical_properties
472472
# so it gets passed to table_properties during table creation
473-
if isinstance(self.kind, IncrementalByUniqueKeyKind) and self.unique_key and self.dialect == "doris":
473+
if (
474+
isinstance(self.kind, IncrementalByUniqueKeyKind)
475+
and self.unique_key
476+
and self.dialect == "doris"
477+
):
474478
# Convert unique_key expressions to a format suitable for table_properties
475479
if len(self.unique_key) == 1:
476480
# Single column key

tests/core/test_context.py

Lines changed: 72 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@
6464
from tests.utils.test_helpers import use_terminal_console
6565
from tests.utils.test_filesystem import create_temp_file
6666

67-
import logging
68-
logger = logging.getLogger(__name__)
6967

7068
def test_global_config(copy_to_temp_path: t.Callable):
7169
context = Context(paths=copy_to_temp_path("examples/sushi"))
@@ -2527,7 +2525,10 @@ def _get_missing_intervals(plan: Plan, name: str) -> t.List[t.Tuple[datetime, da
25272525
assert len(plan.missing_intervals) == 4
25282526

25292527
assert _get_missing_intervals(plan, "sqlmesh_example.daily_model") == [
2530-
(to_datetime(f"{current_year}-01-31 00:00:00"), to_datetime(f"{current_year}-02-01 00:00:00"))
2528+
(
2529+
to_datetime(f"{current_year}-01-31 00:00:00"),
2530+
to_datetime(f"{current_year}-02-01 00:00:00"),
2531+
)
25312532
]
25322533
assert _get_missing_intervals(plan, "sqlmesh_example.weekly_model") == [
25332534
(
@@ -2554,21 +2555,35 @@ def _get_missing_intervals(plan: Plan, name: str) -> t.List[t.Tuple[datetime, da
25542555
# show that the data was created (which shows that when the Plan became an EvaluatablePlan and eventually evaluated, the start date overrides didn't get dropped)
25552556
assert context.engine_adapter.fetchall(
25562557
"select start_dt, end_dt from sqlmesh_example__pr_env.daily_model"
2557-
) == [(to_datetime(f"{current_year}-01-31 00:00:00"), to_datetime(f"{current_year}-01-31 23:59:59.999999"))]
2558+
) == [
2559+
(
2560+
to_datetime(f"{current_year}-01-31 00:00:00"),
2561+
to_datetime(f"{current_year}-01-31 23:59:59.999999"),
2562+
)
2563+
]
25582564
assert context.engine_adapter.fetchall(
25592565
"select start_dt, end_dt from sqlmesh_example__pr_env.weekly_model"
25602566
) == [
2561-
(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-01-25 23:59:59.999999")),
2567+
(
2568+
to_datetime(f"{current_year}-01-19 00:00:00"),
2569+
to_datetime(f"{current_year}-01-25 23:59:59.999999"),
2570+
),
25622571
]
25632572
assert context.engine_adapter.fetchall(
25642573
"select start_dt, end_dt from sqlmesh_example__pr_env.monthly_model"
25652574
) == [
2566-
(to_datetime(f"{current_year}-01-01 00:00:00"), to_datetime(f"{current_year}-01-31 23:59:59.999999")),
2575+
(
2576+
to_datetime(f"{current_year}-01-01 00:00:00"),
2577+
to_datetime(f"{current_year}-01-31 23:59:59.999999"),
2578+
),
25672579
]
25682580
assert context.engine_adapter.fetchall(
25692581
"select start_dt, end_dt from sqlmesh_example__pr_env.ended_daily_model"
25702582
) == [
2571-
(to_datetime(f"{current_year}-01-18 00:00:00"), to_datetime(f"{current_year}-01-18 23:59:59.999999")),
2583+
(
2584+
to_datetime(f"{current_year}-01-18 00:00:00"),
2585+
to_datetime(f"{current_year}-01-18 23:59:59.999999"),
2586+
),
25722587
]
25732588

25742589

@@ -2688,53 +2703,93 @@ def _get_missing_intervals(name: str) -> t.List[t.Tuple[datetime, datetime]]:
26882703

26892704
# We only operate on completed intervals, so given the current_time this is the range of the last completed week
26902705
assert _get_missing_intervals("sqlmesh_example.weekly_model") == [
2691-
(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-01-26 00:00:00"))
2706+
(
2707+
to_datetime(f"{current_year}-01-19 00:00:00"),
2708+
to_datetime(f"{current_year}-01-26 00:00:00"),
2709+
)
26922710
]
26932711

26942712
# The daily model needs to cover the week, so it gets its start date moved back to line up
26952713
_get_missing_intervals("sqlmesh_example.daily_model") == [
2696-
(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-02-01 00:00:00"))
2714+
(
2715+
to_datetime(f"{current_year}-01-19 00:00:00"),
2716+
to_datetime(f"{current_year}-02-01 00:00:00"),
2717+
)
26972718
]
26982719

26992720
# The hourly model needs to cover both the daily model and the weekly model, so it also gets its start date moved back to line up with the weekly model
27002721
assert _get_missing_intervals("sqlmesh_example.hourly_model") == [
2701-
(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-02-01 00:00:00"))
2722+
(
2723+
to_datetime(f"{current_year}-01-19 00:00:00"),
2724+
to_datetime(f"{current_year}-02-01 00:00:00"),
2725+
)
27022726
]
27032727

27042728
# The two-hourly model only needs to cover 2 hours and should be unaffected by the fact its sibling node has a weekly child node
27052729
# However it still gets backfilled for 24 hours because the plan start is 1 day and this satisfies min_intervals: 1
27062730
assert _get_missing_intervals("sqlmesh_example.two_hourly_model") == [
2707-
(to_datetime(f"{current_year}-01-31 00:00:00"), to_datetime(f"{current_year}-02-01 00:00:00"))
2731+
(
2732+
to_datetime(f"{current_year}-01-31 00:00:00"),
2733+
to_datetime(f"{current_year}-02-01 00:00:00"),
2734+
)
27082735
]
27092736

27102737
# The unrelated model has no upstream constraints, so its start date doesn't get moved to line up with the weekly model
27112738
# However it still gets backfilled for 24 hours because the plan start is 1 day and this satisfies min_intervals: 1
27122739
_get_missing_intervals("sqlmesh_example.unrelated_monthly_model") == [
2713-
(to_datetime(f"{current_year}-01-01 00:00:00"), to_datetime(f"{current_year}-02-01 00:00:00"))
2740+
(
2741+
to_datetime(f"{current_year}-01-01 00:00:00"),
2742+
to_datetime(f"{current_year}-02-01 00:00:00"),
2743+
)
27142744
]
27152745

27162746
# Check that actually running the plan produces the correct result, since missing intervals are re-calculated in the evaluator
27172747
context.apply(plan)
27182748

27192749
assert context.engine_adapter.fetchall(
27202750
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.weekly_model"
2721-
) == [(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-01-25 23:59:59.999999"))]
2751+
) == [
2752+
(
2753+
to_datetime(f"{current_year}-01-19 00:00:00"),
2754+
to_datetime(f"{current_year}-01-25 23:59:59.999999"),
2755+
)
2756+
]
27222757

27232758
assert context.engine_adapter.fetchall(
27242759
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.daily_model"
2725-
) == [(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-01-31 23:59:59.999999"))]
2760+
) == [
2761+
(
2762+
to_datetime(f"{current_year}-01-19 00:00:00"),
2763+
to_datetime(f"{current_year}-01-31 23:59:59.999999"),
2764+
)
2765+
]
27262766

27272767
assert context.engine_adapter.fetchall(
27282768
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.hourly_model"
2729-
) == [(to_datetime(f"{current_year}-01-19 00:00:00"), to_datetime(f"{current_year}-01-31 23:59:59.999999"))]
2769+
) == [
2770+
(
2771+
to_datetime(f"{current_year}-01-19 00:00:00"),
2772+
to_datetime(f"{current_year}-01-31 23:59:59.999999"),
2773+
)
2774+
]
27302775

27312776
assert context.engine_adapter.fetchall(
27322777
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.two_hourly_model"
2733-
) == [(to_datetime(f"{current_year}-01-31 00:00:00"), to_datetime(f"{current_year}-01-31 23:59:59.999999"))]
2778+
) == [
2779+
(
2780+
to_datetime(f"{current_year}-01-31 00:00:00"),
2781+
to_datetime(f"{current_year}-01-31 23:59:59.999999"),
2782+
)
2783+
]
27342784

27352785
assert context.engine_adapter.fetchall(
27362786
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.unrelated_monthly_model"
2737-
) == [(to_datetime(f"{current_year}-01-01 00:00:00"), to_datetime(f"{current_year}-01-31 23:59:59.999999"))]
2787+
) == [
2788+
(
2789+
to_datetime(f"{current_year}-01-01 00:00:00"),
2790+
to_datetime(f"{current_year}-01-31 23:59:59.999999"),
2791+
)
2792+
]
27382793

27392794

27402795
def test_defaults_pre_post_statements(tmp_path: Path):

0 commit comments

Comments (0)