@@ -24,7 +24,7 @@ use bitcoin::Network;
2424use lightning:: impl_writeable_tlv_based_enum;
2525use lightning:: io:: { self , Error , ErrorKind } ;
2626use lightning:: sign:: { EntropySource as LdkEntropySource , RandomBytes } ;
27- use lightning:: util:: persist:: KVStore ;
27+ use lightning:: util:: persist:: { KVStore , PageToken , PaginatedKVStore , PaginatedListResponse } ;
2828use lightning:: util:: ser:: { Readable , Writeable } ;
2929use prost:: Message ;
3030use vss_client:: client:: VssClient ;
@@ -293,6 +293,32 @@ impl KVStore for VssStore {
293293 }
294294}
295295
296+ impl PaginatedKVStore for VssStore {
297+ fn list_paginated (
298+ & self , primary_namespace : & str , secondary_namespace : & str , page_token : Option < PageToken > ,
299+ ) -> impl Future < Output = Result < PaginatedListResponse , io:: Error > > + ' static + Send {
300+ let primary_namespace = primary_namespace. to_string ( ) ;
301+ let secondary_namespace = secondary_namespace. to_string ( ) ;
302+ let inner = Arc :: clone ( & self . inner ) ;
303+ let runtime = self . internal_runtime ( ) ;
304+ async move {
305+ let task = runtime. spawn ( async move {
306+ inner
307+ . list_paginated_internal (
308+ & inner. async_client ,
309+ primary_namespace,
310+ secondary_namespace,
311+ page_token,
312+ )
313+ . await
314+ } ) ;
315+ task. await . map_err ( |e| {
316+ io:: Error :: new ( io:: ErrorKind :: Other , format ! ( "VSS runtime task failed: {}" , e) )
317+ } ) ?
318+ }
319+ }
320+ }
321+
296322impl Drop for VssStore {
297323 fn drop ( & mut self ) {
298324 if let Some ( runtime) = self . internal_runtime . take ( ) {
@@ -416,7 +442,9 @@ impl VssStoreInner {
416442 keys. push ( self . extract_key ( & kv. key ) ?) ;
417443 }
418444
419- Ok ( ( keys, response. next_page_token ) )
445+ // VSS may return an empty string instead of None to signal the last page.
446+ let next_page_token = response. next_page_token . filter ( |t| !t. is_empty ( ) ) ;
447+ Ok ( ( keys, next_page_token) )
420448 }
421449
422450 async fn read_internal (
@@ -548,13 +576,42 @@ impl VssStoreInner {
548576 . await ?;
549577 keys. extend ( page_keys) ;
550578 match next_page_token {
551- Some ( t) if !t . is_empty ( ) => page_token = Some ( t) ,
552- _ => break ,
579+ Some ( t) => page_token = Some ( t) ,
580+ None => break ,
553581 }
554582 }
555583 Ok ( keys)
556584 }
557585
586+ async fn list_paginated_internal (
587+ & self , client : & VssClient < CustomRetryPolicy > , primary_namespace : String ,
588+ secondary_namespace : String , page_token : Option < PageToken > ,
589+ ) -> io:: Result < PaginatedListResponse > {
590+ check_namespace_key_validity (
591+ & primary_namespace,
592+ & secondary_namespace,
593+ None ,
594+ "list_paginated" ,
595+ ) ?;
596+
597+ const PAGE_SIZE : i32 = 50 ;
598+
599+ let vss_page_token = page_token. map ( |t| t. to_string ( ) ) ;
600+ let ( keys, next_page_token) = self
601+ . list_keys (
602+ client,
603+ & primary_namespace,
604+ & secondary_namespace,
605+ vss_page_token,
606+ Some ( PAGE_SIZE ) ,
607+ )
608+ . await ?;
609+
610+ let next_page_token = next_page_token. map ( PageToken :: new) ;
611+
612+ Ok ( PaginatedListResponse { keys, next_page_token } )
613+ }
614+
558615 async fn execute_locked_write <
559616 F : Future < Output = Result < ( ) , lightning:: io:: Error > > ,
560617 FN : FnOnce ( ) -> F ,
@@ -963,4 +1020,85 @@ mod tests {
9631020 do_read_write_remove_list_persist ( & vss_store) . await ;
9641021 drop ( vss_store)
9651022 }
1023+
1024+ #[ tokio:: test]
1025+ async fn vss_paginated_listing ( ) {
1026+ let store = build_vss_store ( ) ;
1027+ let ns = "test_paginated" ;
1028+ let sub = "listing" ;
1029+ let num_entries = 5 ;
1030+
1031+ for i in 0 ..num_entries {
1032+ let key = format ! ( "key_{:04}" , i) ;
1033+ let data = vec ! [ i as u8 ; 32 ] ;
1034+ KVStore :: write ( & store, ns, sub, & key, data) . await . unwrap ( ) ;
1035+ }
1036+
1037+ let mut all_keys = Vec :: new ( ) ;
1038+ let mut page_token = None ;
1039+
1040+ loop {
1041+ let response =
1042+ PaginatedKVStore :: list_paginated ( & store, ns, sub, page_token) . await . unwrap ( ) ;
1043+ all_keys. extend ( response. keys ) ;
1044+ match response. next_page_token {
1045+ Some ( token) => page_token = Some ( token) ,
1046+ _ => break ,
1047+ }
1048+ }
1049+
1050+ assert_eq ! ( all_keys. len( ) , num_entries) ;
1051+
1052+ // Verify no duplicates
1053+ let mut unique = all_keys. clone ( ) ;
1054+ unique. sort ( ) ;
1055+ unique. dedup ( ) ;
1056+ assert_eq ! ( unique. len( ) , num_entries) ;
1057+ }
1058+
1059+ #[ tokio:: test]
1060+ async fn vss_paginated_empty_namespace ( ) {
1061+ let store = build_vss_store ( ) ;
1062+ let response =
1063+ PaginatedKVStore :: list_paginated ( & store, "nonexistent" , "ns" , None ) . await . unwrap ( ) ;
1064+ assert ! ( response. keys. is_empty( ) ) ;
1065+ assert ! ( response. next_page_token. is_none( ) ) ;
1066+ }
1067+
1068+ #[ tokio:: test]
1069+ async fn vss_paginated_removal ( ) {
1070+ let store = build_vss_store ( ) ;
1071+ let ns = "test_paginated" ;
1072+ let sub = "removal" ;
1073+
1074+ KVStore :: write ( & store, ns, sub, "a" , vec ! [ 1u8 ; 8 ] ) . await . unwrap ( ) ;
1075+ KVStore :: write ( & store, ns, sub, "b" , vec ! [ 2u8 ; 8 ] ) . await . unwrap ( ) ;
1076+ KVStore :: write ( & store, ns, sub, "c" , vec ! [ 3u8 ; 8 ] ) . await . unwrap ( ) ;
1077+
1078+ KVStore :: remove ( & store, ns, sub, "b" , false ) . await . unwrap ( ) ;
1079+
1080+ let response = PaginatedKVStore :: list_paginated ( & store, ns, sub, None ) . await . unwrap ( ) ;
1081+ assert_eq ! ( response. keys. len( ) , 2 ) ;
1082+ assert ! ( response. keys. contains( & "a" . to_string( ) ) ) ;
1083+ assert ! ( !response. keys. contains( & "b" . to_string( ) ) ) ;
1084+ assert ! ( response. keys. contains( & "c" . to_string( ) ) ) ;
1085+ }
1086+
1087+ #[ tokio:: test]
1088+ async fn vss_paginated_namespace_isolation ( ) {
1089+ let store = build_vss_store ( ) ;
1090+
1091+ KVStore :: write ( & store, "ns_a" , "sub" , "key_1" , vec ! [ 1u8 ; 8 ] ) . await . unwrap ( ) ;
1092+ KVStore :: write ( & store, "ns_a" , "sub" , "key_2" , vec ! [ 2u8 ; 8 ] ) . await . unwrap ( ) ;
1093+ KVStore :: write ( & store, "ns_b" , "sub" , "key_3" , vec ! [ 3u8 ; 8 ] ) . await . unwrap ( ) ;
1094+
1095+ let response = PaginatedKVStore :: list_paginated ( & store, "ns_a" , "sub" , None ) . await . unwrap ( ) ;
1096+ assert_eq ! ( response. keys. len( ) , 2 ) ;
1097+ assert ! ( response. keys. contains( & "key_1" . to_string( ) ) ) ;
1098+ assert ! ( response. keys. contains( & "key_2" . to_string( ) ) ) ;
1099+
1100+ let response = PaginatedKVStore :: list_paginated ( & store, "ns_b" , "sub" , None ) . await . unwrap ( ) ;
1101+ assert_eq ! ( response. keys. len( ) , 1 ) ;
1102+ assert ! ( response. keys. contains( & "key_3" . to_string( ) ) ) ;
1103+ }
9661104}
0 commit comments