Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -457,11 +457,21 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explain.enabled")
.doc(
"When this setting is enabled, Comet will log an explain plan as part of the query " +
"planning process for each query stage.")
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explain.verbose.enabled")
.doc(
"When this setting is enabled, Comet will provide a verbose tree representation of " +
"the extended information.")
"When this setting is enabled, Comet's extended explain output will provide the full " +
"query plan annotated with fallback reasons as well as a summary of how much of " +
"the plan was accelerated by Comet. When this setting is disabled, a list of fallback " +
"reasons will be provided instead.")
.booleanConf
.createWithDefault(false)

Expand Down Expand Up @@ -495,6 +505,7 @@ object CometConf extends ShimCometConf {
"When this setting is enabled, Comet will provide logging explaining the reason(s) " +
"why a query stage cannot be executed natively. Set this to false to " +
"reduce the amount of logging.")
.internal()
.booleanConf
.createWithDefault(false)

Expand Down
5 changes: 3 additions & 2 deletions docs/source/user-guide/latest/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ $SPARK_HOME/bin/spark-shell \
--conf spark.executor.extraClassPath=$COMET_JAR \
--conf spark.plugins=org.apache.spark.CometPlugin \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
--conf spark.comet.explainFallback.enabled=true \
--conf spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo \
--conf spark.comet.explain.enabled=true \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=16g
--conf spark.memory.offHeap.size=2g
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated change, but we don't need 16g for this simple example

```

### Verify Comet enabled for Spark SQL query
Expand Down
61 changes: 52 additions & 9 deletions spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import scala.collection.mutable

import org.apache.spark.sql.ExtendedExplainGenerator
import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
import org.apache.spark.sql.execution.{InputAdapter, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometPlan, CometSparkToColumnarExec}
import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, RowToColumnarExec, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

import org.apache.comet.CometExplainInfo.getActualPlan

Expand Down Expand Up @@ -81,9 +83,14 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
// generates the extended info in a verbose manner, printing each node along with the
// extended information in a tree display
def generateVerboseExtendedInfo(plan: SparkPlan): String = {
val planStats = new PlanStats()
val outString = new StringBuilder()
generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString)
outString.toString()
generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
val eligible = planStats.sparkOperators + planStats.cometOperators
val converted =
if (eligible == 0) 0.0 else planStats.cometOperators.toDouble / eligible * 100.0
val summary = s"Comet accelerated ${converted.toInt}% of eligible operators ($planStats)."
s"${outString.toString()}\n$summary"
}

// Simplified generateTreeString from Spark TreeNode. Appends explain info to the node if any
Expand All @@ -92,7 +99,22 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
depth: Int,
lastChildren: Seq[Boolean],
indent: Int,
outString: StringBuilder): Unit = {
outString: StringBuilder,
planStats: PlanStats): Unit = {

node match {
case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
_: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec =>
planStats.wrappers += 1
case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec |
_: CometSparkToColumnarExec =>
planStats.transitions += 1
case _: CometPlan =>
planStats.cometOperators += 1
case _ =>
planStats.sparkOperators += 1
}

outString.append(" " * indent)
if (depth > 0) {
lastChildren.init.foreach { isLast =>
Expand All @@ -119,15 +141,17 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
depth + 2,
lastChildren :+ node.children.isEmpty :+ false,
indent,
outString)
outString,
planStats)
case _ =>
}
generateTreeString(
getActualPlan(innerChildrenLocal.last),
depth + 2,
lastChildren :+ node.children.isEmpty :+ true,
indent,
outString)
outString,
planStats)
}
if (node.children.nonEmpty) {
node.children.init.foreach {
Expand All @@ -137,18 +161,37 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
depth + 1,
lastChildren :+ false,
indent,
outString)
outString,
planStats)
case _ =>
}
node.children.last match {
case c @ (_: TreeNode[_]) =>
generateTreeString(getActualPlan(c), depth + 1, lastChildren :+ true, indent, outString)
generateTreeString(
getActualPlan(c),
depth + 1,
lastChildren :+ true,
indent,
outString,
planStats)
case _ =>
}
}
}
}

class PlanStats {
Comment thread
andygrove marked this conversation as resolved.
Outdated
var sparkOperators: Int = 0
var cometOperators: Int = 0
var wrappers: Int = 0
var transitions: Int = 0

override def toString: String = {
s"sparkOperators=$sparkOperators, cometOperators=$cometOperators, " +
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could use a more verbose string here so that the meaning of these stats is a little more obvious

s"transitions=$transitions, wrappers=$wrappers"
}
}

object CometExplainInfo {
val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")

Expand Down
31 changes: 25 additions & 6 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -638,13 +638,32 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {

var newPlan = transform(normalizedPlan)

// if the plan cannot be run fully natively then explain why (when appropriate
// config is enabled)
if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
if (CometConf.COMET_EXPLAIN_ENABLED.get()) {
// COMET_EXPLAIN_ENABLED shows an explain plan regardless of
// whether there were any fallback reasons or not
val info = new ExtendedExplainInfo()
if (info.extensionInfo(newPlan).nonEmpty) {
logWarning(
"Comet cannot execute some parts of this plan natively " +
val fallbackReasons = info.extensionInfo(newPlan)
if (fallbackReasons.nonEmpty) {
logInfo(
"Comet cannot accelerate some parts of this plan " +
s"(set ${CometConf.COMET_EXPLAIN_ENABLED.key}=false " +
"to disable this logging):\n" +
s"${info.generateVerboseExtendedInfo(newPlan)}")
} else if (CometConf.COMET_EXPLAIN_ENABLED.get()) {
logInfo(
"Comet fully accelerated this plan " +
s"(set ${CometConf.COMET_EXPLAIN_ENABLED.key}=false " +
"to disable this logging):\n" +
s"${info.generateVerboseExtendedInfo(newPlan)}")
}
} else if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
// COMET_EXPLAIN_FALLBACK_ENABLED only shows an explain plan if
// there were fallback reasons
val info = new ExtendedExplainInfo()
val fallbackReasons = info.extensionInfo(newPlan)
if (fallbackReasons.nonEmpty) {
logInfo(
"Comet cannot accelerate some parts of this plan " +
s"(set ${CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key}=false " +
"to disable this logging):\n" +
s"${info.generateVerboseExtendedInfo(newPlan)}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class CometExecSuite extends CometTestBase {
val (_, cometPlan) = checkSparkAnswer(df)
val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
assert(infos.contains("Dynamic Partition Pruning is not supported"))

withSQLConf(CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true") {
val extendedExplain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
assert(extendedExplain.contains("Comet accelerated 33% of eligible operators"))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be this number fluctuating?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is currently stable across all Spark versions that we test with.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wrong. There is a failure due to a different percentage. I will make the test less specific.

- DPP fallback *** FAILED *** (1 second, 553 milliseconds)
  "BroadcastHashJoin
  :- ColumnarToRow
  :  +-  Scan parquet  [COMET: Dynamic Partition Pruning is not supported]
  :        +- SubqueryBroadcast
  :           +- BroadcastExchange
  :              +- CometColumnarToRow
  :                 +- CometFilter
  :                    +- CometScan [native_iceberg_compat] parquet 
  +- BroadcastExchange
     +- CometColumnarToRow
        +- CometFilter
           +- CometScan [native_iceberg_compat] parquet 
  
  Comet accelerated 4 out of 9 eligible operators (44%). Final plan contains 3 transitions between Spark and Comet." did not contain "Comet accelerated 33% of eligible operators" (CometExecSuite.scala:124)

}
}
}
}
Expand Down
Loading