|
| 1 | +//! Bounded-load consistent hashing example. |
| 2 | +//! |
| 3 | +//! Pure consistent hashing selects each node with equal probability, but for |
| 4 | +//! small workloads (e.g. 64 tokens across 24 machines) random variance causes |
| 5 | +//! highly skewed assignments. This example layers a capacity cap on top of |
| 6 | +//! ConsistentChooseK to enforce near-perfect balance. |
| 7 | +//! |
| 8 | +//! Assignment uses round-robin over replicas: first assign every token's |
| 9 | +//! most-preferred machine, then every token's second-preferred, etc. This |
| 10 | +//! ensures all tokens compete fairly for each replica round. |
| 11 | +//! |
| 12 | +//! Run with: cargo run --example bounded_load |
| 13 | +
|
| 14 | +use std::hash::{DefaultHasher, Hash}; |
| 15 | + |
| 16 | +use consistent_hashing::ConsistentChooseKHasher; |
| 17 | + |
| 18 | +/// Round-robin bounded-load assignment. |
| 19 | +/// |
| 20 | +/// For each replica round r = 0..k, iterate over all tokens and assign each |
| 21 | +/// to its next most-preferred node that still has capacity. This gives every |
| 22 | +/// token equal priority within each round. |
| 23 | +fn bounded_load_assign( |
| 24 | + rankings: &[Vec<usize>], |
| 25 | + k: usize, |
| 26 | + n: usize, |
| 27 | + max_load: usize, |
| 28 | +) -> (Vec<Vec<usize>>, Vec<usize>) { |
| 29 | + let mut load = vec![0usize; n]; |
| 30 | + let num_tokens = rankings.len(); |
| 31 | + let mut assignments = vec![Vec::with_capacity(k); num_tokens]; |
| 32 | + let mut cursors = vec![0usize; num_tokens]; |
| 33 | + |
| 34 | + for _round in 0..k { |
| 35 | + for (token, ranking) in rankings.iter().enumerate() { |
| 36 | + while cursors[token] < ranking.len() { |
| 37 | + let node = ranking[cursors[token]]; |
| 38 | + cursors[token] += 1; |
| 39 | + if load[node] < max_load { |
| 40 | + load[node] += 1; |
| 41 | + assignments[token].push(node); |
| 42 | + break; |
| 43 | + } |
| 44 | + } |
| 45 | + } |
| 46 | + } |
| 47 | + (assignments, load) |
| 48 | +} |
| 49 | + |
| 50 | +fn main() { |
| 51 | + let num_tokens: usize = 64; |
| 52 | + let k: usize = 2; // replicas per token |
| 53 | + let n: usize = 24; // machines |
| 54 | + let total = num_tokens * k; |
| 55 | + let cap = total.div_ceil(n); // ceil(128/24) = 6 |
| 56 | + |
| 57 | + println!("Parameters: {num_tokens} tokens, k={k} replicas, {n} machines"); |
| 58 | + println!("Total assignments: {total}, capacity cap per machine: {cap}"); |
| 59 | + println!( |
| 60 | + "Perfect balance: {}×{} + {}×{}\n", |
| 61 | + n - total % n, |
| 62 | + total / n, |
| 63 | + total % n, |
| 64 | + total / n + 1 |
| 65 | + ); |
| 66 | + |
| 67 | + // ── Unbounded ──────────────────────────────────────────────────────── |
| 68 | + let unbounded: Vec<Vec<usize>> = (0..num_tokens as u64) |
| 69 | + .map(|key| { |
| 70 | + let mut h = DefaultHasher::default(); |
| 71 | + key.hash(&mut h); |
| 72 | + ConsistentChooseKHasher::new(h, n).take(k).collect() |
| 73 | + }) |
| 74 | + .collect(); |
| 75 | + let mut unbounded_load = vec![0usize; n]; |
| 76 | + for a in &unbounded { |
| 77 | + for &node in a { |
| 78 | + unbounded_load[node] += 1; |
| 79 | + } |
| 80 | + } |
| 81 | + |
| 82 | + // ── Bounded (round-robin) ──────────────────────────────────────────── |
| 83 | + let rankings: Vec<Vec<usize>> = (0..num_tokens as u64) |
| 84 | + .map(|key| { |
| 85 | + let mut h = DefaultHasher::default(); |
| 86 | + key.hash(&mut h); |
| 87 | + ConsistentChooseKHasher::new(h, n).collect() |
| 88 | + }) |
| 89 | + .collect(); |
| 90 | + let (bounded, bounded_load) = bounded_load_assign(&rankings, k, n, cap); |
| 91 | + |
| 92 | + // ── Display ────────────────────────────────────────────────────────── |
| 93 | + println!("{:<12} {:>10} {:>10}", "Machine", "Unbounded", "Bounded"); |
| 94 | + println!("{:-<12} {:->10} {:->10}", "", "", ""); |
| 95 | + for i in 0..n { |
| 96 | + println!( |
| 97 | + "{:<12} {:>10} {:>10}", |
| 98 | + i, unbounded_load[i], bounded_load[i] |
| 99 | + ); |
| 100 | + } |
| 101 | + |
| 102 | + let ub_min = *unbounded_load.iter().min().unwrap(); |
| 103 | + let ub_max = *unbounded_load.iter().max().unwrap(); |
| 104 | + let b_min = *bounded_load.iter().min().unwrap(); |
| 105 | + let b_max = *bounded_load.iter().max().unwrap(); |
| 106 | + println!("{:-<12} {:->10} {:->10}", "", "", ""); |
| 107 | + println!( |
| 108 | + "{:<12} {:>10} {:>10}", |
| 109 | + "spread", |
| 110 | + ub_max - ub_min, |
| 111 | + b_max - b_min |
| 112 | + ); |
| 113 | + |
| 114 | + // ── Consistency check: what happens when we add one machine? ───────── |
| 115 | + let n2 = n + 1; |
| 116 | + let cap2 = (num_tokens * k).div_ceil(n2); |
| 117 | + let rankings2: Vec<Vec<usize>> = (0..num_tokens as u64) |
| 118 | + .map(|key| { |
| 119 | + let mut h = DefaultHasher::default(); |
| 120 | + key.hash(&mut h); |
| 121 | + ConsistentChooseKHasher::new(h, n2).collect() |
| 122 | + }) |
| 123 | + .collect(); |
| 124 | + let (bounded2, _) = bounded_load_assign(&rankings2, k, n2, cap2); |
| 125 | + |
| 126 | + let mut changes = 0; |
| 127 | + for (before, after) in bounded.iter().zip(bounded2.iter()) { |
| 128 | + for node in before { |
| 129 | + if !after.contains(node) { |
| 130 | + changes += 1; |
| 131 | + } |
| 132 | + } |
| 133 | + } |
| 134 | + println!("\nConsistency: adding machine {n} → {n2}"); |
| 135 | + println!( |
| 136 | + " {changes}/{total} assignments changed ({:.1}%), ideal ≈ {:.1}%", |
| 137 | + changes as f64 / total as f64 * 100.0, |
| 138 | + k as f64 / n2 as f64 * 100.0 |
| 139 | + ); |
| 140 | +} |
0 commit comments