diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index cd0bf5ad92..af439d8de2 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -996,7 +996,20 @@ def merge(
         kept_rows = target_data.join(source_keys, keys=join_cols, join_type="left anti")
         rows_deleted = target_data.num_rows - kept_rows.num_rows
 
-        new_content = pa.concat_tables([kept_rows, df], promote_options="default")
+        # ``ArrowScan.to_table()`` can return text/binary columns as the
+        # ``large_*`` PyArrow variants (utf8 vs large_utf8, binary vs
+        # large_binary) while the user-provided ``df`` from pandas->arrow
+        # typically produces the non-large variants. Both sides are
+        # semantically the same Iceberg type; the difference is purely
+        # PyArrow physical encoding. ``promote_options="default"`` refuses
+        # to bridge that gap and the concat fails. Use ``"permissive"`` to
+        # match what ``pyiceberg/io/pyarrow.py:to_table()`` already does
+        # for the same reason ("different batches can use different
+        # schema's (due to large_ types)").
+        # NOTE(review): ``"permissive"`` also widens other mismatches (for
+        # example int32 vs int64) instead of raising; presumably ``df`` has
+        # already been validated against the table schema - confirm.
+        new_content = pa.concat_tables([kept_rows, df], promote_options="permissive")
 
         # Step 4: Atomic single-snapshot commit.
         # Delete old files, append rewritten content.
diff --git a/tests/table/test_merge.py b/tests/table/test_merge.py
index a656771aac..a0fc2af303 100644
--- a/tests/table/test_merge.py
+++ b/tests/table/test_merge.py
@@ -696,6 +696,167 @@ def test_merge_schema_preserved_after_anti_join(catalog: Catalog) -> None:
     assert result.column("score").to_pylist() == [150, 250]
 
 
+# ==================== ARROW TYPE-WIDTH DRIFT (string vs large_string) ====================
+
+
+def test_merge_handles_large_string_kept_rows_with_string_source(catalog: Catalog) -> None:
+    """Regression: kept_rows from ArrowScan can come back as ``large_string``
+    while a pandas->arrow source df typically produces ``string``.
+
+    The two encodings are semantically the same Iceberg text type but are
+    physically distinct PyArrow types. ``pa.concat_tables(..., promote_options="default")``
+    refuses to bridge them and merge() blows up with::
+
+        ArrowTypeError: Unable to merge: Field has incompatible types:
+        string vs large_string
+
+    This is the exact failure observed against real Iceberg parquet tables
+    (where ArrowScan returns ``large_*``) when consolidating multiple feeds
+    into one table via composite merge keys.
+
+    The fix in ``Transaction.merge()`` is to use ``promote_options="permissive"``
+    (matching what ``pyiceberg/io/pyarrow.py`` already does for the same
+    reason). This test pins that behavior.
+    """
+    ident = "default.merge_large_string_drift"
+    _drop(catalog, ident)
+    tbl = catalog.create_table(ident, schema=SCHEMA)
+
+    # Write the initial rows with ``large_string`` to simulate what
+    # ``ArrowScan.to_table()`` returns from a real Iceberg parquet read.
+    large_string_arrow = pa.schema(
+        [
+            pa.field("user_id", pa.int32(), nullable=False),
+            pa.field("name", pa.large_string(), nullable=False),
+            pa.field("score", pa.int32(), nullable=False),
+        ]
+    )
+    tbl.append(
+        pa.Table.from_pylist(
+            [
+                {"user_id": 1, "name": "Alice", "score": 100},
+                {"user_id": 2, "name": "Bob", "score": 200},
+            ],
+            schema=large_string_arrow,
+        )
+    )
+
+    # New batch using plain ``string`` (the pandas->arrow default for
+    # ``pd.StringDtype()`` columns).
+    assert ARROW.field("name").type == pa.string()
+    tbl.merge(
+        pa.Table.from_pylist(
+            [
+                {"user_id": 1, "name": "Alice", "score": 150},
+                {"user_id": 3, "name": "Charlie", "score": 300},
+            ],
+            schema=ARROW,
+        ),
+        join_cols=["user_id"],
+    )
+
+    result = tbl.scan().to_arrow().sort_by("user_id")
+    assert result.num_rows == 3
+    assert result.column("score").to_pylist() == [150, 200, 300]
+    assert result.column("name").to_pylist() == ["Alice", "Bob", "Charlie"]
+
+
+def test_merge_handles_string_kept_rows_with_large_string_source(catalog: Catalog) -> None:
+    """Mirror of the previous test - target is ``string``, source is
+    ``large_string``. Both directions must be tolerated since the
+    drift can flip either way depending on PyArrow version, file
+    metadata, and the ``PYARROW_USE_LARGE_TYPES_ON_READ`` setting.
+    """
+    ident = "default.merge_large_string_reverse"
+    _drop(catalog, ident)
+    tbl = catalog.create_table(ident, schema=SCHEMA)
+
+    tbl.append(
+        pa.Table.from_pylist(
+            [
+                {"user_id": 1, "name": "Alice", "score": 100},
+                {"user_id": 2, "name": "Bob", "score": 200},
+            ],
+            schema=ARROW,  # plain string
+        )
+    )
+
+    large_string_arrow = pa.schema(
+        [
+            pa.field("user_id", pa.int32(), nullable=False),
+            pa.field("name", pa.large_string(), nullable=False),
+            pa.field("score", pa.int32(), nullable=False),
+        ]
+    )
+    tbl.merge(
+        pa.Table.from_pylist(
+            [
+                {"user_id": 1, "name": "Alice", "score": 150},
+            ],
+            schema=large_string_arrow,
+        ),
+        join_cols=["user_id"],
+    )
+
+    result = tbl.scan().to_arrow().sort_by("user_id")
+    assert result.num_rows == 2
+    assert result.column("score").to_pylist() == [150, 200]
+    assert result.column("name").to_pylist() == ["Alice", "Bob"]
+
+
+def test_merge_handles_large_binary_kept_rows_with_binary_source(catalog: Catalog) -> None:
+    """Same drift family for binary columns: PyIceberg's ``ArrowScan`` can
+    return ``large_binary`` while user-provided sources typically have
+    ``binary``. ``"permissive"`` promote bridges these too.
+    """
+    from pyiceberg.types import BinaryType
+
+    binary_schema = Schema(
+        NestedField(1, "user_id", IntegerType(), required=True),
+        NestedField(2, "payload", BinaryType(), required=True),
+    )
+    ident = "default.merge_large_binary_drift"
+    _drop(catalog, ident)
+    tbl = catalog.create_table(ident, schema=binary_schema)
+
+    large_binary_arrow = pa.schema(
+        [
+            pa.field("user_id", pa.int32(), nullable=False),
+            pa.field("payload", pa.large_binary(), nullable=False),
+        ]
+    )
+    tbl.append(
+        pa.Table.from_pylist(
+            [
+                {"user_id": 1, "payload": b"alpha"},
+                {"user_id": 2, "payload": b"bravo"},
+            ],
+            schema=large_binary_arrow,
+        )
+    )
+
+    binary_arrow = pa.schema(
+        [
+            pa.field("user_id", pa.int32(), nullable=False),
+            pa.field("payload", pa.binary(), nullable=False),
+        ]
+    )
+    tbl.merge(
+        pa.Table.from_pylist(
+            [
+                {"user_id": 1, "payload": b"alpha-updated"},
+                {"user_id": 3, "payload": b"charlie"},
+            ],
+            schema=binary_arrow,
+        ),
+        join_cols=["user_id"],
+    )
+
+    result = tbl.scan().to_arrow().sort_by("user_id")
+    assert result.num_rows == 3
+    assert result.column("payload").to_pylist() == [b"alpha-updated", b"bravo", b"charlie"]
+
+
 # ==================== MERGE RESULT ====================
 
 