@@ -327,45 +327,49 @@ where
327327 ) -> FindKeyValuesStream < ' a , Self :: Error > {
328328 Box :: pin ( async_stream:: stream! {
329329 let mut stream = self . store. find_key_values_by_prefix_rev_iter( key_prefix) ;
330+ // Accumulator: (current_key, top_index, segments_descending).
331+ // segments[i] is the segment at index `top_index - i`.
332+ // An accumulation that never reaches index 0 is composed of
333+ // leftover segments (from a previously deleted big value, since
334+ // delete_key only removes the master) and is silently dropped.
335+ let mut state: Option <( Vec <u8 >, u32 , Vec <Vec <u8 >>) > = None ;
330336 while let Some ( item) = stream. next( ) . await {
331337 let ( mut big_key, value) =
332338 item. map_err( ValueSplittingError :: InnerStoreError ) ?;
333- let last_index = Self :: read_index_from_key( & big_key) ?;
339+ let index = Self :: read_index_from_key( & big_key) ?;
334340 big_key. truncate( big_key. len( ) - 4 ) ;
335341 let key = big_key;
336- // Collect segments in reverse order: from last_index down to 0.
337- let mut segments: Vec <( u32 , Vec <u8 >) > = vec![ ( last_index, value) ] ;
338- while segments. last( ) . unwrap( ) . 0 > 0 {
339- let expected = segments. last( ) . unwrap( ) . 0 - 1 ;
340- let next = stream. next( ) . await
341- . ok_or( ValueSplittingError :: MissingSegment ) ?;
342- let ( big_key2, value2) =
343- next. map_err( ValueSplittingError :: InnerStoreError ) ?;
344- if !( Self :: read_index_from_key( & big_key2) ? == expected
345- && big_key2. starts_with( & key)
346- && big_key2. len( ) == key. len( ) + 4 )
347- {
342+ let continues = match & state {
343+ Some ( ( current_key, top, segs) ) => {
344+ let segs_len = segs. len( ) as u32 ;
345+ * current_key == key && segs_len <= * top && index == * top - segs_len
346+ }
347+ None => false ,
348+ } ;
349+ if continues {
350+ state. as_mut( ) . unwrap( ) . 2 . push( value) ;
351+ } else {
352+ state = Some ( ( key, index, vec![ value] ) ) ;
353+ }
354+ if index == 0 {
355+ let ( key, top, segs) = state. take( ) . unwrap( ) ;
356+ let segment_zero_value = segs. last( ) . unwrap( ) ;
357+ let count = Self :: read_count_from_value( segment_zero_value) ?;
358+ if count == 0 || count > top + 1 {
348359 yield Err ( ValueSplittingError :: MissingSegment ) ;
349360 return ;
350361 }
351- segments. push( ( expected, value2) ) ;
352- }
353- // Segment 0 carries the count; indices >= count are stranded leftovers.
354- let ( _, segment_zero_value) = segments. last( ) . unwrap( ) ;
355- let count = Self :: read_count_from_value( segment_zero_value) ?;
356- if count == 0 || count > last_index + 1 {
357- yield Err ( ValueSplittingError :: MissingSegment ) ;
358- return ;
359- }
360- let mut big_value = Vec :: new( ) ;
361- for ( idx, val) in segments. iter( ) . rev( ) {
362- if * idx == 0 {
363- big_value. extend_from_slice( & val[ 4 ..] ) ;
364- } else if * idx < count {
365- big_value. extend_from_slice( val) ;
362+ let mut big_value = Vec :: new( ) ;
363+ for ( rev_pos, val) in segs. iter( ) . rev( ) . enumerate( ) {
364+ let idx = rev_pos as u32 ;
365+ if idx == 0 {
366+ big_value. extend_from_slice( & val[ 4 ..] ) ;
367+ } else if idx < count {
368+ big_value. extend_from_slice( val) ;
369+ }
366370 }
371+ yield Ok ( ( key, big_value) ) ;
367372 }
368- yield Ok ( ( key, big_value) ) ;
369373 }
370374 } )
371375 }
0 commit comments