Skip to content

Commit e4a18a0

Browse files
authored
[multistage] add lookup join support to physical optimizer (#18158)
* add lookup join support to physical optimizer * add lookup join explain plan tests * add more golden explain plan tests
1 parent 77ecc4c commit e4a18a0

11 files changed

Lines changed: 1001 additions & 38 deletions

File tree

pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.google.common.annotations.VisibleForTesting;
2222
import com.google.common.base.Preconditions;
2323
import java.util.ArrayList;
24-
import java.util.Collections;
2524
import java.util.List;
2625
import java.util.function.Supplier;
2726
import org.apache.calcite.plan.RelTraitSet;
@@ -43,9 +42,7 @@
4342
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
4443
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAsOfJoin;
4544
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalJoin;
46-
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalProject;
4745
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort;
48-
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalTableScan;
4946
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalWindow;
5047

5148

@@ -134,9 +131,11 @@ RelNode assignSort(PhysicalSort sort) {
134131
*/
135132
@VisibleForTesting
136133
RelNode assignJoin(Join join) {
137-
// Case-1: Handle lookup joins.
134+
// Case-1: Lookup joins — no distribution traits needed. LookupJoinRule (post-pass) handles
135+
// fragment isolation by converting the right exchange to LOOKUP_LOCAL_EXCHANGE, ensuring the
136+
// left has an exchange, and wrapping the join with IDENTITY_EXCHANGE above.
138137
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) {
139-
return assignLookupJoin(join);
138+
return join;
140139
}
141140
// Case-2: Handle dynamic filter for semi joins.
142141
JoinInfo joinInfo = join.analyzeCondition();
@@ -257,28 +256,6 @@ RelNode assignWindow(PhysicalWindow window) {
257256
return window.copy(window.getTraitSet(), List.of(input));
258257
}
259258

260-
private RelNode assignLookupJoin(Join join) {
261-
/*
262-
* Lookup join expects right input to have project and table-scan nodes exactly. Moreover, lookup join is used
263-
* with Dimension tables only. Given this, we expect the entire right input to be available in all workers
264-
* selected for the left input. For now, we will assign broadcast trait to the entire right input. Worker
265-
* assignment will have to handle this explicitly regardless.
266-
*/
267-
RelNode leftInput = join.getInputs().get(0);
268-
RelNode rightInput = join.getInputs().get(1);
269-
Preconditions.checkState(rightInput instanceof PhysicalProject, "Expected project as right input of table scan");
270-
Preconditions.checkState(rightInput.getInput(0) instanceof PhysicalTableScan,
271-
"Expected table scan under project for right input of lookup join");
272-
PhysicalProject oldProject = (PhysicalProject) rightInput;
273-
PhysicalTableScan oldTableScan = (PhysicalTableScan) oldProject.getInput(0);
274-
PhysicalTableScan newTableScan =
275-
(PhysicalTableScan) oldTableScan.copy(oldTableScan.getTraitSet().plus(
276-
RelDistributions.BROADCAST_DISTRIBUTED), Collections.emptyList());
277-
PhysicalProject newProject =
278-
(PhysicalProject) oldProject.copy(oldProject.getTraitSet().plus(RelDistributions.BROADCAST_DISTRIBUTED),
279-
List.of(newTableScan));
280-
return join.copy(join.getTraitSet(), List.of(leftInput, newProject));
281-
}
282259

283260
@SuppressWarnings("unused")
284261
private RelNode assignDynamicFilterSemiJoin(PhysicalJoin join) {

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,13 @@ public enum ExchangeStrategy {
6868
/**
6969
* Records are sent randomly from a given worker in the sender to some worker in the receiver.
7070
*/
71-
RANDOM_EXCHANGE(false);
71+
RANDOM_EXCHANGE(false),
72+
/**
73+
* Pseudo-exchange for lookup join right side. The dim table stays in the same plan fragment as
74+
* the join (no fragment split). Inserted by {@code LookupJoinRule} after worker/exchange assignment;
75+
* handled transparently by {@code PlanFragmentAndMailboxAssignment.processLookupLocalExchange}.
76+
*/
77+
LOOKUP_LOCAL_EXCHANGE(false);
7278

7379
/**
7480
* This is true when the Exchange Strategy is such that it requires a List<Integer> representing the
@@ -110,6 +116,12 @@ public static RelDistribution getRelDistribution(ExchangeStrategy exchangeStrate
110116
return RelDistributions.ROUND_ROBIN_DISTRIBUTED;
111117
case RANDOM_EXCHANGE:
112118
return RelDistributions.RANDOM_DISTRIBUTED;
119+
case LOOKUP_LOCAL_EXCHANGE:
120+
// LOOKUP_LOCAL is a pseudo-exchange (no fragment split, no mailbox). This mapping is only used
121+
// by the PhysicalExchange constructor to satisfy the Calcite Exchange superclass — it has no
122+
// runtime significance since PlanFragmentAndMailboxAssignment handles LOOKUP_LOCAL transparently.
123+
// Uses RANDOM_DISTRIBUTED as placeholder (Calcite's Exchange constructor rejects ANY).
124+
return RelDistributions.RANDOM_DISTRIBUTED;
113125
default:
114126
throw new IllegalStateException(String.format("Unexpected exchange strategy: %s", exchangeStrategy));
115127
}

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.pinot.query.routing.MailboxInfos;
4545
import org.apache.pinot.query.routing.QueryServerInstance;
4646
import org.apache.pinot.query.routing.SharedMailboxInfos;
47+
import org.apache.pinot.spi.config.table.TableType;
4748

4849

4950
/**
@@ -87,11 +88,15 @@ private void process(PRelNode pRelNode, @Nullable PlanNode parent, int currentFr
8788
processTableScan((PhysicalTableScan) pRelNode.unwrap(), currentFragmentId, context);
8889
}
8990
if (pRelNode.unwrap() instanceof PhysicalExchange) {
91+
PhysicalExchange physicalExchange = (PhysicalExchange) pRelNode.unwrap();
92+
if (physicalExchange.getExchangeStrategy() == ExchangeStrategy.LOOKUP_LOCAL_EXCHANGE) {
93+
processLookupLocalExchange(pRelNode, parent, currentFragmentId, context);
94+
return;
95+
}
9096
// Split an exchange into two fragments: one for the sender and one for the receiver.
9197
// The sender fragment will have a MailboxSendNode and receiver a MailboxReceiveNode.
9298
// It is possible that the receiver fragment doesn't exist yet (e.g. when PhysicalExchange is the root node).
9399
// In that case, we also create it here. If it exists already, we simply re-use it.
94-
PhysicalExchange physicalExchange = (PhysicalExchange) pRelNode.unwrap();
95100
PlanFragment receiverFragment = context._planFragmentMap.get(currentFragmentId);
96101
int senderFragmentId = context._planFragmentMap.size() + (receiverFragment == null ? 1 : 0);
97102
final DataSchema inputFragmentSchema = PRelToPlanNodeConverter.toDataSchema(
@@ -173,6 +178,77 @@ private void processTableScan(PhysicalTableScan tableScan, int currentFragmentId
173178
}
174179
}
175180

181+
/**
182+
* Handles LOOKUP_LOCAL_EXCHANGE: a pseudo-exchange that does NOT split fragments. The dim table
183+
* stays in the join's fragment. This method:
184+
* <ol>
185+
* <li>Registers the dim table name so the fragment is classified as a leaf stage</li>
186+
* <li>Sets fake empty segments per worker (the dim table is accessed via
187+
* {@code DimensionTableDataManager} at runtime, not via segment routing)</li>
188+
* <li>Converts children to PlanNodes in the same fragment (no MailboxSend/Receive)</li>
189+
* </ol>
190+
* This matches V1's behavior in {@code WorkerManager.assignWorkersToNonRootFragment} where
191+
* lookup joins are detected and the dim table is registered with empty segments.
192+
*/
193+
private void processLookupLocalExchange(PRelNode pRelNode, @Nullable PlanNode parent, int currentFragmentId,
194+
Context context) {
195+
// Find the dim table scan in the exchange's children and register it with empty segments.
196+
DispatchablePlanMetadata fragmentMetadata = context._fragmentMetadataMap.get(currentFragmentId);
197+
for (PRelNode child : pRelNode.getPRelInputs()) {
198+
registerDimTableInFragment(child, fragmentMetadata);
199+
}
200+
// Process children in the same fragment (no MailboxSend/Receive), but skip processTableScan
201+
// by converting PRelNodes to PlanNodes directly. The right side of a lookup join is always
202+
// [Project →] TableScan (at most 2 levels deep) — Calcite pushes dim-side filters to post-join.
203+
for (PRelNode child : pRelNode.getPRelInputs()) {
204+
PlanNode planNode = PRelToPlanNodeConverter.toPlanNode(child, currentFragmentId);
205+
for (PRelNode grandChild : child.getPRelInputs()) {
206+
Preconditions.checkState(grandChild.getPRelInputs().isEmpty(),
207+
"LOOKUP_LOCAL_EXCHANGE right side deeper than 2 levels: found children under %s. "
208+
+ "Expected [Project →] TableScan only.", grandChild.unwrap().getClass().getSimpleName());
209+
PlanNode grandChildNode = PRelToPlanNodeConverter.toPlanNode(grandChild, currentFragmentId);
210+
planNode.getInputs().add(grandChildNode);
211+
}
212+
if (parent != null) {
213+
parent.getInputs().add(planNode);
214+
}
215+
}
216+
}
217+
218+
/**
219+
* Recursively find TableScan nodes and register the dim table name in the fragment metadata with
220+
* fake empty segments per worker, matching V1's {@code WorkerManager.assignWorkersToNonRootFragment}
221+
* behavior for lookup joins.
222+
*/
223+
private void registerDimTableInFragment(PRelNode pRelNode, DispatchablePlanMetadata fragmentMetadata) {
224+
if (pRelNode.unwrap() instanceof TableScan) {
225+
PhysicalTableScan tableScan = (PhysicalTableScan) pRelNode.unwrap();
226+
TableScanMetadata tableScanMetadata = Objects.requireNonNull(tableScan.getTableScanMetadata(),
227+
"No metadata in table scan PRelNode");
228+
String tableName = tableScanMetadata.getScannedTables().stream().findFirst().orElseThrow();
229+
fragmentMetadata.addScannedTable(tableName);
230+
// Set fake empty segments for each worker so isLeafStageWorker() returns true.
231+
// The actual dim table data comes from DimensionTableDataManager at runtime.
232+
// Use putIfAbsent rather than overwrite to be defensive if called multiple times.
233+
Map<Integer, QueryServerInstance> workers = fragmentMetadata.getWorkerIdToServerInstanceMap();
234+
if (workers != null) {
235+
Map<Integer, Map<String, List<String>>> existing = fragmentMetadata.getWorkerIdToSegmentsMap();
236+
Map<Integer, Map<String, List<String>>> fakeSegmentsMap =
237+
existing != null ? new HashMap<>(existing) : new HashMap<>();
238+
for (Integer workerId : workers.keySet()) {
239+
fakeSegmentsMap.putIfAbsent(workerId, Map.of(TableType.OFFLINE.name(), List.of()));
240+
}
241+
fragmentMetadata.setWorkerIdToSegmentsMap(fakeSegmentsMap);
242+
}
243+
NodeHint nodeHint = NodeHint.fromRelHints(tableScan.getHints());
244+
fragmentMetadata.setTableOptions(nodeHint.getHintOptions().get(PinotHintOptions.TABLE_HINT_OPTIONS));
245+
return;
246+
}
247+
for (PRelNode child : pRelNode.getPRelInputs()) {
248+
registerDimTableInFragment(child, fragmentMetadata);
249+
}
250+
}
251+
176252
private PlanFragment createFragment(int fragmentId, PlanNode planNode, List<PlanFragment> inputFragments,
177253
Context context, List<String> workers) {
178254
// track new plan fragment
@@ -248,6 +324,9 @@ private void computeMailboxInfos(int senderStageId, int receiverStageId,
248324
}
249325
break;
250326
}
327+
case LOOKUP_LOCAL_EXCHANGE:
328+
throw new IllegalStateException("LOOKUP_LOCAL_EXCHANGE should not reach computeMailboxInfos — "
329+
+ "it must be handled as transparent in process() before fragment splitting");
251330
default:
252331
throw new UnsupportedOperationException("exchange desc not supported yet: " + exchangeDesc);
253332
}

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/PhysicalOptRuleSet.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageWorkerAssignmentRule;
2929
import org.apache.pinot.query.planner.physical.v2.opt.rules.LiteModeSortInsertRule;
3030
import org.apache.pinot.query.planner.physical.v2.opt.rules.LiteModeWorkerAssignmentRule;
31+
import org.apache.pinot.query.planner.physical.v2.opt.rules.LookupJoinRule;
3132
import org.apache.pinot.query.planner.physical.v2.opt.rules.RootExchangeInsertRule;
3233
import org.apache.pinot.query.planner.physical.v2.opt.rules.SortPushdownRule;
3334
import org.apache.pinot.query.planner.physical.v2.opt.rules.WorkerExchangeAssignmentRule;
@@ -44,6 +45,7 @@ public static List<PRelNodeTransformer> create(PhysicalPlannerContext context, T
4445
context));
4546
transformers.add(create(new LeafStageAggregateRule(context), RuleExecutors.Type.POST_ORDER, context));
4647
transformers.add(createWorkerAssignmentRule(context));
48+
transformers.add(new LookupJoinRule(context));
4749
transformers.add(create(new AggregatePushdownRule(context), RuleExecutors.Type.POST_ORDER, context));
4850
transformers.add(create(new SortPushdownRule(context), RuleExecutors.Type.POST_ORDER, context));
4951
if (context.isUseLiteMode()) {

0 commit comments

Comments
 (0)