-
Notifications
You must be signed in to change notification settings - Fork 521
Expand file tree
/
Copy pathtest_puffin.py
More file actions
175 lines (132 loc) · 5.97 KB
/
Copy pathtest_puffin.py
File metadata and controls
175 lines (132 loc) · 5.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import zlib
from os import path
import pytest
from pyroaring import BitMap
from pyiceberg.table.puffin import (
DELETION_VECTOR_MAGIC,
MAGIC_BYTES,
PROPERTY_REFERENCED_DATA_FILE,
PuffinFile,
PuffinWriter,
_deserialize_bitmap,
)
def _open_file(file: str) -> bytes:
    """Read a bitmap fixture and return its raw bytes.

    Fixtures are looked up in the ``bitmaps`` directory located next to this module.

    :param file: Fixture file name, relative to the ``bitmaps`` directory.
    :return: The complete contents of the fixture file.
    """
    here = path.dirname(path.realpath(__file__))
    fixture_path = "/".join((here, "bitmaps", file))
    with open(fixture_path, "rb") as fixture:
        contents = fixture.read()
    return contents
def test_map_empty() -> None:
    """The empty 64-bit map fixture deserializes to an empty list of bitmaps."""
    raw = _open_file("64mapempty.bin")
    bitmaps = _deserialize_bitmap(raw)
    assert bitmaps == []
def test_map_bitvals() -> None:
    """The ``64map32bitvals`` fixture deserializes to a single bitmap holding 0..9."""
    bitmaps = _deserialize_bitmap(_open_file("64map32bitvals.bin"))
    assert bitmaps == [BitMap(range(10))]
def test_map_spread_vals() -> None:
    """The ``64mapspreadvals`` fixture deserializes to ten bitmaps, each holding 0..9."""
    bitmaps = _deserialize_bitmap(_open_file("64mapspreadvals.bin"))
    assert bitmaps == [BitMap(range(10)) for _ in range(10)]
def test_map_high_vals() -> None:
    """A key above 2147483647 is rejected with a ValueError (Java compatibility limit)."""
    raw = _open_file("64maphighvals.bin")
    expected_error = "Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"
    with pytest.raises(ValueError, match=expected_error):
        _deserialize_bitmap(raw)
def test_puffin_round_trip() -> None:
    """Positions written by PuffinWriter read back deduplicated and in sorted order.

    The input deliberately contains a position above 2**32 and a duplicate entry.
    """
    data_file = "path/to/data.parquet"
    positions = [5, (1 << 32) + 1, 5]

    writer = PuffinWriter(created_by="my-test-app")
    writer.set_blob(positions=positions, referenced_data_file=data_file)
    reader = PuffinFile(writer.finish())

    unique_positions = sorted(set(positions))

    # Footer-level and blob-level metadata.
    footer = reader.footer
    assert footer.properties["created-by"] == "my-test-app"
    assert len(footer.blobs) == 1
    blob_meta = footer.blobs[0]
    assert blob_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == data_file
    assert blob_meta.properties["cardinality"] == str(len(unique_positions))

    # Decoded deletion vector contents.
    vectors = reader.to_vector()
    assert data_file in vectors
    assert vectors[data_file].to_pylist() == unique_positions
def test_write_and_read_puffin_file() -> None:
    """After two set_blob calls, only the blob from the second call is expected in the file.

    Also pins the fixed metadata written for a deletion-vector blob.
    """
    writer = PuffinWriter()
    for positions, data_file in (([1, 2, 3], "file1.parquet"), ([4, 5, 6], "file2.parquet")):
        writer.set_blob(positions=positions, referenced_data_file=data_file)
    reader = PuffinFile(writer.finish())

    blobs = reader.footer.blobs
    assert len(blobs) == 1
    blob = blobs[0]
    assert blob.properties["referenced-data-file"] == "file2.parquet"
    assert blob.properties["cardinality"] == "3"
    assert blob.type == "deletion-vector-v1"
    # 2147483645 is the reserved row-position field id (Java MetadataColumns.ROW_POSITION,
    # INT_MAX - 2); Java/Spark readers need it to interoperate.
    assert blob.fields == [2147483645]
    assert blob.snapshot_id == -1
    assert blob.sequence_number == -1
    assert blob.compression_codec is None

    vectors = reader.to_vector()
    assert len(vectors) == 1
    assert "file1.parquet" not in vectors
    assert vectors["file2.parquet"].to_pylist() == [4, 5, 6]
def test_deletion_vector_blob_framing_is_spec_compliant() -> None:
    """Verify the on-disk framing of a deletion-vector blob byte by byte.

    PuffinFile returns only the serialized vector. It skips the blob's length prefix,
    deletion-vector magic and CRC-32, so this test inspects those bytes directly.
    Those are the bytes external readers (Java/Spark) depend on.
    """
    writer = PuffinWriter()
    writer.set_blob(positions=[0, 1, 5, (1 << 32) + 7], referenced_data_file="file.parquet")
    puffin_bytes = writer.finish()

    # Every Puffin file starts with the file magic.
    assert puffin_bytes[:4] == MAGIC_BYTES

    blob = PuffinFile(puffin_bytes).footer.blobs[0]
    start = blob.offset
    blob_bytes = puffin_bytes[start : start + blob.length]

    # Layout: length (4B big-endian) | DV magic (4B) | vector | CRC-32 (4B big-endian).
    # Both the length and the CRC-32 cover the magic bytes plus the vector.
    length_prefix = int.from_bytes(blob_bytes[:4], "big")
    payload_end = 4 + length_prefix
    dv_magic = blob_bytes[4:8]
    vector = blob_bytes[8:payload_end]
    crc = int.from_bytes(blob_bytes[payload_end : payload_end + 4], "big")

    assert dv_magic == DELETION_VECTOR_MAGIC
    assert length_prefix == len(dv_magic) + len(vector)
    assert blob.length == length_prefix + 8
    assert crc == zlib.crc32(dv_magic + vector)
def test_puffin_file_with_no_blobs() -> None:
    """A writer that never receives a blob still yields a readable, empty Puffin file."""
    reader = PuffinFile(PuffinWriter().finish())
    footer = reader.footer
    assert len(footer.blobs) == 0
    assert len(reader.to_vector()) == 0
    # No created_by was given to the writer, so the footer should not carry that property.
    assert "created-by" not in footer.properties