Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.utils.deprecated import deprecation_message

if TYPE_CHECKING:
from pyiceberg.table.metadata import TableMetadata
Expand Down Expand Up @@ -356,6 +357,11 @@ def update_snapshot_summaries(
raise ValueError(f"Operation not implemented: {summary.operation}")

if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
deprecation_message(
deprecated_in="0.10.0",
removed_in="0.11.0",
help_message="The truncate-full-table shouldn't be used anymore.",
Comment thread
Fokko marked this conversation as resolved.
Outdated
)
summary = _truncate_table_summary(summary, previous_summary)

if not previous_summary:
Expand Down
1 change: 0 additions & 1 deletion pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
return update_snapshot_summaries(
summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties),
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
truncate_full_table=self._operation == Operation.OVERWRITE,
Comment thread
Fokko marked this conversation as resolved.
)

def _commit(self) -> UpdatesAndRequirements:
Expand Down
20 changes: 9 additions & 11 deletions tests/integration/test_deletes.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,21 +467,19 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio
assert snapshots[2].summary == Summary(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran this locally. Snapshot 0 has the following metadata, which is correct:

{
    'added-data-files': '2',
    'added-files-size': '1490',
    'added-records': '5',
    'changed-partition-count': '2',
    'total-data-files': '2',
    'total-delete-files': '0',
    'total-equality-deletes': '0',
    'total-files-size': '1490',
    'total-position-deletes': '0',
    'total-records': '5'
}

But snapshot 1, the DELETE op with the positional delete, has the following metadata:

{
    'added-delete-files': '1',
    'added-files-size': '1710',
    'added-position-delete-files': '1',
    'added-position-deletes': '1',
    'changed-partition-count': '1',
    'total-data-files': '2',
    'total-delete-files': '1',
    'total-equality-deletes': '0',
    'total-files-size': '3200',
    'total-position-deletes': '1',
    'total-records': '5'
}

Everything looks right except for total-records. We started off with 5 records, and the DELETE op removed 1 record, so total-records should be 4 here.
Is this a bug in the Spark snapshot summary?

Copy link
Copy Markdown
Contributor Author

@Fokko Fokko Apr 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Looking at it, snapshot summary 1 seems incorrect. Could you open up an issue on the Java side? It would be good to get some historical context around this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to reproduce it using #1926. I'll also open an issue on the Java side.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened an issue on the Java side: apache/iceberg#12823.

Operation.OVERWRITE,
Comment thread
kevinjqliu marked this conversation as resolved.
**{
"added-files-size": snapshots[2].summary["total-files-size"],
"added-data-files": "1",
"added-files-size": snapshots[2].summary["added-files-size"],
"added-records": "2",
"changed-partition-count": "1",
"total-files-size": snapshots[2].summary["total-files-size"],
"total-delete-files": "0",
"total-data-files": "1",
"total-position-deletes": "0",
"total-records": "2",
"total-equality-deletes": "0",
"deleted-data-files": "2",
"removed-delete-files": "1",
"deleted-records": "5",
"deleted-data-files": "1",
"deleted-records": "3",
"removed-files-size": snapshots[2].summary["removed-files-size"],
"removed-position-deletes": "1",
"total-data-files": "2",
"total-delete-files": "1",
"total-equality-deletes": "0",
"total-files-size": snapshots[2].summary["total-files-size"],
"total-position-deletes": "1",
"total-records": "4",
},
)

Expand Down
96 changes: 95 additions & 1 deletion tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi
"total-records": "0",
}

# Overwrite
# Append
assert summaries[3] == {
"added-data-files": "1",
"added-files-size": str(file_size),
Expand All @@ -262,6 +262,100 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi
}


@pytest.mark.integration
def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catalog) -> None:
identifier = "default.test_summaries_partial_overwrite"
Comment thread
kevinjqliu marked this conversation as resolved.
TEST_DATA = {
"id": [1, 2, 3, 1, 1],
"name": ["AB", "CD", "EF", "CD", "EF"],
}
pa_schema = pa.schema(
[
pa.field("id", pa.int32()),
pa.field("name", pa.string()),
]
)
arrow_table = pa.Table.from_pydict(TEST_DATA, schema=pa_schema)
tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=pa_schema)
with tbl.update_spec() as txn:
txn.add_identity("id")
tbl.append(arrow_table)

# TODO: We might want to check why this ends up in 3 files
assert len(tbl.inspect.data_files()) == 3
Comment thread
Fokko marked this conversation as resolved.
Outdated

tbl.delete(delete_filter="id == 1 and name = 'AB'") # partial overwrite data from 1 data file

rows = spark.sql(
f"""
SELECT operation, summary
FROM {identifier}.snapshots
ORDER BY committed_at ASC
"""
).collect()

operations = [row.operation for row in rows]
assert operations == ["append", "overwrite"]

summaries = [row.summary for row in rows]

file_size = int(summaries[0]["added-files-size"])
assert file_size > 0

# APPEND
assert summaries[0] == {
"added-data-files": "3",
"added-files-size": "2570",
"added-records": "5",
"changed-partition-count": "3",
"total-data-files": "3",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": "2570",
"total-position-deletes": "0",
"total-records": "5",
}
# Java produces:
# {
# "added-data-files": "1",
# "added-files-size": "707",
# "added-records": "2",
# "app-id": "local-1743678304626",
# "changed-partition-count": "1",
# "deleted-data-files": "1",
# "deleted-records": "3",
# "engine-name": "spark",
# "engine-version": "3.5.5",
# "iceberg-version": "Apache Iceberg 1.8.1 (commit 9ce0fcf0af7becf25ad9fc996c3bad2afdcfd33d)",
# "removed-files-size": "693",
# "spark.app.id": "local-1743678304626",
# "total-data-files": "3",
# "total-delete-files": "0",
# "total-equality-deletes": "0",
# "total-files-size": "1993",
# "total-position-deletes": "0",
# "total-records": "4"
# }
files = tbl.inspect.data_files()
assert len(files) == 3
assert summaries[1] == {
"added-data-files": "1",
"added-files-size": "859",
"added-records": "2",
"changed-partition-count": "1",
"deleted-data-files": "1",
"deleted-records": "3",
"removed-files-size": "866",
"total-data-files": "3",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": "2563",
"total-position-deletes": "0",
"total-records": "4",
}
assert len(tbl.scan().to_pandas()) == 4


@pytest.mark.integration
def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_data_files"
Expand Down
20 changes: 6 additions & 14 deletions tests/table/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None:
"total-position-deletes": "1",
"total-records": "1",
},
truncate_full_table=True,
)

expected = {
Expand All @@ -299,18 +298,12 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None:
"added-files-size": "4",
"added-position-deletes": "5",
"added-records": "6",
"total-data-files": "1",
"total-records": "6",
"total-delete-files": "2",
"total-equality-deletes": "3",
"total-files-size": "4",
"total-position-deletes": "5",
"deleted-data-files": "1",
"removed-delete-files": "1",
"deleted-records": "1",
"removed-files-size": "1",
"removed-position-deletes": "1",
"removed-equality-deletes": "1",
"total-data-files": "2",
"total-delete-files": "3",
"total-records": "7",
"total-files-size": "5",
"total-position-deletes": "6",
"total-equality-deletes": "4",
}

assert actual.additional_properties == expected
Expand All @@ -337,7 +330,6 @@ def test_invalid_type() -> None:
},
),
previous_summary={"total-data-files": "abc"}, # should be a number
truncate_full_table=True,
)

assert "Could not parse summary property total-data-files to an int: abc" in str(e.value)