Skip to content

Commit 118c561

Browse files
committed
Support range-based reads for deletion vectors
1 parent ec1413d commit 118c561

5 files changed

Lines changed: 227 additions & 2 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@
147147
from pyiceberg.table.locations import load_location_provider
148148
from pyiceberg.table.metadata import TableMetadata
149149
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
150-
from pyiceberg.table.puffin import PuffinFile
150+
from pyiceberg.table.puffin import PuffinFile, _bitmaps_to_chunked_array, _deserialize_dv_blob
151151
from pyiceberg.transforms import IdentityTransform, TruncateTransform
152152
from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
153153
from pyiceberg.types import (
@@ -192,6 +192,8 @@
192192
logger = logging.getLogger(__name__)
193193

194194
ONE_MEGABYTE = 1024 * 1024
195+
# Match Iceberg Java's Integer.MAX_VALUE limit before reading a DV content range into memory.
196+
_MAX_DELETION_VECTOR_CONTENT_SIZE = 2**31 - 1
195197
BUFFER_SIZE = "buffer-size"
196198
ICEBERG_SCHEMA = b"iceberg.schema"
197199
# The PARQUET: in front means that it is Parquet specific, in this case the field_id
@@ -1116,6 +1118,27 @@ def _get_file_format(file_format: FileFormat, **kwargs: dict[str, Any]) -> ds.Fi
11161118
raise ValueError(f"Unsupported file format: {file_format}")
11171119

11181120

1121+
def _validate_deletion_vector(data_file: DataFile) -> tuple[int, int, str]:
1122+
content_offset = getattr(data_file, "content_offset", None)
1123+
content_size_in_bytes = getattr(data_file, "content_size_in_bytes", None)
1124+
referenced_data_file = getattr(data_file, "referenced_data_file", None)
1125+
1126+
if content_offset is None:
1127+
raise ValueError(f"Invalid deletion vector, content offset is missing: {data_file.file_path}")
1128+
if content_size_in_bytes is None:
1129+
raise ValueError(f"Invalid deletion vector, content size is missing: {data_file.file_path}")
1130+
if content_offset < 0:
1131+
raise ValueError(f"Invalid deletion vector, content offset cannot be negative: {content_offset}")
1132+
if content_size_in_bytes < 0:
1133+
raise ValueError(f"Invalid deletion vector, content size cannot be negative: {content_size_in_bytes}")
1134+
if content_size_in_bytes > _MAX_DELETION_VECTOR_CONTENT_SIZE:
1135+
raise ValueError(f"Cannot read deletion vector larger than 2GB: {content_size_in_bytes}")
1136+
if referenced_data_file is None:
1137+
raise ValueError(f"Invalid deletion vector, referenced data file is missing: {data_file.file_path}")
1138+
1139+
return content_offset, content_size_in_bytes, referenced_data_file
1140+
1141+
11191142
def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]:
11201143
if data_file.file_format == FileFormat.PARQUET:
11211144
with io.new_input(data_file.file_path).open() as fi:
@@ -1139,6 +1162,22 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]
11391162
}
11401163
elif data_file.file_format == FileFormat.PUFFIN:
11411164
with io.new_input(data_file.file_path).open() as fi:
1165+
content_offset = getattr(data_file, "content_offset", None)
1166+
content_size_in_bytes = getattr(data_file, "content_size_in_bytes", None)
1167+
if content_offset is not None or content_size_in_bytes is not None:
1168+
# A DV is declared as PUFFIN in the manifest, but the content range points directly
1169+
# to the serialized bitmap blob, so avoid parsing the entire file as a Puffin file.
1170+
content_offset, content_size_in_bytes, referenced_data_file = _validate_deletion_vector(data_file)
1171+
1172+
fi.seek(content_offset)
1173+
payload = fi.read(content_size_in_bytes)
1174+
if len(payload) != content_size_in_bytes:
1175+
raise ValueError(
1176+
f"Could not read deletion vector, expected {content_size_in_bytes} bytes, got {len(payload)}"
1177+
)
1178+
bitmaps = _deserialize_dv_blob(payload, data_file.record_count)
1179+
return {referenced_data_file: _bitmaps_to_chunked_array(bitmaps)}
1180+
11421181
payload = fi.read()
11431182

11441183
return PuffinFile(payload).to_vector()

pyiceberg/manifest.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,18 @@ def equality_ids(self) -> list[int] | None:
531531
def sort_order_id(self) -> int | None:
532532
return self._data[15]
533533

534+
@property
535+
def referenced_data_file(self) -> str | None:
536+
return self._data[17] if len(self._data) > 17 else None
537+
538+
@property
539+
def content_offset(self) -> int | None:
540+
return self._data[18] if len(self._data) > 18 else None
541+
542+
@property
543+
def content_size_in_bytes(self) -> int | None:
544+
return self._data[19] if len(self._data) > 19 else None
545+
534546
# Spec ID should not be stored in the file
535547
_spec_id: int
536548

pyiceberg/table/puffin.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
import math
18+
import struct
19+
import zlib
1820
from typing import TYPE_CHECKING, Literal
1921

2022
from pydantic import Field
@@ -30,6 +32,12 @@
3032
EMPTY_BITMAP = FrozenBitMap()
3133
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
3234
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
35+
_DV_BLOB_LENGTH = struct.Struct(">I")
36+
_DV_BLOB_MAGIC = struct.Struct("<I")
37+
_DV_BLOB_CRC = struct.Struct(">I")
38+
_DV_BLOB_MAGIC_NUMBER = 1681511377
39+
_ROARING_BITMAP_COUNT_SIZE_BYTES = 8
40+
_DV_BLOB_MIN_SIZE_BYTES = _DV_BLOB_LENGTH.size + _DV_BLOB_MAGIC.size + _ROARING_BITMAP_COUNT_SIZE_BYTES + _DV_BLOB_CRC.size
3341

3442

3543
def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
@@ -62,6 +70,40 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
6270
return bitmaps
6371

6472

73+
def _deserialize_dv_blob(blob: bytes, record_count: int | None = None) -> list[BitMap]:
74+
# The DV blob encoding matches Iceberg Java's BitmapPositionDeleteIndex:
75+
# 4-byte big-endian bitmap-data length, 4-byte little-endian magic number,
76+
# portable Roaring bitmap data, and 4-byte big-endian CRC-32.
77+
if len(blob) < _DV_BLOB_MIN_SIZE_BYTES:
78+
raise ValueError(f"Invalid deletion vector blob length: {len(blob)}")
79+
80+
bitmap_data_length = _DV_BLOB_LENGTH.unpack_from(blob)[0]
81+
expected_bitmap_data_length = len(blob) - _DV_BLOB_LENGTH.size - _DV_BLOB_CRC.size
82+
if bitmap_data_length != expected_bitmap_data_length:
83+
raise ValueError(f"Invalid bitmap data length: {bitmap_data_length}, expected {expected_bitmap_data_length}")
84+
85+
bitmap_data_offset = _DV_BLOB_LENGTH.size
86+
crc_offset = bitmap_data_offset + bitmap_data_length
87+
bitmap_data = blob[bitmap_data_offset:crc_offset]
88+
89+
magic_number = _DV_BLOB_MAGIC.unpack_from(bitmap_data)[0]
90+
if magic_number != _DV_BLOB_MAGIC_NUMBER:
91+
raise ValueError(f"Invalid magic number: {magic_number}, expected {_DV_BLOB_MAGIC_NUMBER}")
92+
93+
checksum = zlib.crc32(bitmap_data) & 0xFFFFFFFF
94+
expected_checksum = _DV_BLOB_CRC.unpack_from(blob, crc_offset)[0]
95+
if checksum != expected_checksum:
96+
raise ValueError("Invalid CRC")
97+
98+
bitmaps = _deserialize_bitmap(bitmap_data[_DV_BLOB_MAGIC.size :])
99+
if record_count is not None:
100+
cardinality = sum(len(bitmap) for bitmap in bitmaps)
101+
if cardinality != record_count:
102+
raise ValueError(f"Invalid cardinality: {cardinality}, expected {record_count}")
103+
104+
return bitmaps
105+
106+
65107
class PuffinBlobMetadata(IcebergBaseModel):
66108
type: Literal["deletion-vector-v1"] = Field()
67109
fields: list[int] = Field()

tests/io/test_pyarrow.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
# pylint: disable=protected-access,unused-argument,redefined-outer-name
18+
import json
1819
import logging
1920
import os
21+
import struct
2022
import tempfile
2123
import uuid
2224
import warnings
25+
import zlib
2326
from collections.abc import Iterator
2427
from datetime import date, datetime, timezone
2528
from pathlib import Path
@@ -34,6 +37,7 @@
3437
import pytest
3538
from packaging import version
3639
from pyarrow.fs import AwsDefaultS3RetryStrategy, FileType, LocalFileSystem, S3FileSystem
40+
from pyroaring import BitMap
3741

3842
from pyiceberg.exceptions import ResolveError
3943
from pyiceberg.expressions import (
@@ -91,6 +95,11 @@
9195
from pyiceberg.table import FileScanTask, TableProperties
9296
from pyiceberg.table.metadata import TableMetadataV2
9397
from pyiceberg.table.name_mapping import create_mapping_from_schema
98+
from pyiceberg.table.puffin import (
99+
_DV_BLOB_MAGIC_NUMBER,
100+
MAGIC_BYTES,
101+
PROPERTY_REFERENCED_DATA_FILE,
102+
)
94103
from pyiceberg.transforms import HourTransform, IdentityTransform
95104
from pyiceberg.typedef import UTF8, Properties, Record, TableVersion
96105
from pyiceberg.types import (
@@ -1820,6 +1829,86 @@ def test_read_deletes(deletes_file: str, request: pytest.FixtureRequest) -> None
18201829
assert list(deletes.values())[0] == pa.chunked_array([[1, 3, 5]])
18211830

18221831

1832+
def _deletion_vector_bitmap_payload() -> bytes:
1833+
return (1).to_bytes(8, byteorder="little") + (0).to_bytes(4, byteorder="little") + BitMap([1, 3, 5]).serialize()
1834+
1835+
1836+
def _deletion_vector_blob(bitmap_payload: bytes) -> bytes:
1837+
bitmap_data = struct.pack("<I", _DV_BLOB_MAGIC_NUMBER) + bitmap_payload
1838+
return struct.pack(">I", len(bitmap_data)) + bitmap_data + struct.pack(">I", zlib.crc32(bitmap_data) & 0xFFFFFFFF)
1839+
1840+
1841+
def test_read_deletion_vector_from_puffin_file(tmp_path: Path) -> None:
1842+
referenced_data_file = f"{tmp_path}/data.parquet"
1843+
bitmap_payload = _deletion_vector_bitmap_payload()
1844+
footer_payload = json.dumps(
1845+
{
1846+
"blobs": [
1847+
{
1848+
"type": "deletion-vector-v1",
1849+
"fields": [2147483546],
1850+
"snapshot-id": 1,
1851+
"sequence-number": 1,
1852+
"offset": 0,
1853+
"length": len(bitmap_payload),
1854+
"properties": {PROPERTY_REFERENCED_DATA_FILE: referenced_data_file},
1855+
}
1856+
],
1857+
"properties": {},
1858+
}
1859+
).encode()
1860+
puffin_payload = (
1861+
MAGIC_BYTES
1862+
+ b"\x00\x00\x00\x00"
1863+
+ bitmap_payload
1864+
+ footer_payload
1865+
+ len(footer_payload).to_bytes(4, byteorder="little")
1866+
+ b"\x00\x00\x00\x00"
1867+
+ MAGIC_BYTES
1868+
)
1869+
delete_file_path = f"{tmp_path}/deletes.puffin"
1870+
1871+
with open(delete_file_path, "wb") as f:
1872+
f.write(puffin_payload)
1873+
1874+
deletes = _read_deletes(
1875+
PyArrowFileIO(),
1876+
DataFile.from_args(
1877+
content=DataFileContent.POSITION_DELETES,
1878+
file_path=delete_file_path,
1879+
file_format=FileFormat.PUFFIN,
1880+
),
1881+
)
1882+
1883+
assert deletes == {referenced_data_file: pa.chunked_array([[1, 3, 5]])}
1884+
1885+
1886+
def test_read_deletion_vector_blob_from_content_range(tmp_path: Path) -> None:
1887+
referenced_data_file = f"{tmp_path}/data.parquet"
1888+
dv_blob = _deletion_vector_blob(_deletion_vector_bitmap_payload())
1889+
prefix = b"\x01not-a-puffin-file"
1890+
delete_file_path = f"{tmp_path}/deletes.bin"
1891+
1892+
with open(delete_file_path, "wb") as f:
1893+
f.write(prefix + dv_blob + b"trailing-bytes")
1894+
1895+
deletes = _read_deletes(
1896+
PyArrowFileIO(),
1897+
DataFile.from_args(
1898+
_table_format_version=3,
1899+
content=DataFileContent.POSITION_DELETES,
1900+
file_path=delete_file_path,
1901+
file_format=FileFormat.PUFFIN,
1902+
record_count=3,
1903+
referenced_data_file=referenced_data_file,
1904+
content_offset=len(prefix),
1905+
content_size_in_bytes=len(dv_blob),
1906+
),
1907+
)
1908+
1909+
assert deletes == {referenced_data_file: pa.chunked_array([[1, 3, 5]])}
1910+
1911+
18231912
def test_delete(deletes_file: str, request: pytest.FixtureRequest, table_schema_simple: Schema) -> None:
18241913
# Determine file format from the file extension
18251914
file_format = FileFormat.PARQUET if deletes_file.endswith(".parquet") else FileFormat.ORC

tests/table/test_puffin.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
import struct
18+
import zlib
1719
from os import path
1820

1921
import pytest
2022
from pyroaring import BitMap
2123

22-
from pyiceberg.table.puffin import _deserialize_bitmap
24+
from pyiceberg.table.puffin import _DV_BLOB_MAGIC_NUMBER, _deserialize_bitmap, _deserialize_dv_blob
2325

2426

2527
def _open_file(file: str) -> bytes:
@@ -28,6 +30,47 @@ def _open_file(file: str) -> bytes:
2830
return f.read()
2931

3032

33+
def _dv_blob(bitmap_payload: bytes) -> bytes:
34+
bitmap_data = struct.pack("<I", _DV_BLOB_MAGIC_NUMBER) + bitmap_payload
35+
return struct.pack(">I", len(bitmap_data)) + bitmap_data + struct.pack(">I", zlib.crc32(bitmap_data) & 0xFFFFFFFF)
36+
37+
38+
def _bitmap_payload() -> bytes:
39+
return (1).to_bytes(8, byteorder="little") + (0).to_bytes(4, byteorder="little") + BitMap([1, 3, 5]).serialize()
40+
41+
42+
def test_deserialize_deletion_vector_blob() -> None:
43+
actual = _deserialize_dv_blob(_dv_blob(_bitmap_payload()), record_count=3)
44+
45+
assert actual == [BitMap([1, 3, 5])]
46+
47+
48+
def test_deserialize_deletion_vector_blob_invalid_length() -> None:
49+
with pytest.raises(ValueError, match="Invalid bitmap data length"):
50+
_deserialize_dv_blob(_dv_blob(_bitmap_payload())[:-1])
51+
52+
53+
def test_deserialize_deletion_vector_blob_invalid_magic() -> None:
54+
bitmap_data = struct.pack("<I", _DV_BLOB_MAGIC_NUMBER + 1) + _bitmap_payload()
55+
blob = struct.pack(">I", len(bitmap_data)) + bitmap_data + struct.pack(">I", zlib.crc32(bitmap_data) & 0xFFFFFFFF)
56+
57+
with pytest.raises(ValueError, match="Invalid magic number"):
58+
_deserialize_dv_blob(blob)
59+
60+
61+
def test_deserialize_deletion_vector_blob_invalid_crc() -> None:
62+
blob = bytearray(_dv_blob(_bitmap_payload()))
63+
blob[-1] ^= 1
64+
65+
with pytest.raises(ValueError, match="Invalid CRC"):
66+
_deserialize_dv_blob(bytes(blob))
67+
68+
69+
def test_deserialize_deletion_vector_blob_invalid_cardinality() -> None:
70+
with pytest.raises(ValueError, match="Invalid cardinality"):
71+
_deserialize_dv_blob(_dv_blob(_bitmap_payload()), record_count=4)
72+
73+
3174
def test_map_empty() -> None:
3275
puffin = _open_file("64mapempty.bin")
3376

0 commit comments

Comments
 (0)