This document describes Comet's native shuffle implementation (CometNativeShuffle), which performs
shuffle operations entirely in Rust code for maximum performance. For the JVM-based alternative,
see JVM Shuffle.
Native shuffle takes columnar input directly from Comet native operators and performs partitioning, encoding, and writing in native Rust code. This avoids the columnar-to-row-to-columnar conversion overhead that JVM shuffle incurs.
Comet Native (columnar) → Native Shuffle → Arrow IPC → columnar
Compare this to JVM shuffle's data path:
Comet Native (columnar) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → columnar
Native shuffle (CometExchange) is selected when all of the following conditions are met:
- Shuffle mode allows native: `spark.comet.exec.shuffle.mode` is `native` or `auto`.
- Child plan is a Comet native operator: The child must be a `CometPlan` that produces columnar output. Row-based Spark operators require JVM shuffle.
- Supported partitioning type: Native shuffle supports:
  - `HashPartitioning`
  - `RangePartitioning`
  - `SinglePartition`

  `RoundRobinPartitioning` requires JVM shuffle.
- Supported partition key types: For `HashPartitioning`, both primitive and complex types (struct, array, map) are supported as partition keys. For `RangePartitioning`, only primitive types are supported as partition keys.
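The selection conditions above can be read as a single predicate. The following is a minimal sketch in Rust, not the planner's actual code: `ShuffleCandidate`, `native_shuffle_eligible`, and both enums are hypothetical names introduced only for illustration, and the real check lives on the JVM side.

```rust
/// Values of `spark.comet.exec.shuffle.mode` named in this document.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ShuffleMode {
    Native,
    Jvm,
    Auto,
}

/// Partitioning schemes mentioned in this document.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Partitioning {
    Hash,
    Range,
    Single,
    RoundRobin,
}

/// Hypothetical summary of the facts the selection rules depend on.
#[derive(Clone, Copy, Debug)]
pub struct ShuffleCandidate {
    pub mode: ShuffleMode,
    /// True when the child plan is a Comet native (columnar) operator.
    pub child_is_comet_native: bool,
    pub partitioning: Partitioning,
    /// True when any partition key is a struct, array, or map.
    pub has_complex_keys: bool,
}

/// Returns true only when every condition listed above holds.
pub fn native_shuffle_eligible(c: &ShuffleCandidate) -> bool {
    let mode_ok = matches!(c.mode, ShuffleMode::Native | ShuffleMode::Auto);
    let partitioning_ok = match c.partitioning {
        // Hash keys may be primitive or complex; a single partition has no keys.
        Partitioning::Hash | Partitioning::Single => true,
        // Range partitioning accepts primitive keys only.
        Partitioning::Range => !c.has_complex_keys,
        // Round-robin requires JVM shuffle.
        Partitioning::RoundRobin => false,
    };
    mode_ok && c.child_is_comet_native && partitioning_ok
}
```

For instance, a hash shuffle on a struct key under a native child is eligible in this model, while the same key under range partitioning is not.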
┌─────────────────────────────────────────────────────────────────────────────┐
│                             CometShuffleManager                             │
│  - Routes to CometNativeShuffleWriter for CometNativeShuffleHandle          │
└─────────────────────────────────────────────────────────────────────────────┘
                                       │
                                       ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                          CometNativeShuffleWriter                           │
│  - Constructs protobuf operator plan                                        │
│  - Invokes native execution via CometExec.getCometIterator()                │
└─────────────────────────────────────────────────────────────────────────────┘
                                       │
                                       ▼ (JNI)
┌─────────────────────────────────────────────────────────────────────────────┐
│                          ShuffleWriterExec (Rust)                           │
│  - DataFusion ExecutionPlan                                                 │
│  - Orchestrates partitioning and writing                                    │
└─────────────────────────────────────────────────────────────────────────────┘
                  │                                        │
                  ▼                                        ▼
┌────────────────────────────────────┐   ┌────────────────────────────────────┐
│ MultiPartitionShuffleRepartitioner │   │ SinglePartitionShufflePartitioner  │
│     (hash/range partitioning)      │   │      (single partition case)       │
└────────────────────────────────────┘   └────────────────────────────────────┘
                  │
                  ▼
┌────────────────────────────────────┐
│         ShuffleBlockWriter         │
│     (Arrow IPC + compression)      │
└────────────────────────────────────┘
                  │
                  ▼
         ┌─────────────────┐
         │  Data + Index   │
         │      Files      │
         └─────────────────┘
| Class | Location | Description |
|---|---|---|
| `CometShuffleExchangeExec` | `.../shuffle/CometShuffleExchangeExec.scala` | Physical plan node. Validates types and partitioning, creates `CometShuffleDependency`. |
| `CometNativeShuffleWriter` | `.../shuffle/CometNativeShuffleWriter.scala` | Implements `ShuffleWriter`. Builds protobuf plan and invokes native execution. |
| `CometShuffleDependency` | `.../shuffle/CometShuffleDependency.scala` | Extends `ShuffleDependency`. Holds shuffle type, schema, and range partition bounds. |
| `CometBlockStoreShuffleReader` | `.../shuffle/CometBlockStoreShuffleReader.scala` | Reads shuffle blocks via `ShuffleBlockFetcherIterator`. Decodes Arrow IPC to `ColumnarBatch`. |
| `NativeBatchDecoderIterator` | `.../shuffle/NativeBatchDecoderIterator.scala` | Reads compressed Arrow IPC from input stream. Calls native decode via JNI. |
| File | Location | Description |
|---|---|---|
| `shuffle_writer.rs` | `native/core/src/execution/shuffle/` | `ShuffleWriterExec` plan and partitioners. Main shuffle logic. |
| `codec.rs` | `native/core/src/execution/shuffle/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. |
| `comet_partitioning.rs` | `native/core/src/execution/shuffle/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single). |
- Plan construction: `CometNativeShuffleWriter` builds a protobuf operator plan containing:
  - A scan operator reading from the input iterator
  - A `ShuffleWriter` operator with partitioning config and compression codec
- Native execution: `CometExec.getCometIterator()` executes the plan in Rust.
- Partitioning: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner:
  - `MultiPartitionShuffleRepartitioner`: For hash/range partitioning
  - `SinglePartitionShufflePartitioner`: For single partition (simpler path)
- Buffering and spilling: The partitioner buffers rows per partition. When memory pressure exceeds the threshold, partitions spill to temporary files.
- Encoding: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC:
  - Writes compression type header
  - Writes field count header
  - Writes compressed IPC stream
- Output files: Two files are produced:
  - Data file: Concatenated partition data
  - Index file: Array of 8-byte little-endian offsets marking partition boundaries
- Commit: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition lengths and commits via Spark's `IndexShuffleBlockResolver`.
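To make the relationship between the index file and partition lengths concrete, here is a minimal sketch in Rust. It assumes the index holds N + 1 cumulative offsets for N partitions, starting at 0; that layout, and the function names `encode_index` and `decode_partition_lengths`, are illustrative assumptions rather than the actual implementation.

```rust
/// Encode cumulative partition offsets as 8-byte little-endian integers.
/// Assumption: N partitions produce N + 1 offsets, the first being 0.
pub fn encode_index(partition_lengths: &[u64]) -> Vec<u8> {
    let mut out = Vec::with_capacity((partition_lengths.len() + 1) * 8);
    let mut offset: u64 = 0;
    out.extend_from_slice(&offset.to_le_bytes());
    for len in partition_lengths {
        offset += len;
        out.extend_from_slice(&offset.to_le_bytes());
    }
    out
}

/// Recover per-partition lengths as differences of adjacent offsets.
/// Returns None if the input is not a whole number of 8-byte offsets
/// or if any offset is smaller than the one before it.
pub fn decode_partition_lengths(index: &[u8]) -> Option<Vec<u64>> {
    if index.len() % 8 != 0 {
        return None;
    }
    let offsets: Vec<u64> = index
        .chunks_exact(8)
        .map(|chunk| {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(chunk);
            u64::from_le_bytes(buf)
        })
        .collect();
    offsets
        .windows(2)
        .map(|pair| pair[1].checked_sub(pair[0]))
        .collect()
}
```

With partition lengths 3, 0, and 5, the encoded offsets are 0, 3, 3, and 8, and decoding returns the original lengths. An empty partition shows up as two equal adjacent offsets.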
- `CometBlockStoreShuffleReader` fetches shuffle blocks via `ShuffleBlockFetcherIterator`.
- For each block, `NativeBatchDecoderIterator`:
  - Reads the 8-byte compressed length header
  - Reads the 8-byte field count header
  - Reads the compressed IPC data
  - Calls `Native.decodeShuffleBlock()` via JNI
- Native code decompresses and deserializes the Arrow IPC stream.
- Arrow FFI transfers the `RecordBatch` to JVM as a `ColumnarBatch`.
Native shuffle implements Spark-compatible hash partitioning:
- Uses Murmur3 hash function with seed 42 (matching Spark)
- Computes hash of partition key columns
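- Applies a non-negative modulo by partition count, as Spark's `pmod` does (a plain remainder would be negative for a negative hash): `partition_id = pmod(hash, num_partitions)`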
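The hash partitioning steps can be sketched for a single 32-bit integer key as follows. This follows the public Murmur3 x86_32 mixing steps with seed 42 and a non-negative modulo. It is an illustration under those assumptions, not the code in `shuffle_writer.rs`: the function names are hypothetical, and other key types and multi-column keys are not covered.

```rust
/// Murmur3 x86_32 hash of one 32-bit value (a single 4-byte block).
pub fn murmur3_hash_i32(value: i32, seed: u32) -> i32 {
    const C1: u32 = 0xcc9e_2d51;
    const C2: u32 = 0x1b87_3593;

    // Mix the 4-byte block.
    let mut k1 = (value as u32).wrapping_mul(C1);
    k1 = k1.rotate_left(15);
    k1 = k1.wrapping_mul(C2);

    // Fold the block into the running hash state.
    let mut h1 = seed ^ k1;
    h1 = h1.rotate_left(13);
    h1 = h1.wrapping_mul(5).wrapping_add(0xe654_6b64);

    // Finalization: mix in the input length (4 bytes), then avalanche.
    h1 ^= 4;
    h1 ^= h1 >> 16;
    h1 = h1.wrapping_mul(0x85eb_ca6b);
    h1 ^= h1 >> 13;
    h1 = h1.wrapping_mul(0xc2b2_ae35);
    h1 ^= h1 >> 16;
    h1 as i32
}

/// Non-negative modulo: the result is always in 0..n for n > 0.
pub fn pmod(hash: i32, n: i32) -> i32 {
    assert!(n > 0, "partition count must be positive");
    let r = hash % n;
    if r < 0 { r + n } else { r }
}

/// Partition id for a single i32 key, using seed 42 as described above.
pub fn partition_id(key: i32, num_partitions: i32) -> i32 {
    pmod(murmur3_hash_i32(key, 42), num_partitions)
}
```

The non-negative modulo matters because the hash is a signed 32-bit value: in Rust, `-7 % 4` is `-3`, whereas `pmod(-7, 4)` is `1`, which is a valid partition index.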
For range partitioning:
- Spark's `RangePartitioner` samples data and computes partition boundaries on the driver.
- Boundaries are serialized to the native plan.
- Native code converts sort key columns to comparable row format.
- Binary search (`partition_point`) determines which partition each row belongs to.
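The binary search step can be illustrated with Rust's standard `slice::partition_point`. This sketch simplifies the comparable row format to plain `i64` keys and assumes the boundaries are sorted ascending, with a key equal to a boundary assigned to the lower partition. The function name `range_partition_id` is hypothetical.

```rust
/// Returns the partition index for `key`, given ascending upper bounds.
/// With B bounds there are B + 1 partitions; the result is the number of
/// bounds strictly less than `key`.
pub fn range_partition_id(bounds: &[i64], key: i64) -> usize {
    // `partition_point` binary-searches for the first element where the
    // predicate is false; it requires the predicate to be true for a
    // prefix of the slice, which holds because `bounds` is sorted.
    bounds.partition_point(|bound| *bound < key)
}
```

With bounds `[10, 20, 30]`, keys up to and including 10 land in partition 0, keys 11 through 20 in partition 1, keys 21 through 30 in partition 2, and anything larger in partition 3.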
The simplest case: all rows go to partition 0. This case uses `SinglePartitionShufflePartitioner`, which
concatenates batches until they reach the configured batch size.
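As a rough model of that concatenation, the sketch below treats a batch as a `Vec<i32>` of rows and emits a combined batch once the pending row count reaches the target. The emit rule, the name `coalesce_batches`, and the choice to flush the remainder at the end are assumptions for illustration; the real partitioner operates on Arrow record batches.

```rust
/// Concatenate small batches until at least `target_rows` rows are pending,
/// then emit them as one batch. Any remainder is emitted at the end.
/// Assumes `target_rows` is greater than zero.
pub fn coalesce_batches(batches: Vec<Vec<i32>>, target_rows: usize) -> Vec<Vec<i32>> {
    let mut output = Vec::new();
    let mut pending: Vec<i32> = Vec::new();
    for batch in batches {
        pending.extend(batch);
        if pending.len() >= target_rows {
            // Hand the accumulated rows over and start a fresh buffer.
            output.push(std::mem::take(&mut pending));
        }
    }
    if !pending.is_empty() {
        output.push(pending);
    }
    output
}
```

Fewer, larger batches mean fewer encoded blocks for the reader to decode, which is the motivation for coalescing before writing.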
Native shuffle uses DataFusion's memory management with spilling support:
- Memory pool: Tracks memory usage across the shuffle operation.
- Spill threshold: When buffered data exceeds the threshold, partitions spill to disk.
- Per-partition spilling: Each partition has its own spill file. Multiple spills for a partition are concatenated when writing the final output.
- Scratch space: Reusable buffers for partition ID computation to reduce allocations.
The `MultiPartitionShuffleRepartitioner` manages:
- `PartitionBuffer`: In-memory buffer for each partition
- `SpillFile`: Temporary file for spilled data
- Memory tracking via `MemoryConsumer` trait
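The buffering and spilling behaviour described above can be modelled in a few lines. This is a simplified sketch, not the real partitioner: `SpillingBuffers` is a hypothetical type, spill files are stood in for by in-memory byte vectors, and a fixed byte threshold replaces DataFusion's memory pool accounting.

```rust
/// Toy model of per-partition buffers with a shared spill threshold.
pub struct SpillingBuffers {
    /// In-memory data for each partition.
    buffers: Vec<Vec<u8>>,
    /// Stand-in for one spill file per partition; repeated spills append.
    spills: Vec<Vec<u8>>,
    buffered_bytes: usize,
    threshold: usize,
    /// Number of spill events so far.
    pub spill_count: usize,
}

impl SpillingBuffers {
    pub fn new(num_partitions: usize, threshold: usize) -> Self {
        Self {
            buffers: vec![Vec::new(); num_partitions],
            spills: vec![Vec::new(); num_partitions],
            buffered_bytes: 0,
            threshold,
            spill_count: 0,
        }
    }

    /// Buffer data for a partition; spill everything if over the threshold.
    pub fn insert(&mut self, partition: usize, data: &[u8]) {
        self.buffers[partition].extend_from_slice(data);
        self.buffered_bytes += data.len();
        if self.buffered_bytes > self.threshold {
            self.spill();
        }
    }

    /// Move each partition's buffered bytes to the end of its spill area.
    fn spill(&mut self) {
        for (buffer, spill) in self.buffers.iter_mut().zip(self.spills.iter_mut()) {
            spill.append(buffer); // leaves `buffer` empty
        }
        self.buffered_bytes = 0;
        self.spill_count += 1;
    }

    /// Final output per partition: spilled data followed by what is still buffered.
    pub fn finish(mut self) -> Vec<Vec<u8>> {
        for (buffer, spill) in self.buffers.iter_mut().zip(self.spills.iter_mut()) {
            spill.append(buffer);
        }
        self.spills
    }
}
```

Because each partition's spills are appended in arrival order and the remaining buffer is written last, the final output for a partition preserves the order in which its data was inserted.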
Native shuffle supports multiple compression codecs configured via
spark.comet.exec.shuffle.compression.codec:
| Codec | Description |
|---|---|
| `zstd` | Zstandard compression. Best ratio, configurable level. |
| `lz4` | LZ4 compression. Fast with good ratio. |
| `snappy` | Snappy compression. Fastest, lower ratio. |
| `none` | No compression. |
The compression codec is applied uniformly to all partitions. Each partition's data is independently compressed, allowing parallel decompression during reads.
| Config | Default | Description |
|---|---|---|
| `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle |
| `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `native`, `jvm`, or `auto` |
| `spark.comet.exec.shuffle.compression.codec` | `zstd` | Compression codec |
| `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level |
| `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size |
| `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch |
| Aspect | Native Shuffle | JVM Shuffle |
|---|---|---|
| Input format | Columnar (direct from Comet operators) | Row-based (via ColumnarToRowExec) |
| Partitioning logic | Rust implementation | Spark's partitioner |
| Supported schemes | Hash, Range, Single | Hash, Range, Single, RoundRobin |
| Partition key types | Primitive and complex types for hash; primitives only for range | Any type |
| Performance | Higher (no format conversion) | Lower (columnar→row→columnar) |
| Writer variants | Single path | Bypass (hash) and sort-based |
See JVM Shuffle for details on the JVM-based implementation.