Skip to content

Commit 5014be8

Browse files
committed
feat(rivetkit): add ConnectionMap readonly Map wrapper for actor connections
1 parent ccf09c6 commit 5014be8

2 files changed

Lines changed: 87 additions & 24 deletions

File tree

  • rivetkit-rust/packages/rivetkit-core/src/actor
  • rivetkit-typescript/packages/rivetkit/src/registry

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ use crate::time::{Instant, SystemTime, UNIX_EPOCH};
1010
use anyhow::{Context as AnyhowContext, Result};
1111
use futures::future::BoxFuture;
1212
use parking_lot::{Mutex, RwLock};
13-
use rivet_error::ActorSpecifier;
1413
use rivet_envoy_client::handle::EnvoyHandle;
1514
use rivet_envoy_client::tunnel::HibernatingWebSocketMetadata;
15+
use rivet_error::ActorSpecifier;
1616
use scc::HashMap as SccHashMap;
1717
use tokio::runtime::Handle;
1818
use tokio::sync::{Mutex as AsyncMutex, Notify, OnceCell, broadcast, mpsc, oneshot};

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 86 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,7 @@ async function callNative<T>(invoke: () => Promise<T>): Promise<T> {
658658
}
659659
}
660660

661-
function callNativeSync<T>(invoke: () => T): T {
661+
export function callNativeSync<T>(invoke: () => T): T {
662662
try {
663663
return invoke();
664664
} catch (error) {
@@ -1153,7 +1153,7 @@ function toActorKey(
11531153
);
11541154
}
11551155

1156-
class NativeConnAdapter {
1156+
export class NativeConnAdapter {
11571157
#runtime: CoreRuntime;
11581158
#conn: ConnHandle;
11591159
#schemas: NativeValidationConfig;
@@ -2315,6 +2315,88 @@ class TrackedWebSocketHandleAdapter implements UniversalWebSocket {
23152315
}
23162316
}
23172317

2318+
class NativeConnectionMap implements ReadonlyMap<string, NativeConnAdapter> {
2319+
#runtime: CoreRuntime;
2320+
#ctx: ActorContextHandle;
2321+
#schemas: NativeValidationConfig;
2322+
2323+
constructor(
2324+
runtime: CoreRuntime,
2325+
ctx: ActorContextHandle,
2326+
schemas: NativeValidationConfig,
2327+
) {
2328+
this.#runtime = runtime;
2329+
this.#ctx = ctx;
2330+
this.#schemas = schemas;
2331+
}
2332+
2333+
#connToAdapter(conn: ConnHandle): NativeConnAdapter {
2334+
return new NativeConnAdapter(
2335+
this.#runtime,
2336+
conn,
2337+
this.#schemas,
2338+
this.#ctx,
2339+
(connId) =>
2340+
callNativeSync(() =>
2341+
this.#runtime.actorQueueHibernationRemoval(
2342+
this.#ctx,
2343+
connId,
2344+
),
2345+
),
2346+
);
2347+
}
2348+
2349+
get size(): number {
2350+
return callNativeSync(() => this.#runtime.actorConns(this.#ctx)).length;
2351+
}
2352+
2353+
get(key: string): NativeConnAdapter | undefined {
2354+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
2355+
const conn = conns.find(
2356+
(c) => this.#runtime.connId(c) === key,
2357+
);
2358+
if (!conn) return undefined;
2359+
return this.#connToAdapter(conn);
2360+
}
2361+
2362+
has(key: string): boolean {
2363+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
2364+
return conns.some((c) => this.#runtime.connId(c) === key);
2365+
}
2366+
2367+
keys(): MapIterator<string> {
2368+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
2369+
return conns.map((c) => this.#runtime.connId(c))[Symbol.iterator]() as MapIterator<string>;
2370+
}
2371+
2372+
values(): MapIterator<NativeConnAdapter> {
2373+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
2374+
return conns.map((c) => this.#connToAdapter(c))[Symbol.iterator]() as MapIterator<NativeConnAdapter>;
2375+
}
2376+
2377+
entries(): MapIterator<[string, NativeConnAdapter]> {
2378+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
2379+
return conns.map(
2380+
(c) => [this.#runtime.connId(c), this.#connToAdapter(c)] as [string, NativeConnAdapter],
2381+
)[Symbol.iterator]() as MapIterator<[string, NativeConnAdapter]>;
2382+
}
2383+
2384+
forEach(
2385+
callback: (value: NativeConnAdapter, key: string, map: ReadonlyMap<string, NativeConnAdapter>) => void,
2386+
thisArg?: unknown,
2387+
): void {
2388+
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
2389+
for (const conn of conns) {
2390+
const id = this.#runtime.connId(conn);
2391+
callback.call(thisArg, this.#connToAdapter(conn), id, this);
2392+
}
2393+
}
2394+
2395+
[Symbol.iterator](): MapIterator<[string, NativeConnAdapter]> {
2396+
return this.entries();
2397+
}
2398+
}
2399+
23182400
export class ActorContextHandleAdapter {
23192401
#runtime: CoreRuntime;
23202402
#ctx: ActorContextHandle;
@@ -2503,27 +2585,8 @@ export class ActorContextHandleAdapter {
25032585
return callNativeSync(() => this.#runtime.actorRegion(this.#ctx));
25042586
}
25052587

2506-
get conns(): Map<string, NativeConnAdapter> {
2507-
return new Map(
2508-
callNativeSync(() => this.#runtime.actorConns(this.#ctx)).map(
2509-
(conn) => [
2510-
this.#runtime.connId(conn),
2511-
new NativeConnAdapter(
2512-
this.#runtime,
2513-
conn,
2514-
this.#schemas,
2515-
this.#ctx,
2516-
(connId) =>
2517-
callNativeSync(() =>
2518-
this.#runtime.actorQueueHibernationRemoval(
2519-
this.#ctx,
2520-
connId,
2521-
),
2522-
),
2523-
),
2524-
],
2525-
),
2526-
);
2588+
get conns(): NativeConnectionMap {
2589+
return new NativeConnectionMap(this.#runtime, this.#ctx, this.#schemas);
25272590
}
25282591

25292592
get log() {

0 commit comments

Comments
 (0)