Skip to content

Commit e4d176d

Browse files
committed
feat(rivetkit): support gateway bypass client requests
1 parent b0ace70 commit e4d176d

9 files changed

Lines changed: 171 additions & 20 deletions

File tree

rivetkit-typescript/packages/rivetkit/src/client/actor-common.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,18 @@ export type ActorActionFunction<
3131
...args: Args extends [unknown, ...infer Rest] ? Rest : Args
3232
) => Promise<Response>;
3333

34+
export interface ActorGatewayOptions {
35+
bypassConnectable?: boolean;
36+
}
37+
38+
export interface ActorFetchInit extends RequestInit {
39+
gateway?: ActorGatewayOptions;
40+
}
41+
42+
export interface ActorWebSocketOptions {
43+
gateway?: ActorGatewayOptions;
44+
}
45+
3446
/**
3547
* Maps action methods from actor definition to typed function signatures.
3648
*/

rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ import { decodeCborCompat, deserializeWithEncoding, encodeCborCompat } from "@/s
2626
import { bufferToArrayBuffer } from "@/utils";
2727
import type {
2828
ActorDefinitionActions,
29+
ActorFetchInit,
2930
ActorDefinitionQueueSend,
31+
ActorWebSocketOptions,
3032
} from "./actor-common";
3133
import { type ActorConn, ActorConnRaw } from "./actor-conn";
3234
import {
@@ -575,16 +577,17 @@ export class ActorHandleRaw {
575577
* Fetches a resource from this actor via the /request endpoint. This is a
576578
* convenience wrapper around the raw HTTP API.
577579
*/
578-
fetch(input: string | URL | Request, init?: RequestInit) {
580+
fetch(input: string | URL | Request, init?: ActorFetchInit) {
579581
return this.#fetchWithResolvedActor(input, init);
580582
}
581583

582584
async #fetchWithResolvedActor(
583585
input: string | URL | Request,
584-
init?: RequestInit,
586+
init?: ActorFetchInit,
585587
) {
586588
const maxAttempts = this.#getDynamicQueryMaxAttempts();
587589
let useQueryTarget = false;
590+
const { gateway, ...requestInit } = init ?? {};
588591

589592
for (let attempt = 0; attempt < maxAttempts; attempt++) {
590593
let actorId: string | undefined;
@@ -596,7 +599,8 @@ export class ActorHandleRaw {
596599
target,
597600
this.#params,
598601
input,
599-
init,
602+
requestInit,
603+
gateway,
600604
);
601605
const retry = await this.#shouldRetryRawFetchResponse(
602606
response,
@@ -783,14 +787,22 @@ export class ActorHandleRaw {
783787
/**
784788
* Opens a raw WebSocket connection to this actor.
785789
*/
786-
async webSocket(path?: string, protocols?: string | string[]) {
790+
async webSocket(
791+
path?: string,
792+
protocols?: string | string[],
793+
options: ActorWebSocketOptions = {},
794+
) {
787795
const params = await this.#resolveConnectionParams();
796+
const target = options.gateway?.bypassConnectable
797+
? await this.#resolveActionTarget(false)
798+
: getGatewayTarget(this.#actorResolutionState);
788799
return await rawWebSocket(
789800
this.#driver,
790-
getGatewayTarget(this.#actorResolutionState),
801+
target,
791802
params,
792803
path,
793804
protocols,
805+
options.gateway,
794806
);
795807
}
796808

rivetkit-typescript/packages/rivetkit/src/client/raw-utils.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { deconstructError } from "@/common/utils";
33
import {
44
type GatewayTarget,
55
type EngineControlClient,
6+
type GatewayRequestOptions,
67
} from "@/engine-client/driver";
78
import { HEADER_CONN_PARAMS } from "@/common/actor-router-consts";
89
import { ActorError } from "./errors";
@@ -17,6 +18,7 @@ export async function rawHttpFetch(
1718
params: unknown,
1819
input: string | URL | Request,
1920
init?: RequestInit,
21+
options: GatewayRequestOptions = {},
2022
): Promise<Response> {
2123
// Extract path and merge init options
2224
let path: string;
@@ -91,7 +93,7 @@ export async function rawHttpFetch(
9193
headers: proxyRequestHeaders,
9294
});
9395

94-
return driver.sendRequest(target, proxyRequest);
96+
return driver.sendRequest(target, proxyRequest, options);
9597
} catch (err) {
9698
// Standardize to ClientActorError instead of the native backend error
9799
const { group, code, message, metadata } = deconstructError(
@@ -114,6 +116,7 @@ export async function rawWebSocket(
114116
path?: string,
115117
// TODO: Supportp rotocols
116118
_protocols?: string | string[],
119+
options: GatewayRequestOptions = {},
117120
): Promise<any> {
118121
// TODO: Do we need encoding in rawWebSocket?
119122
const encoding = "bare";
@@ -145,7 +148,13 @@ export async function rawWebSocket(
145148
});
146149

147150
// Open WebSocket
148-
const ws = await driver.openWebSocket(fullPath, target, encoding, params);
151+
const ws = await driver.openWebSocket(
152+
fullPath,
153+
target,
154+
encoding,
155+
params,
156+
options,
157+
);
149158

150159
// Node & browser WebSocket types are incompatible
151160
return ws as any;

rivetkit-typescript/packages/rivetkit/src/common/actor-router-consts.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ export const HEADER_RIVET_TOKEN = "x-rivet-token";
2121
export const HEADER_RIVET_TARGET = "x-rivet-target";
2222
export const HEADER_RIVET_ACTOR = "x-rivet-actor";
2323
export const HEADER_RIVET_NAMESPACE = "x-rivet-namespace";
24+
export const HEADER_RIVET_BYPASS_CONNECTABLE =
25+
"x-rivet-bypass-connectable";
2426

2527
// MARK: WebSocket Protocol Prefixes
2628
/** Some servers (such as node-ws & Cloudflare) require explicitly match a certain WebSocket protocol. This gives us a static protocol to match against. */
@@ -30,6 +32,7 @@ export const WS_PROTOCOL_ACTOR = "rivet_actor.";
3032
export const WS_PROTOCOL_ENCODING = "rivet_encoding.";
3133
export const WS_PROTOCOL_CONN_PARAMS = "rivet_conn_params.";
3234
export const WS_PROTOCOL_TOKEN = "rivet_token.";
35+
export const WS_PROTOCOL_BYPASS_CONNECTABLE = "rivet_bypass_connectable";
3336
export const WS_PROTOCOL_TEST_ACK_HOOK = "rivet_test_ack_hook.";
3437

3538
// MARK: WebSocket Inline Test Protocol Prefixes
@@ -51,4 +54,5 @@ export const ALLOWED_PUBLIC_HEADERS = [
5154
HEADER_RIVET_ACTOR,
5255
HEADER_RIVET_NAMESPACE,
5356
HEADER_RIVET_TOKEN,
57+
HEADER_RIVET_BYPASS_CONNECTABLE,
5458
];

rivetkit-typescript/packages/rivetkit/src/engine-client/actor-http-client.ts

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
import type { ClientConfig } from "@/client/config";
2-
import { HEADER_RIVET_TOKEN } from "@/common/actor-router-consts";
2+
import {
3+
HEADER_RIVET_ACTOR,
4+
HEADER_RIVET_BYPASS_CONNECTABLE,
5+
HEADER_RIVET_TARGET,
6+
HEADER_RIVET_TOKEN,
7+
} from "@/common/actor-router-consts";
8+
import type { GatewayRequestOptions } from "./driver";
9+
10+
export interface HttpGatewayRequestOptions extends GatewayRequestOptions {
11+
directActorId?: string;
12+
}
313

414
export async function sendHttpRequestToGateway(
515
runConfig: ClientConfig,
616
gatewayUrl: string,
717
actorRequest: Request,
18+
options: HttpGatewayRequestOptions = {},
819
): Promise<Response> {
920
// Handle body properly based on method and presence
1021
let bodyToSend: ArrayBuffer | null = null;
11-
const guardHeaders = buildGuardHeaders(runConfig, actorRequest);
22+
const guardHeaders = buildGuardHeaders(runConfig, actorRequest, options);
1223

1324
if (actorRequest.method !== "GET" && actorRequest.method !== "HEAD") {
1425
if (actorRequest.bodyUsed) {
@@ -49,6 +60,7 @@ function mutableResponse(fetchRes: Response): Response {
4960
function buildGuardHeaders(
5061
runConfig: ClientConfig,
5162
actorRequest: Request,
63+
options: HttpGatewayRequestOptions,
5264
): Headers {
5365
const headers = new Headers();
5466
// Copy all headers from the original request
@@ -63,5 +75,12 @@ function buildGuardHeaders(
6375
if (runConfig.token) {
6476
headers.set(HEADER_RIVET_TOKEN, runConfig.token);
6577
}
78+
if (options.directActorId !== undefined) {
79+
headers.set(HEADER_RIVET_TARGET, "actor");
80+
headers.set(HEADER_RIVET_ACTOR, options.directActorId);
81+
}
82+
if (options.bypassConnectable) {
83+
headers.set(HEADER_RIVET_BYPASS_CONNECTABLE, "1");
84+
}
6685
return headers;
6786
}

rivetkit-typescript/packages/rivetkit/src/engine-client/actor-websocket-client.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
WS_PROTOCOL_STANDARD as WS_PROTOCOL_RIVETKIT,
99
WS_PROTOCOL_TARGET,
1010
WS_PROTOCOL_ACTOR,
11+
WS_PROTOCOL_BYPASS_CONNECTABLE,
1112
WS_PROTOCOL_TEST_ACK_HOOK,
1213
WS_PROTOCOL_TOKEN,
1314
} from "@/common/actor-router-consts";
@@ -17,6 +18,7 @@ import type { ActorGatewayQuery, CrashPolicy } from "@/client/query";
1718
import type { Encoding, UniversalWebSocket } from "@/mod";
1819
import { encodeCborCompat, uint8ArrayToBase64 } from "@/serde";
1920
import { combineUrlPath } from "@/utils";
21+
import type { GatewayRequestOptions } from "./driver";
2022
import { logger } from "./log";
2123

2224
class BufferedRemoteWebSocket implements UniversalWebSocket {
@@ -211,6 +213,7 @@ export function buildActorQueryGatewayUrl(
211213
maxInputSize = DEFAULT_MAX_QUERY_INPUT_SIZE,
212214
crashPolicy: CrashPolicy | undefined = undefined,
213215
runnerName?: string,
216+
options: GatewayRequestOptions = {},
214217
): string {
215218
if (namespace.length === 0) {
216219
throw new Error("actor query namespace must not be empty");
@@ -266,6 +269,9 @@ export function buildActorQueryGatewayUrl(
266269
if (token !== undefined) {
267270
params.append("rvt-token", token);
268271
}
272+
if (options.bypassConnectable) {
273+
params.append("rvt-bypass_connectable", "true");
274+
}
269275

270276
const queryString = params.toString();
271277
let separator: string;
@@ -318,6 +324,7 @@ export async function openWebSocketToGateway(
318324
gatewayUrl: string,
319325
encoding: Encoding,
320326
params: unknown,
327+
options: GatewayRequestOptions & { directActorId?: string } = {},
321328
): Promise<UniversalWebSocket> {
322329
const WebSocket = await importWebSocket();
323330

@@ -334,7 +341,19 @@ export async function openWebSocketToGateway(
334341
// Create WebSocket connection
335342
const ws = new WebSocket(
336343
gatewayUrl,
337-
buildWebSocketProtocols(runConfig, encoding, params, ackHookToken),
344+
buildWebSocketProtocols(
345+
runConfig,
346+
encoding,
347+
params,
348+
ackHookToken,
349+
options.directActorId
350+
? {
351+
target: "actor",
352+
actorId: options.directActorId,
353+
}
354+
: undefined,
355+
options,
356+
),
338357
);
339358

340359
// The WebSocket is returned before the connection is open. This follows
@@ -364,6 +383,7 @@ export function buildWebSocketProtocols(
364383
target: "actor";
365384
actorId: string;
366385
},
386+
options: GatewayRequestOptions = {},
367387
): string[] {
368388
const protocols: string[] = [];
369389
protocols.push(WS_PROTOCOL_RIVETKIT);
@@ -372,6 +392,9 @@ export function buildWebSocketProtocols(
372392
protocols.push(`${WS_PROTOCOL_TARGET}${target.target}`);
373393
protocols.push(`${WS_PROTOCOL_ACTOR}${target.actorId}`);
374394
}
395+
if (options.bypassConnectable) {
396+
protocols.push(WS_PROTOCOL_BYPASS_CONNECTABLE);
397+
}
375398
if (params) {
376399
protocols.push(
377400
`${WS_PROTOCOL_CONN_PARAMS}${encodeURIComponent(JSON.stringify(params))}`,

rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ import type { ActorQuery, CrashPolicy } from "@/client/query";
66

77
export type GatewayTarget = { directId: string } | ActorQuery;
88

9+
export interface GatewayRequestOptions {
10+
bypassConnectable?: boolean;
11+
}
12+
913
export interface EngineControlClient {
1014
getForId(input: GetForIdInput): Promise<ActorOutput | undefined>;
1115
getWithKey(input: GetWithKeyInput): Promise<ActorOutput | undefined>;
@@ -16,12 +20,14 @@ export interface EngineControlClient {
1620
sendRequest(
1721
target: GatewayTarget,
1822
actorRequest: Request,
23+
options?: GatewayRequestOptions,
1924
): Promise<Response>;
2025
openWebSocket(
2126
path: string,
2227
target: GatewayTarget,
2328
encoding: Encoding,
2429
params: unknown,
30+
options?: GatewayRequestOptions,
2531
): Promise<UniversalWebSocket>;
2632
proxyRequest(
2733
c: HonoContext,
@@ -35,7 +41,10 @@ export interface EngineControlClient {
3541
encoding: Encoding,
3642
params: unknown,
3743
): Promise<Response>;
38-
buildGatewayUrl(target: GatewayTarget): Promise<string>;
44+
buildGatewayUrl(
45+
target: GatewayTarget,
46+
options?: GatewayRequestOptions,
47+
): Promise<string>;
3948
displayInformation(): RuntimeDisplayInformation;
4049
extraStartupLog?: () => Record<string, unknown>;
4150
modifyRuntimeRouter?: (config: RegistryConfig, router: Hono) => void;

0 commit comments

Comments
 (0)