Skip to content

Commit 059a77d

Browse files
committed
feat(rivetkit): support gateway bypass client overrides
1 parent 29f3416 commit 059a77d

9 files changed

Lines changed: 343 additions & 40 deletions

File tree

rivetkit-typescript/packages/rivetkit/src/client/actor-common.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,27 @@ export interface ActorGatewayOptions {
3535
bypassConnectable?: boolean;
3636
}
3737

38+
export type ResolvedActorGatewayOptions = Required<ActorGatewayOptions>;
39+
40+
export function resolveActorGatewayOptions(
41+
defaults: ActorGatewayOptions = {},
42+
overrides?: ActorGatewayOptions,
43+
): ResolvedActorGatewayOptions {
44+
return {
45+
bypassConnectable:
46+
overrides?.bypassConnectable ?? defaults.bypassConnectable ?? false,
47+
};
48+
}
49+
50+
export interface ActorActionOptions {
51+
gateway?: ActorGatewayOptions;
52+
signal?: AbortSignal;
53+
}
54+
55+
export interface ActorConnectOptions {
56+
gateway?: ActorGatewayOptions;
57+
}
58+
3859
export interface ActorFetchInit extends RequestInit {
3960
gateway?: ActorGatewayOptions;
4061
}

rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ import type {
3434
ActorDefinitionActions,
3535
ActorDefinitionEventSubscriptions,
3636
ActorDefinitionQueueSend,
37+
ActorGatewayOptions,
38+
ResolvedActorGatewayOptions,
3739
} from "./actor-common";
40+
import { resolveActorGatewayOptions } from "./actor-common";
3841
import {
3942
type ActorResolutionState,
4043
checkForSchedulingError,
@@ -53,6 +56,7 @@ import {
5356
type QueueSendResult,
5457
type QueueSendWaitOptions,
5558
} from "./queue";
59+
import { resolveGatewayTarget } from "./resolve-gateway-target";
5660
import {
5761
type WebSocketMessage as ConnMessage,
5862
messageLength,
@@ -186,6 +190,7 @@ export class ActorConnRaw {
186190
#getParams?: () => Promise<unknown>;
187191
#encoding: Encoding;
188192
#actorResolutionState: ActorResolutionState;
193+
#gatewayOptions: ResolvedActorGatewayOptions;
189194

190195
// TODO: ws message queue
191196

@@ -203,13 +208,15 @@ export class ActorConnRaw {
203208
getParams: (() => Promise<unknown>) | undefined,
204209
encoding: Encoding,
205210
actorResolutionState: ActorResolutionState,
211+
gatewayOptions: ActorGatewayOptions = {},
206212
) {
207213
this.#client = client;
208214
this.#driver = driver;
209215
this.#params = params;
210216
this.#getParams = getParams;
211217
this.#encoding = encoding;
212218
this.#actorResolutionState = actorResolutionState;
219+
this.#gatewayOptions = resolveActorGatewayOptions(gatewayOptions);
213220
this.#readyPromise = promiseWithResolvers((reason) =>
214221
logger().warn({
215222
msg: "unhandled ready promise rejection",
@@ -225,6 +232,7 @@ export class ActorConnRaw {
225232
return await this.#driver.sendRequest(
226233
getGatewayTarget(this.#actorResolutionState),
227234
request,
235+
this.#gatewayOptions,
228236
);
229237
},
230238
});
@@ -570,12 +578,15 @@ export class ActorConnRaw {
570578

571579
async #connectWebSocket() {
572580
const params = await this.#resolveConnectionParams();
573-
const target = getGatewayTarget(this.#actorResolutionState);
581+
const target = this.#gatewayOptions.bypassConnectable
582+
? await this.#resolveGatewayTargetForBypass()
583+
: getGatewayTarget(this.#actorResolutionState);
574584
const ws = await this.#driver.openWebSocket(
575585
PATH_CONNECT,
576586
target,
577587
this.#encoding,
578588
params,
589+
this.#gatewayOptions,
579590
);
580591
invariant(ws, "websocket should have been created");
581592
logger().debug({
@@ -623,6 +634,25 @@ export class ActorConnRaw {
623634
});
624635
}
625636

637+
async #resolveGatewayTargetForBypass() {
638+
if ("getForId" in this.#actorResolutionState) {
639+
return {
640+
directId: this.#actorResolutionState.getForId.actorId,
641+
} as const;
642+
}
643+
644+
if (this.#actorId) {
645+
return { directId: this.#actorId } as const;
646+
}
647+
648+
return {
649+
directId: await resolveGatewayTarget(
650+
this.#driver,
651+
this.#actorResolutionState,
652+
),
653+
} as const;
654+
}
655+
626656
/** Called by the onopen event from drivers. */
627657
#handleOnOpen() {
628658
// Connection was disposed before Init message arrived - close the websocket to avoid leak

rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,15 @@ import type { EngineControlClient } from "@/engine-client/driver";
2525
import { decodeCborCompat, deserializeWithEncoding, encodeCborCompat } from "@/serde";
2626
import { bufferToArrayBuffer } from "@/utils";
2727
import type {
28+
ActorActionOptions,
29+
ActorConnectOptions,
2830
ActorDefinitionActions,
2931
ActorFetchInit,
3032
ActorDefinitionQueueSend,
33+
ActorGatewayOptions,
3134
ActorWebSocketOptions,
3235
} from "./actor-common";
36+
import { resolveActorGatewayOptions } from "./actor-common";
3337
import { type ActorConn, ActorConnRaw } from "./actor-conn";
3438
import {
3539
type ActorResolutionState,
@@ -65,6 +69,7 @@ export class ActorHandleRaw {
6569
#driver: EngineControlClient;
6670
#encoding: Encoding;
6771
#actorResolutionState: ActorResolutionState;
72+
#gatewayOptions: ActorGatewayOptions;
6873
#params: unknown;
6974
#getParams?: () => Promise<unknown>;
7075
#resolvedActorId?: string;
@@ -85,11 +90,13 @@ export class ActorHandleRaw {
8590
getParams: (() => Promise<unknown>) | undefined,
8691
encoding: Encoding,
8792
actorResolutionState: ActorResolutionState,
93+
gatewayOptions: ActorGatewayOptions = {},
8894
) {
8995
this.#client = client;
9096
this.#driver = driver;
9197
this.#encoding = encoding;
9298
this.#actorResolutionState = actorResolutionState;
99+
this.#gatewayOptions = gatewayOptions;
93100
this.#params = params;
94101
this.#getParams = getParams;
95102
}
@@ -139,7 +146,13 @@ export class ActorHandleRaw {
139146
encoding: this.#encoding,
140147
params: this.#params,
141148
customFetch: async (request: Request) => {
142-
return await this.#driver.sendRequest(target, request);
149+
return await this.#driver.sendRequest(
150+
target,
151+
request,
152+
resolveActorGatewayOptions(
153+
this.#gatewayOptions,
154+
),
155+
);
143156
},
144157
}).send(name, body, options as any);
145158
} catch (err) {
@@ -224,8 +237,7 @@ export class ActorHandleRaw {
224237
>(opts: {
225238
name: string;
226239
args: Args;
227-
signal?: AbortSignal;
228-
}): Promise<Response> {
240+
} & ActorActionOptions): Promise<Response> {
229241
if (
230242
typeof opts === "string" ||
231243
typeof opts !== "object" ||
@@ -247,10 +259,13 @@ export class ActorHandleRaw {
247259
async #sendActionNow(opts: {
248260
name: string;
249261
args: unknown[];
250-
signal?: AbortSignal;
251-
}): Promise<unknown> {
262+
} & ActorActionOptions): Promise<unknown> {
252263
const maxAttempts = this.#getDynamicQueryMaxAttempts();
253264
let useQueryTarget = false;
265+
const gatewayOptions = resolveActorGatewayOptions(
266+
this.#gatewayOptions,
267+
opts.gateway,
268+
);
254269

255270
for (let attempt = 0; attempt < maxAttempts; attempt++) {
256271
let actorId: string | undefined;
@@ -294,10 +309,12 @@ export class ActorHandleRaw {
294309
},
295310
body: opts.args,
296311
encoding: this.#encoding,
297-
customFetch: this.#driver.sendRequest.bind(
298-
this.#driver,
299-
target,
300-
),
312+
customFetch: async (request) =>
313+
await this.#driver.sendRequest(
314+
target,
315+
request,
316+
gatewayOptions,
317+
),
301318
signal: opts?.signal,
302319
requestVersion: CLIENT_PROTOCOL_CURRENT_VERSION,
303320
requestVersionedDataHandler: HTTP_ACTION_REQUEST_VERSIONED,
@@ -550,7 +567,10 @@ export class ActorHandleRaw {
550567
* @template AD The actor class that this connection is for.
551568
* @returns {ActorConn<AD>} A connection to the actor.
552569
*/
553-
connect(params?: unknown): ActorConn<AnyActorDefinition> {
570+
connect(
571+
params?: unknown,
572+
options: ActorConnectOptions = {},
573+
): ActorConn<AnyActorDefinition> {
554574
logger().debug({
555575
msg: "establishing connection from handle",
556576
query: this.#actorResolutionState,
@@ -566,6 +586,7 @@ export class ActorHandleRaw {
566586
getParams,
567587
this.#encoding,
568588
this.#actorResolutionState,
589+
resolveActorGatewayOptions(this.#gatewayOptions, options.gateway),
569590
);
570591

571592
return this.#client[CREATE_ACTOR_CONN_PROXY](
@@ -588,6 +609,10 @@ export class ActorHandleRaw {
588609
const maxAttempts = this.#getDynamicQueryMaxAttempts();
589610
let useQueryTarget = false;
590611
const { gateway, ...requestInit } = init ?? {};
612+
const gatewayOptions = resolveActorGatewayOptions(
613+
this.#gatewayOptions,
614+
gateway,
615+
);
591616

592617
for (let attempt = 0; attempt < maxAttempts; attempt++) {
593618
let actorId: string | undefined;
@@ -600,7 +625,7 @@ export class ActorHandleRaw {
600625
this.#params,
601626
input,
602627
requestInit,
603-
gateway,
628+
gatewayOptions,
604629
);
605630
const retry = await this.#shouldRetryRawFetchResponse(
606631
response,
@@ -793,7 +818,11 @@ export class ActorHandleRaw {
793818
options: ActorWebSocketOptions = {},
794819
) {
795820
const params = await this.#resolveConnectionParams();
796-
const target = options.gateway?.bypassConnectable
821+
const gatewayOptions = resolveActorGatewayOptions(
822+
this.#gatewayOptions,
823+
options.gateway,
824+
);
825+
const target = gatewayOptions.bypassConnectable
797826
? await this.#resolveActionTarget(false)
798827
: getGatewayTarget(this.#actorResolutionState);
799828
return await rawWebSocket(
@@ -802,7 +831,7 @@ export class ActorHandleRaw {
802831
params,
803832
path,
804833
protocols,
805-
options.gateway,
834+
gatewayOptions,
806835
);
807836
}
808837

@@ -828,6 +857,7 @@ export class ActorHandleRaw {
828857
async getGatewayUrl(): Promise<string> {
829858
return await this.#driver.buildGatewayUrl(
830859
getGatewayTarget(this.#actorResolutionState),
860+
this.#gatewayOptions,
831861
);
832862
}
833863

@@ -870,7 +900,7 @@ export type ActorHandle<AD extends AnyActorDefinition> = Omit<
870900
"connect" | "send"
871901
> & {
872902
// Add typed version of ActorConn (instead of using AnyActorDefinition)
873-
connect(params?: unknown): ActorConn<AD>;
903+
connect(params?: unknown, options?: ActorConnectOptions): ActorConn<AD>;
874904
// Resolve method returns the actor ID
875905
resolve(): Promise<string>;
876906
} & ActorDefinitionQueueSend<AD> &

rivetkit-typescript/packages/rivetkit/src/client/client.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import type { AnyActorDefinition } from "@/actor/definition";
2+
import type { ActorQuery } from "@/client/query";
23
import type { Encoding } from "@/common/encoding";
34
import type { EngineControlClient } from "@/engine-client/driver";
4-
import type { ActorQuery } from "@/client/query";
55
import type { Registry } from "@/registry";
6-
import type { ActorActionFunction } from "./actor-common";
6+
import type { ActorActionFunction, ActorGatewayOptions } from "./actor-common";
77
import {
88
type ActorConn,
99
type ActorConnRaw,
@@ -178,17 +178,20 @@ export class ClientRaw {
178178

179179
#driver: EngineControlClient;
180180
#encodingKind: Encoding;
181+
#gatewayOptions: ActorGatewayOptions;
181182

182183
/**
183184
* Creates an instance of Client.
184185
*/
185186
public constructor(
186187
driver: EngineControlClient,
187188
encoding: Encoding | undefined,
189+
gatewayOptions: ActorGatewayOptions = {},
188190
) {
189191
this.#driver = driver;
190192

191193
this.#encodingKind = encoding ?? "bare";
194+
this.#gatewayOptions = gatewayOptions;
192195
}
193196

194197
/**
@@ -382,6 +385,7 @@ export class ClientRaw {
382385
getParams,
383386
this.#encodingKind,
384387
actorQuery,
388+
this.#gatewayOptions,
385389
);
386390
}
387391

@@ -438,9 +442,9 @@ export type AnyClient = Client<Registry<any>>;
438442

439443
export function createClientWithDriver<A extends Registry<any>>(
440444
driver: EngineControlClient,
441-
config: { encoding?: Encoding } = {},
445+
config: { encoding?: Encoding; gateway?: ActorGatewayOptions } = {},
442446
): Client<A> {
443-
const client = new ClientRaw(driver, config.encoding);
447+
const client = new ClientRaw(driver, config.encoding, config.gateway);
444448

445449
// Create proxy for accessing actors by name
446450
return new Proxy(client, {

rivetkit-typescript/packages/rivetkit/src/client/config.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,13 @@ export const ClientConfigSchemaBase = z.object({
7070
.optional()
7171
.default(() => ({})),
7272

73+
gateway: z
74+
.object({
75+
bypassConnectable: z.boolean().optional().default(false),
76+
})
77+
.optional()
78+
.default(() => ({ bypassConnectable: false })),
79+
7380
// See RunConfig.getUpgradeWebSocket
7481
//
7582
// This is required in the client config in order to support
@@ -147,6 +154,7 @@ export function convertRegistryConfigToClientConfig(
147154
namespace: config.namespace,
148155
poolName: config.envoy.poolName,
149156
headers: config.headers,
157+
gateway: { bypassConnectable: false },
150158
encoding: "bare",
151159
getUpgradeWebSocket: undefined,
152160
// We don't need health checks for internal clients

0 commit comments

Comments
 (0)