Skip to content

Commit 34a8f9f

Browse files
committed
feat: Add support for RollingManifestWriter
1 parent b98de51 commit 34a8f9f

File tree

2 files changed

+185
-1
lines changed

2 files changed

+185
-1
lines changed

pyiceberg/manifest.py

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import math
2020
import threading
2121
from abc import ABC, abstractmethod
22-
from collections.abc import Iterator
22+
from collections.abc import Callable, Iterator
2323
from copy import copy
2424
from enum import Enum
2525
from types import TracebackType
@@ -1184,6 +1184,92 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter:
11841184
return self
11851185

11861186

1187+
class RollingManifestWriter:
1188+
"""As opposed to ManifestWriter, a rolling writer could produce multiple manifest files."""
1189+
1190+
_ROWS_DIVISOR = 250
1191+
1192+
def __init__(
1193+
self,
1194+
supplier: Callable[[], ManifestWriter],
1195+
target_file_size_in_bytes: int,
1196+
) -> None:
1197+
self._supplier = supplier
1198+
self._target_file_size_in_bytes = target_file_size_in_bytes
1199+
self._manifest_files: list[ManifestFile] = []
1200+
self._current_writer: ManifestWriter | None = None
1201+
self._current_file_rows: int = 0
1202+
self._closed: bool = False
1203+
1204+
def __enter__(self) -> RollingManifestWriter:
1205+
"""Open the rolling manifest writer."""
1206+
return self
1207+
1208+
def __exit__(
1209+
self,
1210+
exc_type: type[BaseException] | None,
1211+
exc_value: BaseException | None,
1212+
traceback: TracebackType | None,
1213+
) -> None:
1214+
"""Close the rolling manifest writer and finalize all manifests."""
1215+
try:
1216+
self._close_current_writer(exc_type, exc_value, traceback)
1217+
finally:
1218+
self._closed = True
1219+
1220+
def _get_current_writer(self) -> ManifestWriter:
1221+
if self._should_roll_to_new_file():
1222+
self._close_current_writer()
1223+
if not self._current_writer:
1224+
self._current_writer = self._supplier()
1225+
self._current_writer.__enter__()
1226+
return self._current_writer
1227+
1228+
def _should_roll_to_new_file(self) -> bool:
1229+
if not self._current_writer or self._current_file_rows == 0:
1230+
return False
1231+
return (
1232+
self._current_file_rows % self._ROWS_DIVISOR == 0 and self._current_writer.tell() >= self._target_file_size_in_bytes
1233+
)
1234+
1235+
def _close_current_writer(
1236+
self,
1237+
exc_type: type[BaseException] | None = None,
1238+
exc_value: BaseException | None = None,
1239+
traceback: TracebackType | None = None,
1240+
) -> None:
1241+
if self._current_writer:
1242+
if self._current_file_rows > 0:
1243+
self._current_writer.__exit__(exc_type, exc_value, traceback)
1244+
self._manifest_files.append(self._current_writer.to_manifest_file())
1245+
else:
1246+
try:
1247+
self._current_writer.__exit__(None, None, None)
1248+
except ValueError:
1249+
pass
1250+
self._current_writer = None
1251+
self._current_file_rows = 0
1252+
1253+
def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter:
1254+
if self._closed:
1255+
raise RuntimeError("Cannot add entry to closed manifest writer")
1256+
self._get_current_writer().add_entry(entry)
1257+
self._current_file_rows += 1
1258+
return self
1259+
1260+
def add(self, entry: ManifestEntry) -> RollingManifestWriter:
1261+
if self._closed:
1262+
raise RuntimeError("Cannot add entry to closed manifest writer")
1263+
self._get_current_writer().add(entry)
1264+
self._current_file_rows += 1
1265+
return self
1266+
1267+
def to_manifest_files(self) -> list[ManifestFile]:
1268+
if not self._closed:
1269+
raise RuntimeError("Cannot create manifest files from unclosed writer")
1270+
return self._manifest_files
1271+
1272+
11871273
class ManifestWriterV1(ManifestWriter):
11881274
def __init__(
11891275
self,

tests/utils/test_manifest.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
18+
from collections.abc import Callable
1819
from tempfile import TemporaryDirectory
1920

2021
import fastavro
@@ -31,7 +32,9 @@
3132
ManifestEntry,
3233
ManifestEntryStatus,
3334
ManifestFile,
35+
ManifestWriter,
3436
PartitionFieldSummary,
37+
RollingManifestWriter,
3538
_manifest_cache,
3639
_manifests,
3740
read_manifest_list,
@@ -932,3 +935,98 @@ def test_manifest_writer_tell(format_version: TableVersion) -> None:
932935
after_entry_bytes = writer.tell()
933936

934937
assert after_entry_bytes > initial_bytes, "Bytes should increase after adding entry"
938+
939+
940+
@pytest.mark.parametrize("format_version", [1, 2])
941+
def test_rolling_manifest_writer_stays_in_one_file_under_target(format_version: TableVersion) -> None:
942+
with TemporaryDirectory() as tmpdir:
943+
supplier = _create_manifest_writer_supplier(
944+
tmpdir, format_version, Schema(NestedField(1, "id", IntegerType(), required=True))
945+
)
946+
entries = [_create_simple_entry(i) for i in range(100)]
947+
948+
with RollingManifestWriter(supplier=supplier, target_file_size_in_bytes=10000) as writer:
949+
for entry in entries:
950+
writer.add_entry(entry)
951+
952+
assert len(writer.to_manifest_files()) == 1
953+
954+
955+
@pytest.mark.parametrize("format_version", [1, 2])
956+
def test_rolling_manifest_writer_splits_when_over_target(format_version: TableVersion) -> None:
957+
with TemporaryDirectory() as tmpdir:
958+
supplier = _create_manifest_writer_supplier(
959+
tmpdir, format_version, Schema(NestedField(1, "id", IntegerType(), required=True))
960+
)
961+
entries = [_create_simple_entry(i) for i in range(500)]
962+
963+
with RollingManifestWriter(supplier=supplier, target_file_size_in_bytes=1) as writer:
964+
for entry in entries:
965+
writer.add_entry(entry)
966+
967+
manifest_files = writer.to_manifest_files()
968+
# writer will check size every 250 entries. Target=1 forces splits at 250 and 500.
969+
assert len(manifest_files) == 2
970+
971+
with pytest.raises(RuntimeError, match="Cannot add entry to closed"):
972+
writer.add_entry(entries[0])
973+
974+
975+
@pytest.mark.parametrize("format_version", [1, 2])
976+
def test_rolling_manifest_writer_empty(format_version: TableVersion) -> None:
977+
with TemporaryDirectory() as tmpdir:
978+
supplier = _create_manifest_writer_supplier(
979+
tmpdir, format_version, Schema(NestedField(1, "id", IntegerType(), required=True))
980+
)
981+
982+
with RollingManifestWriter(supplier=supplier, target_file_size_in_bytes=42) as writer:
983+
pass
984+
985+
assert writer.to_manifest_files() == []
986+
987+
988+
def _create_manifest_writer_supplier(
989+
tmpdir: str,
990+
format_version: TableVersion,
991+
schema: Schema,
992+
snapshot_id: int = 1,
993+
) -> Callable[[], ManifestWriter]:
994+
counter = [0]
995+
io = PyArrowFileIO()
996+
997+
def _supplier() -> ManifestWriter:
998+
output_file = io.new_output(f"{tmpdir}/manifest-{counter[0]}.avro")
999+
counter[0] += 1
1000+
return write_manifest(
1001+
format_version=format_version,
1002+
spec=UNPARTITIONED_PARTITION_SPEC,
1003+
schema=schema,
1004+
output_file=output_file,
1005+
snapshot_id=snapshot_id,
1006+
avro_compression="null",
1007+
)
1008+
1009+
return _supplier
1010+
1011+
1012+
def _create_simple_entry(
1013+
i: int,
1014+
status: ManifestEntryStatus = ManifestEntryStatus.ADDED,
1015+
sequence_number: int | None = 1,
1016+
) -> ManifestEntry:
1017+
data_file = DataFile.from_args(
1018+
content=DataFileContent.DATA,
1019+
file_path=f"data-{i}.parquet",
1020+
file_format=FileFormat.PARQUET,
1021+
partition=Record(),
1022+
record_count=1,
1023+
file_size_in_bytes=1000,
1024+
)
1025+
return ManifestEntry.from_args(
1026+
status=status,
1027+
snapshot_id=1,
1028+
sequence_number=sequence_number,
1029+
data_sequence_number=1,
1030+
file_sequence_number=1,
1031+
data_file=data_file,
1032+
)

0 commit comments

Comments
 (0)