Skip to content

Commit 3388aec

Browse files
committed
fix(rivetkit): use unbounded actor channels
1 parent 5fddabc commit 3388aec

21 files changed

Lines changed: 171 additions & 467 deletions

File tree

rivetkit-rust/packages/client/src/drivers/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,19 @@ pub enum DriverStopReason {
3030
#[derive(Debug)]
3131
pub struct DriverHandle {
3232
abort_handle: AbortHandle,
33-
sender: mpsc::Sender<MessageToServer>,
33+
sender: mpsc::UnboundedSender<MessageToServer>,
3434
}
3535

3636
impl DriverHandle {
37-
pub fn new(sender: mpsc::Sender<MessageToServer>, abort_handle: AbortHandle) -> Self {
37+
pub fn new(sender: mpsc::UnboundedSender<MessageToServer>, abort_handle: AbortHandle) -> Self {
3838
Self {
3939
sender,
4040
abort_handle,
4141
}
4242
}
4343

4444
pub async fn send(&self, msg: Arc<to_server::ToServer>) -> Result<()> {
45-
self.sender.send(msg).await?;
45+
self.sender.send(msg)?;
4646

4747
Ok(())
4848
}
@@ -61,7 +61,7 @@ impl Drop for DriverHandle {
6161

6262
pub type DriverConnection = (
6363
DriverHandle,
64-
mpsc::Receiver<MessageToClient>,
64+
mpsc::UnboundedReceiver<MessageToClient>,
6565
JoinHandle<DriverStopReason>,
6666
);
6767

rivetkit-rust/packages/client/src/drivers/ws.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ pub(crate) async fn connect(args: DriverConnectArgs) -> Result<DriverConnection>
3737
.await
3838
.context("Failed to connect to WebSocket via gateway")?;
3939

40-
let (in_tx, in_rx) = mpsc::channel::<MessageToClient>(32);
41-
let (out_tx, out_rx) = mpsc::channel::<MessageToServer>(32);
40+
let (in_tx, in_rx) = mpsc::unbounded_channel::<MessageToClient>();
41+
let (out_tx, out_rx) = mpsc::unbounded_channel::<MessageToServer>();
4242

4343
let task = tokio::spawn(start(ws, args.encoding_kind, in_tx, out_rx));
4444
let handle = DriverHandle::new(out_tx, task.abort_handle());
@@ -51,8 +51,8 @@ async fn start(
5151
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
5252
>,
5353
encoding_kind: EncodingKind,
54-
in_tx: mpsc::Sender<MessageToClient>,
55-
mut out_rx: mpsc::Receiver<MessageToServer>,
54+
in_tx: mpsc::UnboundedSender<MessageToClient>,
55+
mut out_rx: mpsc::UnboundedReceiver<MessageToServer>,
5656
) -> DriverStopReason {
5757
let (mut ws_sink, mut ws_stream) = ws.split();
5858

@@ -85,7 +85,7 @@ async fn start(
8585
// Handle ws incoming
8686
msg = ws_stream.next() => {
8787
let Some(msg) = msg else {
88-
println!("Receiver dropped");
88+
debug!("Receiver dropped");
8989
return DriverStopReason::ServerDisconnect;
9090
};
9191

@@ -97,7 +97,7 @@ async fn start(
9797
continue;
9898
};
9999

100-
if let Err(e) = in_tx.send(Arc::new(msg)).await {
100+
if let Err(e) = in_tx.send(Arc::new(msg)) {
101101
debug!("Failed to send text message: {}", e);
102102
// failure to send means user dropped incoming receiver
103103
return DriverStopReason::UserAborted;

rivetkit-rust/packages/rivetkit-core/src/actor/config.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@ const DEFAULT_MAX_QUEUE_SIZE: u32 = 1000;
1919
const DEFAULT_MAX_QUEUE_MESSAGE_SIZE: u32 = 65_536;
2020
const DEFAULT_MAX_INCOMING_MESSAGE_SIZE: u32 = 65_536;
2121
const DEFAULT_MAX_OUTGOING_MESSAGE_SIZE: u32 = 1_048_576;
22-
const DEFAULT_LIFECYCLE_COMMAND_INBOX_CAPACITY: usize = 64;
23-
const DEFAULT_DISPATCH_COMMAND_INBOX_CAPACITY: usize = 1024;
24-
const DEFAULT_LIFECYCLE_EVENT_INBOX_CAPACITY: usize = 4096;
2522

2623
#[derive(Clone)]
2724
pub enum CanHibernateWebSocket {
@@ -83,9 +80,6 @@ pub struct ActorConfig {
8380
pub max_queue_message_size: u32,
8481
pub max_incoming_message_size: u32,
8582
pub max_outgoing_message_size: u32,
86-
pub lifecycle_command_inbox_capacity: usize,
87-
pub dispatch_command_inbox_capacity: usize,
88-
pub lifecycle_event_inbox_capacity: usize,
8983
pub preload_max_workflow_bytes: Option<u64>,
9084
pub preload_max_connections_bytes: Option<u64>,
9185
pub overrides: Option<ActorConfigOverrides>,
@@ -233,9 +227,6 @@ impl Default for ActorConfig {
233227
max_queue_message_size: DEFAULT_MAX_QUEUE_MESSAGE_SIZE,
234228
max_incoming_message_size: DEFAULT_MAX_INCOMING_MESSAGE_SIZE,
235229
max_outgoing_message_size: DEFAULT_MAX_OUTGOING_MESSAGE_SIZE,
236-
lifecycle_command_inbox_capacity: DEFAULT_LIFECYCLE_COMMAND_INBOX_CAPACITY,
237-
dispatch_command_inbox_capacity: DEFAULT_DISPATCH_COMMAND_INBOX_CAPACITY,
238-
lifecycle_event_inbox_capacity: DEFAULT_LIFECYCLE_EVENT_INBOX_CAPACITY,
239230
preload_max_workflow_bytes: None,
240231
preload_max_connections_bytes: None,
241232
overrides: None,

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ use crate::actor::schedule::{InternalKeepAwakeCallback, LocalAlarmCallback};
3333
use crate::actor::sleep::{CanSleep, SleepState};
3434
use crate::actor::state::{PendingSave, PersistedActor, RequestSaveOpts};
3535
use crate::actor::task::LifecycleEvent;
36-
#[cfg(not(target_arch = "wasm32"))]
37-
use crate::actor::task::{LIFECYCLE_EVENT_INBOX_CHANNEL, actor_channel_overloaded_error};
3836
use crate::actor::task_types::UserTaskKind;
3937
use crate::actor::work_registry::{ActorWorkKind, CountGuard, RegionGuard};
4038
use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime};
@@ -140,9 +138,8 @@ pub(crate) struct ActorContextInner {
140138
inspector_attach_count: RwLock<Option<Arc<AtomicU32>>>,
141139
inspector_overlay_tx: RwLock<Option<broadcast::Sender<Arc<Vec<u8>>>>>,
142140
actor_events: RwLock<Option<mpsc::UnboundedSender<ActorEvent>>>,
143-
pub(super) lifecycle_events: RwLock<Option<mpsc::Sender<LifecycleEvent>>>,
141+
pub(super) lifecycle_events: RwLock<Option<mpsc::UnboundedSender<LifecycleEvent>>>,
144142
hibernated_connection_liveness_override: RwLock<Option<BTreeSet<(Vec<u8>, Vec<u8>)>>>,
145-
pub(super) lifecycle_event_inbox_capacity: usize,
146143
pub(super) metrics: ActorMetrics,
147144
diagnostics: ActorDiagnostics,
148145
actor_id: String,
@@ -230,7 +227,6 @@ impl ActorContext {
230227
#[cfg(feature = "sqlite-local")]
231228
sql.set_vfs_metrics(Arc::new(metrics.clone()));
232229
let diagnostics = ActorDiagnostics::new(actor_id.clone());
233-
let lifecycle_event_inbox_capacity = config.lifecycle_event_inbox_capacity;
234230
let state_save_interval = config.state_save_interval;
235231
let abort_signal = CancellationToken::new();
236232
let shutdown_deadline = CancellationToken::new();
@@ -307,7 +303,6 @@ impl ActorContext {
307303
actor_events: RwLock::new(None),
308304
lifecycle_events: RwLock::new(None),
309305
hibernated_connection_liveness_override: RwLock::new(None),
310-
lifecycle_event_inbox_capacity,
311306
metrics,
312307
diagnostics,
313308
actor_id,
@@ -1243,7 +1238,10 @@ impl ActorContext {
12431238
.await
12441239
}
12451240

1246-
pub(crate) fn configure_lifecycle_events(&self, sender: Option<mpsc::Sender<LifecycleEvent>>) {
1241+
pub(crate) fn configure_lifecycle_events(
1242+
&self,
1243+
sender: Option<mpsc::UnboundedSender<LifecycleEvent>>,
1244+
) {
12471245
*self.0.lifecycle_events.write() = sender;
12481246
}
12491247

@@ -1466,28 +1464,12 @@ impl ActorContext {
14661464
}
14671465

14681466
fn try_send_lifecycle_event(&self, event: LifecycleEvent, operation: &'static str) {
1469-
#[cfg(target_arch = "wasm32")]
1470-
let _ = operation;
1471-
14721467
let Some(sender) = self.0.lifecycle_events.read().clone() else {
14731468
return;
14741469
};
14751470

1476-
match sender.try_reserve() {
1477-
Ok(permit) => {
1478-
permit.send(event);
1479-
}
1480-
#[cfg(target_arch = "wasm32")]
1481-
Err(_) => {}
1482-
#[cfg(not(target_arch = "wasm32"))]
1483-
Err(_) => {
1484-
let _ = actor_channel_overloaded_error(
1485-
LIFECYCLE_EVENT_INBOX_CHANNEL,
1486-
self.0.lifecycle_event_inbox_capacity,
1487-
operation,
1488-
Some(&self.0.metrics),
1489-
);
1490-
}
1471+
if sender.send(event).is_err() {
1472+
tracing::warn!(operation, "failed to enqueue actor lifecycle event");
14911473
}
14921474
}
14931475
}

rivetkit-rust/packages/rivetkit-core/src/actor/diagnostics.rs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ const WARNING_LIMIT: usize = 3;
1111

1212
// Forced-sync: warning windows are updated from synchronous diagnostics paths.
1313
static GLOBAL_WARNINGS: OnceLock<SccHashMap<String, Arc<Mutex<WarningWindow>>>> = OnceLock::new();
14-
static ACTOR_WARNINGS: OnceLock<SccHashMap<String, Arc<Mutex<WarningWindow>>>> = OnceLock::new();
1514

1615
#[derive(Debug)]
1716
pub(crate) struct ActorDiagnostics {
@@ -43,25 +42,6 @@ impl ActorDiagnostics {
4342
}
4443
}
4544

46-
pub(crate) fn record_actor_warning(
47-
actor_id: &str,
48-
kind: &'static str,
49-
) -> Option<WarningSuppression> {
50-
let actor_key = format!("{actor_id}:{kind}");
51-
let per_actor = record_limited_warning(actor_warnings(), actor_key, Instant::now());
52-
let global = record_limited_warning(global_warnings(), kind.to_owned(), Instant::now());
53-
54-
if per_actor.emit && global.emit {
55-
Some(WarningSuppression {
56-
actor_id: actor_id.to_owned(),
57-
per_actor_suppressed: per_actor.suppressed,
58-
global_suppressed: global.suppressed,
59-
})
60-
} else {
61-
None
62-
}
63-
}
64-
6545
#[derive(Debug, Clone, PartialEq, Eq)]
6646
pub(crate) struct WarningSuppression {
6747
pub(crate) actor_id: String,
@@ -138,7 +118,3 @@ fn record_limited_warning(
138118
fn global_warnings() -> &'static SccHashMap<String, Arc<Mutex<WarningWindow>>> {
139119
GLOBAL_WARNINGS.get_or_init(SccHashMap::new)
140120
}
141-
142-
fn actor_warnings() -> &'static SccHashMap<String, Arc<Mutex<WarningWindow>>> {
143-
ACTOR_WARNINGS.get_or_init(SccHashMap::new)
144-
}

rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs

Lines changed: 1 addition & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use crate::actor::task_types::{ShutdownKind, StateMutationReason, UserTaskKind};
1515

1616
#[derive(Clone)]
1717
pub(crate) struct ActorMetrics {
18-
actor_id: Arc<str>,
1918
inner: Arc<Option<ActorMetricsInner>>,
2019
}
2120

@@ -29,11 +28,8 @@ struct ActorMetricsInner {
2928
active_connections: IntGauge,
3029
connections_total: IntCounter,
3130
lifecycle_inbox_depth: IntGauge,
32-
lifecycle_inbox_overload_total: CounterVec,
3331
dispatch_inbox_depth: IntGauge,
34-
dispatch_inbox_overload_total: CounterVec,
3532
lifecycle_event_inbox_depth: IntGauge,
36-
lifecycle_event_overload_total: CounterVec,
3733
user_tasks_active: IntGaugeVec,
3834
user_task_duration_seconds: HistogramVec,
3935
shutdown_wait_seconds: HistogramVec,
@@ -100,10 +96,7 @@ impl ActorMetrics {
10096
}
10197
};
10298

103-
Self {
104-
actor_id: Arc::from(actor_id),
105-
inner: Arc::new(inner),
106-
}
99+
Self { inner: Arc::new(inner) }
107100
}
108101

109102
fn try_new_inner(actor_id: &str, actor_name: String) -> Result<ActorMetricsInner> {
@@ -154,40 +147,16 @@ impl ActorMetrics {
154147
"current actor lifecycle command inbox depth",
155148
))
156149
.context("create lifecycle_inbox_depth gauge")?;
157-
let lifecycle_inbox_overload_total = CounterVec::new(
158-
Opts::new(
159-
"lifecycle_inbox_overload_total",
160-
"total actor lifecycle command inbox overloads",
161-
),
162-
&["command"],
163-
)
164-
.context("create lifecycle_inbox_overload_total counter")?;
165150
let dispatch_inbox_depth = IntGauge::with_opts(Opts::new(
166151
"dispatch_inbox_depth",
167152
"current actor dispatch command inbox depth",
168153
))
169154
.context("create dispatch_inbox_depth gauge")?;
170-
let dispatch_inbox_overload_total = CounterVec::new(
171-
Opts::new(
172-
"dispatch_inbox_overload_total",
173-
"total actor dispatch command inbox overloads",
174-
),
175-
&["command"],
176-
)
177-
.context("create dispatch_inbox_overload_total counter")?;
178155
let lifecycle_event_inbox_depth = IntGauge::with_opts(Opts::new(
179156
"lifecycle_event_inbox_depth",
180157
"current actor lifecycle event inbox depth",
181158
))
182159
.context("create lifecycle_event_inbox_depth gauge")?;
183-
let lifecycle_event_overload_total = CounterVec::new(
184-
Opts::new(
185-
"lifecycle_event_overload_total",
186-
"total actor lifecycle event inbox overloads",
187-
),
188-
&["event"],
189-
)
190-
.context("create lifecycle_event_overload_total counter")?;
191160
let user_tasks_active = IntGaugeVec::new(
192161
Opts::new("user_tasks_active", "current active actor user tasks"),
193162
&["kind"],
@@ -385,11 +354,8 @@ impl ActorMetrics {
385354
register_metric(&registry, active_connections.clone());
386355
register_metric(&registry, connections_total.clone());
387356
register_metric(&registry, lifecycle_inbox_depth.clone());
388-
register_metric(&registry, lifecycle_inbox_overload_total.clone());
389357
register_metric(&registry, dispatch_inbox_depth.clone());
390-
register_metric(&registry, dispatch_inbox_overload_total.clone());
391358
register_metric(&registry, lifecycle_event_inbox_depth.clone());
392-
register_metric(&registry, lifecycle_event_overload_total.clone());
393359
register_metric(&registry, user_tasks_active.clone());
394360
register_metric(&registry, user_task_duration_seconds.clone());
395361
register_metric(&registry, shutdown_wait_seconds.clone());
@@ -464,11 +430,8 @@ impl ActorMetrics {
464430
active_connections,
465431
connections_total,
466432
lifecycle_inbox_depth,
467-
lifecycle_inbox_overload_total,
468433
dispatch_inbox_depth,
469-
dispatch_inbox_overload_total,
470434
lifecycle_event_inbox_depth,
471-
lifecycle_event_overload_total,
472435
user_tasks_active,
473436
user_task_duration_seconds,
474437
shutdown_wait_seconds,
@@ -520,10 +483,6 @@ impl ActorMetrics {
520483
})
521484
}
522485

523-
pub(crate) fn actor_id(&self) -> &str {
524-
&self.actor_id
525-
}
526-
527486
pub(crate) fn render(&self) -> Result<String> {
528487
let Some(inner) = self.inner.as_ref().as_ref() else {
529488
return Ok(String::new());
@@ -600,16 +559,6 @@ impl ActorMetrics {
600559
.set(depth.try_into().unwrap_or(i64::MAX));
601560
}
602561

603-
pub(crate) fn inc_lifecycle_inbox_overload(&self, command: &str) {
604-
let Some(inner) = self.inner.as_ref().as_ref() else {
605-
return;
606-
};
607-
inner
608-
.lifecycle_inbox_overload_total
609-
.with_label_values(&[command])
610-
.inc();
611-
}
612-
613562
pub(crate) fn set_dispatch_inbox_depth(&self, depth: usize) {
614563
let Some(inner) = self.inner.as_ref().as_ref() else {
615564
return;
@@ -619,16 +568,6 @@ impl ActorMetrics {
619568
.set(depth.try_into().unwrap_or(i64::MAX));
620569
}
621570

622-
pub(crate) fn inc_dispatch_inbox_overload(&self, command: &str) {
623-
let Some(inner) = self.inner.as_ref().as_ref() else {
624-
return;
625-
};
626-
inner
627-
.dispatch_inbox_overload_total
628-
.with_label_values(&[command])
629-
.inc();
630-
}
631-
632571
pub(crate) fn set_lifecycle_event_inbox_depth(&self, depth: usize) {
633572
let Some(inner) = self.inner.as_ref().as_ref() else {
634573
return;
@@ -638,16 +577,6 @@ impl ActorMetrics {
638577
.set(depth.try_into().unwrap_or(i64::MAX));
639578
}
640579

641-
pub(crate) fn inc_lifecycle_event_overload(&self, event: &str) {
642-
let Some(inner) = self.inner.as_ref().as_ref() else {
643-
return;
644-
};
645-
inner
646-
.lifecycle_event_overload_total
647-
.with_label_values(&[event])
648-
.inc();
649-
}
650-
651580
pub(crate) fn begin_user_task(&self, kind: UserTaskKind) {
652581
let Some(inner) = self.inner.as_ref().as_ref() else {
653582
return;

0 commit comments

Comments
 (0)