Skip to content

Commit afdf798

Browse files
committed
fix(inspector): stream patched state updates
1 parent ccd2fc4 commit afdf798

3 files changed

Lines changed: 193 additions & 4 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1895,7 +1895,10 @@ impl ActorTask {
18951895
save_request_revision,
18961896
"actor serializeState completed"
18971897
);
1898-
self.broadcast_inspector_overlay(&deltas);
1898+
// Skip the overlay broadcast on the save path. save_state_with_revision
1899+
// triggers record_state_updated after persist, which the inspector
1900+
// websocket signal subscriber forwards as StateUpdated. Broadcasting
1901+
// the overlay here too would deliver a duplicate message.
18991902
if let Err(error) = self
19001903
.ctx
19011904
.save_state_with_revision(deltas, save_request_revision)

rivetkit-rust/packages/rivetkit-core/src/registry/inspector_ws.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,10 @@ impl RegistryDispatcher {
193193
on_open_instance
194194
.inspector
195195
.subscribe(Arc::new(move |signal| {
196-
if signal == InspectorSignal::StateUpdated {
197-
return;
198-
}
196+
// Keep forwarding persisted StateUpdated signals here.
197+
// Overlay broadcasts still carry unsaved in-memory state,
198+
// but explicit inspector PATCH saves only emit the
199+
// InspectorSignal path after the write completes.
199200
let dispatcher = listener_dispatcher.clone();
200201
let instance = listener_instance.clone();
201202
let sender = listener_sender.clone();

rivetkit-typescript/packages/rivetkit/tests/driver/actor-inspector.test.ts

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1+
import * as cbor from "cbor-x";
12
import { describe, expect, test, vi } from "vitest";
3+
import {
4+
CURRENT_VERSION as INSPECTOR_PROTOCOL_VERSION,
5+
TO_CLIENT_VERSIONED,
6+
TO_SERVER_VERSIONED,
7+
} from "../../src/inspector/client.browser";
28
import { describeDriverMatrix } from "./shared-matrix";
39
import { setupDriverTest, waitFor } from "./shared-utils";
410

@@ -92,6 +98,117 @@ function buildInspectorUrl(
9298
return url.toString();
9399
}
94100

101+
function buildInspectorWebSocketUrl(gatewayUrl: string): string {
102+
const url = new URL(buildInspectorUrl(gatewayUrl, "/inspector/connect"));
103+
url.protocol = url.protocol === "https:" ? "wss:" : "ws:";
104+
return url.toString();
105+
}
106+
107+
async function toBinaryPayload(data: Blob | ArrayBuffer | Buffer | string) {
108+
if (typeof data === "string") {
109+
return new TextEncoder().encode(data);
110+
}
111+
if (data instanceof ArrayBuffer) {
112+
return new Uint8Array(data);
113+
}
114+
if (data instanceof Blob) {
115+
return new Uint8Array(await data.arrayBuffer());
116+
}
117+
return new Uint8Array(data);
118+
}
119+
120+
type InspectorMessage = ReturnType<
121+
typeof TO_CLIENT_VERSIONED.deserializeWithEmbeddedVersion
122+
>;
123+
124+
async function waitForInspectorMessage(
125+
ws: WebSocket,
126+
timeoutMs = 10_000,
127+
predicate?: (message: InspectorMessage) => boolean,
128+
) {
129+
return await new Promise<InspectorMessage>((resolve, reject) => {
130+
const timeout = setTimeout(() => {
131+
cleanup();
132+
reject(new Error("Inspector websocket message timed out"));
133+
}, timeoutMs);
134+
135+
const cleanup = () => {
136+
clearTimeout(timeout);
137+
ws.removeEventListener("message", onMessage);
138+
ws.removeEventListener("error", onError);
139+
};
140+
141+
const onError = () => {
142+
cleanup();
143+
reject(new Error("Inspector websocket errored"));
144+
};
145+
146+
const onMessage = async (event: MessageEvent) => {
147+
try {
148+
const payload = await toBinaryPayload(
149+
event.data as Blob | ArrayBuffer | Buffer | string,
150+
);
151+
const decoded =
152+
TO_CLIENT_VERSIONED.deserializeWithEmbeddedVersion(payload);
153+
if (predicate && !predicate(decoded)) {
154+
return;
155+
}
156+
cleanup();
157+
resolve(decoded);
158+
} catch (error) {
159+
cleanup();
160+
reject(error);
161+
}
162+
};
163+
164+
ws.addEventListener("message", onMessage);
165+
ws.addEventListener("error", onError);
166+
});
167+
}
168+
169+
async function waitForInspectorMessageWithTag<
170+
T extends InspectorMessage["body"]["tag"],
171+
>(
172+
ws: WebSocket,
173+
tag: T,
174+
timeoutMs = 10_000,
175+
): Promise<Extract<InspectorMessage, { body: { tag: T } }>> {
176+
const message = await waitForInspectorMessage(
177+
ws,
178+
timeoutMs,
179+
(candidate) => candidate.body.tag === tag,
180+
);
181+
return message as Extract<InspectorMessage, { body: { tag: T } }>;
182+
}
183+
184+
async function waitForInspectorOpen(ws: WebSocket, timeoutMs = 10_000) {
185+
await new Promise<void>((resolve, reject) => {
186+
const timeout = setTimeout(() => {
187+
cleanup();
188+
reject(new Error("Inspector websocket open timed out"));
189+
}, timeoutMs);
190+
191+
const cleanup = () => {
192+
clearTimeout(timeout);
193+
ws.removeEventListener("open", onOpen);
194+
ws.removeEventListener("error", onError);
195+
};
196+
197+
const onOpen = () => {
198+
cleanup();
199+
resolve();
200+
};
201+
202+
const onError = () => {
203+
cleanup();
204+
reject(new Error("Inspector websocket failed to open"));
205+
};
206+
207+
ws.addEventListener("open", onOpen);
208+
ws.addEventListener("error", onError);
209+
});
210+
}
211+
95212
function isActorStoppingDbError(error: unknown): boolean {
96213
return (
97214
error instanceof Error &&
@@ -154,6 +271,74 @@ describeDriverMatrix("Actor Inspector", (driverTestConfig) => {
154271
expect(count).toBe(42);
155272
});
156273

274+
test("PATCH /inspector/state pushes StateUpdated over inspector websocket", async (c) => {
275+
const { client } = await setupDriverTest(c, driverTestConfig);
276+
const handle = client.counter.getOrCreate([
277+
"inspector-state-websocket-patch",
278+
]);
279+
280+
await handle.increment(5);
281+
282+
const gatewayUrl = await handle.getGatewayUrl();
283+
const ws = new WebSocket(buildInspectorWebSocketUrl(gatewayUrl), [
284+
"rivet",
285+
"rivet_inspector_token.token",
286+
]);
287+
ws.binaryType = "arraybuffer";
288+
289+
try {
290+
await waitForInspectorOpen(ws);
291+
292+
await waitForInspectorMessageWithTag(ws, "Init");
293+
294+
ws.send(
295+
TO_SERVER_VERSIONED.serializeWithEmbeddedVersion(
296+
{
297+
body: {
298+
tag: "PatchStateRequest",
299+
val: {
300+
state: new Uint8Array(
301+
cbor.encode({ count: 42 }),
302+
).buffer,
303+
},
304+
},
305+
},
306+
INSPECTOR_PROTOCOL_VERSION,
307+
),
308+
);
309+
310+
const stateUpdated = await waitForInspectorMessageWithTag(
311+
ws,
312+
"StateUpdated",
313+
);
314+
expect(
315+
cbor.decode(new Uint8Array(stateUpdated.body.val.state)),
316+
).toEqual({ count: 42 });
317+
318+
ws.send(
319+
TO_SERVER_VERSIONED.serializeWithEmbeddedVersion(
320+
{
321+
body: {
322+
tag: "StateRequest",
323+
val: { id: 1n },
324+
},
325+
},
326+
INSPECTOR_PROTOCOL_VERSION,
327+
),
328+
);
329+
330+
const stateResponse = await waitForInspectorMessageWithTag(
331+
ws,
332+
"StateResponse",
333+
);
334+
expect(
335+
cbor.decode(new Uint8Array(stateResponse.body.val.state!)),
336+
).toEqual({ count: 42 });
337+
} finally {
338+
ws.close();
339+
}
340+
});
341+
157342
test("GET /inspector/connections returns connections list", async (c) => {
158343
const { client } = await setupDriverTest(c, driverTestConfig);
159344
const handle = client.counter.getOrCreate([

0 commit comments

Comments
 (0)