Skip to content

Commit 42a82ca

Browse files
committed
Add RemovePartitionStatisticsUpdate and SetPartitionStatisticsUpdate
events. This allows us to add, update, and remove partition statistics files.
1 parent 479e663 commit 42a82ca

File tree

3 files changed

+115
-2
lines changed

3 files changed

+115
-2
lines changed

pyiceberg/table/statistics.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,9 @@ def filter_statistics_by_snapshot_id(
5252
reject_snapshot_id: int,
5353
) -> List[StatisticsFile]:
5454
return [stat for stat in statistics if stat.snapshot_id != reject_snapshot_id]
55+
56+
def filter_partition_statistics_by_snapshot_id(
    statistics: List[PartitionStatisticsFile],
    reject_snapshot_id: int,
) -> List[PartitionStatisticsFile]:
    """Return the partition statistics files that do not belong to a given snapshot.

    Args:
        statistics: Partition statistics files to filter.
        reject_snapshot_id: Snapshot id whose entries are dropped.

    Returns:
        A new list holding, in their original order, the entries whose
        ``snapshot_id`` differs from ``reject_snapshot_id``.
    """
    kept: List[PartitionStatisticsFile] = []
    for candidate in statistics:
        if candidate.snapshot_id != reject_snapshot_id:
            kept.append(candidate)
    return kept

pyiceberg/table/update/__init__.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
SnapshotLogEntry,
3737
)
3838
from pyiceberg.table.sorting import SortOrder
39-
from pyiceberg.table.statistics import StatisticsFile, filter_statistics_by_snapshot_id
39+
from pyiceberg.table.statistics import PartitionStatisticsFile, StatisticsFile, filter_partition_statistics_by_snapshot_id, filter_statistics_by_snapshot_id
4040
from pyiceberg.typedef import (
4141
IcebergBaseModel,
4242
Properties,
@@ -197,6 +197,14 @@ class RemoveStatisticsUpdate(IcebergBaseModel):
197197
action: Literal["remove-statistics"] = Field(default="remove-statistics")
198198
snapshot_id: int = Field(alias="snapshot-id")
199199

200+
class SetPartitionStatisticsUpdate(IcebergBaseModel):
    """Table update that sets a partition statistics file on the table metadata."""

    action: Literal["set-partition-statistics"] = Field(default="set-partition-statistics")
    # Serialized under the hyphenated key, consistent with the hyphenated aliases used
    # by the other update models in this module (e.g. ``snapshot-id``) and with the key
    # the Iceberg REST catalog spec uses for this update. Construction by field name
    # (``partition_statistics=...``) relies on the base model accepting field names as
    # well as aliases, the same way ``RemovePartitionStatisticsUpdate(snapshot_id=...)``
    # is constructed elsewhere in this repository.
    partition_statistics: PartitionStatisticsFile = Field(alias="partition-statistics")
203+
204+
class RemovePartitionStatisticsUpdate(IcebergBaseModel):
    """Table update that removes the partition statistics recorded for one snapshot."""

    action: Literal["remove-partition-statistics"] = Field(
        default="remove-partition-statistics",
    )
    # Id of the snapshot whose partition statistics are removed; aliased to ``snapshot-id``.
    snapshot_id: int = Field(alias="snapshot-id")
207+
200208

201209
TableUpdate = Annotated[
202210
Union[
@@ -217,6 +225,8 @@ class RemoveStatisticsUpdate(IcebergBaseModel):
217225
RemovePropertiesUpdate,
218226
SetStatisticsUpdate,
219227
RemoveStatisticsUpdate,
228+
SetPartitionStatisticsUpdate,
229+
RemovePartitionStatisticsUpdate,
220230
],
221231
Field(discriminator="action"),
222232
]
@@ -581,6 +591,23 @@ def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _Ta
581591

582592
return base_metadata.model_copy(update={"statistics": statistics})
583593

594+
@_apply_table_update.register(SetPartitionStatisticsUpdate)
def _(update: SetPartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a ``SetPartitionStatisticsUpdate`` to the table metadata.

    Any existing partition statistics entry with the same snapshot id as the
    incoming file is dropped, and the incoming file is appended at the end.

    Args:
        update: The update carrying the partition statistics file to set.
        base_metadata: The metadata the update is applied to.
        context: Update context; the applied update is recorded on it.

    Returns:
        A copy of ``base_metadata`` with the new ``partition_statistics`` list.
    """
    incoming = update.partition_statistics
    # Filter first so the incoming file replaces, rather than duplicates, an entry for the same snapshot.
    retained = filter_partition_statistics_by_snapshot_id(base_metadata.partition_statistics, incoming.snapshot_id)
    context.add_update(update)

    return base_metadata.model_copy(update={"partition_statistics": [*retained, incoming]})
600+
601+
@_apply_table_update.register(RemovePartitionStatisticsUpdate)
def _(update: RemovePartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a ``RemovePartitionStatisticsUpdate`` to the table metadata.

    Args:
        update: The update naming the snapshot whose partition statistics are removed.
        base_metadata: The metadata the update is applied to.
        context: Update context; the applied update is recorded on it.

    Returns:
        A copy of ``base_metadata`` without the partition statistics entries
        for ``update.snapshot_id``.

    Raises:
        ValueError: If no partition statistics entry exists for ``update.snapshot_id``.
    """
    existing = base_metadata.partition_statistics
    matches = [part_stat for part_stat in existing if part_stat.snapshot_id == update.snapshot_id]
    if not matches:
        raise ValueError(f"Partition Statistics with snapshot id {update.snapshot_id} does not exist")

    remaining = filter_partition_statistics_by_snapshot_id(existing, update.snapshot_id)
    context.add_update(update)

    return base_metadata.model_copy(update={"partition_statistics": remaining})
610+
584611

585612
def update_table_metadata(
586613
base_metadata: TableMetadata,

tests/table/test_init.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
SortField,
6565
SortOrder,
6666
)
67-
from pyiceberg.table.statistics import BlobMetadata, StatisticsFile
67+
from pyiceberg.table.statistics import BlobMetadata, PartitionStatisticsFile, StatisticsFile
6868
from pyiceberg.table.update import (
6969
AddSnapshotUpdate,
7070
AddSortOrderUpdate,
@@ -77,10 +77,13 @@
7777
AssertRefSnapshotId,
7878
AssertTableUUID,
7979
RemovePropertiesUpdate,
80+
RemovePartitionStatisticsUpdate,
81+
RemovePropertiesUpdate,
8082
RemoveSnapshotRefUpdate,
8183
RemoveSnapshotsUpdate,
8284
RemoveStatisticsUpdate,
8385
SetDefaultSortOrderUpdate,
86+
SetPartitionStatisticsUpdate,
8487
SetPropertiesUpdate,
8588
SetSnapshotRefUpdate,
8689
SetStatisticsUpdate,
@@ -1359,3 +1362,80 @@ def test_remove_statistics_update(table_v2_with_statistics: Table) -> None:
13591362
table_v2_with_statistics.metadata,
13601363
(RemoveStatisticsUpdate(snapshot_id=123456789),),
13611364
)
1365+
1366+
1367+
def test_set_partition_statistics_update(table_v2_with_statistics: Table) -> None:
    """Setting partition statistics for the current snapshot yields exactly that one file."""
    snapshot_id = table_v2_with_statistics.metadata.current_snapshot_id

    new_metadata = update_table_metadata(
        table_v2_with_statistics.metadata,
        (
            SetPartitionStatisticsUpdate(
                partition_statistics=PartitionStatisticsFile(
                    snapshot_id=snapshot_id,
                    statistics_path="s3://bucket/warehouse/stats.puffin",
                    file_size_in_bytes=124,
                ),
            ),
        ),
    )

    expected = """
    {
        "snapshot-id": 3055729675574597004,
        "statistics-path": "s3://bucket/warehouse/stats.puffin",
        "file-size-in-bytes": 124
    }"""

    assert len(new_metadata.partition_statistics) == 1

    # Select by snapshot id so the comparison targets the file that was just set.
    for_snapshot = [entry for entry in new_metadata.partition_statistics if entry.snapshot_id == snapshot_id]

    assert len(for_snapshot) == 1
    assert json.loads(for_snapshot[0].model_dump_json()) == json.loads(expected)
1398+
1399+
1400+
def test_remove_partition_statistics_update(table_v2_with_statistics: Table) -> None:
    """A partition statistics file that was set can be removed again by snapshot id."""
    snapshot_id = table_v2_with_statistics.metadata.current_snapshot_id

    # Step 1: set a partition statistics file for the current snapshot.
    set_update = SetPartitionStatisticsUpdate(
        partition_statistics=PartitionStatisticsFile(
            snapshot_id=snapshot_id,
            statistics_path="s3://bucket/warehouse/stats.puffin",
            file_size_in_bytes=124,
        ),
    )
    metadata_after_set = update_table_metadata(table_v2_with_statistics.metadata, (set_update,))
    assert len(metadata_after_set.partition_statistics) == 1

    # Step 2: remove the partition statistics for that same snapshot.
    metadata_after_remove = update_table_metadata(
        metadata_after_set,
        (RemovePartitionStatisticsUpdate(snapshot_id=snapshot_id),),
    )

    assert len(metadata_after_remove.partition_statistics) == 0
1431+
1432+
def test_remove_partition_statistics_update_with_invalid_snapshot_id(table_v2_with_statistics: Table) -> None:
    """Removing partition statistics for a snapshot id that has none raises ``ValueError``."""
    # 123456789 has no partition statistics entry in the fixture metadata, so the removal must be rejected.
    remove_update = RemovePartitionStatisticsUpdate(snapshot_id=123456789)

    with pytest.raises(ValueError, match="Partition Statistics with snapshot id 123456789 does not exist"):
        update_table_metadata(table_v2_with_statistics.metadata, (remove_update,))

0 commit comments

Comments
 (0)