Skip to content

Commit dc272e6

Browse files
AyushPatel101 and Ayush Patel authored
fix: Use permissive promote in merge() concat for large_string drift (#10)
`Transaction.merge()` calls `pa.concat_tables([kept_rows, df], promote_options="default")` to combine the kept-after-anti-join rows with the user-supplied source df. `ArrowScan.to_table()` can return text and binary columns as the `large_*` PyArrow variants (`large_utf8`, `large_binary`), while user-supplied dfs - in particular ones constructed via pandas->arrow with `dtype_backend="numpy_nullable"` - typically use the non-large variants. Both sides are semantically the same Iceberg type; the difference is purely physical encoding width.

`promote_options="default"` refuses to bridge `utf8 <-> large_utf8` / `binary <-> large_binary` and the merge fails with:

    pyarrow.lib.ArrowTypeError: Unable to merge: Field <name> has incompatible types: string vs large_string

This is the exact failure observed against real Iceberg parquet tables when consolidating multiple feeds into one table via composite merge keys.

Switch to `promote_options="permissive"` to bridge the gap. This matches what `pyiceberg/io/pyarrow.py:to_table()` already does for the same reason - the comment there reads:

    # Note: cannot use pa.Table.from_batches(itertools.chain([first_batch], batches)))
    # as different batches can use different schema's (due to large_ types)

Adds three regression tests covering both directions of the string drift and the binary equivalent. The existing test fixtures all share a single `ARROW` schema with `pa.string()` on both sides of every merge, so no test ever exercised the cross-encoding path that triggers the failure.

Co-authored-by: Ayush Patel <Ayush.Patel@imc.com>
1 parent d5a5a38 commit dc272e6

2 files changed

Lines changed: 171 additions & 1 deletion

File tree

pyiceberg/table/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -996,7 +996,17 @@ def merge(
996996

997997
kept_rows = target_data.join(source_keys, keys=join_cols, join_type="left anti")
998998
rows_deleted = target_data.num_rows - kept_rows.num_rows
999-
new_content = pa.concat_tables([kept_rows, df], promote_options="default")
999+
# ``ArrowScan.to_table()`` can return text/binary columns as the
1000+
# ``large_*`` PyArrow variants (utf8 vs large_utf8, binary vs
1001+
# large_binary) while the user-provided ``df`` from pandas->arrow
1002+
# typically produces the non-large variants. Both sides are
1003+
# semantically the same Iceberg type; the difference is purely
1004+
# PyArrow physical encoding. ``promote_options="default"`` refuses
1005+
# to bridge that gap and the concat fails. Use ``"permissive"`` to
1006+
# match what ``pyiceberg/io/pyarrow.py:to_table()`` already does
1007+
# for the same reason ("different batches can use different
1008+
# schema's (due to large_ types)").
1009+
new_content = pa.concat_tables([kept_rows, df], promote_options="permissive")
10001010

10011011
# Step 4: Atomic single-snapshot commit.
10021012
# Delete old files, append rewritten content.

tests/table/test_merge.py

Lines changed: 160 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -696,6 +696,166 @@ def test_merge_schema_preserved_after_anti_join(catalog: Catalog) -> None:
696696
assert result.column("score").to_pylist() == [150, 250]
697697

698698

699+
# ==================== ARROW TYPE-WIDTH DRIFT (string vs large_string) ====================
700+
701+
702+
def test_merge_handles_large_string_kept_rows_with_string_source(catalog: Catalog) -> None:
703+
"""Regression: kept_rows from ArrowScan can come back as ``large_string``
704+
while a pandas->arrow source df typically produces ``string``.
705+
706+
The two encodings are semantically the same Iceberg text type but are
707+
physically distinct PyArrow types. ``pa.concat_tables(..., promote_options="default")``
708+
refuses to bridge them and merge() blows up with::
709+
710+
ArrowTypeError: Unable to merge: Field <name> has incompatible types:
711+
string vs large_string
712+
713+
This is the exact failure observed against real Iceberg parquet tables
714+
(where ArrowScan returns ``large_*``) when consolidating multiple feeds
715+
into one table via composite merge keys.
716+
717+
The fix in ``Transaction.merge()`` is to use ``promote_options="permissive"``
718+
(matching what ``pyiceberg/io/pyarrow.py`` already does for the same
719+
reason). This test pins that behavior.
720+
"""
721+
ident = "default.merge_large_string_drift"
722+
_drop(catalog, ident)
723+
tbl = catalog.create_table(ident, schema=SCHEMA)
724+
725+
# Write the initial rows with ``large_string`` to simulate what
726+
# ``ArrowScan.to_table()`` returns from a real Iceberg parquet read.
727+
large_string_arrow = pa.schema(
728+
[
729+
pa.field("user_id", pa.int32(), nullable=False),
730+
pa.field("name", pa.large_string(), nullable=False),
731+
pa.field("score", pa.int32(), nullable=False),
732+
]
733+
)
734+
tbl.append(
735+
pa.Table.from_pylist(
736+
[
737+
{"user_id": 1, "name": "Alice", "score": 100},
738+
{"user_id": 2, "name": "Bob", "score": 200},
739+
],
740+
schema=large_string_arrow,
741+
)
742+
)
743+
744+
# New batch using plain ``string`` (the pandas->arrow default for
745+
# ``pd.StringDtype()`` columns).
746+
assert ARROW.field("name").type == pa.string()
747+
tbl.merge(
748+
pa.Table.from_pylist(
749+
[
750+
{"user_id": 1, "name": "Alice", "score": 150},
751+
{"user_id": 3, "name": "Charlie", "score": 300},
752+
],
753+
schema=ARROW,
754+
),
755+
join_cols=["user_id"],
756+
)
757+
758+
result = tbl.scan().to_arrow().sort_by("user_id")
759+
assert result.num_rows == 3
760+
assert result.column("score").to_pylist() == [150, 200, 300]
761+
assert result.column("name").to_pylist() == ["Alice", "Bob", "Charlie"]
762+
763+
764+
def test_merge_handles_string_kept_rows_with_large_string_source(catalog: Catalog) -> None:
765+
"""Mirror of the previous test - target is ``string``, source is
766+
``large_string``. Both directions must be tolerated since the
767+
drift can flip either way depending on PyArrow version, file
768+
metadata, and the ``PYARROW_USE_LARGE_TYPES_ON_READ`` setting.
769+
"""
770+
ident = "default.merge_large_string_reverse"
771+
_drop(catalog, ident)
772+
tbl = catalog.create_table(ident, schema=SCHEMA)
773+
774+
tbl.append(
775+
pa.Table.from_pylist(
776+
[
777+
{"user_id": 1, "name": "Alice", "score": 100},
778+
{"user_id": 2, "name": "Bob", "score": 200},
779+
],
780+
schema=ARROW, # plain string
781+
)
782+
)
783+
784+
large_string_arrow = pa.schema(
785+
[
786+
pa.field("user_id", pa.int32(), nullable=False),
787+
pa.field("name", pa.large_string(), nullable=False),
788+
pa.field("score", pa.int32(), nullable=False),
789+
]
790+
)
791+
tbl.merge(
792+
pa.Table.from_pylist(
793+
[
794+
{"user_id": 1, "name": "Alice", "score": 150},
795+
],
796+
schema=large_string_arrow,
797+
),
798+
join_cols=["user_id"],
799+
)
800+
801+
result = tbl.scan().to_arrow().sort_by("user_id")
802+
assert result.num_rows == 2
803+
assert result.column("score").to_pylist() == [150, 200]
804+
805+
806+
def test_merge_handles_large_binary_kept_rows_with_binary_source(catalog: Catalog) -> None:
807+
"""Same drift family for binary columns: PyIceberg's ``ArrowScan`` can
808+
return ``large_binary`` while user-provided sources typically have
809+
``binary``. ``"permissive"`` promote bridges these too.
810+
"""
811+
from pyiceberg.types import BinaryType
812+
813+
binary_schema = Schema(
814+
NestedField(1, "user_id", IntegerType(), required=True),
815+
NestedField(2, "payload", BinaryType(), required=True),
816+
)
817+
ident = "default.merge_large_binary_drift"
818+
_drop(catalog, ident)
819+
tbl = catalog.create_table(ident, schema=binary_schema)
820+
821+
large_binary_arrow = pa.schema(
822+
[
823+
pa.field("user_id", pa.int32(), nullable=False),
824+
pa.field("payload", pa.large_binary(), nullable=False),
825+
]
826+
)
827+
tbl.append(
828+
pa.Table.from_pylist(
829+
[
830+
{"user_id": 1, "payload": b"alpha"},
831+
{"user_id": 2, "payload": b"bravo"},
832+
],
833+
schema=large_binary_arrow,
834+
)
835+
)
836+
837+
binary_arrow = pa.schema(
838+
[
839+
pa.field("user_id", pa.int32(), nullable=False),
840+
pa.field("payload", pa.binary(), nullable=False),
841+
]
842+
)
843+
tbl.merge(
844+
pa.Table.from_pylist(
845+
[
846+
{"user_id": 1, "payload": b"alpha-updated"},
847+
{"user_id": 3, "payload": b"charlie"},
848+
],
849+
schema=binary_arrow,
850+
),
851+
join_cols=["user_id"],
852+
)
853+
854+
result = tbl.scan().to_arrow().sort_by("user_id")
855+
assert result.num_rows == 3
856+
assert result.column("payload").to_pylist() == [b"alpha-updated", b"bravo", b"charlie"]
857+
858+
699859
# ==================== MERGE RESULT ====================
700860

701861

0 commit comments

Comments
 (0)