Skip to content

Commit 8cd9dea

Browse files
committed
fix(rivetkit): hide pending connections during preflight
1 parent c7dee23 commit 8cd9dea

15 files changed

Lines changed: 407 additions & 36 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/connection.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -683,14 +683,17 @@ impl ActorContext {
683683
);
684684
conn.configure_hibernation(hibernation);
685685
self.prepare_managed_conn(&conn);
686-
self.insert_existing(conn.clone());
687686

688687
if let Err(error) = prepare_connection(&conn) {
689-
self.remove_existing(conn.id());
690688
return Err(error);
691689
}
692690

693-
if let Err(error) = self.emit_connection_open(&conn, params, request).await {
691+
self
692+
.emit_connection_preflight(&conn, params.clone(), request.clone())
693+
.await?;
694+
self.insert_existing(conn.clone());
695+
696+
if let Err(error) = self.emit_connection_open(&conn, request).await {
694697
self.remove_existing(conn.id());
695698
return Err(error);
696699
}
@@ -868,15 +871,13 @@ impl ActorContext {
868871
async fn emit_connection_open(
869872
&self,
870873
conn: &ConnHandle,
871-
params: Vec<u8>,
872874
request: Option<Request>,
873875
) -> Result<()> {
874876
let config = self.connection_config();
875877
let (reply_tx, reply_rx) = oneshot::channel();
876878
self.try_send_actor_event(
877879
ActorEvent::ConnectionOpen {
878880
conn: conn.clone(),
879-
params,
880881
request,
881882
reply: Reply::from(reply_tx),
882883
},
@@ -889,6 +890,33 @@ impl ActorContext {
889890
Ok(())
890891
}
891892

893+
async fn emit_connection_preflight(
894+
&self,
895+
conn: &ConnHandle,
896+
params: Vec<u8>,
897+
request: Option<Request>,
898+
) -> Result<()> {
899+
let config = self.connection_config();
900+
let timeout_duration = config
901+
.on_before_connect_timeout
902+
.saturating_add(config.create_conn_state_timeout);
903+
let (reply_tx, reply_rx) = oneshot::channel();
904+
self.try_send_actor_event(
905+
ActorEvent::ConnectionPreflight {
906+
conn: conn.clone(),
907+
params,
908+
request,
909+
reply: Reply::from(reply_tx),
910+
},
911+
"connection_preflight",
912+
)?;
913+
timeout(timeout_duration, reply_rx)
914+
.await
915+
.with_context(|| timeout_message("connection_preflight", timeout_duration))?
916+
.context("receive connection_preflight reply")??;
917+
Ok(())
918+
}
919+
892920
pub(crate) fn connection(&self, conn_id: &str) -> Option<ConnHandle> {
893921
self.0.connections.read().get(conn_id).cloned()
894922
}

rivetkit-rust/packages/rivetkit-core/src/actor/messages.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,12 +290,17 @@ pub enum ActorEvent {
290290
request: Option<Request>,
291291
reply: Reply<()>,
292292
},
293-
ConnectionOpen {
293+
ConnectionPreflight {
294294
conn: ConnHandle,
295295
params: Vec<u8>,
296296
request: Option<Request>,
297297
reply: Reply<()>,
298298
},
299+
ConnectionOpen {
300+
conn: ConnHandle,
301+
request: Option<Request>,
302+
reply: Reply<()>,
303+
},
299304
ConnectionClosed {
300305
conn: ConnHandle,
301306
},
@@ -342,6 +347,7 @@ impl ActorEvent {
342347
Self::HttpRequest { .. } => "http_request",
343348
Self::QueueSend { .. } => "queue_send",
344349
Self::WebSocketOpen { .. } => "websocket_open",
350+
Self::ConnectionPreflight { .. } => "connection_preflight",
345351
Self::ConnectionOpen { .. } => "connection_open",
346352
Self::ConnectionClosed { .. } => "connection_closed",
347353
Self::SubscribeRequest { .. } => "subscribe_request",

rivetkit-rust/packages/rivetkit-core/tests/connection.rs

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod moved_tests {
44
use std::collections::BTreeSet;
55
use std::sync::Arc;
66
use std::sync::atomic::{AtomicUsize, Ordering};
7+
use std::time::Duration;
78

89
use parking_lot::Mutex;
910
use tokio::sync::{Barrier, mpsc};
@@ -70,6 +71,101 @@ mod moved_tests {
7071
assert!(ctx.connection("conn-preloaded").is_some());
7172
}
7273

74+
#[tokio::test]
75+
async fn pending_connection_is_invisible_until_preflight_succeeds() {
76+
let ctx = ActorContext::new_with_kv(
77+
"actor-preflight-visibility",
78+
"actor",
79+
Vec::new(),
80+
"local",
81+
Kv::new_in_memory(),
82+
);
83+
ctx.configure_connection_runtime(crate::actor::config::ActorConfig::default());
84+
let (events_tx, mut events_rx) = mpsc::unbounded_channel();
85+
ctx.configure_actor_events(Some(events_tx));
86+
87+
let events_ctx = ctx.clone();
88+
let event_task = tokio::spawn(async move {
89+
let preflight_conn_id = match events_rx.recv().await.expect("preflight event") {
90+
ActorEvent::ConnectionPreflight { conn, reply, .. } => {
91+
assert!(events_ctx.connection(conn.id()).is_none());
92+
conn.set_state_initial(vec![7]);
93+
let conn_id = conn.id().to_owned();
94+
reply.send(Ok(()));
95+
conn_id
96+
}
97+
other => panic!("unexpected event: {other:?}"),
98+
};
99+
100+
match events_rx.recv().await.expect("open event") {
101+
ActorEvent::ConnectionOpen { conn, reply, .. } => {
102+
assert_eq!(conn.id(), preflight_conn_id);
103+
let visible = events_ctx
104+
.connection(conn.id())
105+
.expect("connection should be visible for onConnect");
106+
assert_eq!(visible.state(), vec![7]);
107+
reply.send(Ok(()));
108+
}
109+
other => panic!("unexpected event: {other:?}"),
110+
}
111+
});
112+
113+
let conn = ctx
114+
.connect_with_state(vec![1], false, None, None, async { Ok(vec![2]) })
115+
.await
116+
.expect("connection should succeed");
117+
118+
assert_eq!(conn.state(), vec![7]);
119+
assert!(ctx.connection(conn.id()).is_some());
120+
event_task.await.expect("event task should complete");
121+
}
122+
123+
#[tokio::test]
124+
async fn failed_preflight_never_exposes_connection() {
125+
let ctx = ActorContext::new_with_kv(
126+
"actor-preflight-failure",
127+
"actor",
128+
Vec::new(),
129+
"local",
130+
Kv::new_in_memory(),
131+
);
132+
ctx.configure_connection_runtime(crate::actor::config::ActorConfig::default());
133+
let (events_tx, mut events_rx) = mpsc::unbounded_channel();
134+
ctx.configure_actor_events(Some(events_tx));
135+
let failed_conn_id = Arc::new(Mutex::new(None::<String>));
136+
137+
let events_ctx = ctx.clone();
138+
let event_failed_conn_id = failed_conn_id.clone();
139+
let event_task = tokio::spawn(async move {
140+
match events_rx.recv().await.expect("preflight event") {
141+
ActorEvent::ConnectionPreflight { conn, reply, .. } => {
142+
assert!(events_ctx.connection(conn.id()).is_none());
143+
*event_failed_conn_id.lock() = Some(conn.id().to_owned());
144+
reply.send(Err(anyhow::anyhow!("reject preflight")));
145+
}
146+
other => panic!("unexpected event: {other:?}"),
147+
}
148+
assert!(
149+
tokio::time::timeout(Duration::from_millis(20), events_rx.recv())
150+
.await
151+
.is_err()
152+
);
153+
});
154+
155+
let error = ctx
156+
.connect_with_state(vec![1], false, None, None, async { Ok(vec![2]) })
157+
.await
158+
.expect_err("connection should fail");
159+
160+
assert!(format!("{error:#}").contains("reject preflight"));
161+
let conn_id = failed_conn_id
162+
.lock()
163+
.clone()
164+
.expect("failed connection id should be recorded");
165+
assert!(ctx.connection(&conn_id).is_none());
166+
event_task.await.expect("event task should complete");
167+
}
168+
73169
#[test]
74170
fn persisted_connection_uses_ts_v4_fixed_id_wire_format() {
75171
let persisted = PersistedConnection {
@@ -132,6 +228,7 @@ mod moved_tests {
132228
async move {
133229
while let Some(event) = events_rx.recv().await {
134230
match event {
231+
ActorEvent::ConnectionPreflight { reply, .. } => reply.send(Ok(())),
135232
ActorEvent::ConnectionOpen { reply, .. } => reply.send(Ok(())),
136233
ActorEvent::ConnectionClosed { conn } => {
137234
*observed_conn_id.lock() = Some(conn.id().to_owned());
@@ -218,12 +315,13 @@ mod moved_tests {
218315
ctx.configure_lifecycle_events(Some(lifecycle_events_tx));
219316

220317
let open_replies = tokio::spawn(async move {
221-
for _ in 0..2 {
318+
for _ in 0..4 {
222319
match actor_events_rx
223320
.recv()
224321
.await
225322
.expect("open event should arrive")
226323
{
324+
ActorEvent::ConnectionPreflight { reply, .. } => reply.send(Ok(())),
227325
ActorEvent::ConnectionOpen { reply, .. } => reply.send(Ok(())),
228326
other => panic!("unexpected actor event: {other:?}"),
229327
}

rivetkit-rust/packages/rivetkit-core/tests/integration/counter.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,17 @@ fn counter_factory() -> ActorFactory {
103103
} => {
104104
reply.send(Err(anyhow::anyhow!("websockets are not handled")));
105105
}
106-
ActorEvent::ConnectionOpen {
106+
ActorEvent::ConnectionPreflight {
107107
conn: _,
108108
params: _,
109109
request: _,
110110
reply,
111111
} => {
112112
reply.send(Ok(()));
113113
}
114+
ActorEvent::ConnectionOpen { reply, .. } => {
115+
reply.send(Ok(()));
116+
}
114117
ActorEvent::ConnectionClosed { conn: _ } => {}
115118
ActorEvent::SubscribeRequest {
116119
conn: _,

rivetkit-rust/packages/rivetkit-core/tests/task.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1745,7 +1745,8 @@ mod moved_tests {
17451745
reply.send(Ok(()));
17461746
break;
17471747
}
1748-
ActorEvent::ConnectionOpen { .. } => {
1748+
ActorEvent::ConnectionPreflight { .. }
1749+
| ActorEvent::ConnectionOpen { .. } => {
17491750
panic!("hibernated connection should not refire ConnectionOpen");
17501751
}
17511752
_ => {}
@@ -3982,7 +3983,8 @@ mod moved_tests {
39823983
reply.send(Ok(()));
39833984
break;
39843985
}
3985-
ActorEvent::ConnectionOpen { .. } => {
3986+
ActorEvent::ConnectionPreflight { .. }
3987+
| ActorEvent::ConnectionOpen { .. } => {
39863988
panic!("hibernated connection should not refire ConnectionOpen");
39873989
}
39883990
_ => {}
@@ -4107,7 +4109,8 @@ mod moved_tests {
41074109
reply.send(Ok(()));
41084110
break;
41094111
}
4110-
ActorEvent::ConnectionOpen { .. } => {
4112+
ActorEvent::ConnectionPreflight { .. }
4113+
| ActorEvent::ConnectionOpen { .. } => {
41114114
panic!("dead hibernated connection should not refire ConnectionOpen");
41124115
}
41134116
_ => {}

rivetkit-rust/packages/rivetkit/src/event.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl<A: Actor> Event<A> {
8181
reply: Some(reply),
8282
_p: PhantomData,
8383
}),
84-
ActorEvent::ConnectionOpen {
84+
ActorEvent::ConnectionPreflight {
8585
conn,
8686
params,
8787
request,
@@ -92,6 +92,9 @@ impl<A: Actor> Event<A> {
9292
request,
9393
reply: Some(reply),
9494
}),
95+
ActorEvent::ConnectionOpen { .. } => {
96+
unreachable!("ConnectionOpen is handled by Events")
97+
}
9598
ActorEvent::ConnectionClosed { conn } => Self::ConnClosed(ConnClosed {
9699
conn: ConnCtx::from(conn),
97100
}),

rivetkit-rust/packages/rivetkit/src/start.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ impl<A: Actor> Events<A> {
127127

128128
async fn handle_runtime_event(&self, event: ActorEvent) -> Option<ActorEvent> {
129129
match event {
130+
ActorEvent::ConnectionOpen { reply, .. } => {
131+
reply.send(Ok(()));
132+
None
133+
}
130134
ActorEvent::DisconnectConn { conn_id, reply } => {
131135
reply.send(self.ctx.disconnect_conn(&conn_id).await);
132136
None
@@ -137,6 +141,10 @@ impl<A: Actor> Events<A> {
137141

138142
fn handle_runtime_event_sync(&self, event: ActorEvent) -> Option<ActorEvent> {
139143
match event {
144+
ActorEvent::ConnectionOpen { reply, .. } => {
145+
reply.send(Ok(()));
146+
None
147+
}
140148
ActorEvent::DisconnectConn { conn_id, reply } => {
141149
let ctx = self.ctx.clone();
142150
tokio::spawn(async move {

rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -502,17 +502,15 @@ pub(crate) async fn dispatch_event(
502502
call_on_websocket(&callback, &ctx, conn, ws, request).await
503503
});
504504
}
505-
ActorEvent::ConnectionOpen {
505+
ActorEvent::ConnectionPreflight {
506506
conn,
507507
params,
508508
request,
509509
reply,
510510
} => {
511511
let on_before_connect = bindings.on_before_connect.clone();
512512
let create_conn_state = bindings.create_conn_state.clone();
513-
let on_connect = bindings.on_connect.clone();
514513
let timeout = config.on_before_connect_timeout;
515-
let connect_timeout = config.on_connect_timeout;
516514
let create_conn_state_timeout = config.create_conn_state_timeout;
517515
let ctx = ctx.clone();
518516

@@ -542,6 +540,19 @@ pub(crate) async fn dispatch_event(
542540
ctx.set_conn_state_initial(&conn, state)?;
543541
}
544542

543+
Ok(())
544+
});
545+
}
546+
ActorEvent::ConnectionOpen {
547+
conn,
548+
request,
549+
reply,
550+
} => {
551+
let on_connect = bindings.on_connect.clone();
552+
let connect_timeout = config.on_connect_timeout;
553+
let ctx = ctx.clone();
554+
555+
spawn_reply(tasks, abort.clone(), reply, async move {
545556
if let Some(callback) = on_connect {
546557
with_timeout(
547558
"onConnect",

rivetkit-typescript/packages/rivetkit-napi/tests/napi_actor_events.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ mod moved_tests {
241241
let conn = rivetkit_core::ConnHandle::new("conn-open", vec![1, 2, 3], Vec::new(), false);
242242

243243
dispatch_event(
244-
ActorEvent::ConnectionOpen {
244+
ActorEvent::ConnectionPreflight {
245245
conn: conn.clone(),
246246
params: vec![4, 5, 6],
247247
request: None,

0 commit comments

Comments
 (0)