Skip to content

Commit ea1e66a

Browse files
committed
Add referenced_data_files to DeleteFileIndex, plus _validate_no_new_delete_files and _validate_no_new_delete_files_for_data_files
1 parent 89a129c commit ea1e66a

File tree

3 files changed

+266
-2
lines changed

3 files changed

+266
-2
lines changed

pyiceberg/table/delete_file_index.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ def filter_by_seq(self, seq: int) -> list[DataFile]:
5454
start_idx = bisect_left(self._seqs, seq)
5555
return [delete_file for delete_file, _ in self._files[start_idx:]]
5656

57+
def referenced_delete_files(self) -> list[DataFile]:
    """Return every delete file tracked by this collection.

    The collection is indexed first (via ``_ensure_indexed``) and the files
    are then returned in the order they appear in ``self._files``.

    Returns:
        The first element of each pair stored in ``self._files``.
    """
    self._ensure_indexed()
    # Each item is a (file, <second value>) pair; only the file is wanted here.
    return [delete_file for delete_file, _unused in self._files]
60+
5761

5862
def _has_path_bounds(delete_file: DataFile) -> bool:
5963
lower = delete_file.lower_bounds
@@ -140,3 +144,14 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record
140144
deletes.update(path_deletes.filter_by_seq(seq_num))
141145

142146
return deletes
147+
148+
def referenced_data_files(self) -> list[DataFile]:
    """Return all delete files held by the index, partition-scoped ones first.

    Despite the method name, the returned items are what each group's
    ``referenced_delete_files()`` yields, i.e. the delete files themselves.

    Returns:
        A flat list built from every group in ``self._by_partition`` followed
        by every group in ``self._by_path``.
    """
    collected: list[DataFile] = []

    # Walk the partition index first, then the path index.
    for grouping in (self._by_partition, self._by_path):
        for group in grouping.values():
            collected.extend(group.referenced_delete_files())

    return collected

pyiceberg/table/update/validate.py

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,23 @@
1919
from pyiceberg.exceptions import ValidationException
2020
from pyiceberg.expressions import BooleanExpression
2121
from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
22-
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
22+
from pyiceberg.manifest import (
23+
INITIAL_SEQUENCE_NUMBER,
24+
DataFile,
25+
ManifestContent,
26+
ManifestEntry,
27+
ManifestEntryStatus,
28+
ManifestFile,
29+
)
2330
from pyiceberg.schema import Schema
2431
from pyiceberg.table import Table
32+
from pyiceberg.table.delete_file_index import DeleteFileIndex
2533
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
2634
from pyiceberg.typedef import Record
2735

2836
VALIDATE_DATA_FILES_EXIST_OPERATIONS: set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
2937
VALIDATE_ADDED_DATA_FILES_OPERATIONS: set[Operation] = {Operation.APPEND, Operation.OVERWRITE}
38+
VALIDATE_ADDED_DELETE_FILES_OPERATIONS: set[Operation] = {Operation.DELETE, Operation.OVERWRITE}
3039

3140

3241
def _validation_history(
@@ -216,6 +225,61 @@ def _added_data_files(
216225
yield entry
217226

218227

228+
def _added_delete_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: BooleanExpression | None,
    partition_set: dict[int, set[Record]] | None,
    parent_snapshot: Snapshot | None,
) -> DeleteFileIndex:
    """Collect the delete files added since a starting snapshot into an index.

    Args:
        table: Table whose snapshot history is inspected
        starting_snapshot: Snapshot at which the history walk starts
        data_filter: Optional expression used to narrow the matching entries
        partition_set: Optional mapping of partition spec id to partition records to match
        parent_snapshot: Snapshot at which the history walk ends

    Returns:
        A DeleteFileIndex with every matching ADDED entry from the delete manifests.
        The index is empty when there is no parent snapshot or the table format
        version is below 2.
    """
    if parent_snapshot is None or table.format_version < 2:
        return DeleteFileIndex()

    history_manifests, history_snapshot_ids = _validation_history(
        table, parent_snapshot, starting_snapshot, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES
    )

    index = DeleteFileIndex()

    for manifest_file in history_manifests:
        # Entries are fetched without discarding deleted ones; the status filter below selects ADDED.
        for manifest_entry in manifest_file.fetch_manifest_entry(table.io, discard_deleted=False):
            matches = _filter_manifest_entries(
                manifest_entry,
                history_snapshot_ids,
                data_filter,
                partition_set,
                ManifestEntryStatus.ADDED,
                table.schema(),
            )
            if not matches:
                continue
            index.add_delete_file(manifest_entry, manifest_entry.data_file.partition)

    return index
264+
265+
266+
def _starting_sequence_number(table: Table, starting_snapshot: Snapshot | None) -> int:
    """Resolve the sequence number to start validating from.

    Args:
        table: Table in which the snapshot is looked up
        starting_snapshot: Snapshot from where to start looking, if any

    Returns:
        The sequence number of the snapshot as stored in the table, or
        INITIAL_SEQUENCE_NUMBER when no snapshot is given, the snapshot is not
        found in the table, or its sequence number is unset/zero.
    """
    if starting_snapshot is None:
        return INITIAL_SEQUENCE_NUMBER

    # Look the snapshot up by id instead of trusting the passed-in object.
    resolved = table.snapshot_by_id(starting_snapshot.snapshot_id)
    if not resolved:
        return INITIAL_SEQUENCE_NUMBER

    sequence_number = resolved.sequence_number
    if sequence_number:
        return sequence_number
    return INITIAL_SEQUENCE_NUMBER
281+
282+
219283
def _validate_added_data_files(
220284
table: Table,
221285
starting_snapshot: Snapshot,
@@ -235,3 +299,52 @@ def _validate_added_data_files(
235299
if any(conflicting_entries):
236300
conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
237301
raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")
302+
303+
304+
def _validate_no_new_delete_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: BooleanExpression | None,
    partition_set: dict[int, set[Record]] | None,
    parent_snapshot: Snapshot | None,
) -> None:
    """Ensure no matching delete files were added to the table since the starting snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to match the added delete files
        partition_set: Dictionary of partition spec to set of partition records
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If the index of newly added delete files is not empty.
    """
    added_deletes = _added_delete_files(table, starting_snapshot, data_filter, partition_set, parent_snapshot)

    if not added_deletes.is_empty():
        # referenced_data_files() hands back the delete files held by the index.
        conflicting_delete_files = added_deletes.referenced_data_files()
        raise ValidationException(
            f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_files}"
        )
329+
330+
331+
def _validate_no_new_delete_files_for_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: BooleanExpression | None,
    data_files: set[DataFile],
    parent_snapshot: Snapshot | None,
) -> None:
    """Validate that no delete files added since the starting snapshot apply to the given data files.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to narrow the added delete files
        data_files: Data files that must not be targeted by any newly added delete file
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If a newly added delete file applies to one of ``data_files``.
    """
    # Nothing to validate when there is no parent snapshot or the table format version is below 2
    if parent_snapshot is None or table.format_version < 2:
        return

    deletes = _added_delete_files(table, starting_snapshot, data_filter, None, parent_snapshot)
    seq_num = _starting_sequence_number(table, starting_snapshot)

    # Fail on any delete file that applies to one of the given data files
    # as of the starting sequence number
    for data_file in data_files:
        if deletes.for_data_file(seq_num, data_file):
            # The exception must be raised; merely constructing it silently passes validation.
            raise ValidationException(f"Cannot commit, found new delete for replace data file {data_file}")

tests/table/test_validate.py

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@
2222

2323
from pyiceberg.exceptions import ValidationException
2424
from pyiceberg.io import FileIO
25-
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
25+
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
2626
from pyiceberg.table import Table
2727
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
2828
from pyiceberg.table.update.validate import (
2929
_added_data_files,
30+
_added_delete_files,
3031
_deleted_data_files,
3132
_validate_added_data_files,
3233
_validate_deleted_data_files,
34+
_validate_no_new_delete_files,
3335
_validation_history,
3436
)
3537

@@ -350,3 +352,137 @@ class DummyEntry:
350352
data_filter=None,
351353
parent_snapshot=oldest_snapshot,
352354
)
355+
356+
357+
def test_validate_new_delete_files_raises_on_conflict(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """A non-empty index of added delete files must surface as a ValidationException."""
    table = table_v2_with_extensive_snapshots_and_manifests[0]
    first_snapshot = table.snapshots()[0]
    current_snapshot = cast(Snapshot, table.current_snapshot())

    # Force the index to report itself as non-empty so the raising path is taken.
    with (
        patch("pyiceberg.table.update.validate.DeleteFileIndex.is_empty", return_value=False),
        pytest.raises(ValidationException),
    ):
        _validate_no_new_delete_files(
            table=table,
            starting_snapshot=current_snapshot,
            data_filter=None,
            partition_set=None,
            parent_snapshot=first_snapshot,
        )
373+
374+
375+
@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.REPLACE])
def test_validate_added_delete_files_non_conflicting_count(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
    operation: Operation,
) -> None:
    """Snapshots rewritten to APPEND/REPLACE must yield an empty index of added delete files."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    snapshot_history = 100
    snapshots = table.snapshots()
    # Rewrite the summary of the most recent `snapshot_history` snapshots to the operation under test.
    for offset in range(1, snapshot_history + 1):
        snapshots[-offset] = snapshots[-offset].model_copy(update={"summary": Summary(operation=operation)})

    table.metadata = table.metadata.model_copy(
        update={"snapshots": snapshots},
    )

    oldest_snapshot = table.snapshots()[-snapshot_history]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Mock the manifests method to use the snapshot_id for lookup."""
        return mock_manifests.get(self.snapshot_id, [])

    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
        added_entry = ManifestEntry.from_args(
            status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.sequence_number
        )
        return [added_entry]

    with (
        patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
    ):
        dfi = _added_delete_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            parent_snapshot=oldest_snapshot,
            partition_set=None,
        )

        assert dfi.is_empty()
        assert dfi.referenced_data_files() == []
424+
425+
426+
@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.OVERWRITE])
def test_validate_added_delete_files_conflicting_count(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
    operation: Operation,
) -> None:
    """Snapshots rewritten to DELETE/OVERWRITE must expose the added delete file through the index."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    snapshot_history = 100
    snapshots = table.snapshots()
    # Rewrite the summary of the most recent `snapshot_history` snapshots to the operation under test.
    for i in range(1, snapshot_history + 1):
        snapshots[-i] = snapshots[-i].model_copy(update={"summary": Summary(operation=operation)})

    table.metadata = table.metadata.model_copy(
        update={"snapshots": snapshots},
    )

    oldest_snapshot = table.snapshots()[-snapshot_history]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    mock_delete_file = DataFile.from_args(
        content=DataFileContent.POSITION_DELETES,
        file_path="s3://dummy/path",
    )

    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Mock the manifests method to use the snapshot_id for lookup."""
        return mock_manifests.get(self.snapshot_id, [])

    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
        """Return a single ADDED entry carrying the mock position-delete file."""
        # Only this entry was ever returned; the placeholder entry that used to be
        # built first was overwritten before use and has been dropped.
        return [
            ManifestEntry.from_args(
                status=ManifestEntryStatus.ADDED,
                snapshot_id=self.added_snapshot_id,
                sequence_number=10000,
                data_file=mock_delete_file,
            )
        ]

    with (
        patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
    ):
        dfi = _added_delete_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            parent_snapshot=oldest_snapshot,
            partition_set=None,
        )

        assert not dfi.is_empty()
        assert dfi.referenced_data_files()[0] == mock_delete_file

0 commit comments

Comments
 (0)