Skip to content

Commit 824faae

Browse files
committed
[python] Materialize constructed-type to STRING casts at read time
update_column_type from ROW/ARRAY/MAP to STRING passes validation (the cast rules allow constructed types to character strings), but reading an old file failed with ArrowNotImplementedError because struct/list/map cannot be cast to utf8 directly. Render the string form during per-file alignment instead, matching the engine's cast rules: ROW as '{v1, v2}', ARRAY as '[e1, e2]', MAP as '{k1 -> v1, k2 -> v2}', with sub-values rendered recursively, NULL sub-values as the literal 'null', and NULL containers staying NULL. Add round-trip tests for ROW/ARRAY/MAP to STRING, NULL semantics, and a nested sub-field changed to STRING.
1 parent b4d46ec commit 824faae

2 files changed

Lines changed: 153 additions & 2 deletions

File tree

paimon-python/pypaimon/read/reader/data_file_batch_reader.py

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,74 @@
2525
from pypaimon.read.partition_info import PartitionInfo
2626
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
2727
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
28-
from pypaimon.schema.data_types import (ArrayType, DataField, MapType,
29-
PyarrowFieldParser, RowType)
28+
from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
29+
MapType, PyarrowFieldParser, RowType)
3030
from pypaimon.table.special_fields import SpecialFields
3131

3232

33+
def _is_character_string_type(data_type) -> bool:
34+
if not isinstance(data_type, AtomicType):
35+
return False
36+
t = data_type.type.upper()
37+
return t == 'STRING' or t.startswith('VARCHAR') or t.startswith('CHAR')
38+
39+
40+
def _to_string_values(array, data_type) -> list:
41+
"""Render *array* as a list of per-row strings (None for NULL rows)."""
42+
if isinstance(data_type, (RowType, ArrayType, MapType)):
43+
return _constructed_to_string_array(array, data_type).to_pylist()
44+
return array.cast(pa.string(), safe=False).to_pylist()
45+
46+
47+
def _constructed_to_string_array(array, file_type):
48+
"""Render a struct/list/map array in the engine's string form:
49+
ROW -> ``{v1, v2}``, ARRAY -> ``[e1, e2]``, MAP -> ``{k1 -> v1, k2 -> v2}``.
50+
Sub-values are rendered recursively; a NULL sub-value renders as the
51+
literal ``null`` while a NULL container row stays NULL."""
52+
valid = pc.is_valid(array).to_pylist()
53+
out = []
54+
if isinstance(file_type, RowType):
55+
children = [
56+
_to_string_values(array.field(i), sub.type)
57+
for i, sub in enumerate(file_type.fields)
58+
]
59+
for i in range(len(array)):
60+
if not valid[i]:
61+
out.append(None)
62+
continue
63+
vals = [c[i] if c[i] is not None else 'null' for c in children]
64+
out.append('{' + ', '.join(vals) + '}')
65+
elif isinstance(file_type, ArrayType):
66+
values = _to_string_values(array.values, file_type.element)
67+
offsets = array.offsets.to_pylist()
68+
for i in range(len(array)):
69+
if not valid[i]:
70+
out.append(None)
71+
continue
72+
elems = [v if v is not None else 'null'
73+
for v in values[offsets[i]:offsets[i + 1]]]
74+
out.append('[' + ', '.join(elems) + ']')
75+
elif isinstance(file_type, MapType):
76+
keys = _to_string_values(array.keys, file_type.key)
77+
items = _to_string_values(array.items, file_type.value)
78+
offsets = array.offsets.to_pylist()
79+
for i in range(len(array)):
80+
if not valid[i]:
81+
out.append(None)
82+
continue
83+
entries = [
84+
'{} -> {}'.format(
85+
keys[j] if keys[j] is not None else 'null',
86+
items[j] if items[j] is not None else 'null')
87+
for j in range(offsets[i], offsets[i + 1])
88+
]
89+
out.append('{' + ', '.join(entries) + '}')
90+
else:
91+
raise ValueError(
92+
'Unsupported constructed type for string rendering: {}'.format(file_type))
93+
return pa.array(out, type=pa.string())
94+
95+
3396
class DataFileBatchReader(RecordBatchReader):
3497
"""
3598
Reads record batch from files of different formats
@@ -147,6 +210,12 @@ def _align_array_by_id(self, array, file_type, target_type):
147210
fields=[target_pa.key_field, target_pa.item_field])
148211
return pa.Array.from_buffers(
149212
target_pa, len(array), array.buffers()[:2], children=[entries])
213+
# A constructed type changed to a character string: pyarrow cannot
214+
# cast struct/list/map to utf8 directly, so render the engine's
215+
# string form instead.
216+
if (isinstance(file_type, (RowType, ArrayType, MapType))
217+
and _is_character_string_type(target_type)):
218+
return _constructed_to_string_array(array, file_type)
150219
# Leaf / non-nested: cast to the target type when it differs.
151220
target_pa_type = PyarrowFieldParser.from_paimon_type(target_type)
152221
if array.type != target_pa_type:

paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,88 @@ def test_map_of_row_add_subfield(self):
458458
self.assertEqual(rows[0]['m'], [('k', {'a': 1, 'b': 'x', 'c': None})])
459459

460460

461+
class SchemaEvolutionConstructedToStringTest(_NestedBase):
462+
"""update column type from ROW/ARRAY/MAP to STRING: old files must be
463+
materialized as the engine's string rendering at read time."""
464+
465+
def test_row_to_string(self):
466+
s0 = pa.schema([('id', pa.int64()),
467+
('mv', pa.struct([('a', pa.int32()), ('b', pa.string())]))])
468+
table = self._create('c2s_row', s0)
469+
self._write(table, pa.Table.from_pylist(
470+
[{'id': 1, 'mv': {'a': 1, 'b': 'x'}}], schema=s0))
471+
self.catalog.alter_table(
472+
'default.c2s_row',
473+
[SchemaChange.update_column_type('mv', AtomicType('STRING'))], False)
474+
table = self.catalog.get_table('default.c2s_row')
475+
s1 = pa.schema([('id', pa.int64()), ('mv', pa.string())])
476+
self._write(table, pa.Table.from_pylist(
477+
[{'id': 2, 'mv': 's2'}], schema=s1))
478+
479+
rows = self._read_sorted(table)
480+
self.assertEqual(rows, [
481+
{'id': 1, 'mv': '{1, x}'},
482+
{'id': 2, 'mv': 's2'},
483+
])
484+
485+
def test_array_to_string(self):
486+
s0 = pa.schema([('id', pa.int64()), ('arr', pa.list_(pa.int32()))])
487+
table = self._create('c2s_arr', s0)
488+
self._write(table, pa.Table.from_pylist(
489+
[{'id': 1, 'arr': [1, 2, 3]}], schema=s0))
490+
self.catalog.alter_table(
491+
'default.c2s_arr',
492+
[SchemaChange.update_column_type('arr', AtomicType('STRING'))], False)
493+
table = self.catalog.get_table('default.c2s_arr')
494+
rows = self._read_sorted(table)
495+
self.assertEqual(rows[0]['arr'], '[1, 2, 3]')
496+
497+
def test_map_to_string(self):
498+
s0 = pa.schema([('id', pa.int64()),
499+
('m', pa.map_(pa.string(), pa.int32()))])
500+
table = self._create('c2s_map', s0)
501+
self._write(table, pa.Table.from_pylist(
502+
[{'id': 1, 'm': [('k', 7)]}], schema=s0))
503+
self.catalog.alter_table(
504+
'default.c2s_map',
505+
[SchemaChange.update_column_type('m', AtomicType('STRING'))], False)
506+
table = self.catalog.get_table('default.c2s_map')
507+
rows = self._read_sorted(table)
508+
self.assertEqual(rows[0]['m'], '{k -> 7}')
509+
510+
def test_row_to_string_null_semantics(self):
511+
s0 = pa.schema([('id', pa.int64()),
512+
('mv', pa.struct([('a', pa.int32()), ('b', pa.string())]))])
513+
table = self._create('c2s_null', s0)
514+
self._write(table, pa.Table.from_pylist([
515+
{'id': 1, 'mv': None},
516+
{'id': 2, 'mv': {'a': None, 'b': 'x'}},
517+
], schema=s0))
518+
self.catalog.alter_table(
519+
'default.c2s_null',
520+
[SchemaChange.update_column_type('mv', AtomicType('STRING'))], False)
521+
table = self.catalog.get_table('default.c2s_null')
522+
rows = self._read_sorted(table)
523+
# A NULL container stays NULL; a NULL sub-value renders as 'null'.
524+
self.assertIsNone(rows[0]['mv'])
525+
self.assertEqual(rows[1]['mv'], '{null, x}')
526+
527+
def test_nested_subfield_row_to_string(self):
528+
inner = pa.struct([('a', pa.int32())])
529+
s0 = pa.schema([('id', pa.int64()),
530+
('mv', pa.struct([('inner', inner)]))])
531+
table = self._create('c2s_nested', s0)
532+
self._write(table, pa.Table.from_pylist(
533+
[{'id': 1, 'mv': {'inner': {'a': 1}}}], schema=s0))
534+
self.catalog.alter_table(
535+
'default.c2s_nested',
536+
[SchemaChange.update_column_type(['mv', 'inner'], AtomicType('STRING'))],
537+
False)
538+
table = self.catalog.get_table('default.c2s_nested')
539+
rows = self._read_sorted(table)
540+
self.assertEqual(rows[0]['mv'], {'inner': '{1}'})
541+
542+
461543
class NestedFieldIdModelTest(unittest.TestCase):
462544
"""Globally-unique nested field ids, mirrored from the engine id model."""
463545

0 commit comments

Comments
 (0)