Skip to content

Commit 77cd1d9

Browse files
committed
Merge remote-tracking branch 'apache/main' into feat/map-sort-spark4
# Conflicts: # spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala
2 parents 9ad865a + 023d912 commit 77cd1d9

98 files changed

Lines changed: 8986 additions & 8960 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,20 @@ object CometConf extends ShimCometConf {
427427
"The maximum number of columns to hash for round robin partitioning must be non-negative.")
428428
.createWithDefault(0)
429429

430+
val COMET_EXEC_SHUFFLE_REVERT_REDUNDANT_COLUMNAR_ENABLED: ConfigEntry[Boolean] =
431+
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.revertRedundantColumnar.enabled")
432+
.category(CATEGORY_SHUFFLE)
433+
.doc(
434+
"When enabled, Comet reverts a `CometShuffleExchangeExec` with `CometColumnarShuffle` " +
435+
"back to Spark's `ShuffleExchangeExec` when both its parent and child are non-Comet " +
436+
"hash aggregate operators. This avoids a redundant " +
437+
"row -> Arrow -> shuffle -> Arrow -> row conversion when no Comet operator on either " +
438+
"side can consume columnar output. Disable to keep Comet columnar shuffle even in " +
439+
"that case, which preserves Comet's off-heap shuffle memory accounting at the cost of " +
440+
"the extra conversion.")
441+
.booleanConf
442+
.createWithDefault(true)
443+
430444
val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
431445
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
432446
.category(CATEGORY_SHUFFLE)

docs/source/user-guide/latest/tuning.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,24 @@ partitioning keys. Columns that are not partitioning keys may contain complex ty
154154
Comet Columnar shuffle is JVM-based and supports `HashPartitioning`, `RoundRobinPartitioning`, `RangePartitioning`, and
155155
`SinglePartitioning`. This shuffle implementation supports complex data types as partitioning keys.
156156

157+
#### Automatic Revert to Spark Shuffle
158+
159+
When a Comet columnar shuffle ends up between two non-Comet hash aggregate operators (a partial/final aggregate
160+
pair that Comet could not convert), Comet reverts it to Spark's built-in shuffle. Keeping columnar shuffle between
161+
two row-based operators would add `row -> Arrow -> shuffle -> Arrow -> row` conversions with no Comet consumer on
162+
either side to benefit from columnar output.
163+
164+
This shifts the affected shuffles from Comet's off-heap memory pool back to the JVM execution memory pool. Clusters
165+
tuned for a small JVM heap may see `ExternalSorter` spills on queries where this revert fires. Shuffle I/O may also
166+
grow marginally because Spark's row-based serializer generally compresses less well than Comet's Arrow IPC format.
167+
168+
Each revert is logged at `INFO` level on the driver as `Reverting Comet columnar shuffle to Spark shuffle between
169+
<parent> and <child>`, which lets you correlate any unexpected behavior with this optimization.
170+
171+
This optimization is enabled by default and can be disabled by setting
172+
`spark.comet.exec.shuffle.revertRedundantColumnar.enabled=false`, in which case Comet will keep the columnar shuffle
173+
even when both its parent and child are non-Comet hash aggregate operators.
174+
157175
### Shuffle Compression
158176

159177
By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression.

native/spark-expr/src/array_funcs/array_compact.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ fn compact_list<OffsetSize: OffsetSizeTrait>(
132132
);
133133
let mut valid = NullBufferBuilder::new(list_array.len());
134134

135+
// Use logical_nulls() instead of is_null() to correctly handle NullArray.
136+
// NullArray::nulls() returns None (which makes is_null() return false),
137+
// but logical_nulls() correctly reports all elements as null.
138+
let value_nulls = values.logical_nulls();
139+
135140
for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
136141
if list_array.is_null(row_index) {
137142
offsets.push(offsets[row_index]);
@@ -144,7 +149,8 @@ fn compact_list<OffsetSize: OffsetSizeTrait>(
144149
let mut copied = 0usize;
145150

146151
for i in start..end {
147-
if !values.is_null(i) {
152+
let is_null = value_nulls.as_ref().map(|n| n.is_null(i)).unwrap_or(false);
153+
if !is_null {
148154
mutable.extend(0, i, i + 1);
149155
copied += 1;
150156
}

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 88 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,13 @@ object CometExecRule {
8989

9090
val allExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] = nativeExecs ++ sinks
9191

92+
/**
93+
* Tag set on a `ShuffleExchangeExec` that should be left as a plain Spark shuffle rather than
94+
* wrapped in `CometShuffleExchangeExec`. See `tagRedundantColumnarShuffle`.
95+
*/
96+
val SKIP_COMET_SHUFFLE_TAG: org.apache.spark.sql.catalyst.trees.TreeNodeTag[Unit] =
97+
org.apache.spark.sql.catalyst.trees.TreeNodeTag[Unit]("comet.skipCometShuffle")
98+
9299
}
93100

94101
/**
@@ -100,19 +107,78 @@ case class CometExecRule(session: SparkSession)
100107

101108
private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()
102109

110+
/**
111+
* Revert any `CometShuffleExchangeExec` with `CometColumnarShuffle` whose parent and child are
112+
* both non-Comet `HashAggregateExec` / `ObjectHashAggregateExec` operators back to the original
113+
* Spark `ShuffleExchangeExec`. This is the partial-final-aggregate pattern where Comet couldn't
114+
* convert either aggregate; keeping a columnar shuffle between them only adds
115+
* row->arrow->shuffle->arrow->row conversion overhead with no Comet consumer on either side.
116+
* See https://github.com/apache/datafusion-comet/issues/4004.
117+
*
118+
* The match is intentionally narrow (both sides must be row-based aggregates that remained JVM
119+
* after the main transform pass). Running the revert post-transform means we only fire when the
120+
* main conversion already decided to keep both aggregates JVM - we never create the dangerous
121+
* mixed mode where a Comet partial feeds a JVM final (see issue #1389).
122+
*
123+
* Effectiveness depends on running as part of `preColumnarTransitions`: if the revert ran after
124+
* Spark inserted `ColumnarToRowExec` between the aggregate and the columnar shuffle, the
125+
* pattern would no longer match (the shuffle would be separated from the aggregate by the
126+
* transition) and the unnecessary conversion could not be eliminated.
127+
*
128+
* The reverted shuffle is tagged with `SKIP_COMET_SHUFFLE_TAG` so both the AQE
129+
* `QueryStagePrepRule` pass and the `ColumnarRule` `preColumnarTransitions` pass leave it alone
130+
* on re-entry - AQE in particular re-runs the rule on each stage in isolation, where the outer
131+
* aggregate context is no longer visible and the shuffle would otherwise be re-wrapped as a
132+
* Comet columnar shuffle.
133+
*/
134+
private def revertRedundantColumnarShuffle(plan: SparkPlan): SparkPlan = {
135+
def isAggregate(p: SparkPlan): Boolean =
136+
p.isInstanceOf[HashAggregateExec] || p.isInstanceOf[ObjectHashAggregateExec]
137+
138+
def isRedundantShuffle(child: SparkPlan): Boolean = child match {
139+
case s: CometShuffleExchangeExec =>
140+
s.shuffleType == CometColumnarShuffle && isAggregate(s.child)
141+
case _ => false
142+
}
143+
144+
plan.transform {
145+
case op if isAggregate(op) && op.children.exists(isRedundantShuffle) =>
146+
val newChildren = op.children.map {
147+
case s: CometShuffleExchangeExec
148+
if s.shuffleType == CometColumnarShuffle && isAggregate(s.child) =>
149+
val reverted =
150+
s.originalPlan.withNewChildren(Seq(s.child)).asInstanceOf[ShuffleExchangeExec]
151+
reverted.setTagValue(CometExecRule.SKIP_COMET_SHUFFLE_TAG, ())
152+
logInfo(
153+
"Reverting Comet columnar shuffle to Spark shuffle between " +
154+
s"${op.getClass.getSimpleName} and ${s.child.getClass.getSimpleName} " +
155+
"(no Comet operator on either side to consume columnar output)")
156+
reverted
157+
case other => other
158+
}
159+
op.withNewChildren(newChildren)
160+
}
161+
}
162+
163+
private def shouldSkipCometShuffle(s: ShuffleExchangeExec): Boolean =
164+
s.getTagValue(CometExecRule.SKIP_COMET_SHUFFLE_TAG).isDefined
165+
103166
private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
104-
plan.transformUp { case s: ShuffleExchangeExec =>
105-
CometShuffleExchangeExec.shuffleSupported(s) match {
106-
case Some(CometNativeShuffle) =>
107-
// Switch to use Decimal128 regardless of precision, since Arrow native execution
108-
// doesn't support Decimal32 and Decimal64 yet.
109-
conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
110-
CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
111-
case Some(CometColumnarShuffle) =>
112-
CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
113-
case None =>
114-
s
115-
}
167+
plan.transformUp {
168+
case s: ShuffleExchangeExec if shouldSkipCometShuffle(s) =>
169+
s
170+
case s: ShuffleExchangeExec =>
171+
CometShuffleExchangeExec.shuffleSupported(s) match {
172+
case Some(CometNativeShuffle) =>
173+
// Switch to use Decimal128 regardless of precision, since Arrow native execution
174+
// doesn't support Decimal32 and Decimal64 yet.
175+
conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
176+
CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
177+
case Some(CometColumnarShuffle) =>
178+
CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
179+
case None =>
180+
s
181+
}
116182
}
117183
}
118184

@@ -261,6 +327,9 @@ case class CometExecRule(session: SparkSession)
261327
case s @ ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) =>
262328
convertToComet(s, CometExchangeSink).getOrElse(s)
263329

330+
case s: ShuffleExchangeExec if shouldSkipCometShuffle(s) =>
331+
s
332+
264333
case s: ShuffleExchangeExec =>
265334
convertToComet(s, CometShuffleExchangeExec).getOrElse(s)
266335

@@ -464,6 +533,13 @@ case class CometExecRule(session: SparkSession)
464533
case CometScanWrapper(_, s) => s
465534
}
466535

536+
// Revert CometColumnarShuffle to Spark's ShuffleExchangeExec when both its parent and child
537+
// are non-Comet HashAggregate/ObjectHashAggregate operators that remained JVM after the main
538+
// transform pass. See https://github.com/apache/datafusion-comet/issues/4004.
539+
if (CometConf.COMET_EXEC_SHUFFLE_REVERT_REDUNDANT_COLUMNAR_ENABLED.get()) {
540+
newPlan = revertRedundantColumnarShuffle(newPlan)
541+
}
542+
467543
// Set up logical links
468544
newPlan = newPlan.transform {
469545
case op: CometExec =>

spark/src/main/scala/org/apache/comet/serde/arrays.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] {
295295
}
296296

297297
object CometArrayCompact extends CometExpressionSerde[Expression] {
298+
298299
override def convert(
299300
expr: Expression,
300301
inputs: Seq[Attribute],

spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator
2424
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
2525
import org.apache.spark.sql.internal.SQLConf
2626
import org.apache.spark.sql.internal.types.StringTypeWithCollation
27-
import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, MapType, StringType}
27+
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes, MapType, StringType}
2828

2929
import org.apache.comet.CometSparkSessionExtensions.withInfo
3030
import org.apache.comet.expressions.{CometCast, CometEvalMode}
@@ -56,6 +56,28 @@ trait CometExprShim extends CommonStringExprs {
5656
inputs: Seq[Attribute],
5757
binding: Boolean): Option[Expr] = {
5858
expr match {
59+
case knc: KnownNotContainsNull =>
60+
// On Spark 4.0, array_compact rewrites to KnownNotContainsNull(ArrayFilter(IsNotNull)).
61+
// Strip the wrapper and serialize the inner ArrayFilter as spark_array_compact.
62+
knc.child match {
63+
case filter: ArrayFilter =>
64+
filter.function.children.headOption match {
65+
case Some(_: IsNotNull) =>
66+
val arrayChild = filter.left
67+
val elementType = arrayChild.dataType.asInstanceOf[ArrayType].elementType
68+
val arrayExprProto = exprToProtoInternal(arrayChild, inputs, binding)
69+
val returnType = ArrayType(elementType)
70+
val scalarExpr = scalarFunctionExprToProtoWithReturnType(
71+
"spark_array_compact",
72+
returnType,
73+
false,
74+
arrayExprProto)
75+
optExprWithInfo(scalarExpr, knc, arrayChild)
76+
case _ => exprToProtoInternal(knc.child, inputs, binding)
77+
}
78+
case _ => exprToProtoInternal(knc.child, inputs, binding)
79+
}
80+
5981
case s: StaticInvoke
6082
if s.staticObject == classOf[StringDecode] &&
6183
s.dataType.isInstanceOf[StringType] &&
@@ -109,12 +131,6 @@ trait CometExprShim extends CommonStringExprs {
109131
val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*)
110132
optExprWithInfo(optExpr, wb, wb.children: _*)
111133

112-
// KnownNotContainsNull is a TaggingExpression added in Spark 4.0 that only
113-
// changes schema metadata (containsNull = false). It has no runtime effect,
114-
// so we pass through to the child expression.
115-
case k: KnownNotContainsNull =>
116-
exprToProtoInternal(k.child, inputs, binding)
117-
118134
// In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is
119135
// Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the
120136
// original StructsToJson and recurse so support-level checks apply.

spark/src/test/resources/sql-tests/expressions/array/array_compact.sql

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,48 @@
1717

1818

1919
statement
20-
CREATE TABLE test_array_compact(arr array<int>) USING parquet
20+
CREATE TABLE test_array_compact(
21+
ints array<int>,
22+
strs array<string>,
23+
dbls array<double>,
24+
nested array<array<int>>
25+
) USING parquet
2126

2227
statement
23-
INSERT INTO test_array_compact VALUES (array(1, NULL, 2, NULL, 3)), (array()), (NULL), (array(NULL, NULL)), (array(1, 2, 3))
28+
INSERT INTO test_array_compact VALUES
29+
(array(1, NULL, 2, NULL, 3), array('a', NULL, 'b', NULL, 'c'), array(1.0, NULL, 2.0), array(array(1, NULL, 3), NULL, array(4, NULL, 6))),
30+
(array(), array(), array(), array()),
31+
(NULL, NULL, NULL, NULL),
32+
(array(NULL, NULL), array(NULL, NULL), array(NULL, NULL), array(NULL, NULL)),
33+
(array(1, 2, 3), array('x', 'y', 'z'), array(1.5, 2.5), array(array(1, 2), array(3, 4)))
2434

25-
-- column argument
35+
-- integer column
2636
query
27-
SELECT array_compact(arr) FROM test_array_compact
37+
SELECT array_compact(ints) FROM test_array_compact
38+
39+
-- string column
40+
query
41+
SELECT array_compact(strs) FROM test_array_compact
42+
43+
-- double column
44+
query
45+
SELECT array_compact(dbls) FROM test_array_compact
46+
47+
-- nested array column: outer nulls removed, inner nulls preserved
48+
query
49+
SELECT array_compact(nested) FROM test_array_compact
2850

2951
-- literal arguments
3052
query
3153
SELECT array_compact(array(1, NULL, 2, NULL, 3))
3254

33-
-- string element type
34-
statement
35-
CREATE TABLE test_array_compact_str(arr array<string>) USING parquet
36-
37-
statement
38-
INSERT INTO test_array_compact_str VALUES (array('a', NULL, 'b', NULL, 'c')), (array()), (NULL), (array(NULL, NULL)), (array('', NULL, '', NULL))
55+
-- literal string array
56+
query
57+
SELECT array_compact(array('a', NULL, 'b'))
3958

59+
-- all-null literal array
4060
query
41-
SELECT array_compact(arr) FROM test_array_compact_str
61+
SELECT array_compact(array(NULL, NULL, NULL))
4262

4363
-- double element type
4464
query

0 commit comments

Comments
 (0)