Skip to content

Commit a481171

Browse files
committed
fix parquet writing, spark 3.4 dpp fallback, #4010 changes, and more
1 parent 0ee4ec8 commit a481171

15 files changed

Lines changed: 834 additions & 113 deletions

spark/src/main/scala/org/apache/comet/planner/CometPlanner.scala

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
3030

3131
import org.apache.comet.CometConf
3232
import org.apache.comet.CometSparkSessionExtensions.isCometLoaded
33-
import org.apache.comet.planner.phases.{NormalizePrePass, Phase1LikelyComet, Phase2Decision, Phase3Emit, SubqueryBroadcastRewrite}
33+
import org.apache.comet.planner.phases.{NormalizePrePass, Phase1LikelyComet, Phase2Decision, Phase3Emit, Spark34DppFallbackPrePass, SubqueryBroadcastRewrite}
3434
import org.apache.comet.planner.tags.CometTags
3535
import org.apache.comet.rules.RewriteJoin
3636

@@ -93,15 +93,25 @@ case class CometPlanner(session: SparkSession) extends Rule[SparkPlan] with Logg
9393
case _ => false
9494
}))
9595

96-
val broadcastConsumers = BroadcastConsumerIndex.build(prepared, conf)
96+
// Phase 1 must run before BroadcastConsumerIndex.build: Phase 1's generic-exec prediction
97+
// reads children's LIKELY_COMET tags (post-order walk), and the index reads BHJ tags to
98+
// decide which broadcasts have a Comet consumer.
99+
val annotated1 = phase1LikelyComet(
100+
prepared,
101+
PlanningContext(
102+
session = session,
103+
conf = conf,
104+
broadcastConsumers = BroadcastConsumerIndex.Empty,
105+
hasInputFileExpressions = hasInputFileExpressions))
106+
107+
val broadcastConsumers = BroadcastConsumerIndex.build(annotated1, conf)
97108

98109
val context = PlanningContext(
99110
session = session,
100111
conf = conf,
101112
broadcastConsumers = broadcastConsumers,
102113
hasInputFileExpressions = hasInputFileExpressions)
103114

104-
val annotated1 = phase1LikelyComet(prepared, context)
105115
val annotated2 = phase2Decision(annotated1, context)
106116
val emitted = phase3Emit(annotated2, context)
107117
val reverted = revertOrphanedBroadcasts(emitted)
@@ -275,18 +285,18 @@ case class CometPlanner(session: SparkSession) extends Rule[SparkPlan] with Logg
275285
}
276286

277287
/**
278-
* Pre-pass: expression-level rewrites that change operator structure. Float NaN/zero
279-
* normalization around comparison operators (arrow-rs doesn't normalize) and SMJ-to-SHJ/BHJ
280-
* join rewriting. Both must run before classification because they change the set of node types
281-
* the later phases see.
288+
* Pre-pass: expression-level rewrites that change operator structure, plus the Spark 3.4 AQE
289+
* DPP fallback tagging. The latter must run before Phase 1 because Phase 2 reads its tags;
290+
* absorbs the legacy `CometSpark34AqeDppFallbackRule` queryStagePrep rule into the planner.
282291
*/
283292
private def prePass(plan: SparkPlan): SparkPlan = {
284293
val normalized = NormalizePrePass(plan)
285-
if (CometConf.COMET_REPLACE_SMJ.get()) {
294+
val rewritten = if (CometConf.COMET_REPLACE_SMJ.get()) {
286295
normalized.transformUp { case p => RewriteJoin.rewrite(p) }
287296
} else {
288297
normalized
289298
}
299+
Spark34DppFallbackPrePass(rewritten, conf)
290300
}
291301

292302
/**

spark/src/main/scala/org/apache/comet/planner/PlanningContext.scala

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExc
2727
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
2828
import org.apache.spark.sql.internal.SQLConf
2929

30-
import org.apache.comet.planner.phases.Phase1LikelyComet
31-
3230
/**
3331
* State threaded through CometPlanner's three phases. Holds session-scoped data (configs, the
3432
* active SparkSession), a few plan-wide flags computed once at the top of `apply` so per-node
@@ -65,15 +63,21 @@ object BroadcastConsumerIndex extends Logging {
6563
}
6664

6765
/**
68-
* Walks `plan` looking for `BroadcastHashJoinExec` nodes that would themselves be LIKELY_COMET
69-
* under the current configuration. For each such join, records every `BroadcastExchangeExec` it
70-
* references as a consumer. Handles the AQE wrappers (`BroadcastQueryStageExec`,
71-
* `ReusedExchangeExec`) that hide the raw broadcast between planning and execution.
66+
* Walks `plan` looking for `BroadcastHashJoinExec` nodes that would themselves be LIKELY_COMET.
67+
* For each such join, records every `BroadcastExchangeExec` it references as a consumer.
68+
* Handles the AQE wrappers (`BroadcastQueryStageExec`, `ReusedExchangeExec`) that hide the raw
69+
* broadcast between planning and execution.
70+
*
71+
* Reads the `LIKELY_COMET` tag directly rather than re-computing — Phase 1 must have already
72+
* tagged the plan (see ordering in `CometPlanner.apply`).
7273
*/
7374
def build(plan: SparkPlan, conf: SQLConf): BroadcastConsumerIndex = {
7475
val consumed = new java.util.IdentityHashMap[BroadcastExchangeExec, java.lang.Boolean]()
7576
plan.foreach {
76-
case bhj: BroadcastHashJoinExec if Phase1LikelyComet.isLikelyComet(bhj, conf) =>
77+
case bhj: BroadcastHashJoinExec
78+
if bhj
79+
.getTagValue(org.apache.comet.planner.tags.CometTags.LIKELY_COMET)
80+
.getOrElse(false) =>
7781
bhj.children.foreach(indexBroadcast(_, consumed))
7882
case _ =>
7983
}

spark/src/main/scala/org/apache/comet/planner/gates/V1ScanGate.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefa
2929
import org.apache.spark.sql.comet.CometScanExec
3030
import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SubqueryAdaptiveBroadcastExec}
3131
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
32-
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
3332
import org.apache.spark.sql.internal.SQLConf
3433

3534
import org.apache.comet.CometConf._
@@ -87,6 +86,18 @@ object V1ScanGate extends Logging {
8786
return reject(s"Unsupported file format ${r.fileFormat}")
8887
}
8988

89+
// Disabling the vectorized reader opts into parquet-mr's permissive behavior (silent
90+
// overflow / null-on-narrowing). Comet has no parquet-mr-equivalent backend, so default
91+
// to falling back to Spark; the opt-in config lets the user accept the loss of those
92+
// behaviors and use Comet anyway.
93+
if (!conf.parquetVectorizedReaderEnabled &&
94+
!COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.get()) {
95+
return reject(
96+
s"$SCAN_NATIVE_DATAFUSION scan is incompatible with " +
97+
s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key}=false; set " +
98+
s"${COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key}=true to opt in")
99+
}
100+
90101
val hadoopConf = r.sparkSession.sessionState.newHadoopConfWithOptions(r.options)
91102

92103
val possibleDefaultValues = getExistenceDefaultValues(scanExec.requiredSchema)
@@ -127,11 +138,6 @@ object V1ScanGate extends Logging {
127138
return reject("Native DataFusion scan does not support row index generation")
128139
}
129140

130-
if (session.sessionState.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED) &&
131-
ParquetUtils.hasFieldIds(scanExec.requiredSchema)) {
132-
return reject("Native DataFusion scan does not support Parquet field ID matching")
133-
}
134-
135141
val typeChecker = CometScanTypeChecker(SCAN_NATIVE_DATAFUSION)
136142
val schemaFallback = new ListBuffer[String]()
137143
val schemaSupported =

spark/src/main/scala/org/apache/comet/planner/phases/Phase1LikelyComet.scala

Lines changed: 58 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,31 @@ import org.apache.spark.sql.internal.SQLConf
3131

3232
import org.apache.comet.CometConf
3333
import org.apache.comet.CometSparkSessionExtensions.withInfo
34+
import org.apache.comet.planner.gates.S2CGate
3435
import org.apache.comet.planner.tags.CometTags
3536
import org.apache.comet.rules.CometExecRule
3637
import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, Unsupported}
3738

3839
/**
39-
* Phase 1: predict whether each node's serde would support it *in isolation*, ignoring child
40-
* gating. The resulting `LIKELY_COMET` tag is read by Phase 2 to make demand-aware decisions
41-
* (e.g. shuffle conversion pays off only if at least one side is likelyComet).
40+
* Phase 1: predict per node whether it will end up Comet in the final plan, expressed as a
41+
* `LIKELY_COMET` tag read by Phase 2's demand-aware decisions.
4242
*
43-
* "In isolation" means: config enabled, own structural / expression / type checks pass, but no
44-
* consideration of whether children have already been marked convertible. This breaks the
45-
* circularity that otherwise requires a retry pass (as in the old CometExecRule broadcast
46-
* handling).
43+
* Prediction inputs by node category:
44+
* - Scan leaves: serde + config gates.
45+
* - Shuffle / Broadcast / AQE stage wrappers / already-converted CometNativeExec: structural;
46+
* optimistically `true` for exchanges since their real convertibility depends on the children
47+
* they end up with, which Phase 3 re-checks at emit time.
48+
* - Generic exec operators (Project, Filter, HashAggregate, joins, ...): the operator's own
49+
* serde AND every child must be able to provide native input (child `LIKELY_COMET=true` or
50+
* S2C-eligible leaf). Mirrors the legacy `CometExecRule` generic case's implicit
51+
* `op.children.forall(_.isInstanceOf[CometNativeExec])` guard.
4752
*
48-
* The predicate currently wraps `CometOperatorSerde.getSupportLevel` and the `enabledConfig`
49-
* gate, with `Incompatible` treated per `COMET_EXPR_ALLOW_INCOMPATIBLE_OPERATORS` heuristics.
53+
* Anti-circularity property: no prediction depends on the operator's own DECISION nor on any
54+
* parent's prediction. That's why this phase is a single post-order walk with no fixed-point
55+
* iteration. The earlier "in isolation" framing (predict from serde alone, ignore children) was
56+
* not enough to make Phase 2's demand-aware rules correct: a shuffle between two Spark aggregates
57+
* needs `LIKELY_COMET=false` on both aggregates to decide Passthrough, and that requires
58+
* propagating cant-go-native upward from the failing descendant.
5059
*
5160
* This phase only mutates tag state on nodes. It does not change the plan tree shape.
5261
*/
@@ -55,20 +64,27 @@ object Phase1LikelyComet extends Logging {
5564
def apply(plan: SparkPlan, conf: SQLConf): SparkPlan = {
5665
var total = 0
5766
var likely = 0
58-
plan.foreach { node =>
67+
// Post-order walk: children's LIKELY_COMET tags are set before their parents are visited, so
68+
// `isLikelyComet` can read child tags when refining a parent's prediction (e.g. an exec
69+
// operator that needs columnar input from its children).
70+
def visit(node: SparkPlan): Unit = {
71+
node.children.foreach(visit)
5972
val verdict = isLikelyComet(node, conf)
6073
node.setTagValue(CometTags.LIKELY_COMET, verdict)
6174
total += 1
6275
if (verdict) likely += 1
6376
}
77+
visit(plan)
6478
logDebug(s"Phase1: total=$total likely=$likely")
6579
plan
6680
}
6781

6882
/**
69-
* Returns whether `node` would be LIKELY_COMET under the current configuration. Exposed so
70-
* other planner components (e.g. `BroadcastConsumerIndex`) can reason about a hypothetical
71-
* node's eligibility without walking the whole plan.
83+
* Returns whether `node` would be LIKELY_COMET under the current configuration. Generic exec
84+
* ops additionally require that each child either already has `LIKELY_COMET=true` or is an
85+
* S2C-eligible leaf, mirroring the legacy `CometExecRule` generic case which only converts when
86+
* `op.children.forall(_.isInstanceOf[CometNativeExec])`. External callers must ensure children
87+
* have been visited first (the planner's `apply` does this via post-order traversal).
7288
*/
7389
def isLikelyComet(node: SparkPlan, conf: SQLConf): Boolean = node match {
7490
// Never-convertible control plan nodes.
@@ -86,16 +102,23 @@ object Phase1LikelyComet extends Logging {
86102
case _: BatchScanExec =>
87103
CometConf.COMET_NATIVE_SCAN_ENABLED.get(conf)
88104

89-
case _: ShuffleExchangeExec =>
90-
// Optimistic: Phase 1 can't evaluate the shuffle serde's getSupportLevel accurately
91-
// because `shuffleSupported` checks `isCometPlan(child)` and children haven't been
92-
// converted yet. Emit-time guard in Phase 3 re-checks with the actual converted child.
93-
true
105+
case s: ShuffleExchangeExec =>
106+
// A shuffle's LIKELY_COMET reflects whether its data source can provide native input.
107+
// Optimistic-true broke upward propagation: a Spark partial HashAgg below a shuffle could
108+
// not flag the shuffle as non-likely, which made the final HashAgg above read a stale-true
109+
// child, which made Phase 2 see `parentLikely=true` on the shuffle and convert it. The
110+
// legacy rule lived with this and added `revertRedundantColumnarShuffle` (PR #4010) as a
111+
// post-pass; the planner avoids the conversion in the first place by predicting
112+
// accurately. Phase 3 still re-checks at emit time. S2C-eligible leaves count because
113+
// Phase 2 wraps them in `CometSparkToColumnarExec`.
114+
s.children.exists(c => childCanProvideNativeInput(c, conf))
94115

95-
case _: BroadcastExchangeExec =>
96-
// Same as shuffle. Broadcast's convertibility also depends on child type / state after
97-
// conversion. Phase 3 re-checks at emit time.
98-
true
116+
case b: BroadcastExchangeExec =>
117+
// Same shape as shuffle: a broadcast over Spark-only data cannot itself go native, so
118+
// Phase 1 must report that honestly for parent BHJs' children-OK checks to be correct.
119+
// Phase 2's BroadcastConsumerIndex still gates conversion on a downstream Comet BHJ
120+
// wanting this broadcast; this only changes whether the broadcast is a candidate at all.
121+
b.children.exists(c => childCanProvideNativeInput(c, conf))
99122

100123
// AQE stage re-entry: a prior CometPlanner pass converted an exchange, AQE materialized it
101124
// and wrapped it in a query stage. Phase 3 re-emits the stage itself as a Comet-compatible
@@ -113,8 +136,15 @@ object Phase1LikelyComet extends Logging {
113136
// Generic exec operators dispatched through the serde map.
114137
case op =>
115138
CometExecRule.allExecs.get(op.getClass) match {
116-
case Some(serde) =>
117-
predictFromSerde(op, serde.asInstanceOf[CometOperatorSerde[SparkPlan]], conf)
139+
case Some(rawSerde) =>
140+
val serde = rawSerde.asInstanceOf[CometOperatorSerde[SparkPlan]]
141+
val selfOk = predictFromSerde(op, serde, conf)
142+
// Use `serde.dataChildren` so operators with structural Spark wrappers
143+
// (e.g. DataWritingCommandExec wrapping WriteFilesExec) check the actual data
144+
// producers, not the wrapper. Mirrors the legacy CometExecRule generic case's
145+
// `op.children.forall(_.isInstanceOf[CometNativeExec])` guard, with the unwrap moved
146+
// into the serde.
147+
selfOk && serde.dataChildren(op).forall(c => childCanProvideNativeInput(c, conf))
118148
case None =>
119149
// Fall back: a leaf we don't recognize can't convert; a non-leaf we don't recognize
120150
// might still act as a passthrough in Phase 2 but is not itself LIKELY_COMET.
@@ -125,6 +155,10 @@ object Phase1LikelyComet extends Logging {
125155
}
126156
}
127157

158+
private def childCanProvideNativeInput(child: SparkPlan, conf: SQLConf): Boolean =
159+
child.getTagValue(CometTags.LIKELY_COMET).getOrElse(false) ||
160+
(child.isInstanceOf[LeafExecNode] && S2CGate.shouldApply(child, conf))
161+
128162
private def predictFromSerde(
129163
op: SparkPlan,
130164
serde: CometOperatorSerde[SparkPlan],

0 commit comments

Comments
 (0)