Skip to content

Commit fdc8d3b

Browse files
committed
Support range-based reads for deletion vectors
1 parent ec1413d commit fdc8d3b

5 files changed

Lines changed: 181 additions & 2 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@
147147
from pyiceberg.table.locations import load_location_provider
148148
from pyiceberg.table.metadata import TableMetadata
149149
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
150-
from pyiceberg.table.puffin import PuffinFile
150+
from pyiceberg.table.puffin import PuffinFile, _bitmaps_to_chunked_array, _deserialize_dv_blob
151151
from pyiceberg.transforms import IdentityTransform, TruncateTransform
152152
from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
153153
from pyiceberg.types import (
@@ -192,6 +192,8 @@
192192
logger = logging.getLogger(__name__)
193193

194194
ONE_MEGABYTE = 1024 * 1024
195+
# Match Iceberg Java's Integer.MAX_VALUE limit before reading a DV content range into memory.
196+
_MAX_DELETION_VECTOR_CONTENT_SIZE = 2**31 - 1
195197
BUFFER_SIZE = "buffer-size"
196198
ICEBERG_SCHEMA = b"iceberg.schema"
197199
# The PARQUET: in front means that it is Parquet specific, in this case the field_id
@@ -1139,6 +1141,32 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]
11391141
}
11401142
elif data_file.file_format == FileFormat.PUFFIN:
11411143
with io.new_input(data_file.file_path).open() as fi:
1144+
content_offset = data_file.content_offset
1145+
content_size_in_bytes = data_file.content_size_in_bytes
1146+
referenced_data_file = data_file.referenced_data_file
1147+
if content_offset is not None or content_size_in_bytes is not None:
1148+
if content_offset is None:
1149+
raise ValueError(f"Invalid deletion vector, content offset is missing: {data_file.file_path}")
1150+
if content_size_in_bytes is None:
1151+
raise ValueError(f"Invalid deletion vector, content size is missing: {data_file.file_path}")
1152+
if content_offset < 0:
1153+
raise ValueError(f"Invalid deletion vector, content offset cannot be negative: {content_offset}")
1154+
if content_size_in_bytes < 0:
1155+
raise ValueError(f"Invalid deletion vector, content size cannot be negative: {content_size_in_bytes}")
1156+
if content_size_in_bytes > _MAX_DELETION_VECTOR_CONTENT_SIZE:
1157+
raise ValueError(f"Cannot read deletion vector larger than 2GB: {content_size_in_bytes}")
1158+
if referenced_data_file is None:
1159+
raise ValueError(f"Invalid deletion vector, referenced data file is missing: {data_file.file_path}")
1160+
1161+
fi.seek(content_offset)
1162+
payload = fi.read(content_size_in_bytes)
1163+
if len(payload) != content_size_in_bytes:
1164+
raise ValueError(
1165+
f"Could not read deletion vector, expected {content_size_in_bytes} bytes, got {len(payload)}"
1166+
)
1167+
bitmaps = _deserialize_dv_blob(payload, data_file.record_count)
1168+
return {referenced_data_file: _bitmaps_to_chunked_array(bitmaps)}
1169+
11421170
payload = fi.read()
11431171

11441172
return PuffinFile(payload).to_vector()

pyiceberg/manifest.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,22 @@ def equality_ids(self) -> list[int] | None:
531531
def sort_order_id(self) -> int | None:
532532
return self._data[15]
533533

534+
@property
535+
def first_row_id(self) -> int | None:
536+
return self._data[16] if len(self._data) > 16 else None
537+
538+
@property
539+
def referenced_data_file(self) -> str | None:
540+
return self._data[17] if len(self._data) > 17 else None
541+
542+
@property
543+
def content_offset(self) -> int | None:
544+
return self._data[18] if len(self._data) > 18 else None
545+
546+
@property
547+
def content_size_in_bytes(self) -> int | None:
548+
return self._data[19] if len(self._data) > 19 else None
549+
534550
# Spec ID should not be stored in the file
535551
_spec_id: int
536552

pyiceberg/table/puffin.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
import math
18+
import struct
19+
import zlib
1820
from typing import TYPE_CHECKING, Literal
1921

2022
from pydantic import Field
@@ -30,6 +32,14 @@
3032
EMPTY_BITMAP = FrozenBitMap()
3133
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
3234
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
35+
_DV_BLOB_LENGTH = struct.Struct(">I")
36+
_DV_BLOB_MAGIC = struct.Struct("<I")
37+
_DV_BLOB_CRC = struct.Struct(">I")
38+
_DV_BLOB_MAGIC_NUMBER = 1681511377
39+
_ROARING_BITMAP_COUNT_SIZE_BYTES = 8
40+
_DV_BLOB_MIN_SIZE_BYTES = (
41+
_DV_BLOB_LENGTH.size + _DV_BLOB_MAGIC.size + _ROARING_BITMAP_COUNT_SIZE_BYTES + _DV_BLOB_CRC.size
42+
)
3343

3444

3545
def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
@@ -62,6 +72,40 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
6272
return bitmaps
6373

6474

75+
def _deserialize_dv_blob(blob: bytes, record_count: int | None = None) -> list[BitMap]:
76+
# The DV blob encoding matches Iceberg Java's BitmapPositionDeleteIndex:
77+
# 4-byte big-endian bitmap-data length, 4-byte little-endian magic number,
78+
# portable Roaring bitmap data, and 4-byte big-endian CRC-32.
79+
if len(blob) < _DV_BLOB_MIN_SIZE_BYTES:
80+
raise ValueError(f"Invalid deletion vector blob length: {len(blob)}")
81+
82+
bitmap_data_length = _DV_BLOB_LENGTH.unpack_from(blob)[0]
83+
expected_bitmap_data_length = len(blob) - _DV_BLOB_LENGTH.size - _DV_BLOB_CRC.size
84+
if bitmap_data_length != expected_bitmap_data_length:
85+
raise ValueError(f"Invalid bitmap data length: {bitmap_data_length}, expected {expected_bitmap_data_length}")
86+
87+
bitmap_data_offset = _DV_BLOB_LENGTH.size
88+
crc_offset = bitmap_data_offset + bitmap_data_length
89+
bitmap_data = blob[bitmap_data_offset:crc_offset]
90+
91+
magic_number = _DV_BLOB_MAGIC.unpack_from(bitmap_data)[0]
92+
if magic_number != _DV_BLOB_MAGIC_NUMBER:
93+
raise ValueError(f"Invalid magic number: {magic_number}, expected {_DV_BLOB_MAGIC_NUMBER}")
94+
95+
checksum = zlib.crc32(bitmap_data) & 0xFFFFFFFF
96+
expected_checksum = _DV_BLOB_CRC.unpack_from(blob, crc_offset)[0]
97+
if checksum != expected_checksum:
98+
raise ValueError("Invalid CRC")
99+
100+
bitmaps = _deserialize_bitmap(bitmap_data[_DV_BLOB_MAGIC.size :])
101+
if record_count is not None:
102+
cardinality = sum(len(bitmap) for bitmap in bitmaps)
103+
if cardinality != record_count:
104+
raise ValueError(f"Invalid cardinality: {cardinality}, expected {record_count}")
105+
106+
return bitmaps
107+
108+
65109
class PuffinBlobMetadata(IcebergBaseModel):
66110
type: Literal["deletion-vector-v1"] = Field()
67111
fields: list[int] = Field()

tests/io/test_pyarrow.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
# pylint: disable=protected-access,unused-argument,redefined-outer-name
1818
import logging
1919
import os
20+
import struct
2021
import tempfile
2122
import uuid
2223
import warnings
24+
import zlib
2325
from collections.abc import Iterator
2426
from datetime import date, datetime, timezone
2527
from pathlib import Path
@@ -34,6 +36,7 @@
3436
import pytest
3537
from packaging import version
3638
from pyarrow.fs import AwsDefaultS3RetryStrategy, FileType, LocalFileSystem, S3FileSystem
39+
from pyroaring import BitMap
3740

3841
from pyiceberg.exceptions import ResolveError
3942
from pyiceberg.expressions import (
@@ -91,6 +94,7 @@
9194
from pyiceberg.table import FileScanTask, TableProperties
9295
from pyiceberg.table.metadata import TableMetadataV2
9396
from pyiceberg.table.name_mapping import create_mapping_from_schema
97+
from pyiceberg.table.puffin import _DV_BLOB_MAGIC_NUMBER
9498
from pyiceberg.transforms import HourTransform, IdentityTransform
9599
from pyiceberg.typedef import UTF8, Properties, Record, TableVersion
96100
from pyiceberg.types import (
@@ -1820,6 +1824,42 @@ def test_read_deletes(deletes_file: str, request: pytest.FixtureRequest) -> None
18201824
assert list(deletes.values())[0] == pa.chunked_array([[1, 3, 5]])
18211825

18221826

1827+
def test_read_deletion_vector_blob_from_content_range(tmp_path: Path) -> None:
1828+
referenced_data_file = f"{tmp_path}/data.parquet"
1829+
bitmap_payload = (
1830+
(1).to_bytes(8, byteorder="little")
1831+
+ (0).to_bytes(4, byteorder="little")
1832+
+ BitMap([1, 3, 5]).serialize()
1833+
)
1834+
bitmap_data = struct.pack("<I", _DV_BLOB_MAGIC_NUMBER) + bitmap_payload
1835+
dv_blob = (
1836+
struct.pack(">I", len(bitmap_data))
1837+
+ bitmap_data
1838+
+ struct.pack(">I", zlib.crc32(bitmap_data) & 0xFFFFFFFF)
1839+
)
1840+
prefix = b"\x01not-a-puffin-file"
1841+
delete_file_path = f"{tmp_path}/deletes.bin"
1842+
1843+
with open(delete_file_path, "wb") as f:
1844+
f.write(prefix + dv_blob + b"trailing-bytes")
1845+
1846+
deletes = _read_deletes(
1847+
PyArrowFileIO(),
1848+
DataFile.from_args(
1849+
_table_format_version=3,
1850+
content=DataFileContent.POSITION_DELETES,
1851+
file_path=delete_file_path,
1852+
file_format=FileFormat.PUFFIN,
1853+
record_count=3,
1854+
referenced_data_file=referenced_data_file,
1855+
content_offset=len(prefix),
1856+
content_size_in_bytes=len(dv_blob),
1857+
),
1858+
)
1859+
1860+
assert deletes == {referenced_data_file: pa.chunked_array([[1, 3, 5]])}
1861+
1862+
18231863
def test_delete(deletes_file: str, request: pytest.FixtureRequest, table_schema_simple: Schema) -> None:
18241864
# Determine file format from the file extension
18251865
file_format = FileFormat.PARQUET if deletes_file.endswith(".parquet") else FileFormat.ORC

tests/table/test_puffin.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
import struct
18+
import zlib
1719
from os import path
1820

1921
import pytest
2022
from pyroaring import BitMap
2123

22-
from pyiceberg.table.puffin import _deserialize_bitmap
24+
from pyiceberg.table.puffin import _DV_BLOB_MAGIC_NUMBER, _deserialize_bitmap, _deserialize_dv_blob
2325

2426

2527
def _open_file(file: str) -> bytes:
@@ -28,6 +30,55 @@ def _open_file(file: str) -> bytes:
2830
return f.read()
2931

3032

33+
def _dv_blob(bitmap_payload: bytes) -> bytes:
34+
bitmap_data = struct.pack("<I", _DV_BLOB_MAGIC_NUMBER) + bitmap_payload
35+
return struct.pack(">I", len(bitmap_data)) + bitmap_data + struct.pack(">I", zlib.crc32(bitmap_data) & 0xFFFFFFFF)
36+
37+
38+
def _bitmap_payload() -> bytes:
39+
return (
40+
(1).to_bytes(8, byteorder="little")
41+
+ (0).to_bytes(4, byteorder="little")
42+
+ BitMap([1, 3, 5]).serialize()
43+
)
44+
45+
46+
def test_deserialize_deletion_vector_blob() -> None:
47+
actual = _deserialize_dv_blob(_dv_blob(_bitmap_payload()), record_count=3)
48+
49+
assert actual == [BitMap([1, 3, 5])]
50+
51+
52+
def test_deserialize_deletion_vector_blob_invalid_length() -> None:
53+
with pytest.raises(ValueError, match="Invalid bitmap data length"):
54+
_deserialize_dv_blob(_dv_blob(_bitmap_payload())[:-1])
55+
56+
57+
def test_deserialize_deletion_vector_blob_invalid_magic() -> None:
58+
bitmap_data = struct.pack("<I", _DV_BLOB_MAGIC_NUMBER + 1) + _bitmap_payload()
59+
blob = (
60+
struct.pack(">I", len(bitmap_data))
61+
+ bitmap_data
62+
+ struct.pack(">I", zlib.crc32(bitmap_data) & 0xFFFFFFFF)
63+
)
64+
65+
with pytest.raises(ValueError, match="Invalid magic number"):
66+
_deserialize_dv_blob(blob)
67+
68+
69+
def test_deserialize_deletion_vector_blob_invalid_crc() -> None:
70+
blob = bytearray(_dv_blob(_bitmap_payload()))
71+
blob[-1] ^= 1
72+
73+
with pytest.raises(ValueError, match="Invalid CRC"):
74+
_deserialize_dv_blob(bytes(blob))
75+
76+
77+
def test_deserialize_deletion_vector_blob_invalid_cardinality() -> None:
78+
with pytest.raises(ValueError, match="Invalid cardinality"):
79+
_deserialize_dv_blob(_dv_blob(_bitmap_payload()), record_count=4)
80+
81+
3182
def test_map_empty() -> None:
3283
puffin = _open_file("64mapempty.bin")
3384

0 commit comments

Comments
 (0)