1717from __future__ import annotations
1818
1919from bisect import bisect_left
20+ from typing import TYPE_CHECKING
2021
22+ from pyiceberg .conversions import from_bytes
2123from pyiceberg .expressions import EqualTo
2224from pyiceberg .expressions .visitors import _InclusiveMetricsEvaluator
23- from pyiceberg .manifest import INITIAL_SEQUENCE_NUMBER , POSITIONAL_DELETE_SCHEMA , DataFile , ManifestEntry
25+ from pyiceberg .manifest import INITIAL_SEQUENCE_NUMBER , POSITIONAL_DELETE_SCHEMA , DataFile , DataFileContent , ManifestEntry
2426from pyiceberg .typedef import Record
2527
28+ if TYPE_CHECKING :
29+ from pyiceberg .schema import Schema
30+
2631PATH_FIELD_ID = 2147483546
2732
2833
@@ -59,6 +64,15 @@ def referenced_delete_files(self) -> list[DataFile]:
5964 return [data_file for data_file , _ in self ._files ]
6065
6166
67+ class EqualityDeletes (PositionDeletes ):
68+ """Collects equality delete files and indexes them by sequence number."""
69+
70+ def add (self , delete_file : DataFile , seq_num : int ) -> None :
71+ # Equality deletes are indexed by sequence number - 1 to ensure they only
72+ # apply to data files added in strictly earlier snapshots.
73+ super ().add (delete_file , seq_num - 1 )
74+
75+
6276def _has_path_bounds (delete_file : DataFile ) -> bool :
6377 lower = delete_file .lower_bounds
6478 upper = delete_file .upper_bounds
@@ -76,6 +90,36 @@ def _applies_to_data_file(delete_file: DataFile, data_file: DataFile) -> bool:
7690 return evaluator .eval (delete_file )
7791
7892
93+ def _eq_applies_to_data_file (eq_delete_file : DataFile , data_file : DataFile , schema : Schema ) -> bool :
94+ if not eq_delete_file .equality_ids :
95+ return True
96+
97+ for field_id in eq_delete_file .equality_ids :
98+ if (
99+ eq_delete_file .lower_bounds
100+ and eq_delete_file .upper_bounds
101+ and data_file .lower_bounds
102+ and data_file .upper_bounds
103+ and field_id in eq_delete_file .lower_bounds
104+ and field_id in eq_delete_file .upper_bounds
105+ and field_id in data_file .lower_bounds
106+ and field_id in data_file .upper_bounds
107+ ):
108+ field_type = schema .find_type (field_id )
109+ if not field_type .is_primitive :
110+ continue
111+
112+ eq_lower = from_bytes (field_type , eq_delete_file .lower_bounds [field_id ])
113+ eq_upper = from_bytes (field_type , eq_delete_file .upper_bounds [field_id ])
114+ data_lower = from_bytes (field_type , data_file .lower_bounds [field_id ])
115+ data_upper = from_bytes (field_type , data_file .upper_bounds [field_id ])
116+
117+ if eq_upper < data_lower or eq_lower > data_upper :
118+ return False
119+
120+ return True
121+
122+
79123def _referenced_data_file_path (delete_file : DataFile ) -> str | None :
80124 """Return the path, if the path bounds evaluate to the same location."""
81125 lower_bounds = delete_file .lower_bounds
@@ -103,45 +147,75 @@ def _partition_key(spec_id: int, partition: Record | None) -> tuple[int, Record]
103147
104148
105149class DeleteFileIndex :
106- """Indexes position delete files by partition and by exact data file path."""
150+ """Indexes position and equality delete files by partition and by exact data file path."""
107151
108- def __init__ (self ) -> None :
152+ def __init__ (self , schema : Schema | None = None ) -> None :
153+ self ._schema = schema
109154 self ._by_partition : dict [tuple [int , Record ], PositionDeletes ] = {}
110155 self ._by_path : dict [str , PositionDeletes ] = {}
156+ self ._eq_by_partition : dict [tuple [int , Record ], EqualityDeletes ] = {}
157+ self ._global_eq_deletes : EqualityDeletes = EqualityDeletes ()
111158
112159 def is_empty (self ) -> bool :
113- return not self ._by_partition and not self ._by_path
160+ return (
161+ not self ._by_partition
162+ and not self ._by_path
163+ and not self ._eq_by_partition
164+ and not self ._global_eq_deletes .referenced_delete_files ()
165+ )
114166
115167 def add_delete_file (self , manifest_entry : ManifestEntry , partition_key : Record | None = None ) -> None :
116168 delete_file = manifest_entry .data_file
117169 seq = manifest_entry .sequence_number or INITIAL_SEQUENCE_NUMBER
118- target_path = _referenced_data_file_path (delete_file )
119170
120- if target_path :
121- deletes = self ._by_path .setdefault (target_path , PositionDeletes ())
122- deletes .add (delete_file , seq )
123- else :
124- key = _partition_key (delete_file .spec_id or 0 , partition_key )
125- deletes = self ._by_partition .setdefault (key , PositionDeletes ())
126- deletes .add (delete_file , seq )
171+ if delete_file .content == DataFileContent .POSITION_DELETES :
172+ target_path = _referenced_data_file_path (delete_file )
173+ if target_path :
174+ deletes = self ._by_path .setdefault (target_path , PositionDeletes ())
175+ deletes .add (delete_file , seq )
176+ else :
177+ key = _partition_key (delete_file .spec_id or 0 , partition_key )
178+ deletes = self ._by_partition .setdefault (key , PositionDeletes ())
179+ deletes .add (delete_file , seq )
180+ elif delete_file .content == DataFileContent .EQUALITY_DELETES :
181+ if partition_key is None or len (partition_key ) == 0 :
182+ self ._global_eq_deletes .add (delete_file , seq )
183+ else :
184+ key = _partition_key (delete_file .spec_id or 0 , partition_key )
185+ deletes = self ._eq_by_partition .setdefault (key , EqualityDeletes ())
186+ deletes .add (delete_file , seq )
127187
128188 def for_data_file (self , seq_num : int , data_file : DataFile , partition_key : Record | None = None ) -> set [DataFile ]:
129189 if self .is_empty ():
130190 return set ()
131191
132192 deletes : set [DataFile ] = set ()
133193 spec_id = data_file .spec_id or 0
134-
135194 key = _partition_key (spec_id , partition_key )
136- partition_deletes = self ._by_partition .get (key )
137- if partition_deletes :
138- for delete_file in partition_deletes .filter_by_seq (seq_num ):
195+
196+ # Add position deletes
197+ partition_pos_deletes = self ._by_partition .get (key )
198+ if partition_pos_deletes :
199+ for delete_file in partition_pos_deletes .filter_by_seq (seq_num ):
139200 if _applies_to_data_file (delete_file , data_file ):
140201 deletes .add (delete_file )
141202
142- path_deletes = self ._by_path .get (data_file .file_path )
143- if path_deletes :
144- deletes .update (path_deletes .filter_by_seq (seq_num ))
203+ path_pos_deletes = self ._by_path .get (data_file .file_path )
204+ if path_pos_deletes :
205+ deletes .update (path_pos_deletes .filter_by_seq (seq_num ))
206+
207+ # Add equality deletes
208+ candidate_eq_deletes : list [DataFile ] = []
209+ partition_eq_deletes = self ._eq_by_partition .get (key )
210+ if partition_eq_deletes :
211+ candidate_eq_deletes .extend (partition_eq_deletes .filter_by_seq (seq_num ))
212+
213+ candidate_eq_deletes .extend (self ._global_eq_deletes .filter_by_seq (seq_num ))
214+
215+ for eq_delete_file in candidate_eq_deletes :
216+ if self ._schema and not _eq_applies_to_data_file (eq_delete_file , data_file , self ._schema ):
217+ continue
218+ deletes .add (eq_delete_file )
145219
146220 return deletes
147221
@@ -154,4 +228,9 @@ def referenced_delete_files(self) -> list[DataFile]:
154228 for deletes in self ._by_path .values ():
155229 data_files .extend (deletes .referenced_delete_files ())
156230
231+ for deletes in self ._eq_by_partition .values ():
232+ data_files .extend (deletes .referenced_delete_files ())
233+
234+ data_files .extend (self ._global_eq_deletes .referenced_delete_files ())
235+
157236 return data_files
0 commit comments