Skip to content

Commit b3c8c25

Browse files
committed
[python] Harden nested schema evolution: reject reshaping casts, fix sliced arrays, gate null-to-not-null
Self-review findings on the nested schema-evolution path:

- update_column_type between same-root constructed types (e.g. ROW<a INT> -> ROW<a BIGINT, c STRING>) was accepted: the replacement carried caller-supplied nested ids that corrupt the id model, and old rows read all-NULL; a VECTOR length change was accepted but unreadable. Reject non-identical constructed-to-constructed casts; reshaping goes through sub-field / 'element' / 'value' paths, which keep working.
- The list/map rebuilds in the alignment and string-rendering paths read offsets/raw buffers directly, which errors on a sliced ListArray and silently misaligns rows on a sliced MapArray; re-materialize sliced inputs first.
- Converting a nullable column to NOT NULL was silently accepted; it is now rejected by default and opt-in via 'alter-column-null-to-not-null.disabled' = 'false'.

Also add an end-to-end test for the array 'element' type promotion path.
1 parent 2ffdce3 commit b3c8c25

4 files changed

Lines changed: 158 additions & 4 deletions

File tree

paimon-python/pypaimon/casting/data_type_casts.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@
2626
applies the conversion leniently.
2727
"""
2828

29-
from pypaimon.schema.data_types import (ArrayType, AtomicType, MapType,
30-
MultisetType, RowType, VectorType)
29+
from pypaimon.schema.data_types import (ArrayType, AtomicType, DataTypeParser,
30+
MapType, MultisetType, RowType,
31+
VectorType)
3132

3233
# ---- Type roots --------------------------------------------------------------
3334

@@ -164,9 +165,25 @@ def supports_cast(source_type, target_type, allow_explicit: bool = True) -> bool
164165
if source_type.nullable and not target_type.nullable and not allow_explicit:
165166
return False
166167
if source_root == target_root:
168+
if source_root in CONSTRUCTED:
169+
# A constructed type is only castable to an (ignoring outer
170+
# nullability) identical constructed type. Reshaping is done
171+
# through sub-field / 'element' / 'value' paths instead: a whole
172+
# ROW replacement would carry caller-supplied nested field ids
173+
# that corrupt the id model, and there is no runtime conversion
174+
# between differently-shaped constructed values.
175+
return _equals_ignore_nullable(source_type, target_type)
167176
return True
168177
if source_root in _IMPLICIT_RULES.get(target_root, set()):
169178
return True
170179
if allow_explicit and source_root in _EXPLICIT_RULES.get(target_root, set()):
171180
return True
172181
return False
182+
183+
184+
def _equals_ignore_nullable(source_type, target_type) -> bool:
185+
source_copy = DataTypeParser.parse_data_type(source_type.to_dict())
186+
target_copy = DataTypeParser.parse_data_type(target_type.to_dict())
187+
source_copy.nullable = True
188+
target_copy.nullable = True
189+
return source_copy == target_copy

paimon-python/pypaimon/read/reader/data_file_batch_reader.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,18 @@ def _is_character_string_type(data_type) -> bool:
3737
return t == 'STRING' or t.startswith('VARCHAR') or t.startswith('CHAR')
3838

3939

40+
def _unslice(array):
41+
"""Re-materialize a sliced array so offsets/buffers start at zero.
42+
43+
The list/map rebuilds below read ``offsets``/raw buffers directly; on a
44+
sliced array those still point into the parent storage, which either
45+
errors (list rebuild with a null mask) or silently misaligns rows (map
46+
rebuild from raw buffers)."""
47+
if array.offset == 0:
48+
return array
49+
return pa.concat_arrays([array])
50+
51+
4052
def _to_string_values(array, data_type) -> list:
4153
"""Render *array* as a list of per-row strings (None for NULL rows)."""
4254
if isinstance(data_type, (RowType, ArrayType, MapType)):
@@ -49,6 +61,7 @@ def _constructed_to_string_array(array, file_type):
4961
ROW -> ``{v1, v2}``, ARRAY -> ``[e1, e2]``, MAP -> ``{k1 -> v1, k2 -> v2}``.
5062
Sub-values are rendered recursively; a NULL sub-value renders as the
5163
literal ``null`` while a NULL container row stays NULL."""
64+
array = _unslice(array)
5265
valid = pc.is_valid(array).to_pylist()
5366
out = []
5467
if isinstance(file_type, RowType):
@@ -194,11 +207,13 @@ def _align_array_by_id(self, array, file_type, target_type):
194207
return pa.StructArray.from_arrays(
195208
children, fields=pa_fields, mask=pc.is_null(array))
196209
if isinstance(target_type, ArrayType) and isinstance(file_type, ArrayType):
210+
array = _unslice(array)
197211
aligned_values = self._align_array_by_id(
198212
array.values, file_type.element, target_type.element)
199213
return pa.ListArray.from_arrays(
200214
array.offsets, aligned_values, mask=pc.is_null(array))
201215
if isinstance(target_type, MapType) and isinstance(file_type, MapType):
216+
array = _unslice(array)
202217
aligned_items = self._align_array_by_id(
203218
array.items, file_type.value, target_type.value)
204219
# MapArray.from_arrays cannot carry a null mask (a null map would

paimon-python/pypaimon/schema/schema_manager.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,15 +181,30 @@ def update_func(field: DataField, depth: int) -> DataField:
181181
_update_nested_column(new_fields, change.field_names, update_func)
182182

183183

184+
def _assert_nullability_change(old_nullability: bool, new_nullability: bool,
185+
field_name: str, disable_null_to_not_null: bool):
186+
if disable_null_to_not_null and old_nullability and not new_nullability:
187+
raise ValueError(
188+
"Cannot update column type from nullable to non nullable for {}. "
189+
"You can set table configuration option "
190+
"'alter-column-null-to-not-null.disabled' = 'false' "
191+
"to allow converting null columns to not null".format(field_name)
192+
)
193+
194+
184195
def _handle_update_column_nullability(
185-
change: UpdateColumnNullability, new_fields: List[DataField]
196+
change: UpdateColumnNullability, new_fields: List[DataField],
197+
disable_null_to_not_null: bool
186198
):
187199
from pypaimon.schema.data_types import DataTypeParser
188200
field_names = change.field_names
189201
max_depth = len(field_names)
190202

191203
def update_func(field: DataField, depth: int) -> DataField:
192204
source_root = _get_root_type(field.type, depth, max_depth)
205+
_assert_nullability_change(
206+
source_root.nullable, change.new_nullability,
207+
'.'.join(field_names), disable_null_to_not_null)
193208
new_root = DataTypeParser.parse_data_type(source_root.to_dict())
194209
new_root.nullable = change.new_nullability
195210
new_type = _get_array_map_type_with_target_type_root(
@@ -652,6 +667,10 @@ def _generate_table_schema(
652667
# Get add_column_before_partition option
653668
add_column_before_partition = CoreOptions(Options(old_table_schema.options)).add_column_before_partition()
654669
partition_keys = old_table_schema.partition_keys
670+
# Converting a nullable column to NOT NULL is unsafe for existing
671+
# data and is disabled by default; the table option below opts in.
672+
disable_null_to_not_null = str(old_table_schema.options.get(
673+
'alter-column-null-to-not-null.disabled', 'true')).lower() != 'false'
655674

656675
for change in changes:
657676
if isinstance(change, SetOption):
@@ -688,7 +707,8 @@ def _generate_table_schema(
688707
_assert_not_updating_primary_keys(
689708
old_table_schema, change.field_names, "change nullability of"
690709
)
691-
_handle_update_column_nullability(change, new_fields)
710+
_handle_update_column_nullability(
711+
change, new_fields, disable_null_to_not_null)
692712
elif isinstance(change, UpdateColumnComment):
693713
_handle_update_column_comment(change, new_fields)
694714
elif isinstance(change, UpdateColumnPosition):

paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,31 @@ def test_drop_all_subfields_rejected(self):
343343
'default.nsub_dropall',
344344
[SchemaChange.drop_column(['mv', 'latest_version'])], False)
345345

346+
def test_null_to_not_null_disabled_by_default(self):
347+
# Converting nullable -> NOT NULL is unsafe for existing data and is
348+
# rejected unless the table opts in via
349+
# 'alter-column-null-to-not-null.disabled' = 'false'.
350+
self._create_struct_table('nsub_nullability')
351+
with self.assertRaises(RuntimeError) as cm:
352+
self.catalog.alter_table(
353+
'default.nsub_nullability',
354+
[SchemaChange.update_column_nullability(
355+
['mv', 'latest_value'], False)], False)
356+
self.assertIn('nullable to non nullable', str(cm.exception))
357+
# Opt-in makes the same change succeed.
358+
self.catalog.alter_table(
359+
'default.nsub_nullability',
360+
[SchemaChange.set_option(
361+
'alter-column-null-to-not-null.disabled', 'false')], False)
362+
self.catalog.alter_table(
363+
'default.nsub_nullability',
364+
[SchemaChange.update_column_nullability(
365+
['mv', 'latest_value'], False)], False)
366+
schema = self.catalog.get_table('default.nsub_nullability').table_schema
367+
mv = next(f for f in schema.fields if f.name == 'mv')
368+
lv = next(sf for sf in mv.type.fields if sf.name == 'latest_value')
369+
self.assertFalse(lv.type.nullable)
370+
346371
def test_unsupported_subfield_cast_rejected(self):
347372
self._create_struct_table('nsub_badcast')
348373
with self.assertRaises(RuntimeError) as cm:
@@ -476,6 +501,65 @@ def test_array_wrapper_token_validated(self):
476501
[SchemaChange.add_column(['arr', 'element', 'c'], AtomicType('INT'))],
477502
False)
478503

504+
def test_array_element_type_update(self):
505+
# The canonical path for promoting an array's element type descends
506+
# through the 'element' token; old files are cast at read time.
507+
s0 = pa.schema([('id', pa.int64()), ('a2', pa.list_(pa.int32()))])
508+
table = self._create('nelem_type', s0)
509+
self._write(table, pa.Table.from_pylist(
510+
[{'id': 1, 'a2': [1, 2]}], schema=s0))
511+
self.catalog.alter_table(
512+
'default.nelem_type',
513+
[SchemaChange.update_column_type(['a2', 'element'], AtomicType('BIGINT'))],
514+
False)
515+
table = self.catalog.get_table('default.nelem_type')
516+
s1 = pa.schema([('id', pa.int64()), ('a2', pa.list_(pa.int64()))])
517+
self._write(table, pa.Table.from_pylist(
518+
[{'id': 2, 'a2': [3]}], schema=s1))
519+
rb = table.new_read_builder()
520+
splits = rb.new_scan().plan().splits()
521+
arrow = rb.new_read().to_arrow(splits)
522+
self.assertEqual(arrow.schema.field('a2').type, pa.list_(pa.int64()))
523+
rows = sorted(arrow.to_pylist(), key=lambda r: r['id'])
524+
self.assertEqual(rows, [{'id': 1, 'a2': [1, 2]}, {'id': 2, 'a2': [3]}])
525+
526+
def test_whole_struct_type_replacement_rejected(self):
527+
# Replacing a whole ROW type would carry caller-supplied nested ids
528+
# that corrupt the id model; it must be rejected at alter time.
529+
elem = pa.struct([('a', pa.int32()), ('b', pa.string())])
530+
s0 = pa.schema([('id', pa.int64()), ('mv', elem)])
531+
self._create('nrow_replace', s0)
532+
new_row = _paimon_type(pa.struct([('a', pa.int64()), ('c', pa.string())]))
533+
with self.assertRaises(RuntimeError) as cm:
534+
self.catalog.alter_table(
535+
'default.nrow_replace',
536+
[SchemaChange.update_column_type('mv', new_row)], False)
537+
self.assertIn('cannot be converted', str(cm.exception))
538+
539+
def test_align_handles_sliced_arrays(self):
540+
# The list/map rebuilds read offsets/raw buffers; a sliced input
541+
# must be re-materialized, not read through stale parent offsets.
542+
from pypaimon.read.reader.data_file_batch_reader import \
543+
DataFileBatchReader
544+
reader = DataFileBatchReader.__new__(DataFileBatchReader)
545+
sliced_list = pa.array(
546+
[[1, 2], [3], [4, 5, 6], None], type=pa.list_(pa.int32())).slice(1, 3)
547+
out = reader._align_array_by_id(
548+
sliced_list,
549+
ArrayType(True, AtomicType('INT')),
550+
ArrayType(True, AtomicType('BIGINT')))
551+
self.assertEqual(out.to_pylist(), [[3], [4, 5, 6], None])
552+
self.assertEqual(out.type, pa.list_(pa.int64()))
553+
554+
sliced_map = pa.array(
555+
[[('a', 1)], [('b', 2)], None],
556+
type=pa.map_(pa.string(), pa.int32())).slice(1, 2)
557+
out = reader._align_array_by_id(
558+
sliced_map,
559+
MapType(True, AtomicType('STRING'), AtomicType('INT')),
560+
MapType(True, AtomicType('STRING'), AtomicType('BIGINT')))
561+
self.assertEqual(out.to_pylist(), [[('b', 2)], None])
562+
479563
def test_map_wrapper_token_validated(self):
480564
# The token consumed when descending through a MAP must be 'value'.
481565
val = pa.struct([('a', pa.int64())])
@@ -657,6 +741,24 @@ def test_constructed_to_string(self):
657741
for src in (vec, ms):
658742
self.assertFalse(supports_cast(src, AtomicType('STRING')), str(src))
659743

744+
def test_constructed_to_differently_shaped_constructed_rejected(self):
745+
# Reshaping a constructed type must go through sub-field /
746+
# 'element' / 'value' paths; a whole-type replacement would carry
747+
# caller-supplied nested ids that corrupt the id model.
748+
self.assertFalse(supports_cast(
749+
RowType(True, [DataField(0, 'a', AtomicType('INT'))]),
750+
RowType(True, [DataField(0, 'a', AtomicType('BIGINT'))])))
751+
self.assertFalse(supports_cast(
752+
ArrayType(True, AtomicType('INT')),
753+
ArrayType(True, AtomicType('BIGINT'))))
754+
self.assertFalse(supports_cast(
755+
VectorType(True, AtomicType('FLOAT'), 3),
756+
VectorType(True, AtomicType('FLOAT'), 5)))
757+
# Only the outer nullability differing is still an identity cast.
758+
self.assertTrue(supports_cast(
759+
RowType(True, [DataField(2, 'a', AtomicType('INT'))]),
760+
RowType(False, [DataField(2, 'a', AtomicType('INT'))])))
761+
660762

661763
if __name__ == '__main__':
662764
unittest.main()

0 commit comments

Comments
 (0)