feat: informational message channel + generic native-available hint#4509
feat: informational message channel + generic native-available hint#4509andygrove wants to merge 9 commits into
Conversation
Rename withInfo/withInfos/hasExplainInfo and EXTENSION_INFO to withFallbackReason/withFallbackReasons/hasFallbackReason and FALLBACK_REASONS to match their actual semantics (fallback reasons, not generic info). Also rename the private extensionInfo helper in ExtendedExplainInfo to fallbackReasons, and update the TreeNodeTag string from "CometExtensionInfo" to "CometFallbackReasons" so a future PR can reuse the old string for a distinct tag.
…skip ci] When date_format gets a natively-supported format string but the session timezone is non-UTC and allowIncompatible is off, Comet takes the JVM codegen path. Emit a COMET-INFO hint on the expression and lift expression-level info messages onto the converted operator centrally in CometExecRule, so verbose extended explain shows the faster native option and how to enable it.
# Conflicts: # spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala # spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala # spark/src/main/scala/org/apache/comet/serde/contraintExpressions.scala # spark/src/main/scala/org/apache/comet/serde/datetime.scala # spark/src/main/scala/org/apache/comet/serde/math.scala # spark/src/main/scala/org/apache/comet/serde/statics.scala # spark/src/main/scala/org/apache/comet/serde/strings.scala # spark/src/main/scala/org/apache/comet/serde/structs.scala # spark/src/main/scala/org/apache/comet/serde/unixtime.scala
Two issues surfaced once CI ran (the feature commits had skipped CI): - datetime.scala: drop redundant `s` interpolators on two string literals in the date_format info hint (scalafix CHECK). - tpcds q9 golden: the info-message change makes ExtendedExplainInfo omit an empty `[COMET: ]` for a bare Spark fallback Project, so the root renders as `Project`. Update the approved plan to match.
a5453a6 to
9050599
Compare
# Conflicts: # spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Add `withNativeAvailableInfo` on `CometExpressionSerde` so any serde whose native path is gated behind a config can attach a generic hint without re-inventing the message: "A native implementation of EXPR is available but needs to be enabled with CONFIG. See compatibility guide for more information." Default uses the per-expression `allowIncompatible` config key derived from `getExprConfigName`; the two-arg overload takes an explicit key for serdes (case conversion) gated by a different flag. Replaces the bespoke `date_format` hint and wires the helper into InitCap, StringReplace, RLike (only for scalar-literal patterns), RegExpReplace (only when offset==1), StringSplit, GetJsonObject, LengthOfJsonArray, StructsToJson and JsonToStructs (only when their `nativeSupported` would let native run), and Upper/Lower (with the case-conversion enable flag). Tests pin the full message text for date_format (existing) and for string_split (new), exercising both the standard allowIncompatible config path and the rendering through `rollUpInfoMessages`.
q24a, q24b (v1_4) and q24 (v2_7) now route through `Upper`, which after the new `withNativeAvailableInfo` wiring tags its enclosing BroadcastHashJoin with `[COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.]`. Verified all five Spark profiles (3.4 / 3.5 / 4.0 / 4.1 / 4.2) match the regenerated base files, so no version-specific overrides are needed.
| return nativeOpOpt.map { nativeOp => | ||
| val exec = serde.createExec(nativeOp, op) | ||
| rollUpInfoMessages(op, exec) | ||
| exec |
There was a problem hiding this comment.
I have some feeling we can avoid checkcast for all and then cast all, with something like
val nativeChildren = op.children.iterator.map {
case child: CometNativeExec => child.nativeOp
}.toSeq
val nativeOpOpt =
if (nativeChildren.size == op.children.size) {
nativeChildren.foreach(builder.addChildren)
serde.convert(op, builder, nativeChildren: _*)
} else {
serde.convert(op, builder)
}
nativeOpOpt.map { nativeOp =>
val exec = serde.createExec(nativeOp, op)
rollUpInfoMessages(op, exec)
exec
}
There was a problem hiding this comment.
Thanks, I implemented something similar just now
| : +- CometHashAggregate | ||
| : +- CometProject | ||
| : +- CometBroadcastHashJoin | ||
| : +- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.] |
There was a problem hiding this comment.
We should probably decouple informational messages (which may change) from the plan stability test which should really only check that there is no degradation in the generated plan.
| * Use this to point the user at a faster or alternative execution path that is available but | ||
| * not currently selected, for example a native implementation gated behind a config. | ||
| */ | ||
| def withInfo[T <: TreeNode[_]](node: T, message: String): T = { |
There was a problem hiding this comment.
I feel it would be a simpler design to merely have a single withInfo(node, tag, message) instead of adding a new set of methods for every new tag type. wdyt?
There was a problem hiding this comment.
How would we distinguish between messages that should trigger fallback versus purely informational messages? Are you thinking that would be an extra parameter to withInfo?
There was a problem hiding this comment.
I see now that you are suggesting adding tag as an argument, presumably we could default it to fallback and then be explicit in other cases... I will take a look
Which issue does this PR close?
Closes #4006.
Stacked on #4508 (the
withInfo->withFallbackReasonrename). With #4508 merged, only the feature commits below remain in the diff.Rationale for this change
Comet only had one way to tag a plan node with a message, and that message always meant "this node falls back to Spark". There was no way to attach a purely informational note that does not trigger fallback. This matters for codegen dispatch: when Comet runs a JVM implementation of an expression even though a faster native implementation exists behind a config, we want to tell the user about the faster path without that note being treated as a fallback.
What changes are included in this PR?
Informational channel parallel to the fallback channel freed up by #4508:
CometSparkSessionExtensions.withInfo(node, message)records a message on a newCometExplainInfo.EXTENSION_INFOtag. It does not cause fallback: no planning rule reads this tag.[COMET-INFO: ...]segment, in addition to any[COMET: ...]fallback segment on the same node. The fallback explain list format is unchanged and still excludes info messages.CometExecRule.convertToComet(a single central rollup, applied to all native operators), because verbose explain only traverses plan nodes, not expressions.Generic
withNativeAvailableInfohelper onCometExpressionSerde:withNativeAvailableInfo(expr)(andwithNativeAvailableInfo(expr, configKey)for non-allowIncompatibleflags) that emits a uniform message: "A native implementation of EXPR is available but needs to be enabled with CONFIG. See compatibility guide for more information."allowIncompatiblegate:CometInitCap,CometStringReplace,CometStringSplit,CometGetJsonObject,CometLengthOfJsonArray,CometDateFormat.CometRLike(scalar-literal pattern),CometRegExpReplace(offset==1),CometStructsToJson(nativeSupported(expr)),CometJsonToStructs(nativeSupported(expr)).CometCaseConversionBase(Upper/Lower) — usesCOMET_CASE_CONVERSION_ENABLED.keyinstead of allowIncompat.Known limitation for future work: the Spark 4.x
CometExprShimnode reconstruction copiesFALLBACK_REASONSbut notEXTENSION_INFOonto the wrappingInvoke. No current code path routeswithInfothrough those shims, so this is latent. It can be addressed if a future serde tags one of those reconstructed nodes.How are these changes tested?
New tests in
CometExpressionSuite:withInfodoes not set a fallback reason and renders as[COMET-INFO: ...]in verbose explain; a second message accumulates rather than overwriting.date_format JVM codegen path hints at faster native option— pins the full generic message text for the date_format wiring.string_split JVM codegen path hints at faster native option— same assertion against a different expression, so the test exercises the standardallowIncompatibleconfig path through the new helper, not just date_format's bespoke wiring.