Skip to content

Commit 94d13a2

Browse files
Preserve live stream subscriptions across explicit reconnects (#1972)
1 parent 7a00846 commit 94d13a2

File tree

2 files changed

+114
-1
lines changed

2 files changed

+114
-1
lines changed

apps/web/src/rpc/wsTransport.test.ts

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,115 @@ describe("WsTransport", () => {
639639
await transport.dispose();
640640
});
641641

642+
it("re-subscribes live stream listeners after an explicit transport reconnect", async () => {
643+
const transport = createTransport("ws://localhost:3020");
644+
const listener = vi.fn();
645+
const onResubscribe = vi.fn();
646+
647+
const unsubscribe = transport.subscribe(
648+
(client) => client[WS_METHODS.subscribeServerLifecycle]({}),
649+
listener,
650+
{ onResubscribe },
651+
);
652+
653+
await waitFor(() => {
654+
expect(sockets).toHaveLength(1);
655+
});
656+
657+
const firstSocket = getSocket();
658+
firstSocket.open();
659+
660+
await waitFor(() => {
661+
expect(firstSocket.sent).toHaveLength(1);
662+
});
663+
664+
const firstRequest = JSON.parse(firstSocket.sent[0] ?? "{}") as { id: string };
665+
const firstEvent = {
666+
version: 1,
667+
sequence: 1,
668+
type: "welcome",
669+
payload: {
670+
environment: {
671+
environmentId: "environment-local",
672+
label: "Local environment",
673+
platform: { os: "darwin", arch: "arm64" },
674+
serverVersion: "0.0.0-test",
675+
capabilities: { repositoryIdentity: true },
676+
},
677+
cwd: "/tmp/one",
678+
projectName: "one",
679+
},
680+
};
681+
682+
firstSocket.serverMessage(
683+
JSON.stringify({
684+
_tag: "Chunk",
685+
requestId: firstRequest.id,
686+
values: [firstEvent],
687+
}),
688+
);
689+
690+
await waitFor(() => {
691+
expect(listener).toHaveBeenLastCalledWith(firstEvent);
692+
});
693+
694+
await transport.reconnect();
695+
696+
await waitFor(() => {
697+
expect(sockets).toHaveLength(2);
698+
});
699+
700+
const secondSocket = getSocket();
701+
expect(secondSocket).not.toBe(firstSocket);
702+
expect(firstSocket.readyState).toBe(MockWebSocket.CLOSED);
703+
704+
secondSocket.open();
705+
706+
await waitFor(() => {
707+
expect(secondSocket.sent).toHaveLength(1);
708+
});
709+
710+
const secondRequest = JSON.parse(secondSocket.sent[0] ?? "{}") as {
711+
id: string;
712+
tag: string;
713+
};
714+
expect(secondRequest.tag).toBe(WS_METHODS.subscribeServerLifecycle);
715+
expect(secondRequest.id).not.toBe(firstRequest.id);
716+
expect(onResubscribe).toHaveBeenCalledOnce();
717+
718+
const secondEvent = {
719+
version: 1,
720+
sequence: 2,
721+
type: "welcome",
722+
payload: {
723+
environment: {
724+
environmentId: "environment-local",
725+
label: "Local environment",
726+
platform: { os: "darwin", arch: "arm64" },
727+
serverVersion: "0.0.0-test",
728+
capabilities: { repositoryIdentity: true },
729+
},
730+
cwd: "/tmp/two",
731+
projectName: "two",
732+
},
733+
};
734+
735+
secondSocket.serverMessage(
736+
JSON.stringify({
737+
_tag: "Chunk",
738+
requestId: secondRequest.id,
739+
values: [secondEvent],
740+
}),
741+
);
742+
743+
await waitFor(() => {
744+
expect(listener).toHaveBeenLastCalledWith(secondEvent);
745+
});
746+
747+
unsubscribe();
748+
await transport.dispose();
749+
});
750+
642751
it("does not fire onResubscribe when the first stream attempt exits before any value", async () => {
643752
const transport = createTransport("ws://localhost:3020");
644753
const listener = vi.fn();

apps/web/src/rpc/wsTransport.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ export class WsTransport {
121121
return;
122122
}
123123

124+
const session = this.session;
124125
try {
125126
if (hasReceivedValue) {
126127
try {
@@ -130,7 +131,6 @@ export class WsTransport {
130131
}
131132
}
132133

133-
const session = this.session;
134134
const runningStream = this.runStreamOnSession(
135135
session,
136136
connect,
@@ -150,6 +150,10 @@ export class WsTransport {
150150
return;
151151
}
152152

153+
if (session !== this.session) {
154+
continue;
155+
}
156+
153157
const formattedError = formatErrorMessage(error);
154158
if (!isTransportConnectionErrorMessage(formattedError)) {
155159
console.warn("WebSocket RPC subscription failed", {

0 commit comments

Comments
 (0)