Skip to content

Commit 19a5af5

Browse files
shmuelhizmishmuel hizmiclaude
authored
Add stream replay for late-joining clients (#24)
* Add stream replay support for late-joining clients Streams can now buffer the last N chunks server-side and replay them to newly connected clients (ReplaySubject pattern). This fixes wmux terminals showing blank for clients that connect after process start. API: `stream(z.string(), { replay: 1000 })` Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Add stream replay tests: protocol-level integration + browser e2e - Protocol-level (bun:test): verifies stream_replay message delivery, ring buffer capacity, and no-replay passthrough behavior - Browser e2e (Playwright): verifies late-joining client sees terminal history via replay in the wmux dev-server demo - Adds echoform-render as devDependency for integration tests - Adds echoform test step to CI pipeline Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: shmuel hizmi <shmuel@tov.dev> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 954e3d7 commit 19a5af5

16 files changed

Lines changed: 443 additions & 25 deletions

File tree

.github/workflows/ci.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ jobs:
3838
working-directory: packages/wmux-client
3939
run: bun run build
4040

41+
- name: Run echoform tests
42+
working-directory: packages/echoform
43+
run: bun run test
44+
4145
- name: Install Playwright browsers
4246
run: bunx playwright install --with-deps chromium
4347

bun.lock

Lines changed: 17 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

demo/dev-server/test/e2e.test.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ const WMUX_URL = "/#token=test-token&ws=ws://localhost:4220/ws";
77
// Selector for xterm inside the visible (active) tab panel
88
const VISIBLE_XTERM = "[style*='visibility: visible'] .xterm";
99

10+
// xterm-rows contains terminal text as readable DOM nodes
11+
const XTERM_ROWS = "[style*='visibility: visible'] .xterm-rows";
12+
1013
test.describe("Terminal E2E", () => {
1114
test.beforeEach(async ({ page }) => {
1215
await page.goto(WMUX_URL);
@@ -32,3 +35,52 @@ test.describe("Terminal E2E", () => {
3235
await expect(page.getByText("interactive")).toBeVisible({ timeout: 5000 });
3336
});
3437
});
38+
39+
test.describe("Stream Replay E2E", () => {
40+
test("late-joining client receives terminal history", async ({ browser }) => {
41+
// Page 1: connect and ensure counter tab is active
42+
const ctx1 = await browser.newContext();
43+
const page1 = await ctx1.newPage();
44+
await page1.goto(WMUX_URL);
45+
await page1.waitForSelector(VISIBLE_XTERM, { state: "visible", timeout: 15000 });
46+
47+
// Ensure we're on the counter tab (first tab in background category)
48+
await page1.getByText("counter", { exact: true }).click();
49+
await page1.waitForTimeout(500);
50+
51+
// Wait for ticks to accumulate in the replay buffer
52+
await page1.waitForFunction(
53+
(sel: string) => {
54+
const tree = document.querySelector(sel);
55+
return tree?.textContent?.includes("tick") ?? false;
56+
},
57+
XTERM_ROWS,
58+
{ timeout: 10000 },
59+
);
60+
61+
// Page 2: late joiner opens a fresh context (new WebSocket connection)
62+
const ctx2 = await browser.newContext();
63+
const page2 = await ctx2.newPage();
64+
await page2.goto(WMUX_URL);
65+
await page2.waitForSelector(VISIBLE_XTERM, { state: "visible", timeout: 15000 });
66+
67+
// Ensure counter tab is selected on page 2 as well
68+
await page2.getByText("counter", { exact: true }).click();
69+
await page2.waitForTimeout(500);
70+
71+
// The late joiner should see replayed terminal content with "tick"
72+
const hasReplay = await page2.waitForFunction(
73+
(sel: string) => {
74+
const tree = document.querySelector(sel);
75+
return tree?.textContent?.includes("tick") ?? false;
76+
},
77+
XTERM_ROWS,
78+
{ timeout: 10000 },
79+
);
80+
81+
expect(hasReplay).toBeTruthy();
82+
83+
await ctx1.close();
84+
await ctx2.close();
85+
});
86+
});

packages/echoform/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
],
3131
"scripts": {
3232
"typecheck": "tsgo -p .",
33-
"build": "tsgo -p ."
33+
"build": "tsgo -p .",
34+
"test": "bun test tests/"
3435
},
3536
"peerDependencies": {
3637
"react": ">=19.0.0"
@@ -41,6 +42,7 @@
4142
"typed-binary": "^4.3.3"
4243
},
4344
"devDependencies": {
45+
"@playfast/echoform-render": "workspace:*",
4446
"@types/react": "19.0.2",
4547
"react": "19.0.0"
4648
},

packages/echoform/src/client/Client.tsx

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ function Client<TEvents extends Record<string | number, unknown> = Record<string
6565
const [runningViews, setRunningViews] = useState<ReadonlyArray<ExistingSharedViewData>>([]);
6666
const transportRef = useRef(decompileTransport(rawTransport));
6767
const streamListenersRef = useRef<Map<StreamUid, Set<(chunk: SerializableValue) => void>>>(new Map());
68+
const pendingReplayRef = useRef<Map<StreamUid, ReadonlyArray<SerializableValue>>>(new Map());
6869

6970
const createEvent = useCallback((eventUid: EventUid, ...args: ReadonlyArray<SerializableValue>): Promise<SerializableValue> => {
7071
return new Promise((resolve, reject) => {
@@ -106,6 +107,16 @@ function Client<TEvents extends Record<string | number, unknown> = Record<string
106107
newSet.add(listener);
107108
streamListenersRef.current = new Map([...streamListenersRef.current, [streamUid, newSet]]);
108109

110+
const pending = pendingReplayRef.current.get(streamUid);
111+
if (pending) {
112+
const updated = new Map(pendingReplayRef.current);
113+
updated.delete(streamUid);
114+
pendingReplayRef.current = updated;
115+
for (const chunk of pending) {
116+
listener(chunk);
117+
}
118+
}
119+
109120
return () => {
110121
const current = streamListenersRef.current.get(streamUid);
111122
if (!current) return;
@@ -151,11 +162,28 @@ function Client<TEvents extends Record<string | number, unknown> = Record<string
151162
streamListenersRef.current = newMap;
152163
};
153164

165+
const streamReplayHandler = ({ streamUid, chunks }: AppEvents['stream_replay']): void => {
166+
const listeners = streamListenersRef.current.get(streamUid);
167+
if (listeners && listeners.size > 0) {
168+
for (const chunk of chunks) {
169+
for (const listener of listeners) {
170+
listener(chunk);
171+
}
172+
}
173+
} else {
174+
pendingReplayRef.current = new Map([
175+
...pendingReplayRef.current,
176+
[streamUid, chunks],
177+
]);
178+
}
179+
};
180+
154181
const unsubscribeViewsTree = transport.on("update_views_tree", updateViewsTreeHandler);
155182
const unsubscribeUpdateView = transport.on("update_view", updateViewHandler);
156183
const unsubscribeDeleteView = transport.on("delete_view", deleteViewHandler);
157184
const unsubscribeStreamChunk = transport.on("stream_chunk", streamChunkHandler);
158185
const unsubscribeStreamEnd = transport.on("stream_end", streamEndHandler);
186+
const unsubscribeStreamReplay = transport.on("stream_replay", streamReplayHandler);
159187

160188
if (requestViewTreeOnMount) {
161189
transport.emit("request_views_tree");
@@ -167,7 +195,9 @@ function Client<TEvents extends Record<string | number, unknown> = Record<string
167195
unsubscribeDeleteView?.();
168196
unsubscribeStreamChunk?.();
169197
unsubscribeStreamEnd?.();
198+
unsubscribeStreamReplay?.();
170199
streamListenersRef.current = new Map();
200+
pendingReplayRef.current = new Map();
171201
transport.destroy();
172202
};
173203
}, [requestViewTreeOnMount]);

packages/echoform/src/server/App.tsx

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import type {
1313
import type { EventUid, RequestUid, ViewUid, StreamUid, PropName } from "../shared/branded.types";
1414
import { createEventUid, createPropName } from "../shared/branded.types";
1515
import type { StreamEmitterHandle } from "../shared/view-inference";
16+
import type { StreamBufferGetter } from "./contexts";
1617
import { getViewDef } from "../shared/view-builder";
1718
import { ViewFactoryContext } from "../shared/view-factory";
1819
import type { ViewFactory } from "../shared/view-factory";
@@ -263,10 +264,17 @@ function handleViewsTreeRequest(
263264
client: DecompileTransport,
264265
existingSharedViewsRef: React.RefObject<ReadonlyArray<ExistingSharedViewData>>,
265266
clientEventAuthRef: React.RefObject<Map<DecompileTransport, Set<EventUid>>>,
267+
streamBufferRegistryRef: React.RefObject<Map<StreamUid, StreamBufferGetter>>,
266268
): void {
267269
client.emit("update_views_tree", { views: snapshotViews(existingSharedViewsRef.current) });
268270
const allEventUids = existingSharedViewsRef.current.flatMap((v) => collectEventUids(v.props));
269271
clientEventAuthRef.current = new Map([...clientEventAuthRef.current, [client, new Set(allEventUids)]]);
272+
273+
for (const [streamUid, getBuffer] of streamBufferRegistryRef.current) {
274+
const chunks = getBuffer();
275+
if (chunks.length === 0) continue;
276+
client.emit("stream_replay", { streamUid, chunks });
277+
}
270278
}
271279

272280
function createNewSharedView(
@@ -328,6 +336,7 @@ const App = forwardRef<AppHandle, AppProps>(function App({ children, transport,
328336
const clientCleanupMapRef = useRef<Map<AnyTransport, () => void>>(new Map());
329337
const clientEventAuthRef = useRef<Map<DecompileTransport, Set<EventUid>>>(new Map());
330338
const eventChainRef = useRef(Promise.resolve());
339+
const streamBufferRegistryRef = useRef<Map<StreamUid, StreamBufferGetter>>(new Map());
331340
const skipValidationRef = useRef(skipCallbackValidation ?? false);
332341
skipValidationRef.current = skipCallbackValidation ?? false;
333342

@@ -352,11 +361,24 @@ const App = forwardRef<AppHandle, AppProps>(function App({ children, transport,
352361

353362
const broadcastStreamEnd = useCallback((streamUid: StreamUid): void => {
354363
broadcast("stream_end", { streamUid });
364+
const updated = new Map(streamBufferRegistryRef.current);
365+
updated.delete(streamUid);
366+
streamBufferRegistryRef.current = updated;
355367
}, [broadcast]);
356368

369+
const registerStreamBuffer = useCallback((streamUid: StreamUid, getBuffer: StreamBufferGetter): void => {
370+
streamBufferRegistryRef.current = new Map([...streamBufferRegistryRef.current, [streamUid, getBuffer]]);
371+
}, []);
372+
373+
const unregisterStreamBuffer = useCallback((streamUid: StreamUid): void => {
374+
const updated = new Map(streamBufferRegistryRef.current);
375+
updated.delete(streamUid);
376+
streamBufferRegistryRef.current = updated;
377+
}, []);
378+
357379
const registerSocketListener = useCallback((client: DecompileTransport) => {
358380
const cleanReqTree = client.on("request_views_tree", () => {
359-
handleViewsTreeRequest(client, existingSharedViewsRef, clientEventAuthRef);
381+
handleViewsTreeRequest(client, existingSharedViewsRef, clientEventAuthRef, streamBufferRegistryRef);
360382
});
361383

362384
const cleanReqEvent = client.on("request_event", (eventData: AppEvents['request_event']) => {
@@ -433,8 +455,12 @@ const App = forwardRef<AppHandle, AppProps>(function App({ children, transport,
433455
removeEventHandlers(viewEventsRef, deleteSet);
434456
clientEventAuthRef.current = deauthorizeEventUidsForAllClients(clientEventAuthRef.current, deleteSet);
435457

458+
for (const prop of deletedView.props) {
459+
if (prop.type === "stream") unregisterStreamBuffer(prop.uid);
460+
}
461+
436462
broadcast("delete_view", { viewUid: uid });
437-
}, [broadcast]);
463+
}, [broadcast, unregisterStreamBuffer]);
438464

439465
useEffect(() => {
440466
if (!transportIsClient) return;
@@ -469,7 +495,8 @@ const App = forwardRef<AppHandle, AppProps>(function App({ children, transport,
469495
views: snapshotViews(existingSharedViewsRef.current),
470496
addClient, removeClient, updateRunningView, deleteRunningView,
471497
broadcastStreamChunk, broadcastStreamEnd,
472-
}), [addClient, removeClient, updateRunningView, deleteRunningView, broadcastStreamChunk, broadcastStreamEnd]);
498+
registerStreamBuffer, unregisterStreamBuffer,
499+
}), [addClient, removeClient, updateRunningView, deleteRunningView, broadcastStreamChunk, broadcastStreamEnd, registerStreamBuffer, unregisterStreamBuffer]);
473500

474501
const viewFactory = useCallback<ViewFactory>(
475502
(name, props) => <ViewComponent name={name} props={props} />,

0 commit comments

Comments
 (0)