
Commit ec5df97

feat: Add support for round-robin partitioning in native shuffle (#3076)
1 parent 6a2209d commit ec5df97

11 files changed

Lines changed: 348 additions & 16 deletions


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 30 additions & 0 deletions
@@ -375,6 +375,36 @@ object CometConf extends ShimCometConf {
375375
.booleanConf
376376
.createWithDefault(true)
377377

378+
val COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
379+
conf("spark.comet.native.shuffle.partitioning.roundrobin.enabled")
380+
.category(CATEGORY_SHUFFLE)
381+
.doc(
382+
"Whether to enable round robin partitioning for Comet native shuffle. " +
383+
"This is disabled by default because Comet's round-robin produces different " +
384+
"partition assignments than Spark. Spark sorts rows by their binary UnsafeRow " +
385+
"representation before assigning partitions, but Comet uses Arrow format which " +
386+
"has a different binary layout. Instead, Comet implements round-robin as hash " +
387+
"partitioning on all columns, which achieves the same goals: even distribution, " +
388+
"deterministic output (for fault tolerance), and no semantic grouping. " +
389+
"Sorted output will be identical to Spark, but unsorted row ordering may differ.")
390+
.booleanConf
391+
.createWithDefault(false)
392+
393+
val COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_MAX_HASH_COLUMNS: ConfigEntry[Int] =
394+
conf("spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns")
395+
.category(CATEGORY_SHUFFLE)
396+
.doc(
397+
"The maximum number of columns to hash for round robin partitioning. " +
398+
"When set to 0 (the default), all columns are hashed. " +
399+
"When set to a positive value, only the first N columns are used for hashing, " +
400+
"which can improve performance for wide tables while still providing " +
401+
"reasonable distribution.")
402+
.intConf
403+
.checkValue(
404+
v => v >= 0,
405+
"The maximum number of columns to hash for round robin partitioning must be non-negative.")
406+
.createWithDefault(0)
407+
378408
val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
379409
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
380410
.category(CATEGORY_SHUFFLE)

docs/source/contributor-guide/jvm_shuffle.md

Lines changed: 3 additions & 5 deletions
@@ -46,12 +46,10 @@ JVM shuffle (`CometColumnarExchange`) is used instead of native shuffle (`CometE
4646
(not a `CometPlan`), JVM shuffle is the only option since native shuffle requires columnar input
4747
from Comet operators.
4848

49-
3. **Unsupported partitioning type**: Native shuffle only supports `HashPartitioning`, `RangePartitioning`,
50-
and `SinglePartition`. JVM shuffle additionally supports `RoundRobinPartitioning`.
51-
52-
4. **Unsupported partition key types**: For `HashPartitioning` and `RangePartitioning`, native shuffle
49+
3. **Unsupported partition key types**: For `HashPartitioning` and `RangePartitioning`, native shuffle
5350
only supports primitive types as partition keys. Complex types (struct, array, map) cannot be used
54-
as partition keys in native shuffle, though they are fully supported as data columns in both implementations.
51+
as partition keys in native shuffle and will fall back to JVM columnar shuffle. Note that complex types are
52+
fully supported as data columns in both implementations.
5553

5654
## Input Handling
5755

docs/source/contributor-guide/native_shuffle.md

Lines changed: 17 additions & 5 deletions
@@ -52,8 +52,7 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
5252
- `HashPartitioning`
5353
- `RangePartitioning`
5454
- `SinglePartition`
55-
56-
`RoundRobinPartitioning` requires JVM shuffle.
55+
- `RoundRobinPartitioning`
5756

5857
4. **Supported partition key types**: For `HashPartitioning` and `RangePartitioning`, partition
5958
keys must be primitive types. Complex types (struct, array, map) as partition keys require
@@ -131,7 +130,7 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
131130
2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust.
132131

133132
3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner:
134-
- `MultiPartitionShuffleRepartitioner`: For hash/range partitioning
133+
- `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning
135134
- `SinglePartitionShufflePartitioner`: For single partition (simpler path)
136135

137136
4. **Buffering and spilling**: The partitioner buffers rows per partition. When memory pressure
@@ -187,6 +186,19 @@ For range partitioning:
187186
The simplest case: all rows go to partition 0. Uses `SinglePartitionShufflePartitioner` which
188187
simply concatenates batches to reach the configured batch size.
189188

189+
### Round Robin Partitioning
190+
191+
Comet implements round robin partitioning using hash-based assignment for determinism:
192+
193+
1. Computes a Murmur3 hash of columns (using seed 42)
194+
2. Assigns partitions directly using the hash: `partition_id = hash % num_partitions`
195+
196+
This approach guarantees determinism across retries, which is critical for fault tolerance.
197+
However, unlike true round robin which cycles through partitions row-by-row, hash-based
198+
assignment only provides even distribution when the data has sufficient variation in the
199+
hashed columns. Data with low cardinality or identical values may result in skewed partition
200+
sizes.
201+
190202
## Memory Management
191203

192204
Native shuffle uses DataFusion's memory management with spilling support:
@@ -235,8 +247,8 @@ independently compressed, allowing parallel decompression during reads.
235247
| ------------------- | -------------------------------------- | --------------------------------- |
236248
| Input format | Columnar (direct from Comet operators) | Row-based (via ColumnarToRowExec) |
237249
| Partitioning logic | Rust implementation | Spark's partitioner |
238-
| Supported schemes | Hash, Range, Single | Hash, Range, Single, RoundRobin |
239-
| Partition key types | Primitives only | Any type |
250+
| Supported schemes | Hash, Range, Single, RoundRobin | Hash, Range, Single, RoundRobin |
251+
| Partition key types | Primitives only (Hash, Range) | Any type |
240252
| Performance | Higher (no format conversion) | Lower (columnar→row→columnar) |
241253
| Writer variants | Single path | Bypass (hash) and sort-based |
242254
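The hash-based round-robin assignment described in the `native_shuffle.md` changes above can be sketched roughly as follows. This is a minimal illustration, not Comet's code: `fnv1a` is a stand-in hash chosen so the example needs no dependencies (Comet uses Murmur3 via `create_murmur3_hashes`), rows are modeled as `Vec<i64>` instead of Arrow arrays, and `assign_partitions` is a hypothetical name. The `pmod` helper is an assumption about how a non-negative modulo over a signed 32-bit hash might look.

```rust
/// Stand-in 32-bit hash (FNV-1a). NOT the Murmur3 hash that Comet uses;
/// it only serves to keep this sketch dependency-free.
fn fnv1a(seed: u32, bytes: &[u8]) -> u32 {
    let mut h = 0x811c_9dc5u32 ^ seed;
    for b in bytes {
        h ^= *b as u32;
        h = h.wrapping_mul(0x0100_0193);
    }
    h
}

/// Non-negative modulo: the hash is interpreted as a signed i32, so a plain
/// `%` could yield a negative remainder. Assumed shape of the helper.
fn pmod(hash: u32, n: usize) -> usize {
    let hash = hash as i32;
    let n = n as i32;
    let r = hash % n;
    let result = if r < 0 { (r + n) % n } else { r };
    result as usize
}

/// Assign a partition to every row by hashing all of its column values.
fn assign_partitions(rows: &[Vec<i64>], num_partitions: usize) -> Vec<usize> {
    rows.iter()
        .map(|row| {
            // Chain the hash through every column, starting from seed 42.
            let mut h = 42u32;
            for v in row {
                h = fnv1a(h, &v.to_le_bytes());
            }
            pmod(h, num_partitions)
        })
        .collect()
}

fn main() {
    // Varied data: the assignment is repeatable, as a task retry requires.
    let rows: Vec<Vec<i64>> = (0..1000).map(|i| vec![i, i * 7]).collect();
    assert_eq!(assign_partitions(&rows, 8), assign_partitions(&rows, 8));

    // Identical rows all hash to the same value, so every row lands in one
    // partition. This is the skew case the documentation warns about.
    let identical = vec![vec![1i64, 2]; 100];
    let ids = assign_partitions(&identical, 8);
    assert!(ids.iter().all(|p| *p == ids[0]));
    println!("identical rows all went to partition {}", ids[0]);
}
```

The second half of `main` shows why this differs from true round robin: a row-by-row cycle would spread the 100 identical rows across all 8 partitions, while hash-based assignment places them in a single partition.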

docs/source/user-guide/latest/compatibility.md

Lines changed: 26 additions & 0 deletions
@@ -69,6 +69,32 @@ this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
6969
Comet's support for window functions is incomplete and known to be incorrect. It is disabled by default and
7070
should not be used in production. The feature will be enabled in a future release. Tracking issue: [#2721](https://github.com/apache/datafusion-comet/issues/2721).
7171

72+
## Round-Robin Partitioning
73+
74+
Comet's native shuffle implementation of round-robin partitioning (`df.repartition(n)`) is not compatible with
75+
Spark's implementation and is disabled by default. It can be enabled by setting
76+
`spark.comet.native.shuffle.partitioning.roundrobin.enabled=true`.
77+
78+
**Why the incompatibility exists:**
79+
80+
Spark's round-robin partitioning sorts rows by their binary `UnsafeRow` representation before assigning them to
81+
partitions. This ensures deterministic output for fault tolerance (task retries produce identical results).
82+
Comet uses Arrow format internally, which has a completely different binary layout than `UnsafeRow`, making it
83+
impossible to match Spark's exact partition assignments.
84+
85+
**Comet's approach:**
86+
87+
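Instead of true round-robin assignment, Comet implements round-robin as hash partitioning on all columns (or on only the first N columns when `spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns` is set to a positive value). This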
88+
achieves the same semantic goals:
89+
90+
- **Even distribution**: Rows are distributed evenly across partitions (as long as the hash varies sufficiently -
91+
in some cases there could be skew)
92+
- **Deterministic**: Same input always produces the same partition assignments (important for fault tolerance)
93+
- **No semantic grouping**: Unlike hash partitioning on specific columns, this doesn't group related rows together
94+
95+
The only difference is that Comet's partition assignments will differ from Spark's. When results are sorted,
96+
they will be identical to Spark. Unsorted results may have different row ordering.
97+
7298
## Cast
7399

74100
Cast operations in Comet fall into three levels of support:

native/core/src/execution/planner.rs

Lines changed: 12 additions & 0 deletions
@@ -2342,6 +2342,18 @@ impl PhysicalPlanner {
23422342
))
23432343
}
23442344
PartitioningStruct::SinglePartition(_) => Ok(CometPartitioning::SinglePartition),
2345+
PartitioningStruct::RoundRobinPartition(rr_partition) => {
2346+
// Treat negative max_hash_columns as 0 (no limit)
2347+
let max_hash_columns = if rr_partition.max_hash_columns <= 0 {
2348+
0
2349+
} else {
2350+
rr_partition.max_hash_columns as usize
2351+
};
2352+
Ok(CometPartitioning::RoundRobin(
2353+
rr_partition.num_partitions as usize,
2354+
max_hash_columns,
2355+
))
2356+
}
23452357
}
23462358
}
23472359
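The planner change above and the shuffle writer change later in this commit together decide how many leading columns get hashed. A small sketch of that two-step logic, assuming the behavior shown in the diff; the function names `normalize_max_hash_columns` and `num_columns_to_hash` are hypothetical and do not appear in the commit:

```rust
/// Planner side: convert the protobuf `int32` into a `usize`,
/// mapping zero and negative values to 0, which means "no limit".
fn normalize_max_hash_columns(raw: i32) -> usize {
    if raw <= 0 {
        0
    } else {
        raw as usize
    }
}

/// Writer side: how many leading columns of the batch are hashed.
/// A limit of 0 hashes every column; otherwise the limit is capped
/// at the number of columns actually present.
fn num_columns_to_hash(max_hash_columns: usize, num_columns: usize) -> usize {
    if max_hash_columns == 0 {
        num_columns
    } else {
        max_hash_columns.min(num_columns)
    }
}

fn main() {
    // Default configuration (0): all 12 columns are hashed.
    assert_eq!(num_columns_to_hash(normalize_max_hash_columns(0), 12), 12);
    // A negative value is treated the same as 0.
    assert_eq!(num_columns_to_hash(normalize_max_hash_columns(-3), 12), 12);
    // A positive limit selects the first N columns.
    assert_eq!(num_columns_to_hash(normalize_max_hash_columns(4), 12), 4);
    // A limit larger than the schema is capped at the column count.
    assert_eq!(num_columns_to_hash(normalize_max_hash_columns(50), 12), 12);
    println!("column limits behave as expected");
}
```

Note that the Scala config entry already rejects negative values through `checkValue`, so the negative branch in the planner acts as a defensive fallback for plans that reach the native side by another route.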

native/core/src/execution/shuffle/comet_partitioning.rs

Lines changed: 5 additions & 1 deletion
@@ -31,14 +31,18 @@ pub enum CometPartitioning {
3131
/// Rows for comparing to 4) OwnedRows that represent the boundaries of each partition, used with
3232
/// LexOrdering to bin each value in the RecordBatch to a partition.
3333
RangePartitioning(LexOrdering, usize, Arc<RowConverter>, Vec<OwnedRow>),
34+
/// Round robin partitioning. Distributes rows across partitions by hashing the selected
35+
/// columns and taking the hash modulo the number of partitions. Args are:
36+
/// 1) number of partitions, 2) max columns to hash (0 means no limit).
37+
RoundRobin(usize, usize),
3438
}
3539

3640
impl CometPartitioning {
3741
pub fn partition_count(&self) -> usize {
3842
use CometPartitioning::*;
3943
match self {
4044
SinglePartition => 1,
41-
Hash(_, n) | RangePartitioning(_, n, _, _) => *n,
45+
Hash(_, n) | RangePartitioning(_, n, _, _) | RoundRobin(n, _) => *n,
4246
}
4347
}
4448
}

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 159 additions & 2 deletions
@@ -382,8 +382,11 @@ impl MultiPartitionShuffleRepartitioner {
382382
// The initial values are not used.
383383
let scratch = ScratchSpace {
384384
hashes_buf: match partitioning {
385-
// Only allocate the hashes_buf if hash partitioning.
386-
CometPartitioning::Hash(_, _) => vec![0; batch_size],
385+
// Allocate hashes_buf for hash and round robin partitioning.
386+
// Round robin hashes all columns to achieve even, deterministic distribution.
387+
CometPartitioning::Hash(_, _) | CometPartitioning::RoundRobin(_, _) => {
388+
vec![0; batch_size]
389+
}
387390
_ => vec![],
388391
},
389392
partition_ids: vec![0; batch_size],
@@ -598,6 +601,68 @@ impl MultiPartitionShuffleRepartitioner {
598601
.await?;
599602
self.scratch = scratch;
600603
}
604+
CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => {
605+
// Comet implements "round robin" as hash partitioning on columns.
606+
// This achieves the same goal as Spark's round robin (even distribution
607+
// without semantic grouping) while being deterministic for fault tolerance.
608+
//
609+
// Note: This produces different partition assignments than Spark's round robin,
610+
// which sorts by UnsafeRow binary representation before assigning partitions.
611+
// However, both approaches provide even distribution and determinism.
612+
let mut scratch = std::mem::take(&mut self.scratch);
613+
let (partition_starts, partition_row_indices): (&Vec<u32>, &Vec<u32>) = {
614+
let mut timer = self.metrics.repart_time.timer();
615+
616+
let num_rows = input.num_rows();
617+
618+
// Collect columns for hashing, respecting max_hash_columns limit
619+
// max_hash_columns of 0 means no limit (hash all columns)
620+
// Negative values are normalized to 0 in the planner
621+
let num_columns_to_hash = if *max_hash_columns == 0 {
622+
input.num_columns()
623+
} else {
624+
(*max_hash_columns).min(input.num_columns())
625+
};
626+
let columns_to_hash: Vec<ArrayRef> = (0..num_columns_to_hash)
627+
.map(|i| Arc::clone(input.column(i)))
628+
.collect();
629+
630+
// Use identical seed as Spark hash partitioning.
631+
let hashes_buf = &mut scratch.hashes_buf[..num_rows];
632+
hashes_buf.fill(42_u32);
633+
634+
// Compute hash for selected columns
635+
create_murmur3_hashes(&columns_to_hash, hashes_buf)?;
636+
637+
// Assign partition IDs based on hash (same as hash partitioning)
638+
let partition_ids = &mut scratch.partition_ids[..num_rows];
639+
hashes_buf.iter().enumerate().for_each(|(idx, hash)| {
640+
partition_ids[idx] = pmod(*hash, *num_output_partitions) as u32;
641+
});
642+
643+
// We now have partition ids for every input row, map that to partition starts
644+
// and partition indices to eventually write these rows to partition buffers.
645+
map_partition_ids_to_starts_and_indices(
646+
&mut scratch,
647+
*num_output_partitions,
648+
num_rows,
649+
);
650+
651+
timer.stop();
652+
Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
653+
&scratch.partition_starts,
654+
&scratch.partition_row_indices,
655+
))
656+
}?;
657+
658+
self.buffer_partitioned_batch_may_spill(
659+
input,
660+
partition_row_indices,
661+
partition_starts,
662+
)
663+
.await?;
664+
self.scratch = scratch;
665+
}
601666
other => {
602667
// this should be unreachable as long as the validation logic
603668
// in the constructor is kept up-to-date
@@ -1431,6 +1496,7 @@ mod test {
14311496
Arc::new(row_converter),
14321497
owned_rows,
14331498
),
1499+
CometPartitioning::RoundRobin(num_partitions, 0),
14341500
] {
14351501
let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();
14361502

@@ -1483,4 +1549,95 @@ mod test {
14831549
let expected = vec![69, 5, 193, 171, 115];
14841550
assert_eq!(result, expected);
14851551
}
1552+
1553+
#[test]
1554+
#[cfg_attr(miri, ignore)]
1555+
fn test_round_robin_deterministic() {
1556+
// Test that round robin partitioning produces identical results when run multiple times
1557+
use std::fs;
1558+
use std::io::Read;
1559+
1560+
let batch_size = 1000;
1561+
let num_batches = 10;
1562+
let num_partitions = 8;
1563+
1564+
let batch = create_batch(batch_size);
1565+
let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();
1566+
1567+
// Run shuffle twice and compare results
1568+
for run in 0..2 {
1569+
let data_file = format!("/tmp/rr_data_{}.out", run);
1570+
let index_file = format!("/tmp/rr_index_{}.out", run);
1571+
1572+
let partitions = std::slice::from_ref(&batches);
1573+
let exec = ShuffleWriterExec::try_new(
1574+
Arc::new(DataSourceExec::new(Arc::new(
1575+
MemorySourceConfig::try_new(partitions, batch.schema(), None).unwrap(),
1576+
))),
1577+
CometPartitioning::RoundRobin(num_partitions, 0),
1578+
CompressionCodec::Zstd(1),
1579+
data_file.clone(),
1580+
index_file.clone(),
1581+
false,
1582+
1024 * 1024,
1583+
)
1584+
.unwrap();
1585+
1586+
let config = SessionConfig::new();
1587+
let runtime_env = Arc::new(
1588+
RuntimeEnvBuilder::new()
1589+
.with_memory_limit(10 * 1024 * 1024, 1.0)
1590+
.build()
1591+
.unwrap(),
1592+
);
1593+
let session_ctx = Arc::new(SessionContext::new_with_config_rt(config, runtime_env));
1594+
let task_ctx = Arc::new(TaskContext::from(session_ctx.as_ref()));
1595+
1596+
// Execute the shuffle
1597+
futures::executor::block_on(async {
1598+
let mut stream = exec.execute(0, Arc::clone(&task_ctx)).unwrap();
1599+
while stream.next().await.is_some() {}
1600+
});
1601+
1602+
if run == 1 {
1603+
// Compare data files
1604+
let mut data0 = Vec::new();
1605+
fs::File::open("/tmp/rr_data_0.out")
1606+
.unwrap()
1607+
.read_to_end(&mut data0)
1608+
.unwrap();
1609+
let mut data1 = Vec::new();
1610+
fs::File::open("/tmp/rr_data_1.out")
1611+
.unwrap()
1612+
.read_to_end(&mut data1)
1613+
.unwrap();
1614+
assert_eq!(
1615+
data0, data1,
1616+
"Round robin shuffle data should be identical across runs"
1617+
);
1618+
1619+
// Compare index files
1620+
let mut index0 = Vec::new();
1621+
fs::File::open("/tmp/rr_index_0.out")
1622+
.unwrap()
1623+
.read_to_end(&mut index0)
1624+
.unwrap();
1625+
let mut index1 = Vec::new();
1626+
fs::File::open("/tmp/rr_index_1.out")
1627+
.unwrap()
1628+
.read_to_end(&mut index1)
1629+
.unwrap();
1630+
assert_eq!(
1631+
index0, index1,
1632+
"Round robin shuffle index should be identical across runs"
1633+
);
1634+
}
1635+
}
1636+
1637+
// Clean up
1638+
let _ = fs::remove_file("/tmp/rr_data_0.out");
1639+
let _ = fs::remove_file("/tmp/rr_index_0.out");
1640+
let _ = fs::remove_file("/tmp/rr_data_1.out");
1641+
let _ = fs::remove_file("/tmp/rr_index_1.out");
1642+
}
14861643
}
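The round-robin arm above hands its per-row partition ids to `map_partition_ids_to_starts_and_indices`, whose body is not part of this diff. One plausible way to turn partition ids into "partition starts" and "partition row indices" is a counting sort, sketched below. The function name `group_rows_by_partition`, its signature, and the exact layout of the two output vectors are assumptions for illustration, not the actual helper.

```rust
/// Hypothetical stand-in for the grouping step: a counting sort over
/// partition ids. Returns `(starts, indices)` where the rows of partition
/// `p` are `indices[starts[p]..starts[p + 1]]`.
fn group_rows_by_partition(partition_ids: &[u32], num_partitions: usize) -> (Vec<u32>, Vec<u32>) {
    // Count rows per partition, shifted by one slot so that a prefix sum
    // turns the counts into start offsets.
    let mut starts = vec![0u32; num_partitions + 1];
    for &p in partition_ids {
        starts[p as usize + 1] += 1;
    }
    for i in 1..=num_partitions {
        starts[i] += starts[i - 1];
    }

    // Write each row index into the next free slot of its partition.
    // Rows keep their input order within a partition.
    let mut next = starts.clone();
    let mut indices = vec![0u32; partition_ids.len()];
    for (row, &p) in partition_ids.iter().enumerate() {
        let slot = &mut next[p as usize];
        indices[*slot as usize] = row as u32;
        *slot += 1;
    }
    (starts, indices)
}

fn main() {
    // Five rows assigned to three partitions.
    let partition_ids = [2u32, 0, 2, 1, 0];
    let (starts, indices) = group_rows_by_partition(&partition_ids, 3);

    // Partition 0 holds rows 1 and 4, partition 1 holds row 3,
    // partition 2 holds rows 0 and 2.
    assert_eq!(starts, vec![0, 2, 3, 5]);
    assert_eq!(indices, vec![1, 4, 3, 0, 2]);
    println!("starts = {:?}, indices = {:?}", starts, indices);
}
```

Grouping this way takes time linear in the number of rows plus the number of partitions, and it lets each partition's rows be copied into its buffer as one contiguous slice of indices.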

native/proto/src/proto/partitioning.proto

Lines changed: 7 additions & 0 deletions
@@ -31,6 +31,7 @@ message Partitioning {
3131
HashPartition hash_partition = 1;
3232
SinglePartition single_partition = 2;
3333
RangePartition range_partition = 3;
34+
RoundRobinPartition round_robin_partition = 4;
3435
}
3536
}
3637

@@ -51,3 +52,9 @@ message RangePartition {
5152
int32 num_partitions = 2;
5253
repeated BoundaryRow boundary_rows = 4;
5354
}
55+
56+
message RoundRobinPartition {
57+
int32 num_partitions = 1;
58+
// Maximum number of columns to hash. 0 means no limit (hash all columns).
59+
int32 max_hash_columns = 2;
60+
}
