Skip to content

Commit b4d46ec

Browse files
committed
[python] Align nested-leaf projection with field-id schema evolution
Nested-leaf projection on append-only reads pushed the leaf path down by its LATEST name, bypassing the per-file field-id normalization. As a result, after a sub-field rename the leaf read from an old file came back NULL, and after a sub-field type change old and new batches carried different types and failed to concatenate. Mirror the merge path instead: widen the projection to the full top-level columns so that the field-id normalization applies (renames follow the id, missing sub-fields are padded with NULL, types are cast), then extract the requested leaf paths back into the user's flat schema — batch-level via NestedLeafBatchReader, or row-level via OuterProjectionRecordReader when a post-read filter is involved. Add regression tests that project a renamed sub-field and a type-changed sub-field across old and new files.
1 parent 0dccd47 commit b4d46ec

4 files changed

Lines changed: 173 additions & 2 deletions

File tree

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from typing import List, Optional
19+
20+
import pyarrow as pa
21+
import pyarrow.compute as pc
22+
from pyarrow import RecordBatch
23+
24+
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
25+
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
26+
27+
28+
class NestedLeafBatchReader(RecordBatchReader):
    """Turn batches of whole top-level columns into the flat projected leaves.

    The wrapped reader is expected to hand over batches that hold the widened
    top-level columns, already aligned to the latest schema by field id
    (renames followed, missing sub-fields padded NULL, types cast). For every
    requested name path the first element picks a batch column and the
    remaining elements descend through struct children; a NULL parent
    propagates to the leaf. The resulting arrays are assembled into a batch
    that carries the user's flat projected schema.
    """

    def __init__(self, inner: RecordBatchReader, name_paths: List[List[str]],
                 output_fields: List[DataField]):
        """Wrap ``inner`` and remember which leaf paths to extract.

        Args:
            inner: reader producing batches of widened top-level columns.
            name_paths: one name path per output column; the first element
                names a batch column, later elements name struct children.
            output_fields: fields of the flat output schema, positionally
                matched with ``name_paths``.

        Raises:
            ValueError: if ``name_paths`` and ``output_fields`` differ in
                length.
        """
        path_count, field_count = len(name_paths), len(output_fields)
        if path_count != field_count:
            raise ValueError(
                "name_paths length {} does not match output_fields length {}".format(
                    path_count, field_count))
        self._inner = inner
        self._paths = name_paths
        self._schema = PyarrowFieldParser.from_paimon_schema(output_fields)

    def read_arrow_batch(self) -> Optional[RecordBatch]:
        """Read the next inner batch and return it flattened to the leaves.

        Returns:
            A batch in the flat output schema, or ``None`` once the inner
            reader returns ``None``.
        """
        source = self._inner.read_arrow_batch()
        if source is None:
            return None
        leaves = [self._extract_leaf(source, position)
                  for position in range(len(self._paths))]
        return pa.RecordBatch.from_arrays(leaves, schema=self._schema)

    def _extract_leaf(self, batch, position):
        """Return the array for the ``position``-th requested path of ``batch``."""
        path = self._paths[position]
        leaf = batch.column(path[0])
        for child_name in path[1:]:
            leaf = pc.struct_field(leaf, child_name)
        # The output schema is authoritative: cast only when the extracted
        # array's type differs from the declared output field type.
        expected_type = self._schema.field(position).type
        if leaf.type != expected_type:
            leaf = leaf.cast(expected_type, safe=False)
        return leaf

    def close(self) -> None:
        """Close the wrapped reader."""
        self._inner.close()

paimon-python/pypaimon/read/split_read.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,33 @@ def _genarate_deletion_file_readers(self):
627627

628628

629629
class RawFileSplitRead(SplitRead):
630+
def __init__(
        self,
        table,
        predicate: Optional[Predicate],
        read_type: List[DataField],
        split: Split,
        row_tracking_enabled: bool,
        outer_extract_name_paths: Optional[List[List[str]]] = None,
        outer_flat_read_type: Optional[List[DataField]] = None,
        limit: Optional[int] = None):
    """Set up a raw-file split read, with optional nested-leaf extraction.

    Nested-leaf projection is deliberately NOT pushed down by name: a leaf
    path is only valid against the latest schema, whereas every data file
    stores its own (possibly renamed / retyped) sub-fields. The read is
    therefore widened to the full top-level columns, which the per-file
    field-id normalization aligns to the latest schema, and the requested
    leaf paths are extracted afterwards.

    Args:
        table: table being read.
        predicate: optional predicate forwarded to the base split read.
        read_type: fields to read (the widened top-level columns when
            nested leaves are projected).
        split: split to read.
        row_tracking_enabled: forwarded to the base split read.
        outer_extract_name_paths: leaf name paths to extract after the
            read; ``None`` when no extraction is requested.
        outer_flat_read_type: fields of the flat output schema used for
            the batch-level extraction.
        limit: optional limit forwarded to the base split read.
    """
    # The base class always gets nested_name_paths=None: the leaf paths are
    # applied after the read instead of being pushed down.
    super().__init__(table=table, predicate=predicate, read_type=read_type,
                     split=split, row_tracking_enabled=row_tracking_enabled,
                     nested_name_paths=None, limit=limit)
    self.outer_extract_name_paths = outer_extract_name_paths
    self.outer_flat_read_type = outer_flat_read_type
656+
630657
def raw_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> Optional[RecordReader]:
631658
read_fields = self._get_final_read_data_fields()
632659
# Check if this is a SlicedSplit to get shard_file_idx_map
@@ -676,10 +703,27 @@ def create_reader(self) -> RecordReader:
676703
# If the table is append-only, no extra filter is needed: all predicates have been pushed down
677704
if self.table.is_primary_key_table and self.predicate_for_reader:
678705
reader = FilterRecordReader(concat_reader, self.predicate_for_reader)
706+
if self.outer_extract_name_paths:
707+
# Row-level extraction: the filter evaluates rows in the
708+
# widened top-level coordinate space, so extract after it.
709+
from pypaimon.read.reader.outer_projection_record_reader import \
710+
OuterProjectionRecordReader
711+
reader = OuterProjectionRecordReader(
712+
reader, [f.name for f in self.read_fields],
713+
self.outer_extract_name_paths,
714+
file_io=self.table.file_io,
715+
blob_field_indices=_blob_field_indices(self.read_fields),
716+
vector_field_indices=_vector_field_indices(self.read_fields))
679717
if self.limit is not None:
680718
reader = LimitedRecordReader(reader, self.limit)
681719
else:
682720
reader = concat_reader
721+
if self.outer_extract_name_paths:
722+
from pypaimon.read.reader.nested_leaf_batch_reader import \
723+
NestedLeafBatchReader
724+
reader = NestedLeafBatchReader(
725+
reader, self.outer_extract_name_paths,
726+
self.outer_flat_read_type)
683727
if self.limit is not None:
684728
reader = LimitedRecordBatchReader(reader, self.limit)
685729
return reader

paimon-python/pypaimon/read/table_read.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -603,13 +603,26 @@ def _create_split_read(self, split: Split) -> SplitRead:
603603
limit=self.limit,
604604
)
605605
else:
606+
inner_read_type = self.read_type
607+
outer_extract_name_paths: Optional[List[List[str]]] = None
608+
if self.nested_name_paths and any(
609+
len(p) > 1 for p in self.nested_name_paths):
610+
# Mirror the merge path: read the full top-level columns so
611+
# the per-file field-id normalization applies (a leaf path is
612+
# only valid against the latest schema, not each file's own
613+
# names/types), then extract the requested sub-paths back to
614+
# the user's flat schema.
615+
inner_read_type = self._widen_to_top_level_for_merge()
616+
outer_extract_name_paths = self.nested_name_paths
606617
return RawFileSplitRead(
607618
table=self.table,
608619
predicate=self.predicate,
609-
read_type=self.read_type,
620+
read_type=inner_read_type,
610621
split=split,
611622
row_tracking_enabled=self.table.options.row_tracking_enabled(),
612-
nested_name_paths=self.nested_name_paths,
623+
outer_extract_name_paths=outer_extract_name_paths,
624+
outer_flat_read_type=(
625+
self.read_type if outer_extract_name_paths else None),
613626
limit=self.limit,
614627
)
615628

paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,56 @@ def test_unsupported_subfield_cast_rejected(self):
351351
['mv', 'latest_version'], AtomicType('DATE'))], False)
352352
self.assertIn('cannot be converted', str(cm.exception))
353353

354+
def test_nested_projection_after_rename_subfield(self):
    # A projected leaf that was renamed has to be resolved through its field
    # id in files written before the rename; looking the new name up in the
    # file's physical schema would miss it.
    schema_before = pa.schema([
        ('id', pa.int64()),
        ('mv', pa.struct([('v', pa.int32()), ('s', pa.string())])),
    ])
    table = self._create('nsub_proj_rename', schema_before)
    rows_before = pa.Table.from_pylist(
        [{'id': 1, 'mv': {'v': 10, 's': 'a'}}], schema=schema_before)
    self._write(table, rows_before)

    # Rename mv.s -> mv.ss, then reload the table to pick up the new schema.
    rename_leaf = SchemaChange.rename_column(['mv', 's'], 'ss')
    self.catalog.alter_table('default.nsub_proj_rename', [rename_leaf], False)
    table = self.catalog.get_table('default.nsub_proj_rename')

    schema_after = pa.schema([
        ('id', pa.int64()),
        ('mv', pa.struct([('v', pa.int32()), ('ss', pa.string())])),
    ])
    rows_after = pa.Table.from_pylist(
        [{'id': 2, 'mv': {'v': 20, 'ss': 'b'}}], schema=schema_after)
    self._write(table, rows_after)

    # Both the pre-rename and post-rename files must yield the leaf value.
    expected = [
        {'id': 1, 'mv_ss': 'a'},
        {'id': 2, 'mv_ss': 'b'},
    ]
    actual = self._read_sorted(table, projection=['id', 'mv.ss'])
    self.assertEqual(actual, expected)
376+
377+
def test_nested_projection_after_update_subfield_type(self):
    # A projected leaf whose type was changed has to come back in the latest
    # type for every file; old batches must be cast rather than emitted with
    # their original type next to the new ones.
    schema_before = pa.schema([
        ('id', pa.int64()),
        ('mv', pa.struct([('v', pa.int32()), ('s', pa.string())])),
    ])
    table = self._create('nsub_proj_type', schema_before)
    rows_before = pa.Table.from_pylist(
        [{'id': 1, 'mv': {'v': 10, 's': 'a'}}], schema=schema_before)
    self._write(table, rows_before)

    # Change the type of mv.v to BIGINT, then reload the table.
    widen_leaf = SchemaChange.update_column_type(['mv', 'v'], AtomicType('BIGINT'))
    self.catalog.alter_table('default.nsub_proj_type', [widen_leaf], False)
    table = self.catalog.get_table('default.nsub_proj_type')

    schema_after = pa.schema([
        ('id', pa.int64()),
        ('mv', pa.struct([('v', pa.int64()), ('s', pa.string())])),
    ])
    rows_after = pa.Table.from_pylist(
        [{'id': 2, 'mv': {'v': 20, 's': 'b'}}], schema=schema_after)
    self._write(table, rows_after)

    # Read through the builder directly so the Arrow schema can be checked
    # in addition to the row values.
    read_builder = table.new_read_builder().with_projection(['id', 'mv.v'])
    planned_splits = read_builder.new_scan().plan().splits()
    result = read_builder.new_read().to_arrow(planned_splits)
    self.assertEqual(result.schema.field('mv_v').type, pa.int64())

    expected = [
        {'id': 1, 'mv_v': 10},
        {'id': 2, 'mv_v': 20},
    ]
    actual = sorted(result.to_pylist(), key=lambda row: row['id'])
    self.assertEqual(actual, expected)
403+
354404
def test_pk_nested_subfield_evolution_merge(self):
355405
s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA)])
356406
table = self._create('nsub_pk', s0, primary_keys=['id'], bucket='1')

0 commit comments

Comments
 (0)