Skip to content

Commit 5f7d79d

Browse files
committed
address comments
1 parent 153eac5 commit 5f7d79d

File tree

2 files changed: 4 additions and 135 deletions.

pyiceberg/table/__init__.py

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1856,21 +1856,6 @@ def _min_sequence_number(manifests: list[ManifestFile]) -> int:
18561856
return INITIAL_SEQUENCE_NUMBER
18571857

18581858

1859-
def _match_deletes_to_data_file(data_entry: ManifestEntry, delete_file_index: DeleteFileIndex) -> set[DataFile]:
1860-
"""Check if delete files are relevant for the data file.
1861-
1862-
Args:
1863-
data_entry (ManifestEntry): The manifest entry of the data file.
1864-
delete_file_index (DeleteFileIndex): Index containing all delete files.
1865-
1866-
Returns:
1867-
A set of delete files that are relevant for the data file.
1868-
"""
1869-
return delete_file_index.for_data_file(
1870-
data_entry.sequence_number or 0, data_entry.data_file, partition_key=data_entry.data_file.partition
1871-
)
1872-
1873-
18741859
class DataScan(TableScan):
18751860
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
18761861
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
@@ -2008,13 +1993,13 @@ def plan_files(self) -> Iterable[FileScanTask]:
20081993
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
20091994
else:
20101995
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")
2011-
20121996
return [
20131997
FileScanTask(
20141998
data_entry.data_file,
2015-
delete_files=_match_deletes_to_data_file(
2016-
data_entry,
2017-
delete_index,
1999+
delete_files=delete_index.for_data_file(
2000+
data_entry.sequence_number or INITIAL_SEQUENCE_NUMBER,
2001+
data_entry.data_file,
2002+
partition_key=data_entry.data_file.partition,
20182003
),
20192004
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
20202005
data_entry.data_file.partition

tests/table/test_init.py

Lines changed: 0 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,14 @@
3232
In,
3333
)
3434
from pyiceberg.io import PY_IO_IMPL, load_file_io
35-
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat, ManifestEntry, ManifestEntryStatus
3635
from pyiceberg.partitioning import PartitionField, PartitionSpec
3736
from pyiceberg.schema import Schema
3837
from pyiceberg.table import (
3938
CommitTableRequest,
4039
StaticTable,
4140
Table,
4241
TableIdentifier,
43-
_match_deletes_to_data_file,
4442
)
45-
from pyiceberg.table.delete_file_index import DeleteFileIndex
4643
from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV2, _generate_snapshot_id
4744
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
4845
from pyiceberg.table.snapshots import (
@@ -371,119 +368,6 @@ def test_static_table_io_does_not_exist(metadata_location: str) -> None:
371368
StaticTable.from_metadata(metadata_location, {PY_IO_IMPL: "pyiceberg.does.not.exist.FileIO"})
372369

373370

374-
def test_match_deletes_to_datafile() -> None:
375-
data_file = DataFile.from_args(
376-
content=DataFileContent.DATA,
377-
file_path="s3://bucket/0000.parquet",
378-
file_format=FileFormat.PARQUET,
379-
partition={},
380-
record_count=3,
381-
file_size_in_bytes=3,
382-
)
383-
data_file._spec_id = 0
384-
data_entry = ManifestEntry.from_args(
385-
status=ManifestEntryStatus.ADDED,
386-
sequence_number=1,
387-
data_file=data_file,
388-
)
389-
delete_file_1 = DataFile.from_args(
390-
content=DataFileContent.POSITION_DELETES,
391-
file_path="s3://bucket/0001-delete.parquet",
392-
file_format=FileFormat.PARQUET,
393-
partition={},
394-
record_count=3,
395-
file_size_in_bytes=3,
396-
)
397-
delete_file_1._spec_id = 0
398-
delete_entry_1 = ManifestEntry.from_args(
399-
status=ManifestEntryStatus.ADDED,
400-
sequence_number=0, # Older than the data
401-
data_file=delete_file_1,
402-
)
403-
delete_file_2 = DataFile.from_args(
404-
content=DataFileContent.POSITION_DELETES,
405-
file_path="s3://bucket/0002-delete.parquet",
406-
file_format=FileFormat.PARQUET,
407-
partition={},
408-
record_count=3,
409-
file_size_in_bytes=3,
410-
)
411-
delete_file_2._spec_id = 0
412-
delete_entry_2 = ManifestEntry.from_args(
413-
status=ManifestEntryStatus.ADDED,
414-
sequence_number=3,
415-
data_file=delete_file_2,
416-
)
417-
418-
delete_file_index = DeleteFileIndex()
419-
delete_file_index.add_delete_file(delete_entry_1)
420-
delete_file_index.add_delete_file(delete_entry_2)
421-
422-
assert _match_deletes_to_data_file(
423-
data_entry,
424-
delete_file_index,
425-
) == {
426-
delete_entry_2.data_file,
427-
}
428-
429-
430-
def test_match_deletes_to_datafile_duplicate_number() -> None:
431-
data_file = DataFile.from_args(
432-
content=DataFileContent.DATA,
433-
file_path="s3://bucket/0000.parquet",
434-
file_format=FileFormat.PARQUET,
435-
partition={},
436-
record_count=3,
437-
file_size_in_bytes=3,
438-
)
439-
data_file._spec_id = 0
440-
data_entry = ManifestEntry.from_args(
441-
status=ManifestEntryStatus.ADDED,
442-
sequence_number=1,
443-
data_file=data_file,
444-
)
445-
delete_file_1 = DataFile.from_args(
446-
content=DataFileContent.POSITION_DELETES,
447-
file_path="s3://bucket/0001-delete.parquet",
448-
file_format=FileFormat.PARQUET,
449-
partition={},
450-
record_count=3,
451-
file_size_in_bytes=3,
452-
)
453-
delete_file_1._spec_id = 0
454-
delete_entry_1 = ManifestEntry.from_args(
455-
status=ManifestEntryStatus.ADDED,
456-
sequence_number=3,
457-
data_file=delete_file_1,
458-
)
459-
delete_file_2 = DataFile.from_args(
460-
content=DataFileContent.POSITION_DELETES,
461-
file_path="s3://bucket/0002-delete.parquet",
462-
file_format=FileFormat.PARQUET,
463-
partition={},
464-
record_count=3,
465-
file_size_in_bytes=3,
466-
)
467-
delete_file_2._spec_id = 0
468-
delete_entry_2 = ManifestEntry.from_args(
469-
status=ManifestEntryStatus.ADDED,
470-
sequence_number=3,
471-
data_file=delete_file_2,
472-
)
473-
474-
delete_file_index = DeleteFileIndex()
475-
delete_file_index.add_delete_file(delete_entry_1)
476-
delete_file_index.add_delete_file(delete_entry_2)
477-
478-
assert _match_deletes_to_data_file(
479-
data_entry,
480-
delete_file_index,
481-
) == {
482-
delete_entry_1.data_file,
483-
delete_entry_2.data_file,
484-
}
485-
486-
487371
def test_serialize_set_properties_updates() -> None:
488372
assert (
489373
SetPropertiesUpdate(updates={"abc": "🤪"}).model_dump_json() == """{"action":"set-properties","updates":{"abc":"🤪"}}"""

Comments: 0 commit comments