
Commit 9750b26

feat: add immediate mode option for native shuffle
Add ImmediateModePartitioner that partitions incoming batches immediately using per-partition builders, flushing compressed IPC blocks when they reach target batch size. This reduces memory overhead compared to the buffered approach that stores all uncompressed rows before writing. Includes documentation and config option (spark.comet.exec.shuffle.partitionerMode). Default is buffered.
1 parent 6260665 commit 9750b26

12 files changed

Lines changed: 1247 additions & 49 deletions


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 12 additions & 0 deletions
@@ -523,6 +523,18 @@ object CometConf extends ShimCometConf {
         "Should not be larger than batch size `spark.comet.batchSize`")
       .createWithDefault(8192)
 
+  val COMET_SHUFFLE_PARTITIONER_MODE: ConfigEntry[String] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.partitionerMode")
+      .category(CATEGORY_SHUFFLE)
+      .doc(
+        "The partitioner mode used by the native shuffle writer. " +
+          "'immediate' writes partitioned IPC blocks immediately as batches arrive, " +
+          "reducing memory usage. 'buffered' buffers all rows before writing, which may " +
+          "improve performance for small datasets but uses more memory.")
+      .stringConf
+      .checkValues(Set("immediate", "buffered"))
+      .createWithDefault("buffered")
+
   val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.writeBufferSize")
       .category(CATEGORY_SHUFFLE)

docs/source/contributor-guide/native_shuffle.md

Lines changed: 70 additions & 28 deletions
@@ -81,10 +81,18 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
 └─────────────────────────────────────────────────────────────────────────────┘
                     │                                         │
                     ▼                                         ▼
-┌────────────────────────────────────┐  ┌───────────────────────────────────┐
-│ MultiPartitionShuffleRepartitioner │  │ SinglePartitionShufflePartitioner │
-│ (hash/range partitioning)          │  │ (single partition case)           │
-└────────────────────────────────────┘  └───────────────────────────────────┘
+┌───────────────────────────────────────────────────────────────────────┐
+│                         Partitioner Selection                         │
+│        Controlled by spark.comet.exec.shuffle.partitionerMode         │
+├───────────────────────────┬───────────────────────────────────────────┤
+│ immediate                 │ buffered (default)                        │
+│ ImmediateModePartitioner  │ MultiPartitionShuffleRepartitioner        │
+│ (hash/range/round-robin)  │ (hash/range/round-robin)                  │
+│ Writes IPC blocks as      │ Buffers all rows in memory                │
+│ batches arrive            │ before writing                            │
+├───────────────────────────┴───────────────────────────────────────────┤
+│       SinglePartitionShufflePartitioner (single partition case)       │
+└───────────────────────────────────────────────────────────────────────┘
 
 
 ┌───────────────────────────────────┐
@@ -113,11 +121,13 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
 
 ### Rust Side
 
-| File                    | Location                             | Description                                                                          |
-| ----------------------- | ------------------------------------ | ------------------------------------------------------------------------------------ |
-| `shuffle_writer.rs`     | `native/core/src/execution/shuffle/` | `ShuffleWriterExec` plan and partitioners. Main shuffle logic.                       |
-| `codec.rs`              | `native/core/src/execution/shuffle/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. |
-| `comet_partitioning.rs` | `native/core/src/execution/shuffle/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single).           |
+| File                    | Location                           | Description                                                                                                                           |
+| ----------------------- | ---------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------- |
+| `shuffle_writer.rs`     | `native/shuffle/src/`              | `ShuffleWriterExec` plan. Selects partitioner based on `immediate_mode` flag.                                                         |
+| `immediate_mode.rs`     | `native/shuffle/src/partitioners/` | `ImmediateModePartitioner`. Scatter-writes rows into per-partition Arrow builders and flushes IPC blocks to in-memory buffers eagerly. |
+| `multi_partition.rs`    | `native/shuffle/src/partitioners/` | `MultiPartitionShuffleRepartitioner`. Buffers all rows in memory, then writes partitions.                                             |
+| `codec.rs`              | `native/shuffle/src/`              | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding.                                                  |
+| `comet_partitioning.rs` | `native/shuffle/src/`              | `CometPartitioning` enum defining partition schemes (Hash, Range, Single).                                                            |
 
 ## Data Flow
 

@@ -129,23 +139,33 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
 
 2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust.
 
-3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner:
-   - `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning
-   - `SinglePartitionShufflePartitioner`: For single partition (simpler path)
+3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner
+   based on the `partitionerMode` configuration:
+   - **Immediate mode** (`ImmediateModePartitioner`): For hash/range/round-robin partitioning.
+     As each batch arrives, rows are scattered into per-partition Arrow array builders. When a
+     partition's builder reaches the target batch size, it is flushed as a compressed Arrow IPC
+     block to an in-memory buffer. Under memory pressure, these buffers are spilled to
+     per-partition temporary files. This keeps memory usage much lower than buffered mode since
+     data is encoded into compact IPC format eagerly rather than held as raw Arrow arrays.
 
-4. **Buffering and spilling**: The partitioner buffers rows per partition. When memory pressure
-   exceeds the threshold, partitions spill to temporary files.
+   - **Buffered mode** (`MultiPartitionShuffleRepartitioner`): For hash/range/round-robin
+     partitioning. Buffers all input `RecordBatch`es in memory, then partitions and writes
+     them in a single pass. When memory pressure exceeds the threshold, partitions spill to
+     temporary files.
 
-5. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC:
+   - `SinglePartitionShufflePartitioner`: For single partition (simpler path, used regardless
+     of partitioner mode).
+
+4. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC:
    - Writes compression type header
    - Writes field count header
    - Writes compressed IPC stream
 
-6. **Output files**: Two files are produced:
+5. **Output files**: Two files are produced:
    - **Data file**: Concatenated partition data
    - **Index file**: Array of 8-byte little-endian offsets marking partition boundaries
 
-7. **Commit**: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition
+6. **Commit**: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition
    lengths and commits via Spark's `IndexShuffleBlockResolver`.
 
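Step 3's immediate-mode flow can be sketched in std-only Rust. This is an illustrative model, not the actual implementation: plain `Vec<u64>` rows stand in for Arrow batches, `DefaultHasher` stands in for Comet's hash partitioning, and "flushing" simply moves a full buffer aside where the real `ImmediateModePartitioner` would encode a compressed IPC block:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy immediate-mode partitioner: rows are scattered into per-partition
/// buffers and flushed as soon as a buffer reaches the target batch size.
struct ImmediatePartitioner {
    target_batch_size: usize,
    builders: Vec<Vec<u64>>,     // one "builder" per partition
    flushed: Vec<Vec<Vec<u64>>>, // flushed "IPC blocks" per partition
}

impl ImmediatePartitioner {
    fn new(num_partitions: usize, target_batch_size: usize) -> Self {
        Self {
            target_batch_size,
            builders: vec![Vec::new(); num_partitions],
            flushed: vec![Vec::new(); num_partitions],
        }
    }

    fn partition_id(&self, key: u64) -> usize {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        (h.finish() as usize) % self.builders.len()
    }

    /// Scatter one incoming batch of rows, flushing any builder that fills up.
    fn insert_batch(&mut self, rows: &[u64]) {
        for &row in rows {
            let p = self.partition_id(row);
            self.builders[p].push(row);
            if self.builders[p].len() >= self.target_batch_size {
                let full = std::mem::take(&mut self.builders[p]);
                self.flushed[p].push(full); // real code: encode + compress IPC here
            }
        }
    }

    /// Flush any remaining partial builders at end of input.
    fn finish(&mut self) {
        for p in 0..self.builders.len() {
            if !self.builders[p].is_empty() {
                let rest = std::mem::take(&mut self.builders[p]);
                self.flushed[p].push(rest);
            }
        }
    }
}

fn main() {
    let mut part = ImmediatePartitioner::new(4, 8);
    part.insert_batch(&(0..100).collect::<Vec<u64>>());
    part.finish();
    let total: usize = part.flushed.iter().flatten().map(|b| b.len()).sum();
    assert_eq!(total, 100); // every row lands in exactly one partition
    println!("ok");
}
```

The key property the sketch shows: active memory is bounded by the partial builders, because full batches leave the builders as soon as they reach the target size.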
### Read Path
@@ -201,10 +221,31 @@ sizes.
 
 ## Memory Management
 
-Native shuffle uses DataFusion's memory management with spilling support:
+Native shuffle uses DataFusion's memory management. The memory characteristics differ
+between the two partitioner modes:
+
+### Immediate Mode
+
+Immediate mode keeps memory usage low by partitioning and encoding data eagerly as it arrives,
+rather than buffering all input rows before writing:
+
+- **Per-partition builders**: Each partition has a set of Arrow array builders sized to the
+  target batch size. When a builder fills up, it is flushed as a compressed IPC block to an
+  in-memory buffer.
+- **Memory footprint**: Proportional to `num_partitions × batch_size` for the builders, plus
+  the accumulated IPC buffers. This is typically much smaller than buffered mode since IPC
+  encoding is more compact than raw Arrow arrays.
+- **Spilling**: When memory pressure is detected via DataFusion's `MemoryConsumer` trait,
+  partition builders are flushed and all IPC buffers are drained to per-partition temporary
+  files on disk.
+
+### Buffered Mode
+
+Buffered mode holds all input data in memory before writing:
 
-- **Memory pool**: Tracks memory usage across the shuffle operation.
-- **Spill threshold**: When buffered data exceeds the threshold, partitions spill to disk.
+- **Buffered batches**: All incoming `RecordBatch`es are accumulated in a `Vec`.
+- **Spill threshold**: When buffered data exceeds the memory threshold, partitions spill to
+  temporary files on disk.
 - **Per-partition spilling**: Each partition has its own spill file. Multiple spills for a
   partition are concatenated when writing the final output.
 - **Scratch space**: Reusable buffers for partition ID computation to reduce allocations.
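The `num_partitions × batch_size` bound on builder memory is easy to make concrete. The numbers below are illustrative assumptions (16 bytes per row, the benchmark's 200 partitions, the default 8192-row batch size), not measured values:

```rust
/// Back-of-envelope active-builder footprint for immediate mode, assuming a
/// fixed average row width. Real footprints depend on the schema and on the
/// accumulated IPC buffers, which this estimate deliberately ignores.
fn builder_footprint_bytes(num_partitions: u64, batch_size: u64, bytes_per_row: u64) -> u64 {
    num_partitions * batch_size * bytes_per_row
}

fn main() {
    // 200 partitions x 8192 rows x 16 bytes/row = 25 MiB of active builders,
    // independent of total input size; buffered mode instead grows with input.
    let bytes = builder_footprint_bytes(200, 8192, 16);
    assert_eq!(bytes, 26_214_400);
    println!("{} MiB", bytes / (1024 * 1024));
}
```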
@@ -232,14 +273,15 @@ independently compressed, allowing parallel decompression during reads.
 
 ## Configuration
 
-| Config                                            | Default | Description                              |
-| ------------------------------------------------- | ------- | ---------------------------------------- |
-| `spark.comet.exec.shuffle.enabled`                | `true`  | Enable Comet shuffle                     |
-| `spark.comet.exec.shuffle.mode`                   | `auto`  | Shuffle mode: `native`, `jvm`, or `auto` |
-| `spark.comet.exec.shuffle.compression.codec`      | `zstd`  | Compression codec                        |
-| `spark.comet.exec.shuffle.compression.zstd.level` | `1`     | Zstd compression level                   |
-| `spark.comet.shuffle.write.buffer.size`           | `1MB`   | Write buffer size                        |
-| `spark.comet.columnar.shuffle.batch.size`         | `8192`  | Target rows per batch                    |
+| Config                                            | Default    | Description                                 |
+| ------------------------------------------------- | ---------- | ------------------------------------------- |
+| `spark.comet.exec.shuffle.enabled`                | `true`     | Enable Comet shuffle                        |
+| `spark.comet.exec.shuffle.mode`                   | `auto`     | Shuffle mode: `native`, `jvm`, or `auto`    |
+| `spark.comet.exec.shuffle.partitionerMode`        | `buffered` | Partitioner mode: `immediate` or `buffered` |
+| `spark.comet.exec.shuffle.compression.codec`      | `zstd`     | Compression codec                           |
+| `spark.comet.exec.shuffle.compression.zstd.level` | `1`        | Zstd compression level                      |
+| `spark.comet.shuffle.write.buffer.size`           | `1MB`      | Write buffer size                           |
+| `spark.comet.columnar.shuffle.batch.size`         | `8192`     | Target rows per batch                       |
 
 ## Comparison with JVM Shuffle
 
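The data flow above describes the index file as an array of 8-byte little-endian offsets marking partition boundaries. Assuming the usual n+1-offsets convention (an assumption for illustration, not something this diff confirms), partition lengths fall out as adjacent differences:

```rust
/// Decode partition lengths from a shuffle index file laid out as n+1
/// offsets stored as 8-byte little-endian integers; partition i spans
/// bytes [offsets[i], offsets[i+1]) of the data file.
fn partition_lengths(index_bytes: &[u8]) -> Vec<u64> {
    let offsets: Vec<u64> = index_bytes
        .chunks_exact(8)
        .map(|c| u64::from_le_bytes(c.try_into().unwrap()))
        .collect();
    offsets.windows(2).map(|w| w[1] - w[0]).collect()
}

fn main() {
    // Offsets 0, 100, 100, 250: three partitions, the middle one empty.
    let mut index = Vec::new();
    for off in [0u64, 100, 100, 250] {
        index.extend_from_slice(&off.to_le_bytes());
    }
    assert_eq!(partition_lengths(&index), vec![100, 0, 150]);
    println!("ok");
}
```

This is the shape of work `CometNativeShuffleWriter` does on the JVM side before handing partition lengths to `IndexShuffleBlockResolver`.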
docs/source/user-guide/latest/tuning.md

Lines changed: 11 additions & 0 deletions
@@ -144,6 +144,17 @@ Comet provides a fully native shuffle implementation, which generally provides t
 supports `HashPartitioning`, `RangePartitioning` and `SinglePartitioning` but currently only supports primitive type
 partitioning keys. Columns that are not partitioning keys may contain complex types like maps, structs, and arrays.
 
+Native shuffle has two partitioner modes, configured via
+`spark.comet.exec.shuffle.partitionerMode`:
+
+- **`immediate`**: Encodes partitioned Arrow IPC blocks immediately as each batch
+  arrives. This mode uses less memory because it does not need to buffer the entire input
+  before writing, and is recommended for large datasets.
+
+- **`buffered`** (default): Buffers all input rows in memory before partitioning and writing
+  to disk. This may improve performance for small datasets that fit in memory, but uses
+  significantly more memory.
+
 #### Columnar (JVM) Shuffle
 
 Comet Columnar shuffle is JVM-based and supports `HashPartitioning`, `RoundRobinPartitioning`, `RangePartitioning`, and

native/core/src/execution/planner.rs

Lines changed: 1 addition & 0 deletions
@@ -1379,6 +1379,7 @@ impl PhysicalPlanner {
             writer.output_index_file.clone(),
             writer.tracing_enabled,
             write_buffer_size,
+            writer.immediate_mode,
         )?);
 
         Ok((
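The new `writer.immediate_mode` argument is a plain boolean threaded from the protobuf plan into `ShuffleWriterExec`. A hypothetical sketch of what flag-based partitioner selection can look like (the type names mirror the docs above, but none of this is the actual Comet code):

```rust
/// Minimal dispatch sketch: a boolean flag picks one of two partitioner
/// implementations behind a common trait.
trait Partitioner {
    fn name(&self) -> &'static str;
}

struct ImmediateModePartitioner;
struct MultiPartitionShuffleRepartitioner;

impl Partitioner for ImmediateModePartitioner {
    fn name(&self) -> &'static str {
        "immediate"
    }
}

impl Partitioner for MultiPartitionShuffleRepartitioner {
    fn name(&self) -> &'static str {
        "buffered"
    }
}

fn make_partitioner(immediate_mode: bool) -> Box<dyn Partitioner> {
    if immediate_mode {
        Box::new(ImmediateModePartitioner)
    } else {
        Box::new(MultiPartitionShuffleRepartitioner)
    }
}

fn main() {
    assert_eq!(make_partitioner(true).name(), "immediate");
    assert_eq!(make_partitioner(false).name(), "buffered");
    println!("ok");
}
```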

native/proto/src/proto/operator.proto

Lines changed: 4 additions & 0 deletions
@@ -294,6 +294,10 @@ message ShuffleWriter {
   // Size of the write buffer in bytes used when writing shuffle data to disk.
   // Larger values may improve write performance but use more memory.
   int32 write_buffer_size = 8;
+  // Whether to use immediate mode partitioner. When true, partitioned IPC blocks
+  // are written immediately as batches arrive. When false, rows are buffered
+  // before writing (the original behavior).
+  bool immediate_mode = 9;
 }
 
 message ParquetWriter {

native/shuffle/README.md

Lines changed: 17 additions & 17 deletions
@@ -35,32 +35,32 @@ performance outside of Spark. It streams input data directly from Parquet files.
 cargo run --release --features shuffle-bench --bin shuffle_bench -- \
   --input /data/tpch-sf100/lineitem/ \
   --partitions 200 \
-  --codec lz4 \
+  --codec zstd --zstd-level 1 \
   --hash-columns 0,3
 ```
 
 ### Options
 
-| Option                | Default                    | Description                                            |
-| --------------------- | -------------------------- | ------------------------------------------------------ |
-| `--input`             | _(required)_               | Path to a Parquet file or directory of Parquet files   |
-| `--partitions`        | `200`                      | Number of output shuffle partitions                    |
-| `--partitioning`      | `hash`                     | Partitioning scheme: `hash`, `single`, `round-robin`   |
-| `--hash-columns`      | `0`                        | Comma-separated column indices to hash on (e.g. `0,3`) |
-| `--codec`             | `lz4`                      | Compression codec: `none`, `lz4`, `zstd`, `snappy`     |
-| `--zstd-level`        | `1`                        | Zstd compression level (1–22)                          |
-| `--batch-size`        | `8192`                     | Batch size for reading Parquet data                    |
-| `--memory-limit`      | _(none)_                   | Memory limit in bytes; triggers spilling when exceeded |
-| `--write-buffer-size` | `1048576`                  | Write buffer size in bytes                             |
-| `--limit`             | `0`                        | Limit rows processed per iteration (0 = no limit)      |
-| `--iterations`        | `1`                        | Number of timed iterations                             |
-| `--warmup`            | `0`                        | Number of warmup iterations before timing              |
-| `--output-dir`        | `/tmp/comet_shuffle_bench` | Directory for temporary shuffle output files           |
+| Option                   | Default                    | Description                                                  |
+| ------------------------ | -------------------------- | ------------------------------------------------------------ |
+| `--input`                | _(required)_               | Path to a Parquet file or directory of Parquet files         |
+| `--partitions`           | `200`                      | Number of output shuffle partitions                          |
+| `--partitioning`         | `hash`                     | Partitioning scheme: `hash`, `single`, `round-robin`         |
+| `--hash-columns`         | `0`                        | Comma-separated column indices to hash on (e.g. `0,3`)       |
+| `--codec`                | `zstd`                     | Compression codec: `none`, `lz4`, `zstd`, `snappy`           |
+| `--zstd-level`           | `1`                        | Zstd compression level (1–22)                                |
+| `--batch-size`           | `8192`                     | Batch size for reading Parquet data                          |
+| `--memory-limit`         | _(none)_                   | Memory limit in bytes; triggers spilling when exceeded       |
+| `--write-buffer-size`    | `1048576`                  | Write buffer size in bytes                                   |
+| `--limit`                | `0`                        | Limit rows processed per iteration (0 = no limit)            |
+| `--iterations`           | `1`                        | Number of timed iterations                                   |
+| `--warmup`               | `0`                        | Number of warmup iterations before timing                    |
+| `--output-dir`           | `/tmp/comet_shuffle_bench` | Directory for temporary shuffle output files                 |
 
 ### Profiling with flamegraph
 
 ```sh
 cargo flamegraph --release --features shuffle-bench --bin shuffle_bench -- \
   --input /data/tpch-sf100/lineitem/ \
-  --partitions 200 --codec lz4
+  --partitions 200 --codec zstd --zstd-level 1
 ```

native/shuffle/benches/shuffle_writer.rs

Lines changed: 1 addition & 0 deletions
@@ -153,6 +153,7 @@ fn create_shuffle_writer_exec(
         "/tmp/index.out".to_string(),
         false,
         1024 * 1024,
+        false, // immediate_mode
     )
     .unwrap()
}
