@@ -223,21 +223,17 @@ where
223223 } )
224224}
225225
226- /// Read previously persisted payments information from the store.
227- pub ( crate ) async fn read_payments < L : Deref > (
228- kv_store : & DynStore , logger : L ,
229- ) -> Result < Vec < PaymentDetails > , std:: io:: Error >
226+ /// Generic helper to read persisted items from a KV store namespace .
227+ async fn read_objects_from_store < T , L : Deref > (
228+ kv_store : & DynStore , logger : L , primary_namespace : & str , secondary_namespace : & str ,
229+ ) -> Result < Vec < T > , std:: io:: Error >
230230where
231+ T : Readable ,
231232 L :: Target : LdkLogger ,
232233{
233- let mut res = Vec :: new ( ) ;
234+ let mut stored_keys = KVStore :: list ( & * kv_store , primary_namespace , secondary_namespace ) . await ? ;
234235
235- let mut stored_keys = KVStore :: list (
236- & * kv_store,
237- PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE ,
238- PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE ,
239- )
240- . await ?;
236+ let mut res = Vec :: with_capacity ( stored_keys. len ( ) ) ;
241237
242238 const BATCH_SIZE : usize = 50 ;
243239
@@ -246,52 +242,44 @@ where
246242 // Fill JoinSet with tasks if possible
247243 while set. len ( ) < BATCH_SIZE && !stored_keys. is_empty ( ) {
248244 if let Some ( next_key) = stored_keys. pop ( ) {
249- let fut = KVStore :: read (
250- & * kv_store,
251- PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE ,
252- PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE ,
253- & next_key,
254- ) ;
245+ let fut = KVStore :: read ( & * kv_store, primary_namespace, secondary_namespace, & next_key) ;
255246 set. spawn ( fut) ;
256247 debug_assert ! ( set. len( ) <= BATCH_SIZE ) ;
257248 }
258249 }
259250
251+ let type_name = std:: any:: type_name :: < T > ( ) ;
252+
260253 while let Some ( read_res) = set. join_next ( ) . await {
261254 // Exit early if we get an IO error.
262255 let reader = read_res
263256 . map_err ( |e| {
264- log_error ! ( logger, "Failed to read PaymentDetails : {}" , e ) ;
257+ log_error ! ( logger, "Failed to read {type_name} : {e}" ) ;
265258 set. abort_all ( ) ;
266259 e
267260 } ) ?
268261 . map_err ( |e| {
269- log_error ! ( logger, "Failed to read PaymentDetails : {}" , e ) ;
262+ log_error ! ( logger, "Failed to read {type_name} : {e}" ) ;
270263 set. abort_all ( ) ;
271264 e
272265 } ) ?;
273266
274267 // Refill set for every finished future, if we still have something to do.
275268 if let Some ( next_key) = stored_keys. pop ( ) {
276- let fut = KVStore :: read (
277- & * kv_store,
278- PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE ,
279- PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE ,
280- & next_key,
281- ) ;
269+ let fut = KVStore :: read ( & * kv_store, primary_namespace, secondary_namespace, & next_key) ;
282270 set. spawn ( fut) ;
283271 debug_assert ! ( set. len( ) <= BATCH_SIZE ) ;
284272 }
285273
286274 // Handle result.
287- let payment = PaymentDetails :: read ( & mut & * reader) . map_err ( |e| {
288- log_error ! ( logger, "Failed to deserialize PaymentDetails : {}" , e ) ;
275+ let item = T :: read ( & mut & * reader) . map_err ( |e| {
276+ log_error ! ( logger, "Failed to deserialize {type_name} : {e}" ) ;
289277 std:: io:: Error :: new (
290278 std:: io:: ErrorKind :: InvalidData ,
291- "Failed to deserialize PaymentDetails" ,
279+ format ! ( "Failed to deserialize {type_name}" ) ,
292280 )
293281 } ) ?;
294- res. push ( payment ) ;
282+ res. push ( item ) ;
295283 }
296284
297285 debug_assert ! ( set. is_empty( ) ) ;
@@ -300,6 +288,22 @@ where
300288 Ok ( res)
301289}
302290
291+ /// Read previously persisted payments information from the store.
292+ pub ( crate ) async fn read_payments < L : Deref > (
293+ kv_store : & DynStore , logger : L ,
294+ ) -> Result < Vec < PaymentDetails > , std:: io:: Error >
295+ where
296+ L :: Target : LdkLogger ,
297+ {
298+ read_objects_from_store (
299+ kv_store,
300+ logger,
301+ PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE ,
302+ PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE ,
303+ )
304+ . await
305+ }
306+
303307/// Read `OutputSweeper` state from the store.
304308pub ( crate ) async fn read_output_sweeper (
305309 broadcaster : Arc < Broadcaster > , fee_estimator : Arc < OnchainFeeEstimator > ,
0 commit comments