Skip to content

Commit a3bb6a6

Browse files
committed
[python] Re-apply dropped nested-leaf predicate after projection extraction
When a read projects a nested struct sub-field (e.g. mv.latest_version), the read widens the projection to the full top-level column so per-file field-id normalization applies, then extracts the leaf afterwards. The leaf path is absent from the widened read fields, so SplitRead.__init__ dropped any predicate referencing it (predicate_for_reader=None) and the filter was silently lost -- every row was returned. Re-evaluate the dropped predicate after extraction, where the flat columns match the predicate fields: RawFileSplitRead (append-only / PK raw-convertible) wraps the extracted batches with FilterRecordBatchReader; MergeFileSplitRead (PK non raw-convertible) filters the extracted rows with FilterRecordReader, rewriting indices into the flat output. The predicate is trimmed to the projected columns first, so a filter on a non-projected column keeps the existing drop semantics instead of referencing a missing column.
1 parent b3c8c25 commit a3bb6a6

3 files changed

Lines changed: 99 additions & 0 deletions

File tree

paimon-python/pypaimon/read/split_read.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,22 @@ def create_reader(self) -> RecordReader:
724724
reader = NestedLeafBatchReader(
725725
reader, self.outer_extract_name_paths,
726726
self.outer_flat_read_type)
727+
# A predicate on a projected nested leaf cannot be pushed down:
728+
# its leaf path is absent from the widened top-level read
729+
# fields, so SplitRead.__init__ dropped it (predicate_for_reader
730+
# is None). Without re-applying it the filter is silently lost
731+
# and every row is returned. Re-evaluate it on the extracted
732+
# flat batches, whose column names match the predicate fields;
733+
# trim to the projected columns so a filter on a non-projected
734+
# column keeps the existing "dropped" semantics rather than
735+
# referencing a missing column.
736+
if self.predicate is not None and self.predicate_for_reader is None:
737+
flat_names = [f.name for f in self.outer_flat_read_type]
738+
trimmed = trim_predicate_by_fields(self.predicate, flat_names)
739+
if trimmed is not None:
740+
from pypaimon.read.reader.filter_record_batch_reader \
741+
import FilterRecordBatchReader
742+
reader = FilterRecordBatchReader(reader, trimmed)
727743
if self.limit is not None:
728744
reader = LimitedRecordBatchReader(reader, self.limit)
729745
return reader
@@ -743,6 +759,7 @@ def __init__(
743759
split: Split,
744760
row_tracking_enabled: bool,
745761
outer_extract_name_paths: Optional[List[List[str]]] = None,
762+
outer_flat_read_type: Optional[List[DataField]] = None,
746763
limit: Optional[int] = None):
747764
# Merge functions need full ROW sub-structures, so nested paths
748765
# are not pushed down here; sub-path extraction happens above
@@ -757,6 +774,7 @@ def __init__(
757774
limit=limit,
758775
)
759776
self.outer_extract_name_paths = outer_extract_name_paths
777+
self.outer_flat_read_type = outer_flat_read_type
760778
# Built once per split-read (value_fields and options are constant
761779
# for the object's life), not per section. ``None`` when
762780
# ``sequence.field`` is unset, in which case the heap falls back to
@@ -855,6 +873,21 @@ def create_reader(self) -> RecordReader:
855873
file_io=self.table.file_io,
856874
blob_field_indices=_blob_field_indices(inner_value_fields),
857875
vector_field_indices=_vector_field_indices(inner_value_fields))
876+
# A predicate on a projected nested leaf is not pushed down (its leaf
877+
# path is absent from the widened-to-full-ROW read fields, so it was
878+
# dropped in __init__). Without re-applying it after extraction the
879+
# filter is silently lost. Evaluate it on the extracted flat rows,
880+
# whose fields are outer_flat_read_type; trim to the projected
881+
# columns and rewrite indices into that flat row.
882+
if (self.predicate is not None and self.predicate_for_reader is None
883+
and self.outer_flat_read_type is not None):
884+
flat_names = [f.name for f in self.outer_flat_read_type]
885+
trimmed = trim_predicate_by_fields(self.predicate, flat_names)
886+
if trimmed is not None:
887+
reader = FilterRecordReader(
888+
reader,
889+
rewrite_predicate_indices(
890+
trimmed, self.outer_flat_read_type))
858891
if self.limit is not None:
859892
reader = LimitedRecordReader(reader, self.limit)
860893
return reader

paimon-python/pypaimon/read/table_read.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,8 @@ def _create_split_read(self, split: Split) -> SplitRead:
585585
split=split,
586586
row_tracking_enabled=False,
587587
outer_extract_name_paths=outer_extract_name_paths,
588+
outer_flat_read_type=(
589+
self.read_type if outer_extract_name_paths else None),
588590
limit=self.limit,
589591
)
590592
elif self.table.options.data_evolution_enabled():

paimon-python/pypaimon/tests/test_nested_projection_e2e.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,23 @@ def test_partitioned_table_with_nested_projection(self):
193193
[{'part': 'A', 'mv_latest_version': 100, 'val': 'x'},
194194
{'part': 'B', 'mv_latest_version': 200, 'val': 'y'}])
195195

196+
def test_filter_on_projected_nested_leaf(self):
197+
"""A predicate on a projected nested leaf must actually filter rows.
198+
The read widens the projection to the top-level struct, which drops
199+
the leaf predicate from push-down (its path is absent from the read
200+
fields); without re-applying it after the leaves are extracted, every
201+
row leaks through."""
202+
table = self._create_table('ao_nested_leaf_filter')
203+
rb = table.new_read_builder().with_projection(['id', 'mv.latest_version'])
204+
pred = rb.new_predicate_builder().greater_than('mv_latest_version', 150)
205+
rb = rb.with_filter(pred)
206+
got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
207+
got = sorted(got, key=lambda r: r['id'])
208+
self.assertEqual(
209+
got,
210+
[{'id': 2, 'mv_latest_version': 200},
211+
{'id': 3, 'mv_latest_version': 300}])
212+
196213
def test_avro_nested_projection_python_fallback(self):
197214
"""Avro has no native nested column pruning; the reader walks
198215
each fastavro record dict by path and assembles the column
@@ -245,11 +262,43 @@ def _create_pk_table(self, name: str, file_format: str = 'parquet'):
245262
w.close()
246263
return table
247264

265+
def _create_pk_raw_table(self, name: str, file_format: str = 'parquet'):
266+
"""Single commit keeps the split raw-convertible, so the read stays on
267+
the RawFileSplitRead fast path rather than the merge reader."""
268+
identifier = 'default.{}'.format(name)
269+
schema = Schema.from_pyarrow_schema(
270+
self.pa_schema,
271+
primary_keys=['id'],
272+
options={'bucket': '1', 'file.format': file_format},
273+
)
274+
self.catalog.create_table(identifier, schema, False)
275+
table = self.catalog.get_table(identifier)
276+
wb = table.new_batch_write_builder()
277+
w = wb.new_write()
278+
w.write_arrow(pa.Table.from_pylist(self.rows, schema=self.pa_schema))
279+
wb.new_commit().commit(w.prepare_commit())
280+
w.close()
281+
return table
282+
248283
def _read_arrow(self, table, projection):
249284
rb = table.new_read_builder().with_projection(projection)
250285
splits = rb.new_scan().plan().splits()
251286
return rb.new_read().to_arrow(splits)
252287

288+
def test_raw_convertible_filter_on_projected_nested_leaf(self):
289+
"""PK raw-convertible split also widens nested projection and so drops
290+
the leaf predicate from push-down. The filter must be re-applied on the
291+
extracted leaves; otherwise all rows are returned (reviewer repro)."""
292+
table = self._create_pk_raw_table('pk_raw_nested_leaf_filter')
293+
rb = table.new_read_builder().with_projection(['id', 'mv.latest_version'])
294+
pred = rb.new_predicate_builder().greater_than('mv_latest_version', 150)
295+
rb = rb.with_filter(pred)
296+
arrow = rb.new_read().to_arrow(rb.new_scan().plan().splits())
297+
rows = sorted(zip(
298+
arrow.column('id').to_pylist(),
299+
arrow.column('mv_latest_version').to_pylist()))
300+
self.assertEqual(rows, [(2, 200), (3, 300)])
301+
253302
def test_extracts_single_nested_leaf(self):
254303
table = self._create_pk_table('pk_nested_single')
255304
arrow = self._read_arrow(table, ['mv.latest_version'])
@@ -301,6 +350,21 @@ def test_dotted_top_level_field_kept(self):
301350
got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
302351
self.assertEqual(got, [{'id': 1, 'media.left': 'hello'}])
303352

353+
def test_merge_filter_on_projected_nested_leaf(self):
354+
"""Non-raw-convertible PK splits go through the merge reader, which
355+
widens the nested projection to the full ROW and so also drops the leaf
356+
predicate from push-down. The filter must be re-applied on the extracted
357+
leaves above the merge; otherwise all rows are returned."""
358+
table = self._create_pk_table('pk_merge_nested_leaf_filter')
359+
rb = table.new_read_builder().with_projection(['id', 'mv.latest_version'])
360+
pred = rb.new_predicate_builder().greater_than('mv_latest_version', 150)
361+
rb = rb.with_filter(pred)
362+
arrow = rb.new_read().to_arrow(rb.new_scan().plan().splits())
363+
rows = sorted(zip(
364+
arrow.column('id').to_pylist(),
365+
arrow.column('mv_latest_version').to_pylist()))
366+
self.assertEqual(rows, [(2, 200), (3, 300)])
367+
304368
def test_avro_extracts_single_nested_leaf(self):
305369
# Avro PK reads resolve DataFields through ``full_fields_map`` which
306370
# historically only covered merge-internal aliases; without the

0 commit comments

Comments
 (0)