[WIP] feat: Use put_batch_with_indices to track partitions by index for sort-based shuffle #1451

Draft
mattcuento wants to merge 2 commits into apache:main from mattcuento:angry-golick

@mattcuento
Contributor

mattcuento commented Feb 11, 2026

Which issue does this PR close?

Closes #1432.

Rationale for this change

The default BatchPartitioner used in sort-based shuffle creates a separate RecordBatch for each output partition. When the number of output partitions is high, every input batch is split into many tiny batches, and the per-RecordBatch metadata overhead adds up.
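
To make the overhead concrete, here is a minimal, dependency-free sketch of eager partitioning. It is not Ballista's BatchPartitioner: `Batch` is a hypothetical stand-in for a RecordBatch (a single column of keys), and `partition_of` is an assumed modulo partitioner used only for illustration. The point is that the number of materialized batches grows with input batches times output partitions.

```rust
/// Hypothetical stand-in for an Arrow RecordBatch: one column of i64 keys.
type Batch = Vec<i64>;

/// Assumed partitioner for illustration: key modulo partition count.
fn partition_of(key: i64, num_partitions: usize) -> usize {
    key.rem_euclid(num_partitions as i64) as usize
}

/// Eagerly split every input batch into one small batch per output partition.
/// Returns, for each partition, the list of small batches it received.
fn eager_partition(inputs: &[Batch], num_partitions: usize) -> Vec<Vec<Batch>> {
    let mut out: Vec<Vec<Batch>> = vec![Vec::new(); num_partitions];
    for batch in inputs {
        // One scratch batch per partition for this input batch.
        let mut pieces: Vec<Batch> = vec![Vec::new(); num_partitions];
        for &key in batch {
            pieces[partition_of(key, num_partitions)].push(key);
        }
        // Keep only non-empty pieces; each one is a separately materialized batch.
        for (p, piece) in pieces.into_iter().enumerate() {
            if !piece.is_empty() {
                out[p].push(piece);
            }
        }
    }
    out
}

/// Total number of materialized output batches across all partitions.
fn materialized_batches(out: &[Vec<Batch>]) -> usize {
    out.iter().map(|batches| batches.len()).sum()
}
```

With 50 input batches of 1,000 evenly distributed keys and 200 partitions, this sketch materializes 10,000 batches of 5 rows each.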

This PR aims to defer materializing the shuffled output batches until they are spilled or written as results. Until then, the input batches are stored as-is, and the rows belonging to each output partition are tracked in a separate index data structure.

What changes are included in this PR?

  • Remove the PartitionBuffer struct
  • Introduce InputBatchStore and ScratchSpace for storing input batches and tracking output partition ownership
  • Integrate the above structs to defer materialization until spilling or finalizing results
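
The idea behind the list above can be sketched roughly as follows. This is a dependency-free illustration, not the PR's code: `DeferredStore`, `push`, and `materialize` are hypothetical names, `Batch` stands in for a RecordBatch, and the modulo partitioner is an assumption. Input batches are stored once, each partition only accumulates (batch index, row index) pairs, and rows are gathered into a single output batch when a partition is materialized.

```rust
/// Hypothetical stand-in for an Arrow RecordBatch: one column of i64 keys.
type Batch = Vec<i64>;

/// Illustrative store (not the PR's InputBatchStore): keeps input batches
/// intact and records, per output partition, which rows belong to it.
struct DeferredStore {
    batches: Vec<Batch>,
    /// indices[p] holds (batch index, row index) pairs owned by partition p.
    indices: Vec<Vec<(usize, usize)>>,
}

impl DeferredStore {
    fn new(num_partitions: usize) -> Self {
        Self {
            batches: Vec::new(),
            indices: vec![Vec::new(); num_partitions],
        }
    }

    /// Store the batch once; only row indices are appended per partition.
    fn push(&mut self, batch: Batch) {
        let batch_idx = self.batches.len();
        let n = self.indices.len() as i64;
        for (row_idx, &key) in batch.iter().enumerate() {
            // Assumed modulo partitioner, for illustration only.
            let p = key.rem_euclid(n) as usize;
            self.indices[p].push((batch_idx, row_idx));
        }
        self.batches.push(batch);
    }

    /// Gather one partition's rows into a single batch, e.g. when
    /// spilling or writing final output.
    fn materialize(&self, partition: usize) -> Batch {
        self.indices[partition]
            .iter()
            .map(|&(b, r)| self.batches[b][r])
            .collect()
    }
}
```

Compared with eager splitting, the per-partition state here is a list of index pairs rather than a list of small batches, so the number of stored batches does not depend on the partition count.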

Existing coverage from the client sort shuffle tests seemed sufficient for the changes to the writer.

Are there any user-facing changes?

No

mattcuento force-pushed the angry-golick branch 2 times, most recently from b475fdc to 2967b31, on February 13, 2026 04:50
@mattcuento
Contributor Author

mattcuento commented Feb 17, 2026

interleave_record_batch:

Benchmarking sort_shuffle_no_spill/10_batches_200_partitions: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 5.8s or enable flat sampling.
sort_shuffle_no_spill/10_batches_200_partitions
time: [93.036 ms 105.89 ms 119.03 ms]
Benchmarking sort_shuffle_no_spill/50_batches_200_partitions: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 7.3s.
sort_shuffle_no_spill/50_batches_200_partitions
time: [235.61 ms 295.25 ms 361.97 ms]

Benchmarking sort_shuffle_with_spill/50_batches_200_partitions_8mb_limit: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 67.8s.
sort_shuffle_with_spill/50_batches_200_partitions_8mb_limit
time: [4.3843 s 4.4786 s 4.5840 s]
Found 2 outliers among 10 measurements (20.00%)
2 (20.00%) high mild
Benchmarking sort_shuffle_with_spill/50_batches_200_partitions_2mb_limit: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 70.4s.
sort_shuffle_with_spill/50_batches_200_partitions_2mb_limit
time: [6.9901 s 7.1286 s 7.2624 s]

@mattcuento
Contributor Author

main:

Benchmarking sort_shuffle_no_spill/10_batches_200_partitions: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 7.6s or enable flat sampling.
sort_shuffle_no_spill/10_batches_200_partitions
time: [132.21 ms 137.14 ms 148.05 ms]
Found 2 outliers among 10 measurements (20.00%)
1 (10.00%) high mild
1 (10.00%) high severe
sort_shuffle_no_spill/50_batches_200_partitions
time: [458.17 ms 469.96 ms 487.29 ms]
Found 1 outliers among 10 measurements (10.00%)
1 (10.00%) high severe

Benchmarking sort_shuffle_with_spill/50_batches_200_partitions_8mb_limit: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 41.5s.
sort_shuffle_with_spill/50_batches_200_partitions_8mb_limit
time: [3.9553 s 3.9729 s 3.9931 s]
Found 1 outliers among 10 measurements (10.00%)
1 (10.00%) high mild
Benchmarking sort_shuffle_with_spill/50_batches_200_partitions_2mb_limit: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 55.6s.
sort_shuffle_with_spill/50_batches_200_partitions_2mb_limit
time: [5.4494 s 5.5062 s 5.5745 s]

@mattcuento
Contributor Author

push_batch_with_indices:

Benchmarking sort_shuffle_no_spill/10_batches_200_partitions: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 7.2s or enable flat sampling.
sort_shuffle_no_spill/10_batches_200_partitions
time: [131.53 ms 133.07 ms 135.00 ms]
Found 2 outliers among 10 measurements (20.00%)
2 (20.00%) high mild
sort_shuffle_no_spill/50_batches_200_partitions
time: [305.84 ms 311.67 ms 320.05 ms]
Found 1 outliers among 10 measurements (10.00%)
1 (10.00%) high severe

Benchmarking sort_shuffle_with_spill/50_batches_200_partitions_8mb_limit: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 48.6s.
sort_shuffle_with_spill/50_batches_200_partitions_8mb_limit
time: [4.7025 s 4.7285 s 4.7563 s]
Benchmarking sort_shuffle_with_spill/50_batches_200_partitions_2mb_limit: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 65.3s.
sort_shuffle_with_spill/50_batches_200_partitions_2mb_limit
time: [6.4165 s 6.4397 s 6.4684 s]

@milenkovicm
Contributor

@sqlbenchmark tpch --iterations 3 --scale-factor 10

@sqlbenchmark

Ballista TPC-H Benchmark Results

PR: #1451 - interleave_record_batch
PR Commit: 0f189c78
Base Commit: 97452516 (main)
Scale Factor: SF10
Iterations: 3

Query Comparison

Query   main (ms)   PR (ms)    Change
Q1      2061.90     2542.60    🔴 +23.3%
Q2       844.10     2014.50    🔴 +138.7%
Q3      1327.00     2094.30    🔴 +57.8%
Q4       619.90     1248.70    🔴 +101.4%
Q5      2819.30     4144.70    🔴 +47.0%
Q6       582.10      929.60    🔴 +59.7%
Q7      4606.90     5552.50    🔴 +20.5%
Q8      3354.80     5083.40    🔴 +51.5%
Q9      5079.80     6368.10    🔴 +25.4%
Q10     1581.00     2454.80    🔴 +55.3%
Q11      600.70     1391.50    🔴 +131.6%
Q12     1050.00     1755.90    🔴 +67.2%
Q13     1530.80     2113.30    🔴 +38.1%
Q14      614.20     1102.20    🔴 +79.5%
Q15      615.60     1472.80    🔴 +139.2%
Q16      396.40     1307.30    🔴 +229.8%
Q17     3783.10     4344.70    🔴 +14.8%
Q18     6603.90     7228.50    🔴 +9.5%
Q19     1359.40     1868.20    🔴 +37.4%
Q20     1036.20     1798.50    🔴 +73.6%
Q21     4788.20     5747.50    🔴 +20.0%
Q22      326.50     1032.10    🔴 +216.1%

Total: Main=45581.80ms, PR=63595.70ms (+39.5%)
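
For reference, the Change column and the total are consistent with a plain relative difference against main. A minimal sketch, assuming that is how the figures are derived (the function name `percent_change` is hypothetical and not taken from dfbench):

```rust
/// Relative change of the PR timing versus main, in percent.
/// Positive values mean the PR is slower than main.
fn percent_change(main_ms: f64, pr_ms: f64) -> f64 {
    (pr_ms - main_ms) / main_ms * 100.0
}
```

For example, Q1 gives (2542.60 - 2061.90) / 2061.90, which rounds to +23.3%, and the totals give (63595.70 - 45581.80) / 45581.80, which rounds to +39.5%.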


Automated benchmark run by dfbench

@milenkovicm
Contributor

Ah, branch needs rebase

…t-based shuffle

lint

first round of optimizations

second round

lint

lint

Development

Successfully merging this pull request may close these issues.

Use interleave_record_batch to avoid tiny batches in sort-based shuffle

3 participants