Skip to content

Commit cf9a2ce

Browse files
committed
Extract DeletionVector logic from PuffinFile
1 parent ec1413d commit cf9a2ce

4 files changed

Lines changed: 91 additions & 58 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@
147147
from pyiceberg.table.locations import load_location_provider
148148
from pyiceberg.table.metadata import TableMetadata
149149
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
150-
from pyiceberg.table.puffin import PuffinFile
150+
from pyiceberg.table.update.deletion_vector import DeletionVector
151151
from pyiceberg.transforms import IdentityTransform, TruncateTransform
152152
from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
153153
from pyiceberg.types import (
@@ -1141,7 +1141,7 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]
11411141
with io.new_input(data_file.file_path).open() as fi:
11421142
payload = fi.read()
11431143

1144-
return PuffinFile(payload).to_vector()
1144+
return DeletionVector(payload).to_vector()
11451145
else:
11461146
raise ValueError(f"Delete file format not supported: {data_file.file_format}")
11471147

pyiceberg/table/puffin.py

Lines changed: 5 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -14,52 +14,17 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17-
import math
1817
from typing import TYPE_CHECKING, Literal
1918

2019
from pydantic import Field
21-
from pyroaring import BitMap, FrozenBitMap
2220

2321
from pyiceberg.typedef import IcebergBaseModel
2422

2523
if TYPE_CHECKING:
26-
import pyarrow as pa
24+
pass
2725

2826
# Short for: Puffin Fratercula arctica, version 1
2927
MAGIC_BYTES = b"PFA1"
30-
EMPTY_BITMAP = FrozenBitMap()
31-
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
32-
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
33-
34-
35-
def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
36-
number_of_bitmaps = int.from_bytes(pl[0:8], byteorder="little")
37-
pl = pl[8:]
38-
39-
bitmaps = []
40-
last_key = -1
41-
for _ in range(number_of_bitmaps):
42-
key = int.from_bytes(pl[0:4], byteorder="little")
43-
if key < 0:
44-
raise ValueError(f"Invalid unsigned key: {key}")
45-
if key <= last_key:
46-
raise ValueError("Keys must be sorted in ascending order")
47-
if key > MAX_JAVA_SIGNED:
48-
raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")
49-
pl = pl[4:]
50-
51-
while last_key < key - 1:
52-
bitmaps.append(EMPTY_BITMAP)
53-
last_key += 1
54-
55-
bm = BitMap().deserialize(pl)
56-
# TODO: Optimize this
57-
pl = pl[len(bm.serialize()) :]
58-
bitmaps.append(bm)
59-
60-
last_key = key
61-
62-
return bitmaps
6328

6429

6530
class PuffinBlobMetadata(IcebergBaseModel):
@@ -78,15 +43,9 @@ class Footer(IcebergBaseModel):
7843
properties: dict[str, str] = Field(default_factory=dict)
7944

8045

81-
def _bitmaps_to_chunked_array(bitmaps: list[BitMap]) -> "pa.ChunkedArray":
82-
import pyarrow as pa
83-
84-
return pa.chunked_array([(key_pos << 32) + pos for pos in bitmap] for key_pos, bitmap in enumerate(bitmaps))
85-
86-
8746
class PuffinFile:
8847
footer: Footer
89-
_deletion_vectors: dict[str, list[BitMap]]
48+
_payload: bytes
9049

9150
def __init__(self, puffin: bytes) -> None:
9251
for magic_bytes in [puffin[:4], puffin[-4:]]:
@@ -105,12 +64,7 @@ def __init__(self, puffin: bytes) -> None:
10564
footer_payload_size_int = int.from_bytes(puffin[-12:-8], byteorder="little")
10665

10766
self.footer = Footer.model_validate_json(puffin[-(footer_payload_size_int + 12) : -12])
108-
puffin = puffin[8:]
109-
110-
self._deletion_vectors = {
111-
blob.properties[PROPERTY_REFERENCED_DATA_FILE]: _deserialize_bitmap(puffin[blob.offset : blob.offset + blob.length])
112-
for blob in self.footer.blobs
113-
}
67+
self._payload = puffin[8:]
11468

115-
def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
116-
return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}
69+
def get_blob_payload(self, blob: PuffinBlobMetadata) -> bytes:
70+
return self._payload[blob.offset : blob.offset + blob.length]
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
import math
18+
from typing import TYPE_CHECKING
19+
20+
from pyroaring import BitMap, FrozenBitMap
21+
22+
from pyiceberg.table.puffin import PuffinFile
23+
24+
if TYPE_CHECKING:
25+
import pyarrow as pa
26+
27+
EMPTY_BITMAP = FrozenBitMap()
28+
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
29+
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
30+
31+
32+
class DeletionVector:
33+
_deletion_vectors: dict[str, list[BitMap]]
34+
35+
def __init__(self, puffin: bytes) -> None:
36+
puffin_file = PuffinFile(puffin)
37+
self._deletion_vectors = {
38+
blob.properties[PROPERTY_REFERENCED_DATA_FILE]: self._deserialize_bitmap(puffin_file.get_blob_payload(blob))
39+
for blob in puffin_file.footer.blobs
40+
}
41+
42+
@staticmethod
43+
def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
44+
number_of_bitmaps = int.from_bytes(pl[0:8], byteorder="little")
45+
pl = pl[8:]
46+
47+
bitmaps = []
48+
last_key = -1
49+
for _ in range(number_of_bitmaps):
50+
key = int.from_bytes(pl[0:4], byteorder="little")
51+
if key < 0:
52+
raise ValueError(f"Invalid unsigned key: {key}")
53+
if key <= last_key:
54+
raise ValueError("Keys must be sorted in ascending order")
55+
if key > MAX_JAVA_SIGNED:
56+
raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")
57+
pl = pl[4:]
58+
59+
while last_key < key - 1:
60+
bitmaps.append(EMPTY_BITMAP)
61+
last_key += 1
62+
63+
bm = BitMap().deserialize(pl)
64+
# TODO: Optimize this
65+
pl = pl[len(bm.serialize()) :]
66+
bitmaps.append(bm)
67+
68+
last_key = key
69+
70+
return bitmaps
71+
72+
@staticmethod
73+
def _bitmaps_to_chunked_array(bitmaps: list[BitMap]) -> "pa.ChunkedArray":
74+
import pyarrow as pa
75+
76+
return pa.chunked_array([(key_pos << 32) + pos for pos in bitmap] for key_pos, bitmap in enumerate(bitmaps))
77+
78+
def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
79+
return {path: self._bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}

tests/table/test_puffin.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import pytest
2020
from pyroaring import BitMap
2121

22-
from pyiceberg.table.puffin import _deserialize_bitmap
22+
from pyiceberg.table.update.deletion_vector import DeletionVector
2323

2424

2525
def _open_file(file: str) -> bytes:
@@ -32,7 +32,7 @@ def test_map_empty() -> None:
3232
puffin = _open_file("64mapempty.bin")
3333

3434
expected: list[BitMap] = []
35-
actual = _deserialize_bitmap(puffin)
35+
actual = DeletionVector._deserialize_bitmap(puffin)
3636

3737
assert expected == actual
3838

@@ -41,7 +41,7 @@ def test_map_bitvals() -> None:
4141
puffin = _open_file("64map32bitvals.bin")
4242

4343
expected = [BitMap([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]
44-
actual = _deserialize_bitmap(puffin)
44+
actual = DeletionVector._deserialize_bitmap(puffin)
4545

4646
assert expected == actual
4747

@@ -61,7 +61,7 @@ def test_map_spread_vals() -> None:
6161
BitMap([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
6262
BitMap([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
6363
]
64-
actual = _deserialize_bitmap(puffin)
64+
actual = DeletionVector._deserialize_bitmap(puffin)
6565

6666
assert expected == actual
6767

@@ -70,4 +70,4 @@ def test_map_high_vals() -> None:
7070
puffin = _open_file("64maphighvals.bin")
7171

7272
with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"):
73-
_ = _deserialize_bitmap(puffin)
73+
_ = DeletionVector._deserialize_bitmap(puffin)

0 commit comments

Comments
 (0)