forked from modelcontextprotocol/typescript-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebStandardStreamableHttp.ts
More file actions
1006 lines (898 loc) · 39.6 KB
/
webStandardStreamableHttp.ts
File metadata and controls
1006 lines (898 loc) · 39.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Web Standards Streamable HTTP Server Transport
*
* This is the core transport implementation using Web Standard APIs (Request, Response, ReadableStream).
* It can run on any runtime that supports Web Standards: Node.js 18+, Cloudflare Workers, Deno, Bun, etc.
*
* For Node.js Express/HTTP compatibility, use `StreamableHTTPServerTransport` which wraps this transport.
*/
import { Transport } from '../shared/transport.js';
import { AuthInfo } from './auth/types.js';
import {
MessageExtraInfo,
RequestInfo,
isInitializeRequest,
isJSONRPCErrorResponse,
isJSONRPCRequest,
isJSONRPCResultResponse,
JSONRPCMessage,
JSONRPCMessageSchema,
RequestId,
SUPPORTED_PROTOCOL_VERSIONS,
DEFAULT_NEGOTIATED_PROTOCOL_VERSION
} from '../types.js';
export type StreamId = string;
export type EventId = string;
/**
* Interface for resumability support via event storage
*/
export interface EventStore {
/**
* Stores an event for later retrieval
* @param streamId ID of the stream the event belongs to
* @param message The JSON-RPC message to store
* @returns The generated event ID for the stored event
*/
storeEvent(streamId: StreamId, message: JSONRPCMessage): Promise<EventId>;
/**
* Get the stream ID associated with a given event ID.
* @param eventId The event ID to look up
* @returns The stream ID, or undefined if not found
*
* Optional: If not provided, the SDK will use the streamId returned by
* replayEventsAfter for stream mapping.
*/
getStreamIdForEventId?(eventId: EventId): Promise<StreamId | undefined>;
replayEventsAfter(
lastEventId: EventId,
{
send
}: {
send: (eventId: EventId, message: JSONRPCMessage) => Promise<void>;
}
): Promise<StreamId>;
}
/**
* Internal stream mapping for managing SSE connections
*/
interface StreamMapping {
/** Stream controller for pushing SSE data - only used with ReadableStream approach */
controller?: ReadableStreamDefaultController<Uint8Array>;
/** Text encoder for SSE formatting */
encoder?: TextEncoder;
/** Promise resolver for JSON response mode */
resolveJson?: (response: Response) => void;
/** Cleanup function to close stream and remove mapping */
cleanup: () => void;
}
/**
* Configuration options for WebStandardStreamableHTTPServerTransport
*/
export interface WebStandardStreamableHTTPServerTransportOptions {
/**
* Function that generates a session ID for the transport.
* The session ID SHOULD be globally unique and cryptographically secure (e.g., a securely generated UUID, a JWT, or a cryptographic hash)
*
* If not provided, session management is disabled (stateless mode).
*/
sessionIdGenerator?: () => string;
/**
* A callback for session initialization events
* This is called when the server initializes a new session.
* Useful in cases when you need to register multiple mcp sessions
* and need to keep track of them.
* @param sessionId The generated session ID
*/
onsessioninitialized?: (sessionId: string) => void | Promise<void>;
/**
* A callback for session close events
* This is called when the server closes a session due to a DELETE request.
* Useful in cases when you need to clean up resources associated with the session.
* Note that this is different from the transport closing, if you are handling
* HTTP requests from multiple nodes you might want to close each
* WebStandardStreamableHTTPServerTransport after a request is completed while still keeping the
* session open/running.
* @param sessionId The session ID that was closed
*/
onsessionclosed?: (sessionId: string) => void | Promise<void>;
/**
* If true, the server will return JSON responses instead of starting an SSE stream.
* This can be useful for simple request/response scenarios without streaming.
* Default is false (SSE streams are preferred).
*/
enableJsonResponse?: boolean;
/**
* Event store for resumability support
* If provided, resumability will be enabled, allowing clients to reconnect and resume messages
*/
eventStore?: EventStore;
/**
* List of allowed host header values for DNS rebinding protection.
* If not specified, host validation is disabled.
* @deprecated Use external middleware for host validation instead.
*/
allowedHosts?: string[];
/**
* List of allowed origin header values for DNS rebinding protection.
* If not specified, origin validation is disabled.
* @deprecated Use external middleware for origin validation instead.
*/
allowedOrigins?: string[];
/**
* Enable DNS rebinding protection (requires allowedHosts and/or allowedOrigins to be configured).
* Default is false for backwards compatibility.
* @deprecated Use external middleware for DNS rebinding protection instead.
*/
enableDnsRebindingProtection?: boolean;
/**
* Retry interval in milliseconds to suggest to clients in SSE retry field.
* When set, the server will send a retry field in SSE priming events to control
* client reconnection timing for polling behavior.
*/
retryInterval?: number;
}
/**
* Options for handling a request
*/
export interface HandleRequestOptions {
/**
* Pre-parsed request body. If provided, the transport will use this instead of parsing req.json().
* Useful when using body-parser middleware that has already parsed the body.
*/
parsedBody?: unknown;
/**
* Authentication info from middleware. If provided, will be passed to message handlers.
*/
authInfo?: AuthInfo;
}
/**
* Server transport for Web Standards Streamable HTTP: this implements the MCP Streamable HTTP transport specification
* using Web Standard APIs (Request, Response, ReadableStream).
*
* This transport works on any runtime that supports Web Standards: Node.js 18+, Cloudflare Workers, Deno, Bun, etc.
*
* Usage example:
*
* ```typescript
* // Stateful mode - server sets the session ID
* const statefulTransport = new WebStandardStreamableHTTPServerTransport({
* sessionIdGenerator: () => crypto.randomUUID(),
* });
*
* // Stateless mode - explicitly set session ID to undefined
* const statelessTransport = new WebStandardStreamableHTTPServerTransport({
* sessionIdGenerator: undefined,
* });
*
* // Hono.js usage
* app.all('/mcp', async (c) => {
* return transport.handleRequest(c.req.raw);
* });
*
* // Cloudflare Workers usage
* export default {
* async fetch(request: Request): Promise<Response> {
* return transport.handleRequest(request);
* }
* };
* ```
*
* In stateful mode:
* - Session ID is generated and included in response headers
* - Session ID is always included in initialization responses
* - Requests with invalid session IDs are rejected with 404 Not Found
* - Non-initialization requests without a session ID are rejected with 400 Bad Request
* - State is maintained in-memory (connections, message history)
*
* In stateless mode:
* - No Session ID is included in any responses
* - No session validation is performed
*/
export class WebStandardStreamableHTTPServerTransport implements Transport {
// when sessionId is not set (undefined), it means the transport is in stateless mode
private sessionIdGenerator: (() => string) | undefined;
private _started: boolean = false;
private _hasHandledRequest: boolean = false;
private _streamMapping: Map<string, StreamMapping> = new Map();
private _requestToStreamMapping: Map<RequestId, string> = new Map();
private _requestResponseMap: Map<RequestId, JSONRPCMessage> = new Map();
private _initialized: boolean = false;
private _enableJsonResponse: boolean = false;
private _standaloneSseStreamId: string = '_GET_stream';
private _eventStore?: EventStore;
private _onsessioninitialized?: (sessionId: string) => void | Promise<void>;
private _onsessionclosed?: (sessionId: string) => void | Promise<void>;
private _allowedHosts?: string[];
private _allowedOrigins?: string[];
private _enableDnsRebindingProtection: boolean;
private _retryInterval?: number;
sessionId?: string;
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void;
constructor(options: WebStandardStreamableHTTPServerTransportOptions = {}) {
this.sessionIdGenerator = options.sessionIdGenerator;
this._enableJsonResponse = options.enableJsonResponse ?? false;
this._eventStore = options.eventStore;
this._onsessioninitialized = options.onsessioninitialized;
this._onsessionclosed = options.onsessionclosed;
this._allowedHosts = options.allowedHosts;
this._allowedOrigins = options.allowedOrigins;
this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false;
this._retryInterval = options.retryInterval;
}
/**
* Starts the transport. This is required by the Transport interface but is a no-op
* for the Streamable HTTP transport as connections are managed per-request.
*/
async start(): Promise<void> {
if (this._started) {
throw new Error('Transport already started');
}
this._started = true;
}
/**
* Helper to create a JSON error response
*/
private createJsonErrorResponse(
status: number,
code: number,
message: string,
options?: { headers?: Record<string, string>; data?: string }
): Response {
const error: { code: number; message: string; data?: string } = { code, message };
if (options?.data !== undefined) {
error.data = options.data;
}
return new Response(
JSON.stringify({
jsonrpc: '2.0',
error,
id: null
}),
{
status,
headers: {
'Content-Type': 'application/json',
...options?.headers
}
}
);
}
/**
* Validates request headers for DNS rebinding protection.
* @returns Error response if validation fails, undefined if validation passes.
*/
private validateRequestHeaders(req: Request): Response | undefined {
// Skip validation if protection is not enabled
if (!this._enableDnsRebindingProtection) {
return undefined;
}
// Validate Host header if allowedHosts is configured
if (this._allowedHosts && this._allowedHosts.length > 0) {
const hostHeader = req.headers.get('host');
if (!hostHeader || !this._allowedHosts.includes(hostHeader)) {
const error = `Invalid Host header: ${hostHeader}`;
this.onerror?.(new Error(error));
return this.createJsonErrorResponse(403, -32000, error);
}
}
// Validate Origin header if allowedOrigins is configured
if (this._allowedOrigins && this._allowedOrigins.length > 0) {
const originHeader = req.headers.get('origin');
if (originHeader && !this._allowedOrigins.includes(originHeader)) {
const error = `Invalid Origin header: ${originHeader}`;
this.onerror?.(new Error(error));
return this.createJsonErrorResponse(403, -32000, error);
}
}
return undefined;
}
/**
* Handles an incoming HTTP request, whether GET, POST, or DELETE
* Returns a Response object (Web Standard)
*/
async handleRequest(req: Request, options?: HandleRequestOptions): Promise<Response> {
// In stateless mode (no sessionIdGenerator), each request must use a fresh transport.
// Reusing a stateless transport causes message ID collisions between clients.
if (!this.sessionIdGenerator && this._hasHandledRequest) {
throw new Error('Stateless transport cannot be reused across requests. Create a new transport per request.');
}
this._hasHandledRequest = true;
// Validate request headers for DNS rebinding protection
const validationError = this.validateRequestHeaders(req);
if (validationError) {
return validationError;
}
switch (req.method) {
case 'POST':
return this.handlePostRequest(req, options);
case 'GET':
return this.handleGetRequest(req);
case 'DELETE':
return this.handleDeleteRequest(req);
default:
return this.handleUnsupportedRequest();
}
}
/**
* Writes a priming event to establish resumption capability.
* Only sends if eventStore is configured (opt-in for resumability) and
* the client's protocol version supports empty SSE data (>= 2025-11-25).
*/
private async writePrimingEvent(
controller: ReadableStreamDefaultController<Uint8Array>,
encoder: TextEncoder,
streamId: string,
protocolVersion: string
): Promise<void> {
if (!this._eventStore) {
return;
}
// Priming events have empty data which older clients cannot handle.
// Only send priming events to clients with protocol version >= 2025-11-25
// which includes the fix for handling empty SSE data.
if (protocolVersion < '2025-11-25') {
return;
}
const primingEventId = await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage);
let primingEvent = `id: ${primingEventId}\ndata: \n\n`;
if (this._retryInterval !== undefined) {
primingEvent = `id: ${primingEventId}\nretry: ${this._retryInterval}\ndata: \n\n`;
}
controller.enqueue(encoder.encode(primingEvent));
}
/**
* Handles GET requests for SSE stream
*/
private async handleGetRequest(req: Request): Promise<Response> {
// The client MUST include an Accept header, listing text/event-stream as a supported content type.
const acceptHeader = req.headers.get('accept');
if (!acceptHeader?.includes('text/event-stream')) {
return this.createJsonErrorResponse(406, -32000, 'Not Acceptable: Client must accept text/event-stream');
}
// If an Mcp-Session-Id is returned by the server during initialization,
// clients using the Streamable HTTP transport MUST include it
// in the Mcp-Session-Id header on all of their subsequent HTTP requests.
const sessionError = this.validateSession(req);
if (sessionError) {
return sessionError;
}
const protocolError = this.validateProtocolVersion(req);
if (protocolError) {
return protocolError;
}
// Handle resumability: check for Last-Event-ID header
if (this._eventStore) {
const lastEventId = req.headers.get('last-event-id');
if (lastEventId) {
return this.replayEvents(lastEventId);
}
}
// Check if there's already an active standalone SSE stream for this session
if (this._streamMapping.get(this._standaloneSseStreamId) !== undefined) {
// Only one GET SSE stream is allowed per session
return this.createJsonErrorResponse(409, -32000, 'Conflict: Only one SSE stream is allowed per session');
}
const encoder = new TextEncoder();
let streamController: ReadableStreamDefaultController<Uint8Array>;
// Create a ReadableStream with a controller we can use to push SSE events
const readable = new ReadableStream<Uint8Array>({
start: controller => {
streamController = controller;
},
cancel: () => {
// Stream was cancelled by client
this._streamMapping.delete(this._standaloneSseStreamId);
}
});
const headers: Record<string, string> = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-transform',
Connection: 'keep-alive'
};
// After initialization, always include the session ID if we have one
if (this.sessionId !== undefined) {
headers['mcp-session-id'] = this.sessionId;
}
// Store the stream mapping with the controller for pushing data
this._streamMapping.set(this._standaloneSseStreamId, {
controller: streamController!,
encoder,
cleanup: () => {
this._streamMapping.delete(this._standaloneSseStreamId);
try {
streamController!.close();
} catch {
// Controller might already be closed
}
}
});
return new Response(readable, { headers });
}
/**
* Replays events that would have been sent after the specified event ID
* Only used when resumability is enabled
*/
private async replayEvents(lastEventId: string): Promise<Response> {
if (!this._eventStore) {
return this.createJsonErrorResponse(400, -32000, 'Event store not configured');
}
try {
// If getStreamIdForEventId is available, use it for conflict checking
let streamId: string | undefined;
if (this._eventStore.getStreamIdForEventId) {
streamId = await this._eventStore.getStreamIdForEventId(lastEventId);
if (!streamId) {
return this.createJsonErrorResponse(400, -32000, 'Invalid event ID format');
}
// Check conflict with the SAME streamId we'll use for mapping
if (this._streamMapping.get(streamId) !== undefined) {
return this.createJsonErrorResponse(409, -32000, 'Conflict: Stream already has an active connection');
}
}
const headers: Record<string, string> = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-transform',
Connection: 'keep-alive'
};
if (this.sessionId !== undefined) {
headers['mcp-session-id'] = this.sessionId;
}
// Create a ReadableStream with controller for SSE
const encoder = new TextEncoder();
let streamController: ReadableStreamDefaultController<Uint8Array>;
const readable = new ReadableStream<Uint8Array>({
start: controller => {
streamController = controller;
},
cancel: () => {
// Stream was cancelled by client
// Cleanup will be handled by the mapping
}
});
// Replay events - returns the streamId for backwards compatibility
const replayedStreamId = await this._eventStore.replayEventsAfter(lastEventId, {
send: async (eventId: string, message: JSONRPCMessage) => {
const success = this.writeSSEEvent(streamController!, encoder, message, eventId);
if (!success) {
this.onerror?.(new Error('Failed replay events'));
try {
streamController!.close();
} catch {
// Controller might already be closed
}
}
}
});
this._streamMapping.set(replayedStreamId, {
controller: streamController!,
encoder,
cleanup: () => {
this._streamMapping.delete(replayedStreamId);
try {
streamController!.close();
} catch {
// Controller might already be closed
}
}
});
return new Response(readable, { headers });
} catch (error) {
this.onerror?.(error as Error);
return this.createJsonErrorResponse(500, -32000, 'Error replaying events');
}
}
/**
* Writes an event to an SSE stream via controller with proper formatting
*/
private writeSSEEvent(
controller: ReadableStreamDefaultController<Uint8Array>,
encoder: TextEncoder,
message: JSONRPCMessage,
eventId?: string
): boolean {
try {
let eventData = `event: message\n`;
// Include event ID if provided - this is important for resumability
if (eventId) {
eventData += `id: ${eventId}\n`;
}
eventData += `data: ${JSON.stringify(message)}\n\n`;
controller.enqueue(encoder.encode(eventData));
return true;
} catch {
return false;
}
}
/**
* Handles unsupported requests (PUT, PATCH, etc.)
*/
private handleUnsupportedRequest(): Response {
return new Response(
JSON.stringify({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Method not allowed.'
},
id: null
}),
{
status: 405,
headers: {
Allow: 'GET, POST, DELETE',
'Content-Type': 'application/json'
}
}
);
}
/**
* Handles POST requests containing JSON-RPC messages
*/
private async handlePostRequest(req: Request, options?: HandleRequestOptions): Promise<Response> {
try {
// Validate the Accept header
const acceptHeader = req.headers.get('accept');
// The client MUST include an Accept header, listing both application/json and text/event-stream as supported content types.
if (!acceptHeader?.includes('application/json') || !acceptHeader.includes('text/event-stream')) {
return this.createJsonErrorResponse(
406,
-32000,
'Not Acceptable: Client must accept both application/json and text/event-stream'
);
}
const ct = req.headers.get('content-type');
if (!ct || !ct.includes('application/json')) {
return this.createJsonErrorResponse(415, -32000, 'Unsupported Media Type: Content-Type must be application/json');
}
// Build request info from headers and URL
const requestInfo: RequestInfo = {
headers: Object.fromEntries(req.headers.entries()),
url: new URL(req.url)
};
let rawMessage;
if (options?.parsedBody !== undefined) {
rawMessage = options.parsedBody;
} else {
try {
rawMessage = await req.json();
} catch {
return this.createJsonErrorResponse(400, -32700, 'Parse error: Invalid JSON');
}
}
let messages: JSONRPCMessage[];
// handle batch and single messages
try {
if (Array.isArray(rawMessage)) {
messages = rawMessage.map(msg => JSONRPCMessageSchema.parse(msg));
} else {
messages = [JSONRPCMessageSchema.parse(rawMessage)];
}
} catch {
return this.createJsonErrorResponse(400, -32700, 'Parse error: Invalid JSON-RPC message');
}
// Check if this is an initialization request
// https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/lifecycle/
const isInitializationRequest = messages.some(isInitializeRequest);
if (isInitializationRequest) {
// If it's a server with session management and the session ID is already set we should reject the request
// to avoid re-initialization.
if (this._initialized && this.sessionId !== undefined) {
return this.createJsonErrorResponse(400, -32600, 'Invalid Request: Server already initialized');
}
if (messages.length > 1) {
return this.createJsonErrorResponse(400, -32600, 'Invalid Request: Only one initialization request is allowed');
}
this.sessionId = this.sessionIdGenerator?.();
this._initialized = true;
// If we have a session ID and an onsessioninitialized handler, call it immediately
// This is needed in cases where the server needs to keep track of multiple sessions
if (this.sessionId && this._onsessioninitialized) {
await Promise.resolve(this._onsessioninitialized(this.sessionId));
}
}
if (!isInitializationRequest) {
// If an Mcp-Session-Id is returned by the server during initialization,
// clients using the Streamable HTTP transport MUST include it
// in the Mcp-Session-Id header on all of their subsequent HTTP requests.
const sessionError = this.validateSession(req);
if (sessionError) {
return sessionError;
}
// Mcp-Protocol-Version header is required for all requests after initialization.
const protocolError = this.validateProtocolVersion(req);
if (protocolError) {
return protocolError;
}
}
// check if it contains requests
const hasRequests = messages.some(isJSONRPCRequest);
if (!hasRequests) {
// if it only contains notifications or responses, return 202
for (const message of messages) {
this.onmessage?.(message, { authInfo: options?.authInfo, requestInfo });
}
return new Response(null, { status: 202 });
}
// The default behavior is to use SSE streaming
// but in some cases server will return JSON responses
const streamId = crypto.randomUUID();
// Extract protocol version for priming event decision.
// For initialize requests, get from request params.
// For other requests, get from header (already validated).
const initRequest = messages.find(m => isInitializeRequest(m));
const clientProtocolVersion = initRequest
? initRequest.params.protocolVersion
: (req.headers.get('mcp-protocol-version') ?? DEFAULT_NEGOTIATED_PROTOCOL_VERSION);
if (this._enableJsonResponse) {
// For JSON response mode, return a Promise that resolves when all responses are ready
return new Promise<Response>(resolve => {
this._streamMapping.set(streamId, {
resolveJson: resolve,
cleanup: () => {
this._streamMapping.delete(streamId);
}
});
for (const message of messages) {
if (isJSONRPCRequest(message)) {
this._requestToStreamMapping.set(message.id, streamId);
}
}
for (const message of messages) {
this.onmessage?.(message, { authInfo: options?.authInfo, requestInfo });
}
});
}
// SSE streaming mode - use ReadableStream with controller for more reliable data pushing
const encoder = new TextEncoder();
let streamController: ReadableStreamDefaultController<Uint8Array>;
const readable = new ReadableStream<Uint8Array>({
start: controller => {
streamController = controller;
},
cancel: () => {
// Stream was cancelled by client
this._streamMapping.delete(streamId);
}
});
const headers: Record<string, string> = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive'
};
// After initialization, always include the session ID if we have one
if (this.sessionId !== undefined) {
headers['mcp-session-id'] = this.sessionId;
}
// Store the response for this request to send messages back through this connection
// We need to track by request ID to maintain the connection
for (const message of messages) {
if (isJSONRPCRequest(message)) {
this._streamMapping.set(streamId, {
controller: streamController!,
encoder,
cleanup: () => {
this._streamMapping.delete(streamId);
try {
streamController!.close();
} catch {
// Controller might already be closed
}
}
});
this._requestToStreamMapping.set(message.id, streamId);
}
}
// Write priming event if event store is configured (after mapping is set up)
await this.writePrimingEvent(streamController!, encoder, streamId, clientProtocolVersion);
// handle each message
for (const message of messages) {
// Build closeSSEStream callback for requests when eventStore is configured
// AND client supports resumability (protocol version >= 2025-11-25).
// Old clients can't resume if the stream is closed early because they
// didn't receive a priming event with an event ID.
let closeSSEStream: (() => void) | undefined;
let closeStandaloneSSEStream: (() => void) | undefined;
if (isJSONRPCRequest(message) && this._eventStore && clientProtocolVersion >= '2025-11-25') {
closeSSEStream = () => {
this.closeSSEStream(message.id);
};
closeStandaloneSSEStream = () => {
this.closeStandaloneSSEStream();
};
}
this.onmessage?.(message, { authInfo: options?.authInfo, requestInfo, closeSSEStream, closeStandaloneSSEStream });
}
// The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses
// This will be handled by the send() method when responses are ready
return new Response(readable, { status: 200, headers });
} catch (error) {
// return JSON-RPC formatted error
this.onerror?.(error as Error);
return this.createJsonErrorResponse(400, -32700, 'Parse error', { data: String(error) });
}
}
/**
* Handles DELETE requests to terminate sessions
*/
private async handleDeleteRequest(req: Request): Promise<Response> {
const sessionError = this.validateSession(req);
if (sessionError) {
return sessionError;
}
const protocolError = this.validateProtocolVersion(req);
if (protocolError) {
return protocolError;
}
await Promise.resolve(this._onsessionclosed?.(this.sessionId!));
await this.close();
return new Response(null, { status: 200 });
}
/**
* Validates session ID for non-initialization requests.
* Returns Response error if invalid, undefined otherwise
*/
private validateSession(req: Request): Response | undefined {
if (this.sessionIdGenerator === undefined) {
// If the sessionIdGenerator ID is not set, the session management is disabled
// and we don't need to validate the session ID
return undefined;
}
if (!this._initialized) {
// If the server has not been initialized yet, reject all requests
return this.createJsonErrorResponse(400, -32000, 'Bad Request: Server not initialized');
}
const sessionId = req.headers.get('mcp-session-id');
if (!sessionId) {
// Non-initialization requests without a session ID should return 400 Bad Request
return this.createJsonErrorResponse(400, -32000, 'Bad Request: Mcp-Session-Id header is required');
}
if (sessionId !== this.sessionId) {
// Reject requests with invalid session ID with 404 Not Found
return this.createJsonErrorResponse(404, -32001, 'Session not found');
}
return undefined;
}
/**
* Validates the MCP-Protocol-Version header on incoming requests.
*
* For initialization: Version negotiation handles unknown versions gracefully
* (server responds with its supported version).
*
* For subsequent requests with MCP-Protocol-Version header:
* - Accept if in supported list
* - 400 if unsupported
*
* For HTTP requests without the MCP-Protocol-Version header:
* - Accept and default to the version negotiated at initialization
*/
private validateProtocolVersion(req: Request): Response | undefined {
const protocolVersion = req.headers.get('mcp-protocol-version');
if (protocolVersion !== null && !SUPPORTED_PROTOCOL_VERSIONS.includes(protocolVersion)) {
return this.createJsonErrorResponse(
400,
-32000,
`Bad Request: Unsupported protocol version: ${protocolVersion} (supported versions: ${SUPPORTED_PROTOCOL_VERSIONS.join(', ')})`
);
}
return undefined;
}
async close(): Promise<void> {
// Close all SSE connections
this._streamMapping.forEach(({ cleanup }) => {
cleanup();
});
this._streamMapping.clear();
// Clear any pending responses
this._requestResponseMap.clear();
this.onclose?.();
}
/**
* Close an SSE stream for a specific request, triggering client reconnection.
* Use this to implement polling behavior during long-running operations -
* client will reconnect after the retry interval specified in the priming event.
*/
closeSSEStream(requestId: RequestId): void {
const streamId = this._requestToStreamMapping.get(requestId);
if (!streamId) return;
const stream = this._streamMapping.get(streamId);
if (stream) {
stream.cleanup();
}
}
/**
* Close the standalone GET SSE stream, triggering client reconnection.
* Use this to implement polling behavior for server-initiated notifications.
*/
closeStandaloneSSEStream(): void {
const stream = this._streamMapping.get(this._standaloneSseStreamId);
if (stream) {
stream.cleanup();
}
}
async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void> {
let requestId = options?.relatedRequestId;
if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
// If the message is a response, use the request ID from the message
requestId = message.id;
}
// Check if this message should be sent on the standalone SSE stream (no request ID)
// Ignore notifications from tools (which have relatedRequestId set)
// Those will be sent via dedicated response SSE streams
if (requestId === undefined) {
// For standalone SSE streams, we can only send requests and notifications
if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
throw new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request');
}
// Generate and store event ID if event store is provided
// Store even if stream is disconnected so events can be replayed on reconnect
let eventId: string | undefined;
if (this._eventStore) {
// Stores the event and gets the generated event ID
eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message);
}
const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId);
if (standaloneSse === undefined) {
// Stream is disconnected - event is stored for replay, nothing more to do
return;
}
// Send the message to the standalone SSE stream
if (standaloneSse.controller && standaloneSse.encoder) {
this.writeSSEEvent(standaloneSse.controller, standaloneSse.encoder, message, eventId);
}
return;
}
// Get the response for this request
const streamId = this._requestToStreamMapping.get(requestId);
if (!streamId) {
throw new Error(`No connection established for request ID: ${String(requestId)}`);
}
const stream = this._streamMapping.get(streamId);
if (!this._enableJsonResponse && stream?.controller && stream?.encoder) {
// For SSE responses, generate event ID if event store is provided
let eventId: string | undefined;
if (this._eventStore) {
eventId = await this._eventStore.storeEvent(streamId, message);
}
// Write the event to the response stream
this.writeSSEEvent(stream.controller, stream.encoder, message, eventId);
}
if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
this._requestResponseMap.set(requestId, message);
const relatedIds = Array.from(this._requestToStreamMapping.entries())
.filter(([_, sid]) => sid === streamId)
.map(([id]) => id);
// Check if we have responses for all requests using this connection
const allResponsesReady = relatedIds.every(id => this._requestResponseMap.has(id));
if (allResponsesReady) {
if (!stream) {
throw new Error(`No connection established for request ID: ${String(requestId)}`);
}
if (this._enableJsonResponse && stream.resolveJson) {
// All responses ready, send as JSON
const headers: Record<string, string> = {
'Content-Type': 'application/json'
};
if (this.sessionId !== undefined) {
headers['mcp-session-id'] = this.sessionId;
}
const responses = relatedIds.map(id => this._requestResponseMap.get(id)!);
if (responses.length === 1) {
stream.resolveJson(new Response(JSON.stringify(responses[0]), { status: 200, headers }));
} else {
stream.resolveJson(new Response(JSON.stringify(responses), { status: 200, headers }));
}
} else {
// End the SSE stream
stream.cleanup();
}
// Clean up
for (const id of relatedIds) {
this._requestResponseMap.delete(id);