diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/connection.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/connection.rs index 617f25ed86..90d11006fd 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/connection.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/connection.rs @@ -683,14 +683,17 @@ impl ActorContext { ); conn.configure_hibernation(hibernation); self.prepare_managed_conn(&conn); - self.insert_existing(conn.clone()); if let Err(error) = prepare_connection(&conn) { - self.remove_existing(conn.id()); return Err(error); } - if let Err(error) = self.emit_connection_open(&conn, params, request).await { + self + .emit_connection_preflight(&conn, params.clone(), request.clone()) + .await?; + self.insert_existing(conn.clone()); + + if let Err(error) = self.emit_connection_open(&conn, request).await { self.remove_existing(conn.id()); return Err(error); } @@ -868,7 +871,6 @@ impl ActorContext { async fn emit_connection_open( &self, conn: &ConnHandle, - params: Vec, request: Option, ) -> Result<()> { let config = self.connection_config(); @@ -876,7 +878,6 @@ impl ActorContext { self.try_send_actor_event( ActorEvent::ConnectionOpen { conn: conn.clone(), - params, request, reply: Reply::from(reply_tx), }, @@ -889,6 +890,33 @@ impl ActorContext { Ok(()) } + async fn emit_connection_preflight( + &self, + conn: &ConnHandle, + params: Vec, + request: Option, + ) -> Result<()> { + let config = self.connection_config(); + let timeout_duration = config + .on_before_connect_timeout + .saturating_add(config.create_conn_state_timeout); + let (reply_tx, reply_rx) = oneshot::channel(); + self.try_send_actor_event( + ActorEvent::ConnectionPreflight { + conn: conn.clone(), + params, + request, + reply: Reply::from(reply_tx), + }, + "connection_preflight", + )?; + timeout(timeout_duration, reply_rx) + .await + .with_context(|| timeout_message("connection_preflight", timeout_duration))? + .context("receive connection_preflight reply")??; + Ok(()) + } + pub(crate) fn connection(&self, conn_id: &str) -> Option { self.0.connections.read().get(conn_id).cloned() } diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/messages.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/messages.rs index 4ea57aedd8..2e2954658d 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/messages.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/messages.rs @@ -290,12 +290,17 @@ pub enum ActorEvent { request: Option, reply: Reply<()>, }, - ConnectionOpen { + ConnectionPreflight { conn: ConnHandle, params: Vec, request: Option, reply: Reply<()>, }, + ConnectionOpen { + conn: ConnHandle, + request: Option, + reply: Reply<()>, + }, ConnectionClosed { conn: ConnHandle, }, @@ -342,6 +347,7 @@ impl ActorEvent { Self::HttpRequest { .. } => "http_request", Self::QueueSend { .. } => "queue_send", Self::WebSocketOpen { .. } => "websocket_open", + Self::ConnectionPreflight { .. } => "connection_preflight", Self::ConnectionOpen { .. } => "connection_open", Self::ConnectionClosed { .. } => "connection_closed", Self::SubscribeRequest { .. } => "subscribe_request", diff --git a/rivetkit-rust/packages/rivetkit-core/tests/connection.rs b/rivetkit-rust/packages/rivetkit-core/tests/connection.rs index 5e56977bd6..9524c4ee2a 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/connection.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/connection.rs @@ -4,6 +4,7 @@ mod moved_tests { use std::collections::BTreeSet; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; use parking_lot::Mutex; use tokio::sync::{Barrier, mpsc}; @@ -70,6 +71,101 @@ mod moved_tests { assert!(ctx.connection("conn-preloaded").is_some()); } + #[tokio::test] + async fn pending_connection_is_invisible_until_preflight_succeeds() { + let ctx = ActorContext::new_with_kv( + "actor-preflight-visibility", + "actor", + Vec::new(), + "local", + Kv::new_in_memory(), + ); + ctx.configure_connection_runtime(crate::actor::config::ActorConfig::default()); + let (events_tx, mut events_rx) = mpsc::unbounded_channel(); + ctx.configure_actor_events(Some(events_tx)); + + let events_ctx = ctx.clone(); + let event_task = tokio::spawn(async move { + let preflight_conn_id = match events_rx.recv().await.expect("preflight event") { + ActorEvent::ConnectionPreflight { conn, reply, .. } => { + assert!(events_ctx.connection(conn.id()).is_none()); + conn.set_state_initial(vec![7]); + let conn_id = conn.id().to_owned(); + reply.send(Ok(())); + conn_id + } + other => panic!("unexpected event: {other:?}"), + }; + + match events_rx.recv().await.expect("open event") { + ActorEvent::ConnectionOpen { conn, reply, .. } => { + assert_eq!(conn.id(), preflight_conn_id); + let visible = events_ctx + .connection(conn.id()) + .expect("connection should be visible for onConnect"); + assert_eq!(visible.state(), vec![7]); + reply.send(Ok(())); + } + other => panic!("unexpected event: {other:?}"), + } + }); + + let conn = ctx + .connect_with_state(vec![1], false, None, None, async { Ok(vec![2]) }) + .await + .expect("connection should succeed"); + + assert_eq!(conn.state(), vec![7]); + assert!(ctx.connection(conn.id()).is_some()); + event_task.await.expect("event task should complete"); + } + + #[tokio::test] + async fn failed_preflight_never_exposes_connection() { + let ctx = ActorContext::new_with_kv( + "actor-preflight-failure", + "actor", + Vec::new(), + "local", + Kv::new_in_memory(), + ); + ctx.configure_connection_runtime(crate::actor::config::ActorConfig::default()); + let (events_tx, mut events_rx) = mpsc::unbounded_channel(); + ctx.configure_actor_events(Some(events_tx)); + let failed_conn_id = Arc::new(Mutex::new(None::)); + + let events_ctx = ctx.clone(); + let event_failed_conn_id = failed_conn_id.clone(); + let event_task = tokio::spawn(async move { + match events_rx.recv().await.expect("preflight event") { + ActorEvent::ConnectionPreflight { conn, reply, .. } => { + assert!(events_ctx.connection(conn.id()).is_none()); + *event_failed_conn_id.lock() = Some(conn.id().to_owned()); + reply.send(Err(anyhow::anyhow!("reject preflight"))); + } + other => panic!("unexpected event: {other:?}"), + } + assert!( + tokio::time::timeout(Duration::from_millis(20), events_rx.recv()) + .await + .is_err() + ); + }); + + let error = ctx + .connect_with_state(vec![1], false, None, None, async { Ok(vec![2]) }) + .await + .expect_err("connection should fail"); + + assert!(format!("{error:#}").contains("reject preflight")); + let conn_id = failed_conn_id + .lock() + .clone() + .expect("failed connection id should be recorded"); + assert!(ctx.connection(&conn_id).is_none()); + event_task.await.expect("event task should complete"); + } + #[test] fn persisted_connection_uses_ts_v4_fixed_id_wire_format() { let persisted = PersistedConnection { @@ -132,6 +228,7 @@ mod moved_tests { async move { while let Some(event) = events_rx.recv().await { match event { + ActorEvent::ConnectionPreflight { reply, .. } => reply.send(Ok(())), ActorEvent::ConnectionOpen { reply, .. } => reply.send(Ok(())), ActorEvent::ConnectionClosed { conn } => { *observed_conn_id.lock() = Some(conn.id().to_owned()); @@ -218,12 +315,13 @@ mod moved_tests { ctx.configure_lifecycle_events(Some(lifecycle_events_tx)); let open_replies = tokio::spawn(async move { - for _ in 0..2 { + for _ in 0..4 { match actor_events_rx .recv() .await .expect("open event should arrive") { + ActorEvent::ConnectionPreflight { reply, .. } => reply.send(Ok(())), ActorEvent::ConnectionOpen { reply, .. } => reply.send(Ok(())), other => panic!("unexpected actor event: {other:?}"), } diff --git a/rivetkit-rust/packages/rivetkit-core/tests/integration/counter.rs b/rivetkit-rust/packages/rivetkit-core/tests/integration/counter.rs index 97e87e4a50..6ad17be37f 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/integration/counter.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/integration/counter.rs @@ -103,7 +103,7 @@ fn counter_factory() -> ActorFactory { } => { reply.send(Err(anyhow::anyhow!("websockets are not handled"))); } - ActorEvent::ConnectionOpen { + ActorEvent::ConnectionPreflight { conn: _, params: _, request: _, @@ -111,6 +111,9 @@ fn counter_factory() -> ActorFactory { } => { reply.send(Ok(())); } + ActorEvent::ConnectionOpen { reply, .. } => { + reply.send(Ok(())); + } ActorEvent::ConnectionClosed { conn: _ } => {} ActorEvent::SubscribeRequest { conn: _, diff --git a/rivetkit-rust/packages/rivetkit-core/tests/task.rs b/rivetkit-rust/packages/rivetkit-core/tests/task.rs index 33d7865950..ba30342bfc 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/task.rs @@ -1745,7 +1745,8 @@ mod moved_tests { reply.send(Ok(())); break; } - ActorEvent::ConnectionOpen { .. } => { + ActorEvent::ConnectionPreflight { .. } + | ActorEvent::ConnectionOpen { .. } => { panic!("hibernated connection should not refire ConnectionOpen"); } _ => {} @@ -3982,7 +3983,8 @@ mod moved_tests { reply.send(Ok(())); break; } - ActorEvent::ConnectionOpen { .. } => { + ActorEvent::ConnectionPreflight { .. } + | ActorEvent::ConnectionOpen { .. } => { panic!("hibernated connection should not refire ConnectionOpen"); } _ => {} @@ -4107,7 +4109,8 @@ mod moved_tests { reply.send(Ok(())); break; } - ActorEvent::ConnectionOpen { .. } => { + ActorEvent::ConnectionPreflight { .. } + | ActorEvent::ConnectionOpen { .. } => { panic!("dead hibernated connection should not refire ConnectionOpen"); } _ => {} diff --git a/rivetkit-rust/packages/rivetkit/src/event.rs b/rivetkit-rust/packages/rivetkit/src/event.rs index 35910c5880..545fe66f40 100644 --- a/rivetkit-rust/packages/rivetkit/src/event.rs +++ b/rivetkit-rust/packages/rivetkit/src/event.rs @@ -81,7 +81,7 @@ impl Event { reply: Some(reply), _p: PhantomData, }), - ActorEvent::ConnectionOpen { + ActorEvent::ConnectionPreflight { conn, params, request, @@ -92,6 +92,9 @@ impl Event { request, reply: Some(reply), }), + ActorEvent::ConnectionOpen { .. } => { + unreachable!("ConnectionOpen is handled by Events") + } ActorEvent::ConnectionClosed { conn } => Self::ConnClosed(ConnClosed { conn: ConnCtx::from(conn), }), diff --git a/rivetkit-rust/packages/rivetkit/src/start.rs b/rivetkit-rust/packages/rivetkit/src/start.rs index 73dd751817..e23302dd21 100644 --- a/rivetkit-rust/packages/rivetkit/src/start.rs +++ b/rivetkit-rust/packages/rivetkit/src/start.rs @@ -127,6 +127,10 @@ impl Events { async fn handle_runtime_event(&self, event: ActorEvent) -> Option { match event { + ActorEvent::ConnectionOpen { reply, .. } => { + reply.send(Ok(())); + None + } ActorEvent::DisconnectConn { conn_id, reply } => { reply.send(self.ctx.disconnect_conn(&conn_id).await); None @@ -137,6 +141,10 @@ impl Events { fn handle_runtime_event_sync(&self, event: ActorEvent) -> Option { match event { + ActorEvent::ConnectionOpen { reply, .. } => { + reply.send(Ok(())); + None + } ActorEvent::DisconnectConn { conn_id, reply } => { let ctx = self.ctx.clone(); tokio::spawn(async move { diff --git a/rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs b/rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs index 74fd941ccf..4f4c65bb86 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs @@ -502,7 +502,7 @@ pub(crate) async fn dispatch_event( call_on_websocket(&callback, &ctx, conn, ws, request).await }); } - ActorEvent::ConnectionOpen { + ActorEvent::ConnectionPreflight { conn, params, request, @@ -510,9 +510,7 @@ pub(crate) async fn dispatch_event( } => { let on_before_connect = bindings.on_before_connect.clone(); let create_conn_state = bindings.create_conn_state.clone(); - let on_connect = bindings.on_connect.clone(); let timeout = config.on_before_connect_timeout; - let connect_timeout = config.on_connect_timeout; let create_conn_state_timeout = config.create_conn_state_timeout; let ctx = ctx.clone(); @@ -542,6 +540,19 @@ pub(crate) async fn dispatch_event( ctx.set_conn_state_initial(&conn, state)?; } + Ok(()) + }); + } + ActorEvent::ConnectionOpen { + conn, + request, + reply, + } => { + let on_connect = bindings.on_connect.clone(); + let connect_timeout = config.on_connect_timeout; + let ctx = ctx.clone(); + + spawn_reply(tasks, abort.clone(), reply, async move { if let Some(callback) = on_connect { with_timeout( "onConnect", diff --git a/rivetkit-typescript/packages/rivetkit-napi/tests/napi_actor_events.rs b/rivetkit-typescript/packages/rivetkit-napi/tests/napi_actor_events.rs index d2c8d04b1c..c8a97eaf88 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/tests/napi_actor_events.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/tests/napi_actor_events.rs @@ -241,7 +241,7 @@ mod moved_tests { let conn = rivetkit_core::ConnHandle::new("conn-open", vec![1, 2, 3], Vec::new(), false); dispatch_event( - ActorEvent::ConnectionOpen { + ActorEvent::ConnectionPreflight { conn: conn.clone(), params: vec![4, 5, 6], request: None, diff --git a/rivetkit-typescript/packages/rivetkit-wasm/src/lib.rs b/rivetkit-typescript/packages/rivetkit-wasm/src/lib.rs index 0c845c17cb..4847a8b161 100644 --- a/rivetkit-typescript/packages/rivetkit-wasm/src/lib.rs +++ b/rivetkit-typescript/packages/rivetkit-wasm/src/lib.rs @@ -865,13 +865,26 @@ async fn dispatch_event(callbacks: &WasmCallbacks, ctx: &WasmActorContext, event } reply.send(result); } - ActorEvent::ConnectionOpen { + ActorEvent::ConnectionPreflight { conn, params, request, reply, } => { - let result = run_connection_open(callbacks, ctx, conn, params, request).await; + let result = run_connection_preflight(callbacks, ctx, conn, params, request).await; + if let Err(error) = &result { + console_error(&format!( + "wasm connection preflight callback failed: {error:#}" + )); + } + reply.send(result); + } + ActorEvent::ConnectionOpen { + conn, + request, + reply, + } => { + let result = run_connection_open(callbacks, ctx, conn, request).await; if let Err(error) = &result { console_error(&format!("wasm connection open callback failed: {error:#}")); } @@ -949,7 +962,7 @@ async fn dispatch_event(callbacks: &WasmCallbacks, ctx: &WasmActorContext, event } } -async fn run_connection_open( +async fn run_connection_preflight( callbacks: &WasmCallbacks, ctx: &WasmActorContext, conn: rivetkit_core::ConnHandle, @@ -966,11 +979,14 @@ async fn run_connection_open( call_callback(callback, &payload.into()).await?; } - let wasm_conn = WasmConnHandle::from_core(conn.clone()); if let Some(callback) = &callbacks.create_conn_state { let payload = object(); set_anyhow(&payload, "ctx", JsValue::from(ctx.clone()))?; - set_anyhow(&payload, "conn", JsValue::from(wasm_conn.clone()))?; + set_anyhow( + &payload, + "conn", + JsValue::from(WasmConnHandle::from_core(conn.clone())), + )?; set_anyhow(&payload, "params", bytes_to_js(¶ms))?; if let Some(request) = request.as_ref() { set_anyhow(&payload, "request", request_to_js(request.clone())?)?; @@ -979,16 +995,29 @@ async fn run_connection_open( conn.set_state_initial(state); } - if let Some(callback) = &callbacks.on_connect { - let payload = object(); - set_anyhow(&payload, "ctx", JsValue::from(ctx.clone()))?; - set_anyhow(&payload, "conn", JsValue::from(wasm_conn))?; - if let Some(request) = request { - set_anyhow(&payload, "request", request_to_js(request)?)?; - } - call_callback(callback, &payload.into()).await?; - } + Ok(()) +} +async fn run_connection_open( + callbacks: &WasmCallbacks, + ctx: &WasmActorContext, + conn: rivetkit_core::ConnHandle, + request: Option, +) -> Result<()> { + let Some(callback) = &callbacks.on_connect else { + return Ok(()); + }; + let payload = object(); + set_anyhow(&payload, "ctx", JsValue::from(ctx.clone()))?; + set_anyhow( + &payload, + "conn", + JsValue::from(WasmConnHandle::from_core(conn)), + )?; + if let Some(request) = request { + set_anyhow(&payload, "request", request_to_js(request)?)?; + } + call_callback(callback, &payload.into()).await?; Ok(()) } diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-preflight-visibility.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-preflight-visibility.ts new file mode 100644 index 0000000000..2f55906f2e --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-preflight-visibility.ts @@ -0,0 +1,78 @@ +import { actor } from "rivetkit"; + +type ConnState = { + label: string; +}; + +type ConnParams = { + label?: string; + beforeDelayMs?: number; + createDelayMs?: number; +}; + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); +const visibleLabels = (c: { + conns: Map; +}) => + Array.from(c.conns.values()).map((conn) => conn.state?.label ?? null); + +export const connPreflightVisibilityActor = actor({ + state: { + beforeStarted: 0, + createStarted: 0, + beforeVisibleLabels: [] as Array>, + createVisibleLabels: [] as Array>, + connectSnapshots: [] as Array<{ + label: string; + ownVisible: boolean; + visibleLabels: string[]; + }>, + disconnectSnapshots: [] as Array<{ + label: string | null; + otherLabels: Array; + }>, + }, + onBeforeConnect: async (c, params: ConnParams) => { + c.state.beforeStarted += 1; + c.state.beforeVisibleLabels.push(visibleLabels(c)); + if (params?.beforeDelayMs) { + await sleep(params.beforeDelayMs); + } + }, + createConnState: async (c, params: ConnParams): Promise => { + c.state.createStarted += 1; + c.state.createVisibleLabels.push(visibleLabels(c)); + if (params?.createDelayMs) { + await sleep(params.createDelayMs); + } + return { label: params?.label ?? "anonymous" }; + }, + onConnect: (c, conn) => { + c.state.connectSnapshots.push({ + label: conn.state.label, + ownVisible: c.conns.has(conn.id), + visibleLabels: Array.from(c.conns.values()).map( + (other) => other.state.label, + ), + }); + }, + onDisconnect: (c, conn) => { + c.state.disconnectSnapshots.push({ + label: conn.state?.label ?? null, + otherLabels: Array.from(c.conns.values()) + .filter((other) => other !== conn) + .map((other) => other.state?.label ?? null), + }); + }, + actions: { + snapshot: (c) => ({ + beforeStarted: c.state.beforeStarted, + createStarted: c.state.createStarted, + beforeVisibleLabels: c.state.beforeVisibleLabels, + createVisibleLabels: c.state.createVisibleLabels, + connectSnapshots: c.state.connectSnapshots, + disconnectSnapshots: c.state.disconnectSnapshots, + visibleLabels: visibleLabels(c), + }), + }, +}); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts index df4f7c22d0..832e1065ee 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts @@ -22,6 +22,7 @@ import { import { dbActorRaw, dbRemoteLifecycleProbe } from "./actor-db-raw"; import { onStateChangeActor } from "./actor-onstatechange"; import { connErrorSerializationActor } from "./conn-error-serialization"; +import { connPreflightVisibilityActor } from "./conn-preflight-visibility"; import { dbInitOrderCreateStateActor, dbInitOrderCreateVarsActor, @@ -258,6 +259,8 @@ export const registry = setup({ counterWithParams, // From conn-state.ts connStateActor, + // From conn-preflight-visibility.ts + connPreflightVisibilityActor, // From metadata.ts metadataActor, // From vars.ts diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index e812375484..f82717ec6f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -1001,6 +1001,7 @@ type CreateState< // This must have only one or the other or else TState will not be able to be inferred // // Data returned from this handler will be available on `c.conn.state`. +// The pending connection is not visible in `c.conns` until this succeeds. type CreateConnState< TState, TConnParams, @@ -1292,10 +1293,11 @@ interface BaseActorConfig< * Called before a client connects to the actor. * * Use this hook to determine if a connection should be accepted - * and to initialize connection-specific state. + * and to validate client-provided parameters. The pending connection + * is not visible in `c.conns` while this hook runs. * * @param opts Connection parameters including client-provided data - * @returns The initial connection state or a Promise that resolves to it + * @returns Void or a Promise. * @throws Throw an error to reject the connection */ onBeforeConnect?: ( @@ -1314,7 +1316,8 @@ interface BaseActorConfig< * Called when a client successfully connects to the actor. * * Use this hook to perform actions when a connection is established, - * such as sending initial data or updating the actor's state. + * such as sending initial data or updating the actor's state. The + * connection is visible in `c.conns` before this hook runs. * * @param conn The connection object * @returns Void or a Promise that resolves when connection handling is complete @@ -1875,7 +1878,7 @@ export const DocActorConfigSchema = z .unknown() .optional() .describe( - "Function to create connection state. Receives context and connection params. Cannot be used with connState.", + "Function to create connection state. Receives context and connection params. The pending connection is not visible in c.conns until this succeeds. Cannot be used with connState.", ), vars: z .unknown() @@ -1937,12 +1940,14 @@ export const DocActorConfigSchema = z .unknown() .optional() .describe( - "Called before a client connects. Throw an error to reject the connection.", + "Called before a client connects. Throw an error to reject the connection. The pending connection is not visible in c.conns while this runs.", ), onConnect: z .unknown() .optional() - .describe("Called when a client successfully connects."), + .describe( + "Called when a client successfully connects. The connection is visible in c.conns before this runs.", + ), onDisconnect: z .unknown() .optional() diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver/actor-conn-state.test.ts b/rivetkit-typescript/packages/rivetkit/tests/driver/actor-conn-state.test.ts index d0c742224a..204ee9a251 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/driver/actor-conn-state.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/driver/actor-conn-state.test.ts @@ -149,6 +149,94 @@ describeDriverMatrix("Actor Conn State", (driverTestConfig) => { }); describe("Connection Lifecycle", () => { + test("should hide connections from c.conns until createConnState completes", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const handle = client.connPreflightVisibilityActor.getOrCreate([ + "create-state-visibility", + crypto.randomUUID(), + ]); + const primary = handle.connect({ label: "primary" }); + await primary.snapshot(); + + const pending = handle.connect({ + label: "pending", + createDelayMs: 300, + }); + + const pendingSnapshot = await pending.snapshot(); + expect([...pendingSnapshot.visibleLabels].sort()).toEqual([ + "pending", + "primary", + ]); + expect(pendingSnapshot.createVisibleLabels).toEqual([ + [], + ["primary"], + ]); + expect( + pendingSnapshot.connectSnapshots.find( + (snapshot) => snapshot.label === "pending", + ), + ).toMatchObject({ + ownVisible: true, + }); + expect( + [ + ...pendingSnapshot.connectSnapshots.find( + (snapshot) => snapshot.label === "pending", + ).visibleLabels, + ].sort(), + ).toEqual(["pending", "primary"]); + + await pending.dispose(); + await primary.dispose(); + }); + + test("should hide connections from c.conns until onBeforeConnect completes", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const handle = client.connPreflightVisibilityActor.getOrCreate([ + "before-connect-visibility", + crypto.randomUUID(), + ]); + const primary = handle.connect({ label: "primary" }); + await primary.snapshot(); + + const pending = handle.connect({ + label: "pending", + beforeDelayMs: 300, + }); + + const pendingSnapshot = await pending.snapshot(); + expect([...pendingSnapshot.visibleLabels].sort()).toEqual([ + "pending", + "primary", + ]); + expect(pendingSnapshot.beforeVisibleLabels).toEqual([ + [], + ["primary"], + ]); + expect(pendingSnapshot.createVisibleLabels).toEqual([ + [], + ["primary"], + ]); + expect( + pendingSnapshot.connectSnapshots.find( + (snapshot) => snapshot.label === "pending", + ), + ).toMatchObject({ + ownVisible: true, + }); + expect( + [ + ...pendingSnapshot.connectSnapshots.find( + (snapshot) => snapshot.label === "pending", + ).visibleLabels, + ].sort(), + ).toEqual(["pending", "primary"]); + + await pending.dispose(); + await primary.dispose(); + }); + test("should deliver onConnect events to listeners registered before the first await", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); diff --git a/website/src/content/docs/actors/connections.mdx b/website/src/content/docs/actors/connections.mdx index da43f7d5d1..40149acce6 100644 --- a/website/src/content/docs/actors/connections.mdx +++ b/website/src/content/docs/actors/connections.mdx @@ -183,6 +183,8 @@ Each client connection goes through a series of lifecycle hooks that allow you t - `createConnState` - `onConnect` +Pending connections are not visible in `c.conns` while `onBeforeConnect` or `createConnState` is running. RivetKit adds the connection to `c.conns` after those hooks succeed and before `onConnect` runs. + **On Disconnect** (per client) - `onDisconnect` @@ -195,6 +197,8 @@ There are two ways to define the initial state for connections: 1. `connState`: Define a constant object that will be used as the initial state for all connections 2. `createConnState`: A function that dynamically creates initial connection state based on connection parameters. Can be async. +Connections are not visible in `c.conns` until `createConnState` completes successfully. + ### `onBeforeConnect` [API Reference](/typedoc/interfaces/rivetkit.mod.BeforeConnectContext.html) @@ -203,6 +207,8 @@ The `onBeforeConnect` hook is called whenever a new client connects to the actor The `onBeforeConnect` hook does NOT return connection state - it's used solely for validation. +Connections are not visible in `c.conns` while `onBeforeConnect` is running. + ```typescript import { actor } from "rivetkit"; @@ -263,6 +269,8 @@ Connections cannot interact with the actor until this method completes successfu Executed after the client has successfully connected. Can be async. Receives the connection object as a second parameter. +By the time `onConnect` runs, the connection is visible in `c.conns`. + ```typescript import { actor } from "rivetkit";