This document describes Comet's native shuffle implementation (CometNativeShuffle), which performs
shuffle operations entirely in Rust code for maximum performance. For the JVM-based alternative,
see JVM Shuffle.
Native shuffle takes columnar input directly from Comet native operators and performs partitioning, encoding, and writing in native Rust code. This avoids the columnar-to-row-to-columnar conversion overhead that JVM shuffle incurs.
```
Comet Native (columnar) → Native Shuffle → Arrow IPC → columnar
```
Compare this to JVM shuffle's data path:
```
Comet Native (columnar) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → columnar
```
Native shuffle (CometExchange) is selected when all of the following conditions are met:
- **Shuffle mode allows native**: `spark.comet.exec.shuffle.mode` is `native` or `auto`.
- **Child plan is a Comet native operator**: The child must be a `CometPlan` that produces columnar output. Row-based Spark operators require JVM shuffle.
- **Supported partitioning type**: Native shuffle supports `HashPartitioning`, `RangePartitioning`, `SinglePartition`, and `RoundRobinPartitioning`.
- **Supported partition key types**: For `HashPartitioning` and `RangePartitioning`, partition keys must be primitive types. Complex types (struct, array, map) as partition keys require JVM shuffle. Note that complex types are fully supported as data columns in native shuffle.
```
┌─────────────────────────────────────────────────────────────────────────────┐
│                            CometShuffleManager                              │
│  - Routes to CometNativeShuffleWriter for CometNativeShuffleHandle          │
└─────────────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                         CometNativeShuffleWriter                            │
│  - Constructs protobuf operator plan                                        │
│  - Invokes native execution via CometExec.getCometIterator()                │
└─────────────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼ (JNI)
┌─────────────────────────────────────────────────────────────────────────────┐
│                         ShuffleWriterExec (Rust)                            │
│  - DataFusion ExecutionPlan                                                 │
│  - Orchestrates partitioning and writing                                    │
└─────────────────────────────────────────────────────────────────────────────┘
                  │                                       │
                  ▼                                       ▼
┌───────────────────────────────────────────────────────────────────────┐
│                        Partitioner Selection                          │
│           Controlled by spark.comet.exec.shuffle.partitionerMode      │
├───────────────────────────┬───────────────────────────────────────────┤
│  immediate                │  buffered (default)                       │
│  ImmediateModePartitioner │  MultiPartitionShuffleRepartitioner       │
│  (hash/range/round-robin) │  (hash/range/round-robin)                 │
│  Partitions batches as    │  Buffers all input batches in             │
│  they arrive, buffers as  │  memory before writing                    │
│  IPC blocks               │                                           │
├───────────────────────────┴───────────────────────────────────────────┤
│     SinglePartitionShufflePartitioner (single partition case)         │
└───────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼
                  ┌───────────────────────────────────┐
                  │        ShuffleBlockWriter         │
                  │     (Arrow IPC + compression)     │
                  └───────────────────────────────────┘
                                      │
                                      ▼
                            ┌─────────────────┐
                            │  Data + Index   │
                            │     Files       │
                            └─────────────────┘
```
| Class | Location | Description |
|---|---|---|
| `CometShuffleExchangeExec` | `.../shuffle/CometShuffleExchangeExec.scala` | Physical plan node. Validates types and partitioning, creates `CometShuffleDependency`. |
| `CometNativeShuffleWriter` | `.../shuffle/CometNativeShuffleWriter.scala` | Implements `ShuffleWriter`. Builds protobuf plan and invokes native execution. |
| `CometShuffleDependency` | `.../shuffle/CometShuffleDependency.scala` | Extends `ShuffleDependency`. Holds shuffle type, schema, and range partition bounds. |
| `CometBlockStoreShuffleReader` | `.../shuffle/CometBlockStoreShuffleReader.scala` | Reads shuffle blocks via `ShuffleBlockFetcherIterator`. Decodes Arrow IPC to `ColumnarBatch`. |
| `NativeBatchDecoderIterator` | `.../shuffle/NativeBatchDecoderIterator.scala` | Reads compressed Arrow IPC from the input stream. Calls native decode via JNI. |
| File | Location | Description |
|---|---|---|
| `shuffle_writer.rs` | `native/shuffle/src/` | `ShuffleWriterExec` plan. Selects partitioner based on the `immediate_mode` flag. |
| `immediate_mode.rs` | `native/shuffle/src/partitioners/` | `ImmediateModePartitioner`. Scatter-writes rows into per-partition Arrow builders and eagerly flushes IPC blocks to in-memory buffers. |
| `multi_partition.rs` | `native/shuffle/src/partitioners/` | `MultiPartitionShuffleRepartitioner`. Buffers all rows in memory, then writes partitions. |
| `codec.rs` | `native/shuffle/src/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. |
| `comet_partitioning.rs` | `native/shuffle/src/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single). |
1. **Plan construction**: `CometNativeShuffleWriter` builds a protobuf operator plan containing:
   - A scan operator reading from the input iterator
   - A `ShuffleWriter` operator with partitioning config and compression codec
2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust.
3. **Partitioning**: `ShuffleWriterExec` receives batches and routes them to the appropriate partitioner based on the `partitionerMode` configuration:
   - **Immediate mode** (`ImmediateModePartitioner`): For hash/range/round-robin partitioning. As each batch arrives, rows are scattered into per-partition Arrow array builders. When a partition's builder reaches the target batch size, it is flushed as a compressed Arrow IPC block to an in-memory buffer. Under memory pressure, these buffers are spilled to per-partition temporary files. This keeps memory usage much lower than buffered mode, since data is eagerly encoded into compact IPC format rather than held as raw Arrow arrays.
   - **Buffered mode** (`MultiPartitionShuffleRepartitioner`): For hash/range/round-robin partitioning. Buffers all input `RecordBatch`es in memory, then partitions and writes them in a single pass. When memory pressure exceeds the threshold, buffered data is partitioned and spilled to per-partition temporary files.
   - `SinglePartitionShufflePartitioner`: For a single partition (a simpler path, used regardless of partitioner mode).
4. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC:
   - Writes a compression type header
   - Writes a field count header
   - Writes the compressed IPC stream
5. **Output files**: Two files are produced:
   - Data file: concatenated partition data
   - Index file: an array of 8-byte little-endian offsets marking partition boundaries
6. **Commit**: Back in the JVM, `CometNativeShuffleWriter` reads the index file to get partition lengths and commits via Spark's `IndexShuffleBlockResolver`.
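To make the two-file layout concrete, here is a small pure-Rust sketch that writes partition data plus an index of 8-byte little-endian offsets, then recovers per-partition lengths the way the commit step does. The in-memory `Vec<u8>` buffers stand in for the real files, and the helper names are illustrative rather than Comet's actual API:

```rust
// Sketch of the data + index file layout: the index is a sequence of
// 8-byte little-endian offsets marking partition boundaries. In-memory
// buffers stand in for real files here.
fn write_shuffle_output(partitions: &[Vec<u8>]) -> (Vec<u8>, Vec<u8>) {
    let mut data = Vec::new();
    let mut index = Vec::new();
    // One offset before each partition plus a final end offset, so
    // partition i spans [offset[i], offset[i + 1]) in the data file.
    index.extend_from_slice(&0u64.to_le_bytes());
    for part in partitions {
        data.extend_from_slice(part);
        index.extend_from_slice(&(data.len() as u64).to_le_bytes());
    }
    (data, index)
}

// Recover partition lengths from consecutive index offsets, as the
// JVM-side commit needs before calling IndexShuffleBlockResolver.
fn partition_lengths(index: &[u8]) -> Vec<u64> {
    let offsets: Vec<u64> = index
        .chunks_exact(8)
        .map(|c| u64::from_le_bytes(c.try_into().unwrap()))
        .collect();
    offsets.windows(2).map(|w| w[1] - w[0]).collect()
}

fn main() {
    // Three partitions: 5 bytes, empty, 3 bytes.
    let parts = vec![vec![1u8; 5], vec![], vec![2u8; 3]];
    let (data, index) = write_shuffle_output(&parts);
    println!("data = {} bytes, lengths = {:?}", data.len(), partition_lengths(&index));
}
```

Empty partitions still get an index entry, which is why the index alone is enough to reconstruct every partition's length.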
1. `CometBlockStoreShuffleReader` fetches shuffle blocks via `ShuffleBlockFetcherIterator`.
2. For each block, `NativeBatchDecoderIterator`:
   - Reads the 8-byte compressed length header
   - Reads the 8-byte field count header
   - Reads the compressed IPC data
   - Calls `Native.decodeShuffleBlock()` via JNI
3. Native code decompresses and deserializes the Arrow IPC stream.
4. Arrow FFI transfers the `RecordBatch` to the JVM as a `ColumnarBatch`.
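The header-reading steps above can be sketched in pure Rust. This is illustrative only: little-endian headers are an assumption here (the source states the widths but not the byte order), and the actual decompression and IPC decoding happen in native code via JNI:

```rust
// Illustrative sketch of the per-block framing the decoder iterator
// reads: an 8-byte compressed length, an 8-byte field count, then the
// compressed Arrow IPC payload. Little-endian headers are an assumption.
use std::io::{Cursor, Read};

struct ShuffleBlock {
    field_count: u64,
    compressed_ipc: Vec<u8>,
}

fn read_block(reader: &mut impl Read) -> std::io::Result<ShuffleBlock> {
    let mut header = [0u8; 8];
    reader.read_exact(&mut header)?;
    let compressed_len = u64::from_le_bytes(header);
    reader.read_exact(&mut header)?;
    let field_count = u64::from_le_bytes(header);
    // The payload would be handed to native code for decompression and
    // Arrow IPC decoding; here we just carry the raw bytes.
    let mut compressed_ipc = vec![0u8; compressed_len as usize];
    reader.read_exact(&mut compressed_ipc)?;
    Ok(ShuffleBlock { field_count, compressed_ipc })
}

fn main() -> std::io::Result<()> {
    // Frame a fake 4-byte payload that claims 3 fields.
    let mut buf = Vec::new();
    buf.extend_from_slice(&4u64.to_le_bytes());
    buf.extend_from_slice(&3u64.to_le_bytes());
    buf.extend_from_slice(&[9, 9, 9, 9]);
    let block = read_block(&mut Cursor::new(buf))?;
    println!("fields = {}, payload = {} bytes", block.field_count, block.compressed_ipc.len());
    Ok(())
}
```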
Native shuffle implements Spark-compatible hash partitioning:

- Uses the Murmur3 hash function with seed 42 (matching Spark)
- Computes the hash of the partition key columns
- Applies modulo by the partition count: `partition_id = hash % num_partitions`
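As an illustration, here is a minimal pure-Rust sketch of this scheme for a single `i32` key. The function names are hypothetical (Comet's real implementation hashes whole Arrow columns), and since Murmur3 produces a signed 32-bit hash, the `hash % num_partitions` step is written with a non-negative modulo so negative hashes still land in `[0, num_partitions)`:

```rust
// Sketch of Spark-compatible Murmur3 hash partitioning for a single i32
// key. Function names are illustrative, not Comet's actual API.

const SPARK_SEED: i32 = 42;

// Murmur3 x86_32 applied to one 32-bit value (the mixK1/mixH1/fmix
// structure used by Spark's Murmur3_x86_32.hashInt).
fn spark_murmur3_hash_i32(value: i32, seed: i32) -> i32 {
    // mixK1
    let mut k1 = (value as u32).wrapping_mul(0xcc9e2d51);
    k1 = k1.rotate_left(15);
    k1 = k1.wrapping_mul(0x1b873593);
    // mixH1
    let mut h1 = (seed as u32) ^ k1;
    h1 = h1.rotate_left(13);
    h1 = h1.wrapping_mul(5).wrapping_add(0xe6546b64);
    // fmix: finalize with the input length in bytes (4)
    h1 ^= 4;
    h1 ^= h1 >> 16;
    h1 = h1.wrapping_mul(0x85ebca6b);
    h1 ^= h1 >> 13;
    h1 = h1.wrapping_mul(0xc2b2ae35);
    h1 ^= h1 >> 16;
    h1 as i32
}

// `partition_id = hash % num_partitions`, with rem_euclid so a negative
// hash still maps to a valid partition.
fn partition_id(key: i32, num_partitions: usize) -> usize {
    let hash = spark_murmur3_hash_i32(key, SPARK_SEED);
    (hash as i64).rem_euclid(num_partitions as i64) as usize
}

fn main() {
    for key in [0, 1, -7, 1_000_000] {
        println!("key {key} -> partition {}", partition_id(key, 200));
    }
}
```

Because the hash depends only on the key and the fixed seed, the same row always lands in the same partition, which is what makes the shuffle deterministic across task retries.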
For range partitioning:

- Spark's `RangePartitioner` samples data and computes partition boundaries on the driver.
- The boundaries are serialized into the native plan.
- Native code converts sort key columns to a comparable row format.
- Binary search (`partition_point`) determines which partition each row belongs to.
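The boundary lookup can be sketched with `slice::partition_point` from the Rust standard library. This simplifies the sort key to a single `i64` column and uses made-up boundaries; in the real implementation the bounds come from Spark's driver-side sampling and rows are compared in a multi-column row format:

```rust
// Sketch of range-partition assignment via binary search, with the sort
// key simplified to one i64 column. Boundary values are illustrative.
fn range_partition(row_key: i64, bounds: &[i64]) -> usize {
    // partition_point counts the bounds strictly less than the key,
    // so keys equal to a bound fall into that bound's partition.
    bounds.partition_point(|b| *b < row_key)
}

fn main() {
    // Three boundaries define four partitions:
    // (-inf, 10], (10, 20], (20, 30], (30, +inf)
    let bounds = [10i64, 20, 30];
    for key in [-5i64, 10, 25, 99] {
        println!("key {key} -> partition {}", range_partition(key, &bounds));
    }
}
```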
The simplest case: all rows go to partition 0. This path uses `SinglePartitionShufflePartitioner`, which simply concatenates batches up to the configured batch size.
Comet implements round-robin partitioning using hash-based assignment for determinism:

- Computes a Murmur3 hash of the columns (using seed 42)
- Assigns partitions directly from the hash: `partition_id = hash % num_partitions`

This approach guarantees determinism across task retries, which is critical for fault tolerance. However, unlike true round-robin, which cycles through partitions row by row, hash-based assignment only yields an even distribution when the hashed columns have sufficient variation. Data with low cardinality or identical values may produce skewed partition sizes.
Native shuffle uses DataFusion's memory management. The memory characteristics differ between the two partitioner modes:
Immediate mode keeps memory usage low by partitioning and encoding data eagerly as it arrives, rather than buffering all input batches before writing:
- **Per-partition builders**: Each partition has a set of Arrow array builders sized to the target batch size. When a builder fills up, it is flushed as a compressed IPC block to an in-memory buffer.
- **Memory footprint**: Proportional to `num_partitions × num_columns × batch_size_in_rows` for the builders, plus the accumulated IPC buffers. This is typically much smaller than buffered mode, since IPC encoding is more compact than raw Arrow arrays.
- **Spilling**: When memory pressure is detected via DataFusion's `MemoryConsumer` trait, partition builders are flushed, held references to sliced/filtered `RecordBatch`es are released, and all IPC buffers are drained to per-partition temporary files on disk.
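As a rough worked example of the footprint formula, with illustrative numbers, assuming fixed-width 8-byte values and ignoring validity bitmaps and variable-length columns:

```rust
// Back-of-the-envelope builder footprint for immediate mode:
// num_partitions × num_columns × batch_size_in_rows × bytes_per_value.
// Numbers are illustrative; real usage depends on column types and
// validity buffers.
fn builder_footprint_bytes(
    num_partitions: u64,
    num_columns: u64,
    batch_size_in_rows: u64,
    bytes_per_value: u64,
) -> u64 {
    num_partitions * num_columns * batch_size_in_rows * bytes_per_value
}

fn main() {
    // 200 partitions × 10 columns × 8192 rows × 8 bytes = 125 MiB
    let bytes = builder_footprint_bytes(200, 10, 8192, 8);
    println!("{} bytes (~{} MiB)", bytes, bytes / (1024 * 1024));
}
```

The same job in buffered mode would instead hold every input batch as raw Arrow arrays, so its peak memory grows with total input size rather than with `num_partitions × batch_size`.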
Buffered mode holds all input data in memory before writing:
- **Buffered batches**: All incoming `RecordBatch`es are accumulated in a `Vec`.
- **Spill threshold**: When buffered data exceeds the memory threshold, partitions spill to temporary files on disk.
- **Per-partition spilling**: Each partition has its own spill file. Multiple spills for a partition are concatenated when writing the final output.
- **Scratch space**: Reusable buffers for partition ID computation reduce allocations.
The `MultiPartitionShuffleRepartitioner` manages:

- `PartitionBuffer`: An in-memory buffer for each partition
- `SpillFile`: A temporary file for spilled data
- Memory tracking via the `MemoryConsumer` trait
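A simplified sketch of the state described above. Field names and shapes are illustrative, not Comet's actual definitions; the real type lives in `multi_partition.rs` and registers with DataFusion's memory pool through the `MemoryConsumer` trait:

```rust
// Illustrative sketch of the buffered repartitioner's state. Names
// mirror the concepts above but are not Comet's actual definitions.
use std::path::PathBuf;

/// In-memory buffer of encoded IPC bytes for one output partition.
#[allow(dead_code)]
struct PartitionBuffer {
    frozen: Vec<u8>, // already-encoded IPC blocks
    num_rows: usize,
}

/// Handle to a temporary file holding spilled data for one partition.
#[allow(dead_code)]
struct SpillFile {
    path: PathBuf,
    bytes_written: u64,
}

#[allow(dead_code)]
struct MultiPartitionShuffleRepartitioner {
    buffers: Vec<PartitionBuffer>, // one per output partition
    spills: Vec<Vec<SpillFile>>,   // possibly several spills per partition
    partition_ids: Vec<u32>,       // reusable scratch for partition IDs
    reserved_bytes: usize,         // tracked against the memory pool
}

impl MultiPartitionShuffleRepartitioner {
    fn new(num_partitions: usize) -> Self {
        Self {
            buffers: (0..num_partitions)
                .map(|_| PartitionBuffer { frozen: Vec::new(), num_rows: 0 })
                .collect(),
            spills: (0..num_partitions).map(|_| Vec::new()).collect(),
            partition_ids: Vec::new(),
            reserved_bytes: 0,
        }
    }
}

fn main() {
    let r = MultiPartitionShuffleRepartitioner::new(4);
    println!("{} partitions, {} reserved bytes", r.buffers.len(), r.reserved_bytes);
}
```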
Native shuffle supports multiple compression codecs, configured via `spark.comet.exec.shuffle.compression.codec`:

| Codec | Description |
|---|---|
| `zstd` | Zstandard compression. Best ratio, configurable level. |
| `lz4` | LZ4 compression. Fast with a good ratio. |
| `snappy` | Snappy compression. Fastest, lowest ratio. |
| `none` | No compression. |
The compression codec is applied uniformly to all partitions. Each partition's data is independently compressed, allowing parallel decompression during reads.
| Config | Default | Description |
|---|---|---|
| `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle |
| `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `native`, `jvm`, or `auto` |
| `spark.comet.exec.shuffle.partitionerMode` | `buffered` | Partitioner mode: `immediate` or `buffered` |
| `spark.comet.exec.shuffle.compression.codec` | `zstd` | Compression codec |
| `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level |
| `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size |
| `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch |
| Aspect | Native Shuffle | JVM Shuffle |
|---|---|---|
| Input format | Columnar (direct from Comet operators) | Row-based (via ColumnarToRowExec) |
| Partitioning logic | Rust implementation | Spark's partitioner |
| Supported schemes | Hash, Range, Single, RoundRobin | Hash, Range, Single, RoundRobin |
| Partition key types | Primitives only (Hash, Range) | Any type |
| Performance | Higher (no format conversion) | Lower (columnar→row→columnar) |
| Writer variants | Single path | Bypass (hash) and sort-based |
See JVM Shuffle for details on the JVM-based implementation.