Skip to content

Commit 153eac5

Browse files
committed
feat: Add DeleteFileIndex for Merge-on-Read file lookup
1 parent 8e4d424 commit 153eac5

File tree

4 files changed

+422
-100
lines changed

4 files changed

+422
-100
lines changed

pyiceberg/table/__init__.py

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
)
3434

3535
from pydantic import Field
36-
from sortedcontainers import SortedList
3736

3837
import pyiceberg.expressions.parser as parser
3938
from pyiceberg.expressions import (
@@ -56,7 +55,6 @@
5655
)
5756
from pyiceberg.io import FileIO, load_file_io
5857
from pyiceberg.manifest import (
59-
POSITIONAL_DELETE_SCHEMA,
6058
DataFile,
6159
DataFileContent,
6260
ManifestContent,
@@ -70,6 +68,7 @@
7068
PartitionSpec,
7169
)
7270
from pyiceberg.schema import Schema
71+
from pyiceberg.table.delete_file_index import DeleteFileIndex
7372
from pyiceberg.table.inspect import InspectTable
7473
from pyiceberg.table.locations import LocationProvider, load_location_provider
7574
from pyiceberg.table.maintenance import MaintenanceTable
@@ -1857,29 +1856,19 @@ def _min_sequence_number(manifests: list[ManifestFile]) -> int:
18571856
return INITIAL_SEQUENCE_NUMBER
18581857

18591858

1860-
def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_entries: SortedList[ManifestEntry]) -> set[DataFile]:
1861-
"""Check if the delete file is relevant for the data file.
1862-
1863-
Using the column metrics to see if the filename is in the lower and upper bound.
1859+
def _match_deletes_to_data_file(data_entry: ManifestEntry, delete_file_index: DeleteFileIndex) -> set[DataFile]:
1860+
"""Check if delete files are relevant for the data file.
18641861
18651862
Args:
1866-
data_entry (ManifestEntry): The manifest entry path of the datafile.
1867-
positional_delete_entries (List[ManifestEntry]): All the candidate positional deletes manifest entries.
1863+
data_entry (ManifestEntry): The manifest entry of the data file.
1864+
delete_file_index (DeleteFileIndex): Index containing all delete files.
18681865
18691866
Returns:
1870-
A set of files that are relevant for the data file.
1867+
A set of delete files that are relevant for the data file.
18711868
"""
1872-
relevant_entries = positional_delete_entries[positional_delete_entries.bisect_right(data_entry) :]
1873-
1874-
if len(relevant_entries) > 0:
1875-
evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path))
1876-
return {
1877-
positional_delete_entry.data_file
1878-
for positional_delete_entry in relevant_entries
1879-
if evaluator.eval(positional_delete_entry.data_file)
1880-
}
1881-
else:
1882-
return set()
1869+
return delete_file_index.for_data_file(
1870+
data_entry.sequence_number or 0, data_entry.data_file, partition_key=data_entry.data_file.partition
1871+
)
18831872

18841873

18851874
class DataScan(TableScan):
@@ -2005,7 +1994,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
20051994
List of FileScanTasks that contain both data and delete files.
20061995
"""
20071996
data_entries: list[ManifestEntry] = []
2008-
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)
1997+
delete_index = DeleteFileIndex()
20091998

20101999
residual_evaluators: dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)
20112000

@@ -2014,7 +2003,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
20142003
if data_file.content == DataFileContent.DATA:
20152004
data_entries.append(manifest_entry)
20162005
elif data_file.content == DataFileContent.POSITION_DELETES:
2017-
positional_delete_entries.add(manifest_entry)
2006+
delete_index.add_delete_file(manifest_entry, partition_key=data_file.partition)
20182007
elif data_file.content == DataFileContent.EQUALITY_DELETES:
20192008
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
20202009
else:
@@ -2025,7 +2014,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
20252014
data_entry.data_file,
20262015
delete_files=_match_deletes_to_data_file(
20272016
data_entry,
2028-
positional_delete_entries,
2017+
delete_index,
20292018
),
20302019
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
20312020
data_entry.data_file.partition
pyiceberg/table/delete_file_index.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from bisect import bisect_left
20+
from typing import Any
21+
22+
from pyiceberg.expressions import EqualTo
23+
from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator
24+
from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, ManifestEntry
25+
from pyiceberg.typedef import Record
26+
27+
# Field ID used to look up the lower/upper bound of the referenced data file
# path in a position delete file's column metrics.
# NOTE(review): presumably the reserved `file_path` field ID from the Iceberg
# spec -- confirm against the spec / POSITIONAL_DELETE_SCHEMA.
PATH_FIELD_ID = 2147483546
28+
29+
30+
class PositionDeletes:
    """Accumulate position delete files and serve them ordered by sequence number.

    Files are buffered by ``add``. The first call to ``filter_by_seq`` sorts the
    buffer once and freezes the collection; any later ``add`` raises ``ValueError``.
    """

    __slots__ = ("_buffer", "_seqs", "_files")

    def __init__(self) -> None:
        # Pending (delete file, sequence number) pairs; becomes None once indexed.
        self._buffer: list[tuple[DataFile, int]] | None = []
        # Sequence numbers in ascending order, parallel to ``_files``.
        self._seqs: list[int] = []
        # (delete file, sequence number) pairs sorted by sequence number.
        self._files: list[tuple[DataFile, int]] = []

    def add(self, delete_file: DataFile, seq_num: int) -> None:
        """Buffer a delete file together with its sequence number.

        Raises:
            ValueError: If the collection has already been indexed.
        """
        pending = self._buffer
        if pending is None:
            raise ValueError("Cannot add files after indexing")
        pending.append((delete_file, seq_num))

    def _ensure_indexed(self) -> None:
        """Sort the buffered entries by sequence number on first use, then drop the buffer."""
        if self._buffer is None:
            return  # already indexed

        ordered = list(self._buffer)
        ordered.sort(key=lambda pair: pair[1])
        self._files = ordered
        self._seqs = [pair[1] for pair in ordered]
        self._buffer = None

    def filter_by_seq(self, seq: int) -> list[DataFile]:
        """Return the delete files whose sequence number is greater than or equal to ``seq``.

        The result is ordered by ascending sequence number.
        """
        self._ensure_indexed()
        if not self._files:
            return []
        # bisect_left lands on the first entry with a sequence number >= seq.
        first = bisect_left(self._seqs, seq)
        return [pair[0] for pair in self._files[first:]]
57+
58+
59+
def _has_path_bounds(delete_file: DataFile) -> bool:
60+
lower = delete_file.lower_bounds
61+
upper = delete_file.upper_bounds
62+
if not lower or not upper:
63+
return False
64+
65+
return PATH_FIELD_ID in lower and PATH_FIELD_ID in upper
66+
67+
68+
def _applies_to_data_file(delete_file: DataFile, data_file: DataFile) -> bool:
    """Decide whether a position delete file may apply to the given data file.

    When the delete file carries path bounds, an ``EqualTo("file_path", ...)``
    predicate for the data file's path is evaluated against the delete file
    with ``_InclusiveMetricsEvaluator``. Without path bounds the delete file
    cannot be ruled out, so it is treated as applicable.
    """
    if _has_path_bounds(delete_file):
        path_matches = EqualTo("file_path", data_file.file_path)
        return _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, path_matches).eval(delete_file)

    # No bounds to prune on: keep the delete file.
    return True
74+
75+
76+
def _referenced_data_file_path(delete_file: DataFile) -> str | None:
77+
"""Return the path, if the path bounds evaluate to the same location."""
78+
lower_bounds = delete_file.lower_bounds
79+
upper_bounds = delete_file.upper_bounds
80+
81+
if not lower_bounds or not upper_bounds:
82+
return None
83+
84+
lower = lower_bounds.get(PATH_FIELD_ID)
85+
upper = upper_bounds.get(PATH_FIELD_ID)
86+
87+
if lower and upper and lower == upper:
88+
try:
89+
return lower.decode("utf-8")
90+
except (UnicodeDecodeError, AttributeError):
91+
pass
92+
93+
return None
94+
95+
96+
def _partition_key(spec_id: int, partition: Record | None) -> tuple[int, tuple[Any, ...]]:
97+
if partition:
98+
return spec_id, tuple(partition._data)
99+
return spec_id, () # unpartitioned handling
100+
101+
102+
class DeleteFileIndex:
    """Index of position delete files, keyed by partition and by exact data file path.

    A delete file whose path bounds pin it to a single data file is stored under
    that path; every other delete file is stored under its
    ``(spec_id, partition values)`` key.
    """

    def __init__(self) -> None:
        # Delete files grouped by (spec_id, partition values).
        self._by_partition: dict[tuple[int, tuple[Any, ...]], PositionDeletes] = {}
        # Delete files that reference exactly one data file, grouped by that file's path.
        self._by_path: dict[str, PositionDeletes] = {}

    def is_empty(self) -> bool:
        """Return True when no delete file has been added."""
        return not (self._by_partition or self._by_path)

    def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Record | None = None) -> None:
        """Register the delete file held by a manifest entry.

        Args:
            manifest_entry (ManifestEntry): Entry whose ``data_file`` is the delete file; a missing
                sequence number falls back to ``INITIAL_SEQUENCE_NUMBER``.
            partition_key (Record | None): Partition record used to key the delete file when it is
                not bound to a single data file path.
        """
        delete_file = manifest_entry.data_file
        seq = manifest_entry.sequence_number or INITIAL_SEQUENCE_NUMBER
        target_path = _referenced_data_file_path(delete_file)

        if target_path:
            bucket = self._by_path.setdefault(target_path, PositionDeletes())
        else:
            bucket = self._by_partition.setdefault(
                _partition_key(delete_file.spec_id or 0, partition_key), PositionDeletes()
            )
        bucket.add(delete_file, seq)

    def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record | None = None) -> set[DataFile]:
        """Collect the delete files that may apply to a data file.

        Args:
            seq_num (int): Sequence number of the data file; only delete files with a sequence
                number greater than or equal to it are considered.
            data_file (DataFile): The data file to find deletes for.
            partition_key (Record | None): Partition record of the data file.

        Returns:
            The matching delete files from the data file's partition (filtered with
            ``_applies_to_data_file``) plus those registered for its exact path.
        """
        if self.is_empty():
            return set()

        lookup_key = _partition_key(data_file.spec_id or 0, partition_key)
        matched: set[DataFile] = set()

        in_partition = self._by_partition.get(lookup_key)
        if in_partition is not None:
            matched.update(
                candidate
                for candidate in in_partition.filter_by_seq(seq_num)
                if _applies_to_data_file(candidate, data_file)
            )

        # Path-bound delete files already target this file, so only the sequence filter applies.
        for_path = self._by_path.get(data_file.file_path)
        if for_path is not None:
            matched.update(for_path.filter_by_seq(seq_num))

        return matched

0 commit comments

Comments
 (0)