Skip to content

Commit d69dfaa

Browse files
committed
feat(rivetkit): add ConnectionMap readonly Map wrapper for actor connections
1 parent ccf09c6 commit d69dfaa

3 files changed

Lines changed: 95 additions & 23 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use parking_lot::{Mutex, RwLock};
1313
use rivet_error::ActorSpecifier;
1414
use rivet_envoy_client::handle::EnvoyHandle;
1515
use rivet_envoy_client::tunnel::HibernatingWebSocketMetadata;
16+
use rivet_error::ActorSpecifier;
1617
use scc::HashMap as SccHashMap;
1718
use tokio::runtime::Handle;
1819
use tokio::sync::{Mutex as AsyncMutex, Notify, OnceCell, broadcast, mpsc, oneshot};
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import type { ActorContextHandle, ConnHandle, CoreRuntime } from "./runtime";
2+
import type { NativeValidationConfig } from "./native-validation";
3+
import { callNativeSync, NativeConnAdapter } from "./native";
4+
5+
export class ConnectionMap implements ReadonlyMap<string, NativeConnAdapter> {
6+
#runtime: CoreRuntime;
7+
#ctx: ActorContextHandle;
8+
#schemas: NativeValidationConfig;
9+
10+
constructor(
11+
runtime: CoreRuntime,
12+
ctx: ActorContextHandle,
13+
schemas: NativeValidationConfig,
14+
) {
15+
this.#runtime = runtime;
16+
this.#ctx = ctx;
17+
this.#schemas = schemas;
18+
}
19+
20+
#connToAdapter(conn: ConnHandle): NativeConnAdapter {
21+
return new NativeConnAdapter(
22+
this.#runtime,
23+
conn,
24+
this.#schemas,
25+
this.#ctx,
26+
(connId) =>
27+
callNativeSync(() =>
28+
this.#runtime.actorQueueHibernationRemoval(
29+
this.#ctx,
30+
connId,
31+
),
32+
),
33+
);
34+
}
35+
36+
get size(): number {
37+
return callNativeSync(() => this.#runtime.actorConns(this.#ctx)).length;
38+
}
39+
40+
get(key: string): NativeConnAdapter | undefined {
41+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
42+
const conn = conns.find(
43+
(c) => this.#runtime.connId(c) === key,
44+
);
45+
if (!conn) return undefined;
46+
return this.#connToAdapter(conn);
47+
}
48+
49+
has(key: string): boolean {
50+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
51+
return conns.some((c) => this.#runtime.connId(c) === key);
52+
}
53+
54+
keys(): MapIterator<string> {
55+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
56+
return conns.map((c) => this.#runtime.connId(c))[Symbol.iterator]() as MapIterator<string>;
57+
}
58+
59+
values(): MapIterator<NativeConnAdapter> {
60+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
61+
return conns.map((c) => this.#connToAdapter(c))[Symbol.iterator]() as MapIterator<NativeConnAdapter>;
62+
}
63+
64+
entries(): MapIterator<[string, NativeConnAdapter]> {
65+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
66+
return conns.map(
67+
(c) => [this.#runtime.connId(c), this.#connToAdapter(c)] as [string, NativeConnAdapter],
68+
)[Symbol.iterator]() as MapIterator<[string, NativeConnAdapter]>;
69+
}
70+
71+
forEach(
72+
callback: (value: NativeConnAdapter, key: string, map: ReadonlyMap<string, NativeConnAdapter>) => void,
73+
thisArg?: unknown,
74+
): void {
75+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
76+
for (const conn of conns) {
77+
const id = this.#runtime.connId(conn);
78+
callback.call(thisArg, this.#connToAdapter(conn), id, this);
79+
}
80+
}
81+
82+
[Symbol.iterator](): MapIterator<[string, NativeConnAdapter]> {
83+
return this.entries();
84+
}
85+
86+
get [Symbol.toStringTag](): string {
87+
return "ConnectionMap";
88+
}
89+
}

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { VirtualWebSocket } from "@rivetkit/virtual-websocket";
2+
import { ConnectionMap } from "./connection-map";
23
import {
34
ACTOR_CONTEXT_INTERNAL_SYMBOL,
45
CONN_STATE_MANAGER_SYMBOL,
@@ -658,7 +659,7 @@ async function callNative<T>(invoke: () => Promise<T>): Promise<T> {
658659
}
659660
}
660661

661-
function callNativeSync<T>(invoke: () => T): T {
662+
export function callNativeSync<T>(invoke: () => T): T {
662663
try {
663664
return invoke();
664665
} catch (error) {
@@ -1153,7 +1154,7 @@ function toActorKey(
11531154
);
11541155
}
11551156

1156-
class NativeConnAdapter {
1157+
export class NativeConnAdapter {
11571158
#runtime: CoreRuntime;
11581159
#conn: ConnHandle;
11591160
#schemas: NativeValidationConfig;
@@ -2503,27 +2504,8 @@ export class ActorContextHandleAdapter {
25032504
return callNativeSync(() => this.#runtime.actorRegion(this.#ctx));
25042505
}
25052506

2506-
get conns(): Map<string, NativeConnAdapter> {
2507-
return new Map(
2508-
callNativeSync(() => this.#runtime.actorConns(this.#ctx)).map(
2509-
(conn) => [
2510-
this.#runtime.connId(conn),
2511-
new NativeConnAdapter(
2512-
this.#runtime,
2513-
conn,
2514-
this.#schemas,
2515-
this.#ctx,
2516-
(connId) =>
2517-
callNativeSync(() =>
2518-
this.#runtime.actorQueueHibernationRemoval(
2519-
this.#ctx,
2520-
connId,
2521-
),
2522-
),
2523-
),
2524-
],
2525-
),
2526-
);
2507+
get conns(): ConnectionMap {
2508+
return new ConnectionMap(this.#runtime, this.#ctx, this.#schemas);
25272509
}
25282510

25292511
get log() {

0 commit comments

Comments
 (0)