Skip to content

Commit 7c52733

Browse files
Merge branch 'main' into feat/local_tz
2 parents 40f64db + 278757c commit 7c52733

12 files changed

Lines changed: 429 additions & 40 deletions

File tree

docs/reference/cli.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,9 +307,17 @@ Usage: sqlmesh janitor [OPTIONS]
307307
The janitor cleans up old environments and expired snapshots.
308308
309309
Options:
310-
--ignore-ttl Cleanup snapshots that are not referenced in any environment,
311-
regardless of when they're set to expire
312-
--help Show this message and exit.
310+
--ignore-ttl Cleanup snapshots that are not referenced in any
311+
environment, regardless of when they're set to expire. Has
312+
no effect when --environment is specified.
313+
--force-delete Delete expired environment and snapshot state records even
314+
when the physical table or view drops fail. Any objects
315+
that could not be dropped become orphaned and must be
316+
removed manually.
317+
-e, --environment TEXT
318+
Scope cleanup to a single expired environment. Global
319+
snapshot and interval compaction are skipped.
320+
--help Show this message and exit.
313321
```
314322

315323
## migrate

sqlmesh/cli/main.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -637,24 +637,36 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
637637
@click.option(
638638
"--ignore-ttl",
639639
is_flag=True,
640-
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
640+
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire. Has no effect when --environment is specified.",
641641
)
642642
@click.option(
643643
"--force-delete",
644644
is_flag=True,
645645
help="Delete expired environment and snapshot state records even when the physical table or view drops fail. "
646646
"Any objects that could not be dropped become orphaned and must be removed manually.",
647647
)
648+
@click.option(
649+
"--environment",
650+
"-e",
651+
default=None,
652+
help="Scope cleanup to a single expired environment. Global snapshot and interval compaction are skipped.",
653+
)
648654
@click.pass_context
649655
@error_handler
650656
@cli_analytics
651-
def janitor(ctx: click.Context, ignore_ttl: bool, force_delete: bool, **kwargs: t.Any) -> None:
657+
def janitor(
658+
ctx: click.Context,
659+
ignore_ttl: bool,
660+
force_delete: bool,
661+
environment: t.Optional[str],
662+
**kwargs: t.Any,
663+
) -> None:
652664
"""
653665
Run the janitor process on-demand.
654666
655667
The janitor cleans up old environments and expired snapshots.
656668
"""
657-
ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, **kwargs)
669+
ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, environment=environment, **kwargs)
658670

659671

660672
@cli.command("destroy")

sqlmesh/core/context.py

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -901,12 +901,20 @@ def _has_environment_changed() -> bool:
901901
return completion_status
902902

903903
@python_api_analytics
904-
def run_janitor(self, ignore_ttl: bool, force_delete: bool = False) -> bool:
904+
def run_janitor(
905+
self,
906+
ignore_ttl: bool,
907+
force_delete: bool = False,
908+
environment: t.Optional[str] = None,
909+
) -> bool:
910+
if environment is not None:
911+
environment = Environment.sanitize_name(environment)
912+
905913
success = False
906914

907915
if self.console.start_cleanup(ignore_ttl):
908916
try:
909-
self._run_janitor(ignore_ttl, force_delete=force_delete)
917+
self._run_janitor(ignore_ttl, force_delete=force_delete, environment=environment)
910918
success = True
911919
finally:
912920
self.console.stop_cleanup(success=success)
@@ -1926,7 +1934,7 @@ def invalidate_environment(self, name: str, sync: bool = False) -> None:
19261934
name = Environment.sanitize_name(name)
19271935
self.state_sync.invalidate_environment(name)
19281936
if sync:
1929-
self._cleanup_environments()
1937+
self._cleanup_environments(name=name)
19301938
self.console.log_success(f"Environment '{name}' deleted.")
19311939
else:
19321940
self.console.log_success(f"Environment '{name}' invalidated.")
@@ -3026,27 +3034,35 @@ def _destroy(self) -> bool:
30263034

30273035
return True
30283036

3029-
def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) -> None:
3037+
def _run_janitor(
3038+
self,
3039+
ignore_ttl: bool = False,
3040+
force_delete: bool = False,
3041+
environment: t.Optional[str] = None,
3042+
) -> None:
30303043
current_ts = now_timestamp()
30313044
failures: t.List[str] = []
30323045

30333046
# Clean up expired environments by removing their views and schemas
30343047
failures.extend(
3035-
self._cleanup_environments(current_ts=current_ts, force_delete=force_delete)
3048+
self._cleanup_environments(
3049+
current_ts=current_ts, force_delete=force_delete, name=environment
3050+
)
30363051
)
30373052

3038-
failures.extend(
3039-
delete_expired_snapshots(
3040-
self.state_sync,
3041-
self.snapshot_evaluator,
3042-
current_ts=current_ts,
3043-
ignore_ttl=ignore_ttl,
3044-
force_delete=force_delete,
3045-
console=self.console,
3046-
batch_size=self.config.janitor.expired_snapshots_batch_size,
3053+
if environment is None:
3054+
failures.extend(
3055+
delete_expired_snapshots(
3056+
self.state_sync,
3057+
self.snapshot_evaluator,
3058+
current_ts=current_ts,
3059+
ignore_ttl=ignore_ttl,
3060+
force_delete=force_delete,
3061+
console=self.console,
3062+
batch_size=self.config.janitor.expired_snapshots_batch_size,
3063+
)
30473064
)
3048-
)
3049-
self.state_sync.compact_intervals()
3065+
self.state_sync.compact_intervals()
30503066

30513067
if failures:
30523068
failure_string = "\n - ".join(failures)
@@ -3059,15 +3075,23 @@ def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) ->
30593075
raise SQLMeshError(summary)
30603076

30613077
def _cleanup_environments(
3062-
self, current_ts: t.Optional[int] = None, force_delete: bool = False
3078+
self,
3079+
current_ts: t.Optional[int] = None,
3080+
force_delete: bool = False,
3081+
name: t.Optional[str] = None,
30633082
) -> t.List[str]:
30643083
current_ts = current_ts or now_timestamp()
30653084
failures: t.List[str] = []
30663085

30673086
expired_environments_summaries = self.state_sync.get_expired_environments(
3068-
current_ts=current_ts
3087+
current_ts=current_ts, name=name
30693088
)
30703089

3090+
if name is not None and not expired_environments_summaries:
3091+
self.console.log_warning(
3092+
f"Environment '{name}' is not expired or does not exist. Nothing to clean up."
3093+
)
3094+
30713095
for expired_env_summary in expired_environments_summaries:
30723096
expired_env = self.state_reader.get_environment(expired_env_summary.name)
30733097

@@ -3084,7 +3108,7 @@ def _cleanup_environments(
30843108
# we want to retry on the next janitor pass if drops failed, unless
30853109
# force_delete is set in which case we purge state records regardless
30863110
if not failures or force_delete:
3087-
self.state_sync.delete_expired_environments(current_ts=current_ts)
3111+
self.state_sync.delete_expired_environments(current_ts=current_ts, name=name)
30883112
return failures
30893113

30903114
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:

sqlmesh/core/dialect.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,16 @@ def _whens_sql(self: Generator, expression: exp.Whens) -> str:
745745
return self.wrap(self.expressions(expression, sep=" ", indent=False))
746746

747747

748+
def _parse_interval_span(self: Parser, this: exp.Expr) -> exp.Interval:
749+
interval = self.__parse_interval_span(this) # type: ignore
750+
# Without this, @unit in `INTERVAL @value @unit` is misread as an alias.
751+
if not interval.args.get("unit") and self._match(TokenType.PARAMETER):
752+
macro = _parse_macro(self)
753+
if macro is not None:
754+
interval.set("unit", macro)
755+
return interval
756+
757+
748758
def _override(klass: t.Type[Tokenizer | Parser], func: t.Callable) -> None:
749759
name = func.__name__
750760
setattr(klass, f"_{name}", getattr(klass, name))
@@ -1126,6 +1136,7 @@ def extend_sqlglot() -> None:
11261136
_override(TSQL.Parser, Parser._parse_if)
11271137
_override(Parser, _parse_if)
11281138
_override(Parser, _parse_id_var)
1139+
_override(Parser, _parse_interval_span)
11291140
_override(Parser, _warn_unsupported)
11301141
_override(Snowflake.Parser, _parse_table_parts)
11311142

sqlmesh/core/state_sync/base.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,10 +321,17 @@ def get_expired_snapshots(
321321
"""
322322

323323
@abc.abstractmethod
324-
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
324+
def get_expired_environments(
325+
self, current_ts: int, name: t.Optional[str] = None
326+
) -> t.List[EnvironmentSummary]:
325327
"""Returns the expired environments.
326328
327329
Expired environments are environments that have exceeded their time-to-live value.
330+
331+
Args:
332+
current_ts: The current timestamp in milliseconds used to determine expiration.
333+
name: If provided, only the environment with this name is considered.
334+
328335
Returns:
329336
The list of environment summaries to remove.
330337
"""
@@ -436,12 +443,16 @@ def finalize(self, environment: Environment) -> None:
436443

437444
@abc.abstractmethod
438445
def delete_expired_environments(
439-
self, current_ts: t.Optional[int] = None
446+
self, current_ts: t.Optional[int] = None, name: t.Optional[str] = None
440447
) -> t.List[EnvironmentSummary]:
441448
"""Removes expired environments.
442449
443450
Expired environments are environments that have exceeded their time-to-live value.
444451
452+
Args:
453+
current_ts: The current timestamp in milliseconds. Defaults to now.
454+
name: If provided, only the environment with this name is deleted.
455+
445456
Returns:
446457
The list of removed environments.
447458
"""

sqlmesh/core/state_sync/db/environment.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,31 +167,42 @@ def finalize(self, environment: Environment) -> None:
167167
where=environment_filter,
168168
)
169169

170-
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
170+
def get_expired_environments(
171+
self, current_ts: int, name: t.Optional[str] = None
172+
) -> t.List[EnvironmentSummary]:
171173
"""Returns the expired environments.
172174
173175
Expired environments are environments that have exceeded their time-to-live value.
176+
177+
Args:
178+
current_ts: The current timestamp in milliseconds used to determine expiration.
179+
name: If provided, only the environment with this name is considered.
180+
174181
Returns:
175182
The list of environment summaries to remove.
176183
"""
177184
return self._fetch_environment_summaries(
178-
where=self._create_expiration_filter_expr(current_ts)
185+
where=self._create_expiration_filter_expr(current_ts, name=name)
179186
)
180187

181188
def delete_expired_environments(
182-
self, current_ts: t.Optional[int] = None
189+
self, current_ts: t.Optional[int] = None, name: t.Optional[str] = None
183190
) -> t.List[EnvironmentSummary]:
184191
"""Deletes expired environments.
185192
193+
Args:
194+
current_ts: The current timestamp in milliseconds. Defaults to now.
195+
name: If provided, only the environment with this name is deleted.
196+
186197
Returns:
187198
A list of deleted environments.
188199
"""
189200
current_ts = current_ts or now_timestamp()
190-
expired_environments = self.get_expired_environments(current_ts=current_ts)
201+
expired_environments = self.get_expired_environments(current_ts=current_ts, name=name)
191202

192203
self.engine_adapter.delete_from(
193204
self.environments_table,
194-
where=self._create_expiration_filter_expr(current_ts),
205+
where=self._create_expiration_filter_expr(current_ts, name=name),
195206
)
196207

197208
# Delete the expired environments' corresponding environment statements
@@ -310,16 +321,22 @@ def _environments_query(
310321
return query.lock(copy=False)
311322
return query
312323

313-
def _create_expiration_filter_expr(self, current_ts: int) -> exp.Expr:
324+
def _create_expiration_filter_expr(
325+
self, current_ts: int, name: t.Optional[str] = None
326+
) -> exp.Expr:
314327
"""Creates a SQLGlot filter expression to find expired environments.
315328
316329
Args:
317330
current_ts: The current timestamp.
331+
name: If provided, adds an equality filter on the environment name.
318332
"""
319-
return exp.LTE(
333+
where: exp.Expr = exp.LTE(
320334
this=exp.column("expiration_ts"),
321335
expression=exp.Literal.number(current_ts),
322336
)
337+
if name is not None:
338+
where = exp.and_(t.cast(exp.Condition, where), exp.column("name").eq(name))
339+
return where
323340

324341
def _fetch_environment_summaries(
325342
self, where: t.Optional[str | exp.Expr] = None

sqlmesh/core/state_sync/db/facade.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,10 @@ def get_expired_snapshots(
276276
batch_range=batch_range,
277277
)
278278

279-
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
280-
return self.environment_state.get_expired_environments(current_ts=current_ts)
279+
def get_expired_environments(
280+
self, current_ts: int, name: t.Optional[str] = None
281+
) -> t.List[EnvironmentSummary]:
282+
return self.environment_state.get_expired_environments(current_ts=current_ts, name=name)
281283

282284
@transactional()
283285
def delete_expired_snapshots(
@@ -297,10 +299,10 @@ def delete_expired_snapshots(
297299

298300
@transactional()
299301
def delete_expired_environments(
300-
self, current_ts: t.Optional[int] = None
302+
self, current_ts: t.Optional[int] = None, name: t.Optional[str] = None
301303
) -> t.List[EnvironmentSummary]:
302304
current_ts = current_ts or now_timestamp()
303-
return self.environment_state.delete_expired_environments(current_ts=current_ts)
305+
return self.environment_state.delete_expired_environments(current_ts=current_ts, name=name)
304306

305307
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
306308
self.snapshot_state.delete_snapshots(snapshot_ids)

sqlmesh/utils/cache.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,16 @@ def __init__(self, path: Path, prefix: t.Optional[str] = None):
6363
# the file.stat() call below will fail on windows if the :file name is longer than 260 chars
6464
file = fix_windows_path(file)
6565

66-
if not file.stem.startswith(self._cache_version) or file.stat().st_atime < threshold:
67-
file.unlink(missing_ok=True)
66+
try:
67+
stat_result = file.stat()
68+
if (
69+
not file.stem.startswith(self._cache_version)
70+
or stat_result.st_atime < threshold
71+
):
72+
file.unlink(missing_ok=True)
73+
except FileNotFoundError:
74+
# File was deleted between glob() and stat() — skip stale cache entries gracefully
75+
continue
6876

6977
def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T:
7078
"""Returns an existing cached entry or loads and caches a new one.

0 commit comments

Comments
 (0)