Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -361,4 +361,22 @@ object CometSparkSessionExtensions extends Logging {
node.getTagValue(CometExplainInfo.FALLBACK_REASONS).exists(_.nonEmpty)
}

/**
* Record a purely informational message on a `TreeNode`. Unlike `withFallbackReason`, this does
* NOT cause the node to fall back to Spark: the rules never read this tag. Messages are
* accumulated (never overwritten) on the node's `EXTENSION_INFO` tag and surfaced in verbose
* extended explain output under a `[COMET-INFO: ...]` label.
*
* Use this to point the user at a faster or alternative execution path that is available but
* not currently selected, for example a native implementation gated behind a config.
*/
def withInfo[T <: TreeNode[_]](node: T, message: String): T = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel it would be a simpler design to merely have a single withInfo(node, tag, message) instead of adding a new set of methods for every new tag type. wdyt?

if (message != null && message.nonEmpty) {
val existing =
node.getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse(Set.empty[String])
node.setTagValue(CometExplainInfo.EXTENSION_INFO, existing + message)
}
node
}

}
11 changes: 8 additions & 3 deletions spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,13 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
outString.append(if (lastChildren.last) "+- " else ":- ")
}

val tagValue = node.getTagValue(CometExplainInfo.FALLBACK_REASONS)
val str = if (tagValue.nonEmpty) {
s" ${node.nodeName} [COMET: ${tagValue.get.mkString(", ")}]"
val fallback = node.getTagValue(CometExplainInfo.FALLBACK_REASONS)
val info = node.getTagValue(CometExplainInfo.EXTENSION_INFO)
val str = if (fallback.exists(_.nonEmpty) || info.exists(_.nonEmpty)) {
val sb = new StringBuilder(" ").append(node.nodeName)
fallback.filter(_.nonEmpty).foreach(v => sb.append(s" [COMET: ${v.mkString(", ")}]"))
info.filter(_.nonEmpty).foreach(v => sb.append(s" [COMET-INFO: ${v.mkString(", ")}]"))
sb.toString()
} else {
node.nodeName
}
Expand Down Expand Up @@ -215,6 +219,7 @@ object CometCoverageStats {

object CometExplainInfo {
val FALLBACK_REASONS = new TreeNodeTag[Set[String]]("CometFallbackReasons")
val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")

def getActualPlan(node: TreeNode[_]): TreeNode[_] = {
node match {
Expand Down
36 changes: 26 additions & 10 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -701,21 +701,37 @@ case class CometExecRule(session: SparkSession)
}

val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
if (op.children.nonEmpty && op.children.forall(_.isInstanceOf[CometNativeExec])) {
val childOp = op.children.map(_.asInstanceOf[CometNativeExec].nativeOp)
childOp.foreach(builder.addChildren)
return serde
.convert(op, builder, childOp: _*)
.map(nativeOp => serde.createExec(nativeOp, op))
} else {
return serde
.convert(op, builder)
.map(nativeOp => serde.createExec(nativeOp, op))
val nativeChildren = op.children.collect { case child: CometNativeExec => child.nativeOp }
val nativeOpOpt =
if (op.children.nonEmpty && nativeChildren.size == op.children.size) {
nativeChildren.foreach(builder.addChildren)
serde.convert(op, builder, nativeChildren: _*)
} else {
serde.convert(op, builder)
}
return nativeOpOpt.map { nativeOp =>
val exec = serde.createExec(nativeOp, op)
rollUpInfoMessages(op, exec)
exec

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some feeling we can avoid checkcast for all and then cast all, with something like

val nativeChildren = op.children.iterator.map {
  case child: CometNativeExec => child.nativeOp
}.toSeq

val nativeOpOpt =
  if (nativeChildren.size == op.children.size) {
    nativeChildren.foreach(builder.addChildren)
    serde.convert(op, builder, nativeChildren: _*)
  } else {
    serde.convert(op, builder)
  }

nativeOpOpt.map { nativeOp =>
  val exec = serde.createExec(nativeOp, op)
  rollUpInfoMessages(op, exec)
  exec
}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I implemented something similar just now

}
}
None
}

/**
* Lift informational (non-fallback) messages tagged on an operator and its expressions onto the
* converted Comet plan node so they appear in verbose extended explain output. Expression-level
* hints would otherwise be invisible because explain only traverses plan nodes, not
* expressions.
*/
private def rollUpInfoMessages(op: SparkPlan, exec: SparkPlan): Unit = {
val fromOp = op.getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse(Set.empty[String])
val fromExprs = op.expressions
.flatMap(_.collect { case e: Expression => e })
.flatMap(_.getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse(Set.empty[String]))
(fromOp ++ fromExprs).foreach(msg => withInfo(exec, msg))
}

private def isOperatorEnabled(
handler: CometOperatorSerde[SparkPlan],
op: SparkPlan): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package org.apache.comet.serde

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

import org.apache.comet.{CometConf, CometSparkSessionExtensions}

/**
* Trait for providing serialization logic for expressions.
*/
Expand Down Expand Up @@ -95,6 +97,31 @@ trait CometExpressionSerde[T <: Expression] {
* could not be converted.
*/
def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr]

/**
* Tag the expression with an informational hint that a native implementation exists but is
* gated behind a config the user has not enabled, so a slower path (the JVM codegen dispatcher,
* or Spark fallback) is being used instead. The hint surfaces in verbose extended explain
* output as `[COMET-INFO: ...]` and does NOT cause fallback. Defaults to the standard
* per-expression `allowIncompatible` config key derived from `getExprConfigName`; use the
* two-arg overload when the gating config is something else.
*/
def withNativeAvailableInfo(expr: T): T =
withNativeAvailableInfo(
expr,
CometConf.getExprAllowIncompatConfigKey(getExprConfigName(expr)))

/**
* Like the single-arg overload but takes the gating config key explicitly. Use when the native
* path is gated by a config other than the per-expression `allowIncompatible` flag.
*/
def withNativeAvailableInfo(expr: T, configKey: String): T = {
CometSparkSessionExtensions.withInfo(
expr,
s"A native implementation of ${getExprConfigName(expr)} is available but needs to be " +
s"enabled with $configKey. See compatibility guide for more information.")
expr
}
}

/**
Expand Down
4 changes: 4 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,10 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] {
formatExpr)
optExprWithFallbackReason(optExpr, expr, expr.left, expr.right)
} else {
if (nativeFormat.isDefined) {
// Native `to_char` could run this if allowed; tell the user about the faster path.
withNativeAvailableInfo(expr)
}
// Hand the full `DateFormatClass` (with `timeZoneId` already stamped by `ResolveTimeZone`)
// to the codegen dispatcher. It closure-serializes the bound tree, so non-UTC timezones
// and non-whitelisted / non-literal format strings produce Spark-identical results.
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/scala/org/apache/comet/serde/json.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ object CometLengthOfJsonArray extends CometCodegenDispatch[LengthOfJsonArray] {
val optExpr = scalarFunctionExprToProto("json_array_length", childExpr: _*)
optExprWithFallbackReason(optExpr, expr, expr.children: _*)
} else {
withNativeAvailableInfo(expr)
super.convert(expr, inputs, binding)
}
}
Expand Down
18 changes: 17 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/strings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class CometCaseConversionBase[T <: Expression](function: String)
// Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the
// Comet pipeline. This guarantees Spark-compatible behavior across 3.4 / 3.5 / 4.0.
// Falls through to Spark when the dispatcher is disabled.
withNativeAvailableInfo(expr, CometConf.COMET_CASE_CONVERSION_ENABLED.key)
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
}
}
Expand Down Expand Up @@ -98,6 +99,7 @@ object CometInitCap extends CometScalarFunction[InitCap]("initcap") {
// Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the
// Comet pipeline. This guarantees Spark-compatible behavior across 3.4 / 3.5 / 4.0.
// Falls through to Spark when the dispatcher is disabled.
withNativeAvailableInfo(expr)
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
}
}
Expand All @@ -119,6 +121,7 @@ object CometStringReplace extends CometScalarFunction[StringReplace]("replace")
} else {
// Run Spark's own generated code inside the Comet pipeline so the result matches Spark
// exactly. Falls back to Spark when the codegen dispatcher is disabled.
withNativeAvailableInfo(expr)
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
}
}
Expand Down Expand Up @@ -342,6 +345,14 @@ object CometRLike extends CometExpressionSerde[RLike] {
case _ =>
// Non-scalar pattern: the native path cannot handle it, fall through to the dispatcher.
}
} else {
// Native path is gated off; tell the user about the faster path. Only emit the hint when
// the pattern is a scalar literal -- a non-scalar pattern can never use native regardless
// of the config.
expr.right match {
case Literal(_, DataTypes.StringType) => withNativeAvailableInfo(expr)
case _ =>
}
}
// Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the Comet
// pipeline. Falls back to Spark when the dispatcher is disabled.
Expand Down Expand Up @@ -441,7 +452,10 @@ object CometRegExpReplace extends CometExpressionSerde[RegExpReplace] {
optExprWithFallbackReason(optExpr, expr, expr.subject, expr.regexp, expr.rep, expr.pos)
} else {
// Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the
// Comet pipeline. Falls back to Spark when the dispatcher is disabled.
// Comet pipeline. Falls back to Spark when the dispatcher is disabled. Only emit the
// native-available hint when the native path would actually run (offset==1); otherwise
// flipping the config wouldn't enable native.
if (nativeSupported(expr)) withNativeAvailableInfo(expr)
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
}
}
Expand Down Expand Up @@ -479,6 +493,7 @@ object CometStringSplit extends CometExpressionSerde[StringSplit] {
} else {
// Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the
// Comet pipeline. Falls back to Spark when the dispatcher is disabled.
withNativeAvailableInfo(expr)
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
}
}
Expand Down Expand Up @@ -515,6 +530,7 @@ object CometGetJsonObject extends CometCodegenDispatch[GetJsonObject] {
pathExpr)
optExprWithFallbackReason(optExpr, expr, expr.json, expr.path)
} else {
withNativeAvailableInfo(expr)
super.convert(expr, inputs, binding)
}
}
Expand Down
6 changes: 6 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/structs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ object CometStructsToJson extends CometCodegenDispatch[StructsToJson] {
None
}
} else {
// Only emit the native-available hint when the native path would actually run; flipping
// allowIncompatible alone won't enable native for unsupported types or options.
if (nativeSupported(expr)) withNativeAvailableInfo(expr)
super.convert(expr, inputs, binding)
}

Expand Down Expand Up @@ -191,6 +194,9 @@ object CometJsonToStructs extends CometCodegenDispatch[JsonToStructs] {
binding: Boolean): Option[ExprOuterClass.Expr] = {

if (!(CometConf.isExprAllowIncompat(getExprConfigName(expr)) && nativeSupported(expr))) {
// Only emit the native-available hint when the native path would actually run; flipping
// allowIncompatible alone won't enable native for an unsupported schema.
if (nativeSupported(expr)) withNativeAvailableInfo(expr)
return super.convert(expr, inputs, binding)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CometNativeColumnarToRow
: +- CometExchange
: +- CometHashAggregate
: +- CometProject
: +- CometBroadcastHashJoin
: +- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably decouple informational messages (which may change) from the plan stability test which should really only check that there is no degradation in the generated plan.

: :- CometProject
: : +- CometBroadcastHashJoin
: : :- CometProject
Expand Down Expand Up @@ -51,7 +51,7 @@ CometNativeColumnarToRow
+- CometExchange
+- CometHashAggregate
+- CometProject
+- CometBroadcastHashJoin
+- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.]
:- CometProject
: +- CometBroadcastHashJoin
: :- CometProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CometNativeColumnarToRow
: +- CometExchange
: +- CometHashAggregate
: +- CometProject
: +- CometBroadcastHashJoin
: +- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.]
: :- CometProject
: : +- CometBroadcastHashJoin
: : :- CometProject
Expand Down Expand Up @@ -51,7 +51,7 @@ CometNativeColumnarToRow
+- CometExchange
+- CometHashAggregate
+- CometProject
+- CometBroadcastHashJoin
+- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.]
:- CometProject
: +- CometBroadcastHashJoin
: :- CometProject
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Project [COMET: ]
Project
: :- Subquery
: : +- CometNativeColumnarToRow
: : +- CometProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ CometNativeColumnarToRow
: +- CometExchange
: +- CometHashAggregate
: +- CometProject
: +- CometBroadcastHashJoin
: +- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.]
: :- CometProject
: : +- CometBroadcastHashJoin
: : :- CometProject
Expand Down Expand Up @@ -53,7 +53,7 @@ CometNativeColumnarToRow
+- CometExchange
+- CometHashAggregate
+- CometProject
+- CometBroadcastHashJoin
+- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.]
:- CometProject
: +- CometBroadcastHashJoin
: :- CometProject
Expand Down
80 changes: 80 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,36 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("withInfo does not cause fallback and renders in verbose explain") {
val table = "with_info_msg"
withTable(table) {
sql(s"create table $table(id int) using parquet")
sql(s"insert into $table values(1)")
val query = sql(s"select cast(id as string) from $table")
val (_, cometPlan) = checkSparkAnswerAndOperator(query)
val project = stripAQEPlan(cometPlan).collectFirst { case p: CometProjectExec => p }.get

// Tagging info must NOT mark the node as falling back.
CometSparkSessionExtensions.withInfo(project, "faster path available")
assert(!CometSparkSessionExtensions.hasFallbackReason(project))

// Verbose explain shows it under the distinct COMET-INFO label.
withSQLConf(
CometConf.COMET_EXTENDED_EXPLAIN_FORMAT.key ->
CometConf.COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE) {
val explain = new ExtendedExplainInfo().generateExtendedInfo(project)
assert(explain.contains("[COMET-INFO: faster path available]"))
assert(!explain.contains("[COMET: faster path available]"))

// A second info message accumulates rather than replacing the first.
CometSparkSessionExtensions.withInfo(project, "second hint")
val explain2 = new ExtendedExplainInfo().generateExtendedInfo(project)
assert(explain2.contains("faster path available"))
assert(explain2.contains("second hint"))
}
}
}

test("rlike with non-scalar pattern runs via codegen dispatcher") {
val table = "rlike_non_scalar"
withTable(table) {
Expand Down Expand Up @@ -3124,6 +3154,56 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("date_format JVM codegen path hints at faster native option") {
val table = "date_format_hint"
withTable(table) {
sql(s"create table $table(ts timestamp) using parquet")
sql(s"insert into $table values(timestamp'2024-01-02 03:04:05')")
// Non-UTC session timezone + a natively-supported format => native is blocked only by
// allowIncompatible being off, so we take the JVM codegen path and should hint at native.
// The codegen dispatcher must be enabled, otherwise date_format has no native or JVM path
// here and the operator falls back to Spark (failing checkSparkAnswerAndOperator).
withSQLConf(
CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "true",
"spark.sql.session.timeZone" -> "America/New_York",
CometConf.COMET_EXTENDED_EXPLAIN_FORMAT.key ->
CometConf.COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE) {
val query = sql(s"select date_format(ts, 'yyyy-MM-dd') from $table")
val (_, cometPlan) = checkSparkAnswerAndOperator(query)
val explain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
// Pin the full generic message format produced by withNativeAvailableInfo.
assert(
explain.contains(
"[COMET-INFO: A native implementation of DateFormatClass is available but needs " +
"to be enabled with spark.comet.expression.DateFormatClass.allowIncompatible. " +
"See compatibility guide for more information.]"))
}
}
}

test("string_split JVM codegen path hints at faster native option") {
// Sanity-check that the generic withNativeAvailableInfo helper fires for a different
// expression than date_format, exercising the standard `allowIncompatible` config path.
val table = "string_split_hint"
withTable(table) {
sql(s"create table $table(s string) using parquet")
sql(s"insert into $table values('a,b,c')")
withSQLConf(
CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "true",
CometConf.COMET_EXTENDED_EXPLAIN_FORMAT.key ->
CometConf.COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE) {
val query = sql(s"select split(s, ',') from $table")
val (_, cometPlan) = checkSparkAnswerAndOperator(query)
val explain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
assert(
explain.contains(
"[COMET-INFO: A native implementation of StringSplit is available but needs to " +
"be enabled with spark.comet.expression.StringSplit.allowIncompatible. " +
"See compatibility guide for more information.]"))
}
}
}

test("deep AND/OR predicate chains do not overflow the protobuf recursion limit") {
// A left-deep chain of N associative boolean operands serializes to a proto nested N
// levels deep. With N > protobuf's default recursion limit (100), the message overflows
Expand Down
Loading