|
18 | 18 | from typing import List, Optional |
19 | 19 |
|
20 | 20 | import pyarrow as pa |
| 21 | +import pyarrow.compute as pc |
21 | 22 | from pyarrow import RecordBatch |
22 | 23 |
|
23 | 24 | from pypaimon.common.file_io import FileIO |
24 | 25 | from pypaimon.read.partition_info import PartitionInfo |
25 | 26 | from pypaimon.read.reader.format_blob_reader import FormatBlobReader |
26 | 27 | from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader |
27 | | -from pypaimon.schema.data_types import DataField, PyarrowFieldParser |
| 28 | +from pypaimon.schema.data_types import (ArrayType, DataField, MapType, |
| 29 | + PyarrowFieldParser, RowType) |
28 | 30 | from pypaimon.table.special_fields import SpecialFields |
29 | 31 |
|
30 | 32 |
|
@@ -57,55 +59,99 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p |
57 | 59 | self.file_io = file_io |
58 | 60 | # Per-file field-id normalization: map the physically-read columns |
59 | 61 | # (the file's own field order/names) onto the latest read target by |
60 | | - # field id, padding missing ids with NULL. ``None`` when there is no |
61 | | - # evolution to reconcile (identity) -- the common path stays zero-copy. |
62 | | - self._normalize_positions, self._normalize_names = \ |
63 | | - self._build_normalize_plan(file_data_fields, target_data_fields) |
| 62 | + # field id, padding missing ids with NULL and recursing into nested |
| 63 | + # ROW / ARRAY<ROW> / MAP<.,ROW> sub-fields the same way. ``None`` when |
| 64 | + # there is no evolution to reconcile -- the common path stays zero-copy. |
| 65 | + self._normalize_plan = self._build_normalize_plan(file_data_fields, target_data_fields) |
64 | 66 |
|
65 | 67 | @staticmethod |
66 | 68 | def _build_normalize_plan(file_data_fields, target_data_fields): |
67 | 69 | """Build a per-file field-id alignment plan. |
68 | 70 |
|
69 | | - Returns ``(positions, names)`` where ``positions[i]`` is the column |
70 | | - index in the physically-read batch carrying ``target_data_fields[i]`` |
71 | | - (matched by field id), or -1 if the file does not contain that id (pad |
72 | | - NULL). ``names[i]`` is the latest target name. Returns ``(None, None)`` |
73 | | - when the plan is the identity (no evolution), so the caller skips |
74 | | - normalization and stays zero-copy. |
| 71 | + Returns a list of ``(pos, file_field, target_field)`` -- one per target |
| 72 | + field, in target order -- where ``pos`` is the column index in the |
| 73 | + physically-read batch carrying ``target_field`` (matched by field id), |
| 74 | + or -1 if the file does not contain that id (pad NULL). Returns ``None`` |
| 75 | + when the file already matches the target exactly (no evolution), so the |
| 76 | + caller stays zero-copy. |
75 | 77 | """ |
76 | 78 | if file_data_fields is None or target_data_fields is None: |
77 | | - return None, None |
| 79 | + return None |
| 80 | + # Recursive equality covers nested sub-field changes too: any rename / |
| 81 | + # add / drop / type change at any depth makes the file != target. |
| 82 | + if file_data_fields == target_data_fields: |
| 83 | + return None |
78 | 84 | file_id_to_pos = {f.id: i for i, f in enumerate(file_data_fields)} |
79 | | - positions = [] |
80 | | - names = [] |
81 | | - # Identity only when every target maps to the same physical position |
82 | | - # AND already carries the same name -- a rename keeps the position but |
83 | | - # changes the name, which still requires a relabel pass. |
84 | | - identity = len(file_data_fields) == len(target_data_fields) |
85 | | - for i, target in enumerate(target_data_fields): |
| 85 | + plan = [] |
| 86 | + for target in target_data_fields: |
86 | 87 | pos = file_id_to_pos.get(target.id, -1) |
87 | | - positions.append(pos) |
88 | | - names.append(target.name) |
89 | | - if pos != i or (pos >= 0 and file_data_fields[pos].name != target.name): |
90 | | - identity = False |
91 | | - if identity: |
92 | | - return None, None |
93 | | - return positions, names |
| 88 | + file_field = file_data_fields[pos] if pos >= 0 else None |
| 89 | + plan.append((pos, file_field, target)) |
| 90 | + return plan |
94 | 91 |
|
95 | 92 | def _normalize_batch(self, record_batch: RecordBatch) -> RecordBatch: |
96 | 93 | """Reorder/pad the physically-read batch onto the latest read target by |
97 | | - field id, and relabel columns to the latest names. Missing ids become |
98 | | - all-NULL columns; types are reconciled later by _align_batch_to_read_schema.""" |
99 | | - if self._normalize_positions is None: |
| 94 | + field id, relabel columns to the latest names, and align nested ROW |
| 95 | + sub-fields by id. Missing ids become typed all-NULL columns.""" |
| 96 | + if self._normalize_plan is None: |
100 | 97 | return record_batch |
101 | 98 | num_rows = record_batch.num_rows |
102 | 99 | arrays = [] |
103 | | - for pos in self._normalize_positions: |
| 100 | + names = [] |
| 101 | + for pos, file_field, target_field in self._normalize_plan: |
| 102 | + target_pa_type = PyarrowFieldParser.from_paimon_type(target_field.type) |
104 | 103 | if pos < 0: |
105 | | - arrays.append(pa.nulls(num_rows)) |
| 104 | + arrays.append(pa.nulls(num_rows, type=target_pa_type)) |
106 | 105 | else: |
107 | | - arrays.append(record_batch.column(pos)) |
108 | | - return pa.RecordBatch.from_arrays(arrays, names=self._normalize_names) |
| 106 | + arrays.append(self._align_array_by_id( |
| 107 | + record_batch.column(pos), file_field.type, target_field.type)) |
| 108 | + names.append(target_field.name) |
| 109 | + return pa.RecordBatch.from_arrays(arrays, names=names) |
| 110 | + |
| 111 | + def _align_array_by_id(self, array, file_type, target_type): |
| 112 | + """Return *array* converted to *target_type*, matching ROW sub-fields by |
| 113 | + field id (reorder, pad missing with NULL, follow renames, cast changed |
| 114 | + types) recursively, transparently through ARRAY/MAP wrappers.""" |
| 115 | + if isinstance(target_type, RowType) and isinstance(file_type, RowType): |
| 116 | + n = len(array) |
| 117 | + file_id_to_pos = {f.id: i for i, f in enumerate(file_type.fields)} |
| 118 | + children = [] |
| 119 | + pa_fields = [] |
| 120 | + for tsub in target_type.fields: |
| 121 | + p = file_id_to_pos.get(tsub.id, -1) |
| 122 | + if p < 0: |
| 123 | + child = pa.nulls(n, type=PyarrowFieldParser.from_paimon_type(tsub.type)) |
| 124 | + else: |
| 125 | + child = self._align_array_by_id( |
| 126 | + array.field(p), file_type.fields[p].type, tsub.type) |
| 127 | + children.append(child) |
| 128 | + pa_fields.append(pa.field(tsub.name, child.type, nullable=tsub.type.nullable)) |
| 129 | + # Preserve the struct's own null mask; child values under a null |
| 130 | + # struct are irrelevant. |
| 131 | + return pa.StructArray.from_arrays( |
| 132 | + children, fields=pa_fields, mask=pc.is_null(array)) |
| 133 | + if isinstance(target_type, ArrayType) and isinstance(file_type, ArrayType): |
| 134 | + aligned_values = self._align_array_by_id( |
| 135 | + array.values, file_type.element, target_type.element) |
| 136 | + return pa.ListArray.from_arrays( |
| 137 | + array.offsets, aligned_values, mask=pc.is_null(array)) |
| 138 | + if isinstance(target_type, MapType) and isinstance(file_type, MapType): |
| 139 | + aligned_items = self._align_array_by_id( |
| 140 | + array.items, file_type.value, target_type.value) |
| 141 | + # MapArray.from_arrays cannot carry a null mask (a null map would |
| 142 | + # collapse to an empty one), so rebuild from buffers, reusing the |
| 143 | + # original validity/offset buffers and only swapping the value child. |
| 144 | + target_pa = PyarrowFieldParser.from_paimon_type(target_type) |
| 145 | + entries = pa.StructArray.from_arrays( |
| 146 | + [array.keys, aligned_items], |
| 147 | + fields=[target_pa.key_field, target_pa.item_field]) |
| 148 | + return pa.Array.from_buffers( |
| 149 | + target_pa, len(array), array.buffers()[:2], children=[entries]) |
| 150 | + # Leaf / non-nested: cast to the target type when it differs. |
| 151 | + target_pa_type = PyarrowFieldParser.from_paimon_type(target_type) |
| 152 | + if array.type != target_pa_type: |
| 153 | + return array.cast(target_pa_type, safe=False) |
| 154 | + return array |
109 | 155 |
|
110 | 156 | def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]: |
111 | 157 | if isinstance(self.format_reader, FormatBlobReader): |
|
0 commit comments