1111//!
1212//! Run with: cargo run --example bounded_load
1313
14+ use std:: collections:: HashSet ;
1415use std:: hash:: { DefaultHasher , Hash , Hasher } ;
15- use std:: rc :: Rc ;
16+ use std:: time :: Instant ;
1617
1718use consistent_hashing:: ConsistentChooseKHasher ;
1819
@@ -54,14 +55,16 @@ fn count_churn(before: &[Vec<usize>], after: &[Vec<usize>]) -> usize {
5455 . sum ( )
5556}
5657
57- /// Load spread: difference between max and min loaded nodes.
58- fn load_spread ( load : & [ usize ] ) -> usize {
59- load. iter ( ) . max ( ) . unwrap ( ) - load. iter ( ) . min ( ) . unwrap ( )
58+ /// Standard deviation of load across machines.
59+ fn load_stddev ( load : & [ usize ] ) -> f64 {
60+ let mean = load. iter ( ) . sum :: < usize > ( ) as f64 / load. len ( ) as f64 ;
61+ let var = load. iter ( ) . map ( |& x| ( x as f64 - mean) . powi ( 2 ) ) . sum :: < f64 > ( ) / load. len ( ) as f64 ;
62+ var. sqrt ( )
6063}
6164
6265/// A hash ring with `v` virtual nodes per physical node.
6366struct HashRing {
64- ring : Rc < Vec < ( u64 , usize ) > > ,
67+ ring : Vec < ( u64 , usize ) > ,
6568}
6669
6770impl HashRing {
@@ -78,39 +81,38 @@ impl HashRing {
7881 } )
7982 . collect ( ) ;
8083 ring. sort_unstable_by_key ( |& ( pos, _) | pos) ;
81- Self { ring : Rc :: new ( ring ) }
84+ Self { ring }
8285 }
8386
8487 /// Return an iterator over distinct physical nodes for the given token hash,
8588 /// walking clockwise from the token's position on the ring.
86- fn iter ( & self , token_hash : u64 ) -> HashRingIter {
89+ fn iter ( & self , token_hash : u64 ) -> HashRingIter < ' _ > {
8790 let start = self . ring . partition_point ( |& ( pos, _) | pos < token_hash) ;
8891 HashRingIter {
89- ring : Rc :: clone ( & self . ring ) ,
92+ ring : & self . ring ,
9093 start,
9194 offset : 0 ,
92- seen : Vec :: new ( ) ,
95+ seen : HashSet :: new ( ) ,
9396 }
9497 }
9598}
9699
97100/// Iterator that walks a hash ring clockwise, yielding distinct physical nodes.
98- struct HashRingIter {
99- ring : Rc < Vec < ( u64 , usize ) > > ,
101+ struct HashRingIter < ' a > {
102+ ring : & ' a [ ( u64 , usize ) ] ,
100103 start : usize ,
101104 offset : usize ,
102- seen : Vec < usize > ,
105+ seen : HashSet < usize > ,
103106}
104107
105- impl Iterator for HashRingIter {
108+ impl Iterator for HashRingIter < ' _ > {
106109 type Item = usize ;
107110
108111 fn next ( & mut self ) -> Option < usize > {
109112 while self . offset < self . ring . len ( ) {
110113 let ( _, node) = self . ring [ ( self . start + self . offset ) % self . ring . len ( ) ] ;
111114 self . offset += 1 ;
112- if !self . seen . contains ( & node) {
113- self . seen . push ( node) ;
115+ if self . seen . insert ( node) {
114116 return Some ( node) ;
115117 }
116118 }
@@ -164,7 +166,7 @@ const VIRTUAL_NODES: usize = 200;
164166
165167fn run ( num_tokens : usize , k : usize , n : usize , num_seeds : u64 ) {
166168 let total = num_tokens * k;
167- let cap = total. div_ceil ( n) ;
169+ let cap = total. div_ceil ( n) + 1 ;
168170
169171 println ! ( "Parameters: {num_tokens} tokens, k={k} replicas, {n} machines, {num_seeds} seeds" ) ;
170172 println ! ( "Total assignments: {total}, capacity cap per machine: {cap}" ) ;
@@ -183,34 +185,43 @@ fn run(num_tokens: usize, k: usize, n: usize, num_seeds: u64) {
183185 let mut ub_changes = Stats :: new ( ) ;
184186 let mut b_changes = Stats :: new ( ) ;
185187 let mut ring_changes = Stats :: new ( ) ;
188+ let mut ub_time_us = 0u128 ;
189+ let mut b_time_us = 0u128 ;
190+ let mut ring_time_us = 0u128 ;
186191
187192 for seed in 0 ..num_seeds {
188193 // ── Choose-k (unbounded) ─────────────────────────────────────────
189194 let make_iters = |n| {
190195 ( 0 ..num_tokens as u64 )
191196 . map ( move |key| ConsistentChooseKHasher :: new ( hasher_for_seed_and_key ( seed, key) , n) )
192197 } ;
198+ let t = Instant :: now ( ) ;
193199 let ( unbounded, ub_load) = bounded_load_assign ( make_iters ( n) , k, n, usize:: MAX ) ;
194- ub_spread. push ( load_spread ( & ub_load) as f64 ) ;
200+ ub_time_us += t. elapsed ( ) . as_micros ( ) ;
201+ ub_spread. push ( load_stddev ( & ub_load) ) ;
195202
196203 // ── Choose-k (bounded) ───────────────────────────────────────────
204+ let t = Instant :: now ( ) ;
197205 let ( bounded, b_load) = bounded_load_assign ( make_iters ( n) , k, n, cap) ;
198- b_spread. push ( load_spread ( & b_load) as f64 ) ;
206+ b_time_us += t. elapsed ( ) . as_micros ( ) ;
207+ b_spread. push ( load_stddev ( & b_load) ) ;
199208
200209 // ── Hash ring (bounded) ──────────────────────────────────────────
201210 let ring = HashRing :: new ( seed, n, VIRTUAL_NODES ) ;
211+ let t = Instant :: now ( ) ;
202212 let ( ring_assign, r_load) = bounded_load_assign (
203213 ( 0 ..num_tokens as u64 )
204214 . map ( |key| ring. iter ( hasher_for_seed_and_key ( seed, key) . finish ( ) ) ) ,
205215 k,
206216 n,
207217 cap,
208218 ) ;
209- ring_spread. push ( load_spread ( & r_load) as f64 ) ;
219+ ring_time_us += t. elapsed ( ) . as_micros ( ) ;
220+ ring_spread. push ( load_stddev ( & r_load) ) ;
210221
211222 // ── Consistency: add one machine ─────────────────────────────────
212223 let n2 = n + 1 ;
213- let cap2 = total. div_ceil ( n2) ;
224+ let cap2 = total. div_ceil ( n2) + 1 ;
214225
215226 let ( unbounded2, _) = bounded_load_assign ( make_iters ( n2) , k, n2, usize:: MAX ) ;
216227 ub_changes. push ( count_churn ( & unbounded, & unbounded2) as f64 / total as f64 * 100.0 ) ;
@@ -236,7 +247,7 @@ fn run(num_tokens: usize, k: usize, n: usize, num_seeds: u64) {
236247 println ! ( "{:-<24} {:->16} {:->16} {:->16}" , "" , "" , "" , "" ) ;
237248 println ! (
238249 "{:<24} {:>11.2} ± {:<5.2} {:>10.2} ± {:<5.2} {:>10.2} ± {:<5.2}" ,
239- "Load spread (max-min) " ,
250+ "Load stddev " ,
240251 ub_spread. mean( ) ,
241252 ub_spread. stddev( ) ,
242253 b_spread. mean( ) ,
@@ -255,24 +266,28 @@ fn run(num_tokens: usize, k: usize, n: usize, num_seeds: u64) {
255266 ring_changes. stddev( ) ,
256267 ) ;
257268 println ! (
258- "\n ideal churn: {:.2}%" ,
259- 1.0 / ( n + 1 ) as f64 * 100.0
269+ "{:<24} {:>13.1} ms {:>13.1} ms {:>13.1} ms" ,
270+ "Total time" ,
271+ ub_time_us as f64 / 1000.0 ,
272+ b_time_us as f64 / 1000.0 ,
273+ ring_time_us as f64 / 1000.0 ,
260274 ) ;
275+ println ! ( "\n ideal churn: {:.2}%" , 1.0 / ( n + 1 ) as f64 * 100.0 ) ;
261276}
262277
263278fn main ( ) {
264- let configs: & [ ( usize , usize , usize ) ] = & [
265- // (num_tokens, k, n)
266- ( 64 , 3 , 24 ) , // original
267- ( 256 , 3 , 24 ) , // more tokens, same k and n
268- ( 64 , 1 , 24 ) , // k=1 (no replication)
269- ( 64 , 5 , 24 ) , // higher replication
270- ( 64 , 3 , 8 ) , // fewer machines
271- ( 64 , 3 , 60 ) , // many machines (sparse)
279+ // (num_tokens, k, n, num_seeds)
280+ let configs: & [ ( usize , usize , usize , u64 ) ] = & [
281+ ( 64 , 3 , 24 , 1000 ) , // original
282+ ( 256 , 3 , 24 , 1000 ) , // more tokens, same k and n
283+ ( 64 , 1 , 24 , 1000 ) , // k=1 (no replication)
284+ ( 64 , 5 , 24 , 1000 ) , // higher replication
285+ ( 64 , 3 , 8 , 1000 ) , // fewer machines
286+ ( 64 , 3 , 60 , 1000 ) , // many machines (sparse)
287+ ( 1_000_000 , 3 , 100_000 , 1 ) , // 1M tokens, 100k machines
272288 ] ;
273- let num_seeds = 1000 ;
274289
275- for ( i, & ( num_tokens, k, n) ) in configs. iter ( ) . enumerate ( ) {
290+ for ( i, & ( num_tokens, k, n, num_seeds ) ) in configs. iter ( ) . enumerate ( ) {
276291 if i > 0 {
277292 println ! ( "\n {}\n " , "=" . repeat( 76 ) ) ;
278293 }
0 commit comments