Skip to content

Commit d1ee092

Browse files
committed
delete file index
1 parent 4b3ccbb commit d1ee092

4 files changed

Lines changed: 193 additions & 22 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2060,7 +2060,7 @@ def _plan_files_server_side(self) -> Iterable[FileScanTask]:
20602060
def _plan_files_local(self) -> Iterable[FileScanTask]:
20612061
"""Plan files locally by reading manifests."""
20622062
data_entries: list[ManifestEntry] = []
2063-
delete_index = DeleteFileIndex()
2063+
delete_index = DeleteFileIndex(self.table_metadata.schema())
20642064

20652065
residual_evaluators: dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)
20662066

pyiceberg/table/delete_file_index.py

Lines changed: 98 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,17 @@
1717
from __future__ import annotations
1818

1919
from bisect import bisect_left
20+
from typing import TYPE_CHECKING
2021

22+
from pyiceberg.conversions import from_bytes
2123
from pyiceberg.expressions import EqualTo
2224
from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator
23-
from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, ManifestEntry
25+
from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, DataFileContent, ManifestEntry
2426
from pyiceberg.typedef import Record
2527

28+
if TYPE_CHECKING:
29+
from pyiceberg.schema import Schema
30+
2631
PATH_FIELD_ID = 2147483546
2732

2833

@@ -59,6 +64,15 @@ def referenced_delete_files(self) -> list[DataFile]:
5964
return [data_file for data_file, _ in self._files]
6065

6166

67+
class EqualityDeletes(PositionDeletes):
    """Sequence-number index of equality delete files.

    Reuses the storage and lookup of ``PositionDeletes``; only the sequence
    number under which each delete file is registered differs.
    """

    def add(self, delete_file: DataFile, seq_num: int) -> None:
        """Register ``delete_file`` under ``seq_num - 1``.

        Shifting the stored sequence number down by one is what restricts an
        equality delete to data files added in strictly earlier snapshots.
        """
        shifted_seq_num = seq_num - 1
        super().add(delete_file, shifted_seq_num)
74+
75+
6276
def _has_path_bounds(delete_file: DataFile) -> bool:
6377
lower = delete_file.lower_bounds
6478
upper = delete_file.upper_bounds
@@ -76,6 +90,36 @@ def _applies_to_data_file(delete_file: DataFile, data_file: DataFile) -> bool:
7690
return evaluator.eval(delete_file)
7791

7892

93+
def _eq_applies_to_data_file(eq_delete_file: DataFile, data_file: DataFile, schema: Schema) -> bool:
    """Return whether an equality delete file could remove rows from a data file.

    This is a conservative pruning check driven by column metrics. It returns
    ``False`` only when some primitive equality field provably shares no value
    between the two files, and ``True`` whenever that cannot be shown.

    Args:
        eq_delete_file: The equality delete file being considered.
        data_file: The data file the delete might apply to.
        schema: Schema used to resolve the type and nullability of each equality field id.

    Returns:
        ``False`` if the delete file can be skipped for ``data_file``, otherwise ``True``.
    """
    equality_ids = eq_delete_file.equality_ids
    if not equality_ids:
        return True

    eq_lowers, eq_uppers = eq_delete_file.lower_bounds, eq_delete_file.upper_bounds
    data_lowers, data_uppers = data_file.lower_bounds, data_file.upper_bounds
    if not (eq_lowers and eq_uppers and data_lowers and data_uppers):
        # Without bounds on both sides no field can be used to rule the delete out.
        return True

    # NOTE(review): assumes DataFile exposes `null_value_counts` keyed by field id;
    # getattr keeps the check conservative if the attribute is absent.
    eq_null_counts = getattr(eq_delete_file, "null_value_counts", None)
    data_null_counts = getattr(data_file, "null_value_counts", None)

    for field_id in equality_ids:
        if not all(field_id in bounds for bounds in (eq_lowers, eq_uppers, data_lowers, data_uppers)):
            continue

        field = schema.find_field(field_id)
        field_type = field.field_type
        if not field_type.is_primitive:
            continue

        # Lower/upper bounds only describe non-null values. If both files may hold
        # nulls in this column, a null delete key can still match a null data row,
        # so disjoint ranges prove nothing for this field.
        if _may_contain_null(eq_null_counts, field_id, field.required) and _may_contain_null(
            data_null_counts, field_id, field.required
        ):
            continue

        eq_lower = from_bytes(field_type, eq_lowers[field_id])
        eq_upper = from_bytes(field_type, eq_uppers[field_id])
        data_lower = from_bytes(field_type, data_lowers[field_id])
        data_upper = from_bytes(field_type, data_uppers[field_id])

        if eq_upper < data_lower or eq_lower > data_upper:
            return False

    return True


def _may_contain_null(null_value_counts: dict[int, int] | None, field_id: int, required: bool) -> bool:
    """Return whether a column may hold nulls, treating missing null counts as unknown (may contain)."""
    if required:
        return False
    if null_value_counts is None:
        return True
    null_count = null_value_counts.get(field_id)
    if null_count is None:
        return True
    return null_count > 0
121+
122+
79123
def _referenced_data_file_path(delete_file: DataFile) -> str | None:
80124
"""Return the path, if the path bounds evaluate to the same location."""
81125
lower_bounds = delete_file.lower_bounds
@@ -103,45 +147,75 @@ def _partition_key(spec_id: int, partition: Record | None) -> tuple[int, Record]
103147

104148

105149
class DeleteFileIndex:
106-
"""Indexes position delete files by partition and by exact data file path."""
150+
"""Indexes position and equality delete files by partition and by exact data file path."""
107151

108-
def __init__(self) -> None:
152+
def __init__(self, schema: Schema | None = None) -> None:
153+
self._schema = schema
109154
self._by_partition: dict[tuple[int, Record], PositionDeletes] = {}
110155
self._by_path: dict[str, PositionDeletes] = {}
156+
self._eq_by_partition: dict[tuple[int, Record], EqualityDeletes] = {}
157+
self._global_eq_deletes: EqualityDeletes = EqualityDeletes()
111158

112159
def is_empty(self) -> bool:
113-
return not self._by_partition and not self._by_path
160+
return (
161+
not self._by_partition
162+
and not self._by_path
163+
and not self._eq_by_partition
164+
and not self._global_eq_deletes.referenced_delete_files()
165+
)
114166

115167
def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Record | None = None) -> None:
116168
delete_file = manifest_entry.data_file
117169
seq = manifest_entry.sequence_number or INITIAL_SEQUENCE_NUMBER
118-
target_path = _referenced_data_file_path(delete_file)
119170

120-
if target_path:
121-
deletes = self._by_path.setdefault(target_path, PositionDeletes())
122-
deletes.add(delete_file, seq)
123-
else:
124-
key = _partition_key(delete_file.spec_id or 0, partition_key)
125-
deletes = self._by_partition.setdefault(key, PositionDeletes())
126-
deletes.add(delete_file, seq)
171+
if delete_file.content == DataFileContent.POSITION_DELETES:
172+
target_path = _referenced_data_file_path(delete_file)
173+
if target_path:
174+
deletes = self._by_path.setdefault(target_path, PositionDeletes())
175+
deletes.add(delete_file, seq)
176+
else:
177+
key = _partition_key(delete_file.spec_id or 0, partition_key)
178+
deletes = self._by_partition.setdefault(key, PositionDeletes())
179+
deletes.add(delete_file, seq)
180+
elif delete_file.content == DataFileContent.EQUALITY_DELETES:
181+
if partition_key is None or len(partition_key) == 0:
182+
self._global_eq_deletes.add(delete_file, seq)
183+
else:
184+
key = _partition_key(delete_file.spec_id or 0, partition_key)
185+
deletes = self._eq_by_partition.setdefault(key, EqualityDeletes())
186+
deletes.add(delete_file, seq)
127187

128188
def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record | None = None) -> set[DataFile]:
129189
if self.is_empty():
130190
return set()
131191

132192
deletes: set[DataFile] = set()
133193
spec_id = data_file.spec_id or 0
134-
135194
key = _partition_key(spec_id, partition_key)
136-
partition_deletes = self._by_partition.get(key)
137-
if partition_deletes:
138-
for delete_file in partition_deletes.filter_by_seq(seq_num):
195+
196+
# Add position deletes
197+
partition_pos_deletes = self._by_partition.get(key)
198+
if partition_pos_deletes:
199+
for delete_file in partition_pos_deletes.filter_by_seq(seq_num):
139200
if _applies_to_data_file(delete_file, data_file):
140201
deletes.add(delete_file)
141202

142-
path_deletes = self._by_path.get(data_file.file_path)
143-
if path_deletes:
144-
deletes.update(path_deletes.filter_by_seq(seq_num))
203+
path_pos_deletes = self._by_path.get(data_file.file_path)
204+
if path_pos_deletes:
205+
deletes.update(path_pos_deletes.filter_by_seq(seq_num))
206+
207+
# Add equality deletes
208+
candidate_eq_deletes: list[DataFile] = []
209+
partition_eq_deletes = self._eq_by_partition.get(key)
210+
if partition_eq_deletes:
211+
candidate_eq_deletes.extend(partition_eq_deletes.filter_by_seq(seq_num))
212+
213+
candidate_eq_deletes.extend(self._global_eq_deletes.filter_by_seq(seq_num))
214+
215+
for eq_delete_file in candidate_eq_deletes:
216+
if self._schema and not _eq_applies_to_data_file(eq_delete_file, data_file, self._schema):
217+
continue
218+
deletes.add(eq_delete_file)
145219

146220
return deletes
147221

@@ -154,4 +228,9 @@ def referenced_delete_files(self) -> list[DataFile]:
154228
for deletes in self._by_path.values():
155229
data_files.extend(deletes.referenced_delete_files())
156230

231+
for deletes in self._eq_by_partition.values():
232+
data_files.extend(deletes.referenced_delete_files())
233+
234+
data_files.extend(self._global_eq_deletes.referenced_delete_files())
235+
157236
return data_files

pyiceberg/table/update/validate.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,13 @@ def _added_delete_files(
245245
DeleteFileIndex
246246
"""
247247
if parent_snapshot is None or table.format_version < 2:
248-
return DeleteFileIndex()
248+
return DeleteFileIndex(table.schema())
249249

250250
manifests, snapshot_ids = _validation_history(
251251
table, parent_snapshot, starting_snapshot, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES
252252
)
253253

254-
dfi = DeleteFileIndex()
254+
dfi = DeleteFileIndex(table.schema())
255255

256256
for manifest in manifests:
257257
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=True):

tests/table/test_delete_file_index.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616
# under the License.
1717
import pytest
1818

19+
from pyiceberg.conversions import to_bytes
1920
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat, ManifestEntry, ManifestEntryStatus
21+
from pyiceberg.schema import Schema
2022
from pyiceberg.table.delete_file_index import PATH_FIELD_ID, DeleteFileIndex, PositionDeletes
2123
from pyiceberg.typedef import Record
24+
from pyiceberg.types import IntegerType, NestedField
2225

2326

2427
def _create_data_file(file_path: str = "s3://bucket/data.parquet", spec_id: int = 0) -> DataFile:
@@ -187,3 +190,92 @@ def test_record_equality_for_partition_lookup() -> None:
187190

188191
assert len(index.for_data_file(1, data_file, partition_b)) == 1
189192
assert len(index.for_data_file(1, data_file, partition_c)) == 0
193+
194+
195+
def test_equality_delete_sequence_number_filtering() -> None:
    """An equality delete applies only to data files with a strictly lower sequence number."""
    delete_index = DeleteFileIndex()
    delete_index.add_delete_file(_create_equality_delete(sequence_number=2))
    target_file = _create_data_file()

    # Data sequence number -> number of delete files expected to apply.
    # Only sequence number 1 is strictly earlier than the delete (sequence number 2).
    expected_matches = {1: 1, 2: 0, 3: 0}
    for data_seq_num, expected_count in expected_matches.items():
        assert len(delete_index.for_data_file(data_seq_num, target_file)) == expected_count
212+
213+
214+
def test_global_equality_deletes() -> None:
    """Unpartitioned equality deletes apply to every partition; partitioned ones only to their own."""
    delete_index = DeleteFileIndex()
    first_partition, second_partition = Record(1), Record(2)

    # One global (no partition key) delete and one scoped to the first partition.
    delete_index.add_delete_file(_create_equality_delete(sequence_number=10))
    delete_index.add_delete_file(_create_equality_delete(sequence_number=20), first_partition)

    first_file = _create_data_file(file_path="s3://bucket/file_1.parquet")
    second_file = _create_data_file(file_path="s3://bucket/file_2.parquet")

    # The first partition sees both the global and its own partitioned delete.
    first_matches = delete_index.for_data_file(1, first_file, first_partition)
    assert len(first_matches) == 2

    # The second partition only sees the global delete.
    second_matches = delete_index.for_data_file(1, second_file, second_partition)
    assert len(second_matches) == 1
233+
234+
235+
def test_equality_delete_metrics_filtering() -> None:
    """Equality deletes are pruned when their column bounds cannot overlap the data file's bounds."""

    def _id_bounds(value: int) -> dict[int, bytes]:
        # Serialized bound for field id 1, the integer "id" column.
        return {1: to_bytes(IntegerType(), value)}

    def _data_file_with_bounds(file_path: str, lower: int, upper: int) -> DataFile:
        built = DataFile.from_args(
            content=DataFileContent.DATA,
            file_path=file_path,
            file_format=FileFormat.PARQUET,
            partition=Record(),
            record_count=100,
            file_size_in_bytes=1000,
            lower_bounds=_id_bounds(lower),
            upper_bounds=_id_bounds(upper),
        )
        built._spec_id = 0
        return built

    def _equality_delete_with_bounds(sequence_number: int, lower: int, upper: int) -> ManifestEntry:
        built = DataFile.from_args(
            content=DataFileContent.EQUALITY_DELETES,
            file_path=f"s3://bucket/eq-delete-{sequence_number}.parquet",
            file_format=FileFormat.PARQUET,
            partition=Record(),
            record_count=10,
            file_size_in_bytes=100,
            equality_ids=[1],
            lower_bounds=_id_bounds(lower),
            upper_bounds=_id_bounds(upper),
        )
        built._spec_id = 0
        return ManifestEntry.from_args(status=ManifestEntryStatus.ADDED, sequence_number=sequence_number, data_file=built)

    id_schema = Schema(NestedField(1, "id", IntegerType(), required=True))
    delete_index = DeleteFileIndex(schema=id_schema)

    # The equality delete covers ids in the range 10..20.
    delete_index.add_delete_file(_equality_delete_with_bounds(sequence_number=100, lower=10, upper=20))

    # (data file path, lower id, upper id, expected number of applicable deletes)
    scenarios = [
        ("s3://bucket/no_overlap.parquet", 0, 5, 0),  # entirely below the delete range
        ("s3://bucket/overlap.parquet", 15, 25, 1),  # overlaps the delete range
        ("s3://bucket/no_overlap_2.parquet", 25, 30, 0),  # entirely above the delete range
    ]
    for file_path, lower, upper, expected_count in scenarios:
        candidate = _data_file_with_bounds(file_path, lower, upper)
        assert len(delete_index.for_data_file(1, candidate)) == expected_count

0 commit comments

Comments
 (0)