Commit b4f985d

fix(actor): drain Actor::tasks JoinSet in run_async (#96)
## Description

`Actor::tasks` is a `JoinSet<()>` that the three streaming-reply actions (`Action::ListAuthors`, `Action::ListReplicas`, `ReplicaAction::GetMany`) push tasks into via `spawn_local`, but `run_async`'s `tokio::select!` had no `join_next` arm. Completed task headers accumulate for the lifetime of the actor and only get released when `abort_all()` runs at shutdown.

The fix adds the missing arm:

```rust
Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
    if let Err(err) = res {
        if !err.is_cancelled() {
            warn!(?err, "actor reply-streamer task panicked");
        }
    }
    continue;
}
```

This matches the canonical drain pattern used everywhere else in the crate:

- `LiveActor::run_inner` drains three JoinSets in the same `select!`: `running_sync_connect`, `running_sync_accept`, and `download_tasks` (`src/engine/live.rs`).
- `GossipActor::progress` drains `active_tasks` with the same `is_cancelled()` skip on `JoinError` (`src/engine/gossip.rs`).

`Actor::tasks` was the only `JoinSet` in the crate not following the pattern. There is also a pre-existing TODO in `src/engine/live.rs` that calls this out:

```
// TODO: abort_all and join_next all JoinSets to catch panics
```

The three `spawn_local` sites are kept as-is. `iter_to_irpc` is a sync-iter → async-mpsc bridge that can stall on a slow consumer, so inlining it would block the action queue (notably the `MAX_COMMIT_DELAY` timeout flush). The fix lives entirely in the `select!`.

A regression test is added next to the existing `open_close` test in `src/actor.rs::tests`. It fires 1000 calls of each streaming-reply shape (3000 streamer tasks total), drains every reply stream to completion, then asserts that `Actor::tasks` shrinks back to a small bound. Without the new arm the residual is ~3000; with it the count drops to near zero within tens of milliseconds. A `#[cfg(test)]`-gated `Action::DebugTasksLen` and `SyncHandle::debug_tasks_len` provide the introspection.

## Breaking Changes

None.

## Notes & open questions

The test introspection is `#[cfg(test)]`-only. Happy to instead expose a public `wait_idle()`-style helper if you'd prefer that shape.

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.
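For readers who want to see the leak-and-drain idea from the Description in isolation, here is a minimal sketch that swaps in `std::thread` for tokio, so it is an analogy and not the crate's code. `Tasks`, `reap_finished`, and `demo_reap` are hypothetical names introduced only for illustration. Finished handles stay in the collection until something joins them, which is roughly what happened to `Actor::tasks` before the `join_next` arm existed.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a task set: a finished handle stays in the
/// vector until it is explicitly joined.
struct Tasks {
    handles: Vec<JoinHandle<()>>,
}

impl Tasks {
    fn new() -> Self {
        Self { handles: Vec::new() }
    }

    fn spawn(&mut self, f: impl FnOnce() + Send + 'static) {
        self.handles.push(thread::spawn(f));
    }

    fn len(&self) -> usize {
        self.handles.len()
    }

    /// Joins every handle whose thread has already finished and returns
    /// how many of those threads ended in a panic.
    fn reap_finished(&mut self) -> usize {
        let mut panicked = 0;
        let mut i = 0;
        while i < self.handles.len() {
            if self.handles[i].is_finished() {
                // swap_remove moves the last handle into slot `i`,
                // so `i` is not advanced in this branch.
                let handle = self.handles.swap_remove(i);
                if handle.join().is_err() {
                    panicked += 1;
                }
            } else {
                i += 1;
            }
        }
        panicked
    }
}

/// Spawns 32 short-lived tasks and returns the set size before and after
/// reaping. Without the reap loop the size would stay at 32.
fn demo_reap() -> (usize, usize) {
    let mut tasks = Tasks::new();
    for _ in 0..32 {
        tasks.spawn(|| {});
    }
    let before = tasks.len();
    while tasks.len() > 0 {
        tasks.reap_finished();
        thread::yield_now();
    }
    (before, tasks.len())
}
```

In the actual fix the reaping is event-driven through `join_next` inside `select!`; the polling loop in `demo_reap` only exists to keep this sketch free of an async runtime.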
1 parent 1f88013 commit b4f985d

1 file changed

Lines changed: 79 additions & 0 deletions

src/actor.rs

@@ -89,6 +89,12 @@ enum Action {
         #[debug("reply")]
         reply: Option<oneshot::Sender<Store>>,
     },
+    #[cfg(test)]
+    #[display("DebugTasksLen")]
+    DebugTasksLen {
+        #[debug("reply")]
+        reply: oneshot::Sender<usize>,
+    },
 }
 
 #[derive(derive_more::Debug, strum::Display)]
@@ -514,6 +520,13 @@ impl SyncHandle {
         Ok(store)
     }
 
+    #[cfg(test)]
+    async fn debug_tasks_len(&self) -> Result<usize> {
+        let (reply, rx) = oneshot::channel();
+        self.send(Action::DebugTasksLen { reply }).await?;
+        Ok(rx.await?)
+    }
+
     pub async fn list_authors(
         &self,
         reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
@@ -661,6 +674,14 @@ impl Actor {
                     }
                     continue;
                 }
+                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
+                    if let Err(err) = res {
+                        if !err.is_cancelled() {
+                            warn!(?err, "actor reply-streamer task panicked");
+                        }
+                    }
+                    continue;
+                }
                 action = self.action_rx.recv() => {
                     match action {
                         Ok(action) => action,
@@ -702,6 +723,8 @@ impl Actor {
             Action::Shutdown { .. } => {
                 unreachable!("Shutdown is handled in run()")
             }
+            #[cfg(test)]
+            Action::DebugTasksLen { reply } => send_reply(reply, self.tasks.len()),
             Action::ImportAuthor { author, reply } => {
                 let id = author.id();
                 send_reply(reply, self.store.import_author(author).map(|_| id))
@@ -1112,4 +1135,60 @@ mod tests {
         assert!(rx.recv().await.is_err());
         Ok(())
     }
+
+    /// Tests that streamer tasks spawned into `Actor.tasks` are reaped
+    /// once they complete.
+    ///
+    /// The three streaming actions (`ListAuthors`, `ListReplicas`, and
+    /// `ReplicaAction::GetMany`) each `spawn_local` a task into
+    /// `Actor.tasks` to drive their reply channel. The actor must
+    /// `join_next` those tasks once they finish, otherwise the
+    /// `JoinSet` grows without bound for the lifetime of the actor.
+    #[tokio::test]
+    async fn actor_tasks_joinset_drain() -> anyhow::Result<()> {
+        let store = store::Store::memory();
+        let sync = SyncHandle::spawn(store, None, "drain".into());
+
+        let namespace = NamespaceSecret::new(&mut rand::rng());
+        let id = namespace.id();
+        sync.import_namespace(namespace.into()).await?;
+        sync.open(id, Default::default()).await?;
+
+        const ITERATIONS: usize = 1000;
+
+        for _ in 0..ITERATIONS {
+            let (tx, mut rx) = mpsc::channel(64);
+            sync.list_authors(tx).await?;
+            while rx.recv().await?.is_some() {}
+        }
+
+        for _ in 0..ITERATIONS {
+            let (tx, mut rx) = mpsc::channel(64);
+            sync.list_replicas(tx).await?;
+            while rx.recv().await?.is_some() {}
+        }
+
+        for _ in 0..ITERATIONS {
+            let (tx, mut rx) = mpsc::channel(64);
+            sync.get_many(id, store::Query::all().into(), tx).await?;
+            while rx.recv().await?.is_some() {}
+        }
+
+        let mut last = sync.debug_tasks_len().await?;
+        let deadline = std::time::Instant::now() + Duration::from_secs(10);
+        while last > 16 && std::time::Instant::now() < deadline {
+            tokio::time::sleep(Duration::from_millis(50)).await;
+            last = sync.debug_tasks_len().await?;
+        }
+
+        assert!(
+            last <= 16,
+            "residual Actor.tasks JoinSet len = {last}, expected <= 16 \
+             (was the join_next arm in run_async lost? streamer tasks \
+             for ListAuthors / ListReplicas / GetMany are not being reaped)"
+        );
+
+        sync.close(id).await?;
+        Ok(())
+    }
 }
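The wait loop at the end of the regression test (poll `debug_tasks_len` until the count is at most 16 or ten seconds have passed) can be read as a generic "poll until under a bound, with a deadline" helper. The sketch below is a blocking, std-only rendering of that shape, not code from this commit; `poll_until_at_most` and `demo_poll` are hypothetical names, and the probe closures stand in for the real introspection call.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Calls `probe` repeatedly until it reports a value `<= bound` or until
/// `timeout` has elapsed, sleeping `interval` between calls. Returns the
/// last value observed so the caller can assert on it.
fn poll_until_at_most(
    mut probe: impl FnMut() -> usize,
    bound: usize,
    timeout: Duration,
    interval: Duration,
) -> usize {
    let deadline = Instant::now() + timeout;
    let mut last = probe();
    while last > bound && Instant::now() < deadline {
        thread::sleep(interval);
        last = probe();
    }
    last
}

/// Runs the helper against two fake probes: one whose value shrinks on
/// every call, and one that never shrinks (the "arm is missing" case).
fn demo_poll() -> (usize, usize) {
    // Shrinking probe: reports 40, 30, 20, 10 on successive calls.
    let mut remaining = 50usize;
    let converged = poll_until_at_most(
        || {
            remaining = remaining.saturating_sub(10);
            remaining
        },
        16,
        Duration::from_secs(1),
        Duration::from_millis(1),
    );

    // Stuck probe: always reports 3000, so the helper gives up at the deadline.
    let stuck = poll_until_at_most(
        || 3000,
        16,
        Duration::from_millis(20),
        Duration::from_millis(1),
    );

    (converged, stuck)
}
```

Returning the last observed value, instead of a boolean, is what lets the test's assertion message report the residual count when the bound is not reached.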
