Skip to content

Commit d38f9d3

Browse files
Merge branch 'main' into themis/missingref
2 parents 557c907 + e1510ce commit d38f9d3

File tree

60 files changed

+1415
-542
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+1415
-542
lines changed

docs/reference/configuration.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,10 @@ Formatting settings for the `sqlmesh format` command and UI.
125125

126126
Configuration for the `sqlmesh janitor` command.
127127

128-
| Option | Description | Type | Required |
129-
|--------------------------|----------------------------------------------------------------------------------------------------------------------------|:-------:|:--------:|
130-
| `warn_on_delete_failure` | Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views (Default: False) | boolean | N |
128+
| Option | Description | Type | Required |
129+
|---------------------------------|----------------------------------------------------------------------------------------------------------------------------|:-------:|:--------:|
130+
| `warn_on_delete_failure` | Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views (Default: False) | boolean | N |
131+
| `expired_snapshots_batch_size` | Maximum number of expired snapshots to clean in a single batch (Default: 200) | int | N |
131132

132133

133134
## UI

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies = [
2424
"requests",
2525
"rich[jupyter]",
2626
"ruamel.yaml",
27-
"sqlglot[rs]~=27.20.0",
27+
"sqlglot[rs]~=27.24.2",
2828
"tenacity",
2929
"time-machine",
3030
"json-stream"

sqlmesh/core/config/janitor.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,26 @@
11
from __future__ import annotations
22

3+
import typing as t
34

45
from sqlmesh.core.config.base import BaseConfig
6+
from sqlmesh.utils.pydantic import field_validator
57

68

79
class JanitorConfig(BaseConfig):
810
"""The configuration for the janitor.
911
1012
Args:
1113
warn_on_delete_failure: Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views.
14+
expired_snapshots_batch_size: Maximum number of expired snapshots to clean in a single batch.
1215
"""
1316

1417
warn_on_delete_failure: bool = False
18+
expired_snapshots_batch_size: t.Optional[int] = None
19+
20+
@field_validator("expired_snapshots_batch_size", mode="before")
21+
@classmethod
22+
def _validate_batch_size(cls, value: int) -> int:
23+
batch_size = int(value)
24+
if batch_size <= 0:
25+
raise ValueError("expired_snapshots_batch_size must be greater than 0")
26+
return batch_size

sqlmesh/core/context.py

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@
107107
CachingStateSync,
108108
StateReader,
109109
StateSync,
110-
cleanup_expired_views,
111110
)
111+
from sqlmesh.core.janitor import cleanup_expired_views, delete_expired_snapshots
112112
from sqlmesh.core.table_diff import TableDiff
113113
from sqlmesh.core.test import (
114114
ModelTextTestResult,
@@ -2852,19 +2852,14 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
28522852
# Clean up expired environments by removing their views and schemas
28532853
self._cleanup_environments(current_ts=current_ts)
28542854

2855-
cleanup_targets = self.state_sync.get_expired_snapshots(
2856-
ignore_ttl=ignore_ttl, current_ts=current_ts
2857-
)
2858-
2859-
# Remove the expired snapshots tables
2860-
self.snapshot_evaluator.cleanup(
2861-
target_snapshots=cleanup_targets,
2862-
on_complete=self.console.update_cleanup_progress,
2855+
delete_expired_snapshots(
2856+
self.state_sync,
2857+
self.snapshot_evaluator,
2858+
current_ts=current_ts,
2859+
ignore_ttl=ignore_ttl,
2860+
console=self.console,
2861+
batch_size=self.config.janitor.expired_snapshots_batch_size,
28632862
)
2864-
2865-
# Delete the expired snapshot records from the state sync
2866-
self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl, current_ts=current_ts)
2867-
28682863
self.state_sync.compact_intervals()
28692864

28702865
def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:

sqlmesh/core/janitor.py

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
from __future__ import annotations
2+
3+
import typing as t
4+
5+
from sqlglot import exp
6+
7+
from sqlmesh.core.engine_adapter import EngineAdapter
8+
from sqlmesh.core.console import Console
9+
from sqlmesh.core.dialect import schema_
10+
from sqlmesh.core.environment import Environment
11+
from sqlmesh.core.snapshot import SnapshotEvaluator
12+
from sqlmesh.core.state_sync import StateSync
13+
from sqlmesh.core.state_sync.common import (
14+
logger,
15+
iter_expired_snapshot_batches,
16+
RowBoundary,
17+
ExpiredBatchRange,
18+
)
19+
from sqlmesh.utils.errors import SQLMeshError
20+
21+
22+
def cleanup_expired_views(
23+
default_adapter: EngineAdapter,
24+
engine_adapters: t.Dict[str, EngineAdapter],
25+
environments: t.List[Environment],
26+
warn_on_delete_failure: bool = False,
27+
console: t.Optional[Console] = None,
28+
) -> None:
29+
expired_schema_or_catalog_environments = [
30+
environment
31+
for environment in environments
32+
if environment.suffix_target.is_schema or environment.suffix_target.is_catalog
33+
]
34+
expired_table_environments = [
35+
environment for environment in environments if environment.suffix_target.is_table
36+
]
37+
38+
# We have to use the corresponding adapter if the virtual layer is gateway managed
39+
def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> EngineAdapter:
40+
if gateway_managed and gateway:
41+
return engine_adapters.get(gateway, default_adapter)
42+
return default_adapter
43+
44+
catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set()
45+
schemas_to_drop: t.Set[t.Tuple[EngineAdapter, exp.Table]] = set()
46+
47+
# Collect schemas and catalogs to drop
48+
for engine_adapter, expired_catalog, expired_schema, suffix_target in {
49+
(
50+
(engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
51+
snapshot.qualified_view_name.catalog_for_environment(
52+
environment.naming_info, dialect=engine_adapter.dialect
53+
),
54+
snapshot.qualified_view_name.schema_for_environment(
55+
environment.naming_info, dialect=engine_adapter.dialect
56+
),
57+
environment.suffix_target,
58+
)
59+
for environment in expired_schema_or_catalog_environments
60+
for snapshot in environment.snapshots
61+
if snapshot.is_model and not snapshot.is_symbolic
62+
}:
63+
if suffix_target.is_catalog:
64+
if expired_catalog:
65+
catalogs_to_drop.add((engine_adapter, expired_catalog))
66+
else:
67+
schema = schema_(expired_schema, expired_catalog)
68+
schemas_to_drop.add((engine_adapter, schema))
69+
70+
# Drop the views for the expired environments
71+
for engine_adapter, expired_view in {
72+
(
73+
(engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
74+
snapshot.qualified_view_name.for_environment(
75+
environment.naming_info, dialect=engine_adapter.dialect
76+
),
77+
)
78+
for environment in expired_table_environments
79+
for snapshot in environment.snapshots
80+
if snapshot.is_model and not snapshot.is_symbolic
81+
}:
82+
try:
83+
engine_adapter.drop_view(expired_view, ignore_if_not_exists=True)
84+
if console:
85+
console.update_cleanup_progress(expired_view)
86+
except Exception as e:
87+
message = f"Failed to drop the expired environment view '{expired_view}': {e}"
88+
if warn_on_delete_failure:
89+
logger.warning(message)
90+
else:
91+
raise SQLMeshError(message) from e
92+
93+
# Drop the schemas for the expired environments
94+
for engine_adapter, schema in schemas_to_drop:
95+
try:
96+
engine_adapter.drop_schema(
97+
schema,
98+
ignore_if_not_exists=True,
99+
cascade=True,
100+
)
101+
if console:
102+
console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
103+
except Exception as e:
104+
message = f"Failed to drop the expired environment schema '{schema}': {e}"
105+
if warn_on_delete_failure:
106+
logger.warning(message)
107+
else:
108+
raise SQLMeshError(message) from e
109+
110+
# Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
111+
# catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
112+
for engine_adapter, catalog in catalogs_to_drop:
113+
if engine_adapter.SUPPORTS_CREATE_DROP_CATALOG:
114+
try:
115+
engine_adapter.drop_catalog(catalog)
116+
if console:
117+
console.update_cleanup_progress(catalog)
118+
except Exception as e:
119+
message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
120+
if warn_on_delete_failure:
121+
logger.warning(message)
122+
else:
123+
raise SQLMeshError(message) from e
124+
125+
126+
def delete_expired_snapshots(
127+
state_sync: StateSync,
128+
snapshot_evaluator: SnapshotEvaluator,
129+
*,
130+
current_ts: int,
131+
ignore_ttl: bool = False,
132+
batch_size: t.Optional[int] = None,
133+
console: t.Optional[Console] = None,
134+
) -> None:
135+
"""Delete all expired snapshots in batches.
136+
137+
This helper function encapsulates the logic for deleting expired snapshots in batches,
138+
eliminating code duplication across different use cases.
139+
140+
Args:
141+
state_sync: StateSync instance to query and delete expired snapshots from.
142+
snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
143+
current_ts: Timestamp used to evaluate expiration.
144+
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
145+
batch_size: Maximum number of snapshots to fetch per batch.
146+
console: Optional console for reporting progress.
147+
148+
Returns:
149+
The total number of deleted expired snapshots.
150+
"""
151+
num_expired_snapshots = 0
152+
for batch in iter_expired_snapshot_batches(
153+
state_reader=state_sync,
154+
current_ts=current_ts,
155+
ignore_ttl=ignore_ttl,
156+
batch_size=batch_size,
157+
):
158+
end_info = (
159+
f"updated_ts={batch.batch_range.end.updated_ts}"
160+
if isinstance(batch.batch_range.end, RowBoundary)
161+
else f"limit={batch.batch_range.end.batch_size}"
162+
)
163+
logger.info(
164+
"Processing batch of size %s with end %s",
165+
len(batch.expired_snapshot_ids),
166+
end_info,
167+
)
168+
snapshot_evaluator.cleanup(
169+
target_snapshots=batch.cleanup_tasks,
170+
on_complete=console.update_cleanup_progress if console else None,
171+
)
172+
state_sync.delete_expired_snapshots(
173+
batch_range=ExpiredBatchRange(
174+
start=RowBoundary.lowest_boundary(),
175+
end=batch.batch_range.end,
176+
),
177+
ignore_ttl=ignore_ttl,
178+
)
179+
logger.info("Cleaned up expired snapshots batch")
180+
num_expired_snapshots += len(batch.expired_snapshot_ids)
181+
logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)

sqlmesh/core/state_sync/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,4 @@
2020
Versions as Versions,
2121
)
2222
from sqlmesh.core.state_sync.cache import CachingStateSync as CachingStateSync
23-
from sqlmesh.core.state_sync.common import cleanup_expired_views as cleanup_expired_views
2423
from sqlmesh.core.state_sync.db import EngineAdapterStateSync as EngineAdapterStateSync

sqlmesh/core/state_sync/base.py

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from sqlmesh import migrations
1212
from sqlmesh.core.environment import (
1313
Environment,
14-
EnvironmentNamingInfo,
1514
EnvironmentStatements,
1615
EnvironmentSummary,
1716
)
@@ -21,17 +20,20 @@
2120
SnapshotIdLike,
2221
SnapshotIdAndVersionLike,
2322
SnapshotInfoLike,
24-
SnapshotTableCleanupTask,
25-
SnapshotTableInfo,
2623
SnapshotNameVersion,
2724
SnapshotIdAndVersion,
2825
)
2926
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
3027
from sqlmesh.utils import major_minor
3128
from sqlmesh.utils.date import TimeLike
3229
from sqlmesh.utils.errors import SQLMeshError
33-
from sqlmesh.utils.pydantic import PydanticModel, ValidationInfo, field_validator
34-
from sqlmesh.core.state_sync.common import StateStream
30+
from sqlmesh.utils.pydantic import PydanticModel, field_validator
31+
from sqlmesh.core.state_sync.common import (
32+
StateStream,
33+
ExpiredSnapshotBatch,
34+
PromotionResult,
35+
ExpiredBatchRange,
36+
)
3537

3638
logger = logging.getLogger(__name__)
3739

@@ -72,20 +74,6 @@ def _schema_version_validator(cls, v: t.Any) -> int:
7274
SCHEMA_VERSION: int = MIN_SCHEMA_VERSION + len(MIGRATIONS) - 1
7375

7476

75-
class PromotionResult(PydanticModel):
76-
added: t.List[SnapshotTableInfo]
77-
removed: t.List[SnapshotTableInfo]
78-
removed_environment_naming_info: t.Optional[EnvironmentNamingInfo]
79-
80-
@field_validator("removed_environment_naming_info")
81-
def _validate_removed_environment_naming_info(
82-
cls, v: t.Optional[EnvironmentNamingInfo], info: ValidationInfo
83-
) -> t.Optional[EnvironmentNamingInfo]:
84-
if v and not info.data.get("removed"):
85-
raise ValueError("removed_environment_naming_info must be None if removed is empty")
86-
return v
87-
88-
8977
class StateReader(abc.ABC):
9078
"""Abstract base class for read-only operations on snapshot and environment state."""
9179

@@ -315,15 +303,21 @@ def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStre
315303

316304
@abc.abstractmethod
317305
def get_expired_snapshots(
318-
self, current_ts: t.Optional[int] = None, ignore_ttl: bool = False
319-
) -> t.List[SnapshotTableCleanupTask]:
320-
"""Aggregates the id's of the expired snapshots and creates a list of table cleanup tasks.
306+
self,
307+
*,
308+
batch_range: ExpiredBatchRange,
309+
current_ts: t.Optional[int] = None,
310+
ignore_ttl: bool = False,
311+
) -> t.Optional[ExpiredSnapshotBatch]:
312+
"""Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).
321313
322-
Expired snapshots are snapshots that have exceeded their time-to-live
323-
and are no longer in use within an environment.
314+
Args:
315+
current_ts: Timestamp used to evaluate expiration.
316+
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
317+
batch_range: The range of the batch to fetch.
324318
325319
Returns:
326-
The list of table cleanup tasks.
320+
A batch describing expired snapshots or None if no snapshots are pending cleanup.
327321
"""
328322

329323
@abc.abstractmethod
@@ -363,16 +357,21 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
363357

364358
@abc.abstractmethod
365359
def delete_expired_snapshots(
366-
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
360+
self,
361+
batch_range: ExpiredBatchRange,
362+
ignore_ttl: bool = False,
363+
current_ts: t.Optional[int] = None,
367364
) -> None:
368365
"""Removes expired snapshots.
369366
370367
Expired snapshots are snapshots that have exceeded their time-to-live
371368
and are no longer in use within an environment.
372369
373370
Args:
371+
batch_range: The range of snapshots to delete in this batch.
374372
ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting
375373
all snapshots that are not referenced in any environment
374+
current_ts: Timestamp used to evaluate expiration.
376375
"""
377376

378377
@abc.abstractmethod

0 commit comments

Comments
 (0)