Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sqlmesh/core/state_sync/db/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,12 @@ def get_expired_snapshots(
promoted_snapshot_ids = {
snapshot.snapshot_id
for environment in environments
for snapshot in environment.snapshots
for snapshot in (
environment.snapshots
if environment.finalized_ts is not None
# If the environment is not finalized, check both the current snapshots and the previous finalized snapshots
else [*environment.snapshots, *(environment.previous_finalized_snapshots or [])]
)
}

if promoted_snapshot_ids:
Expand Down
77 changes: 77 additions & 0 deletions tests/core/state_sync/test_state_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1638,6 +1638,83 @@ def test_delete_expired_snapshots_promoted(
assert not state_sync.get_snapshots(all_snapshots)


def test_delete_expired_snapshots_previous_finalized_snapshots(
    state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
):
    """Expired snapshots listed in ``previous_finalized_snapshots`` must survive cleanup.

    Scenario exercised here:

    1. An environment is promoted and finalized with an old, short-TTL snapshot.
    2. The same environment is re-promoted with a new snapshot but is NOT
       finalized; the old snapshot is carried in ``previous_finalized_snapshots``.
    3. The old snapshot's ``updated_ts`` is pushed into the past so it is expired.
       While the environment stays non-finalized, the test asserts that no
       cleanup task is produced and the snapshot still exists.
    4. After the environment is finalized, the test asserts that the old
       snapshot is reported for cleanup and then deleted.
    """
    now_ts = now_timestamp()

    # Snapshot with a short TTL; it becomes the expiration candidate later on.
    old_snapshot = make_snapshot(
        SqlModel(name="a", query=parse_one("select a, ds")),
    )
    old_snapshot.ttl = "in 10 seconds"
    old_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

    # Replacement snapshot for the same model name, with a different query.
    new_snapshot = make_snapshot(
        SqlModel(name="a", query=parse_one("select a, b, ds")),
    )
    new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

    state_sync.push_snapshots([old_snapshot, new_snapshot])

    # Keyword arguments that are identical for both versions of the environment.
    shared_env_kwargs = {
        "name": "test_environment",
        "start_at": "2022-01-01",
        "end_at": "2022-01-01",
        "previous_plan_id": "test_plan_id",
    }

    # First plan: the environment points at the old snapshot and is finalized.
    finalized_env = Environment(
        snapshots=[old_snapshot.table_info],
        plan_id="test_plan_id",
        **shared_env_kwargs,
    )
    state_sync.promote(finalized_env)
    state_sync.finalize(finalized_env)

    # Nothing to clean up: the old snapshot is promoted in a finalized environment.
    assert not _get_cleanup_tasks(state_sync)

    # Second plan (simulated): the environment now points at the new snapshot,
    # remembers the old one as its previously finalized state, and is only
    # promoted - not finalized - at this stage.
    pending_env = Environment(
        snapshots=[new_snapshot.table_info],
        previous_finalized_snapshots=[old_snapshot.table_info],
        plan_id="new_plan_id",
        **shared_env_kwargs,
    )
    state_sync.promote(pending_env)

    # Manually backdate the snapshot's updated_ts to simulate expiration.
    backdate_sql = f"UPDATE sqlmesh._snapshots SET updated_ts = {now_ts - 15000} WHERE name = '{old_snapshot.name}' AND identifier = '{old_snapshot.identifier}'"
    state_sync.engine_adapter.execute(backdate_sql)

    # Still protected: the old snapshot is referenced by
    # previous_finalized_snapshots of a non-finalized environment.
    assert not _get_cleanup_tasks(state_sync)
    state_sync.delete_expired_snapshots(batch_range=ExpiredBatchRange.all_batch_range())
    old_snapshot_id = old_snapshot.snapshot_id
    assert state_sync.snapshots_exist([old_snapshot_id]) == {old_snapshot_id}

    # Finalizing the environment lifts the protection, so the expired snapshot
    # is now scheduled for cleanup and removed from state.
    state_sync.finalize(pending_env)
    expected_tasks = [
        SnapshotTableCleanupTask(snapshot=old_snapshot.table_info, dev_table_only=False),
    ]
    assert _get_cleanup_tasks(state_sync) == expected_tasks
    state_sync.delete_expired_snapshots(batch_range=ExpiredBatchRange.all_batch_range())
    assert not state_sync.snapshots_exist([old_snapshot.snapshot_id])


def test_delete_expired_snapshots_dev_table_cleanup_only(
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
):
Expand Down