Skip to content

Commit 9a8f2f8

Browse files
author
Charlie Tonneslan
committed
fix: ensure onerror callback is called before throwing transport errors
Several transport implementations threw errors without first calling the onerror callback, causing errors to be silently swallowed when callers didn't handle the thrown error. This adds proper onerror reporting to stdio, websocket, and streamable HTTP transports. Fixes #1395
1 parent ccb78f2 commit 9a8f2f8

4 files changed

Lines changed: 96 additions & 34 deletions

File tree

packages/client/src/client/stdio.ts

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -244,16 +244,22 @@ export class StdioClientTransport implements Transport {
244244
}
245245

246246
send(message: JSONRPCMessage): Promise<void> {
247-
return new Promise(resolve => {
248-
if (!this._process?.stdin) {
249-
throw new SdkError(SdkErrorCode.NotConnected, 'Not connected');
250-
}
247+
return new Promise((resolve, reject) => {
248+
try {
249+
if (!this._process?.stdin) {
250+
throw new SdkError(SdkErrorCode.NotConnected, 'Not connected');
251+
}
251252

252-
const json = serializeMessage(message);
253-
if (this._process.stdin.write(json)) {
254-
resolve();
255-
} else {
256-
this._process.stdin.once('drain', resolve);
253+
const json = serializeMessage(message);
254+
if (this._process.stdin.write(json)) {
255+
resolve();
256+
} else {
257+
this._process.stdin.once('drain', resolve);
258+
}
259+
} catch (error) {
260+
const err = error instanceof Error ? error : new Error(String(error));
261+
this.onerror?.(err);
262+
reject(err);
257263
}
258264
});
259265
}

packages/client/src/client/websocket.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,21 @@ export class WebSocketClientTransport implements Transport {
6262

6363
send(message: JSONRPCMessage): Promise<void> {
6464
return new Promise((resolve, reject) => {
65-
if (!this._socket) {
66-
reject(new Error('Not connected'));
67-
return;
68-
}
65+
try {
66+
if (!this._socket) {
67+
const err = new Error('Not connected');
68+
this.onerror?.(err);
69+
reject(err);
70+
return;
71+
}
6972

70-
this._socket?.send(JSON.stringify(message));
71-
resolve();
73+
this._socket.send(JSON.stringify(message));
74+
resolve();
75+
} catch (error) {
76+
const err = error instanceof Error ? error : new Error(String(error));
77+
this.onerror?.(err);
78+
reject(err);
79+
}
7280
});
7381
}
7482
}

packages/server/src/server/stdio.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,18 @@ export class StdioServerTransport implements Transport {
8787
}
8888

8989
send(message: JSONRPCMessage): Promise<void> {
90-
return new Promise(resolve => {
91-
const json = serializeMessage(message);
92-
if (this._stdout.write(json)) {
93-
resolve();
94-
} else {
95-
this._stdout.once('drain', resolve);
90+
return new Promise((resolve, reject) => {
91+
try {
92+
const json = serializeMessage(message);
93+
if (this._stdout.write(json)) {
94+
resolve();
95+
} else {
96+
this._stdout.once('drain', resolve);
97+
}
98+
} catch (error) {
99+
const err = error instanceof Error ? error : new Error(String(error));
100+
this.onerror?.(err);
101+
reject(err);
96102
}
97103
});
98104
}

packages/server/src/server/streamableHttp.ts

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
239239
private _enableDnsRebindingProtection: boolean;
240240
private _retryInterval?: number;
241241
private _supportedProtocolVersions: string[];
242+
private _isClosing = false;
242243

243244
sessionId?: string;
244245
onclose?: () => void;
@@ -388,7 +389,14 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
388389
return;
389390
}
390391

391-
const primingEventId = await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage);
392+
let primingEventId: string;
393+
try {
394+
primingEventId = await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage);
395+
} catch (error) {
396+
const err = error as Error;
397+
this.onerror?.(err);
398+
throw err;
399+
}
392400

393401
let primingEvent = `id: ${primingEventId}\ndata: \n\n`;
394402
if (this._retryInterval !== undefined) {
@@ -887,14 +895,30 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
887895
}
888896

889897
async close(): Promise<void> {
890-
// Close all SSE connections
891-
for (const { cleanup } of this._streamMapping.values()) {
892-
cleanup();
898+
if (this._isClosing) return;
899+
this._isClosing = true;
900+
901+
try {
902+
// Snapshot and clear the mapping before calling cleanup functions
903+
// to prevent re-entrant deletes from cancel callbacks
904+
const entries = [...this._streamMapping.values()];
905+
this._streamMapping.clear();
906+
907+
// Close all SSE connections
908+
for (const { cleanup } of entries) {
909+
try {
910+
cleanup();
911+
} catch {
912+
// Suppress cleanup errors
913+
}
914+
}
915+
916+
// Clear any pending responses
917+
this._requestResponseMap.clear();
918+
} finally {
919+
// Don't reset _isClosing - once closed, stay closed
893920
}
894-
this._streamMapping.clear();
895921

896-
// Clear any pending responses
897-
this._requestResponseMap.clear();
898922
this.onclose?.();
899923
}
900924

@@ -937,15 +961,23 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
937961
if (requestId === undefined) {
938962
// For standalone SSE streams, we can only send requests and notifications
939963
if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
940-
throw new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request');
964+
const err = new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request');
965+
this.onerror?.(err);
966+
throw err;
941967
}
942968

943969
// Generate and store event ID if event store is provided
944970
// Store even if stream is disconnected so events can be replayed on reconnect
945971
let eventId: string | undefined;
946972
if (this._eventStore) {
947-
// Stores the event and gets the generated event ID
948-
eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message);
973+
try {
974+
// Stores the event and gets the generated event ID
975+
eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message);
976+
} catch (error) {
977+
const err = error as Error;
978+
this.onerror?.(err);
979+
throw err;
980+
}
949981
}
950982

951983
const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId);
@@ -964,7 +996,9 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
964996
// Get the response for this request
965997
const streamId = this._requestToStreamMapping.get(requestId);
966998
if (!streamId) {
967-
throw new Error(`No connection established for request ID: ${String(requestId)}`);
999+
const err = new Error(`No connection established for request ID: ${String(requestId)}`);
1000+
this.onerror?.(err);
1001+
throw err;
9681002
}
9691003

9701004
const stream = this._streamMapping.get(streamId);
@@ -974,7 +1008,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
9741008
let eventId: string | undefined;
9751009

9761010
if (this._eventStore) {
977-
eventId = await this._eventStore.storeEvent(streamId, message);
1011+
try {
1012+
eventId = await this._eventStore.storeEvent(streamId, message);
1013+
} catch (error) {
1014+
const err = error as Error;
1015+
this.onerror?.(err);
1016+
throw err;
1017+
}
9781018
}
9791019
// Write the event to the response stream
9801020
this.writeSSEEvent(stream.controller, stream.encoder, message, eventId);
@@ -989,7 +1029,9 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
9891029

9901030
if (allResponsesReady) {
9911031
if (!stream) {
992-
throw new Error(`No connection established for request ID: ${String(requestId)}`);
1032+
const err = new Error(`No connection established for request ID: ${String(requestId)}`);
1033+
this.onerror?.(err);
1034+
throw err;
9931035
}
9941036
if (this._enableJsonResponse && stream.resolveJson) {
9951037
// All responses ready, send as JSON

0 commit comments

Comments
 (0)