-
Notifications
You must be signed in to change notification settings - Fork 331
feat: informational message channel + generic native-available hint #4509
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
40fbd7d
8bf8443
f430efb
57dd421
9050599
e09cb96
4793a63
3c71ace
b202337
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -701,21 +701,37 @@ case class CometExecRule(session: SparkSession) | |
| } | ||
|
|
||
| val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id) | ||
| if (op.children.nonEmpty && op.children.forall(_.isInstanceOf[CometNativeExec])) { | ||
| val childOp = op.children.map(_.asInstanceOf[CometNativeExec].nativeOp) | ||
| childOp.foreach(builder.addChildren) | ||
| return serde | ||
| .convert(op, builder, childOp: _*) | ||
| .map(nativeOp => serde.createExec(nativeOp, op)) | ||
| } else { | ||
| return serde | ||
| .convert(op, builder) | ||
| .map(nativeOp => serde.createExec(nativeOp, op)) | ||
| val nativeChildren = op.children.collect { case child: CometNativeExec => child.nativeOp } | ||
| val nativeOpOpt = | ||
| if (op.children.nonEmpty && nativeChildren.size == op.children.size) { | ||
| nativeChildren.foreach(builder.addChildren) | ||
| serde.convert(op, builder, nativeChildren: _*) | ||
| } else { | ||
| serde.convert(op, builder) | ||
| } | ||
| return nativeOpOpt.map { nativeOp => | ||
| val exec = serde.createExec(nativeOp, op) | ||
| rollUpInfoMessages(op, exec) | ||
| exec | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have some feeling we can avoid checkcast for all and then cast all, with something like
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, I implemented something similar just now |
||
| } | ||
| } | ||
| None | ||
| } | ||
|
|
||
| /** | ||
| * Lift informational (non-fallback) messages tagged on an operator and its expressions onto the | ||
| * converted Comet plan node so they appear in verbose extended explain output. Expression-level | ||
| * hints would otherwise be invisible because explain only traverses plan nodes, not | ||
| * expressions. | ||
| */ | ||
| private def rollUpInfoMessages(op: SparkPlan, exec: SparkPlan): Unit = { | ||
| val fromOp = op.getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse(Set.empty[String]) | ||
| val fromExprs = op.expressions | ||
| .flatMap(_.collect { case e: Expression => e }) | ||
| .flatMap(_.getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse(Set.empty[String])) | ||
| (fromOp ++ fromExprs).foreach(msg => withInfo(exec, msg)) | ||
| } | ||
|
|
||
| private def isOperatorEnabled( | ||
| handler: CometOperatorSerde[SparkPlan], | ||
| op: SparkPlan): Boolean = { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,7 +9,7 @@ CometNativeColumnarToRow | |
| : +- CometExchange | ||
| : +- CometHashAggregate | ||
| : +- CometProject | ||
| : +- CometBroadcastHashJoin | ||
| : +- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably decouple informational messages (which may change) from the plan stability test which should really only check that there is no degradation in the generated plan. |
||
| : :- CometProject | ||
| : : +- CometBroadcastHashJoin | ||
| : : :- CometProject | ||
|
|
@@ -51,7 +51,7 @@ CometNativeColumnarToRow | |
| +- CometExchange | ||
| +- CometHashAggregate | ||
| +- CometProject | ||
| +- CometBroadcastHashJoin | ||
| +- CometBroadcastHashJoin [COMET-INFO: A native implementation of Upper is available but needs to be enabled with spark.comet.caseConversion.enabled. See compatibility guide for more information.] | ||
| :- CometProject | ||
| : +- CometBroadcastHashJoin | ||
| : :- CometProject | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| Project [COMET: ] | ||
| Project | ||
| : :- Subquery | ||
| : : +- CometNativeColumnarToRow | ||
| : : +- CometProject | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel it would be a simpler design to merely have a single
withInfo(node, tag, message)instead of adding a new set of methods for every new tag type. wdyt?