Skip to content

Commit d6317c5

Browse files
committed
fix(server): call onerror callback for all transport errors
Adds missing onerror callback invocations before every createJsonErrorResponse call in WebStandardStreamableHTTPServerTransport. This ensures that transport errors are no longer silently swallowed and can be observed via the onerror callback. Changes: - Add this.onerror?.() calls to 15 locations in streamableHttp.ts - Add 10 test cases to verify onerror is called for various error conditions Fixes #1395
1 parent b0ef89f commit d6317c5

File tree

2 files changed

+186
-0
lines changed

2 files changed

+186
-0
lines changed

packages/server/src/server/streamableHttp.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
376376
// The client MUST include an Accept header, listing text/event-stream as a supported content type.
377377
const acceptHeader = req.headers.get('accept');
378378
if (!acceptHeader?.includes('text/event-stream')) {
379+
this.onerror?.(new Error('Not Acceptable: Client must accept text/event-stream'));
379380
return this.createJsonErrorResponse(406, -32_000, 'Not Acceptable: Client must accept text/event-stream');
380381
}
381382

@@ -402,6 +403,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
402403
// Check if there's already an active standalone SSE stream for this session
403404
if (this._streamMapping.get(this._standaloneSseStreamId) !== undefined) {
404405
// Only one GET SSE stream is allowed per session
406+
this.onerror?.(new Error('Conflict: Only one SSE stream is allowed per session'));
405407
return this.createJsonErrorResponse(409, -32_000, 'Conflict: Only one SSE stream is allowed per session');
406408
}
407409

@@ -453,6 +455,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
453455
*/
454456
private async replayEvents(lastEventId: string): Promise<Response> {
455457
if (!this._eventStore) {
458+
this.onerror?.(new Error('Event store not configured'));
456459
return this.createJsonErrorResponse(400, -32_000, 'Event store not configured');
457460
}
458461

@@ -463,11 +466,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
463466
streamId = await this._eventStore.getStreamIdForEventId(lastEventId);
464467

465468
if (!streamId) {
469+
this.onerror?.(new Error('Invalid event ID format'));
466470
return this.createJsonErrorResponse(400, -32_000, 'Invalid event ID format');
467471
}
468472

469473
// Check conflict with the SAME streamId we'll use for mapping
470474
if (this._streamMapping.get(streamId) !== undefined) {
475+
this.onerror?.(new Error('Conflict: Stream already has an active connection'));
471476
return this.createJsonErrorResponse(409, -32_000, 'Conflict: Stream already has an active connection');
472477
}
473478
}
@@ -586,6 +591,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
586591
const acceptHeader = req.headers.get('accept');
587592
// The client MUST include an Accept header, listing both application/json and text/event-stream as supported content types.
588593
if (!acceptHeader?.includes('application/json') || !acceptHeader.includes('text/event-stream')) {
594+
this.onerror?.(new Error('Not Acceptable: Client must accept both application/json and text/event-stream'));
589595
return this.createJsonErrorResponse(
590596
406,
591597
-32_000,
@@ -595,6 +601,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
595601

596602
const ct = req.headers.get('content-type');
597603
if (!ct || !ct.includes('application/json')) {
604+
this.onerror?.(new Error('Unsupported Media Type: Content-Type must be application/json'));
598605
return this.createJsonErrorResponse(415, -32_000, 'Unsupported Media Type: Content-Type must be application/json');
599606
}
600607

@@ -608,6 +615,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
608615
try {
609616
rawMessage = await req.json();
610617
} catch {
618+
this.onerror?.(new Error('Parse error: Invalid JSON'));
611619
return this.createJsonErrorResponse(400, -32_700, 'Parse error: Invalid JSON');
612620
}
613621
} else {
@@ -622,6 +630,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
622630
? rawMessage.map(msg => JSONRPCMessageSchema.parse(msg))
623631
: [JSONRPCMessageSchema.parse(rawMessage)];
624632
} catch {
633+
this.onerror?.(new Error('Parse error: Invalid JSON-RPC message'));
625634
return this.createJsonErrorResponse(400, -32_700, 'Parse error: Invalid JSON-RPC message');
626635
}
627636

@@ -632,9 +641,11 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
632641
// If it's a server with session management and the session ID is already set we should reject the request
633642
// to avoid re-initialization.
634643
if (this._initialized && this.sessionId !== undefined) {
644+
this.onerror?.(new Error('Invalid Request: Server already initialized'));
635645
return this.createJsonErrorResponse(400, -32_600, 'Invalid Request: Server already initialized');
636646
}
637647
if (messages.length > 1) {
648+
this.onerror?.(new Error('Invalid Request: Only one initialization request is allowed'));
638649
return this.createJsonErrorResponse(400, -32_600, 'Invalid Request: Only one initialization request is allowed');
639650
}
640651
this.sessionId = this.sessionIdGenerator?.();
@@ -814,18 +825,21 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
814825
}
815826
if (!this._initialized) {
816827
// If the server has not been initialized yet, reject all requests
828+
this.onerror?.(new Error('Bad Request: Server not initialized'));
817829
return this.createJsonErrorResponse(400, -32_000, 'Bad Request: Server not initialized');
818830
}
819831

820832
const sessionId = req.headers.get('mcp-session-id');
821833

822834
if (!sessionId) {
823835
// Non-initialization requests without a session ID should return 400 Bad Request
836+
this.onerror?.(new Error('Bad Request: Mcp-Session-Id header is required'));
824837
return this.createJsonErrorResponse(400, -32_000, 'Bad Request: Mcp-Session-Id header is required');
825838
}
826839

827840
if (sessionId !== this.sessionId) {
828841
// Reject requests with invalid session ID with 404 Not Found
842+
this.onerror?.(new Error('Session not found'));
829843
return this.createJsonErrorResponse(404, -32_001, 'Session not found');
830844
}
831845

@@ -849,6 +863,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
849863
const protocolVersion = req.headers.get('mcp-protocol-version');
850864

851865
if (protocolVersion !== null && !SUPPORTED_PROTOCOL_VERSIONS.includes(protocolVersion)) {
866+
this.onerror?.(new Error(`Bad Request: Unsupported protocol version: ${protocolVersion}`));
852867
return this.createJsonErrorResponse(
853868
400,
854869
-32_000,

packages/server/test/server/streamableHttp.test.ts

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,4 +768,175 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
768768
await expect(transport.start()).rejects.toThrow('Transport already started');
769769
});
770770
});
771+
772+
describe('HTTPServerTransport - onerror callback', () => {
773+
let transport: WebStandardStreamableHTTPServerTransport;
774+
let mcpServer: McpServer;
775+
let errors: Error[];
776+
777+
beforeEach(async () => {
778+
errors = [];
779+
mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: {} });
780+
781+
transport = new WebStandardStreamableHTTPServerTransport({
782+
sessionIdGenerator: () => randomUUID()
783+
});
784+
785+
transport.onerror = err => errors.push(err);
786+
787+
await mcpServer.connect(transport);
788+
});
789+
790+
afterEach(async () => {
791+
await transport.close();
792+
});
793+
794+
async function initializeServer(): Promise<string> {
795+
const request = createRequest('POST', TEST_MESSAGES.initialize);
796+
const response = await transport.handleRequest(request);
797+
return response.headers.get('mcp-session-id') as string;
798+
}
799+
800+
it('should call onerror for invalid JSON', async () => {
801+
const request = new Request('http://localhost/mcp', {
802+
method: 'POST',
803+
headers: {
804+
Accept: 'application/json, text/event-stream',
805+
'Content-Type': 'application/json'
806+
},
807+
body: 'not valid json'
808+
});
809+
810+
const response = await transport.handleRequest(request);
811+
812+
expect(response.status).toBe(400);
813+
expect(errors.length).toBeGreaterThan(0);
814+
expect(errors[0].message).toContain('Parse error');
815+
});
816+
817+
it('should call onerror for invalid JSON-RPC message', async () => {
818+
const request = new Request('http://localhost/mcp', {
819+
method: 'POST',
820+
headers: {
821+
Accept: 'application/json, text/event-stream',
822+
'Content-Type': 'application/json'
823+
},
824+
body: JSON.stringify({ not: 'valid jsonrpc' })
825+
});
826+
827+
const response = await transport.handleRequest(request);
828+
829+
expect(response.status).toBe(400);
830+
expect(errors.length).toBeGreaterThan(0);
831+
expect(errors[0].message).toContain('Parse error');
832+
});
833+
834+
it('should call onerror for missing Accept header on POST', async () => {
835+
const request = createRequest('POST', TEST_MESSAGES.initialize, { accept: 'application/json' });
836+
837+
const response = await transport.handleRequest(request);
838+
839+
expect(response.status).toBe(406);
840+
expect(errors.length).toBeGreaterThan(0);
841+
expect(errors[0].message).toContain('Not Acceptable');
842+
});
843+
844+
it('should call onerror for unsupported Content-Type', async () => {
845+
const request = new Request('http://localhost/mcp', {
846+
method: 'POST',
847+
headers: {
848+
Accept: 'application/json, text/event-stream',
849+
'Content-Type': 'text/plain'
850+
},
851+
body: JSON.stringify(TEST_MESSAGES.initialize)
852+
});
853+
854+
const response = await transport.handleRequest(request);
855+
856+
expect(response.status).toBe(415);
857+
expect(errors.length).toBeGreaterThan(0);
858+
expect(errors[0].message).toContain('Unsupported Media Type');
859+
});
860+
861+
it('should call onerror for server not initialized', async () => {
862+
const request = createRequest('POST', TEST_MESSAGES.toolsList);
863+
864+
const response = await transport.handleRequest(request);
865+
866+
expect(response.status).toBe(400);
867+
expect(errors.length).toBeGreaterThan(0);
868+
expect(errors[0].message).toContain('Server not initialized');
869+
});
870+
871+
it('should call onerror for invalid session ID', async () => {
872+
await initializeServer();
873+
874+
const request = createRequest('POST', TEST_MESSAGES.toolsList, { sessionId: 'invalid-session-id' });
875+
876+
const response = await transport.handleRequest(request);
877+
878+
expect(response.status).toBe(404);
879+
expect(errors.length).toBeGreaterThan(0);
880+
expect(errors[0].message).toContain('Session not found');
881+
});
882+
883+
it('should call onerror for re-initialization attempt', async () => {
884+
await initializeServer();
885+
886+
const request = createRequest('POST', TEST_MESSAGES.initialize);
887+
888+
const response = await transport.handleRequest(request);
889+
890+
expect(response.status).toBe(400);
891+
expect(errors.length).toBeGreaterThan(0);
892+
expect(errors[0].message).toContain('Server already initialized');
893+
});
894+
895+
it('should call onerror for GET without Accept header', async () => {
896+
const sessionId = await initializeServer();
897+
898+
const request = createRequest('GET', undefined, { sessionId, accept: 'application/json' });
899+
900+
const response = await transport.handleRequest(request);
901+
902+
expect(response.status).toBe(406);
903+
expect(errors.length).toBeGreaterThan(0);
904+
expect(errors[0].message).toContain('Not Acceptable');
905+
});
906+
907+
it('should call onerror for concurrent SSE streams', async () => {
908+
const sessionId = await initializeServer();
909+
910+
const request1 = createRequest('GET', undefined, { sessionId });
911+
await transport.handleRequest(request1);
912+
913+
const request2 = createRequest('GET', undefined, { sessionId });
914+
const response2 = await transport.handleRequest(request2);
915+
916+
expect(response2.status).toBe(409);
917+
expect(errors.length).toBeGreaterThan(0);
918+
expect(errors[0].message).toContain('Conflict');
919+
});
920+
921+
it('should call onerror for unsupported protocol version', async () => {
922+
const sessionId = await initializeServer();
923+
924+
const request = new Request('http://localhost/mcp', {
925+
method: 'POST',
926+
headers: {
927+
'Content-Type': 'application/json',
928+
Accept: 'application/json, text/event-stream',
929+
'mcp-session-id': sessionId,
930+
'mcp-protocol-version': 'unsupported-version'
931+
},
932+
body: JSON.stringify(TEST_MESSAGES.toolsList)
933+
});
934+
935+
const response = await transport.handleRequest(request);
936+
937+
expect(response.status).toBe(400);
938+
expect(errors.length).toBeGreaterThan(0);
939+
expect(errors[0].message).toContain('Unsupported protocol version');
940+
});
941+
});
771942
});

0 commit comments

Comments
 (0)