Skip to content

Commit 2adebcb

Browse files
committed
merge manifests, tests
1 parent 8cc3612 commit 2adebcb

File tree

3 files changed

+195
-36
lines changed

3 files changed

+195
-36
lines changed

pyiceberg/table/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
update_table_metadata,
7676
)
7777
from pyiceberg.table.update.schema import UpdateSchema
78-
from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles
78+
from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _AppendFiles
7979
from pyiceberg.table.update.sorting import UpdateSortOrder
8080
from pyiceberg.table.update.spec import UpdateSpec
8181
from pyiceberg.table.update.statistics import UpdateStatistics
@@ -384,7 +384,7 @@ def _build_partition_predicate(
384384

385385
def _append_snapshot_producer(
386386
self, snapshot_properties: dict[str, str], branch: str | None = MAIN_BRANCH
387-
) -> _FastAppendFiles:
387+
) -> _AppendFiles[Any]:
388388
"""Determine the append type based on table properties.
389389
390390
Args:
@@ -699,6 +699,7 @@ def delete(
699699
)
700700

701701
if len(replaced_files) > 0:
702+
print("HEERE")
702703
with self.update_snapshot(
703704
snapshot_properties=snapshot_properties, branch=branch
704705
).overwrite() as overwrite_snapshot:

pyiceberg/table/update/snapshot.py

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -496,36 +496,7 @@ def files_affected(self) -> bool:
496496
return len(self._deleted_entries()) > 0
497497

498498

499-
class _FastAppendFiles(_SnapshotProducer["_FastAppendFiles"]):
500-
def _existing_manifests(self) -> list[ManifestFile]:
501-
"""To determine if there are any existing manifest files.
502-
503-
A fast append will add another ManifestFile to the ManifestList.
504-
All the existing manifest files are considered existing.
505-
"""
506-
existing_manifests = []
507-
508-
if self._parent_snapshot_id is not None:
509-
previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
510-
511-
if previous_snapshot is None:
512-
raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}")
513-
514-
for manifest in previous_snapshot.manifests(io=self._io):
515-
if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id:
516-
existing_manifests.append(manifest)
517-
518-
return existing_manifests
519-
520-
def _deleted_entries(self) -> list[ManifestEntry]:
521-
"""To determine if we need to record any deleted manifest entries.
522-
523-
In case of an append, nothing is deleted.
524-
"""
525-
return []
526-
527-
528-
class _MergeAppendFiles(_FastAppendFiles):
499+
class _MergingSnapshotProducer(_SnapshotProducer[U]):
529500
_target_size_bytes: int
530501
_min_count_to_merge: int
531502
_merge_enabled: bool
@@ -561,8 +532,8 @@ def __init__(
561532
def _process_manifests(self, manifests: list[ManifestFile]) -> list[ManifestFile]:
562533
"""To perform any post-processing on the manifests before writing them to the new snapshot.
563534
564-
In _MergeAppendFiles, we merge manifests based on the target size and the minimum count to merge
565-
if automatic merge is enabled.
535+
We merge manifests based on the target size and the minimum count to merge if automatic
536+
merge is enabled.
566537
"""
567538
unmerged_data_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DATA]
568539
unmerged_deletes_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DELETES]
@@ -577,7 +548,44 @@ def _process_manifests(self, manifests: list[ManifestFile]) -> list[ManifestFile
577548
return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests
578549

579550

580-
class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]):
551+
class _AppendFiles(_SnapshotProducer[U]):
552+
def _existing_manifests(self) -> list[ManifestFile]:
553+
"""To determine if there are any existing manifest files.
554+
555+
An append will add another ManifestFile to the ManifestList.
556+
All the existing manifest files are considered existing.
557+
"""
558+
existing_manifests = []
559+
560+
if self._parent_snapshot_id is not None:
561+
previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
562+
563+
if previous_snapshot is None:
564+
raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}")
565+
566+
for manifest in previous_snapshot.manifests(io=self._io):
567+
if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id:
568+
existing_manifests.append(manifest)
569+
570+
return existing_manifests
571+
572+
def _deleted_entries(self) -> list[ManifestEntry]:
573+
"""To determine if we need to record any deleted manifest entries.
574+
575+
In case of an append, nothing is deleted.
576+
"""
577+
return []
578+
579+
580+
class _FastAppendFiles(_AppendFiles["_FastAppendFiles"]):
581+
pass
582+
583+
584+
class _MergeAppendFiles(_MergingSnapshotProducer["_MergeAppendFiles"], _AppendFiles["_MergeAppendFiles"]):
585+
pass
586+
587+
588+
class _OverwriteFiles(_MergingSnapshotProducer["_OverwriteFiles"]):
581589
"""Overwrites data from the table. This will produce an OVERWRITE snapshot.
582590
583591
Data and delete files were added and removed in a logical overwrite operation.
@@ -742,7 +750,13 @@ def __init__(
742750
def _group_by_spec(self, manifests: list[ManifestFile]) -> dict[int, list[ManifestFile]]:
743751
groups = defaultdict(list)
744752
for manifest in manifests:
745-
groups[manifest.partition_spec_id].append(manifest)
753+
# filter out manifests that only have non-live data files
754+
if (
755+
manifest.has_added_files()
756+
or manifest.has_existing_files()
757+
or manifest.added_snapshot_id == self._snapshot_producer._snapshot_id
758+
):
759+
groups[manifest.partition_spec_id].append(manifest)
746760
return groups
747761

748762
def _create_manifest(self, spec_id: int, manifest_bin: list[ManifestFile]) -> ManifestFile:
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
# pylint:disable=redefined-outer-name
18+
19+
import pyarrow as pa
20+
import pytest
21+
22+
from pyiceberg.catalog import Catalog
23+
from pyiceberg.expressions import EqualTo
24+
from pyiceberg.partitioning import PartitionField, PartitionSpec
25+
from pyiceberg.schema import Schema
26+
from pyiceberg.table import TableProperties
27+
from pyiceberg.transforms import IdentityTransform
28+
from pyiceberg.types import LongType, NestedField
29+
from utils import _create_table
30+
31+
_SCHEMA = Schema(
32+
NestedField(field_id=1, name="id", field_type=LongType(), required=False),
33+
NestedField(field_id=2, name="partition_col", field_type=LongType(), required=False),
34+
)
35+
36+
_PARTITION_SPEC = PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="partition_col"))
37+
38+
_DATA_P1 = pa.table({"id": [1, 2, 3], "partition_col": [1, 1, 1]})
39+
_DATA_P2 = pa.table({"id": [4, 5, 6], "partition_col": [2, 2, 2]})
40+
_DATA_P1_NEW = pa.table({"id": [10, 11, 12], "partition_col": [1, 1, 1]})
41+
42+
43+
@pytest.mark.integration
44+
@pytest.mark.parametrize("format_version", [1, 2])
45+
def test_overwrite_with_merging_bounds_manifest_count(session_catalog: Catalog, format_version: int) -> None:
46+
"""After a partial overwrite with manifest merging enabled, the manifest count should
47+
not exceed the count before the overwrite, even when many manifests were accumulated."""
48+
identifier = f"default.test_overwrite_merges_manifests_v{format_version}"
49+
50+
# Build up 6 manifests via fast append (merging disabled)
51+
tbl = _create_table(
52+
session_catalog,
53+
identifier,
54+
{
55+
"format-version": str(format_version),
56+
TableProperties.MANIFEST_MERGE_ENABLED: "false",
57+
},
58+
partition_spec=_PARTITION_SPEC,
59+
schema=_SCHEMA,
60+
)
61+
62+
for _ in range(3):
63+
tbl.append(_DATA_P1)
64+
tbl.append(_DATA_P2)
65+
66+
assert len(tbl.inspect.manifests()) == 6
67+
68+
# Enable merging before the overwrite
69+
with tbl.transaction() as tx:
70+
tx.set_properties(
71+
{
72+
TableProperties.MANIFEST_MERGE_ENABLED: "true",
73+
TableProperties.MANIFEST_MIN_MERGE_COUNT: "2",
74+
}
75+
)
76+
77+
# Overwrite partition_col=1 only: the 3 partition_col=2 manifests are preserved
78+
# and merged together with the new manifest by _MergingSnapshotProducer._process_manifests (inherited by _OverwriteFiles)
79+
tbl.overwrite(_DATA_P1_NEW, EqualTo("partition_col", 1))
80+
81+
assert len(tbl.inspect.manifests()) < 6
82+
83+
# Data correctness: partition_col=2 has 3 × 3 = 9 rows; partition_col=1 replaced with 3 new rows
84+
result = tbl.scan().to_arrow()
85+
assert len(result) == 12
86+
assert sorted(result.column("id").to_pylist()) == [4, 4, 4, 5, 5, 5, 6, 6, 6, 10, 11, 12]
87+
88+
89+
@pytest.mark.integration
90+
@pytest.mark.parametrize("format_version", [1, 2])
91+
def test_overwrite_without_merging_increases_manifest_count(session_catalog: Catalog, format_version: int) -> None:
92+
"""Control test: without manifest merging, a partial overwrite grows the manifest count."""
93+
identifier = f"default.test_overwrite_no_merge_manifests_v{format_version}"
94+
95+
tbl = _create_table(
96+
session_catalog,
97+
identifier,
98+
{
99+
"format-version": str(format_version),
100+
TableProperties.MANIFEST_MERGE_ENABLED: "false",
101+
},
102+
partition_spec=_PARTITION_SPEC,
103+
schema=_SCHEMA,
104+
)
105+
106+
for _ in range(3):
107+
tbl.append(_DATA_P1)
108+
for _ in range(3):
109+
tbl.append(_DATA_P2)
110+
111+
assert len(tbl.inspect.manifests()) == 6
112+
113+
# Overwrite without merging: the new manifest is added alongside the 3 preserved partition_col=2 manifests and nothing is merged
114+
tbl.overwrite(_DATA_P1_NEW, EqualTo("partition_col", 1))
115+
116+
assert len(tbl.inspect.manifests()) == 4
117+
118+
# Data correctness is identical regardless of merging strategy
119+
result = tbl.scan().to_arrow()
120+
assert len(result) == 12
121+
assert sorted(result.column("id").to_pylist()) == [4, 4, 4, 5, 5, 5, 6, 6, 6, 10, 11, 12]
122+
123+
124+
@pytest.mark.integration
125+
@pytest.mark.parametrize("format_version", [1, 2])
126+
def test_fast_append_does_not_merge_manifests(session_catalog: Catalog, format_version: int) -> None:
127+
"""Fast append bypasses _MergingSnapshotProducer, so manifests grow with each append
128+
even when manifest merging properties are set to trigger early."""
129+
identifier = f"default.test_fast_append_no_merge_v{format_version}"
130+
131+
tbl = _create_table(
132+
session_catalog,
133+
identifier,
134+
{
135+
"format-version": str(format_version),
136+
TableProperties.MANIFEST_MERGE_ENABLED: "false",
137+
TableProperties.MANIFEST_MIN_MERGE_COUNT: "2",
138+
},
139+
schema=_SCHEMA,
140+
)
141+
142+
for expected_count in range(1, 6):
143+
tbl.append(_DATA_P1)
144+
assert len(tbl.inspect.manifests()) == expected_count

0 commit comments

Comments
 (0)