Skip to content

Commit e0f2824

Browse files
committed
feat(rivetkit): expose conn handle in onWebSocket context
1 parent 8c874e4 commit e0f2824

10 files changed

Lines changed: 75 additions & 8 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/messages.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ pub enum ActorEvent {
285285
reply: Reply<QueueSendResult>,
286286
},
287287
WebSocketOpen {
288+
conn: ConnHandle,
288289
ws: WebSocket,
289290
request: Option<Request>,
290291
reply: Reply<()>,

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ pub enum DispatchCommand {
283283
reply: oneshot::Sender<HttpDispatchResult>,
284284
},
285285
OpenWebSocket {
286+
conn: ConnHandle,
286287
ws: WebSocket,
287288
request: Option<Request>,
288289
reply: oneshot::Sender<Result<()>>,
@@ -1031,10 +1032,16 @@ impl ActorTask {
10311032
}
10321033
}
10331034
}
1034-
DispatchCommand::OpenWebSocket { ws, request, reply } => {
1035+
DispatchCommand::OpenWebSocket {
1036+
conn,
1037+
ws,
1038+
request,
1039+
reply,
1040+
} => {
10351041
match self.send_actor_event(
10361042
"dispatch_websocket_open",
10371043
ActorEvent::WebSocketOpen {
1044+
conn,
10381045
ws,
10391046
request,
10401047
reply: Reply::from(reply),

rivetkit-rust/packages/rivetkit-core/src/registry/dispatch.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ where
8181
pub(super) async fn dispatch_websocket_open_through_task(
8282
dispatch: &mpsc::Sender<DispatchCommand>,
8383
capacity: usize,
84+
conn: ConnHandle,
8485
ws: WebSocket,
8586
request: Option<Request>,
8687
) -> Result<()> {
@@ -90,6 +91,7 @@ pub(super) async fn dispatch_websocket_open_through_task(
9091
capacity,
9192
"dispatch_websocket_open",
9293
DispatchCommand::OpenWebSocket {
94+
conn,
9395
ws,
9496
request,
9597
reply: reply_tx,

rivetkit-rust/packages/rivetkit-core/src/registry/websocket.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,7 @@ impl RegistryDispatcher {
565565
let dispatch_capacity = instance.factory.config().dispatch_command_inbox_capacity;
566566
let conn_for_close = conn.clone();
567567
let conn_for_message = conn.clone();
568+
let conn_for_open = conn.clone();
568569
let ctx_for_message = ctx.clone();
569570
let ctx_for_close = ctx.clone();
570571
let ws = WebSocket::new();
@@ -676,6 +677,7 @@ impl RegistryDispatcher {
676677
}),
677678
on_open: Some(Box::new(move |sender| {
678679
let request = request_for_open.clone();
680+
let conn = conn_for_open.clone();
679681
let ws = ws_for_open.clone();
680682
let actor_id = actor_id_for_open.clone();
681683
let dispatch = dispatch.clone();
@@ -685,6 +687,7 @@ impl RegistryDispatcher {
685687
let result = dispatch_websocket_open_through_task(
686688
&dispatch,
687689
dispatch_capacity,
690+
conn,
688691
ws.clone(),
689692
Some(request),
690693
)

rivetkit-typescript/packages/rivetkit-napi/src/actor_factory.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ pub(crate) struct QueueSendPayload {
141141
#[derive(Clone)]
142142
pub(crate) struct WebSocketPayload {
143143
pub(crate) ctx: CoreActorContext,
144+
pub(crate) conn: CoreConnHandle,
144145
pub(crate) ws: CoreWebSocket,
145146
pub(crate) request: Option<Request>,
146147
}
@@ -833,6 +834,7 @@ fn build_websocket_payload(
833834
) -> napi::Result<Vec<napi::JsUnknown>> {
834835
let mut object = env.create_object()?;
835836
object.set("ctx", ActorContext::new(payload.ctx))?;
837+
object.set("conn", ConnHandle::new(payload.conn))?;
836838
object.set("ws", WebSocket::new(payload.ws))?;
837839
if let Some(request) = payload.request {
838840
object.set("request", build_request_object(env, request)?)?;

rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,14 +474,19 @@ pub(crate) async fn dispatch_event(
474474
.await
475475
});
476476
}
477-
ActorEvent::WebSocketOpen { ws, request, reply } => {
477+
ActorEvent::WebSocketOpen {
478+
conn,
479+
ws,
480+
request,
481+
reply,
482+
} => {
478483
let Some(callback) = bindings.on_websocket.clone() else {
479484
reply.send(Ok(()));
480485
return;
481486
};
482487
let ctx = ctx.clone();
483488
spawn_reply(tasks, abort.clone(), reply, async move {
484-
call_on_websocket(&callback, &ctx, ws, request).await
489+
call_on_websocket(&callback, &ctx, conn, ws, request).await
485490
});
486491
}
487492
ActorEvent::ConnectionOpen {
@@ -1118,6 +1123,7 @@ where
11181123
async fn call_on_websocket(
11191124
callback: &crate::actor_factory::CallbackTsfn<WebSocketPayload>,
11201125
ctx: &ActorContext,
1126+
conn: rivetkit_core::ConnHandle,
11211127
ws: rivetkit_core::WebSocket,
11221128
request: Option<rivetkit_core::Request>,
11231129
) -> Result<()> {
@@ -1126,6 +1132,7 @@ async fn call_on_websocket(
11261132
callback,
11271133
WebSocketPayload {
11281134
ctx: ctx.inner().clone(),
1135+
conn,
11291136
ws,
11301137
request,
11311138
},

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/raw-websocket.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,3 +186,21 @@ export const rawWebSocketAsyncOpenActor = actor({
186186
getOpenCount: (ctx) => ctx.state.openCount,
187187
},
188188
});
189+
190+
export const rawWebSocketConnContextActor = actor({
191+
onWebSocket(ctx: any, websocket: UniversalWebSocket) {
192+
const connId = ctx.conn.id;
193+
ctx.conn.state = {
194+
opened: true,
195+
connId,
196+
};
197+
websocket.send(
198+
JSON.stringify({
199+
type: "conn-context",
200+
connId,
201+
state: ctx.conn.state,
202+
}),
203+
);
204+
},
205+
actions: {},
206+
});

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ import {
7575
rawWebSocketActor,
7676
rawWebSocketAsyncOpenActor,
7777
rawWebSocketBinaryActor,
78+
rawWebSocketConnContextActor,
7879
} from "./raw-websocket";
7980
import { rejectConnectionActor } from "./reject-connection";
8081
import { requestAccessActor } from "./request-access";
@@ -268,6 +269,7 @@ export const registry = setup({
268269
rawWebSocketActor,
269270
rawWebSocketAsyncOpenActor,
270271
rawWebSocketBinaryActor,
272+
rawWebSocketConnContextActor,
271273
// From reject-connection.ts
272274
rejectConnectionActor,
273275
// From request-access.ts

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4134,6 +4134,7 @@ export function buildNativeFactory(
41344134
error: unknown,
41354135
payload: {
41364136
ctx: NativeActorContext;
4137+
conn: NativeConnHandle;
41374138
ws: NativeWebSocket;
41384139
request?: {
41394140
method: string;
@@ -4143,14 +4144,19 @@ export function buildNativeFactory(
41434144
};
41444145
},
41454146
) => {
4146-
const { ctx, ws, request } = unwrapTsfnPayload(
4147-
error,
4148-
payload,
4149-
);
4147+
const { ctx, conn, ws, request } =
4148+
unwrapTsfnPayload(
4149+
error,
4150+
payload,
4151+
);
41504152
const jsRequest = request
41514153
? buildRequest(request)
41524154
: undefined;
4153-
const actorCtx = makeActorCtx(ctx, jsRequest);
4155+
const actorCtx = makeConnCtx(
4156+
ctx,
4157+
conn,
4158+
jsRequest,
4159+
);
41544160
try {
41554161
await config.onWebSocket(
41564162
actorCtx,

rivetkit-typescript/packages/rivetkit/tests/driver/raw-websocket.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,25 @@ describeDriverMatrix("Raw Websocket", (driverTestConfig) => {
452452
ws.close();
453453
});
454454

455+
test("should expose connection context in onWebSocket", async (c) => {
456+
const { client } = await setupDriverTest(c, driverTestConfig);
457+
const actor = client.rawWebSocketConnContextActor.getOrCreate([
458+
"conn-context",
459+
]);
460+
461+
const ws = await actor.webSocket();
462+
const message = await waitForJsonMessage(ws, 5_000);
463+
464+
expect(message?.type).toBe("conn-context");
465+
expect(typeof message?.connId).toBe("string");
466+
expect(message?.state).toEqual({
467+
opened: true,
468+
connId: message?.connId,
469+
});
470+
471+
ws.close();
472+
});
473+
455474
test("should properly handle onWebSocket open and close events", async (c) => {
456475
const { client } = await setupDriverTest(c, driverTestConfig);
457476
const actor = client.rawWebSocketActor.getOrCreate([

0 commit comments

Comments
 (0)