Commit 94aace7

feat: US-012 - Bound register_task shutdown drain against shutdown deadline
1 parent 97f85c9 commit 94aace7

5 files changed

Lines changed: 102 additions & 15 deletions

rivetkit-rust/packages/rivetkit-core/CLAUDE.md

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@
 
 - Any mutation that changes a `can_sleep` input must call `ActorContext::reset_sleep_timer()` so the `ActorTask` sleep deadline is re-evaluated. Inputs are: `ready`/`started`, `no_sleep`, `active_http_request_count`, `sleep_keep_awake_count`, `sleep_internal_keep_awake_count`, `pending_disconnect_count`, `conns()`, and `websocket_callback_count`. Missing this call leaves the sleep timer armed against stale state and triggers the `"sleep idle deadline elapsed but actor stayed awake"` warning on the next tick.
 - `ActorContext::set_prevent_sleep(...)` / `prevent_sleep()` are deprecated no-ops kept for NAPI bridge compatibility. Use `keep_awake(future)` (holds counter while awaited) or `wait_until(future)` (tracked shutdown task) instead. Do not reintroduce a `prevent_sleep` field, a `CanSleep::PreventSleep` variant, or branches that read it.
-- Runtime-owned promises that must drain during shutdown should use `ActorContext::register_task(...)`, not public `wait_until(...)`, so metrics and runtime intent stay distinct.
+- Runtime-owned promises that must drain during shutdown should use `ActorContext::register_task(...)`, not public `wait_until(...)`, so metrics and runtime intent stay distinct. Registered tasks must race user work against `shutdown_deadline_token()` so shutdown cannot hang forever.
 - `ctx.sleep()` and `ctx.destroy()` return `Result<()>`. They error with `ActorLifecycleError::Starting` when called before startup completes and `ActorLifecycleError::Stopping` if the requested flag has already been set this generation (atomic `swap(true, ...)`). Internal idle-timer paths log and suppress the already-requested error.
 - The grace deadline path (`on_sleep_grace_deadline`) aborts the user `run` handle and cancels `shutdown_deadline_token()`. Foreign-runtime adapters running `onSleep` / `onDestroy` must observe that token via `tokio::select!` so SQLite teardown does not race user cleanup work.
 - Counter `register_zero_notify(&idle_notify)` hooks only drive shutdown drain waits. They are not a substitute for the activity-dirty notification, so any new sleep-affecting counter must also notify on transitions that change `can_sleep`.
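
The rule added in this hunk (a registered task races its work against the shutdown deadline) can be illustrated without tokio. The following is a minimal, std-only sketch, not the crate's implementation: `DeadlineFlag`, `Race`, `Outcome`, and `poll_once` are hypothetical names, and the flag is a plain atomic standing in for whatever `shutdown_deadline_token()` actually returns.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the shutdown deadline token: a shared flag.
#[derive(Clone, Default)]
pub struct DeadlineFlag(Arc<AtomicBool>);

impl DeadlineFlag {
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Which side of the race finished first.
#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    Completed,
    DeadlineElapsed,
}

/// Resolves when the wrapped work finishes or the deadline flag is set.
pub struct Race<F> {
    work: Pin<Box<F>>,
    deadline: DeadlineFlag,
}

impl<F: Future<Output = ()>> Race<F> {
    pub fn new(work: F, deadline: DeadlineFlag) -> Self {
        Self { work: Box::pin(work), deadline }
    }
}

impl<F: Future<Output = ()>> Future for Race<F> {
    type Output = Outcome;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Outcome> {
        // Poll the work first so a finished task is never reported as cancelled.
        if self.work.as_mut().poll(cx).is_ready() {
            return Poll::Ready(Outcome::Completed);
        }
        // The flag registers no waker, so the caller must poll again after cancelling.
        if self.deadline.is_cancelled() {
            return Poll::Ready(Outcome::DeadlineElapsed);
        }
        Poll::Pending
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once.
pub fn poll_once<F: Future + Unpin>(future: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(future).poll(&mut cx)
}
```

Wrapping a never-completing future in `Race::new(...)` stays pending until `cancel()` is called on the flag, after which the next poll yields `Outcome::DeadlineElapsed`. A real cancellation token would also wake the task on cancel; this sketch leaves that out and relies on the caller polling again.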

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 22 additions & 8 deletions
@@ -556,10 +556,7 @@ impl ActorContext {
     pub fn register_task(&self, future: impl Future<Output = ()> + Send + 'static) {
         let ctx = self.clone();
         self.track_shutdown_task(async move {
-            ctx.record_user_task_started(UserTaskKind::RegisteredTask);
-            let started_at = Instant::now();
-            future.await;
-            ctx.record_user_task_finished(UserTaskKind::RegisteredTask, started_at.elapsed());
+            Self::run_registered_task(ctx, future).await;
         });
     }
 
@@ -583,13 +580,30 @@ impl ActorContext {
     pub fn register_task(&self, future: impl Future<Output = ()> + 'static) {
         let ctx = self.clone();
         self.track_shutdown_task(async move {
-            ctx.record_user_task_started(UserTaskKind::RegisteredTask);
-            let started_at = Instant::now();
-            future.await;
-            ctx.record_user_task_finished(UserTaskKind::RegisteredTask, started_at.elapsed());
+            Self::run_registered_task(ctx, future).await;
         });
     }
 
+    async fn run_registered_task<F>(ctx: ActorContext, future: F)
+    where
+        F: Future<Output = ()>,
+    {
+        let shutdown_deadline = ctx.shutdown_deadline_token();
+        ctx.record_user_task_started(UserTaskKind::RegisteredTask);
+        let started_at = Instant::now();
+        tokio::select! {
+            _ = future => {}
+            _ = shutdown_deadline.cancelled() => {
+                tracing::warn!(
+                    actor_id = %ctx.actor_id(),
+                    reason = "shutdown_deadline_elapsed",
+                    "registered task cancelled by shutdown deadline"
+                );
+            }
+        }
+        ctx.record_user_task_finished(UserTaskKind::RegisteredTask, started_at.elapsed());
+    }
+
     pub async fn keep_awake<F>(&self, future: F) -> F::Output
     where
         F: Future,
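
The two `register_task` signatures above differ only in the `Send` bound, and both now delegate to one generic helper. The sketch below shows that shape in isolation, under stated assumptions: `Log`, `run_shared`, `wrap_task`, and `finishes_immediately` are hypothetical names, the `target_arch = "wasm32"` predicate is a guess (the real `cfg` attributes fall outside the hunk), and the log stands in for the started/finished metrics calls.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical event log standing in for the started/finished metrics calls.
pub type Log = Arc<Mutex<Vec<&'static str>>>;

/// Shared body: no `Send` bound, so both wrappers below can delegate to it.
async fn run_shared<F>(log: Log, future: F)
where
    F: Future<Output = ()>,
{
    log.lock().unwrap().push("started");
    future.await;
    log.lock().unwrap().push("finished");
}

/// Native-style wrapper: requires `Send` (assumed cfg predicate).
#[cfg(not(target_arch = "wasm32"))]
pub fn wrap_task<F>(log: Log, future: F) -> impl Future<Output = ()> + Send + 'static
where
    F: Future<Output = ()> + Send + 'static,
{
    run_shared(log, future)
}

/// Wasm-style wrapper: same body, no `Send` requirement (assumed cfg predicate).
#[cfg(target_arch = "wasm32")]
pub fn wrap_task<F>(log: Log, future: F) -> impl Future<Output = ()> + 'static
where
    F: Future<Output = ()> + 'static,
{
    run_shared(log, future)
}

/// Waker that does nothing; enough for a single manual poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the future once and reports whether it finished.
pub fn finishes_immediately<F: Future<Output = ()>>(future: F) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    matches!(future.as_mut().poll(&mut cx), Poll::Ready(()))
}
```

With a never-completing future the log in this sketch stops at `"started"`, which mirrors the pre-change behavior where the finished bookkeeping was only reached if the task returned. Racing against the deadline inside the shared helper, as the hunk does, is what lets both cfg variants reach the finished call.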

rivetkit-rust/packages/rivetkit-core/tests/sleep.rs

Lines changed: 66 additions & 4 deletions
@@ -19,18 +19,27 @@ mod moved_tests {
     #[derive(Default)]
     struct MessageVisitor {
         message: Option<String>,
+        actor_id: Option<String>,
+        reason: Option<String>,
     }
 
     impl Visit for MessageVisitor {
         fn record_str(&mut self, field: &Field, value: &str) {
-            if field.name() == "message" {
-                self.message = Some(value.to_owned());
+            match field.name() {
+                "message" => self.message = Some(value.to_owned()),
+                "actor_id" => self.actor_id = Some(value.to_owned()),
+                "reason" => self.reason = Some(value.to_owned()),
+                _ => {}
             }
         }
 
         fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
-            if field.name() == "message" {
-                self.message = Some(format!("{value:?}").trim_matches('"').to_owned());
+            let value = format!("{value:?}").trim_matches('"').to_owned();
+            match field.name() {
+                "message" => self.message = Some(value),
+                "actor_id" => self.actor_id = Some(value),
+                "reason" => self.reason = Some(value),
+                _ => {}
             }
         }
     }
@@ -40,6 +49,11 @@ mod moved_tests {
         count: Arc<AtomicUsize>,
     }
 
+    #[derive(Clone)]
+    struct RegisteredTaskDeadlineLayer {
+        count: Arc<AtomicUsize>,
+    }
+
     impl<S> Layer<S> for ShutdownTaskRefusedLayer
     where
         S: Subscriber,
@@ -59,6 +73,27 @@ mod moved_tests {
         }
     }
 
+    impl<S> Layer<S> for RegisteredTaskDeadlineLayer
+    where
+        S: Subscriber,
+    {
+        fn on_event(&self, event: &Event<'_>, _ctx: LayerContext<'_, S>) {
+            if *event.metadata().level() != tracing::Level::WARN {
+                return;
+            }
+
+            let mut visitor = MessageVisitor::default();
+            event.record(&mut visitor);
+            if visitor.message.as_deref()
+                == Some("registered task cancelled by shutdown deadline")
+                && visitor.actor_id.as_deref() == Some("actor-register-task-deadline")
+                && visitor.reason.as_deref() == Some("shutdown_deadline_elapsed")
+            {
+                self.count.fetch_add(1, Ordering::SeqCst);
+            }
+        }
+    }
+
     struct NotifyOnDrop(DropMutex<Option<oneshot::Sender<()>>>);
 
     impl NotifyOnDrop {
@@ -162,6 +197,33 @@ mod moved_tests {
         assert_eq!(warning_count.load(Ordering::SeqCst), 1);
     }
 
+    #[tokio::test(start_paused = true)]
+    async fn register_task_exits_when_shutdown_deadline_cancels() {
+        let ctx = ActorContext::new_for_sleep_tests("actor-register-task-deadline");
+        let warning_count = Arc::new(AtomicUsize::new(0));
+        let subscriber = Registry::default().with(RegisteredTaskDeadlineLayer {
+            count: warning_count.clone(),
+        });
+        let _guard = tracing::subscriber::set_default(subscriber);
+
+        ctx.register_task(futures::future::pending::<()>());
+        assert_eq!(ctx.shutdown_task_count(), 1);
+
+        ctx.cancel_shutdown_deadline();
+
+        assert!(
+            ctx.0
+                .sleep
+                .work
+                .shutdown_counter
+                .wait_zero(Instant::now() + Duration::from_millis(1))
+                .await,
+            "registered task should stop waiting after the shutdown deadline"
+        );
+        assert_eq!(ctx.shutdown_task_count(), 0);
+        assert_eq!(warning_count.load(Ordering::SeqCst), 1);
+    }
+
     #[tokio::test(start_paused = true)]
     async fn sleep_then_destroy_signal_tasks_do_not_leak_after_teardown() {
         let ctx = ActorContext::new_for_sleep_tests("actor-sleep-destroy");
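
The new test asserts that `shutdown_counter.wait_zero(deadline)` returns `true` once the registered task exits. The counter's implementation is not part of this diff, so the following is only a blocking, thread-based analogue of that contract (wait until the count reaches zero or the deadline passes, and report which happened). `DrainCounter` and its methods are hypothetical names; the real counter is async and may differ in every detail.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical blocking analogue of a shutdown drain counter.
#[derive(Clone, Default)]
pub struct DrainCounter {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

impl DrainCounter {
    /// Records one more in-flight task.
    pub fn increment(&self) {
        *self.inner.0.lock().unwrap() += 1;
    }

    /// Records a finished task and wakes waiters when the count hits zero.
    pub fn decrement(&self) {
        let mut count = self.inner.0.lock().unwrap();
        *count = count.saturating_sub(1);
        if *count == 0 {
            self.inner.1.notify_all();
        }
    }

    pub fn count(&self) -> usize {
        *self.inner.0.lock().unwrap()
    }

    /// Blocks until the count is zero or `deadline` passes.
    /// Returns `true` if the counter drained in time.
    pub fn wait_zero(&self, deadline: Instant) -> bool {
        let mut count = self.inner.0.lock().unwrap();
        while *count != 0 {
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                return false;
            }
            // Re-check the count after every wakeup to tolerate spurious wakeups.
            let (guard, _timed_out) = self.inner.1.wait_timeout(count, remaining).unwrap();
            count = guard;
        }
        true
    }
}
```

In this sketch a task that never calls `decrement()` makes `wait_zero` return `false` at the deadline, which is the hang the commit bounds: once the registered task is cancelled by the deadline, its tracked wrapper finishes and the count can reach zero.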

scripts/ralph/prd.json

Lines changed: 2 additions & 2 deletions
@@ -195,8 +195,8 @@
         "Tests pass"
       ],
       "priority": 12,
-      "passes": false,
-      "notes": ""
+      "passes": true,
+      "notes": "Core register_task now races registered futures against the shutdown deadline token for both native and wasm cfgs. A focused core test verifies a never-completing task drains after deadline cancellation and logs the stable reason."
     },
     {
       "id": "US-013",

scripts/ralph/progress.txt

Lines changed: 11 additions & 0 deletions
@@ -19,6 +19,7 @@
 - NAPI actor-event synthetic `RivetErrorSchema` values should be `static` when fields are compile-time constants, or process-interned by `(group, code)` with `scc::HashMap` when the default message is dynamic.
 - Wasm websocket callback regions should be tracked in a map keyed by monotonic region IDs and removed on end so callback churn does not retain empty slots.
 - NAPI `Ref::unref` needs an `Env`; cleanup from worker or `Drop` paths should be routed through an Env-bearing TSF and must tolerate addon shutdown by falling back to a bounded leak.
+- Core `ActorContext::register_task(...)` must race registered runtime promises against `shutdown_deadline_token()` so shutdown drain cannot hang forever.
 
 Started: Sat May 2 02:13:32 AM PDT 2026
 ---
@@ -133,3 +134,13 @@ Started: Sat May 2 02:13:32 AM PDT 2026
 - Use unique `(group, code)` namespaces when asserting `BRIDGE_RIVET_ERROR_SCHEMAS.len()` deltas because the intern map is process-global.
 - Schema defaults are only the first-seen fallback for a `(group, code)`; callers should assert live bridged messages through `RivetTransportError.message()`.
 ---
+## 2026-05-02 03:30:40 PDT - US-012
+- Implemented bounded shutdown drain for core `ActorContext::register_task(...)` by racing registered futures against `shutdown_deadline_token()` in one shared native/wasm helper.
+- Added a core sleep regression test that registers a never-completing future, cancels the shutdown deadline, verifies the shutdown counter drains, and checks the warning includes `actor_id` plus `reason = "shutdown_deadline_elapsed"`.
+- Files changed: `rivetkit-rust/packages/rivetkit-core/src/actor/context.rs`, `rivetkit-rust/packages/rivetkit-core/tests/sleep.rs`, `rivetkit-rust/packages/rivetkit-core/AGENTS.md`, `scripts/ralph/prd.json`, `scripts/ralph/progress.txt`.
+- Checks: `cargo test -p rivetkit-core register_task_exits_when_shutdown_deadline_cancels`, `cargo check -p rivetkit-core`, `scripts/cargo/check-rivetkit-core-wasm.sh`, `cargo test -p rivetkit-core actor_task_logs_lifecycle_dispatch_and_actor_event_flow`.
+- Full `cargo test -p rivetkit-core` was stopped after it hung in the existing `save_tick_cancels_pending_inspector_deadline_and_broadcasts_overlay` test; that test also hung when run in isolation.
+- **Learnings for future iterations:**
+- Keep native and wasm `register_task(...)` behavior in a shared helper where possible so the two cfg-specific public signatures cannot diverge.
+- The package-wide core test suite currently has an unrelated hanging inspector debounce test, so use focused tests plus `check-rivetkit-core-wasm.sh` for this shutdown path.
+---
