Skip to content

Commit 7537b62

Browse files
committed
prefactor: Refactor read_payments to be generic across other types
1 parent ba16c92 commit 7537b62

1 file changed

Lines changed: 33 additions & 29 deletions

File tree

src/io/utils.rs

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -223,21 +223,17 @@ where
223223
})
224224
}
225225

226-
/// Read previously persisted payments information from the store.
227-
pub(crate) async fn read_payments<L: Deref>(
228-
kv_store: &DynStore, logger: L,
229-
) -> Result<Vec<PaymentDetails>, std::io::Error>
226+
/// Generic helper to read persisted items from a KV store namespace.
227+
async fn read_objects_from_store<T, L: Deref>(
228+
kv_store: &DynStore, logger: L, primary_namespace: &str, secondary_namespace: &str,
229+
) -> Result<Vec<T>, std::io::Error>
230230
where
231+
T: Readable,
231232
L::Target: LdkLogger,
232233
{
233-
let mut res = Vec::new();
234+
let mut stored_keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?;
234235

235-
let mut stored_keys = KVStore::list(
236-
&*kv_store,
237-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
238-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
239-
)
240-
.await?;
236+
let mut res = Vec::with_capacity(stored_keys.len());
241237

242238
const BATCH_SIZE: usize = 50;
243239

@@ -246,52 +242,44 @@ where
246242
// Fill JoinSet with tasks if possible
247243
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
248244
if let Some(next_key) = stored_keys.pop() {
249-
let fut = KVStore::read(
250-
&*kv_store,
251-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
252-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
253-
&next_key,
254-
);
245+
let fut = KVStore::read(&*kv_store, primary_namespace, secondary_namespace, &next_key);
255246
set.spawn(fut);
256247
debug_assert!(set.len() <= BATCH_SIZE);
257248
}
258249
}
259250

251+
let type_name = std::any::type_name::<T>();
252+
260253
while let Some(read_res) = set.join_next().await {
261254
// Exit early if we get an IO error.
262255
let reader = read_res
263256
.map_err(|e| {
264-
log_error!(logger, "Failed to read PaymentDetails: {}", e);
257+
log_error!(logger, "Failed to read {type_name}: {e}");
265258
set.abort_all();
266259
e
267260
})?
268261
.map_err(|e| {
269-
log_error!(logger, "Failed to read PaymentDetails: {}", e);
262+
log_error!(logger, "Failed to read {type_name}: {e}");
270263
set.abort_all();
271264
e
272265
})?;
273266

274267
// Refill set for every finished future, if we still have something to do.
275268
if let Some(next_key) = stored_keys.pop() {
276-
let fut = KVStore::read(
277-
&*kv_store,
278-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
279-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
280-
&next_key,
281-
);
269+
let fut = KVStore::read(&*kv_store, primary_namespace, secondary_namespace, &next_key);
282270
set.spawn(fut);
283271
debug_assert!(set.len() <= BATCH_SIZE);
284272
}
285273

286274
// Handle result.
287-
let payment = PaymentDetails::read(&mut &*reader).map_err(|e| {
288-
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
275+
let item = T::read(&mut &*reader).map_err(|e| {
276+
log_error!(logger, "Failed to deserialize {type_name}: {e}");
289277
std::io::Error::new(
290278
std::io::ErrorKind::InvalidData,
291-
"Failed to deserialize PaymentDetails",
279+
format!("Failed to deserialize {type_name}"),
292280
)
293281
})?;
294-
res.push(payment);
282+
res.push(item);
295283
}
296284

297285
debug_assert!(set.is_empty());
@@ -300,6 +288,22 @@ where
300288
Ok(res)
301289
}
302290

291+
/// Read previously persisted payments information from the store.
292+
pub(crate) async fn read_payments<L: Deref>(
293+
kv_store: &DynStore, logger: L,
294+
) -> Result<Vec<PaymentDetails>, std::io::Error>
295+
where
296+
L::Target: LdkLogger,
297+
{
298+
read_objects_from_store(
299+
kv_store,
300+
logger,
301+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
302+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
303+
)
304+
.await
305+
}
306+
303307
/// Read `OutputSweeper` state from the store.
304308
pub(crate) async fn read_output_sweeper(
305309
broadcaster: Arc<Broadcaster>, fee_estimator: Arc<OnchainFeeEstimator>,

0 commit comments

Comments
 (0)