Skip to content

Commit 3a7413a

Browse files
committed
PR comments
1 parent 876de0c commit 3a7413a

2 files changed

Lines changed: 42 additions & 19 deletions

File tree

pyiceberg/table/delete_file_index.py

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -153,16 +153,10 @@ def __init__(self, schema: Schema | None = None) -> None:
153153
self._schema = schema
154154
self._by_partition: dict[tuple[int, Record], PositionDeletes] = {}
155155
self._by_path: dict[str, PositionDeletes] = {}
156-
self._eq_by_partition: dict[tuple[int, Record], EqualityDeletes] = {}
157-
self._global_eq_deletes: EqualityDeletes = EqualityDeletes()
156+
self._eq_deletes: dict[tuple[int, Record] | None, EqualityDeletes] = {}
158157

159158
def is_empty(self) -> bool:
160-
return (
161-
not self._by_partition
162-
and not self._by_path
163-
and not self._eq_by_partition
164-
and not self._global_eq_deletes.referenced_delete_files()
165-
)
159+
return not self._by_partition and not self._by_path and not self._eq_deletes
166160

167161
def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Record | None = None) -> None:
168162
delete_file = manifest_entry.data_file
@@ -178,12 +172,9 @@ def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Record |
178172
deletes = self._by_partition.setdefault(key, PositionDeletes())
179173
deletes.add(delete_file, seq)
180174
elif delete_file.content == DataFileContent.EQUALITY_DELETES:
181-
if partition_key is None or len(partition_key) == 0:
182-
self._global_eq_deletes.add(delete_file, seq)
183-
else:
184-
key = _partition_key(delete_file.spec_id or 0, partition_key)
185-
deletes = self._eq_by_partition.setdefault(key, EqualityDeletes())
186-
deletes.add(delete_file, seq)
175+
eq_key = _partition_key(delete_file.spec_id or 0, partition_key) if partition_key else None
176+
deletes = self._eq_deletes.setdefault(eq_key, EqualityDeletes())
177+
deletes.add(delete_file, seq)
187178

188179
def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record | None = None) -> set[DataFile]:
189180
if self.is_empty():
@@ -206,11 +197,13 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record
206197

207198
# Add equality deletes
208199
candidate_eq_deletes: list[DataFile] = []
209-
partition_eq_deletes = self._eq_by_partition.get(key)
200+
partition_eq_deletes = self._eq_deletes.get(key)
210201
if partition_eq_deletes:
211202
candidate_eq_deletes.extend(partition_eq_deletes.filter_by_seq(seq_num))
212203

213-
candidate_eq_deletes.extend(self._global_eq_deletes.filter_by_seq(seq_num))
204+
global_eq_deletes = self._eq_deletes.get(None)
205+
if global_eq_deletes:
206+
candidate_eq_deletes.extend(global_eq_deletes.filter_by_seq(seq_num))
214207

215208
for eq_delete_file in candidate_eq_deletes:
216209
if self._schema and not _eq_applies_to_data_file(eq_delete_file, data_file, self._schema):
@@ -228,9 +221,7 @@ def referenced_delete_files(self) -> list[DataFile]:
228221
for deletes in self._by_path.values():
229222
data_files.extend(deletes.referenced_delete_files())
230223

231-
for deletes in self._eq_by_partition.values():
224+
for deletes in self._eq_deletes.values():
232225
data_files.extend(deletes.referenced_delete_files())
233226

234-
data_files.extend(self._global_eq_deletes.referenced_delete_files())
235-
236227
return data_files

tests/table/test_delete_file_index.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,38 @@ def test_equality_delete_sequence_number_filtering() -> None:
239239
assert len(index.for_data_file(3, data_file)) == 0
240240

241241

242+
def test_equality_delete_sequence_number_unpartitioned() -> None:
243+
data_file = _create_data_file()
244+
245+
# Create both types of deletes at sequence number 10
246+
pos_delete = _create_positional_delete(sequence_number=10)
247+
eq_delete = _create_equality_delete(sequence_number=10)
248+
249+
index = DeleteFileIndex()
250+
index.add_delete_file(pos_delete)
251+
index.add_delete_file(eq_delete)
252+
253+
# Sequence 10
254+
deletes = index.for_data_file(10, data_file)
255+
assert len(deletes) == 1
256+
# Position deletes will apply (applies to seq <= 10)
257+
assert pos_delete.data_file in deletes
258+
# Equality deletes will not (applies to seq < 10)
259+
assert eq_delete.data_file not in deletes
260+
261+
# Sequence 9
262+
# Both deletes will apply.
263+
deletes = index.for_data_file(9, data_file)
264+
assert len(deletes) == 2
265+
assert pos_delete.data_file in deletes
266+
assert eq_delete.data_file in deletes
267+
268+
# At sequence 111
269+
# Neither should apply.
270+
deletes = index.for_data_file(11, data_file)
271+
assert len(deletes) == 0
272+
273+
242274
def test_global_equality_deletes() -> None:
243275
index = DeleteFileIndex()
244276

0 commit comments

Comments
 (0)