Skip to content

Commit 61d7ec9

Browse files
committed
fix(rivetkit-core): decrement active actor metrics
1 parent b28585d commit 61d7ec9

4 files changed

Lines changed: 72 additions & 28 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1364,58 +1364,71 @@ impl ActorContext {
13641364
}
13651365

13661366
fn configure_sleep_hooks(&self) {
1367-
let keep_awake_ctx = self.clone();
1367+
let keep_awake_ctx = self.downgrade();
13681368
self.0
13691369
.sleep
13701370
.work
13711371
.keep_awake
13721372
.register_change_callback(Arc::new(move || {
1373-
keep_awake_ctx
1374-
.0
1375-
.metrics
1376-
.set_keep_awake_active(keep_awake_ctx.sleep_keep_awake_count());
1373+
if let Some(ctx) = ActorContext::from_weak(&keep_awake_ctx) {
1374+
ctx.0
1375+
.metrics
1376+
.set_keep_awake_active(ctx.sleep_keep_awake_count());
1377+
}
13771378
}));
13781379

1379-
let internal_keep_awake_metric_ctx = self.clone();
1380+
let internal_keep_awake_metric_ctx = self.downgrade();
13801381
self.0
13811382
.sleep
13821383
.work
13831384
.internal_keep_awake
13841385
.register_change_callback(Arc::new(move || {
1385-
internal_keep_awake_metric_ctx
1386-
.0
1387-
.metrics
1388-
.set_internal_keep_awake_active(
1389-
internal_keep_awake_metric_ctx.sleep_internal_keep_awake_count(),
1390-
);
1386+
if let Some(ctx) = ActorContext::from_weak(&internal_keep_awake_metric_ctx) {
1387+
ctx.0
1388+
.metrics
1389+
.set_internal_keep_awake_active(ctx.sleep_internal_keep_awake_count());
1390+
}
13911391
}));
13921392

1393-
let shutdown_tasks_ctx = self.clone();
1393+
let shutdown_tasks_ctx = self.downgrade();
13941394
self.0
13951395
.sleep
13961396
.work
13971397
.shutdown_counter
13981398
.register_change_callback(Arc::new(move || {
1399-
shutdown_tasks_ctx
1400-
.0
1401-
.metrics
1402-
.set_shutdown_tasks_active(shutdown_tasks_ctx.shutdown_task_count());
1399+
if let Some(ctx) = ActorContext::from_weak(&shutdown_tasks_ctx) {
1400+
ctx.0
1401+
.metrics
1402+
.set_shutdown_tasks_active(ctx.shutdown_task_count());
1403+
}
14031404
}));
14041405

1405-
let internal_keep_awake_ctx = self.clone();
1406+
let internal_keep_awake_ctx = self.downgrade();
14061407
self.set_internal_keep_awake(Some(Arc::new(move |future| {
1407-
let ctx = internal_keep_awake_ctx.clone();
1408-
Box::pin(async move { ctx.internal_keep_awake_task(future).await })
1408+
let ctx = ActorContext::from_weak(&internal_keep_awake_ctx);
1409+
Box::pin(async move {
1410+
let Some(ctx) = ctx else {
1411+
return Err(ActorRuntime::NotConfigured {
1412+
component: "actor context".to_owned(),
1413+
}
1414+
.build());
1415+
};
1416+
ctx.internal_keep_awake_task(future).await
1417+
})
14091418
})));
14101419

1411-
let queue_ctx = self.clone();
1420+
let queue_ctx = self.downgrade();
14121421
self.set_wait_activity_callback(Some(Arc::new(move || {
1413-
queue_ctx.reset_sleep_timer();
1422+
if let Some(ctx) = ActorContext::from_weak(&queue_ctx) {
1423+
ctx.reset_sleep_timer();
1424+
}
14141425
})));
14151426

1416-
let queue_ctx = self.clone();
1427+
let queue_ctx = self.downgrade();
14171428
self.set_inspector_update_callback(Some(Arc::new(move |queue_size| {
1418-
queue_ctx.record_queue_updated(queue_size);
1429+
if let Some(ctx) = ActorContext::from_weak(&queue_ctx) {
1430+
ctx.record_queue_updated(queue_size);
1431+
}
14191432
})));
14201433
}
14211434

rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::collections::BTreeMap;
22
use std::fmt;
33
use std::sync::{Arc, LazyLock};
4+
use std::sync::atomic::{AtomicBool, Ordering};
45
use std::time::Duration;
56

67
use parking_lot::Mutex;
@@ -34,6 +35,7 @@ pub(crate) struct ActorMetrics {
3435
struct ActorMetricInner {
3536
labels: ActorMetricLabels,
3637
state: Mutex<ActorMetricState>,
38+
active: AtomicBool,
3739
}
3840

3941
#[derive(Debug)]
@@ -643,6 +645,7 @@ impl ActorMetrics {
643645
inner: Arc::new(ActorMetricInner {
644646
labels,
645647
state: Mutex::new(ActorMetricState::default()),
648+
active: AtomicBool::new(true),
646649
}),
647650
}
648651
}
@@ -850,10 +853,24 @@ impl ActorMetrics {
850853
.with_label_values(&[labels[0], subsystem, operation])
851854
.inc();
852855
}
856+
857+
pub(crate) fn record_actor_stopped(&self) {
858+
self.inner.record_actor_stopped();
859+
}
853860
}
854861

855862
impl Drop for ActorMetricInner {
856863
fn drop(&mut self) {
864+
self.record_actor_stopped();
865+
}
866+
}
867+
868+
impl ActorMetricInner {
869+
fn record_actor_stopped(&self) {
870+
if !self.active.swap(false, Ordering::AcqRel) {
871+
return;
872+
}
873+
857874
self.clear_aggregated_gauges();
858875
let metrics = &*METRICS;
859876
metrics
@@ -865,9 +882,7 @@ impl Drop for ActorMetricInner {
865882
.with_label_values(&self.labels.as_label_values())
866883
.inc();
867884
}
868-
}
869885

870-
impl ActorMetricInner {
871886
fn clear_aggregated_gauges(&self) {
872887
let labels = self.labels.as_label_values();
873888
let mut state = self.state.lock();

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,11 +387,13 @@ impl ActorTask {
387387
Arc::clone(&inspector_attach_count),
388388
inspector_overlay_tx.clone(),
389389
);
390-
let inspector_ctx = ctx.clone();
390+
let inspector_ctx = ctx.downgrade();
391391
let inspector_attach_count_for_hook = Arc::clone(&inspector_attach_count);
392392
ctx.on_request_save(Box::new(move |_opts| {
393393
if inspector_attach_count_for_hook.load(Ordering::SeqCst) > 0 {
394-
inspector_ctx.notify_inspector_serialize_requested();
394+
if let Some(ctx) = ActorContext::from_weak(&inspector_ctx) {
395+
ctx.notify_inspector_serialize_requested();
396+
}
395397
}
396398
}));
397399
Self {
@@ -444,6 +446,7 @@ impl ActorTask {
444446
let exit = self.run_live().await;
445447
let LiveExit::Shutdown { reason } = exit else {
446448
self.record_inbox_depths();
449+
self.ctx.metrics().record_actor_stopped();
447450
return Ok(());
448451
};
449452

@@ -457,6 +460,7 @@ impl ActorTask {
457460
self.deliver_shutdown_reply(reason, &result);
458461
self.transition_to(LifecycleState::Terminated);
459462
self.record_inbox_depths();
463+
self.ctx.metrics().record_actor_stopped();
460464
result
461465
}
462466

rivetkit-rust/packages/rivetkit-core/tests/task.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,18 @@ mod moved_tests {
174174
)
175175
}
176176

177+
#[test]
178+
fn request_save_hook_does_not_retain_actor_context() {
179+
let ctx = ActorContext::new("actor-hook-drop", "task-hook-drop", Vec::new(), "local");
180+
let weak = ctx.downgrade();
181+
let task = new_task(ctx.clone());
182+
183+
drop(task);
184+
drop(ctx);
185+
186+
assert!(weak.upgrade().is_none());
187+
}
188+
177189
struct IdleEnvoyCallbacks;
178190

179191
impl EnvoyCallbacks for IdleEnvoyCallbacks {

0 commit comments

Comments
 (0)