Skip to content

Commit 1b26a63

Browse files
authored
Merge branch 'main' into fix/local-only-format
2 parents 6574d62 + d15203b commit 1b26a63

8 files changed

Lines changed: 388 additions & 38 deletions

File tree

docs/reference/cli.md

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -307,9 +307,17 @@ Usage: sqlmesh janitor [OPTIONS]
307307
The janitor cleans up old environments and expired snapshots.
308308
309309
Options:
310-
--ignore-ttl Cleanup snapshots that are not referenced in any environment,
311-
regardless of when they're set to expire
312-
--help Show this message and exit.
310+
--ignore-ttl Cleanup snapshots that are not referenced in any
311+
environment, regardless of when they're set to expire. Has
312+
no effect when --environment is specified.
313+
--force-delete Delete expired environment and snapshot state records even
314+
when the physical table or view drops fail. Any objects
315+
that could not be dropped become orphaned and must be
316+
removed manually.
317+
-e, --environment TEXT
318+
Scope cleanup to a single expired environment. Global
319+
snapshot and interval compaction are skipped.
320+
--help Show this message and exit.
313321
```
314322

315323
## migrate

sqlmesh/cli/main.py

Lines changed: 15 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -633,24 +633,36 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
633633
@click.option(
634634
"--ignore-ttl",
635635
is_flag=True,
636-
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
636+
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire. Has no effect when --environment is specified.",
637637
)
638638
@click.option(
639639
"--force-delete",
640640
is_flag=True,
641641
help="Delete expired environment and snapshot state records even when the physical table or view drops fail. "
642642
"Any objects that could not be dropped become orphaned and must be removed manually.",
643643
)
644+
@click.option(
645+
"--environment",
646+
"-e",
647+
default=None,
648+
help="Scope cleanup to a single expired environment. Global snapshot and interval compaction are skipped.",
649+
)
644650
@click.pass_context
645651
@error_handler
646652
@cli_analytics
647-
def janitor(ctx: click.Context, ignore_ttl: bool, force_delete: bool, **kwargs: t.Any) -> None:
653+
def janitor(
654+
ctx: click.Context,
655+
ignore_ttl: bool,
656+
force_delete: bool,
657+
environment: t.Optional[str],
658+
**kwargs: t.Any,
659+
) -> None:
648660
"""
649661
Run the janitor process on-demand.
650662
651663
The janitor cleans up old environments and expired snapshots.
652664
"""
653-
ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, **kwargs)
665+
ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, environment=environment, **kwargs)
654666

655667

656668
@cli.command("destroy")

sqlmesh/core/context.py

Lines changed: 43 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -890,12 +890,20 @@ def _has_environment_changed() -> bool:
890890
return completion_status
891891

892892
@python_api_analytics
893-
def run_janitor(self, ignore_ttl: bool, force_delete: bool = False) -> bool:
893+
def run_janitor(
894+
self,
895+
ignore_ttl: bool,
896+
force_delete: bool = False,
897+
environment: t.Optional[str] = None,
898+
) -> bool:
899+
if environment is not None:
900+
environment = Environment.sanitize_name(environment)
901+
894902
success = False
895903

896904
if self.console.start_cleanup(ignore_ttl):
897905
try:
898-
self._run_janitor(ignore_ttl, force_delete=force_delete)
906+
self._run_janitor(ignore_ttl, force_delete=force_delete, environment=environment)
899907
success = True
900908
finally:
901909
self.console.stop_cleanup(success=success)
@@ -1828,7 +1836,7 @@ def invalidate_environment(self, name: str, sync: bool = False) -> None:
18281836
name = Environment.sanitize_name(name)
18291837
self.state_sync.invalidate_environment(name)
18301838
if sync:
1831-
self._cleanup_environments()
1839+
self._cleanup_environments(name=name)
18321840
self.console.log_success(f"Environment '{name}' deleted.")
18331841
else:
18341842
self.console.log_success(f"Environment '{name}' invalidated.")
@@ -2899,27 +2907,35 @@ def _destroy(self) -> bool:
28992907

29002908
return True
29012909

2902-
def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) -> None:
2910+
def _run_janitor(
2911+
self,
2912+
ignore_ttl: bool = False,
2913+
force_delete: bool = False,
2914+
environment: t.Optional[str] = None,
2915+
) -> None:
29032916
current_ts = now_timestamp()
29042917
failures: t.List[str] = []
29052918

29062919
# Clean up expired environments by removing their views and schemas
29072920
failures.extend(
2908-
self._cleanup_environments(current_ts=current_ts, force_delete=force_delete)
2921+
self._cleanup_environments(
2922+
current_ts=current_ts, force_delete=force_delete, name=environment
2923+
)
29092924
)
29102925

2911-
failures.extend(
2912-
delete_expired_snapshots(
2913-
self.state_sync,
2914-
self.snapshot_evaluator,
2915-
current_ts=current_ts,
2916-
ignore_ttl=ignore_ttl,
2917-
force_delete=force_delete,
2918-
console=self.console,
2919-
batch_size=self.config.janitor.expired_snapshots_batch_size,
2926+
if environment is None:
2927+
failures.extend(
2928+
delete_expired_snapshots(
2929+
self.state_sync,
2930+
self.snapshot_evaluator,
2931+
current_ts=current_ts,
2932+
ignore_ttl=ignore_ttl,
2933+
force_delete=force_delete,
2934+
console=self.console,
2935+
batch_size=self.config.janitor.expired_snapshots_batch_size,
2936+
)
29202937
)
2921-
)
2922-
self.state_sync.compact_intervals()
2938+
self.state_sync.compact_intervals()
29232939

29242940
if failures:
29252941
failure_string = "\n - ".join(failures)
@@ -2932,15 +2948,23 @@ def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) ->
29322948
raise SQLMeshError(summary)
29332949

29342950
def _cleanup_environments(
2935-
self, current_ts: t.Optional[int] = None, force_delete: bool = False
2951+
self,
2952+
current_ts: t.Optional[int] = None,
2953+
force_delete: bool = False,
2954+
name: t.Optional[str] = None,
29362955
) -> t.List[str]:
29372956
current_ts = current_ts or now_timestamp()
29382957
failures: t.List[str] = []
29392958

29402959
expired_environments_summaries = self.state_sync.get_expired_environments(
2941-
current_ts=current_ts
2960+
current_ts=current_ts, name=name
29422961
)
29432962

2963+
if name is not None and not expired_environments_summaries:
2964+
self.console.log_warning(
2965+
f"Environment '{name}' is not expired or does not exist. Nothing to clean up."
2966+
)
2967+
29442968
for expired_env_summary in expired_environments_summaries:
29452969
expired_env = self.state_reader.get_environment(expired_env_summary.name)
29462970

@@ -2957,7 +2981,7 @@ def _cleanup_environments(
29572981
# we want to retry on the next janitor pass if drops failed, unless
29582982
# force_delete is set in which case we purge state records regardless
29592983
if not failures or force_delete:
2960-
self.state_sync.delete_expired_environments(current_ts=current_ts)
2984+
self.state_sync.delete_expired_environments(current_ts=current_ts, name=name)
29612985
return failures
29622986

29632987
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:

sqlmesh/core/state_sync/base.py

Lines changed: 13 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -321,10 +321,17 @@ def get_expired_snapshots(
321321
"""
322322

323323
@abc.abstractmethod
324-
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
324+
def get_expired_environments(
325+
self, current_ts: int, name: t.Optional[str] = None
326+
) -> t.List[EnvironmentSummary]:
325327
"""Returns the expired environments.
326328
327329
Expired environments are environments that have exceeded their time-to-live value.
330+
331+
Args:
332+
current_ts: The current timestamp in milliseconds used to determine expiration.
333+
name: If provided, only the environment with this name is considered.
334+
328335
Returns:
329336
The list of environment summaries to remove.
330337
"""
@@ -436,12 +443,16 @@ def finalize(self, environment: Environment) -> None:
436443

437444
@abc.abstractmethod
438445
def delete_expired_environments(
439-
self, current_ts: t.Optional[int] = None
446+
self, current_ts: t.Optional[int] = None, name: t.Optional[str] = None
440447
) -> t.List[EnvironmentSummary]:
441448
"""Removes expired environments.
442449
443450
Expired environments are environments that have exceeded their time-to-live value.
444451
452+
Args:
453+
current_ts: The current timestamp in milliseconds. Defaults to now.
454+
name: If provided, only the environment with this name is deleted.
455+
445456
Returns:
446457
The list of removed environments.
447458
"""

sqlmesh/core/state_sync/db/environment.py

Lines changed: 24 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -167,31 +167,42 @@ def finalize(self, environment: Environment) -> None:
167167
where=environment_filter,
168168
)
169169

170-
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
170+
def get_expired_environments(
171+
self, current_ts: int, name: t.Optional[str] = None
172+
) -> t.List[EnvironmentSummary]:
171173
"""Returns the expired environments.
172174
173175
Expired environments are environments that have exceeded their time-to-live value.
176+
177+
Args:
178+
current_ts: The current timestamp in milliseconds used to determine expiration.
179+
name: If provided, only the environment with this name is considered.
180+
174181
Returns:
175182
The list of environment summaries to remove.
176183
"""
177184
return self._fetch_environment_summaries(
178-
where=self._create_expiration_filter_expr(current_ts)
185+
where=self._create_expiration_filter_expr(current_ts, name=name)
179186
)
180187

181188
def delete_expired_environments(
182-
self, current_ts: t.Optional[int] = None
189+
self, current_ts: t.Optional[int] = None, name: t.Optional[str] = None
183190
) -> t.List[EnvironmentSummary]:
184191
"""Deletes expired environments.
185192
193+
Args:
194+
current_ts: The current timestamp in milliseconds. Defaults to now.
195+
name: If provided, only the environment with this name is deleted.
196+
186197
Returns:
187198
A list of deleted environments.
188199
"""
189200
current_ts = current_ts or now_timestamp()
190-
expired_environments = self.get_expired_environments(current_ts=current_ts)
201+
expired_environments = self.get_expired_environments(current_ts=current_ts, name=name)
191202

192203
self.engine_adapter.delete_from(
193204
self.environments_table,
194-
where=self._create_expiration_filter_expr(current_ts),
205+
where=self._create_expiration_filter_expr(current_ts, name=name),
195206
)
196207

197208
# Delete the expired environments' corresponding environment statements
@@ -310,16 +321,22 @@ def _environments_query(
310321
return query.lock(copy=False)
311322
return query
312323

313-
def _create_expiration_filter_expr(self, current_ts: int) -> exp.Expr:
324+
def _create_expiration_filter_expr(
325+
self, current_ts: int, name: t.Optional[str] = None
326+
) -> exp.Expr:
314327
"""Creates a SQLGlot filter expression to find expired environments.
315328
316329
Args:
317330
current_ts: The current timestamp.
331+
name: If provided, adds an equality filter on the environment name.
318332
"""
319-
return exp.LTE(
333+
where: exp.Expr = exp.LTE(
320334
this=exp.column("expiration_ts"),
321335
expression=exp.Literal.number(current_ts),
322336
)
337+
if name is not None:
338+
where = exp.and_(t.cast(exp.Condition, where), exp.column("name").eq(name))
339+
return where
323340

324341
def _fetch_environment_summaries(
325342
self, where: t.Optional[str | exp.Expr] = None

sqlmesh/core/state_sync/db/facade.py

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -276,8 +276,10 @@ def get_expired_snapshots(
276276
batch_range=batch_range,
277277
)
278278

279-
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
280-
return self.environment_state.get_expired_environments(current_ts=current_ts)
279+
def get_expired_environments(
280+
self, current_ts: int, name: t.Optional[str] = None
281+
) -> t.List[EnvironmentSummary]:
282+
return self.environment_state.get_expired_environments(current_ts=current_ts, name=name)
281283

282284
@transactional()
283285
def delete_expired_snapshots(
@@ -297,10 +299,10 @@ def delete_expired_snapshots(
297299

298300
@transactional()
299301
def delete_expired_environments(
300-
self, current_ts: t.Optional[int] = None
302+
self, current_ts: t.Optional[int] = None, name: t.Optional[str] = None
301303
) -> t.List[EnvironmentSummary]:
302304
current_ts = current_ts or now_timestamp()
303-
return self.environment_state.delete_expired_environments(current_ts=current_ts)
305+
return self.environment_state.delete_expired_environments(current_ts=current_ts, name=name)
304306

305307
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
306308
self.snapshot_state.delete_snapshots(snapshot_ids)

0 commit comments

Comments
 (0)