Skip to content

Commit 3deeadd

Browse files
authored
perf: strength reduce hash partition modulo (up to 1.16x faster) (#21900)
## Which issue does this PR close? - Closes #21843. ## Rationale for this change Performance improvement on large hash-repartitions. TPC-H at bigger scale factor shows biggest benefit: <details> ``` -------------------- Benchmark tpch_sf10.json -------------------- ┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓ ┃ Query ┃ HEAD ┃ perf-strength-reduce-hash-partition ┃ Change ┃ ┡━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩ │ QQuery 1 │ 327.64 / 329.66 ±1.44 / 331.84 ms │ 327.82 / 330.00 ±1.68 / 331.48 ms │ no change │ │ QQuery 2 │ 131.35 / 138.32 ±3.91 / 143.24 ms │ 125.94 / 126.49 ±0.61 / 127.42 ms │ +1.09x faster │ │ QQuery 3 │ 286.47 / 300.24 ±8.19 / 308.99 ms │ 273.76 / 276.01 ±1.80 / 277.98 ms │ +1.09x faster │ │ QQuery 4 │ 158.30 / 160.81 ±2.21 / 163.54 ms │ 137.56 / 138.79 ±0.98 / 139.80 ms │ +1.16x faster │ │ QQuery 5 │ 428.90 / 437.68 ±4.45 / 440.83 ms │ 390.52 / 396.51 ±4.30 / 403.67 ms │ +1.10x faster │ │ QQuery 6 │ 131.88 / 132.83 ±1.17 / 134.81 ms │ 133.06 / 134.70 ±1.22 / 135.83 ms │ no change │ │ QQuery 7 │ 541.09 / 545.88 ±4.21 / 552.67 ms │ 508.51 / 531.75 ±16.82 / 548.21 ms │ no change │ │ QQuery 8 │ 467.86 / 476.44 ±6.95 / 483.87 ms │ 427.19 / 439.03 ±9.88 / 453.56 ms │ +1.09x faster │ │ QQuery 9 │ 649.16 / 660.07 ±10.12 / 676.72 ms │ 605.25 / 611.87 ±5.72 / 620.70 ms │ +1.08x faster │ │ QQuery 10 │ 327.64 / 339.90 ±6.92 / 348.85 ms │ 321.66 / 330.67 ±4.76 / 334.89 ms │ no change │ │ QQuery 11 │ 104.93 / 107.54 ±1.71 / 110.18 ms │ 92.80 / 101.35 ±12.27 / 125.63 ms │ +1.06x faster │ │ QQuery 12 │ 198.96 / 202.37 ±2.45 / 206.21 ms │ 195.07 / 197.77 ±4.26 / 206.26 ms │ no change │ │ QQuery 13 │ 300.44 / 312.37 ±6.87 / 321.90 ms │ 291.85 / 308.47 ±10.23 / 317.55 ms │ no change │ │ QQuery 14 │ 188.06 / 193.71 ±4.81 / 200.69 ms │ 182.89 / 186.72 ±3.67 / 192.75 ms │ no change │ │ QQuery 15 │ 334.88 / 339.95 ±5.81 / 350.78 ms │ 330.79 / 336.21 ±4.31 / 342.71 ms │ no change │ │ QQuery 16 │ 78.38 / 81.25 ±2.51 / 84.55 ms │ 74.35 / 76.61 ±2.80 / 81.94 ms │ +1.06x faster │ │ QQuery 17 │ 744.08 / 761.70 ±12.84 / 781.69 ms │ 703.40 / 724.66 ±23.35 / 770.05 ms │ no change │ │ QQuery 18 │ 760.17 / 782.23 ±12.12 / 796.85 ms │ 725.45 / 744.71 ±15.59 / 765.59 ms │ no change │ │ QQuery 19 │ 267.90 / 280.99 ±14.61 / 306.80 ms │ 275.58 / 298.23 ±27.69 / 351.75 ms │ 1.06x slower │ │ QQuery 20 │ 311.46 / 323.12 ±10.13 / 341.26 ms │ 312.13 / 319.42 ±4.39 / 324.46 ms │ no change │ │ QQuery 21 │ 816.40 / 837.33 ±19.78 / 870.18 ms │ 766.23 / 778.58 ±8.98 / 792.31 ms │ +1.08x faster │ │ QQuery 22 │ 81.46 / 84.94 ±2.58 / 88.20 ms │ 75.31 / 77.73 ±1.39 / 79.55 ms │ +1.09x faster │ └───────────┴────────────────────────────────────┴─────────────────────────────────────┴───────────────┘ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓ ┃ Benchmark Summary ┃ ┃ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩ │ Total Time (HEAD) │ 7829.34ms │ │ Total Time (perf-strength-reduce-hash-partition) │ 7466.29ms │ │ Average Time (HEAD) │ 355.88ms │ │ Average Time (perf-strength-reduce-hash-partition) │ 339.38ms │ │ Queries Faster │ 10 │ │ Queries Slower │ 1 │ │ Queries with No Change │ 11 │ │ Queries with Failure │ 0 │ └────────────────────────────────────────────────────┴───────────┘ ``` </details> ## What changes are included in this PR? Use strength-reduce to speed up hash % partition ## Are these changes tested? Existing tests ## Are there any user-facing changes? A small change to `new_hash_partitioner` to return a `Result` instead of panic during runtime
1 parent 1364286 commit 3deeadd

1 file changed

Lines changed: 187 additions & 17 deletions

File tree

  • datafusion/physical-plan/src/repartition

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 187 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ pub struct BatchPartitioner {
424424
enum BatchPartitionerState {
425425
Hash {
426426
exprs: Vec<Arc<dyn PhysicalExpr>>,
427-
num_partitions: usize,
427+
partition_reducer: StrengthReducedU64,
428428
hash_buffer: Vec<u64>,
429429
indices: Vec<Vec<u32>>,
430430
},
@@ -438,6 +438,78 @@ enum BatchPartitionerState {
438438
/// executions and runs.
439439
pub const REPARTITION_RANDOM_STATE: SeededRandomState = SeededRandomState::with_seed(0);
440440

441+
/// Computes `value % divisor` without division in the hot loop when `divisor`
442+
/// is fixed for many values.
443+
///
444+
/// Hash repartitioning computes a remainder for every row. Integer division is
445+
/// relatively expensive, so this precomputes the strength-reduced form of the
446+
/// divisor: powers of two use a bit mask, and other divisors use a reciprocal
447+
/// multiply to recover the quotient and therefore the remainder. This is the
448+
/// same invariant-divisor optimization compilers use for `%` by a constant.
449+
#[derive(Debug, Clone, Copy)]
450+
enum StrengthReducedU64 {
451+
PowerOfTwo { mask: u64 },
452+
Reciprocal { divisor: u64, reciprocal: u128 },
453+
}
454+
455+
impl StrengthReducedU64 {
456+
fn new(divisor: u64) -> Self {
457+
debug_assert!(divisor > 0);
458+
459+
if divisor.is_power_of_two() {
460+
Self::PowerOfTwo { mask: divisor - 1 }
461+
} else {
462+
Self::Reciprocal {
463+
divisor,
464+
// ceil(2^128 / divisor), computed without representing 2^128
465+
reciprocal: u128::MAX / u128::from(divisor) + 1,
466+
}
467+
}
468+
}
469+
470+
fn partition_indices(self, hash_buffer: &[u64], indices: &mut [Vec<u32>]) {
471+
match self {
472+
Self::PowerOfTwo { mask } => {
473+
for (index, hash) in hash_buffer.iter().enumerate() {
474+
indices[(*hash & mask) as usize].push(index as u32);
475+
}
476+
}
477+
Self::Reciprocal {
478+
divisor,
479+
reciprocal,
480+
} => {
481+
for (index, hash) in hash_buffer.iter().enumerate() {
482+
let quotient = Self::quotient(*hash, reciprocal);
483+
let partition = *hash - quotient * divisor;
484+
indices[partition as usize].push(index as u32);
485+
}
486+
}
487+
}
488+
}
489+
490+
#[cfg(test)]
491+
fn remainder(self, value: u64) -> u64 {
492+
match self {
493+
Self::PowerOfTwo { mask } => value & mask,
494+
Self::Reciprocal {
495+
divisor,
496+
reciprocal,
497+
} => value - Self::quotient(value, reciprocal) * divisor,
498+
}
499+
}
500+
501+
#[inline]
502+
fn quotient(value: u64, reciprocal: u128) -> u64 {
503+
let reciprocal_low = reciprocal as u64;
504+
let reciprocal_high = (reciprocal >> 64) as u64;
505+
let low_product = u128::from(value) * u128::from(reciprocal_low);
506+
let high_product = u128::from(value) * u128::from(reciprocal_high);
507+
let carry = ((high_product & u128::from(u64::MAX)) + (low_product >> 64)) >> 64;
508+
509+
((high_product >> 64) + carry) as u64
510+
}
511+
}
512+
441513
impl BatchPartitioner {
442514
/// Create a new [`BatchPartitioner`] for hash-based repartitioning.
443515
///
@@ -446,22 +518,29 @@ impl BatchPartitioner {
446518
/// - `num_partitions`: Total number of output partitions.
447519
/// - `timer`: Metric used to record time spent during repartitioning.
448520
///
449-
/// # Notes
450-
/// This constructor cannot fail and performs no validation.
521+
/// The partition count is fixed for the lifetime of the partitioner, so this
522+
/// precomputes a strength-reduced reducer for `hash % num_partitions`.
523+
///
524+
/// # Errors
525+
/// Returns an error if `num_partitions` is zero.
451526
pub fn new_hash_partitioner(
452527
exprs: Vec<Arc<dyn PhysicalExpr>>,
453528
num_partitions: usize,
454529
timer: metrics::Time,
455-
) -> Self {
456-
Self {
530+
) -> Result<Self> {
531+
if num_partitions == 0 {
532+
return internal_err!("Hash repartition requires at least one partition");
533+
}
534+
535+
Ok(Self {
457536
state: BatchPartitionerState::Hash {
458537
exprs,
459-
num_partitions,
538+
partition_reducer: StrengthReducedU64::new(num_partitions as u64),
460539
hash_buffer: vec![],
461540
indices: vec![vec![]; num_partitions],
462541
},
463542
timer,
464-
}
543+
})
465544
}
466545

467546
/// Create a new [`BatchPartitioner`] for round-robin repartitioning.
@@ -501,7 +580,8 @@ impl BatchPartitioner {
501580
/// - `num_input_partitions`: Total number of input partitions.
502581
///
503582
/// # Errors
504-
/// Returns an error if the provided partitioning scheme is not supported.
583+
/// Returns an error if the provided partitioning scheme is not supported,
584+
/// or if hash partitioning is requested with zero output partitions.
505585
pub fn try_new(
506586
partitioning: Partitioning,
507587
timer: metrics::Time,
@@ -510,7 +590,7 @@ impl BatchPartitioner {
510590
) -> Result<Self> {
511591
match partitioning {
512592
Partitioning::Hash(exprs, num_partitions) => {
513-
Ok(Self::new_hash_partitioner(exprs, num_partitions, timer))
593+
Self::new_hash_partitioner(exprs, num_partitions, timer)
514594
}
515595
Partitioning::RoundRobinBatch(num_partitions) => {
516596
Ok(Self::new_round_robin_partitioner(
@@ -575,7 +655,7 @@ impl BatchPartitioner {
575655
}
576656
BatchPartitionerState::Hash {
577657
exprs,
578-
num_partitions: partitions,
658+
partition_reducer,
579659
hash_buffer,
580660
indices,
581661
} => {
@@ -596,9 +676,7 @@ impl BatchPartitioner {
596676

597677
indices.iter_mut().for_each(|v| v.clear());
598678

599-
for (index, hash) in hash_buffer.iter().enumerate() {
600-
indices[(*hash % *partitions as u64) as usize].push(index as u32);
601-
}
679+
partition_reducer.partition_indices(hash_buffer, indices);
602680

603681
// Finished building index-arrays for output partitions
604682
timer.done();
@@ -653,9 +731,9 @@ impl BatchPartitioner {
653731

654732
// return the number of output partitions
655733
fn num_partitions(&self) -> usize {
656-
match self.state {
657-
BatchPartitionerState::RoundRobin { num_partitions, .. } => num_partitions,
658-
BatchPartitionerState::Hash { num_partitions, .. } => num_partitions,
734+
match &self.state {
735+
BatchPartitionerState::RoundRobin { num_partitions, .. } => *num_partitions,
736+
BatchPartitionerState::Hash { indices, .. } => indices.len(),
659737
}
660738
}
661739
}
@@ -1359,7 +1437,7 @@ impl RepartitionExec {
13591437
exprs.clone(),
13601438
*num_partitions,
13611439
metrics.repartition_time.clone(),
1362-
)
1440+
)?
13631441
}
13641442
Partitioning::RoundRobinBatch(num_partitions) => {
13651443
BatchPartitioner::new_round_robin_partitioner(
@@ -1783,6 +1861,98 @@ mod tests {
17831861
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
17841862
use insta::assert_snapshot;
17851863

1864+
#[test]
1865+
fn strength_reduced_u64_remainder_matches_modulo() {
1866+
let divisors = [
1867+
1,
1868+
2,
1869+
3,
1870+
4,
1871+
5,
1872+
7,
1873+
8,
1874+
10,
1875+
16,
1876+
31,
1877+
32,
1878+
63,
1879+
64,
1880+
65,
1881+
97,
1882+
u64::from(u32::MAX),
1883+
u64::from(u32::MAX) + 1,
1884+
1_u64 << 32,
1885+
(1_u64 << 63) - 1,
1886+
1_u64 << 63,
1887+
u64::MAX - 1,
1888+
u64::MAX,
1889+
];
1890+
let values = [
1891+
0,
1892+
1,
1893+
2,
1894+
3,
1895+
4,
1896+
5,
1897+
31,
1898+
32,
1899+
33,
1900+
63,
1901+
64,
1902+
65,
1903+
u64::from(u32::MAX) - 1,
1904+
u64::from(u32::MAX),
1905+
u64::from(u32::MAX) + 1,
1906+
(1_u64 << 32) - 1,
1907+
1_u64 << 32,
1908+
(1_u64 << 32) + 1,
1909+
(1_u64 << 63) - 1,
1910+
1_u64 << 63,
1911+
(1_u64 << 63) + 1,
1912+
u64::MAX - 1,
1913+
u64::MAX,
1914+
];
1915+
1916+
for divisor in divisors {
1917+
let reducer = StrengthReducedU64::new(divisor);
1918+
for value in values {
1919+
assert_eq!(
1920+
reducer.remainder(value),
1921+
value % divisor,
1922+
"value={value} divisor={divisor}"
1923+
);
1924+
}
1925+
1926+
let mut value = 0x1234_5678_9abc_def0 ^ divisor;
1927+
for _ in 0..10_000 {
1928+
value = value
1929+
.wrapping_mul(6_364_136_223_846_793_005)
1930+
.wrapping_add(1_442_695_040_888_963_407);
1931+
assert_eq!(
1932+
reducer.remainder(value),
1933+
value % divisor,
1934+
"value={value} divisor={divisor}"
1935+
);
1936+
}
1937+
}
1938+
}
1939+
1940+
#[test]
1941+
fn hash_partitioner_requires_nonzero_partitions() {
1942+
let metrics = ExecutionPlanMetricsSet::new();
1943+
let timer = MetricBuilder::new(&metrics).subset_time("test", 0);
1944+
1945+
let err = BatchPartitioner::new_hash_partitioner(vec![], 0, timer)
1946+
.err()
1947+
.expect("zero hash partitions should fail")
1948+
.to_string();
1949+
1950+
assert!(
1951+
err.contains("Hash repartition requires at least one partition"),
1952+
"actual: {err}"
1953+
);
1954+
}
1955+
17861956
#[tokio::test]
17871957
async fn one_to_many_round_robin() -> Result<()> {
17881958
// define input partitions

0 commit comments

Comments
 (0)