Skip to content

Commit 1e929ad

Browse files
Handle intentional WS closes and heartbeat freshness
- Ignore intentional close events during transport disposal - Track heartbeat freshness from pong traffic and reset on reconnect - Propagate request lifecycle hooks through the RPC client
1 parent 3ee6fbb commit 1e929ad

7 files changed

Lines changed: 388 additions & 47 deletions

File tree

apps/web/src/rpc/wsTransport.test.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,15 @@ describe("WsTransport (web instrumentation)", () => {
242242
socket.close(1012, "service restart");
243243

244244
await waitFor(() => {
245-
expect(onClose).toHaveBeenCalledWith({
246-
code: 1012,
247-
reason: "service restart",
248-
});
245+
expect(onClose).toHaveBeenCalledWith(
246+
{
247+
code: 1012,
248+
reason: "service restart",
249+
},
250+
{
251+
intentional: false,
252+
},
253+
);
249254
expect(getWsConnectionStatus()).toMatchObject({
250255
attemptCount: 2,
251256
closeReason: "service restart",

apps/web/src/rpc/wsTransport.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@ function createWsRpcProtocolLayer(
3131
clearAllTrackedRpcRequests();
3232
recordWsConnectionErrored(message);
3333
},
34-
onClose: (details) => {
34+
onClose: (details, context) => {
3535
clearAllTrackedRpcRequests();
36+
if (context.intentional) {
37+
return;
38+
}
3639
recordWsConnectionClosed(details);
3740
},
3841
},

packages/client-runtime/src/wsRpcClient.test.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,13 @@ describe("createWsRpcClient", () => {
4343
reconnect: vi.fn(async () => {
4444
order.push("reconnect");
4545
}),
46+
isHeartbeatFresh: vi.fn(() => true),
4647
request: vi.fn(),
4748
requestStream: vi.fn(),
4849
subscribe: vi.fn(() => () => undefined),
4950
} satisfies Pick<
5051
WsTransport,
51-
"dispose" | "reconnect" | "request" | "requestStream" | "subscribe"
52+
"dispose" | "isHeartbeatFresh" | "reconnect" | "request" | "requestStream" | "subscribe"
5253
>;
5354

5455
const client = createWsRpcClient(transport as unknown as WsTransport, {
@@ -61,6 +62,26 @@ describe("createWsRpcClient", () => {
6162
expect(order).toEqual(["beforeReconnect", "reconnect"]);
6263
});
6364

65+
it("delegates heartbeat freshness to the transport", () => {
66+
const isHeartbeatFresh = vi.fn(() => true);
67+
const transport = {
68+
dispose: vi.fn(async () => undefined),
69+
reconnect: vi.fn(async () => undefined),
70+
isHeartbeatFresh,
71+
request: vi.fn(),
72+
requestStream: vi.fn(),
73+
subscribe: vi.fn(() => () => undefined),
74+
} satisfies Pick<
75+
WsTransport,
76+
"dispose" | "isHeartbeatFresh" | "reconnect" | "request" | "requestStream" | "subscribe"
77+
>;
78+
79+
const client = createWsRpcClient(transport as unknown as WsTransport);
80+
81+
expect(client.isHeartbeatFresh()).toBe(true);
82+
expect(isHeartbeatFresh).toHaveBeenCalledOnce();
83+
});
84+
6485
it("reduces vcs status stream events into flat status snapshots", () => {
6586
const subscribe = vi.fn(<TValue>(_connect: unknown, listener: (value: TValue) => void) => {
6687
for (const event of [
@@ -89,12 +110,13 @@ describe("createWsRpcClient", () => {
89110
const transport = {
90111
dispose: vi.fn(async () => undefined),
91112
reconnect: vi.fn(async () => undefined),
113+
isHeartbeatFresh: vi.fn(() => true),
92114
request: vi.fn(),
93115
requestStream: vi.fn(),
94116
subscribe,
95117
} satisfies Pick<
96118
WsTransport,
97-
"dispose" | "reconnect" | "request" | "requestStream" | "subscribe"
119+
"dispose" | "isHeartbeatFresh" | "reconnect" | "request" | "requestStream" | "subscribe"
98120
>;
99121

100122
const client = createWsRpcClient(transport as unknown as WsTransport);
@@ -134,12 +156,13 @@ describe("createWsRpcClient", () => {
134156
const transport = {
135157
dispose: vi.fn(async () => undefined),
136158
reconnect: vi.fn(async () => undefined),
159+
isHeartbeatFresh: vi.fn(() => true),
137160
request: vi.fn(),
138161
requestStream: vi.fn(),
139162
subscribe,
140163
} satisfies Pick<
141164
WsTransport,
142-
"dispose" | "reconnect" | "request" | "requestStream" | "subscribe"
165+
"dispose" | "isHeartbeatFresh" | "reconnect" | "request" | "requestStream" | "subscribe"
143166
>;
144167

145168
const client = createWsRpcClient(transport as unknown as WsTransport);

packages/client-runtime/src/wsRpcClient.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,9 @@ export interface WsRpcClient {
125125
};
126126
readonly server: {
127127
readonly getConfig: RpcUnaryNoArgMethod<typeof WS_METHODS.serverGetConfig>;
128-
readonly refreshProviders: RpcUnaryNoArgMethod<typeof WS_METHODS.serverRefreshProviders>;
128+
readonly refreshProviders: (
129+
input?: RpcInput<typeof WS_METHODS.serverRefreshProviders>,
130+
) => ReturnType<RpcUnaryMethod<typeof WS_METHODS.serverRefreshProviders>>;
129131
readonly discoverSourceControl: RpcUnaryNoArgMethod<
130132
typeof WS_METHODS.serverDiscoverSourceControl
131133
>;
@@ -171,7 +173,7 @@ export function createWsRpcClient(
171173
): WsRpcClient {
172174
return {
173175
dispose: () => transport.dispose(),
174-
isHeartbeatFresh: () => false,
176+
isHeartbeatFresh: () => transport.isHeartbeatFresh(),
175177
reconnect: async () => {
176178
options?.beforeReconnect?.();
177179
await transport.reconnect();
@@ -278,8 +280,8 @@ export function createWsRpcClient(
278280
},
279281
server: {
280282
getConfig: () => transport.request((client) => client[WS_METHODS.serverGetConfig]({})),
281-
refreshProviders: () =>
282-
transport.request((client) => client[WS_METHODS.serverRefreshProviders]({})),
283+
refreshProviders: (input) =>
284+
transport.request((client) => client[WS_METHODS.serverRefreshProviders](input ?? {})),
283285
discoverSourceControl: () =>
284286
transport.request((client) => client[WS_METHODS.serverDiscoverSourceControl]({})),
285287
updateProvider: (input) =>

packages/client-runtime/src/wsRpcProtocol.ts

Lines changed: 181 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,34 @@ import {
1515
export interface WsProtocolLifecycleHandlers {
1616
readonly getConnectionLabel?: () => string | null;
1717
readonly getVersionMismatchHint?: () => string | null;
18+
readonly isCloseIntentional?: () => boolean;
19+
readonly isActive?: () => boolean;
1820
readonly onAttempt?: (socketUrl: string) => void;
1921
readonly onOpen?: () => void;
22+
readonly onHeartbeatPing?: () => void;
23+
readonly onHeartbeatPong?: () => void;
24+
readonly onHeartbeatTimeout?: () => void;
25+
readonly onRequestStart?: (info: {
26+
readonly id: string;
27+
readonly tag: string;
28+
readonly stream: boolean;
29+
}) => void;
30+
readonly onRequestChunk?: (info: {
31+
readonly id: string;
32+
readonly tag: string;
33+
readonly chunkCount: number;
34+
}) => void;
35+
readonly onRequestExit?: (info: {
36+
readonly id: string;
37+
readonly tag: string;
38+
readonly stream: boolean;
39+
}) => void;
40+
readonly onRequestInterrupt?: (info: { readonly id: string; readonly tag?: string }) => void;
2041
readonly onError?: (message: string) => void;
21-
readonly onClose?: (details: { readonly code: number; readonly reason: string }) => void;
42+
readonly onClose?: (
43+
details: { readonly code: number; readonly reason: string },
44+
context: { readonly intentional: boolean },
45+
) => void;
2246
}
2347

2448
export interface WsRpcProtocolRequestTelemetry {
@@ -63,48 +87,107 @@ function resolveWsRpcSocketUrl(rawUrl: string): string {
6387
return resolved.toString();
6488
}
6589

66-
function defaultLifecycleHandlers(): Required<WsProtocolLifecycleHandlers> {
90+
type ResolvedLifecycleHandlers = Required<
91+
Pick<
92+
WsProtocolLifecycleHandlers,
93+
| "getConnectionLabel"
94+
| "getVersionMismatchHint"
95+
| "isCloseIntentional"
96+
| "isActive"
97+
| "onAttempt"
98+
| "onOpen"
99+
| "onHeartbeatPing"
100+
| "onHeartbeatPong"
101+
| "onHeartbeatTimeout"
102+
| "onError"
103+
| "onClose"
104+
>
105+
>;
106+
107+
function defaultLifecycleHandlers(): ResolvedLifecycleHandlers {
67108
return {
68109
onAttempt: () => undefined,
69110
onOpen: () => undefined,
111+
onHeartbeatPing: () => undefined,
112+
onHeartbeatPong: () => undefined,
113+
onHeartbeatTimeout: () => undefined,
70114
onError: () => undefined,
71115
onClose: () => undefined,
72116
getConnectionLabel: () => null,
73117
getVersionMismatchHint: () => null,
118+
isCloseIntentional: () => false,
119+
isActive: () => true,
74120
};
75121
}
76122

77123
function resolveLifecycleHandlers(
78124
handlers: WsProtocolLifecycleHandlers | undefined,
79125
telemetryLifecycle: WsProtocolLifecycleHandlers | undefined,
80-
): Required<WsProtocolLifecycleHandlers> {
81-
if (telemetryLifecycle === undefined) {
82-
return {
83-
...defaultLifecycleHandlers(),
84-
...handlers,
85-
};
86-
}
126+
): ResolvedLifecycleHandlers {
127+
const defaults = defaultLifecycleHandlers();
128+
const isActive = handlers?.isActive ?? telemetryLifecycle?.isActive ?? defaults.isActive;
129+
const isCloseIntentional =
130+
handlers?.isCloseIntentional ??
131+
telemetryLifecycle?.isCloseIntentional ??
132+
defaults.isCloseIntentional;
87133

88134
return {
89135
getConnectionLabel: () =>
90-
handlers?.getConnectionLabel?.() ?? telemetryLifecycle.getConnectionLabel?.() ?? null,
136+
handlers?.getConnectionLabel?.() ?? telemetryLifecycle?.getConnectionLabel?.() ?? null,
91137
getVersionMismatchHint: () =>
92-
handlers?.getVersionMismatchHint?.() ?? telemetryLifecycle.getVersionMismatchHint?.() ?? null,
138+
handlers?.getVersionMismatchHint?.() ??
139+
telemetryLifecycle?.getVersionMismatchHint?.() ??
140+
null,
141+
isActive,
142+
isCloseIntentional,
93143
onAttempt: (socketUrl) => {
94-
telemetryLifecycle.onAttempt?.(socketUrl);
144+
if (!isActive()) {
145+
return;
146+
}
147+
telemetryLifecycle?.onAttempt?.(socketUrl);
95148
handlers?.onAttempt?.(socketUrl);
96149
},
97150
onOpen: () => {
98-
telemetryLifecycle.onOpen?.();
151+
if (!isActive()) {
152+
return;
153+
}
154+
telemetryLifecycle?.onOpen?.();
99155
handlers?.onOpen?.();
100156
},
157+
onHeartbeatPing: () => {
158+
if (!isActive()) {
159+
return;
160+
}
161+
telemetryLifecycle?.onHeartbeatPing?.();
162+
handlers?.onHeartbeatPing?.();
163+
},
164+
onHeartbeatPong: () => {
165+
if (!isActive()) {
166+
return;
167+
}
168+
telemetryLifecycle?.onHeartbeatPong?.();
169+
handlers?.onHeartbeatPong?.();
170+
},
171+
onHeartbeatTimeout: () => {
172+
if (!isActive()) {
173+
return;
174+
}
175+
telemetryLifecycle?.onHeartbeatTimeout?.();
176+
handlers?.onHeartbeatTimeout?.();
177+
},
101178
onError: (message) => {
102-
telemetryLifecycle.onError?.(message);
179+
if (!isActive()) {
180+
return;
181+
}
182+
telemetryLifecycle?.onError?.(message);
103183
handlers?.onError?.(message);
104184
},
105-
onClose: (details) => {
106-
telemetryLifecycle.onClose?.(details);
107-
handlers?.onClose?.(details);
185+
onClose: (details, context) => {
186+
if (!isActive()) {
187+
return;
188+
}
189+
telemetryLifecycle?.onClose?.(details, context);
190+
handlers?.onClose?.(details, context);
108191
},
109192
};
110193
}
@@ -158,10 +241,15 @@ export function createWsRpcProtocolLayer(
158241
socket.addEventListener(
159242
"close",
160243
(event) => {
161-
lifecycle.onClose({
162-
code: event.code,
163-
reason: event.reason,
164-
});
244+
lifecycle.onClose(
245+
{
246+
code: event.code,
247+
reason: event.reason,
248+
},
249+
{
250+
intentional: lifecycle.isCloseIntentional(),
251+
},
252+
);
165253
},
166254
{ once: true },
167255
);
@@ -210,6 +298,77 @@ export function createWsRpcProtocolLayer(
210298
retryTransientErrors: true,
211299
}),
212300
);
301+
const requestHooksLayer = Layer.succeed(
302+
RpcClient.RequestHooks,
303+
RpcClient.RequestHooks.of({
304+
onRequestStart: (info) =>
305+
Effect.sync(() => {
306+
if (!lifecycle.isActive()) {
307+
return;
308+
}
309+
handlers?.onRequestStart?.({
310+
id: String(info.id),
311+
tag: info.tag,
312+
stream: info.stream,
313+
});
314+
}),
315+
onRequestChunk: (info) =>
316+
Effect.sync(() => {
317+
if (!lifecycle.isActive()) {
318+
return;
319+
}
320+
handlers?.onRequestChunk?.({
321+
id: String(info.id),
322+
tag: info.tag,
323+
chunkCount: info.chunkCount,
324+
});
325+
}),
326+
onRequestExit: (info) =>
327+
Effect.sync(() => {
328+
if (!lifecycle.isActive()) {
329+
return;
330+
}
331+
handlers?.onRequestExit?.({
332+
id: String(info.id),
333+
tag: info.tag,
334+
stream: info.stream,
335+
});
336+
}),
337+
onRequestInterrupt: (info) =>
338+
Effect.sync(() => {
339+
if (!lifecycle.isActive()) {
340+
return;
341+
}
342+
handlers?.onRequestInterrupt?.({
343+
id: String(info.id),
344+
...(info.tag === undefined ? {} : { tag: info.tag }),
345+
});
346+
}),
347+
}),
348+
);
349+
const connectionHooksLayer = Layer.succeed(
350+
RpcClient.ConnectionHooks,
351+
RpcClient.ConnectionHooks.of({
352+
onConnect: Effect.void,
353+
onDisconnect: Effect.void,
354+
onPing: Effect.sync(() => {
355+
lifecycle.onHeartbeatPing();
356+
}),
357+
onPong: Effect.sync(() => {
358+
lifecycle.onHeartbeatPong();
359+
}),
360+
onPingTimeout: Effect.sync(() => {
361+
requestTelemetry?.onClearTrackedRequests?.();
362+
lifecycle.onHeartbeatTimeout();
363+
}),
364+
}),
365+
);
213366

214-
return protocolLayer.pipe(Layer.provide(Layer.mergeAll(socketLayer, RpcSerialization.layerJson)));
367+
return Layer.mergeAll(
368+
protocolLayer.pipe(
369+
Layer.provide(Layer.mergeAll(socketLayer, RpcSerialization.layerJson, connectionHooksLayer)),
370+
),
371+
requestHooksLayer,
372+
connectionHooksLayer,
373+
);
215374
}

0 commit comments

Comments
 (0)