Skip to content

Commit e79183e

Browse files
authored
refactor: rename withInfo to withFallbackReason for clarity (#4508)
1 parent 9cb4927 commit e79183e

46 files changed

Lines changed: 450 additions & 419 deletions

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 16 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -290,21 +290,22 @@ object CometSparkSessionExtensions extends Logging {
290290
* @return
291291
* `node` with fallback reasons attached (as a side effect on its tag map).
292292
*/
293-
def withInfo[T <: TreeNode[_]](node: T, info: String, exprs: T*): T = {
293+
def withFallbackReason[T <: TreeNode[_]](node: T, info: String, exprs: T*): T = {
294294
// support existing approach of passing in multiple infos in a newline-delimited string
295295
val infoSet = if (info == null || info.isEmpty) {
296296
Set.empty[String]
297297
} else {
298298
info.split("\n").toSet
299299
}
300-
withInfos(node, infoSet, exprs: _*)
300+
withFallbackReasons(node, infoSet, exprs: _*)
301301
}
302302

303303
/**
304304
* Record one or more fallback reasons on a `TreeNode` and roll up reasons from any child nodes.
305-
* This is the set-valued form of [[withInfo]]; see that overload for the full contract.
305+
* This is the set-valued form of [[withFallbackReason]]; see that overload for the full
306+
* contract.
306307
*
307-
* Reasons are accumulated (never overwritten) on the node's `EXTENSION_INFO` tag and are
308+
* Reasons are accumulated (never overwritten) on the node's `FALLBACK_REASONS` tag and are
308309
* surfaced in extended explain output. When `COMET_LOG_FALLBACK_REASONS` is enabled, each new
309310
* reason is also emitted as a warning.
310311
*
@@ -320,16 +321,16 @@ object CometSparkSessionExtensions extends Logging {
320321
* @return
321322
* `node` with fallback reasons attached (as a side effect on its tag map).
322323
*/
323-
def withInfos[T <: TreeNode[_]](node: T, info: Set[String], exprs: T*): T = {
324+
def withFallbackReasons[T <: TreeNode[_]](node: T, info: Set[String], exprs: T*): T = {
324325
if (CometConf.COMET_LOG_FALLBACK_REASONS.get()) {
325326
for (reason <- info) {
326327
logWarning(s"Comet cannot accelerate ${node.getClass.getSimpleName} because: $reason")
327328
}
328329
}
329-
val existingNodeInfos = node.getTagValue(CometExplainInfo.EXTENSION_INFO)
330+
val existingNodeInfos = node.getTagValue(CometExplainInfo.FALLBACK_REASONS)
330331
val newNodeInfo = (existingNodeInfos ++ exprs
331-
.flatMap(_.getTagValue(CometExplainInfo.EXTENSION_INFO))).flatten.toSet
332-
node.setTagValue(CometExplainInfo.EXTENSION_INFO, newNodeInfo ++ info)
332+
.flatMap(_.getTagValue(CometExplainInfo.FALLBACK_REASONS))).flatten.toSet
333+
node.setTagValue(CometExplainInfo.FALLBACK_REASONS, newNodeInfo ++ info)
333334
node
334335
}
335336

@@ -347,17 +348,17 @@ object CometSparkSessionExtensions extends Logging {
347348
* @return
348349
* `node` with the rolled-up reasons attached (as a side effect on its tag map).
349350
*/
350-
def withInfo[T <: TreeNode[_]](node: T, exprs: T*): T = {
351-
withInfos(node, Set.empty, exprs: _*)
351+
def withFallbackReason[T <: TreeNode[_]](node: T, exprs: T*): T = {
352+
withFallbackReasons(node, Set.empty, exprs: _*)
352353
}
353354

354355
/**
355-
* True if any fallback reason has been recorded on `node` (via [[withInfo]] / [[withInfos]]).
356-
* Callers that need to short-circuit when a prior rule pass has already decided a node falls
357-
* back can use this as the sticky signal.
356+
* True if any fallback reason has been recorded on `node` (via [[withFallbackReason]] /
357+
* [[withFallbackReasons]]). Callers that need to short-circuit when a prior rule pass has
358+
* already decided a node falls back can use this as the sticky signal.
358359
*/
359-
def hasExplainInfo(node: TreeNode[_]): Boolean = {
360-
node.getTagValue(CometExplainInfo.EXTENSION_INFO).exists(_.nonEmpty)
360+
def hasFallbackReason(node: TreeNode[_]): Boolean = {
361+
node.getTagValue(CometExplainInfo.FALLBACK_REASONS).exists(_.nonEmpty)
361362
}
362363

363364
}

spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -50,15 +50,17 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
5050
}
5151

5252
def getFallbackReasons(plan: SparkPlan): Seq[String] = {
53-
extensionInfo(plan).toSeq.sorted
53+
fallbackReasons(plan).toSeq.sorted
5454
}
5555

56-
private[comet] def extensionInfo(node: TreeNode[_]): Set[String] = {
56+
private[comet] def fallbackReasons(node: TreeNode[_]): Set[String] = {
5757
var info = mutable.Seq[String]()
5858
val sorted = sortup(node)
5959
sorted.foreach { p =>
6060
val all: Set[String] =
61-
getActualPlan(p).getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse(Set.empty[String])
61+
getActualPlan(p)
62+
.getTagValue(CometExplainInfo.FALLBACK_REASONS)
63+
.getOrElse(Set.empty[String])
6264
for (s <- all) {
6365
info = info :+ s
6466
}
@@ -120,7 +122,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
120122
outString.append(if (lastChildren.last) "+- " else ":- ")
121123
}
122124

123-
val tagValue = node.getTagValue(CometExplainInfo.EXTENSION_INFO)
125+
val tagValue = node.getTagValue(CometExplainInfo.FALLBACK_REASONS)
124126
val str = if (tagValue.nonEmpty) {
125127
s" ${node.nodeName} [COMET: ${tagValue.get.mkString(", ")}]"
126128
} else {
@@ -212,7 +214,7 @@ object CometCoverageStats {
212214
}
213215

214216
object CometExplainInfo {
215-
val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")
217+
val FALLBACK_REASONS = new TreeNodeTag[Set[String]]("CometFallbackReasons")
216218

217219
def getActualPlan(node: TreeNode[_]): TreeNode[_] = {
218220
node match {

spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -103,8 +103,8 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
103103

104104
/**
105105
* Plan-time predicate. `None` greenlights the serde to emit the codegen proto; `Some(reason)`
106-
* forces a Spark fallback (typically `withInfo(...) + None`) so the operator falls back cleanly
107-
* rather than crashing the Janino compile at execute time.
106+
* forces a Spark fallback (typically `withFallbackReason(...) + None`) so the operator falls
107+
* back cleanly rather than crashing the Janino compile at execute time.
108108
*
109109
* Checks every `BoundReference`'s data type and the root `expr.dataType` against
110110
* [[isSupportedDataType]], rejects aggregates / generators / `CodegenFallback` (other than

spark/src/main/scala/org/apache/comet/expressions/CometCast.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.internal.SQLConf
2424
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType, TimestampNTZType, TimestampType}
2525

2626
import org.apache.comet.CometConf
27-
import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, withInfo}
27+
import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, withFallbackReason}
2828
import org.apache.comet.serde.{CometExpressionSerde, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported}
2929
import org.apache.comet.serde.ExprOuterClass.Expr
3030
import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, serializeDataType}
@@ -81,7 +81,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
8181
if (childExpr.isDefined) {
8282
castToProto(cast, cast.timeZoneId, cast.dataType, childExpr.get, cometEvalMode)
8383
} else {
84-
withInfo(cast, cast.child)
84+
withFallbackReason(cast, cast.child)
8585
None
8686
}
8787
}
@@ -131,7 +131,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
131131
.setCast(castBuilder)
132132
.build())
133133
case _ =>
134-
withInfo(expr, s"Unsupported datatype in castToProto: $dt")
134+
withFallbackReason(expr, s"Unsupported datatype in castToProto: $dt")
135135
None
136136
}
137137
}

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 11 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -328,8 +328,8 @@ case class CometExecRule(session: SparkSession)
328328
} else {
329329
// copy fallback reasons to the original plan
330330
newPlan
331-
.getTagValue(CometExplainInfo.EXTENSION_INFO)
332-
.foreach(reasons => withInfos(plan, reasons))
331+
.getTagValue(CometExplainInfo.FALLBACK_REASONS)
332+
.foreach(reasons => withFallbackReasons(plan, reasons))
333333
// return the original plan
334334
plan
335335
}
@@ -382,8 +382,8 @@ case class CometExecRule(session: SparkSession)
382382
// reasons.
383383
// 3. The operator has children that could not be converted, so execution
384384
// has already fallen back to Spark.
385-
if (op.children.forall(_.isInstanceOf[CometNativeExec]) && !hasExplainInfo(op)) {
386-
withInfo(op, s"${op.nodeName} is not supported")
385+
if (op.children.forall(_.isInstanceOf[CometNativeExec]) && !hasFallbackReason(op)) {
386+
withFallbackReason(op, s"${op.nodeName} is not supported")
387387
} else {
388388
op
389389
}
@@ -587,7 +587,7 @@ case class CometExecRule(session: SparkSession)
587587
// config is enabled)
588588
if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
589589
val info = new ExtendedExplainInfo()
590-
if (info.extensionInfo(newPlan).nonEmpty) {
590+
if (info.fallbackReasons(newPlan).nonEmpty) {
591591
logWarning(
592592
"Comet cannot execute some parts of this plan natively " +
593593
s"(set ${CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key}=false " +
@@ -693,7 +693,9 @@ case class CometExecRule(session: SparkSession)
693693
case other => Seq(other)
694694
}
695695
if (!dataProducingChildren.forall(_.isInstanceOf[CometNativeExec])) {
696-
withInfo(op, "Cannot perform native operation because input is not in Arrow format")
696+
withFallbackReason(
697+
op,
698+
"Cannot perform native operation because input is not in Arrow format")
697699
return None
698700
}
699701
}
@@ -721,7 +723,7 @@ case class CometExecRule(session: SparkSession)
721723
if (handler.enabledConfig.forall(_.get(op.conf))) {
722724
handler.getSupportLevel(op) match {
723725
case Unsupported(notes) =>
724-
withInfo(op, notes.getOrElse(""))
726+
withFallbackReason(op, notes.getOrElse(""))
725727
false
726728
case Incompatible(notes) =>
727729
val allowIncompat = CometConf.isOperatorAllowIncompat(opName)
@@ -735,7 +737,7 @@ case class CometExecRule(session: SparkSession)
735737
true
736738
} else {
737739
val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
738-
withInfo(
740+
withFallbackReason(
739741
op,
740742
s"$opName is not fully compatible with Spark$optionalNotes. " +
741743
s"To enable it anyway, set $incompatConf=true. " +
@@ -749,7 +751,7 @@ case class CometExecRule(session: SparkSession)
749751
true
750752
}
751753
} else {
752-
withInfo(
754+
withFallbackReason(
753755
op,
754756
s"Native support for operator $opName is disabled. " +
755757
s"Set ${handler.enabledConfig.get.key}=true to enable it.")

0 commit comments

Comments
 (0)