Skip to content

Commit 4fbdc66

Browse files
authored
feat(rivetkit): add ConnectionMap readonly Map wrapper for actor connections (#5047)
duplicate of https://app.graphite.com/github/pr/rivet-dev/rivet/5021
1 parent dd7eb21 commit 4fbdc66

1 file changed

Lines changed: 90 additions & 21 deletions

File tree

  • rivetkit-typescript/packages/rivetkit/src/registry

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 90 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2324,6 +2324,90 @@ class TrackedWebSocketHandleAdapter implements UniversalWebSocket {
23242324
}
23252325
}
23262326

2327+
class NativeConnectionMap implements ReadonlyMap<string, NativeConnAdapter> {
2328+
#runtime: CoreRuntime;
2329+
#ctx: ActorContextHandle;
2330+
#schemas: NativeValidationConfig;
2331+
2332+
constructor(
2333+
runtime: CoreRuntime,
2334+
ctx: ActorContextHandle,
2335+
schemas: NativeValidationConfig,
2336+
) {
2337+
this.#runtime = runtime;
2338+
this.#ctx = ctx;
2339+
this.#schemas = schemas;
2340+
}
2341+
2342+
#connToAdapter(conn: ConnHandle): NativeConnAdapter {
2343+
return new NativeConnAdapter(
2344+
this.#runtime,
2345+
conn,
2346+
this.#schemas,
2347+
this.#ctx,
2348+
(connId) =>
2349+
callNativeSync(() =>
2350+
this.#runtime.actorQueueHibernationRemoval(
2351+
this.#ctx,
2352+
connId,
2353+
),
2354+
),
2355+
);
2356+
}
2357+
2358+
get size(): number {
2359+
return callNativeSync(() => this.#runtime.actorConns(this.#ctx)).length;
2360+
}
2361+
2362+
get(key: string): NativeConnAdapter | undefined {
2363+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
2364+
const conn = conns.find(
2365+
(c) => this.#runtime.connId(c) === key,
2366+
);
2367+
if (!conn) return undefined;
2368+
return this.#connToAdapter(conn);
2369+
}
2370+
2371+
has(key: string): boolean {
2372+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
2373+
return conns.some((c) => this.#runtime.connId(c) === key);
2374+
}
2375+
2376+
keys(): MapIterator<string> {
2377+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
2378+
return conns.map((c) => this.#runtime.connId(c))[Symbol.iterator]() satisfies MapIterator<string>;
2379+
}
2380+
2381+
values(): MapIterator<NativeConnAdapter> {
2382+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
2383+
return conns.map((c) => this.#connToAdapter(c))[Symbol.iterator]() satisfies MapIterator<NativeConnAdapter>;
2384+
}
2385+
2386+
entries(): MapIterator<[string, NativeConnAdapter]> {
2387+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
2388+
return conns.map(
2389+
(c) => [this.#runtime.connId(c), this.#connToAdapter(c)] as [string, NativeConnAdapter],
2390+
)[Symbol.iterator]() satisfies MapIterator<[string, NativeConnAdapter]>;
2391+
}
2392+
2393+
forEach(
2394+
callback: (value: NativeConnAdapter, key: string, map: ReadonlyMap<string, NativeConnAdapter>) => void,
2395+
thisArg?: unknown,
2396+
): void {
2397+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
2398+
for (const conn of conns) {
2399+
const id = this.#runtime.connId(conn);
2400+
callback.call(thisArg, this.#connToAdapter(conn), id, this);
2401+
}
2402+
}
2403+
2404+
[Symbol.iterator](): MapIterator<[string, NativeConnAdapter]> {
2405+
return this.entries();
2406+
}
2407+
2408+
readonly [Symbol.toStringTag] = "NativeConnectionMap";
2409+
}
2410+
23272411
export class ActorContextHandleAdapter {
23282412
#runtime: CoreRuntime;
23292413
#ctx: ActorContextHandle;
@@ -2332,6 +2416,7 @@ export class ActorContextHandleAdapter {
23322416
#abortSignalCleanup?: () => void;
23332417
#client?: AnyClient;
23342418
#clientFactory?: () => AnyClient;
2419+
#connMap?: NativeConnectionMap;
23352420
#databaseProvider?: Exclude<AnyDatabaseProvider, undefined>;
23362421
#db?: unknown;
23372422
#dispatchCancelToken?: CancellationTokenHandle;
@@ -2497,27 +2582,11 @@ export class ActorContextHandleAdapter {
24972582
return callNativeSync(() => this.#runtime.actorRegion(this.#ctx));
24982583
}
24992584

2500-
get conns(): Map<string, NativeConnAdapter> {
2501-
return new Map(
2502-
callNativeSync(() => this.#runtime.actorConns(this.#ctx)).map(
2503-
(conn) => [
2504-
this.#runtime.connId(conn),
2505-
new NativeConnAdapter(
2506-
this.#runtime,
2507-
conn,
2508-
this.#schemas,
2509-
this.#ctx,
2510-
(connId) =>
2511-
callNativeSync(() =>
2512-
this.#runtime.actorQueueHibernationRemoval(
2513-
this.#ctx,
2514-
connId,
2515-
),
2516-
),
2517-
),
2518-
],
2519-
),
2520-
);
2585+
get conns(): ReadonlyMap<string, NativeConnAdapter> {
2586+
if (!this.#connMap) {
2587+
this.#connMap = new NativeConnectionMap(this.#runtime, this.#ctx, this.#schemas);
2588+
}
2589+
return this.#connMap;
25212590
}
25222591

25232592
get log() {

0 commit comments

Comments
 (0)