Skip to content

Commit 151210b

Browse files
authored
Feat: Drop schemas for expired environments (#1177)
* drop schemas for expired environments * add test * fix location for dropping schemas * airflow drop environment schemas * remove the demotion of snapshots * cleanup * fix cleanup test
1 parent efdd487 commit 151210b

4 files changed

Lines changed: 48 additions & 7 deletions

File tree

sqlmesh/core/context.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,8 +1170,13 @@ def _load_configs(
11701170

11711171
def _run_janitor(self) -> None:
11721172
expired_environments = self.state_sync.delete_expired_environments()
1173-
for expired_environment in expired_environments:
1174-
self.snapshot_evaluator.demote(expired_environment.snapshots, expired_environment.name)
1173+
expired_schemas = {
1174+
snapshot.qualified_view_name.schema_for_environment(expired_environment.name)
1175+
for expired_environment in expired_environments
1176+
for snapshot in expired_environment.snapshots
1177+
}
1178+
for expired_schema in expired_schemas:
1179+
self.engine_adapter.drop_schema(expired_schema, ignore_if_not_exists=True, cascade=True)
11751180

11761181
expired_snapshots = self.state_sync.delete_expired_snapshots()
11771182
self.snapshot_evaluator.cleanup(expired_snapshots)

sqlmesh/engines/commands.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,14 @@ def cleanup(
114114
) -> None:
115115
if isinstance(command_payload, str):
116116
command_payload = CleanupCommandPayload.parse_raw(command_payload)
117-
for environment in command_payload.environments:
118-
evaluator.demote(environment.snapshots, environment.name)
117+
118+
expired_schemas = {
119+
snapshot.qualified_view_name.schema_for_environment(environment.name)
120+
for environment in command_payload.environments
121+
for snapshot in environment.snapshots
122+
}
123+
for expired_schema in expired_schemas:
124+
evaluator.adapter.drop_schema(expired_schema, ignore_if_not_exists=True, cascade=True)
119125
evaluator.cleanup(command_payload.snapshots)
120126

121127

tests/core/test_integration.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,33 @@ def test_incremental_time_self_reference(
711711
)
712712

713713

714+
@pytest.mark.integration
715+
@pytest.mark.core_integration
716+
def test_invalidating_environment(sushi_context: Context):
717+
def get_schemas() -> t.Set[str]:
718+
return set(
719+
sushi_context.state_sync.state_sync.engine_adapter.fetchdf("SHOW ALL TABLES")["schema"] # type: ignore
720+
.to_dict()
721+
.values()
722+
)
723+
724+
environment = "dev"
725+
apply_to_environment(sushi_context, environment)
726+
start_environment = sushi_context.state_sync.get_environment("dev")
727+
assert start_environment is not None
728+
start_schemas = get_schemas()
729+
sushi_context.invalidate_environment("dev")
730+
invalidate_environment = sushi_context.state_sync.get_environment("dev")
731+
assert invalidate_environment is not None
732+
schemas_prior_to_janitor = get_schemas()
733+
assert invalidate_environment.expiration_ts < start_environment.expiration_ts # type: ignore
734+
assert start_schemas == schemas_prior_to_janitor
735+
sushi_context._run_janitor()
736+
schemas_after_janitor = get_schemas()
737+
assert sushi_context.state_sync.get_environment("dev") is None
738+
assert start_schemas - schemas_after_janitor == {"sushi__dev"}
739+
740+
714741
def initial_add(context: Context, environment: str):
715742
assert not context.state_reader.get_environment(environment)
716743

tests/schedulers/airflow/operators/test_targets.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import typing as t
2+
from unittest.mock import call
23

34
import pytest
45
from airflow.exceptions import AirflowSkipException
@@ -134,7 +135,6 @@ def test_cleanup_target_execute(mocker: MockerFixture, make_snapshot: t.Callable
134135

135136
context = Context(ti=task_instance_mock) # type: ignore
136137

137-
evaluator_demote_mock = mocker.patch("sqlmesh.core.snapshot.evaluator.SnapshotEvaluator.demote")
138138
evaluator_cleanup_mock = mocker.patch(
139139
"sqlmesh.core.snapshot.evaluator.SnapshotEvaluator.cleanup"
140140
)
@@ -143,9 +143,12 @@ def test_cleanup_target_execute(mocker: MockerFixture, make_snapshot: t.Callable
143143

144144
target = targets.SnapshotCleanupTarget()
145145

146-
target.execute(context, lambda: mocker.Mock(), "spark")
146+
evaluator_adapter_mock = mocker.MagicMock()
147+
target.execute(context, lambda: evaluator_adapter_mock, "spark")
147148

148-
evaluator_demote_mock.assert_called_once_with([snapshot.table_info], "test_env")
149+
evaluator_adapter_mock.cursor().execute.assert_has_calls(
150+
[call("DROP SCHEMA IF EXISTS default__test_env CASCADE")]
151+
)
149152
evaluator_cleanup_mock.assert_called_once_with([snapshot.table_info])
150153

151154
task_instance_mock.xcom_pull.assert_called_once_with(key="snapshot_cleanup_command")

0 commit comments

Comments
 (0)