Skip to content

Commit 48e3b0b

Browse files
committed
Update bounded_load.rs
1 parent da37dd7 commit 48e3b0b

1 file changed

Lines changed: 116 additions & 33 deletions

File tree

crates/consistent-hashing/examples/bounded_load.rs

Lines changed: 116 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,16 @@
1212
//! Run with: cargo run --example bounded_load
1313
1414
use std::hash::{DefaultHasher, Hash, Hasher};
15+
use std::rc::Rc;
1516

1617
use consistent_hashing::ConsistentChooseKHasher;
1718

1819
/// Bounded-load assignment.
1920
///
2021
/// Each token claims all k replicas before moving to the next token,
2122
/// skipping any node that has reached `max_load`.
22-
fn bounded_load_assign(
23-
iters: impl IntoIterator<Item = ConsistentChooseKHasher<DefaultHasher>>,
23+
fn bounded_load_assign<I: Iterator<Item = usize>>(
24+
iters: impl IntoIterator<Item = I>,
2425
k: usize,
2526
n: usize,
2627
max_load: usize,
@@ -44,6 +45,79 @@ fn bounded_load_assign(
4445
(assignments, load)
4546
}
4647

48+
/// Count the number of assignments that changed between two runs.
///
/// `before[i]` and `after[i]` hold the nodes assigned to token `i` in each run.
/// An assignment counts as changed when a node listed in `before[i]` does not
/// appear anywhere in `after[i]`; the order of nodes within a token is ignored.
///
/// Tokens that exist in `before` but have no entry in `after` are treated as
/// having an empty assignment afterwards, so every one of their nodes counts as
/// churn. Extra trailing tokens in `after` are ignored, since nothing was
/// assigned to them before.
fn count_churn(before: &[Vec<usize>], after: &[Vec<usize>]) -> usize {
    before
        .iter()
        .enumerate()
        .map(|(token, old_nodes)| {
            // A missing `after` entry means the token lost all of its nodes.
            let new_nodes: &[usize] = after.get(token).map(Vec::as_slice).unwrap_or_default();
            old_nodes
                .iter()
                .filter(|node| !new_nodes.contains(node))
                .count()
        })
        .sum()
}
56+
57+
/// Load spread: difference between max and min loaded nodes.
///
/// `load[i]` is the number of assignments held by node `i`. Returns `0` for an
/// empty slice (no nodes means no imbalance) instead of panicking.
fn load_spread(load: &[usize]) -> usize {
    let Some(&first) = load.first() else {
        return 0;
    };
    // Track both extremes in a single pass over the loads.
    let (lowest, highest) = load
        .iter()
        .fold((first, first), |(lo, hi), &x| (lo.min(x), hi.max(x)));
    highest - lowest
}
61+
62+
/// A hash ring with `v` virtual nodes per physical node.
struct HashRing {
    /// Virtual nodes as `(ring position, physical node)` pairs, sorted ascending.
    /// Shared via `Rc` so every iterator can hold the ring without copying it.
    ring: Rc<Vec<(u64, usize)>>,
    /// Number of distinct physical nodes that appear on the ring.
    num_nodes: usize,
}

impl HashRing {
    /// Build a ring for physical nodes `0..n`, placing `v` virtual nodes for each.
    ///
    /// A virtual node's position is the `DefaultHasher` hash of
    /// `(seed, node, virtual index)`, so the same arguments always produce the
    /// same ring.
    fn new(seed: u64, n: usize, v: usize) -> Self {
        let mut ring: Vec<(u64, usize)> = (0..n)
            .flat_map(|node| {
                (0..v).map(move |vi| {
                    let mut h = DefaultHasher::default();
                    seed.hash(&mut h);
                    node.hash(&mut h);
                    vi.hash(&mut h);
                    (h.finish(), node)
                })
            })
            .collect();
        // Sort on the whole `(position, node)` tuple so that colliding positions
        // are ordered by node id rather than left to the unstable sort.
        ring.sort_unstable();
        Self {
            ring: Rc::new(ring),
            // With zero virtual nodes no physical node is on the ring at all.
            num_nodes: if v == 0 { 0 } else { n },
        }
    }

    /// Return an iterator over distinct physical nodes for the given token hash,
    /// walking clockwise from the token's position on the ring.
    ///
    /// The walk starts at the first virtual node whose position is `>= token_hash`
    /// and wraps around to the beginning of the ring when it reaches the end.
    fn iter(&self, token_hash: u64) -> HashRingIter {
        let start = self.ring.partition_point(|&(pos, _)| pos < token_hash);
        HashRingIter {
            ring: Rc::clone(&self.ring),
            start,
            offset: 0,
            seen: Vec::new(),
            num_nodes: self.num_nodes,
        }
    }
}

/// Iterator that walks a hash ring clockwise, yielding distinct physical nodes.
struct HashRingIter {
    /// The ring being walked, shared with the `HashRing` that created this iterator.
    ring: Rc<Vec<(u64, usize)>>,
    /// Index of the first virtual node at or after the token's position.
    start: usize,
    /// Number of virtual nodes examined so far.
    offset: usize,
    /// Physical nodes already yielded, in the order they were yielded.
    seen: Vec<usize>,
    /// Number of distinct physical nodes on the ring; once this many have been
    /// yielded there is nothing left to find.
    num_nodes: usize,
}

impl Iterator for HashRingIter {
    type Item = usize;

    /// Advance to the next virtual node whose physical node has not been
    /// yielded yet, or return `None` once every physical node was produced.
    fn next(&mut self) -> Option<usize> {
        let len = self.ring.len();
        // Stop as soon as every physical node has been produced instead of
        // scanning the remaining virtual nodes for nothing.
        while self.seen.len() < self.num_nodes && self.offset < len {
            // `start` may equal `len` (token past the last position): wrap around.
            let (_, node) = self.ring[(self.start + self.offset) % len];
            self.offset += 1;
            if !self.seen.contains(&node) {
                self.seen.push(node);
                return Some(node);
            }
        }
        None
    }
}
120+
47121
fn hasher_for_seed_and_key(seed: u64, key: u64) -> DefaultHasher {
48122
let mut h = DefaultHasher::default();
49123
seed.hash(&mut h);
@@ -86,6 +160,8 @@ impl Stats {
86160
}
87161
}
88162

163+
/// Number of virtual nodes placed on the hash ring for each physical node.
const VIRTUAL_NODES: usize = 200;
164+
89165
fn run(num_tokens: usize, k: usize, n: usize, num_seeds: u64) {
90166
let total = num_tokens * k;
91167
let cap = total.div_ceil(n);
@@ -103,73 +179,80 @@ fn run(num_tokens: usize, k: usize, n: usize, num_seeds: u64) {
103179

104180
let mut ub_spread = Stats::new();
105181
let mut b_spread = Stats::new();
182+
let mut ring_spread = Stats::new();
106183
let mut ub_changes = Stats::new();
107184
let mut b_changes = Stats::new();
185+
let mut ring_changes = Stats::new();
108186

109187
for seed in 0..num_seeds {
110-
// ── Unbounded ────────────────────────────────────────────────────
188+
// ── Choose-k (unbounded) ─────────────────────────────────────────
111189
let make_iters = |n| {
112190
(0..num_tokens as u64)
113191
.map(move |key| ConsistentChooseKHasher::new(hasher_for_seed_and_key(seed, key), n))
114192
};
115193
let (unbounded, ub_load) = bounded_load_assign(make_iters(n), k, n, usize::MAX);
116-
let ub_min = *ub_load.iter().min().unwrap();
117-
let ub_max = *ub_load.iter().max().unwrap();
118-
ub_spread.push((ub_max - ub_min) as f64);
194+
ub_spread.push(load_spread(&ub_load) as f64);
119195

120-
// ── Bounded ──────────────────────────────────────────────────────
196+
// ── Choose-k (bounded) ───────────────────────────────────────────
121197
let (bounded, b_load) = bounded_load_assign(make_iters(n), k, n, cap);
122-
let b_min = *b_load.iter().min().unwrap();
123-
let b_max = *b_load.iter().max().unwrap();
124-
b_spread.push((b_max - b_min) as f64);
198+
b_spread.push(load_spread(&b_load) as f64);
199+
200+
// ── Hash ring (bounded) ──────────────────────────────────────────
201+
let ring = HashRing::new(seed, n, VIRTUAL_NODES);
202+
let (ring_assign, r_load) = bounded_load_assign(
203+
(0..num_tokens as u64)
204+
.map(|key| ring.iter(hasher_for_seed_and_key(seed, key).finish())),
205+
k,
206+
n,
207+
cap,
208+
);
209+
ring_spread.push(load_spread(&r_load) as f64);
125210

126211
// ── Consistency: add one machine ─────────────────────────────────
127212
let n2 = n + 1;
128213
let cap2 = total.div_ceil(n2);
129214

130215
let (unbounded2, _) = bounded_load_assign(make_iters(n2), k, n2, usize::MAX);
131-
let mut ub_chg = 0usize;
132-
for (before, after) in unbounded.iter().zip(unbounded2.iter()) {
133-
for node in before {
134-
if !after.contains(node) {
135-
ub_chg += 1;
136-
}
137-
}
138-
}
139-
ub_changes.push(ub_chg as f64 / total as f64 * 100.0);
216+
ub_changes.push(count_churn(&unbounded, &unbounded2) as f64 / total as f64 * 100.0);
140217

141218
let (bounded2, _) = bounded_load_assign(make_iters(n2), k, n2, cap2);
142-
let mut b_chg = 0usize;
143-
for (before, after) in bounded.iter().zip(bounded2.iter()) {
144-
for node in before {
145-
if !after.contains(node) {
146-
b_chg += 1;
147-
}
148-
}
149-
}
150-
b_changes.push(b_chg as f64 / total as f64 * 100.0);
219+
b_changes.push(count_churn(&bounded, &bounded2) as f64 / total as f64 * 100.0);
220+
221+
let ring2 = HashRing::new(seed, n2, VIRTUAL_NODES);
222+
let (ring_assign2, _) = bounded_load_assign(
223+
(0..num_tokens as u64)
224+
.map(|key| ring2.iter(hasher_for_seed_and_key(seed, key).finish())),
225+
k,
226+
n2,
227+
cap2,
228+
);
229+
ring_changes.push(count_churn(&ring_assign, &ring_assign2) as f64 / total as f64 * 100.0);
151230
}
152231

153232
println!(
154-
"{:<24} {:>16} {:>16}",
155-
"", "Unbounded", "Bounded"
233+
"{:<24} {:>16} {:>16} {:>16}",
234+
"", "Choose-k", "Bounded", "Ring Bounded"
156235
);
157-
println!("{:-<24} {:->16} {:->16}", "", "", "");
236+
println!("{:-<24} {:->16} {:->16} {:->16}", "", "", "", "");
158237
println!(
159-
"{:<24} {:>11.2} ± {:<5.2} {:>10.2} ± {:<5.2}",
238+
"{:<24} {:>11.2} ± {:<5.2} {:>10.2} ± {:<5.2} {:>10.2} ± {:<5.2}",
160239
"Load spread (max-min)",
161240
ub_spread.mean(),
162241
ub_spread.stddev(),
163242
b_spread.mean(),
164243
b_spread.stddev(),
244+
ring_spread.mean(),
245+
ring_spread.stddev(),
165246
);
166247
println!(
167-
"{:<24} {:>10.2}% ± {:<5.2} {:>9.2}% ± {:<5.2}",
248+
"{:<24} {:>10.2}% ± {:<5.2} {:>9.2}% ± {:<5.2} {:>9.2}% ± {:<5.2}",
168249
"Churn on n→n+1",
169250
ub_changes.mean(),
170251
ub_changes.stddev(),
171252
b_changes.mean(),
172253
b_changes.stddev(),
254+
ring_changes.mean(),
255+
ring_changes.stddev(),
173256
);
174257
println!(
175258
"\n ideal churn: {:.2}%",

0 commit comments

Comments
 (0)