-
Notifications
You must be signed in to change notification settings - Fork 330
feat: Add plan conversion statistics to extended explain info #2412
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
79310d0
5157f83
4ef5293
8b6dcfc
8dca97d
45aa334
2dc8f70
22407fc
f85ffd5
3552888
8fc2108
6f99dd9
9cd439f
ad7ff51
ac57ee9
36c0439
bdc8300
62473ad
e483ffa
6a0500d
6436574
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,8 +23,10 @@ import scala.collection.mutable | |
|
|
||
| import org.apache.spark.sql.ExtendedExplainGenerator | ||
| import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag} | ||
| import org.apache.spark.sql.execution.{InputAdapter, SparkPlan, WholeStageCodegenExec} | ||
| import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} | ||
| import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometPlan, CometSparkToColumnarExec} | ||
| import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, RowToColumnarExec, SparkPlan, WholeStageCodegenExec} | ||
| import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec} | ||
| import org.apache.spark.sql.execution.exchange.ReusedExchangeExec | ||
|
|
||
| import org.apache.comet.CometExplainInfo.getActualPlan | ||
|
|
||
|
|
@@ -81,9 +83,14 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator { | |
| // generates the extended info in a verbose manner, printing each node along with the | ||
| // extended information in a tree display | ||
| def generateVerboseExtendedInfo(plan: SparkPlan): String = { | ||
| val planStats = new PlanStats() | ||
| val outString = new StringBuilder() | ||
| generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString) | ||
| outString.toString() | ||
| generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats) | ||
| val eligible = planStats.sparkOperators + planStats.cometOperators | ||
| val converted = | ||
| if (eligible == 0) 0.0 else planStats.cometOperators.toDouble / eligible * 100.0 | ||
| val summary = s"Comet accelerated ${converted.toInt}% of eligible operators ($planStats)." | ||
| s"${outString.toString()}\n$summary" | ||
| } | ||
|
|
||
| // Simplified generateTreeString from Spark TreeNode. Appends explain info to the node if any | ||
|
|
@@ -92,7 +99,22 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator { | |
| depth: Int, | ||
| lastChildren: Seq[Boolean], | ||
| indent: Int, | ||
| outString: StringBuilder): Unit = { | ||
| outString: StringBuilder, | ||
| planStats: PlanStats): Unit = { | ||
|
|
||
| node match { | ||
| case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec | | ||
| _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec => | ||
| planStats.wrappers += 1 | ||
| case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec | | ||
| _: CometSparkToColumnarExec => | ||
| planStats.transitions += 1 | ||
| case _: CometPlan => | ||
| planStats.cometOperators += 1 | ||
| case _ => | ||
| planStats.sparkOperators += 1 | ||
| } | ||
|
|
||
| outString.append(" " * indent) | ||
| if (depth > 0) { | ||
| lastChildren.init.foreach { isLast => | ||
|
|
@@ -119,15 +141,17 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator { | |
| depth + 2, | ||
| lastChildren :+ node.children.isEmpty :+ false, | ||
| indent, | ||
| outString) | ||
| outString, | ||
| planStats) | ||
| case _ => | ||
| } | ||
| generateTreeString( | ||
| getActualPlan(innerChildrenLocal.last), | ||
| depth + 2, | ||
| lastChildren :+ node.children.isEmpty :+ true, | ||
| indent, | ||
| outString) | ||
| outString, | ||
| planStats) | ||
| } | ||
| if (node.children.nonEmpty) { | ||
| node.children.init.foreach { | ||
|
|
@@ -137,18 +161,37 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator { | |
| depth + 1, | ||
| lastChildren :+ false, | ||
| indent, | ||
| outString) | ||
| outString, | ||
| planStats) | ||
| case _ => | ||
| } | ||
| node.children.last match { | ||
| case c @ (_: TreeNode[_]) => | ||
| generateTreeString(getActualPlan(c), depth + 1, lastChildren :+ true, indent, outString) | ||
| generateTreeString( | ||
| getActualPlan(c), | ||
| depth + 1, | ||
| lastChildren :+ true, | ||
| indent, | ||
| outString, | ||
| planStats) | ||
| case _ => | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| class PlanStats { | ||
|
andygrove marked this conversation as resolved.
Outdated
|
||
| var sparkOperators: Int = 0 | ||
| var cometOperators: Int = 0 | ||
| var wrappers: Int = 0 | ||
| var transitions: Int = 0 | ||
|
|
||
| override def toString: String = { | ||
| s"sparkOperators=$sparkOperators, cometOperators=$cometOperators, " + | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps we could use a more verbose string here so that the meaning of these stats is a little more obvious |
||
| s"transitions=$transitions, wrappers=$wrappers" | ||
| } | ||
| } | ||
|
|
||
| object CometExplainInfo { | ||
| val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo") | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -118,6 +118,11 @@ class CometExecSuite extends CometTestBase { | |
| val (_, cometPlan) = checkSparkAnswer(df) | ||
| val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan) | ||
| assert(infos.contains("Dynamic Partition Pruning is not supported")) | ||
|
|
||
| withSQLConf(CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true") { | ||
| val extendedExplain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan) | ||
| assert(extendedExplain.contains("Comet accelerated 33% of eligible operators")) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be this number fluctuating?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is currently stable across all Spark versions that we test with.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was wrong. There is a failure due to a different percentage. I will make the test less specific. |
||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated change, but we don't need
16gfor this simple example