Skip to content

Commit f0fc260

Browse files
committed
add merge append
1 parent 6803eba commit f0fc260

File tree

2 files changed

+255
-2
lines changed

2 files changed

+255
-2
lines changed

pyiceberg/table/__init__.py

Lines changed: 206 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
import concurrent
1920
import itertools
2021
import uuid
2122
import warnings
2223
from abc import ABC, abstractmethod
24+
from collections import defaultdict
25+
from concurrent.futures import Future
2326
from copy import copy
2427
from dataclasses import dataclass
2528
from datetime import datetime
@@ -62,7 +65,7 @@
6265
inclusive_projection,
6366
manifest_evaluator,
6467
)
65-
from pyiceberg.io import FileIO, load_file_io
68+
from pyiceberg.io import FileIO, OutputFile, load_file_io
6669
from pyiceberg.manifest import (
6770
POSITIONAL_DELETE_SCHEMA,
6871
DataFile,
@@ -71,6 +74,7 @@
7174
ManifestEntry,
7275
ManifestEntryStatus,
7376
ManifestFile,
77+
ManifestWriter,
7478
PartitionFieldSummary,
7579
write_manifest,
7680
write_manifest_list,
@@ -136,6 +140,7 @@
136140
StructType,
137141
transform_dict_value_to_str,
138142
)
143+
from pyiceberg.utils.bin_packing import ListPacker
139144
from pyiceberg.utils.concurrent import ExecutorFactory
140145
from pyiceberg.utils.datetime import datetime_to_millis
141146
from pyiceberg.utils.singleton import _convert_to_hashable_type
@@ -240,6 +245,15 @@ class TableProperties:
240245
FORMAT_VERSION = "format-version"
241246
DEFAULT_FORMAT_VERSION = 2
242247

248+
MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes"
249+
MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024 # 8 MB
250+
251+
MANIFEST_MIN_MERGE_COUNT = "commit.manifest.min-count-to-merge"
252+
MANIFEST_MIN_MERGE_COUNT_DEFAULT = 100
253+
254+
MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled"
255+
MANIFEST_MERGE_ENABLED_DEFAULT = True
256+
243257

244258
class PropertyUtil:
245259
@staticmethod
@@ -2751,10 +2765,12 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List
27512765

27522766
class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
27532767
commit_uuid: uuid.UUID
2768+
_io: FileIO
27542769
_operation: Operation
27552770
_snapshot_id: int
27562771
_parent_snapshot_id: Optional[int]
27572772
_added_data_files: List[DataFile]
2773+
_manifest_num_counter: itertools.count[int]
27582774

27592775
def __init__(
27602776
self,
@@ -2775,6 +2791,7 @@ def __init__(
27752791
)
27762792
self._added_data_files = []
27772793
self.snapshot_properties = snapshot_properties
2794+
self._manifest_num_counter = itertools.count(0)
27782795

27792796
def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer:
27802797
self._added_data_files.append(data_file)
@@ -2786,6 +2803,10 @@ def _deleted_entries(self) -> List[ManifestEntry]: ...
27862803
@abstractmethod
27872804
def _existing_manifests(self) -> List[ManifestFile]: ...
27882805

2806+
def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
    """Post-process the manifests before they are written to the new snapshot.

    This base implementation is a pass-through hook: the given list is
    handed back untouched. Subclasses can override it to rewrite the list.

    Args:
        manifests: The manifests collected for the new snapshot.

    Returns:
        The very same list that was passed in.
    """
    return manifests
2809+
27892810
def _manifests(self) -> List[ManifestFile]:
27902811
def _write_added_manifest() -> List[ManifestFile]:
27912812
if self._added_data_files:
@@ -2840,7 +2861,7 @@ def _write_delete_manifest() -> List[ManifestFile]:
28402861
delete_manifests = executor.submit(_write_delete_manifest)
28412862
existing_manifests = executor.submit(self._existing_manifests)
28422863

2843-
return added_manifests.result() + delete_manifests.result() + existing_manifests.result()
2864+
return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result())
28442865

28452866
def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
28462867
ssc = SnapshotSummaryCollector()
@@ -2913,6 +2934,34 @@ def _commit(self) -> UpdatesAndRequirements:
29132934
),
29142935
)
29152936

2937+
@property
def snapshot_id(self) -> int:
    """Read-only access to the snapshot ID stored on this producer (``_snapshot_id``)."""
    return self._snapshot_id
2940+
2941+
def spec(self, spec_id: int) -> PartitionSpec:
    """Look up a partition spec by ID in the transaction's table metadata.

    Args:
        spec_id: Key into the mapping returned by ``table_metadata.specs()``.

    Returns:
        The partition spec stored under ``spec_id``.
    """
    known_specs = self._transaction.table_metadata.specs()
    return known_specs[spec_id]
2943+
2944+
def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
    """Open a manifest writer for ``spec`` that targets a fresh manifest file.

    The format version and schema come from the transaction's table metadata,
    the output file from ``new_manifest_output()``, and the snapshot ID from
    this producer.

    Args:
        spec: Partition spec the new manifest is written with.

    Returns:
        The writer produced by ``write_manifest``.
    """
    metadata = self._transaction.table_metadata
    return write_manifest(
        format_version=metadata.format_version,
        spec=spec,
        schema=metadata.schema(),
        output_file=self.new_manifest_output(),
        snapshot_id=self._snapshot_id,
    )
2952+
2953+
def new_manifest_output(self) -> OutputFile:
    """Create an output file for the next manifest of this commit.

    Every call draws the next number from ``_manifest_num_counter``, and that
    number is passed to ``_new_manifest_path`` together with the table
    location and the commit UUID.

    Returns:
        An output file obtained from this producer's ``FileIO``.
    """
    manifest_path = _new_manifest_path(
        location=self._transaction.table_metadata.location,
        num=next(self._manifest_num_counter),
        commit_uuid=self.commit_uuid,
    )
    return self._io.new_output(manifest_path)
2961+
2962+
def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:
    """Read the entries of ``manifest`` using this producer's ``FileIO``.

    Args:
        manifest: The manifest file to read.
        discard_deleted: Forwarded unchanged to ``ManifestFile.fetch_manifest_entry``.

    Returns:
        The entries returned by the manifest.
    """
    entries = manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)
    return entries
2964+
29162965

29172966
class FastAppendFiles(_MergingSnapshotProducer):
29182967
def _existing_manifests(self) -> List[ManifestFile]:
@@ -2943,6 +2992,61 @@ def _deleted_entries(self) -> List[ManifestEntry]:
29432992
return []
29442993

29452994

2995+
class MergeAppendFiles(FastAppendFiles):
    """Append producer that can merge data manifests while producing a snapshot.

    The merge behaviour is driven by three table properties that are read once
    at construction time:

    * ``commit.manifest.target-size-bytes``
    * ``commit.manifest.min-count-to-merge``
    * ``commit.manifest-merge.enabled``
    """

    _target_size_bytes: int
    _min_count_to_merge: int
    _merge_enabled: bool

    def __init__(
        self,
        operation: Operation,
        transaction: Transaction,
        io: FileIO,
        commit_uuid: Optional[uuid.UUID] = None,
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
    ) -> None:
        """Initialise the producer and cache the manifest-merge settings.

        Args:
            operation: The snapshot operation being performed.
            transaction: Transaction whose table metadata supplies the properties.
            io: FileIO used to read and write manifests.
            commit_uuid: Optional UUID for this commit, forwarded to the parent.
            snapshot_properties: Custom snapshot properties, forwarded to the
                parent so that this producer accepts the same arguments as the
                fast-append producer. Defaults to an empty mapping.
        """
        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties=snapshot_properties)

        table_properties = self._transaction.table_metadata.properties
        self._target_size_bytes = PropertyUtil.property_as_int(
            table_properties,
            TableProperties.MANIFEST_TARGET_SIZE_BYTES,
            TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
        )  # type: ignore
        self._min_count_to_merge = PropertyUtil.property_as_int(
            table_properties,
            TableProperties.MANIFEST_MIN_MERGE_COUNT,
            TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT,
        )  # type: ignore
        self._merge_enabled = PropertyUtil.property_as_bool(
            table_properties,
            TableProperties.MANIFEST_MERGE_ENABLED,
            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
        )

    def _deleted_entries(self) -> List[ManifestEntry]:
        """To determine if we need to record any deleted manifest entries.

        In case of an append, nothing is deleted.
        """
        return []

    def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
        """To perform any post-processing on the manifests before writing them to the new snapshot.

        Data manifests are handed to a ``_ManifestMergeManager`` configured with
        the cached target size, minimum merge count and enabled flag. Delete
        manifests are never merged here; they are appended unchanged after the
        (possibly merged) data manifests.

        Args:
            manifests: All manifests collected for the new snapshot.

        Returns:
            The processed data manifests followed by the untouched delete manifests.
        """
        unmerged_data_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DATA]
        unmerged_deletes_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DELETES]

        data_manifest_merge_manager = _ManifestMergeManager(
            target_size_bytes=self._target_size_bytes,
            min_count_to_merge=self._min_count_to_merge,
            merge_enabled=self._merge_enabled,
            snapshot_producer=self,
        )

        return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests
3048+
3049+
29463050
class OverwriteFiles(_MergingSnapshotProducer):
29473051
def _existing_manifests(self) -> List[ManifestFile]:
29483052
"""To determine if there are any existing manifest files.
@@ -3001,6 +3105,9 @@ def fast_append(self) -> FastAppendFiles:
30013105
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
30023106
)
30033107

3108+
def merge_append(self) -> MergeAppendFiles:
    """Create a merge-append snapshot producer for this transaction.

    Returns:
        A ``MergeAppendFiles`` producer for an APPEND operation that carries
        the snapshot properties configured on this object.
    """
    producer = MergeAppendFiles(operation=Operation.APPEND, transaction=self._transaction, io=self._io)
    # fast_append() hands self._snapshot_properties to its producer; do the same
    # here by setting the producer attribute directly, so custom snapshot
    # properties are not silently dropped for merge appends.
    # NOTE(review): assumes the producer reads `snapshot_properties` when it
    # builds the snapshot summary - confirm against the commit path.
    producer.snapshot_properties = self._snapshot_properties
    return producer
3110+
30043111
def overwrite(self) -> OverwriteFiles:
30053112
return OverwriteFiles(
30063113
operation=Operation.OVERWRITE
@@ -3735,3 +3842,100 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
37353842
table_partitions: List[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions)
37363843

37373844
return table_partitions
3845+
3846+
3847+
class _ManifestMergeManager:
    """Merge manifests of the same partition spec into fewer, larger manifests.

    Manifests are grouped by partition spec ID, packed into bins by
    ``manifest_length`` up to a target size, and every bin that qualifies is
    rewritten into a single manifest through the snapshot producer.
    """

    _target_size_bytes: int
    _min_count_to_merge: int
    _merge_enabled: bool
    _snapshot_producer: _MergingSnapshotProducer

    def __init__(
        self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _MergingSnapshotProducer
    ) -> None:
        """Store the merge configuration.

        Args:
            target_size_bytes: Target weight of a bin when packing manifests.
            min_count_to_merge: Minimum bin size before a bin containing the
                first manifest is merged.
            merge_enabled: When False, ``merge_manifests`` is a no-op.
            snapshot_producer: Producer used to read entries and write manifests.
        """
        self._target_size_bytes = target_size_bytes
        self._min_count_to_merge = min_count_to_merge
        self._merge_enabled = merge_enabled
        self._snapshot_producer = snapshot_producer

    def _group_by_spec(
        self, first_manifest: ManifestFile, remaining_manifests: List[ManifestFile]
    ) -> Dict[int, List[ManifestFile]]:
        """Group manifests by ``partition_spec_id``, keeping the input order within each group.

        ``first_manifest`` is always inserted first, so its spec ID is the
        first key of the returned mapping.
        """
        groups: Dict[int, List[ManifestFile]] = defaultdict(list)
        groups[first_manifest.partition_spec_id].append(first_manifest)
        for manifest in remaining_manifests:
            groups[manifest.partition_spec_id].append(manifest)
        return groups

    def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> ManifestFile:
        """Rewrite all manifests of a bin into one new manifest.

        Args:
            spec_id: Partition spec ID shared by the manifests in the bin.
            manifest_bin: The manifests to combine.

        Returns:
            The manifest file produced by the writer.
        """
        producer = self._snapshot_producer
        current_snapshot_id = producer.snapshot_id

        with producer.new_manifest_writer(spec=producer.spec(spec_id)) as writer:
            for manifest in manifest_bin:
                # Ask for deleted entries as well (the producer's default is
                # discard_deleted=True); otherwise the DELETED branch below
                # could never see the deletes recorded by this snapshot.
                for entry in producer.fetch_manifest_entry(manifest, discard_deleted=False):
                    from_this_snapshot = entry.snapshot_id == current_snapshot_id
                    if entry.status == ManifestEntryStatus.DELETED:
                        # suppress deletes from previous snapshots. only files deleted by this
                        # snapshot should be added to the new manifest
                        if from_this_snapshot:
                            writer.add_entry(entry)
                    elif entry.status == ManifestEntryStatus.ADDED and from_this_snapshot:
                        # adds from this snapshot are still adds, otherwise they should be existing
                        writer.add_entry(entry)
                    else:
                        # add all files from the old manifest as existing files
                        writer.add_entry(
                            ManifestEntry(
                                status=ManifestEntryStatus.EXISTING,
                                snapshot_id=entry.snapshot_id,
                                data_sequence_number=entry.data_sequence_number,
                                file_sequence_number=entry.file_sequence_number,
                                data_file=entry.data_file,
                            )
                        )

        # The manifest file is requested only after the writer context has exited.
        return writer.to_manifest_file()

    def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: List[ManifestFile]) -> List[ManifestFile]:
        """Bin-pack the manifests of one spec and merge each bin where appropriate.

        Args:
            first_manifest: The first manifest of the overall input list.
            spec_id: Partition spec ID of this group.
            manifests: The manifests that belong to this group.

        Returns:
            The resulting manifests, flattened in bin order.
        """
        packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False)
        bins: List[List[ManifestFile]] = packer.pack_end(manifests, lambda m: m.manifest_length)

        def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]:
            if len(manifest_bin) == 1:
                # nothing to merge with
                return [manifest_bin[0]]
            if first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge:
                # A bin holding the first manifest (the new data files or an appended
                # manifest file) is merged only once it reaches the minimum count.
                # The threshold applies to this bin alone so that large manifests
                # don't prevent merging older groups.
                return list(manifest_bin)
            return [self._create_manifest(spec_id, manifest_bin)]

        executor = ExecutorFactory.get_or_create()
        futures: List[Future[List[ManifestFile]]] = [executor.submit(merge_bin, b) for b in bins]

        # Let every bin finish before reading any result, then collect in
        # submission order: `futures` is already ordered like `bins`, so the
        # output ordering is consistent without any re-sorting.
        concurrent.futures.wait(futures)

        return [manifest for future in futures for manifest in future.result()]

    def merge_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
        """Merge the given manifests, or return them unchanged when there is nothing to do.

        Args:
            manifests: Manifests to consider; the first one is treated as the
                "first manifest" for the minimum-count rule.

        Returns:
            ``manifests`` itself when merging is disabled or the list is empty,
            otherwise the merged manifests of every spec group, with the groups
            visited in reverse insertion order.
        """
        if not self._merge_enabled or not manifests:
            return manifests

        first_manifest = manifests[0]
        remaining_manifests = manifests[1:]
        groups = self._group_by_spec(first_manifest, remaining_manifests)

        merged_manifests: List[ManifestFile] = []
        for spec_id in reversed(groups.keys()):
            merged_manifests.extend(self._merge_group(first_manifest, spec_id, groups[spec_id]))

        return merged_manifests

tests/integration/test_writes/test_writes.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -871,3 +871,52 @@ def table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null
871871
tbl.append(arrow_table_without_some_columns)
872872
# overwrite and then append should produce twice the data
873873
assert len(tbl.scan().to_arrow()) == len(arrow_table_without_some_columns) * 2
874+
875+
876+
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_merge_manifest_min_count_to_merge(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    """Three appends yield one manifest when merged, three when merging is prevented or disabled."""

    def make_table(identifier: str, merge_properties: Dict[str, str]) -> Table:
        # every table gets the parametrized format version on top of its merge settings
        return _create_table(
            session_catalog,
            identifier,
            {**merge_properties, "format-version": format_version},
            [],
        )

    # tbl_a should merge all manifests into 1
    tbl_a = make_table("default.merge_manifest_a", {"commit.manifest.min-count-to-merge": "1"})
    # tbl_b should not merge any manifests because the target size is too small
    tbl_b = make_table(
        "default.merge_manifest_b",
        {"commit.manifest.min-count-to-merge": "1", "commit.manifest.target-size-bytes": "1"},
    )
    # tbl_c should not merge any manifests because merging is disabled
    tbl_c = make_table("default.merge_manifest_c", {"commit.manifest-merge.enabled": "false"})

    # three appends per table, one table after the other
    for tbl in (tbl_a, tbl_b, tbl_c):
        for _ in range(3):
            tbl.append(arrow_table_with_null)

    assert len(tbl_a.current_snapshot().manifests(tbl_a.io)) == 1  # type: ignore
    assert len(tbl_b.current_snapshot().manifests(tbl_b.io)) == 3  # type: ignore
    assert len(tbl_c.current_snapshot().manifests(tbl_c.io)) == 3  # type: ignore

    reference = tbl_c.scan().to_arrow()
    # tbl_a and tbl_c should contain the same data
    assert tbl_a.scan().to_arrow().equals(reference)
    # tbl_b and tbl_c should contain the same data
    assert tbl_b.scan().to_arrow().equals(reference)

0 commit comments

Comments
 (0)