feat: Add sort-based shuffle repartitioning mode [do not merge - for discussion only] #3940

Draft
andygrove wants to merge 7 commits into apache:main from andygrove:sort-based-repartitioning

Conversation

@andygrove
Member

Which issue does this PR close?

Related to #887 (memory over-reservation when running native shuffle write)

Rationale for this change

The current MultiPartitionShuffleRepartitioner (buffered mode) buffers all input batches in memory before writing partitioned output. While this avoids per-partition builder overhead, memory use grows with total input size and can become significant for large inputs.

The ImmediateModePartitioner from #3845 takes a different approach with per-partition Arrow builders, but as discussed in that PR, this causes memory to scale linearly with the number of output partitions — problematic for the common case of 1000+ partitions.

This PR introduces a third approach: sort-based repartitioning. For each input batch, it:

  1. Computes partition IDs via hash (same as buffered mode)
  2. Counting-sorts row indices by partition ID — O(n) since partition IDs are integers in [0, N)
  3. Uses Arrow take to reorder the batch by partition
  4. Slices the sorted batch at partition boundaries (zero-copy)
  5. Writes each slice to per-partition spill files via persistent BufBatchWriters with BatchCoalescer

This avoids both buffering all input (like buffered mode) and per-partition Arrow builders (like immediate mode).
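Steps 1, 2, and 4 above can be sketched in plain Rust. This is an illustrative stand-in, not the actual Comet code: `sort_rows_by_partition` and its signature are hypothetical, and the real implementation uses Arrow's `take` kernel (step 3) with the sorted indices and `slice` at the returned boundaries.

```rust
/// Given one partition id per row (each in 0..num_partitions), return
/// (sorted_indices, boundaries): sorted_indices groups row indices by
/// partition, and boundaries[p]..boundaries[p + 1] is partition p's range.
/// Counting sort makes this O(n + num_partitions) with no comparisons.
fn sort_rows_by_partition(
    partition_ids: &[u32],
    num_partitions: usize,
) -> (Vec<u32>, Vec<usize>) {
    // 1. Count rows per partition: O(n).
    let mut counts = vec![0usize; num_partitions];
    for &p in partition_ids {
        counts[p as usize] += 1;
    }
    // 2. Prefix-sum counts into slice boundaries (len = num_partitions + 1).
    let mut boundaries = vec![0usize; num_partitions + 1];
    for p in 0..num_partitions {
        boundaries[p + 1] = boundaries[p] + counts[p];
    }
    // 3. Scatter each row index to its position within its partition's range.
    let mut cursor = boundaries.clone();
    let mut sorted = vec![0u32; partition_ids.len()];
    for (row, &p) in partition_ids.iter().enumerate() {
        sorted[cursor[p as usize]] = row as u32;
        cursor[p as usize] += 1;
    }
    (sorted, boundaries)
}

fn main() {
    // Rows 1 and 4 hash to partition 0, row 2 to partition 1,
    // rows 0 and 3 to partition 2.
    let ids = [2u32, 0, 1, 2, 0];
    let (sorted, bounds) = sort_rows_by_partition(&ids, 3);
    println!("{:?} {:?}", sorted, bounds); // [1, 4, 2, 0, 3] [0, 2, 3, 5]
}
```

In the real code path, `take(batch, sorted)` then yields a batch ordered by partition, and slicing at `boundaries` is zero-copy because Arrow slices share the underlying buffers.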

Benchmark Results (macOS M3 Ultra)

200 partitions, 4 GB memory limit, 100M rows, lz4:

| Mode | Throughput | Peak RSS |
|----------|---------------|---------|
| buffered | 2.17 M rows/s | 6.8 GiB |
| sort | 2.95 M rows/s | 4.0 GiB |

800 partitions, 4 GB memory limit, 100M rows, lz4:

| Mode | Throughput | Peak RSS |
|----------|---------------|----------|
| buffered | 1.99 M rows/s | 7.4 GiB |
| sort | 2.09 M rows/s | 22.2 GiB |

At moderate partition counts (200), sort-based is 36% faster with 41% less memory. At 800 partitions, the per-partition spill files and write buffers cause higher memory usage. This trade-off should be documented in the tuning guide.

What changes are included in this PR?

  • New SortBasedPartitioner implementing ShufflePartitioner trait
  • PartitionSpillWriter with persistent BufBatchWriter per partition for batch coalescing
  • sort_based: bool parameter on ShuffleWriterExec
  • Config: spark.comet.exec.shuffle.sort_based (default: false)
  • Protobuf field in ShuffleWriter message
  • Benchmark support: --mode sort in shuffle_bench
  • spill_batch method on PartitionWriter (for potential future use)
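The new mode is opt-in via the config key listed above. A hedged sketch of enabling it: only the `spark.comet.exec.shuffle.sort_based` key comes from this PR; the surrounding `spark-submit` invocation and job name are placeholders.

```shell
# Opt in to sort-based shuffle repartitioning (defaults to false,
# i.e. the existing buffered mode). Invocation shape is illustrative.
spark-submit \
  --conf spark.comet.exec.shuffle.sort_based=true \
  my_job.py
```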

How are these changes tested?

  • 4 new Rust unit tests (test_sort_based_basic, test_sort_based_insert_larger_batch, test_sort_based_insert_smaller_batch, test_sort_based_large_number_of_partitions) covering Hash, Range, and RoundRobin partitioning
  • All 23 shuffle tests pass
  • Benchmarked with TPC-H SF100 lineitem data at 200 and 800 partitions

Add a new shuffle partitioner that processes each batch immediately by
sorting rows by partition ID using counting sort, slicing the sorted
batch at partition boundaries, and writing each slice to per-partition
spill files. This avoids buffering all input in memory, unlike the
existing MultiPartitionShuffleRepartitioner, while also avoiding
per-partition builder overhead.

Also add spill_batch method to PartitionWriter for writing individual
batches to spill files.

Add sort_based parameter to ShuffleWriterExec and external_shuffle to
enable sort-based partitioning as an alternative to the default
multi-partition hash repartitioner. When sort_based is true and more
than one partition is requested, the SortBasedPartitioner is used.

Add test cases for sort-based shuffle covering basic operation, larger
and smaller batch sizes, and large numbers of partitions.

Add COMET_SHUFFLE_SORT_BASED config option and pass it through protobuf
to the native ShuffleWriterExec, replacing the hardcoded `false` value.

Replace per-call BufBatchWriter creation with persistent per-partition
writers that keep BatchCoalescer state across calls. This allows small
partition slices to be coalesced to batch_size before encoding,
dramatically reducing per-block IPC schema overhead.
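The coalescing behavior this commit describes can be illustrated with a minimal stand-in. All names here are hypothetical (not the Comet or arrow-rs API), and plain `Vec<i64>` rows replace Arrow RecordBatches; `flush_target` plays the role of `batch_size`.

```rust
/// Buffers small slices for one partition and emits a combined batch only
/// once `flush_target` rows have accumulated, amortizing per-batch
/// overhead (e.g. IPC schema blocks) across many small inputs.
struct CoalescingWriter {
    buffer: Vec<i64>,
    flush_target: usize,
    flushed: Vec<Vec<i64>>, // stands in for encoded blocks in the spill file
}

impl CoalescingWriter {
    fn new(flush_target: usize) -> Self {
        Self { buffer: Vec::new(), flush_target, flushed: Vec::new() }
    }

    /// Append one partition slice; flush whenever the buffer reaches
    /// the target size.
    fn write_slice(&mut self, slice: &[i64]) {
        self.buffer.extend_from_slice(slice);
        let target = self.flush_target;
        while self.buffer.len() >= target {
            let batch: Vec<i64> = self.buffer.drain(..target).collect();
            self.flushed.push(batch);
        }
    }

    /// Flush any remaining rows as a final (possibly small) batch.
    fn finish(&mut self) {
        if !self.buffer.is_empty() {
            self.flushed.push(std::mem::take(&mut self.buffer));
        }
    }
}

fn main() {
    let mut w = CoalescingWriter::new(4);
    w.write_slice(&[1, 2]);    // below target: buffered
    w.write_slice(&[3, 4, 5]); // reaches target: one batch of 4 flushed
    w.finish();                // remaining row flushed as a final batch
    println!("{:?}", w.flushed); // [[1, 2, 3, 4], [5]]
}
```

The persistent-writer change in the commit above matters because a fresh writer per call would flush every small slice as its own block, while keeping state across calls lets slices accumulate to `batch_size` first.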
andygrove changed the title to "feat: Add sort-based shuffle repartitioning mode [do not merge - for discussion only]" on Apr 13, 2026