This document describes Comet's native shuffle implementation (CometNativeShuffle), which performs
shuffle operations entirely in Rust code for maximum performance. For the JVM-based alternative,
see JVM Shuffle.
Native shuffle takes columnar input directly from Comet native operators and performs partitioning, encoding, and writing in native Rust code. This avoids the columnar-to-row-to-columnar conversion overhead that JVM shuffle incurs.
Comet Native (columnar) → Native Shuffle → Arrow IPC → columnar
Compare this to JVM shuffle's data path:
Comet Native (columnar) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → columnar
Native shuffle (CometExchange) is selected when all of the following conditions are met:
-
Shuffle mode allows native:
spark.comet.exec.shuffle.modeisnativeorauto. -
Child plan is a Comet native operator: The child must be a
CometPlanthat produces columnar output. Row-based Spark operators require JVM shuffle. -
Supported partitioning type: Native shuffle supports:
HashPartitioningRangePartitioningSinglePartitionRoundRobinPartitioning
-
Supported partition key types: For
HashPartitioningandRangePartitioning, partition keys must be primitive types. Complex types (struct, array, map) as partition keys require JVM shuffle. Note that complex types are fully supported as data columns in native shuffle.
┌─────────────────────────────────────────────────────────────────────────────┐
│ CometShuffleManager │
│ - Routes to CometNativeShuffleWriter for CometNativeShuffleHandle │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ CometNativeShuffleWriter │
│ - Constructs protobuf operator plan │
│ - Invokes native execution via CometExec.getCometIterator() │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼ (JNI)
┌─────────────────────────────────────────────────────────────────────────────┐
│ ShuffleWriterExec (Rust) │
│ - DataFusion ExecutionPlan │
│ - Orchestrates partitioning and writing │
└─────────────────────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌───────────────────────────────────┐ ┌───────────────────────────────────┐
│ MultiPartitionShuffleRepartitioner │ │ SinglePartitionShufflePartitioner │
│ (hash/range partitioning) │ │ (single partition case) │
└───────────────────────────────────┘ └───────────────────────────────────┘
│
▼
┌───────────────────────────────────┐
│ ShuffleBlockWriter │
│ (Arrow IPC + compression) │
└───────────────────────────────────┘
│
▼
┌─────────────────┐
│ Data + Index │
│ Files │
└─────────────────┘
| Class | Location | Description |
|---|---|---|
CometShuffleExchangeExec |
.../shuffle/CometShuffleExchangeExec.scala |
Physical plan node. Validates types and partitioning, creates CometShuffleDependency. |
CometNativeShuffleWriter |
.../shuffle/CometNativeShuffleWriter.scala |
Implements ShuffleWriter. Builds protobuf plan and invokes native execution. |
CometShuffleDependency |
.../shuffle/CometShuffleDependency.scala |
Extends ShuffleDependency. Holds shuffle type, schema, and range partition bounds. |
CometBlockStoreShuffleReader |
.../shuffle/CometBlockStoreShuffleReader.scala |
Reads shuffle blocks via ShuffleBlockFetcherIterator. Decodes Arrow IPC to ColumnarBatch. |
NativeBatchDecoderIterator |
.../shuffle/NativeBatchDecoderIterator.scala |
Reads compressed Arrow IPC from input stream. Calls native decode via JNI. |
| File | Location | Description |
|---|---|---|
shuffle_writer.rs |
native/core/src/execution/shuffle/ |
ShuffleWriterExec plan and partitioners. Main shuffle logic. |
codec.rs |
native/core/src/execution/shuffle/ |
ShuffleBlockWriter for Arrow IPC encoding with compression. Also handles decoding. |
comet_partitioning.rs |
native/core/src/execution/shuffle/ |
CometPartitioning enum defining partition schemes (Hash, Range, Single). |
-
Plan construction:
CometNativeShuffleWriterbuilds a protobuf operator plan containing:- A scan operator reading from the input iterator
- A
ShuffleWriteroperator with partitioning config and compression codec
-
Native execution:
CometExec.getCometIterator()executes the plan in Rust. -
Partitioning:
ShuffleWriterExecreceives batches and routes to the appropriate partitioner:MultiPartitionShuffleRepartitioner: For hash/range/round-robin partitioningSinglePartitionShufflePartitioner: For single partition (simpler path)
-
Buffering and spilling: The partitioner buffers rows per partition. When memory pressure exceeds the threshold, partitions spill to temporary files.
-
Encoding:
ShuffleBlockWriterencodes each partition's data as compressed Arrow IPC:- Writes compression type header
- Writes field count header
- Writes compressed IPC stream
-
Output files: Two files are produced:
- Data file: Concatenated partition data
- Index file: Array of 8-byte little-endian offsets marking partition boundaries
-
Commit: Back in JVM,
CometNativeShuffleWriterreads the index file to get partition lengths and commits via Spark'sIndexShuffleBlockResolver.
-
CometBlockStoreShuffleReaderfetches shuffle blocks viaShuffleBlockFetcherIterator. -
For each block,
NativeBatchDecoderIterator:- Reads the 8-byte compressed length header
- Reads the 8-byte field count header
- Reads the compressed IPC data
- Calls
Native.decodeShuffleBlock()via JNI
-
Native code decompresses and deserializes the Arrow IPC stream.
-
Arrow FFI transfers the
RecordBatchto JVM as aColumnarBatch.
Native shuffle implements Spark-compatible hash partitioning:
- Uses Murmur3 hash function with seed 42 (matching Spark)
- Computes hash of partition key columns
- Applies modulo by partition count:
partition_id = hash % num_partitions
For range partitioning:
- Spark's
RangePartitionersamples data and computes partition boundaries on the driver. - Boundaries are serialized to the native plan.
- Native code converts sort key columns to comparable row format.
- Binary search (
partition_point) determines which partition each row belongs to.
The simplest case: all rows go to partition 0. Uses SinglePartitionShufflePartitioner which
simply concatenates batches to reach the configured batch size.
Round robin partitioning distributes rows evenly across partitions in a deterministic way:
- Computes a Murmur3 hash of all columns in each row (using seed 42)
- Sorts rows by their hash values to ensure deterministic ordering
- Assigns rows to partitions sequentially:
partition_id = sorted_index % num_partitions
This approach ensures that repeated execution of the same query produces identical results, which is critical for fault tolerance and retry logic. Unlike Spark's round robin implementation which uses random seeding, Comet's hash-based approach guarantees determinism across retries.
Native shuffle uses DataFusion's memory management with spilling support:
- Memory pool: Tracks memory usage across the shuffle operation.
- Spill threshold: When buffered data exceeds the threshold, partitions spill to disk.
- Per-partition spilling: Each partition has its own spill file. Multiple spills for a partition are concatenated when writing the final output.
- Scratch space: Reusable buffers for partition ID computation to reduce allocations.
The MultiPartitionShuffleRepartitioner manages:
PartitionBuffer: In-memory buffer for each partitionSpillFile: Temporary file for spilled data- Memory tracking via
MemoryConsumertrait
Native shuffle supports multiple compression codecs configured via
spark.comet.exec.shuffle.compression.codec:
| Codec | Description |
|---|---|
zstd |
Zstandard compression. Best ratio, configurable level. |
lz4 |
LZ4 compression. Fast with good ratio. |
snappy |
Snappy compression. Fastest, lower ratio. |
none |
No compression. |
The compression codec is applied uniformly to all partitions. Each partition's data is independently compressed, allowing parallel decompression during reads.
| Config | Default | Description |
|---|---|---|
spark.comet.exec.shuffle.enabled |
true |
Enable Comet shuffle |
spark.comet.exec.shuffle.mode |
auto |
Shuffle mode: native, jvm, or auto |
spark.comet.exec.shuffle.compression.codec |
zstd |
Compression codec |
spark.comet.exec.shuffle.compression.zstd.level |
1 |
Zstd compression level |
spark.comet.shuffle.write.buffer.size |
1MB |
Write buffer size |
spark.comet.columnar.shuffle.batch.size |
8192 |
Target rows per batch |
| Aspect | Native Shuffle | JVM Shuffle |
|---|---|---|
| Input format | Columnar (direct from Comet operators) | Row-based (via ColumnarToRowExec) |
| Partitioning logic | Rust implementation | Spark's partitioner |
| Supported schemes | Hash, Range, Single, RoundRobin | Hash, Range, Single, RoundRobin |
| Partition key types | Primitives only (Hash, Range) | Any type |
| Performance | Higher (no format conversion) | Lower (columnar→row→columnar) |
| Writer variants | Single path | Bypass (hash) and sort-based |
See JVM Shuffle for details on the JVM-based implementation.