Skip to content

Commit fc29bc3

Browse files
authored
docs: add Understanding Comet Plans user guide page (#4086)
1 parent 4b086bb commit fc29bc3

3 files changed

Lines changed: 235 additions & 11 deletions

File tree

docs/source/user-guide/latest/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ Comet $COMET_VERSION User Guide
3535
Supported Expressions <expressions>
3636
Configuration Settings <configs>
3737
Compatibility Guide <compatibility/index>
38+
Understanding Comet Plans <understanding-comet-plans>
3839
Tuning Guide <tuning>
3940
Metrics Guide <metrics>
4041
Iceberg Guide <iceberg>

docs/source/user-guide/latest/tuning.md

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -180,14 +180,5 @@ certain environments, such as single-node setups with fast NVMe drives, at the e
180180

181181
## Explain Plan
182182

183-
### Extended Explain
184-
185-
With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently this lists
186-
reasons why Comet may not have been enabled for specific operations.
187-
To enable this, in the Spark configuration, set the following:
188-
189-
```shell
190-
-c spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo
191-
```
192-
193-
This will add a section to the detailed plan displayed in the Spark SQL UI page.
183+
For an explanation of Comet plan output, the configs that control it, and how
184+
fallback to Spark works, see [Understanding Comet Plans](understanding-comet-plans.md).
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
<!---
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# Understanding Comet Plans
21+
22+
This guide explains how to read a Spark query plan once Comet is enabled, what
23+
happens when parts of a plan fall back to Spark, and which configs to use to
24+
inspect that behavior.
25+
26+
## Overview
27+
28+
When Comet is enabled, the `CometSparkSessionExtensions` rules walk the
29+
physical plan bottom-up and replace Spark operators with Comet equivalents
30+
where possible. Consecutive native operators are combined into a single block
31+
that is serialized as protobuf and executed by DataFusion on the executor.
32+
Operators that Comet does not support remain as their original Spark form.
33+
34+
As a result, a plan can mix three kinds of nodes:
35+
36+
- **`Comet*` nodes** that run natively in Rust (for example `CometProject`,
37+
`CometHashAggregate`).
38+
- **`Comet*` nodes that run on the JVM** but are still part of the Comet
39+
pipeline (for example `CometBroadcastExchange`, `CometColumnarExchange`).
40+
- **Standard Spark nodes** (for example `Project`, `HashAggregate`) where
41+
Comet either does not support the operator or has fallen back due to an
42+
unsupported expression, data type, or configuration.
43+
44+
Wherever data crosses between columnar and row-based execution, Comet inserts
45+
a transition node such as `CometColumnarToRow` or `CometSparkRowToColumnar`.
46+
47+
## Reading a Plan
48+
49+
You can print a plan with `df.explain("formatted")` or `EXPLAIN FORMATTED <sql>`, and
50+
the same plan is shown in the Spark SQL UI. When reading a plan, look for:
51+
52+
- **Node prefix.** `Comet*` nodes are accelerated by Comet. Anything without
53+
the prefix is unmodified Spark.
54+
- **Transitions.** `CometColumnarToRow`, `CometNativeColumnarToRow`, and
55+
`CometSparkRowToColumnar` mark boundaries between columnar Comet execution
56+
and row-based Spark execution. Frequent transitions usually indicate
57+
fallback inside the plan.
58+
- **Exchange type.** `CometExchange` is the native shuffle path,
59+
`CometColumnarExchange` is the JVM columnar shuffle path, and a plain
60+
`Exchange` means Spark shuffle. See [Shuffle Operators](#shuffle-operators)
61+
below.
62+
63+
## Fallback
64+
65+
A "fallback" happens when Comet cannot translate part of a plan into native
66+
execution. Fallback can be partial (a subtree falls back while the rest stays
67+
native) or full (no Comet nodes appear).
68+
69+
Common reasons:
70+
71+
- The Spark operator is not supported by Comet.
72+
- An expression inside an otherwise supported operator is not supported, or
73+
is marked incompatible and the per-expression opt-in
74+
`spark.comet.expression.<ExpressionName>.allowIncompatible=true` is not
75+
set. Operators have an equivalent
76+
`spark.comet.operator.<OperatorName>.allowIncompatible` opt-in.
77+
- A data type is not supported by the operator.
78+
- A configuration setting disables a specific operator or expression.
79+
80+
See [Supported Spark Operators](operators.md) and [Supported Expressions](expressions.md)
81+
for current coverage, and the [Compatibility Guide](compatibility/index.md) for
82+
incompatibility details.
83+
84+
## Configs for Inspecting Plans and Fallback
85+
86+
Comet provides four configs for understanding what is happening in a plan.
87+
They serve different purposes and produce output in different places.
88+
89+
| Config | Output destination | What you see |
90+
| ---------------------------------------- | ---------------------------------- | --------------------------------------------------------------------------------------------- |
91+
| `spark.comet.explainFallback.enabled` | Driver log (only when fallback) | A WARN with the list of reasons each query stage could not run natively. |
92+
| `spark.comet.logFallbackReasons.enabled` | Driver log | One WARN per fallback reason as it is encountered, without surrounding plan context. |
93+
| `spark.comet.explain.format` | Spark SQL UI (Spark 4.0 and newer) | Annotated plan or fallback-reason list, depending on `verbose` (default) or `fallback` value. |
94+
| `spark.comet.explain.native.enabled` | Executor logs, per task | The DataFusion native plan with metrics, useful for inspecting native execution. |
95+
96+
### `spark.comet.explainFallback.enabled`
97+
98+
Logs a single WARN listing the reasons each query stage could not be executed
99+
natively. Nothing is logged when the entire stage runs in Comet. Useful as a
100+
low-noise check that fallback is or is not happening.
101+
102+
### `spark.comet.logFallbackReasons.enabled`
103+
104+
Logs every fallback reason as it is encountered, one WARN per reason. Use this
105+
when you want to see all reasons, including ones that
106+
`spark.comet.explainFallback.enabled` may aggregate or omit. The output does
107+
not include the surrounding plan, so it is best for accumulating diagnostics
108+
across many queries.
109+
110+
### `spark.comet.explain.format`
111+
112+
This config is read by `org.apache.comet.ExtendedExplainInfo`, which Spark
113+
loads via the `spark.sql.extendedExplainProviders` mechanism added in Spark
114+
4.0. Add the provider:
115+
116+
```shell
117+
--conf spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo
118+
```
119+
120+
The Spark SQL UI then shows an additional section under the detailed plan.
121+
The format is controlled by `spark.comet.explain.format`:
122+
123+
- `verbose` (default): the full plan annotated with fallback reasons, plus a
124+
summary of how much of the plan is accelerated.
125+
- `fallback`: a list of fallback reasons only.
126+
127+
This is the most convenient option on Spark 4.0 because the output is shown
128+
inline in the UI. Earlier Spark versions do not have the
129+
`extendedExplainProviders` extension point, so this provider is not used and
130+
the config has no effect there.
131+
132+
### `spark.comet.explain.native.enabled`
133+
134+
When enabled, each executor task logs the DataFusion native plan it executes,
135+
along with metrics. This is verbose because there is one plan per task, but it
136+
is the only way to see the native plan as DataFusion sees it (including how
137+
operators were arranged after Comet's serialization). See the
138+
[Metrics Guide](metrics.md) for details on the native metrics that appear in
139+
this output.
140+
141+
## Comet Operator Reference
142+
143+
The following sections describe the Comet nodes you will see in plans, grouped
144+
by role. Names match what is shown in the plan output.
145+
146+
### Scans
147+
148+
| Node | Description |
149+
| ------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
150+
| `CometScan` | V1 Parquet scan driven by Spark's file-source path through Comet's Parquet reader. Decoding runs in native code; the resulting Arrow batches cross JNI into the native plan. The active scan implementation is shown in brackets, e.g. `CometScan [native_iceberg_compat]`. |
151+
| `CometBatchScan` | DataSource V2 scan, including Iceberg Parquet, that produces Arrow batches consumed by Comet. |
152+
| `CometNativeScan` | Fully native Parquet scan that runs entirely in DataFusion (no JVM Parquet reader involvement). |
153+
| `CometIcebergNativeScan` | Fully native Iceberg Parquet scan. |
154+
| `CometCsvNativeScan` | Fully native CSV scan (experimental). |
155+
156+
### Native Execution Operators
157+
158+
These run natively in DataFusion. When several appear consecutively in a plan,
159+
they execute as a single fused native block.
160+
161+
| Node | Spark equivalent |
162+
| ---------------------------- | ---------------------------------------------- |
163+
| `CometProject` | `ProjectExec` |
164+
| `CometFilter` | `FilterExec` |
165+
| `CometSort` | `SortExec` |
166+
| `CometLocalLimit` | `LocalLimitExec` |
167+
| `CometGlobalLimit` | `GlobalLimitExec` |
168+
| `CometExpand` | `ExpandExec` |
169+
| `CometExplode` | `GenerateExec` (for `explode` only) |
170+
| `CometHashAggregate` | `HashAggregateExec`, `ObjectHashAggregateExec` |
171+
| `CometHashJoin` | `ShuffledHashJoinExec` |
172+
| `CometBroadcastHashJoin` | `BroadcastHashJoinExec` |
173+
| `CometSortMergeJoin` | `SortMergeJoinExec` |
174+
| `CometWindow` | `WindowExec` |
175+
| `CometTakeOrderedAndProject` | `TakeOrderedAndProjectExec` |
176+
177+
### JVM-Side Operators
178+
179+
These keep their data on the JVM but participate in the Comet pipeline.
180+
181+
| Node | Notes |
182+
| ------------------------ | ------------------------------------------------------------------------------------- |
183+
| `CometUnion` | JVM-side union of Comet inputs. The native side reads each branch as a separate scan. |
184+
| `CometCoalesce` | JVM-side partition coalesce. |
185+
| `CometCollectLimit` | JVM-side collect limit, equivalent to `CollectLimitExec`. |
186+
| `CometBroadcastExchange` | Broadcast exchange producing serialized Arrow batches that the consumer can decode. |
187+
| `CometSubqueryBroadcast` | Companion to `CometBroadcastExchange` for dynamic partition pruning subqueries. |
188+
189+
### Shuffle Operators
190+
191+
Comet has two shuffle implementations and the plan tells you which one is in
192+
use:
193+
194+
- **`CometExchange`** is the **native shuffle** path. The child must already
195+
be a Comet operator producing columnar Arrow batches; the node calls
196+
`executeColumnar()` on its child and the partition, encode, and compress
197+
steps run in native code. Hash and range partitioning **keys** must be
198+
primitive types because native hashing and ordering do not support complex
199+
types, but the data columns themselves can include `StructType`,
200+
`ArrayType`, and `MapType` since batches are serialized via the Arrow IPC
201+
writer.
202+
- **`CometColumnarExchange`** is the **JVM columnar shuffle** path. It accepts
203+
either Spark row-based input or Comet columnar input, which makes it the
204+
fallback when the child is not a Comet operator or when a hash/range key
205+
type is not supported by native shuffle (for example, collated strings). It
206+
is still preferred over Spark's native shuffle when Comet shuffle is
207+
enabled.
208+
209+
Both paths support the same set of partitioning schemes
210+
(`HashPartitioning`, `RangePartitioning`, `RoundRobinPartitioning`,
211+
`SinglePartition`) and both can carry complex types in data columns.
212+
213+
The choice between the two is automatic. See the
214+
[Tuning Guide shuffle section](tuning.md#shuffle) for how to enable Comet
215+
shuffle and choose between the implementations.
216+
217+
### Columnar/Row Transitions
218+
219+
Comet inserts these nodes wherever data has to cross the columnar/row boundary.
220+
Multiple implementations exist because the optimal strategy depends on what
221+
produced the columnar data.
222+
223+
| Node | Direction | Notes |
224+
| ------------------------------ | ------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
225+
| `CometColumnarToRow` | columnar → row | JVM-based row conversion. A fork of Spark's `ColumnarToRowExec` that includes the SPARK-50235 fix. |
226+
| `CometNativeColumnarToRow` | columnar → row | Native row conversion that decodes broadcast Arrow batches via `NativeColumnarToRowConverter`. Used downstream of `CometBroadcastExchange`. Zero-copy for variable-length types and avoids an extra JVM materialization step. |
227+
| `CometSparkColumnarToColumnar` | columnar → columnar | Converts a Spark columnar input (a non-Comet `ColumnarBatch`) into Comet's Arrow batches. |
228+
| `CometSparkRowToColumnar` | row → columnar | Converts a Spark row input into Comet's Arrow batches. |
229+
230+
The two `CometSpark*` names come from a single `CometSparkToColumnarExec`
231+
operator that picks the node name based on whether its child supports
232+
columnar.

0 commit comments

Comments
 (0)