|
| 1 | +use std::hash::{DefaultHasher, Hash, Hasher}; |
| 2 | + |
/// One building block for the consistent hashing algorithm is a consistent
/// hash iterator which enumerates all the hashes for a specific bucket.
///
/// A bucket is identified by `bit` and covers the range `bit..2 * bit`.
/// All callers in this file pass a power of two for `bit`. Values are
/// produced in strictly decreasing order and are always smaller than the
/// upper bound `n` the iterator was created with.
#[derive(Default)]
struct BucketIterator {
    /// Hash state seeded with the key and the bucket; extended before every
    /// draw after the first one.
    hasher: DefaultHasher,
    /// Exclusive upper bound for the next value that may be emitted.
    n: usize,
    /// `true` as long as the first candidate has not been rejected yet.
    is_first: bool,
    /// Lower bound of the bucket. `0` (the `Default` value) marks an empty
    /// iterator.
    bit: u64,
}

impl BucketIterator {
    /// Creates the iterator for bucket `bit` of `key`, restricted to values
    /// smaller than `n`.
    fn new(key: u64, n: usize, bit: u64) -> Self {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        bit.hash(&mut hasher);
        Self {
            hasher,
            n,
            is_first: true,
            bit,
        }
    }
}

impl Iterator for BucketIterator {
    type Item = usize;

    fn next(&mut self) -> Option<Self::Item> {
        if self.bit == 0 {
            return None;
        }
        if self.is_first {
            // The first candidate always lies inside the bucket.
            let res = self.hasher.finish() % self.bit + self.bit;
            if res < self.n as u64 {
                self.n = res as usize;
                return Some(self.n);
            }
            self.is_first = false;
        }
        loop {
            // Same `i32` literal the hash stream has always been extended with.
            478392_i32.hash(&mut self.hasher);
            let hash = self.hasher.finish();
            // Reduce the hash to `0..2 * bit`. For the top bucket
            // (`bit == 1 << 63`) the modulus would be `2^64`, which does not
            // fit into a `u64`; the full hash already is the reduced value.
            let res = match self.bit.checked_mul(2) {
                Some(modulus) => hash % modulus,
                None => hash,
            };
            // A draw from the lower half (bucket bit not set) ends the bucket.
            if res & self.bit == 0 {
                return None;
            }
            if res < self.n as u64 {
                self.n = res as usize;
                return Some(self.n);
            }
        }
    }
}
| 56 | + |
| 57 | +/// An iterator which enumerates all the consistent hashes for a given key |
| 58 | +/// from largest to smallest in the range `0..n`. |
| 59 | +pub struct ConsistentHashRevIterator { |
| 60 | + bits: u64, |
| 61 | + key: u64, |
| 62 | + n: usize, |
| 63 | + inner: BucketIterator, |
| 64 | +} |
| 65 | + |
| 66 | +impl ConsistentHashRevIterator { |
| 67 | + pub fn new(key: u64, n: usize) -> Self { |
| 68 | + let mut hasher = DefaultHasher::new(); |
| 69 | + key.hash(&mut hasher); |
| 70 | + let bits = hasher.finish() % n.next_power_of_two() as u64; |
| 71 | + let inner = BucketIterator::default(); |
| 72 | + Self { |
| 73 | + bits, |
| 74 | + key, |
| 75 | + n, |
| 76 | + inner, |
| 77 | + } |
| 78 | + } |
| 79 | +} |
| 80 | + |
| 81 | +impl Iterator for ConsistentHashRevIterator { |
| 82 | + type Item = usize; |
| 83 | + |
| 84 | + fn next(&mut self) -> Option<Self::Item> { |
| 85 | + if self.n == 0 { |
| 86 | + return None; |
| 87 | + } |
| 88 | + if let Some(res) = self.inner.next() { |
| 89 | + return Some(res); |
| 90 | + } |
| 91 | + while self.bits > 0 { |
| 92 | + let bit = 1 << self.bits.ilog2(); |
| 93 | + self.bits ^= bit; |
| 94 | + self.inner = BucketIterator::new(self.key, self.n, bit); |
| 95 | + if let Some(res) = self.inner.next() { |
| 96 | + return Some(res); |
| 97 | + } |
| 98 | + } |
| 99 | + self.n = 0; |
| 100 | + Some(self.n) |
| 101 | + } |
| 102 | +} |
| 103 | + |
| 104 | +/// Same as `ConsistentHashRevIterator`, but iterates from smallest to largest |
| 105 | +/// for the range `n..`. |
| 106 | +pub struct ConsistentHashIterator { |
| 107 | + bits: u64, |
| 108 | + key: u64, |
| 109 | + n: usize, |
| 110 | + stack: Vec<usize>, |
| 111 | +} |
| 112 | + |
| 113 | +impl ConsistentHashIterator { |
| 114 | + pub fn new(key: u64, n: usize) -> Self { |
| 115 | + let mut hasher = DefaultHasher::new(); |
| 116 | + key.hash(&mut hasher); |
| 117 | + let mut bits = hasher.finish() as u64; |
| 118 | + bits &= !((n + 2).next_power_of_two() as u64 / 2 - 1); |
| 119 | + let stack = if n == 0 { vec![0] } else { vec![] }; |
| 120 | + Self { |
| 121 | + bits, |
| 122 | + key, |
| 123 | + n, |
| 124 | + stack, |
| 125 | + } |
| 126 | + } |
| 127 | +} |
| 128 | + |
| 129 | +impl Iterator for ConsistentHashIterator { |
| 130 | + type Item = usize; |
| 131 | + |
| 132 | + fn next(&mut self) -> Option<Self::Item> { |
| 133 | + if let Some(res) = self.stack.pop() { |
| 134 | + return Some(res); |
| 135 | + } |
| 136 | + while self.bits > 0 { |
| 137 | + let bit = self.bits & !(self.bits - 1); |
| 138 | + self.bits &= self.bits - 1; |
| 139 | + let inner = BucketIterator::new(self.key, bit as usize * 2, bit); |
| 140 | + self.stack = inner.take_while(|x| *x >= self.n).collect(); |
| 141 | + if let Some(res) = self.stack.pop() { |
| 142 | + return Some(res); |
| 143 | + } |
| 144 | + } |
| 145 | + None |
| 146 | + } |
| 147 | +} |
| 148 | + |
| 149 | +/// Wrapper around `ConsistentHashIterator` and `ConsistentHashRevIterator` to compute |
| 150 | +/// the next or previous consistent hash for a given key for a given number of nodes `n`. |
| 151 | +pub struct ConsistentHasher { |
| 152 | + key: u64, |
| 153 | +} |
| 154 | + |
| 155 | +impl ConsistentHasher { |
| 156 | + pub fn new(key: u64) -> Self { |
| 157 | + Self { key } |
| 158 | + } |
| 159 | + |
| 160 | + pub fn prev(&self, n: usize) -> usize { |
| 161 | + let mut sampler = ConsistentHashRevIterator::new(self.key, n); |
| 162 | + sampler.next().expect("n must be > 0!") |
| 163 | + } |
| 164 | + |
| 165 | + pub fn next(&self, n: usize) -> usize { |
| 166 | + let mut sampler = ConsistentHashIterator::new(self.key, n); |
| 167 | + sampler.next().expect("Exceeded iterator bounds :(") |
| 168 | + } |
| 169 | +} |
| 170 | + |
| 171 | +/// Implementation of a consistent choose k hashing algorithm. |
| 172 | +/// It returns k distinct consistent hashes in the range `0..n`. |
| 173 | +/// The hashes are consistent when `n` changes and when `k` changes! |
| 174 | +/// I.e. on average exactly `1/(n+1)` (resp. `1/(k+1)`) many hashes will change |
| 175 | +/// when `n` (resp. `k`) increases by one. Additionally, the returned `k` tuple |
| 176 | +/// is guaranteed to be uniformely chosen from all possible `n-choose-k` tuples. |
| 177 | +pub struct ConsistentChooseKHasher { |
| 178 | + key: u64, |
| 179 | + k: usize, |
| 180 | +} |
| 181 | + |
| 182 | +impl ConsistentChooseKHasher { |
| 183 | + pub fn new(key: u64, k: usize) -> Self { |
| 184 | + Self { key, k } |
| 185 | + } |
| 186 | + |
| 187 | + // TODO: Implement this as an iterator! |
| 188 | + pub fn prev(&self, mut n: usize) -> Vec<usize> { |
| 189 | + let mut samples = Vec::with_capacity(self.k); |
| 190 | + let mut samplers: Vec<_> = (0..self.k) |
| 191 | + .map(|i| ConsistentHashRevIterator::new(self.key + 43987492 * i as u64, n - i).peekable()) |
| 192 | + .collect(); |
| 193 | + for i in (0..self.k).rev() { |
| 194 | + let mut max = 0; |
| 195 | + for k in 0..=i { |
| 196 | + while samplers[k].peek() >= Some(&(n - k)) && n - k > 0 { |
| 197 | + samplers[k].next(); |
| 198 | + } |
| 199 | + max = max.max(samplers[k].peek().unwrap() + k); |
| 200 | + } |
| 201 | + samples.push(max); |
| 202 | + n = max; |
| 203 | + } |
| 204 | + samples.sort(); |
| 205 | + samples |
| 206 | + } |
| 207 | +} |
| 208 | + |
| 209 | + |
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks monotonicity of `prev`, the round trip `next` -> `prev`, and
    /// that forward and reverse iterators enumerate the same hashes.
    #[test]
    fn test_uniform_1() {
        for key in 0..100 {
            let hasher = ConsistentHasher::new(key);
            for nodes in 0..1000 {
                assert!(hasher.prev(nodes + 1) <= hasher.prev(nodes + 2));
                let upcoming = hasher.next(nodes);
                assert_eq!(upcoming, hasher.prev(upcoming + 1));
            }
            // Forward enumeration below 1000, flipped into decreasing order.
            let forward: Vec<_> = ConsistentHashIterator::new(key, 0)
                .take_while(|x| *x < 1000)
                .collect();
            let forward_reversed: Vec<_> = forward.into_iter().rev().collect();
            let backward: Vec<_> = ConsistentHashRevIterator::new(key, 1000).collect();
            assert_eq!(backward, forward_reversed);
        }
        // Print the distribution over 13 nodes for manual inspection.
        let mut stats = vec![0; 13];
        for key in 0..100000 {
            let hasher = ConsistentHasher::new(key);
            let node = hasher.prev(stats.len());
            stats[node] += 1;
        }
        println!("{stats:?}");
    }

    /// Checks that choose-k results are sorted, distinct, monotone in `n`,
    /// and change by at most one element when `n` or `k` grows by one.
    #[test]
    fn test_uniform_k() {
        const K: usize = 3;
        for key in 0..100 {
            let hasher = ConsistentChooseKHasher::new(key, K);
            for nodes in K..1000 {
                let samples = hasher.prev(nodes + 1);
                assert!(samples.len() == K);
                for idx in 1..K {
                    assert!(samples[idx - 1] < samples[idx]);
                }
                let next = hasher.prev(nodes + 2);
                for idx in 0..K {
                    assert!(samples[idx] <= next[idx]);
                }
                // The union of both tuples may contain at most one new value.
                let mut merged: Vec<_> = samples.iter().chain(next.iter()).copied().collect();
                merged.sort();
                merged.dedup();
                assert!(
                    merged.len() == K || merged.len() == K + 1,
                    "Unexpected {samples:?} vs. {next:?}"
                );
            }
        }
        // Print the distribution of choose-2 over 8 nodes for manual inspection.
        let mut stats = vec![0; 8];
        for offset in 0..32 {
            let hasher = ConsistentChooseKHasher::new(offset + 32783, 2);
            for sample in hasher.prev(stats.len()) {
                stats[sample] += 1;
            }
        }
        println!("{stats:?}");
        // Test consistency when increasing k!
        for k in 1..10 {
            for nodes in k + 1..20 {
                for key in 0..1000 {
                    let smaller = ConsistentChooseKHasher::new(key, k).prev(nodes);
                    let larger = ConsistentChooseKHasher::new(key, k + 1).prev(nodes);
                    assert_eq!(smaller.len(), k);
                    assert_eq!(larger.len(), k + 1);
                    let mut merged = smaller.clone();
                    merged.extend(larger);
                    merged.sort();
                    merged.dedup();
                    assert_eq!(merged.len(), k + 1);
                }
            }
        }
    }
}
0 commit comments