diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 6c4a92f312..c79746b718 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -361,4 +361,22 @@ object CometSparkSessionExtensions extends Logging { node.getTagValue(CometExplainInfo.FALLBACK_REASONS).exists(_.nonEmpty) } + /** + * Record a purely informational message on a `TreeNode`. Unlike `withFallbackReason`, this does + * NOT cause the node to fall back to Spark: the rules never read this tag. Messages are + * accumulated (never overwritten) on the node's `EXTENSION_INFO` tag and surfaced in verbose + * extended explain output under a `[COMET-INFO: ...]` label. + * + * Use this to point the user at a faster or alternative execution path that is available but + * not currently selected, for example a native implementation gated behind a config. + */ + def withInfo[T <: TreeNode[_]](node: T, message: String): T = { + if (message != null && message.nonEmpty) { + val existing = + node.getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse(Set.empty[String]) + node.setTagValue(CometExplainInfo.EXTENSION_INFO, existing + message) + } + node + } + } diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala index 592b1955f2..bd6794a5a4 100644 --- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala +++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala @@ -122,9 +122,13 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator { outString.append(if (lastChildren.last) "+- " else ":- ") } - val tagValue = node.getTagValue(CometExplainInfo.FALLBACK_REASONS) - val str = if (tagValue.nonEmpty) { - s" ${node.nodeName} [COMET: ${tagValue.get.mkString(", ")}]" + val fallback = node.getTagValue(CometExplainInfo.FALLBACK_REASONS) + val info = node.getTagValue(CometExplainInfo.EXTENSION_INFO) + val str = if (fallback.exists(_.nonEmpty) || info.exists(_.nonEmpty)) { + val sb = new StringBuilder(" ").append(node.nodeName) + fallback.filter(_.nonEmpty).foreach(v => sb.append(s" [COMET: ${v.mkString(", ")}]")) + info.filter(_.nonEmpty).foreach(v => sb.append(s" [COMET-INFO: ${v.mkString(", ")}]")) + sb.toString() } else { node.nodeName } @@ -215,6 +219,7 @@ object CometCoverageStats { object CometExplainInfo { val FALLBACK_REASONS = new TreeNodeTag[Set[String]]("CometFallbackReasons") + val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo") def getActualPlan(node: TreeNode[_]): TreeNode[_] = { node match { diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index d116d2f407..e764c61068 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -701,21 +701,37 @@ case class CometExecRule(session: SparkSession) } val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id) - if (op.children.nonEmpty && op.children.forall(_.isInstanceOf[CometNativeExec])) { - val childOp = op.children.map(_.asInstanceOf[CometNativeExec].nativeOp) - childOp.foreach(builder.addChildren) - return serde - .convert(op, builder, childOp: _*) - .map(nativeOp => serde.createExec(nativeOp, op)) - } else { - return serde - .convert(op, builder) - .map(nativeOp => serde.createExec(nativeOp, op)) + val nativeChildren = op.children.collect { case child: CometNativeExec => child.nativeOp } + val nativeOpOpt = + if (op.children.nonEmpty && nativeChildren.size == op.children.size) { + nativeChildren.foreach(builder.addChildren) + serde.convert(op, builder, nativeChildren: _*) + } else { + serde.convert(op, builder) + } + return nativeOpOpt.map { nativeOp => + val exec = serde.createExec(nativeOp, op) + rollUpInfoMessages(op, exec) + exec } } None } + /** + * Lift informational (non-fallback) messages tagged on an operator and its expressions onto the + * converted Comet plan node so they appear in verbose extended explain output. Expression-level + * hints would otherwise be invisible because explain only traverses plan nodes, not + * expressions. + */ + private def rollUpInfoMessages(op: SparkPlan, exec: SparkPlan): Unit = { + val fromOp = op.getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse(Set.empty[String]) + val fromExprs = op.expressions + .flatMap(_.collect { case e: Expression => e }) + .flatMap(_.getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse(Set.empty[String])) + (fromOp ++ fromExprs).foreach(msg => withInfo(exec, msg)) + } + private def isOperatorEnabled( handler: CometOperatorSerde[SparkPlan], op: SparkPlan): Boolean = { diff --git a/spark/src/main/scala/org/apache/comet/serde/CometExpressionSerde.scala b/spark/src/main/scala/org/apache/comet/serde/CometExpressionSerde.scala index 98d0f3b2fc..a29159571f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometExpressionSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/CometExpressionSerde.scala @@ -21,6 +21,8 @@ package org.apache.comet.serde import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.comet.{CometConf, CometSparkSessionExtensions} + /** * Trait for providing serialization logic for expressions. */ @@ -95,6 +97,31 @@ trait CometExpressionSerde[T <: Expression] { * could not be converted. */ def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] + + /** + * Tag the expression with an informational hint that a native implementation exists but is + * gated behind a config the user has not enabled, so a slower path (the JVM codegen dispatcher, + * or Spark fallback) is being used instead. The hint surfaces in verbose extended explain + * output as `[COMET-INFO: ...]` and does NOT cause fallback. Defaults to the standard + * per-expression `allowIncompatible` config key derived from `getExprConfigName`; use the + * two-arg overload when the gating config is something else. + */ + def withNativeAvailableInfo(expr: T): T = + withNativeAvailableInfo( + expr, + CometConf.getExprAllowIncompatConfigKey(getExprConfigName(expr))) + + /** + * Like the single-arg overload but takes the gating config key explicitly. Use when the native + * path is gated by a config other than the per-expression `allowIncompatible` flag. + */ + def withNativeAvailableInfo(expr: T, configKey: String): T = { + CometSparkSessionExtensions.withInfo( + expr, + s"A native implementation of ${getExprConfigName(expr)} is available but needs to be " + + s"enabled with $configKey. See compatibility guide for more information.") + expr + } } /** diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index 1487af1bd2..0652bbd86f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -718,6 +718,10 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] { formatExpr) optExprWithFallbackReason(optExpr, expr, expr.left, expr.right) } else { + if (nativeFormat.isDefined) { + // Native `to_char` could run this if allowed; tell the user about the faster path. + withNativeAvailableInfo(expr) + } // Hand the full `DateFormatClass` (with `timeZoneId` already stamped by `ResolveTimeZone`) // to the codegen dispatcher. It closure-serializes the bound tree, so non-UTC timezones // and non-whitelisted / non-literal format strings produce Spark-identical results. diff --git a/spark/src/main/scala/org/apache/comet/serde/json.scala b/spark/src/main/scala/org/apache/comet/serde/json.scala index 2c210fa26c..75dfeac517 100644 --- a/spark/src/main/scala/org/apache/comet/serde/json.scala +++ b/spark/src/main/scala/org/apache/comet/serde/json.scala @@ -43,6 +43,7 @@ object CometLengthOfJsonArray extends CometCodegenDispatch[LengthOfJsonArray] { val optExpr = scalarFunctionExprToProto("json_array_length", childExpr: _*) optExprWithFallbackReason(optExpr, expr, expr.children: _*) } else { + withNativeAvailableInfo(expr) super.convert(expr, inputs, binding) } } diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index dee8a9da8d..60809d2e2a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -66,6 +66,7 @@ class CometCaseConversionBase[T <: Expression](function: String) // Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the // Comet pipeline. This guarantees Spark-compatible behavior across 3.4 / 3.5 / 4.0. // Falls through to Spark when the dispatcher is disabled. + withNativeAvailableInfo(expr, CometConf.COMET_CASE_CONVERSION_ENABLED.key) CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding) } } @@ -98,6 +99,7 @@ object CometInitCap extends CometScalarFunction[InitCap]("initcap") { // Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the // Comet pipeline. This guarantees Spark-compatible behavior across 3.4 / 3.5 / 4.0. // Falls through to Spark when the dispatcher is disabled. + withNativeAvailableInfo(expr) CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding) } } @@ -119,6 +121,7 @@ object CometStringReplace extends CometScalarFunction[StringReplace]("replace") } else { // Run Spark's own generated code inside the Comet pipeline so the result matches Spark // exactly. Falls back to Spark when the codegen dispatcher is disabled. + withNativeAvailableInfo(expr) CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding) } } @@ -342,6 +345,14 @@ object CometRLike extends CometExpressionSerde[RLike] { case _ => // Non-scalar pattern: the native path cannot handle it, fall through to the dispatcher. } + } else { + // Native path is gated off; tell the user about the faster path. Only emit the hint when + // the pattern is a scalar literal -- a non-scalar pattern can never use native regardless + // of the config. + expr.right match { + case Literal(_, DataTypes.StringType) => withNativeAvailableInfo(expr) + case _ => + } } // Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the Comet // pipeline. Falls back to Spark when the dispatcher is disabled. @@ -441,7 +452,10 @@ object CometRegExpReplace extends CometExpressionSerde[RegExpReplace] { optExprWithFallbackReason(optExpr, expr, expr.subject, expr.regexp, expr.rep, expr.pos) } else { // Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the - // Comet pipeline. Falls back to Spark when the dispatcher is disabled. + // Comet pipeline. Falls back to Spark when the dispatcher is disabled. Only emit the + // native-available hint when the native path would actually run (offset==1); otherwise + // flipping the config wouldn't enable native. + if (nativeSupported(expr)) withNativeAvailableInfo(expr) CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding) } } @@ -479,6 +493,7 @@ object CometStringSplit extends CometExpressionSerde[StringSplit] { } else { // Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the // Comet pipeline. Falls back to Spark when the dispatcher is disabled. + withNativeAvailableInfo(expr) CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding) } } @@ -515,6 +530,7 @@ object CometGetJsonObject extends CometCodegenDispatch[GetJsonObject] { pathExpr) optExprWithFallbackReason(optExpr, expr, expr.json, expr.path) } else { + withNativeAvailableInfo(expr) super.convert(expr, inputs, binding) } } diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala b/spark/src/main/scala/org/apache/comet/serde/structs.scala index c38b12bb6d..1eff59cc58 100644 --- a/spark/src/main/scala/org/apache/comet/serde/structs.scala +++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala @@ -151,6 +151,9 @@ object CometStructsToJson extends CometCodegenDispatch[StructsToJson] { None } } else { + // Only emit the native-available hint when the native path would actually run; flipping + // allowIncompatible alone won't enable native for unsupported types or options. + if (nativeSupported(expr)) withNativeAvailableInfo(expr) super.convert(expr, inputs, binding) } @@ -191,6 +194,9 @@ object CometJsonToStructs extends CometCodegenDispatch[JsonToStructs] { binding: Boolean): Option[ExprOuterClass.Expr] = { if (!(CometConf.isExprAllowIncompat(getExprConfigName(expr)) && nativeSupported(expr))) { + // Only emit the native-available hint when the native path would actually run; flipping + // allowIncompatible alone won't enable native for an unsupported schema. + if (nativeSupported(expr)) withNativeAvailableInfo(expr) return super.convert(expr, inputs, binding) } diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a/extended.txt index 7dc6cd0331..520aed49bf 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a/extended.txt @@ -9,7 +9,7 @@ CometNativeColumnarToRow : +- CometExchange : +- CometHashAggregate : +- CometProject - : +- CometBroadcastHashJoin + : +- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.] : :- CometProject : : +- CometBroadcastHashJoin : : :- CometProject @@ -51,7 +51,7 @@ CometNativeColumnarToRow +- CometExchange +- CometHashAggregate +- CometProject - +- CometBroadcastHashJoin + +- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.] :- CometProject : +- CometBroadcastHashJoin : :- CometProject diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b/extended.txt index 7dc6cd0331..520aed49bf 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b/extended.txt @@ -9,7 +9,7 @@ CometNativeColumnarToRow : +- CometExchange : +- CometHashAggregate : +- CometProject - : +- CometBroadcastHashJoin + : +- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.] : :- CometProject : : +- CometBroadcastHashJoin : : :- CometProject @@ -51,7 +51,7 @@ CometNativeColumnarToRow +- CometExchange +- CometHashAggregate +- CometProject - +- CometBroadcastHashJoin + +- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.] :- CometProject : +- CometBroadcastHashJoin : :- CometProject diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q9/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q9/extended.txt index ff39852ed0..ca26c9a6b1 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q9/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q9/extended.txt @@ -1,4 +1,4 @@ - Project [COMET: ] +Project : :- Subquery : : +- CometNativeColumnarToRow : : +- CometProject diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24/extended.txt index 7418929ba8..cfb8867f5a 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24/extended.txt @@ -11,7 +11,7 @@ CometNativeColumnarToRow : +- CometExchange : +- CometHashAggregate : +- CometProject - : +- CometBroadcastHashJoin + : +- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.] : :- CometProject : : +- CometBroadcastHashJoin : : :- CometProject @@ -53,7 +53,7 @@ CometNativeColumnarToRow +- CometExchange +- CometHashAggregate +- CometProject - +- CometBroadcastHashJoin + +- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.] :- CometProject : +- CometBroadcastHashJoin : :- CometProject diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 3bfeccae26..93449724b9 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -993,6 +993,36 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("withInfo does not cause fallback and renders in verbose explain") { + val table = "with_info_msg" + withTable(table) { + sql(s"create table $table(id int) using parquet") + sql(s"insert into $table values(1)") + val query = sql(s"select cast(id as string) from $table") + val (_, cometPlan) = checkSparkAnswerAndOperator(query) + val project = stripAQEPlan(cometPlan).collectFirst { case p: CometProjectExec => p }.get + + // Tagging info must NOT mark the node as falling back. + CometSparkSessionExtensions.withInfo(project, "faster path available") + assert(!CometSparkSessionExtensions.hasFallbackReason(project)) + + // Verbose explain shows it under the distinct COMET-INFO label. + withSQLConf( + CometConf.COMET_EXTENDED_EXPLAIN_FORMAT.key -> + CometConf.COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE) { + val explain = new ExtendedExplainInfo().generateExtendedInfo(project) + assert(explain.contains("[COMET-INFO: faster path available]")) + assert(!explain.contains("[COMET: faster path available]")) + + // A second info message accumulates rather than replacing the first. + CometSparkSessionExtensions.withInfo(project, "second hint") + val explain2 = new ExtendedExplainInfo().generateExtendedInfo(project) + assert(explain2.contains("faster path available")) + assert(explain2.contains("second hint")) + } + } + } + test("rlike with non-scalar pattern runs via codegen dispatcher") { val table = "rlike_non_scalar" withTable(table) { @@ -3124,6 +3154,56 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("date_format JVM codegen path hints at faster native option") { + val table = "date_format_hint" + withTable(table) { + sql(s"create table $table(ts timestamp) using parquet") + sql(s"insert into $table values(timestamp'2024-01-02 03:04:05')") + // Non-UTC session timezone + a natively-supported format => native is blocked only by + // allowIncompatible being off, so we take the JVM codegen path and should hint at native. + // The codegen dispatcher must be enabled, otherwise date_format has no native or JVM path + // here and the operator falls back to Spark (failing checkSparkAnswerAndOperator). + withSQLConf( + CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "true", + "spark.sql.session.timeZone" -> "America/New_York", + CometConf.COMET_EXTENDED_EXPLAIN_FORMAT.key -> + CometConf.COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE) { + val query = sql(s"select date_format(ts, 'yyyy-MM-dd') from $table") + val (_, cometPlan) = checkSparkAnswerAndOperator(query) + val explain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan) + // Pin the full generic message format produced by withNativeAvailableInfo. + assert( + explain.contains( + "[COMET-INFO: A native implementation of DateFormatClass is available but needs " + + "to be enabled with spark.comet.expression.DateFormatClass.allowIncompatible. " + + "See compatibility guide for more information.]")) + } + } + } + + test("string_split JVM codegen path hints at faster native option") { + // Sanity-check that the generic withNativeAvailableInfo helper fires for a different + // expression than date_format, exercising the standard `allowIncompatible` config path. + val table = "string_split_hint" + withTable(table) { + sql(s"create table $table(s string) using parquet") + sql(s"insert into $table values('a,b,c')") + withSQLConf( + CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "true", + CometConf.COMET_EXTENDED_EXPLAIN_FORMAT.key -> + CometConf.COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE) { + val query = sql(s"select split(s, ',') from $table") + val (_, cometPlan) = checkSparkAnswerAndOperator(query) + val explain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan) + assert( + explain.contains( + "[COMET-INFO: A native implementation of StringSplit is available but needs to " + + "be enabled with spark.comet.expression.StringSplit.allowIncompatible. " + + "See compatibility guide for more information.]")) + } + } + } + test("deep AND/OR predicate chains do not overflow the protobuf recursion limit") { // A left-deep chain of N associative boolean operands serializes to a proto nested N // levels deep. With N > protobuf's default recursion limit (100), the message overflows