Skip to content

Commit 6ac210b

Browse files
engleflyclaudeCopilot
authored
[improvement](fe) TopN lazy materialization supports struct/variant nested column pruning (#63736)
Extend TopN lazy materialization to defer reading complex-type base columns (struct, variant, map, array) until after TopN filtering, and expand the scope to all non-trivial projection expressions.

## Core Changes

**PullUpProjectExprUnderTopN** (new CustomRewriter):
- Two-pass design: the Collector walks the plan tree top-down to find qualifying TopNs, then walks into their descendants (through Join/Filter) to find Projects with pull-able expressions. The Replacer simplifies the found Projects bottom-up and adds upper Projects to restore the pulled-up expressions.
- Eligible expressions: Alias with a non-trivial child (not Slot/Literal), not referenced by TopN order keys, no NoneMovableFunction.
- Excludes: CTE Producers (output mapping safety), Join/Filter conditions that reference pulled-up outputs (cleared/removed).

**LazyMaterializeTopN** (simplified):
- Expression pull-up moved from the physical PlanPostProcessor to the logical CustomRewriter, eliminating the hard-coded `MERGE_SORT→Distribute→LOCAL_SORT→Project` shape walking. It now only handles MaterializeNode insertion.

**OperativeColumnDerive**:
- Skip PreferPushDownProject input slots from operative propagation so complex-type base columns can be lazy.

**Other**:
- `PhysicalLazyMaterialize`: propagate access paths to lazy output slots for nested column/subPath pruning on BE.
- `MaterializationNode`/`PlanNode`: fix nested column display in EXPLAIN.
- `NoneMovableFunction`: fix missing interface name.
- Session variable `enable_topn_expr_pullup` for rollback.

**Tests**:
- `topn_expr_pullup`: 15 test cases covering struct/variant/map/array, non-PPD expressions, joins, column order preservation, negative cases.
- `topn_lazy_nested_column_pruning`: 17 test cases for struct/variant nested pruning + map/array lazy materialization + multi-level variant nesting.
- Updated 48 shape_check .out files to reflect new plan shapes.

### What problem does this PR solve?
Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent c27fd0b commit 6ac210b

90 files changed

Lines changed: 4770 additions & 1796 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@
125125
import org.apache.doris.nereids.rules.rewrite.PullUpCteAnchor;
126126
import org.apache.doris.nereids.rules.rewrite.PullUpJoinFromUnionAll;
127127
import org.apache.doris.nereids.rules.rewrite.PullUpProjectBetweenTopNAndAgg;
128+
import org.apache.doris.nereids.rules.rewrite.PullUpProjectExprUnderTopN;
128129
import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderApply;
129130
import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderLimit;
130131
import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderTopN;
@@ -718,7 +719,9 @@ public class Rewriter extends AbstractBatchJobExecutor {
718719
topDown(
719720
new PullUpProjectUnderTopN(),
720721
new PullUpProjectUnderLimit()
721-
)
722+
),
723+
custom(RuleType.PULL_UP_PROJECT_EXPR_UNDER_TOPN,
724+
PullUpProjectExprUnderTopN::new)
722725
),
723726
// TODO: these rules should be implementation rules, and generate alternative physical plans.
724727
topic("Table/Physical optimization",

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.doris.nereids.trees.expressions.Expression;
2424
import org.apache.doris.nereids.trees.plans.ObjectId;
2525
import org.apache.doris.nereids.trees.plans.algebra.TopN;
26+
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeOlapScan;
2627
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
2728
import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
2829
import org.apache.doris.planner.ScanNode;
@@ -71,13 +72,22 @@ public List<TopnFilter> getTopnFilters() {
7172
public void translateTarget(PhysicalRelation relation, ScanNode legacyScan,
7273
PlanTranslatorContext translatorContext) {
7374
for (TopnFilter filter : filters.values()) {
74-
if (filter.hasTargetRelation(relation)) {
75-
Expr expr = ExpressionTranslator.translate(filter.targets.get(relation), translatorContext);
76-
filter.legacyTargets.put(legacyScan, expr);
75+
translateTarget(filter, relation, legacyScan, translatorContext);
76+
if (relation instanceof PhysicalLazyMaterializeOlapScan) {
77+
translateTarget(filter, ((PhysicalLazyMaterializeOlapScan) relation).getScan(),
78+
legacyScan, translatorContext);
7779
}
7880
}
7981
}
8082

83+
private void translateTarget(TopnFilter filter, PhysicalRelation relation, ScanNode legacyScan,
84+
PlanTranslatorContext translatorContext) {
85+
if (filter.hasTargetRelation(relation)) {
86+
Expr expr = ExpressionTranslator.translate(filter.targets.get(relation), translatorContext);
87+
filter.legacyTargets.put(legacyScan, expr);
88+
}
89+
}
90+
8191
/**
8292
* translate topn-filter
8393
*/

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazyMaterializeTopN.java

Lines changed: 122 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919

2020
import org.apache.doris.catalog.AggregateType;
2121
import org.apache.doris.catalog.Column;
22+
import org.apache.doris.catalog.KeysType;
23+
import org.apache.doris.catalog.OlapTable;
2224
import org.apache.doris.catalog.Type;
2325
import org.apache.doris.nereids.CascadesContext;
2426
import org.apache.doris.nereids.StatementContext;
2527
import org.apache.doris.nereids.processor.post.PlanPostProcessor;
2628
import org.apache.doris.nereids.processor.post.Validator;
29+
import org.apache.doris.nereids.trees.expressions.Alias;
30+
import org.apache.doris.nereids.trees.expressions.NamedExpression;
2731
import org.apache.doris.nereids.trees.expressions.Slot;
2832
import org.apache.doris.nereids.trees.expressions.SlotReference;
2933
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
@@ -53,25 +57,15 @@
5357
import java.util.Set;
5458

5559
/**
56-
* post rule to do lazy materialize
60+
* Post rule to insert MaterializeNode for TopN lazy materialization.
61+
* Expression pull-up is handled by PullUpProjectExprUnderTopN in the logical phase.
5762
*/
5863
public class LazyMaterializeTopN extends PlanPostProcessor {
59-
/* BE do not support pattern:
60-
union
61-
-->materialize
62-
-->topn
63-
-->scan1
64-
-->materialize
65-
-->topn
66-
-->scan2
67-
when we create materializeNode for the first union child, set hasMaterialized=true
68-
to avoid generating materializeNode for other union's children
69-
*/
7064
private static final Logger LOG = LogManager.getLogger(LazyMaterializeTopN.class);
7165
private boolean hasMaterialized = false;
7266

7367
@Override
74-
public Plan visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) {
68+
public Plan visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
7569
try {
7670
Plan result = computeTopN(topN, ctx);
7771
if (SessionVariable.isFeDebug()) {
@@ -85,36 +79,73 @@ public Plan visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) {
8579
}
8680
}
8781

88-
private Plan computeTopN(PhysicalTopN topN, CascadesContext ctx) {
82+
private Plan computeTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
8983
if (hasMaterialized) {
9084
return topN;
9185
}
9286
if (SessionVariable.getTopNLazyMaterializationThreshold() < topN.getLimit()) {
9387
return topN;
9488
}
95-
/*
96-
topn(output=[x] orderkey=[b])
97-
->project(a as x)
98-
->T(a, b)
99-
'x' can be lazy materialized.
100-
materializeMap: x->(T, a)
101-
*/
89+
try {
90+
List<Slot> userVisibleOutput = ImmutableList.copyOf(topN.getOutput());
91+
List<Slot> effectiveOutput = ImmutableList.copyOf(topN.getOutput());
92+
Plan result = doComputeTopN(topN, ctx, effectiveOutput);
93+
if (result == topN) {
94+
return topN;
95+
}
96+
result = new PhysicalProject(ImmutableList.copyOf(userVisibleOutput), null, result);
97+
return result;
98+
} catch (RuntimeException e) {
99+
LOG.warn("lazy materialize topn failed for plan: {}", topN.shapeInfo(), e);
100+
return topN;
101+
}
102+
}
103+
104+
private Plan doComputeTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx, List<Slot> effectiveOutput) {
102105
Map<Slot, MaterializeSource> materializeMap = new HashMap<>();
103106
List<Slot> materializedSlots = new ArrayList<>();
104-
// find the slots which can be lazy materialized
105-
for (Slot slot : topN.getOutput()) {
106-
Optional<MaterializeSource> source = computeMaterializeSource(topN, (SlotReference) slot);
107+
Set<Slot> requiredMaterializedSlots = new HashSet<>();
108+
collectProjectExprInputSlots(topN.child(), requiredMaterializedSlots);
109+
110+
/*
111+
* requiredMaterializedSlots only records slots consumed by Project/final-projection expressions inside the
112+
* TopN subtree. Other mandatory slots, such as TopN order keys or Filter predicates, are rejected by
113+
* MaterializeProbeVisitor while tracing each output slot from TopN down to the source relation:
114+
*
115+
* Project(b) -> TopN(order by id) -> Filter(a > 0) -> Scan(id, a, b, c)
116+
*
117+
* For id, the probe stops at TopN because id is in TopN.getInputSlots(); for a, it stops at Filter because
118+
* a is in Filter.getInputSlots(). Both return Optional.empty() and are appended to materializedSlots below.
119+
* Therefore an empty requiredMaterializedSlots set does not mean every scan column can be delayed; it only
120+
* means no extra Project/final-projection input must be forced materialized by this local safety check.
121+
*/
122+
for (Slot slot : effectiveOutput) {
123+
Optional<MaterializeSource> source = computeMaterializeSource(topN, (SlotReference) slot,
124+
requiredMaterializedSlots);
107125
if (source.isPresent()) {
108126
SlotReference baseSlot = source.get().baseSlot;
109-
if (source.get().baseSlot.hasSubColPath()) {
127+
if (source.get().baseSlot.hasSubColPath()
128+
|| source.get().baseSlot.getAllAccessPaths().isPresent()) {
110129
slot = baseSlot.withExprId(slot.getExprId());
111130
}
112131
materializeMap.put(slot, source.get());
113132
} else {
114133
materializedSlots.add(slot);
115134
}
116135
}
117-
// find out the slots which are worth doing lazy materialization
136+
List<Slot> requiredOutputSlots = new ArrayList<>();
137+
for (Map.Entry<Slot, MaterializeSource> entry : materializeMap.entrySet()) {
138+
if (requiredMaterializedSlots.contains(entry.getKey())
139+
|| requiredMaterializedSlots.contains(entry.getValue().baseSlot)) {
140+
requiredOutputSlots.add(entry.getKey());
141+
}
142+
}
143+
for (Slot slot : requiredOutputSlots) {
144+
if (materializeMap.remove(slot) != null) {
145+
materializedSlots.add(slot);
146+
}
147+
}
148+
118149
List<Slot> lazyMaterializeSlots = filterSlotsForLazyMaterialization(materializeMap);
119150
if (lazyMaterializeSlots.isEmpty()) {
120151
return topN;
@@ -127,12 +158,8 @@ private Plan computeTopN(PhysicalTopN topN, CascadesContext ctx) {
127158
}
128159

129160
Plan result = topN;
130-
List<Slot> originOutput = topN.getOutput();
131161
BiMap<Relation, SlotReference> relationToRowId = HashBiMap.create(relationToLazySlotMap.size());
132162
HashSet<SlotReference> rowIdSet = new HashSet<>();
133-
// we should use threadStatementContext, not ctx.getStatementContext(), because the StatisticsCleaner
134-
// will generate two statementContext, and reuse the plan which generated by outer StatementContext,
135-
// so we should generate exprId by the outer StatementContext, or else generate conflict expr id
136163
StatementContext threadStatementContext = StatementScopeIdGenerator.getStatementContext();
137164
for (Relation relation : relationToLazySlotMap.keySet()) {
138165
if (relation instanceof CatalogRelation) {
@@ -142,9 +169,9 @@ private Plan computeTopN(PhysicalTopN topN, CascadesContext ctx) {
142169
catalogRelation.getTable().getName() + ".global_row_id", false, Integer.MAX_VALUE);
143170
SlotReference rowIdSlot = SlotReference.fromColumn(threadStatementContext.getNextExprId(),
144171
catalogRelation.getTable(), rowIdCol, catalogRelation.getQualifier());
145-
result = result.accept(new LazySlotPruning(),
146-
new LazySlotPruning.Context((PhysicalCatalogRelation) relation,
147-
rowIdSlot, relationToLazySlotMap.get(relation)));
172+
result = result.accept(new LazySlotPruning(), new LazySlotPruning.Context(
173+
(PhysicalCatalogRelation) relation,
174+
rowIdSlot, relationToLazySlotMap.get(relation)));
148175
relationToRowId.put(catalogRelation, rowIdSlot);
149176
rowIdSet.add(rowIdSlot);
150177
} else if (relation instanceof PhysicalTVFRelation) {
@@ -154,47 +181,33 @@ private Plan computeTopN(PhysicalTopN topN, CascadesContext ctx) {
154181
tvfRelation.getFunction().getName() + ".global_row_id", false, Integer.MAX_VALUE);
155182
SlotReference rowIdSlot = SlotReference.fromColumn(threadStatementContext.getNextExprId(),
156183
tvfRelation.getFunction().getTable(), rowIdCol, ImmutableList.of());
157-
result = result.accept(new LazySlotPruning(),
158-
new LazySlotPruning.Context((PhysicalTVFRelation) relation,
159-
rowIdSlot, relationToLazySlotMap.get(relation)));
184+
result = result.accept(new LazySlotPruning(), new LazySlotPruning.Context(
185+
(PhysicalTVFRelation) relation,
186+
rowIdSlot, relationToLazySlotMap.get(relation)));
160187
relationToRowId.put(tvfRelation, rowIdSlot);
161188
rowIdSet.add(rowIdSlot);
162189
} else {
163-
// should not reach here.
164190
throw new RuntimeException("LazyMaterializeTopN not support this relation." + relation);
165191
}
166192
}
167193

168-
// materialize.child.output requires
169-
// rowId only appears once.
170-
// that is [a, rowId1, b rowId1] is not acceptable
171194
List<SlotReference> materializeInput = moveRowIdsToTail(result.getOutput(), rowIdSet);
172195

173196
if (materializeInput == null) {
174-
/*
175-
topn
176-
-->any
177-
=>
178-
project
179-
-->materialize
180-
-->topn
181-
-->any
182-
*/
197+
// Row IDs are already at the tail in the correct order.
198+
// Keep materialized slots in the same order as the child tuple layout.
199+
List<Slot> reOrderedMaterializedSlots = new ArrayList<>();
200+
for (Slot slot : result.getOutput()) {
201+
if (rowIdSet.contains(slot)) {
202+
break;
203+
}
204+
reOrderedMaterializedSlots.add(slot);
205+
}
183206
result = new PhysicalLazyMaterialize(result, result.getOutput(),
184-
materializedSlots, relationToLazySlotMap, relationToRowId, materializeMap,
207+
reOrderedMaterializedSlots, relationToLazySlotMap, relationToRowId, materializeMap,
185208
null, ((AbstractPlan) result).getStats());
186209
hasMaterialized = true;
187210
} else {
188-
/*
189-
topn
190-
-->any
191-
=>
192-
project
193-
-->materialize
194-
-->project
195-
-->topn
196-
-->any
197-
*/
198211
List<Slot> reOrderedMaterializedSlots = new ArrayList<>();
199212
for (Slot slot : materializeInput) {
200213
if (rowIdSet.contains(slot)) {
@@ -208,14 +221,54 @@ private Plan computeTopN(PhysicalTopN topN, CascadesContext ctx) {
208221
null, ((AbstractPlan) result).getStats());
209222
hasMaterialized = true;
210223
}
211-
result = new PhysicalProject(originOutput, null, result);
212224
return result;
213225
}
214226

215-
/*
216-
[a, r1, r2, b, r2] => [a, b, r1, r2]
217-
move all rowIds to tail, and remove duplicated rowIds
218-
*/
227+
private void collectProjectExprInputSlots(Plan plan, Set<Slot> requiredMaterializedSlots) {
228+
if (plan instanceof PhysicalProject) {
229+
PhysicalProject<?> project = (PhysicalProject<?>) plan;
230+
for (NamedExpression projectExpr : project.getProjects()) {
231+
if (projectExpr instanceof SlotReference) {
232+
continue;
233+
}
234+
if (projectExpr instanceof Alias && ((Alias) projectExpr).child() instanceof SlotReference) {
235+
SlotReference childSlot = (SlotReference) ((Alias) projectExpr).child();
236+
if (!childSlot.getOriginalColumn().isPresent()) {
237+
requiredMaterializedSlots.addAll(project.getInputSlots());
238+
}
239+
continue;
240+
}
241+
requiredMaterializedSlots.addAll(projectExpr.getInputSlots());
242+
}
243+
} else if (plan instanceof PhysicalCatalogRelation) {
244+
PhysicalCatalogRelation relation = (PhysicalCatalogRelation) plan;
245+
if (relation.getTable() instanceof OlapTable) {
246+
OlapTable table = (OlapTable) relation.getTable();
247+
if (KeysType.UNIQUE_KEYS.equals(table.getKeysType())
248+
&& !table.getTableProperty().getEnableUniqueKeyMergeOnWrite()
249+
|| KeysType.AGG_KEYS.equals(table.getKeysType())
250+
|| KeysType.PRIMARY_KEYS.equals(table.getKeysType())) {
251+
for (Slot slot : relation.getOutput()) {
252+
SlotReference slotReference = (SlotReference) slot;
253+
if (slotReference.getOriginalColumn().isPresent()
254+
&& slotReference.getOriginalColumn().get().isKey()) {
255+
requiredMaterializedSlots.add(slotReference);
256+
}
257+
}
258+
}
259+
}
260+
for (Slot slot : plan.getOutput()) {
261+
if (slot instanceof SlotReference && !((SlotReference) slot).getOriginalColumn().isPresent()) {
262+
requiredMaterializedSlots.addAll(plan.getOutputSet());
263+
break;
264+
}
265+
}
266+
}
267+
for (Plan child : plan.children()) {
268+
collectProjectExprInputSlots(child, requiredMaterializedSlots);
269+
}
270+
}
271+
219272
private List<SlotReference> moveRowIdsToTail(List<Slot> slots, Set<SlotReference> rowIds) {
220273
List<SlotReference> reArrangedSlots = new ArrayList<>();
221274
List<SlotReference> reArrangedRowIds = new ArrayList<>();
@@ -245,10 +298,11 @@ private List<Slot> filterSlotsForLazyMaterialization(Map<Slot, MaterializeSource
245298
return new ArrayList<>(materializeMap.keySet());
246299
}
247300

248-
private Optional<MaterializeSource> computeMaterializeSource(PhysicalTopN topN, SlotReference slot) {
301+
private Optional<MaterializeSource> computeMaterializeSource(PhysicalTopN<? extends Plan> topN, SlotReference slot,
302+
Set<Slot> requiredMaterializedSlots) {
249303
MaterializeProbeVisitor probe = new MaterializeProbeVisitor();
250-
MaterializeProbeVisitor.ProbeContext context = new MaterializeProbeVisitor.ProbeContext(slot);
304+
MaterializeProbeVisitor.ProbeContext context = new MaterializeProbeVisitor.ProbeContext(slot,
305+
requiredMaterializedSlots);
251306
return probe.visit(topN, context);
252307
}
253-
254308
}

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,23 @@ public void updateRowIdSlot(SlotReference rowIdSlot) {
8080
}
8181
}
8282

83-
@Override
83+
/**
84+
* Whether the given child should be pruned. Default checks if the child's
85+
* output contains all lazy slots. Override to bypass when logical properties
86+
* are stale after plan restructuring.
87+
*/
88+
protected boolean shouldPruneChild(Plan child, Context context) {
89+
return child.getOutput().containsAll(context.lazySlots);
90+
}
91+
92+
/**
93+
* Visit the children of {@code plan}, descending only into those accepted by {@link #shouldPruneChild}.
94+
*/
8495
public Plan visit(Plan plan, Context context) {
8596
ImmutableList.Builder<Plan> newChildren = ImmutableList.builderWithExpectedSize(plan.arity());
8697
boolean hasNewChildren = false;
8798
for (Plan child : plan.children()) {
88-
if (child.getOutput().containsAll(context.lazySlots)) {
99+
if (shouldPruneChild(child, context)) {
89100
Plan newChild = child.accept(this, context);
90101
if (newChild != child) {
91102
hasNewChildren = true;

0 commit comments

Comments
 (0)