Skip to content

Commit ec23de3

Browse files
schenksj, claude, mbutrovich, andygrove
authored
perf: O(1) PlanDataInjector lookup by op kind (#4535)
* perf: O(1) PlanDataInjector lookup by op kind

  PlanDataInjector.injectPlanData walked every operator in the tree against every registered injector (`for (injector <- injectors if injector.canInject(op))`) -- N operators x M injectors canInject calls -- even though most operators in any tree are non-scan and match no injector.

  Add `opStructCase` to the PlanDataInjector trait and key a Map[OpStructCase, PlanDataInjector]. Look up by op.getOpStructCase (O(1)) then a single canInject confirm; non-scan operators skip the iteration entirely.

  Pure performance change -- no behavior difference.

  Closes #4530

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* ci: register PlanDataInjectorSuite in PR build workflows

  check-suites.py requires every *Suite.scala to appear in both pr_build_linux.yml and pr_build_macos.yml. Add the new PlanDataInjectorSuite alongside its sibling org.apache.spark.sql.comet suites.

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: Matt Butrovich <mbutrovich@users.noreply.github.com>
Co-authored-by: Andy Grove <agrove@apache.org>
1 parent dc10059 commit ec23de3

4 files changed

Lines changed: 90 additions & 15 deletions

File tree

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -356,6 +356,7 @@ jobs:
356356
org.apache.spark.sql.comet.CometTaskMetricsSuite
357357
org.apache.spark.sql.comet.CometDppFallbackRepro3949Suite
358358
org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite
359+
org.apache.spark.sql.comet.PlanDataInjectorSuite
359360
org.apache.spark.sql.comet.CometDecimalArithmeticViewSuite
360361
org.apache.spark.sql.comet.util.UtilsSuite
361362
org.apache.comet.objectstore.NativeConfigSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -172,6 +172,7 @@ jobs:
172172
org.apache.spark.sql.comet.CometTaskMetricsSuite
173173
org.apache.spark.sql.comet.CometDppFallbackRepro3949Suite
174174
org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite
175+
org.apache.spark.sql.comet.PlanDataInjectorSuite
175176
org.apache.spark.sql.comet.CometDecimalArithmeticViewSuite
176177
org.apache.spark.sql.comet.util.UtilsSuite
177178
org.apache.comet.objectstore.NativeConfigSuite

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 35 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -68,6 +68,12 @@ import org.apache.comet.serde.operator.CometSink
6868
*/
6969
private[comet] trait PlanDataInjector {
7070

71+
/**
72+
* Which `OpStructCase` this injector handles. Used by `injectPlanData` for an O(1) pre-filter
73+
* so we don't run every injector's `canInject` against every operator in the tree.
74+
*/
75+
def opStructCase: Operator.OpStructCase
76+
7177
/** Check if this injector can handle the given operator. */
7278
def canInject(op: Operator): Boolean
7379

@@ -90,6 +96,13 @@ private[comet] object PlanDataInjector {
9096
// Future: DeltaPlanDataInjector, HudiPlanDataInjector, etc.
9197
)
9298

99+
// O(1) lookup by op kind: most operators in any tree don't match any injector, so the per-op
100+
// `for (injector <- injectors if injector.canInject(op))` walk was paying N*M canInject calls
101+
// (N operators, M injectors) just to find no match. Keying by OpStructCase lets us skip the
102+
// iteration entirely for non-scan operators.
103+
private val injectorsByKind: Map[Operator.OpStructCase, PlanDataInjector] =
104+
injectors.map(i => i.opStructCase -> i).toMap
105+
93106
/**
94107
* Injects planning data into an Operator tree by finding nodes that need injection and applying
95108
* the appropriate injector.
@@ -103,21 +116,24 @@ private[comet] object PlanDataInjector {
103116
partitionByKey: Map[String, Array[Byte]]): Operator = {
104117
val builder = op.toBuilder
105118

106-
// Try each injector to see if it can handle this operator
107-
for (injector <- injectors if injector.canInject(op)) {
108-
injector.getKey(op) match {
109-
case Some(key) =>
110-
(commonByKey.get(key), partitionByKey.get(key)) match {
111-
case (Some(commonBytes), Some(partitionBytes)) =>
112-
val injectedOp = injector.inject(op, commonBytes, partitionBytes)
113-
// Copy the injected operator's fields to our builder
114-
builder.clear()
115-
builder.mergeFrom(injectedOp)
116-
case _ =>
117-
throw new CometRuntimeException(s"Missing planning data for key: $key")
118-
}
119-
case None =>
120-
}
119+
// O(1) by op kind, then a canInject confirm (which may inspect detail fields like `hasCommon`
120+
// / `!hasFilePartition`). Most operators in any tree are non-scan and skip the lookup body.
121+
injectorsByKind.get(op.getOpStructCase) match {
122+
case Some(injector) if injector.canInject(op) =>
123+
injector.getKey(op) match {
124+
case Some(key) =>
125+
(commonByKey.get(key), partitionByKey.get(key)) match {
126+
case (Some(commonBytes), Some(partitionBytes)) =>
127+
val injectedOp = injector.inject(op, commonBytes, partitionBytes)
128+
// Copy the injected operator's fields to our builder
129+
builder.clear()
130+
builder.mergeFrom(injectedOp)
131+
case _ =>
132+
throw new CometRuntimeException(s"Missing planning data for key: $key")
133+
}
134+
case None =>
135+
}
136+
case _ =>
121137
}
122138

123139
// Recursively process children
@@ -161,6 +177,8 @@ private[comet] object IcebergPlanDataInjector extends PlanDataInjector {
161177
}
162178
})
163179

180+
override val opStructCase: Operator.OpStructCase = Operator.OpStructCase.ICEBERG_SCAN
181+
164182
override def canInject(op: Operator): Boolean =
165183
op.hasIcebergScan &&
166184
op.getIcebergScan.getFileScanTasksCount == 0 &&
@@ -200,6 +218,8 @@ private[comet] object IcebergPlanDataInjector extends PlanDataInjector {
200218
*/
201219
private[comet] object NativeScanPlanDataInjector extends PlanDataInjector {
202220

221+
override val opStructCase: Operator.OpStructCase = Operator.OpStructCase.NATIVE_SCAN
222+
203223
override def canInject(op: Operator): Boolean =
204224
op.hasNativeScan &&
205225
op.getNativeScan.hasCommon &&
PlanDataInjectorSuite.scala (new test file, package org.apache.spark.sql.comet)

Lines changed: 53 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet
21+
22+
import org.scalatest.funsuite.AnyFunSuite
23+
24+
import org.apache.comet.serde.OperatorOuterClass.Operator
25+
26+
class PlanDataInjectorSuite extends AnyFunSuite {
27+
28+
test("injectPlanData leaves a non-scan operator tree unchanged") {
29+
// An operator with no injectable scan (here, an empty op_struct, but the same holds for
30+
// Filter/Projection/etc.) must pass through untouched. This exercises the O(1)
31+
// injectorsByKind miss path (`case _ =>`) that replaced the per-injector canInject walk.
32+
val child = Operator.newBuilder().setPlanId(2).build()
33+
val root = Operator.newBuilder().setPlanId(1).addChildren(child).build()
34+
35+
val result = PlanDataInjector.injectPlanData(root, Map.empty, Map.empty)
36+
37+
assert(result == root, "non-scan operator tree should be returned unchanged")
38+
}
39+
40+
test("each registered injector is reachable by its opStructCase") {
41+
// The O(1) lookup keys injectors by opStructCase, so two injectors sharing a kind would
42+
// silently shadow one another in the map. Guard that every registered injector resolves back
43+
// to itself via its declared opStructCase (i.e. the kinds are distinct and the map is complete).
44+
val injectors = Seq(IcebergPlanDataInjector, NativeScanPlanDataInjector)
45+
val byKind = injectors.map(i => i.opStructCase -> i).toMap
46+
assert(byKind.size == injectors.size, "injectors must have distinct opStructCase keys")
47+
injectors.foreach { i =>
48+
assert(byKind(i.opStructCase) eq i)
49+
}
50+
assert(IcebergPlanDataInjector.opStructCase == Operator.OpStructCase.ICEBERG_SCAN)
51+
assert(NativeScanPlanDataInjector.opStructCase == Operator.OpStructCase.NATIVE_SCAN)
52+
}
53+
}

0 commit comments

Comments
 (0)