|
1 | 1 | //! Bounded-load consistent hashing example. |
2 | 2 | //! |
3 | | -//! Pure consistent hashing selects each node with equal probability, but for |
4 | | -//! small workloads (e.g. 64 tokens across 24 machines) random variance causes |
5 | | -//! highly skewed assignments. This example layers a capacity cap on top of |
6 | | -//! ConsistentChooseK to enforce near-perfect balance. |
| 3 | +//! Compares unbounded vs bounded-load assignment across many random seeds, |
| 4 | +//! reporting average and standard deviation of load spread and consistency |
| 5 | +//! (fraction of assignments that change when a node is added). |
7 | 6 | //! |
8 | | -//! Assignment uses round-robin over replicas: first assign every token's |
9 | | -//! most-preferred machine, then every token's second-preferred, etc. This |
10 | | -//! ensures all tokens compete fairly for each replica round. |
| 7 | +//! Bounded assignment iterates over tokens sequentially, greedily assigning |
| 8 | +//! each token its k most-preferred nodes that still have capacity. Using |
| 9 | +//! round-robin (all tokens claim one replica per round) yields nearly |
| 10 | +//! identical churn numbers with marginally better load spread. |
11 | 11 | //! |
12 | 12 | //! Run with: cargo run --example bounded_load |
13 | 13 |
|
14 | | -use std::hash::{DefaultHasher, Hash}; |
| 14 | +use std::hash::{DefaultHasher, Hash, Hasher}; |
15 | 15 |
|
16 | 16 | use consistent_hashing::ConsistentChooseKHasher; |
17 | 17 |
|
18 | | -/// Round-robin bounded-load assignment. |
| 18 | +/// Bounded-load assignment. |
19 | 19 | /// |
20 | | -/// For each replica round r = 0..k, iterate over all tokens and assign each |
21 | | -/// to its next most-preferred node that still has capacity. This gives every |
22 | | -/// token equal priority within each round. |
| 20 | +/// Each token claims all k replicas before moving to the next token, |
| 21 | +/// skipping any node that has reached `max_load`. |
23 | 22 | fn bounded_load_assign( |
24 | | - rankings: &[Vec<usize>], |
| 23 | + iters: impl IntoIterator<Item = ConsistentChooseKHasher<DefaultHasher>>, |
25 | 24 | k: usize, |
26 | 25 | n: usize, |
27 | 26 | max_load: usize, |
28 | 27 | ) -> (Vec<Vec<usize>>, Vec<usize>) { |
29 | 28 | let mut load = vec![0usize; n]; |
30 | | - let num_tokens = rankings.len(); |
31 | | - let mut assignments = vec![Vec::with_capacity(k); num_tokens]; |
32 | | - let mut cursors = vec![0usize; num_tokens]; |
33 | | - |
34 | | - for _round in 0..k { |
35 | | - for (token, ranking) in rankings.iter().enumerate() { |
36 | | - while cursors[token] < ranking.len() { |
37 | | - let node = ranking[cursors[token]]; |
38 | | - cursors[token] += 1; |
39 | | - if load[node] < max_load { |
40 | | - load[node] += 1; |
41 | | - assignments[token].push(node); |
| 29 | + let mut assignments = Vec::new(); |
| 30 | + |
| 31 | + for mut iter in iters { |
| 32 | + let mut assigned = Vec::with_capacity(k); |
| 33 | + for node in iter.by_ref() { |
| 34 | + if load[node] < max_load { |
| 35 | + load[node] += 1; |
| 36 | + assigned.push(node); |
| 37 | + if assigned.len() == k { |
42 | 38 | break; |
43 | 39 | } |
44 | 40 | } |
45 | 41 | } |
| 42 | + assignments.push(assigned); |
46 | 43 | } |
47 | 44 | (assignments, load) |
48 | 45 | } |
49 | 46 |
|
50 | | -fn main() { |
51 | | - let num_tokens: usize = 64; |
52 | | - let k: usize = 2; // replicas per token |
53 | | - let n: usize = 24; // machines |
| 47 | +fn hasher_for_seed_and_key(seed: u64, key: u64) -> DefaultHasher { |
| 48 | + let mut h = DefaultHasher::default(); |
| 49 | + seed.hash(&mut h); |
| 50 | + let seed_state = h.finish(); |
| 51 | + let mut h2 = DefaultHasher::default(); |
| 52 | + seed_state.hash(&mut h2); |
| 53 | + key.hash(&mut h2); |
| 54 | + h2 |
| 55 | +} |
| 56 | + |
| 57 | +struct Stats { |
| 58 | + sum: f64, |
| 59 | + sum_sq: f64, |
| 60 | + count: f64, |
| 61 | +} |
| 62 | + |
| 63 | +impl Stats { |
| 64 | + fn new() -> Self { |
| 65 | + Self { |
| 66 | + sum: 0.0, |
| 67 | + sum_sq: 0.0, |
| 68 | + count: 0.0, |
| 69 | + } |
| 70 | + } |
| 71 | + |
| 72 | + fn push(&mut self, x: f64) { |
| 73 | + self.sum += x; |
| 74 | + self.sum_sq += x * x; |
| 75 | + self.count += 1.0; |
| 76 | + } |
| 77 | + |
| 78 | + fn mean(&self) -> f64 { |
| 79 | + self.sum / self.count |
| 80 | + } |
| 81 | + |
| 82 | + fn stddev(&self) -> f64 { |
| 83 | + (self.sum_sq / self.count - self.mean().powi(2)) |
| 84 | + .max(0.0) |
| 85 | + .sqrt() |
| 86 | + } |
| 87 | +} |
| 88 | + |
| 89 | +fn run(num_tokens: usize, k: usize, n: usize, num_seeds: u64) { |
54 | 90 | let total = num_tokens * k; |
55 | | - let cap = total.div_ceil(n); // ceil(128/24) = 6 |
| 91 | + let cap = total.div_ceil(n); |
56 | 92 |
|
57 | | - println!("Parameters: {num_tokens} tokens, k={k} replicas, {n} machines"); |
| 93 | + println!("Parameters: {num_tokens} tokens, k={k} replicas, {n} machines, {num_seeds} seeds"); |
58 | 94 | println!("Total assignments: {total}, capacity cap per machine: {cap}"); |
59 | 95 | println!( |
60 | | - "Perfect balance: {}×{} + {}×{}\n", |
| 96 | + "Perfect balance: {}×{} + {}×{}", |
61 | 97 | n - total % n, |
62 | 98 | total / n, |
63 | 99 | total % n, |
64 | 100 | total / n + 1 |
65 | 101 | ); |
| 102 | + println!(); |
| 103 | + |
| 104 | + let mut ub_spread = Stats::new(); |
| 105 | + let mut b_spread = Stats::new(); |
| 106 | + let mut ub_changes = Stats::new(); |
| 107 | + let mut b_changes = Stats::new(); |
| 108 | + |
| 109 | + for seed in 0..num_seeds { |
| 110 | + // ── Unbounded ──────────────────────────────────────────────────── |
| 111 | + let make_iters = |n| { |
| 112 | + (0..num_tokens as u64) |
| 113 | + .map(move |key| ConsistentChooseKHasher::new(hasher_for_seed_and_key(seed, key), n)) |
| 114 | + }; |
| 115 | + let (unbounded, ub_load) = bounded_load_assign(make_iters(n), k, n, usize::MAX); |
| 116 | + let ub_min = *ub_load.iter().min().unwrap(); |
| 117 | + let ub_max = *ub_load.iter().max().unwrap(); |
| 118 | + ub_spread.push((ub_max - ub_min) as f64); |
| 119 | + |
| 120 | + // ── Bounded ────────────────────────────────────────────────────── |
| 121 | + let (bounded, b_load) = bounded_load_assign(make_iters(n), k, n, cap); |
| 122 | + let b_min = *b_load.iter().min().unwrap(); |
| 123 | + let b_max = *b_load.iter().max().unwrap(); |
| 124 | + b_spread.push((b_max - b_min) as f64); |
66 | 125 |
|
67 | | - // ── Unbounded ──────────────────────────────────────────────────────── |
68 | | - let unbounded: Vec<Vec<usize>> = (0..num_tokens as u64) |
69 | | - .map(|key| { |
70 | | - let mut h = DefaultHasher::default(); |
71 | | - key.hash(&mut h); |
72 | | - ConsistentChooseKHasher::new(h, n).take(k).collect() |
73 | | - }) |
74 | | - .collect(); |
75 | | - let mut unbounded_load = vec![0usize; n]; |
76 | | - for a in &unbounded { |
77 | | - for &node in a { |
78 | | - unbounded_load[node] += 1; |
| 126 | + // ── Consistency: add one machine ───────────────────────────────── |
| 127 | + let n2 = n + 1; |
| 128 | + let cap2 = total.div_ceil(n2); |
| 129 | + |
| 130 | + let (unbounded2, _) = bounded_load_assign(make_iters(n2), k, n2, usize::MAX); |
| 131 | + let mut ub_chg = 0usize; |
| 132 | + for (before, after) in unbounded.iter().zip(unbounded2.iter()) { |
| 133 | + for node in before { |
| 134 | + if !after.contains(node) { |
| 135 | + ub_chg += 1; |
| 136 | + } |
| 137 | + } |
79 | 138 | } |
80 | | - } |
| 139 | + ub_changes.push(ub_chg as f64 / total as f64 * 100.0); |
81 | 140 |
|
82 | | - // ── Bounded (round-robin) ──────────────────────────────────────────── |
83 | | - let rankings: Vec<Vec<usize>> = (0..num_tokens as u64) |
84 | | - .map(|key| { |
85 | | - let mut h = DefaultHasher::default(); |
86 | | - key.hash(&mut h); |
87 | | - ConsistentChooseKHasher::new(h, n).collect() |
88 | | - }) |
89 | | - .collect(); |
90 | | - let (bounded, bounded_load) = bounded_load_assign(&rankings, k, n, cap); |
91 | | - |
92 | | - // ── Display ────────────────────────────────────────────────────────── |
93 | | - println!("{:<12} {:>10} {:>10}", "Machine", "Unbounded", "Bounded"); |
94 | | - println!("{:-<12} {:->10} {:->10}", "", "", ""); |
95 | | - for i in 0..n { |
96 | | - println!( |
97 | | - "{:<12} {:>10} {:>10}", |
98 | | - i, unbounded_load[i], bounded_load[i] |
99 | | - ); |
| 141 | + let (bounded2, _) = bounded_load_assign(make_iters(n2), k, n2, cap2); |
| 142 | + let mut b_chg = 0usize; |
| 143 | + for (before, after) in bounded.iter().zip(bounded2.iter()) { |
| 144 | + for node in before { |
| 145 | + if !after.contains(node) { |
| 146 | + b_chg += 1; |
| 147 | + } |
| 148 | + } |
| 149 | + } |
| 150 | + b_changes.push(b_chg as f64 / total as f64 * 100.0); |
100 | 151 | } |
101 | 152 |
|
102 | | - let ub_min = *unbounded_load.iter().min().unwrap(); |
103 | | - let ub_max = *unbounded_load.iter().max().unwrap(); |
104 | | - let b_min = *bounded_load.iter().min().unwrap(); |
105 | | - let b_max = *bounded_load.iter().max().unwrap(); |
106 | | - println!("{:-<12} {:->10} {:->10}", "", "", ""); |
107 | 153 | println!( |
108 | | - "{:<12} {:>10} {:>10}", |
109 | | - "spread", |
110 | | - ub_max - ub_min, |
111 | | - b_max - b_min |
| 154 | + "{:<24} {:>16} {:>16}", |
| 155 | + "", "Unbounded", "Bounded" |
112 | 156 | ); |
| 157 | + println!("{:-<24} {:->16} {:->16}", "", "", ""); |
| 158 | + println!( |
| 159 | + "{:<24} {:>11.2} ± {:<5.2} {:>10.2} ± {:<5.2}", |
| 160 | + "Load spread (max-min)", |
| 161 | + ub_spread.mean(), |
| 162 | + ub_spread.stddev(), |
| 163 | + b_spread.mean(), |
| 164 | + b_spread.stddev(), |
| 165 | + ); |
| 166 | + println!( |
| 167 | + "{:<24} {:>10.2}% ± {:<5.2} {:>9.2}% ± {:<5.2}", |
| 168 | + "Churn on n→n+1", |
| 169 | + ub_changes.mean(), |
| 170 | + ub_changes.stddev(), |
| 171 | + b_changes.mean(), |
| 172 | + b_changes.stddev(), |
| 173 | + ); |
| 174 | + println!( |
| 175 | + "\n ideal churn: {:.2}%", |
| 176 | + 1.0 / (n + 1) as f64 * 100.0 |
| 177 | + ); |
| 178 | +} |
113 | 179 |
|
114 | | - // ── Consistency check: what happens when we add one machine? ───────── |
115 | | - let n2 = n + 1; |
116 | | - let cap2 = (num_tokens * k).div_ceil(n2); |
117 | | - let rankings2: Vec<Vec<usize>> = (0..num_tokens as u64) |
118 | | - .map(|key| { |
119 | | - let mut h = DefaultHasher::default(); |
120 | | - key.hash(&mut h); |
121 | | - ConsistentChooseKHasher::new(h, n2).collect() |
122 | | - }) |
123 | | - .collect(); |
124 | | - let (bounded2, _) = bounded_load_assign(&rankings2, k, n2, cap2); |
125 | | - |
126 | | - let mut changes = 0; |
127 | | - for (before, after) in bounded.iter().zip(bounded2.iter()) { |
128 | | - for node in before { |
129 | | - if !after.contains(node) { |
130 | | - changes += 1; |
131 | | - } |
| 180 | +fn main() { |
| 181 | + let configs: &[(usize, usize, usize)] = &[ |
| 182 | + // (num_tokens, k, n) |
| 183 | + (64, 3, 24), // original |
| 184 | + (256, 3, 24), // more tokens, same k and n |
| 185 | + (64, 1, 24), // k=1 (no replication) |
| 186 | + (64, 5, 24), // higher replication |
| 187 | + (64, 3, 8), // fewer machines |
| 188 | + (64, 3, 60), // many machines (sparse) |
| 189 | + ]; |
| 190 | + let num_seeds = 1000; |
| 191 | + |
| 192 | + for (i, &(num_tokens, k, n)) in configs.iter().enumerate() { |
| 193 | + if i > 0 { |
| 194 | + println!("\n{}\n", "=".repeat(76)); |
132 | 195 | } |
| 196 | + run(num_tokens, k, n, num_seeds); |
133 | 197 | } |
134 | | - println!("\nConsistency: adding machine {n} → {n2}"); |
135 | | - println!( |
136 | | - " {changes}/{total} assignments changed ({:.1}%), ideal ≈ {:.1}%", |
137 | | - changes as f64 / total as f64 * 100.0, |
138 | | - k as f64 / n2 as f64 * 100.0 |
139 | | - ); |
140 | 198 | } |
0 commit comments