@@ -17,6 +17,7 @@ use crate::expression::range_detacher::Range;
1717use crate :: serdes:: { ReferenceSerialization , ReferenceTables } ;
1818use crate :: storage:: { TableCache , Transaction } ;
1919use crate :: types:: value:: DataValue ;
20+ use kite_sql_serde_macros:: ReferenceSerialization ;
2021use siphasher:: sip:: SipHasher13 ;
2122use std:: borrow:: Borrow ;
2223use std:: hash:: { Hash , Hasher } ;
@@ -25,6 +26,51 @@ use std::marker::PhantomData;
2526use std:: { cmp, mem} ;
2627
2728pub ( crate ) type FastHasher = SipHasher13 ;
/// Page length, in counters, for persisted count-min sketch pages (16 Ki entries).
///
/// NOTE(review): presumably passed as the `page_len` argument of
/// `CountMinSketch::into_storage_parts`; the call site is not visible here — confirm.
pub(crate) const COUNT_MIN_SKETCH_STORAGE_PAGE_LEN: usize = 16 * 1024;
30+
31+ #[ derive( Debug , Clone , ReferenceSerialization ) ]
32+ pub struct CountMinSketchMeta {
33+ width : usize ,
34+ k_num : usize ,
35+ page_len : usize ,
36+ hasher_0 : FastHasher ,
37+ hasher_1 : FastHasher ,
38+ }
39+
40+ impl CountMinSketchMeta {
41+ pub fn width ( & self ) -> usize {
42+ self . width
43+ }
44+
45+ pub fn k_num ( & self ) -> usize {
46+ self . k_num
47+ }
48+
49+ pub fn page_len ( & self ) -> usize {
50+ self . page_len
51+ }
52+ }
53+
54+ impl CountMinSketchPage {
55+ pub fn row_idx ( & self ) -> usize {
56+ self . row_idx
57+ }
58+
59+ pub fn page_idx ( & self ) -> usize {
60+ self . page_idx
61+ }
62+
63+ pub fn counters ( & self ) -> & [ usize ] {
64+ & self . counters
65+ }
66+ }
67+
68+ #[ derive( Debug , Clone , ReferenceSerialization ) ]
69+ pub struct CountMinSketchPage {
70+ row_idx : usize ,
71+ page_idx : usize ,
72+ counters : Vec < usize > ,
73+ }
2874
2975// https://github.com/jedisct1/rust-count-min-sketch
3076#[ derive( Debug , Clone ) ]
@@ -37,6 +83,121 @@ pub struct CountMinSketch<K> {
3783 phantom_k : PhantomData < K > ,
3884}
3985
86+ impl < K > CountMinSketch < K > {
87+ pub fn storage_page_count ( & self , page_len : usize ) -> usize {
88+ self . counters
89+ . iter ( )
90+ . map ( |row| row. len ( ) . div_ceil ( page_len) )
91+ . sum ( )
92+ }
93+
94+ pub fn into_storage_parts (
95+ self ,
96+ page_len : usize ,
97+ ) -> ( CountMinSketchMeta , impl Iterator < Item = CountMinSketchPage > ) {
98+ let CountMinSketch {
99+ counters,
100+ hashers,
101+ mask,
102+ k_num,
103+ ..
104+ } = self ;
105+ let width = mask + 1 ;
106+ let meta = CountMinSketchMeta {
107+ width,
108+ k_num,
109+ page_len,
110+ hasher_0 : hashers[ 0 ] . clone ( ) ,
111+ hasher_1 : hashers[ 1 ] . clone ( ) ,
112+ } ;
113+ let pages = counters
114+ . into_iter ( )
115+ . enumerate ( )
116+ . flat_map ( move |( row_idx, counters) | {
117+ let page_count = counters. len ( ) . div_ceil ( page_len) ;
118+ ( 0 ..page_count) . map ( move |page_idx| {
119+ let start = page_idx * page_len;
120+ let end = ( ( page_idx + 1 ) * page_len) . min ( counters. len ( ) ) ;
121+
122+ CountMinSketchPage {
123+ row_idx,
124+ page_idx,
125+ counters : counters[ start..end] . to_vec ( ) ,
126+ }
127+ } )
128+ } ) ;
129+
130+ ( meta, pages)
131+ }
132+
133+ pub fn from_storage_parts (
134+ meta : CountMinSketchMeta ,
135+ pages : Vec < CountMinSketchPage > ,
136+ ) -> Result < Self , DatabaseError > {
137+ let width = meta. width ;
138+ let k_num = meta. k_num ;
139+ let page_len = meta. page_len ;
140+ if width == 0 || k_num == 0 || page_len == 0 {
141+ return Err ( DatabaseError :: InvalidValue (
142+ "count-min sketch storage meta is invalid" . to_string ( ) ,
143+ ) ) ;
144+ }
145+ if !width. is_power_of_two ( ) {
146+ return Err ( DatabaseError :: InvalidValue (
147+ "count-min sketch width must be a power of two" . to_string ( ) ,
148+ ) ) ;
149+ }
150+
151+ let mut counters = vec ! [ Vec :: with_capacity( width) ; k_num] ;
152+ let mut expected_page_idx = vec ! [ 0usize ; k_num] ;
153+
154+ for CountMinSketchPage {
155+ row_idx,
156+ page_idx,
157+ counters : page_counters,
158+ } in pages
159+ {
160+ if row_idx >= k_num {
161+ return Err ( DatabaseError :: InvalidValue ( format ! (
162+ "count-min sketch row index out of bounds: {row_idx}"
163+ ) ) ) ;
164+ }
165+ if page_idx != expected_page_idx[ row_idx] {
166+ return Err ( DatabaseError :: InvalidValue ( format ! (
167+ "count-min sketch page sequence is invalid: row={row_idx}, page={page_idx}, expected={}" ,
168+ expected_page_idx[ row_idx]
169+ ) ) ) ;
170+ }
171+ if page_counters. len ( ) > page_len {
172+ return Err ( DatabaseError :: InvalidValue ( format ! (
173+ "count-min sketch page is too large: row={row_idx}, page={page_idx}"
174+ ) ) ) ;
175+ }
176+
177+ counters[ row_idx] . extend ( page_counters) ;
178+ expected_page_idx[ row_idx] += 1 ;
179+ }
180+
181+ for ( row_idx, row) in counters. iter ( ) . enumerate ( ) {
182+ if row. len ( ) != width {
183+ return Err ( DatabaseError :: InvalidValue ( format ! (
184+ "count-min sketch row width mismatch: row={row_idx}, expected={width}, actual={}" ,
185+ row. len( )
186+ ) ) ) ;
187+ }
188+ }
189+
190+ Ok ( CountMinSketch {
191+ counters,
192+ offsets : vec ! [ 0 ; k_num] ,
193+ hashers : [ meta. hasher_0 , meta. hasher_1 ] ,
194+ mask : width - 1 ,
195+ k_num,
196+ phantom_k : Default :: default ( ) ,
197+ } )
198+ }
199+ }
200+
40201impl CountMinSketch < DataValue > {
41202 pub fn collect_count ( & self , ranges : & [ Range ] ) -> usize {
42203 let mut count = 0 ;
@@ -256,4 +417,25 @@ mod tests {
256417 300
257418 ) ;
258419 }
420+
#[test]
fn test_storage_parts_roundtrip() {
    // Feed 256 increments spread over 17 distinct keys into a fresh sketch.
    let mut original = CountMinSketch::<DataValue>::new(128, 0.95, 10.0);
    for key in (0..256).map(|i| i % 17) {
        original.increment(&DataValue::Int32(key));
    }

    // Split a copy into storage parts using small pages, then rebuild from them.
    let (meta, page_iter) = original.clone().into_storage_parts(8);
    let pages: Vec<_> = page_iter.collect();
    let restored = CountMinSketch::<DataValue>::from_storage_parts(meta, pages).unwrap();

    // The rebuilt sketch must answer exactly like the one it was created from.
    for probe in [3, 9] {
        let key = DataValue::Int32(probe);
        assert_eq!(original.estimate(&key), restored.estimate(&key));
    }
}
259441}
0 commit comments