Skip to content

Commit 8c0f095

Browse files
committed
comments, thanks geruh
1 parent 32b2428 commit 8c0f095

File tree

3 files changed

+19
-26
lines changed

3 files changed

+19
-26
lines changed

pyiceberg/table/delete_file_index.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record
145145

146146
return deletes
147147

148-
def referenced_data_files(self) -> list[DataFile]:
148+
def referenced_delete_files(self) -> list[DataFile]:
149149
data_files: list[DataFile] = []
150150

151151
for deletes in self._by_partition.values():

pyiceberg/table/update/validate.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ def _added_delete_files(
254254
dfi = DeleteFileIndex()
255255

256256
for manifest in manifests:
257-
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
257+
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=True):
258258
if _filter_manifest_entries(
259259
entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.ADDED, table.schema()
260260
):
@@ -274,9 +274,8 @@ def _starting_sequence_number(table: Table, starting_snapshot: Snapshot | None)
274274
Sequence number as int
275275
"""
276276
if starting_snapshot is not None:
277-
if snapshot := table.snapshot_by_id(starting_snapshot.snapshot_id):
278-
if seq := snapshot.sequence_number:
279-
return seq
277+
if seq := starting_snapshot.sequence_number:
278+
return seq
280279
return INITIAL_SEQUENCE_NUMBER
281280

282281

@@ -322,13 +321,13 @@ def _validate_no_new_delete_files(
322321
if deletes.is_empty():
323322
return
324323

325-
conflicting_delete_files = deletes.referenced_data_files()
324+
conflicting_delete_paths = [file.file_path for file in deletes.referenced_delete_files()]
326325
raise ValidationException(
327-
f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_files}"
326+
f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_paths}"
328327
)
329328

330329

331-
def _validate_no_new_delete_files_for_data_files(
330+
def _validate_no_new_deletes_for_data_files(
332331
table: Table,
333332
starting_snapshot: Snapshot,
334333
data_filter: BooleanExpression | None,
@@ -353,6 +352,6 @@ def _validate_no_new_delete_files_for_data_files(
353352

354353
# Fail to any delete file found that applies to files written in or before the starting snapshot
355354
for data_file in data_files:
356-
delete_files = deletes.for_data_file(seq_num, data_file)
355+
delete_files = deletes.for_data_file(seq_num, data_file, data_file.partition)
357356
if len(delete_files) > 0:
358357
raise ValidationException(f"Cannot commit, found new delete for replace data file {data_file}")

tests/table/test_validate.py

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
_validate_added_data_files,
3333
_validate_deleted_data_files,
3434
_validate_no_new_delete_files,
35-
_validate_no_new_delete_files_for_data_files,
35+
_validate_no_new_deletes_for_data_files,
3636
_validation_history,
3737
)
3838

@@ -356,7 +356,7 @@ class DummyEntry:
356356

357357

358358
@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.REPLACE])
359-
def test_validate_added_delete_files_non_conflicting_count(
359+
def test_added_delete_files_non_conflicting_count(
360360
table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
361361
operation: Operation,
362362
) -> None:
@@ -403,11 +403,11 @@ def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: b
403403
)
404404

405405
assert dfi.is_empty()
406-
assert len(dfi.referenced_data_files()) == 0
406+
assert len(dfi.referenced_delete_files()) == 0
407407

408408

409409
@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.OVERWRITE])
410-
def test_validate_added_delete_files_conflicting_count(
410+
def test_added_delete_files_conflicting_count(
411411
table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
412412
operation: Operation,
413413
) -> None:
@@ -442,21 +442,15 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF
442442
return []
443443

444444
def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
445-
result = [
445+
return [
446446
ManifestEntry.from_args(
447-
status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.min_sequence_number
447+
status=ManifestEntryStatus.ADDED,
448+
snapshot_id=self.added_snapshot_id,
449+
sequence_number=self.min_sequence_number,
450+
data_file=mock_delete_file,
448451
)
449452
]
450453

451-
result[-1] = ManifestEntry.from_args(
452-
status=ManifestEntryStatus.ADDED,
453-
snapshot_id=self.added_snapshot_id,
454-
sequence_number=10000,
455-
data_file=mock_delete_file,
456-
)
457-
458-
return result
459-
460454
with (
461455
patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
462456
patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
@@ -470,7 +464,7 @@ def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: b
470464
)
471465

472466
assert not dfi.is_empty()
473-
assert dfi.referenced_data_files()[0] == mock_delete_file
467+
assert dfi.referenced_delete_files()[0] == mock_delete_file
474468

475469

476470
def test_validate_no_new_delete_files_raises_on_conflict(
@@ -502,7 +496,7 @@ def test_validate_no_new_delete_files_for_data_files_raises_on_conflict(
502496

503497
with patch("pyiceberg.table.update.validate.DeleteFileIndex.for_data_file", return_value=[mocked_data_file]):
504498
with pytest.raises(ValidationException):
505-
_validate_no_new_delete_files_for_data_files(
499+
_validate_no_new_deletes_for_data_files(
506500
table=table,
507501
starting_snapshot=newest_snapshot,
508502
data_filter=None,

0 commit comments

Comments
 (0)