Skip to content

Commit 9654de6

Browse files
committed
Add expire snapshots to cli
1 parent a9ad3a3 commit 9654de6

2 files changed

Lines changed: 162 additions & 0 deletions

File tree

pyiceberg/cli/console.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
# pylint: disable=broad-except,redefined-builtin,redefined-outer-name
1818
import logging
1919
from collections.abc import Callable
20+
from datetime import datetime
2021
from functools import wraps
2122
from typing import (
2223
Any,
@@ -478,3 +479,48 @@ def _retention_properties(ref: SnapshotRef, table_properties: dict[str, str]) ->
478479
retention_properties["max_ref_age_ms"] = str(ref.max_ref_age_ms) if ref.max_ref_age_ms else "forever"
479480

480481
return retention_properties
482+
483+
484+
@run.group()
485+
def maintenance() -> None:
486+
"""Run maintenance operations on a table."""
487+
488+
489+
@maintenance.command("expire-snapshots")
490+
@click.argument("identifier")
491+
@click.option(
492+
"--snapshot-id",
493+
"snapshot_ids",
494+
type=int,
495+
multiple=True,
496+
help="Snapshot ID to expire. May be passed multiple times.",
497+
)
498+
@click.option(
499+
"--older-than",
500+
"older_than",
501+
type=click.DateTime(),
502+
help="Expire all unprotected snapshots with a timestamp older than this ISO datetime.",
503+
)
504+
@click.pass_context
505+
@catch_exception()
506+
def expire_snapshots(
507+
ctx: Context,
508+
identifier: str,
509+
snapshot_ids: tuple[int, ...],
510+
older_than: datetime | None,
511+
) -> None:
512+
"""Expire snapshots from a table by ID or age."""
513+
catalog, output = _catalog_and_output(ctx)
514+
515+
if not snapshot_ids and older_than is None:
516+
raise click.UsageError("Must provide at least one of --snapshot-id or --older-than.")
517+
518+
table = catalog.load_table(identifier)
519+
builder = table.maintenance.expire_snapshots()
520+
if snapshot_ids:
521+
builder = builder.by_ids([*snapshot_ids])
522+
if older_than is not None:
523+
builder = builder.older_than(older_than)
524+
builder.commit()
525+
526+
output.text(f"Expired snapshots on {identifier}")

tests/cli/test_console.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from unittest import mock
2222
from unittest.mock import MagicMock
2323

24+
import pyarrow as pa
2425
import pytest
2526
from click.testing import CliRunner
2627
from pytest_mock import MockFixture
@@ -1071,3 +1072,118 @@ def test_warehouse_cli_option_forwarded_to_catalog(mocker: MockFixture) -> None:
10711072
assert result.exit_code == 0
10721073
mock_basicConfig.assert_called_once()
10731074
mock_load_catalog.assert_called_once_with("rest", uri="https://catalog.service", warehouse="example-warehouse")
1075+
1076+
1077+
def _create_table_with_expirable_snapshot(catalog: InMemoryCatalog) -> int:
1078+
"""Create a table with two snapshots and return the older (non-HEAD) one.
1079+
1080+
The HEAD snapshot of a branch is protected from expiration, so to test the
1081+
expire-snapshots command we need a snapshot that has been superseded.
1082+
"""
1083+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
1084+
table = catalog.create_table(
1085+
identifier=TEST_TABLE_IDENTIFIER,
1086+
schema=TEST_TABLE_SCHEMA,
1087+
partition_spec=TEST_TABLE_PARTITION_SPEC,
1088+
)
1089+
arrow_schema = pa.schema(
1090+
[
1091+
pa.field("x", pa.int64(), nullable=False),
1092+
pa.field("y", pa.int64(), nullable=False),
1093+
pa.field("z", pa.int64(), nullable=False),
1094+
]
1095+
)
1096+
table.append(pa.Table.from_pylist([{"x": 1, "y": 2, "z": 3}], schema=arrow_schema))
1097+
table.refresh()
1098+
older_snapshot_id = table.current_snapshot().snapshot_id
1099+
table.append(pa.Table.from_pylist([{"x": 4, "y": 5, "z": 6}], schema=arrow_schema))
1100+
return older_snapshot_id
1101+
1102+
1103+
def test_expire_snapshots_requires_option(catalog: InMemoryCatalog) -> None:
1104+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
1105+
catalog.create_table(
1106+
identifier=TEST_TABLE_IDENTIFIER,
1107+
schema=TEST_TABLE_SCHEMA,
1108+
partition_spec=TEST_TABLE_PARTITION_SPEC,
1109+
)
1110+
1111+
runner = CliRunner()
1112+
result = runner.invoke(run, ["maintenance", "expire-snapshots", "default.my_table"])
1113+
1114+
assert result.exit_code == 1
1115+
assert "Must provide at least one of --snapshot-id or --older-than." in result.output
1116+
1117+
1118+
def test_expire_snapshots_table_does_not_exists(catalog: InMemoryCatalog) -> None:
1119+
# pylint: disable=unused-argument
1120+
1121+
runner = CliRunner()
1122+
result = runner.invoke(run, ["maintenance", "expire-snapshots", "default.doesnotexist", "--snapshot-id", "1"])
1123+
1124+
assert result.exit_code == 1
1125+
assert result.output == "Table does not exist: default.doesnotexist\n"
1126+
1127+
1128+
def test_expire_snapshots_by_id(catalog: InMemoryCatalog) -> None:
1129+
snapshot_id = _create_table_with_expirable_snapshot(catalog)
1130+
1131+
runner = CliRunner()
1132+
result = runner.invoke(run, ["maintenance", "expire-snapshots", "default.my_table", "--snapshot-id", str(snapshot_id)])
1133+
1134+
assert result.exit_code == 0
1135+
assert result.output == "Expired snapshots on default.my_table\n"
1136+
1137+
refreshed = catalog.load_table(TEST_TABLE_IDENTIFIER)
1138+
assert refreshed.metadata.snapshot_by_id(snapshot_id) is None
1139+
1140+
1141+
def test_expire_snapshots_older_than(catalog: InMemoryCatalog) -> None:
1142+
snapshot_id = _create_table_with_expirable_snapshot(catalog)
1143+
cutoff = datetime.datetime.now() + datetime.timedelta(days=1)
1144+
1145+
runner = CliRunner()
1146+
result = runner.invoke(
1147+
run,
1148+
[
1149+
"maintenance",
1150+
"expire-snapshots",
1151+
"default.my_table",
1152+
"--older-than",
1153+
cutoff.strftime("%Y-%m-%dT%H:%M:%S"),
1154+
],
1155+
)
1156+
1157+
assert result.exit_code == 0
1158+
assert result.output == "Expired snapshots on default.my_table\n"
1159+
1160+
refreshed = catalog.load_table(TEST_TABLE_IDENTIFIER)
1161+
assert refreshed.metadata.snapshot_by_id(snapshot_id) is None
1162+
1163+
1164+
def test_expire_snapshots_unknown_snapshot_id(catalog: InMemoryCatalog) -> None:
1165+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
1166+
catalog.create_table(
1167+
identifier=TEST_TABLE_IDENTIFIER,
1168+
schema=TEST_TABLE_SCHEMA,
1169+
partition_spec=TEST_TABLE_PARTITION_SPEC,
1170+
)
1171+
1172+
runner = CliRunner()
1173+
result = runner.invoke(run, ["maintenance", "expire-snapshots", "default.my_table", "--snapshot-id", "999"])
1174+
1175+
assert result.exit_code == 1
1176+
assert "Snapshot with ID 999 does not exist." in result.output
1177+
1178+
1179+
def test_json_expire_snapshots_by_id(catalog: InMemoryCatalog) -> None:
1180+
snapshot_id = _create_table_with_expirable_snapshot(catalog)
1181+
1182+
runner = CliRunner()
1183+
result = runner.invoke(
1184+
run,
1185+
["--output=json", "maintenance", "expire-snapshots", "default.my_table", "--snapshot-id", str(snapshot_id)],
1186+
)
1187+
1188+
assert result.exit_code == 0
1189+
assert result.output == '"Expired snapshots on default.my_table"\n'

0 commit comments

Comments
 (0)