@@ -24,7 +24,7 @@ use bitcoin::Network;
2424use lightning:: impl_writeable_tlv_based_enum;
2525use lightning:: io:: { self , Error , ErrorKind } ;
2626use lightning:: sign:: { EntropySource as LdkEntropySource , RandomBytes } ;
27- use lightning:: util:: persist:: KVStore ;
27+ use lightning:: util:: persist:: { KVStore , PageToken , PaginatedKVStore , PaginatedListResponse } ;
2828use lightning:: util:: ser:: { Readable , Writeable } ;
2929use prost:: Message ;
3030use vss_client:: client:: VssClient ;
@@ -70,6 +70,8 @@ impl_writeable_tlv_based_enum!(VssSchemaVersion,
7070 ( 1 , V1 ) => { } ,
7171) ;
7272
73+ const PAGE_SIZE : i32 = 50 ;
74+
7375const VSS_HARDENED_CHILD_INDEX : u32 = 877 ;
7476const VSS_SIGS_AUTH_HARDENED_CHILD_INDEX : u32 = 139 ;
7577const VSS_SCHEMA_VERSION_KEY : & str = "vss_schema_version" ;
@@ -293,6 +295,32 @@ impl KVStore for VssStore {
293295 }
294296}
295297
298+ impl PaginatedKVStore for VssStore {
299+ fn list_paginated (
300+ & self , primary_namespace : & str , secondary_namespace : & str , page_token : Option < PageToken > ,
301+ ) -> impl Future < Output = Result < PaginatedListResponse , io:: Error > > + ' static + Send {
302+ let primary_namespace = primary_namespace. to_string ( ) ;
303+ let secondary_namespace = secondary_namespace. to_string ( ) ;
304+ let inner = Arc :: clone ( & self . inner ) ;
305+ let runtime = self . internal_runtime ( ) ;
306+ async move {
307+ let task = runtime. spawn ( async move {
308+ inner
309+ . list_paginated_internal (
310+ & inner. async_client ,
311+ primary_namespace,
312+ secondary_namespace,
313+ page_token,
314+ )
315+ . await
316+ } ) ;
317+ task. await . map_err ( |e| {
318+ io:: Error :: new ( io:: ErrorKind :: Other , format ! ( "VSS runtime task failed: {}" , e) )
319+ } ) ?
320+ }
321+ }
322+ }
323+
296324impl Drop for VssStore {
297325 fn drop ( & mut self ) {
298326 if let Some ( runtime) = self . internal_runtime . take ( ) {
@@ -391,35 +419,33 @@ impl VssStoreInner {
391419 }
392420 }
393421
394- async fn list_all_keys (
422+ async fn list_keys (
395423 & self , client : & VssClient < CustomRetryPolicy > , primary_namespace : & str ,
396- secondary_namespace : & str ,
397- ) -> io:: Result < Vec < String > > {
398- let mut page_token = None ;
399- let mut keys = vec ! [ ] ;
400- let key_prefix = self . build_obfuscated_prefix ( primary_namespace, secondary_namespace) ;
401- while page_token != Some ( "" . to_string ( ) ) {
402- let request = ListKeyVersionsRequest {
403- store_id : self . store_id . clone ( ) ,
404- key_prefix : Some ( key_prefix. clone ( ) ) ,
405- page_token,
406- page_size : None ,
407- } ;
424+ secondary_namespace : & str , key_prefix : String , page_token : Option < String > , page_size : Option < i32 > ,
425+ ) -> io:: Result < ( Vec < String > , Option < String > ) > {
426+ let request = ListKeyVersionsRequest {
427+ store_id : self . store_id . clone ( ) ,
428+ key_prefix : Some ( key_prefix) ,
429+ page_token,
430+ page_size,
431+ } ;
408432
409- let response = client. list_key_versions ( & request) . await . map_err ( |e| {
410- let msg = format ! (
411- "Failed to list keys in {}/{}: {}" ,
412- primary_namespace, secondary_namespace, e
413- ) ;
414- Error :: new ( ErrorKind :: Other , msg)
415- } ) ?;
433+ let response = client. list_key_versions ( & request) . await . map_err ( |e| {
434+ let msg = format ! (
435+ "Failed to list keys in {}/{}: {}" ,
436+ primary_namespace, secondary_namespace, e
437+ ) ;
438+ Error :: new ( ErrorKind :: Other , msg)
439+ } ) ?;
416440
417- for kv in response. key_versions {
418- keys. push ( self . extract_key ( & kv. key ) ?) ;
419- }
420- page_token = response. next_page_token ;
441+ let mut keys = Vec :: with_capacity ( response. key_versions . len ( ) ) ;
442+ for kv in response. key_versions {
443+ keys. push ( self . extract_key ( & kv. key ) ?) ;
421444 }
422- Ok ( keys)
445+
446+ // VSS may return an empty string instead of None to signal the last page.
447+ let next_page_token = response. next_page_token . filter ( |t| !t. is_empty ( ) ) ;
448+ Ok ( ( keys, next_page_token) )
423449 }
424450
425451 async fn read_internal (
@@ -543,20 +569,51 @@ impl VssStoreInner {
543569 ) -> io:: Result < Vec < String > > {
544570 check_namespace_key_validity ( & primary_namespace, & secondary_namespace, None , "list" ) ?;
545571
546- let keys = self
547- . list_all_keys ( client, & primary_namespace, & secondary_namespace)
548- . await
549- . map_err ( |e| {
550- let msg = format ! (
551- "Failed to retrieve keys in namespace: {}/{} : {}" ,
552- primary_namespace, secondary_namespace, e
553- ) ;
554- Error :: new ( ErrorKind :: Other , msg)
555- } ) ?;
556-
572+ let key_prefix = self . build_obfuscated_prefix ( & primary_namespace, & secondary_namespace) ;
573+ let mut page_token: Option < String > = None ;
574+ let mut keys = vec ! [ ] ;
575+ loop {
576+ let ( page_keys, next_page_token) = self
577+ . list_keys ( client, & primary_namespace, & secondary_namespace, key_prefix. clone ( ) , page_token, None )
578+ . await ?;
579+ keys. extend ( page_keys) ;
580+ match next_page_token {
581+ Some ( t) => page_token = Some ( t) ,
582+ None => break ,
583+ }
584+ }
557585 Ok ( keys)
558586 }
559587
588+ async fn list_paginated_internal (
589+ & self , client : & VssClient < CustomRetryPolicy > , primary_namespace : String ,
590+ secondary_namespace : String , page_token : Option < PageToken > ,
591+ ) -> io:: Result < PaginatedListResponse > {
592+ check_namespace_key_validity (
593+ & primary_namespace,
594+ & secondary_namespace,
595+ None ,
596+ "list_paginated" ,
597+ ) ?;
598+
599+ let key_prefix = self . build_obfuscated_prefix ( & primary_namespace, & secondary_namespace) ;
600+ let vss_page_token = page_token. map ( |t| t. to_string ( ) ) ;
601+ let ( keys, next_page_token) = self
602+ . list_keys (
603+ client,
604+ & primary_namespace,
605+ & secondary_namespace,
606+ key_prefix,
607+ vss_page_token,
608+ Some ( PAGE_SIZE ) ,
609+ )
610+ . await ?;
611+
612+ let next_page_token = next_page_token. map ( PageToken :: new) ;
613+
614+ Ok ( PaginatedListResponse { keys, next_page_token } )
615+ }
616+
560617 async fn execute_locked_write <
561618 F : Future < Output = Result < ( ) , lightning:: io:: Error > > ,
562619 FN : FnOnce ( ) -> F ,
@@ -626,6 +683,7 @@ fn retry_policy() -> CustomRetryPolicy {
626683 VssError :: NoSuchKeyError ( ..)
627684 | VssError :: InvalidRequestError ( ..)
628685 | VssError :: ConflictError ( ..)
686+ | VssError :: VSSVersionMismatchError { .. }
629687 )
630688 } ) as _ )
631689}
@@ -647,6 +705,12 @@ async fn determine_and_write_schema_version(
647705 // The value is not set.
648706 None
649707 } ,
708+ Err ( VssError :: VSSVersionMismatchError { version_served, version_expected } ) => {
709+ let msg = format ! (
710+ "VSS version mismatch, expected: {version_expected}, got: {version_served:?}"
711+ ) ;
712+ return Err ( Error :: new ( ErrorKind :: Other , msg) ) ;
713+ } ,
650714 Err ( e) => {
651715 let msg = format ! ( "Failed to read schema version: {}" , e) ;
652716 return Err ( Error :: new ( ErrorKind :: Other , msg) ) ;
@@ -941,35 +1005,109 @@ mod tests {
9411005 use super :: * ;
9421006 use crate :: io:: test_utils:: do_read_write_remove_list_persist;
9431007
944- #[ tokio:: test]
945- async fn vss_read_write_remove_list_persist ( ) {
1008+ fn build_vss_store ( ) -> VssStore {
9461009 let vss_base_url = std:: env:: var ( "TEST_VSS_BASE_URL" ) . unwrap ( ) ;
9471010 let mut rng = rng ( ) ;
9481011 let rand_store_id: String = ( 0 ..7 ) . map ( |_| rng. sample ( Alphanumeric ) as char ) . collect ( ) ;
9491012 let mut node_seed = [ 0u8 ; 64 ] ;
9501013 rng. fill_bytes ( & mut node_seed) ;
9511014 let entropy = NodeEntropy :: from_seed_bytes ( node_seed) ;
952- let vss_store =
953- VssStoreBuilder :: new ( entropy, vss_base_url, rand_store_id, Network :: Testnet )
954- . build_with_sigs_auth ( HashMap :: new ( ) )
955- . unwrap ( ) ;
1015+ VssStoreBuilder :: new ( entropy, vss_base_url, rand_store_id, Network :: Testnet )
1016+ . build_with_sigs_auth ( HashMap :: new ( ) )
1017+ . unwrap ( )
1018+ }
1019+
1020+ #[ tokio:: test]
1021+ async fn vss_read_write_remove_list_persist ( ) {
1022+ let vss_store = build_vss_store ( ) ;
9561023 do_read_write_remove_list_persist ( & vss_store) . await ;
9571024 }
9581025
9591026 #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
9601027 async fn vss_read_write_remove_list_persist_in_runtime_context ( ) {
961- let vss_base_url = std:: env:: var ( "TEST_VSS_BASE_URL" ) . unwrap ( ) ;
962- let mut rng = rng ( ) ;
963- let rand_store_id: String = ( 0 ..7 ) . map ( |_| rng. sample ( Alphanumeric ) as char ) . collect ( ) ;
964- let mut node_seed = [ 0u8 ; 64 ] ;
965- rng. fill_bytes ( & mut node_seed) ;
966- let entropy = NodeEntropy :: from_seed_bytes ( node_seed) ;
967- let vss_store =
968- VssStoreBuilder :: new ( entropy, vss_base_url, rand_store_id, Network :: Testnet )
969- . build_with_sigs_auth ( HashMap :: new ( ) )
970- . unwrap ( ) ;
971-
1028+ let vss_store = build_vss_store ( ) ;
9721029 do_read_write_remove_list_persist ( & vss_store) . await ;
9731030 drop ( vss_store)
9741031 }
1032+
1033+ #[ tokio:: test]
1034+ async fn vss_paginated_listing ( ) {
1035+ let store = build_vss_store ( ) ;
1036+ let ns = "test_paginated" ;
1037+ let sub = "listing" ;
1038+ let num_entries = 5 ;
1039+
1040+ for i in 0 ..num_entries {
1041+ let key = format ! ( "key_{:04}" , i) ;
1042+ let data = vec ! [ i as u8 ; 32 ] ;
1043+ KVStore :: write ( & store, ns, sub, & key, data) . await . unwrap ( ) ;
1044+ }
1045+
1046+ let mut all_keys = Vec :: new ( ) ;
1047+ let mut page_token = None ;
1048+
1049+ loop {
1050+ let response =
1051+ PaginatedKVStore :: list_paginated ( & store, ns, sub, page_token) . await . unwrap ( ) ;
1052+ all_keys. extend ( response. keys ) ;
1053+ match response. next_page_token {
1054+ Some ( token) => page_token = Some ( token) ,
1055+ _ => break ,
1056+ }
1057+ }
1058+
1059+ assert_eq ! ( all_keys. len( ) , num_entries) ;
1060+
1061+ // Verify no duplicates
1062+ let mut unique = all_keys. clone ( ) ;
1063+ unique. sort ( ) ;
1064+ unique. dedup ( ) ;
1065+ assert_eq ! ( unique. len( ) , num_entries) ;
1066+ }
1067+
1068+ #[ tokio:: test]
1069+ async fn vss_paginated_empty_namespace ( ) {
1070+ let store = build_vss_store ( ) ;
1071+ let response =
1072+ PaginatedKVStore :: list_paginated ( & store, "nonexistent" , "ns" , None ) . await . unwrap ( ) ;
1073+ assert ! ( response. keys. is_empty( ) ) ;
1074+ assert ! ( response. next_page_token. is_none( ) ) ;
1075+ }
1076+
1077+ #[ tokio:: test]
1078+ async fn vss_paginated_removal ( ) {
1079+ let store = build_vss_store ( ) ;
1080+ let ns = "test_paginated" ;
1081+ let sub = "removal" ;
1082+
1083+ KVStore :: write ( & store, ns, sub, "a" , vec ! [ 1u8 ; 8 ] ) . await . unwrap ( ) ;
1084+ KVStore :: write ( & store, ns, sub, "b" , vec ! [ 2u8 ; 8 ] ) . await . unwrap ( ) ;
1085+ KVStore :: write ( & store, ns, sub, "c" , vec ! [ 3u8 ; 8 ] ) . await . unwrap ( ) ;
1086+
1087+ KVStore :: remove ( & store, ns, sub, "b" , false ) . await . unwrap ( ) ;
1088+
1089+ let response = PaginatedKVStore :: list_paginated ( & store, ns, sub, None ) . await . unwrap ( ) ;
1090+ assert_eq ! ( response. keys. len( ) , 2 ) ;
1091+ assert ! ( response. keys. contains( & "a" . to_string( ) ) ) ;
1092+ assert ! ( !response. keys. contains( & "b" . to_string( ) ) ) ;
1093+ assert ! ( response. keys. contains( & "c" . to_string( ) ) ) ;
1094+ }
1095+
1096+ #[ tokio:: test]
1097+ async fn vss_paginated_namespace_isolation ( ) {
1098+ let store = build_vss_store ( ) ;
1099+
1100+ KVStore :: write ( & store, "ns_a" , "sub" , "key_1" , vec ! [ 1u8 ; 8 ] ) . await . unwrap ( ) ;
1101+ KVStore :: write ( & store, "ns_a" , "sub" , "key_2" , vec ! [ 2u8 ; 8 ] ) . await . unwrap ( ) ;
1102+ KVStore :: write ( & store, "ns_b" , "sub" , "key_3" , vec ! [ 3u8 ; 8 ] ) . await . unwrap ( ) ;
1103+
1104+ let response = PaginatedKVStore :: list_paginated ( & store, "ns_a" , "sub" , None ) . await . unwrap ( ) ;
1105+ assert_eq ! ( response. keys. len( ) , 2 ) ;
1106+ assert ! ( response. keys. contains( & "key_1" . to_string( ) ) ) ;
1107+ assert ! ( response. keys. contains( & "key_2" . to_string( ) ) ) ;
1108+
1109+ let response = PaginatedKVStore :: list_paginated ( & store, "ns_b" , "sub" , None ) . await . unwrap ( ) ;
1110+ assert_eq ! ( response. keys. len( ) , 1 ) ;
1111+ assert ! ( response. keys. contains( & "key_3" . to_string( ) ) ) ;
1112+ }
9751113}
0 commit comments