Skip to content

Commit f91e367

Browse files
committed
Avoid heap-allocating background processor futures
Replace Box::pin with core::pin::pin! in process_events_async now that MSRV is 1.75. This eliminates a heap allocation per task on every loop iteration by pinning the futures directly to the stack. To satisfy lifetime and Joiner bounds, the loop logic was refactored to run synchronous timer checks first, using flags to conditionally execute the stack-pinned futures. Existing eager polling and early-break semantics are preserved.
1 parent 6ac2cf3 commit f91e367

1 file changed

Lines changed: 95 additions & 84 deletions

File tree

  • lightning-background-processor/src

lightning-background-processor/src/lib.rs

Lines changed: 95 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,21 +1120,12 @@ where
11201120
None => {},
11211121
}
11221122

1123-
// We capture pending_operation_count inside the persistence branch to
1124-
// avoid a race: ChannelManager handlers queue deferred monitor ops
1125-
// before the persistence flag is set. Capturing outside would let us
1126-
// observe pending ops while the flag is still unset, causing us to
1127-
// flush monitor writes without persisting the ChannelManager.
1128-
// Declared before futures so it outlives the Joiner (drop order).
1129-
let pending_monitor_writes;
1123+
let needs_cm_persist = channel_manager.get_cm().get_and_clear_needs_persistence();
1124+
let mut cm_fut = core::pin::pin!(async {
1125+
if needs_cm_persist {
1126+
// Capture the monitor operations pending before we persist the ChannelManager.
1127+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
11301128

1131-
let mut futures = Joiner::new();
1132-
1133-
if channel_manager.get_cm().get_and_clear_needs_persistence() {
1134-
pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
1135-
log_trace!(logger, "Persisting ChannelManager...");
1136-
1137-
let fut = async {
11381129
kv_store
11391130
.write(
11401131
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1147,22 +1138,24 @@ where
11471138
// Flush monitor operations that were pending before we persisted. New updates
11481139
// that arrived after are left for the next iteration.
11491140
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
1150-
Ok(())
1151-
};
1152-
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1153-
let mut fut = Box::pin(fut);
1154-
1155-
// Because persisting the ChannelManager is important to avoid accidental
1156-
// force-closures, go ahead and poll the future once before we do slightly more
1157-
// CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping
1158-
// below. This will get it moving but won't block us for too long if the underlying
1159-
// future is actually async.
1141+
}
1142+
Ok(())
1143+
});
1144+
1145+
// Because persisting the ChannelManager is important to avoid accidental force-closures,
1146+
// go ahead and poll the future once before we do slightly more CPU-intensive tasks in the
1147+
// form of NetworkGraph pruning or scorer time-stepping below. This will get it moving but
1148+
// won't block us for too long if the underlying future is actually async. We stash the
1149+
// outcome and feed it into the `Joiner` once it is constructed.
1150+
let mut cm_persist_res = None;
1151+
if needs_cm_persist {
1152+
log_trace!(logger, "Persisting ChannelManager...");
1153+
11601154
use core::future::Future;
11611155
let mut waker = dummy_waker();
11621156
let mut ctx = task::Context::from_waker(&mut waker);
1163-
match core::pin::Pin::new(&mut fut).poll(&mut ctx) {
1164-
task::Poll::Ready(res) => futures.set_a_res(res),
1165-
task::Poll::Pending => futures.set_a(fut),
1157+
if let task::Poll::Ready(res) = cm_fut.as_mut().poll(&mut ctx) {
1158+
cm_persist_res = Some(res);
11661159
}
11671160

11681161
log_trace!(logger, "Done persisting ChannelManager.");
@@ -1210,7 +1203,8 @@ where
12101203
GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
12111204
_ => prune_timer_elapsed,
12121205
};
1213-
if should_prune {
1206+
1207+
let network_graph_to_persist = if should_prune {
12141208
// The network graph must not be pruned while rapid sync completion is pending
12151209
if let Some(network_graph) = gossip_sync.prunable_network_graph() {
12161210
if let Some(duration_since_epoch) = fetch_time() {
@@ -1222,28 +1216,15 @@ where
12221216
log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
12231217
log_trace!(logger, "Persisting network graph.");
12241218
}
1225-
let fut = async {
1226-
if let Err(e) = kv_store
1227-
.write(
1228-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1229-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1230-
NETWORK_GRAPH_PERSISTENCE_KEY,
1231-
network_graph.encode(),
1232-
)
1233-
.await
1234-
{
1235-
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
1236-
}
1237-
1238-
Ok(())
1239-
};
1240-
1241-
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1242-
futures.set_b(Box::pin(fut));
12431219

12441220
have_pruned = true;
1221+
Some(network_graph)
1222+
} else {
1223+
None
12451224
}
1246-
}
1225+
} else {
1226+
None
1227+
};
12471228
if !have_decayed_scorer {
12481229
if let Some(ref scorer) = scorer {
12491230
if let Some(duration_since_epoch) = fetch_time() {
@@ -1253,7 +1234,9 @@ where
12531234
}
12541235
have_decayed_scorer = true;
12551236
}
1256-
match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
1237+
// Step the scorer forward synchronously here, deferring the actual write to the
1238+
// future built below.
1239+
let persist_scorer = match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
12571240
sleeper(SCORER_PERSIST_TIMER)
12581241
}) {
12591242
Some(false) => {
@@ -1264,7 +1247,44 @@ where
12641247
} else {
12651248
log_trace!(logger, "Persisting scorer");
12661249
}
1267-
let fut = async {
1250+
true
1251+
} else {
1252+
false
1253+
}
1254+
},
1255+
Some(true) => break,
1256+
None => false,
1257+
};
1258+
let persist_sweeper =
1259+
match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
1260+
Some(false) => {
1261+
log_trace!(logger, "Regenerating sweeper spends if necessary");
1262+
true
1263+
},
1264+
Some(true) => break,
1265+
None => false,
1266+
};
1267+
1268+
let network_graph_fut = core::pin::pin!(async {
1269+
if let Some(network_graph) = network_graph_to_persist {
1270+
if let Err(e) = kv_store
1271+
.write(
1272+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1273+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1274+
NETWORK_GRAPH_PERSISTENCE_KEY,
1275+
network_graph.encode(),
1276+
)
1277+
.await
1278+
{
1279+
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
1280+
}
1281+
}
1282+
Ok(())
1283+
});
1284+
let scorer_fut =
1285+
core::pin::pin!(async {
1286+
if persist_scorer {
1287+
if let Some(ref scorer) = scorer {
12681288
if let Err(e) = kv_store
12691289
.write(
12701290
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1274,43 +1294,22 @@ where
12741294
)
12751295
.await
12761296
{
1277-
log_error!(
1278-
logger,
1279-
"Error: Failed to persist scorer, check your disk and permissions {}",
1280-
e
1281-
);
1297+
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
12821298
}
1283-
1284-
Ok(())
1285-
};
1286-
1287-
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1288-
futures.set_c(Box::pin(fut));
1299+
}
12891300
}
1290-
},
1291-
Some(true) => break,
1292-
None => {},
1293-
}
1294-
match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
1295-
Some(false) => {
1296-
log_trace!(logger, "Regenerating sweeper spends if necessary");
1301+
Ok(())
1302+
});
1303+
let sweeper_fut = core::pin::pin!(async {
1304+
if persist_sweeper {
12971305
if let Some(ref sweeper) = sweeper {
1298-
let fut = async {
1299-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
1300-
1301-
Ok(())
1302-
};
1303-
1304-
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1305-
futures.set_d(Box::pin(fut));
1306+
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
13061307
}
1307-
},
1308-
Some(true) => break,
1309-
None => {},
1310-
}
1311-
1312-
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
1313-
let fut = async {
1308+
}
1309+
Ok(())
1310+
});
1311+
let lm_fut = core::pin::pin!(async {
1312+
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
13141313
liquidity_manager
13151314
.get_lm()
13161315
.persist()
@@ -1324,9 +1323,21 @@ where
13241323
log_error!(logger, "Persisting LiquidityManager failed: {}", e);
13251324
e
13261325
})
1327-
};
1328-
futures.set_e(Box::pin(fut));
1326+
} else {
1327+
Ok(())
1328+
}
1329+
});
1330+
1331+
let mut futures = Joiner::new();
1332+
match cm_persist_res {
1333+
Some(res) => futures.set_a_res(res),
1334+
None if needs_cm_persist => futures.set_a(cm_fut),
1335+
None => {},
13291336
}
1337+
futures.set_b(network_graph_fut);
1338+
futures.set_c(scorer_fut);
1339+
futures.set_d(sweeper_fut);
1340+
futures.set_e(lm_fut);
13301341

13311342
// Run persistence tasks in parallel and exit if any of them returns an error.
13321343
for res in futures.await {

0 commit comments

Comments
 (0)