Skip to content

Commit c04a6f0

Browse files
committed
feat(server): add keepAliveInterval to standalone GET SSE stream
Adds an opt-in keepAliveInterval option to WebStandardStreamableHTTPServerTransportOptions that sends periodic SSE comments (`: keepalive`) on the standalone GET SSE stream. Reverse proxies commonly close connections that are idle for 30-60s. With no server-initiated messages, the GET SSE stream has no traffic during quiet periods, causing silent disconnections. This option lets operators send harmless SSE comments at a configurable cadence to keep the connection alive. The timer is cleared on close(), closeStandaloneSSEStream(), and on stream cancellation. Disabled by default; no behavior change for existing deployments. Addresses upstream #28, #876.
1 parent df4b6cc commit c04a6f0

File tree

3 files changed

+129
-0
lines changed

3 files changed

+129
-0
lines changed

.changeset/add-sse-keepalive.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@modelcontextprotocol/server': minor
3+
---
4+
5+
Add optional `keepAliveInterval` to `WebStandardStreamableHTTPServerTransportOptions` that sends periodic SSE comments on the standalone GET stream to prevent reverse proxy idle timeout disconnections.

packages/server/src/server/streamableHttp.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,15 @@ export interface WebStandardStreamableHTTPServerTransportOptions {
141141
*/
142142
retryInterval?: number;
143143

144+
/**
145+
* Interval in milliseconds for sending SSE keepalive comments on the standalone
146+
* GET SSE stream. When set, the transport sends periodic SSE comments
147+
* (`: keepalive`) to prevent reverse proxies from closing idle connections.
148+
*
149+
* Disabled by default (no keepalive comments are sent).
150+
*/
151+
keepAliveInterval?: number;
152+
144153
/**
145154
* List of protocol versions that this transport will accept.
146155
* Used to validate the `mcp-protocol-version` header in incoming requests.
@@ -239,6 +248,8 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
239248
private _allowedOrigins?: string[];
240249
private _enableDnsRebindingProtection: boolean;
241250
private _retryInterval?: number;
251+
private _keepAliveInterval?: number;
252+
private _keepAliveTimer?: ReturnType<typeof setInterval>;
242253
private _supportedProtocolVersions: string[];
243254

244255
sessionId?: string;
@@ -256,6 +267,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
256267
this._allowedOrigins = options.allowedOrigins;
257268
this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false;
258269
this._retryInterval = options.retryInterval;
270+
this._keepAliveInterval = options.keepAliveInterval;
259271
this._supportedProtocolVersions = options.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS;
260272
}
261273

@@ -446,6 +458,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
446458
},
447459
cancel: () => {
448460
// Stream was cancelled by client
461+
this._clearKeepAliveTimer();
449462
this._streamMapping.delete(this._standaloneSseStreamId);
450463
}
451464
});
@@ -475,6 +488,19 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
475488
}
476489
});
477490

491+
// Start keepalive timer to send periodic SSE comments that prevent
492+
// reverse proxies from closing the connection due to idle timeouts
493+
if (this._keepAliveInterval !== undefined) {
494+
this._keepAliveTimer = setInterval(() => {
495+
try {
496+
streamController!.enqueue(encoder.encode(': keepalive\n\n'));
497+
} catch {
498+
// Controller is closed or errored, stop sending keepalives
499+
this._clearKeepAliveTimer();
500+
}
501+
}, this._keepAliveInterval);
502+
}
503+
478504
return new Response(readable, { headers });
479505
}
480506

@@ -897,11 +923,19 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
897923
return undefined;
898924
}
899925

926+
private _clearKeepAliveTimer(): void {
927+
if (this._keepAliveTimer !== undefined) {
928+
clearInterval(this._keepAliveTimer);
929+
this._keepAliveTimer = undefined;
930+
}
931+
}
932+
900933
async close(): Promise<void> {
901934
if (this._closed) {
902935
return;
903936
}
904937
this._closed = true;
938+
this._clearKeepAliveTimer();
905939

906940
// Close all SSE connections
907941
for (const { cleanup } of this._streamMapping.values()) {
@@ -934,6 +968,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
934968
* Use this to implement polling behavior for server-initiated notifications.
935969
*/
936970
closeStandaloneSSEStream(): void {
971+
this._clearKeepAliveTimer();
937972
const stream = this._streamMapping.get(this._standaloneSseStreamId);
938973
if (stream) {
939974
stream.cleanup();

packages/server/test/server/streamableHttp.test.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -993,4 +993,93 @@ describe('Zod v4', () => {
993993
expect(cleanupCalls).toEqual(['stream-1']);
994994
});
995995
});
996+
997+
describe('HTTPServerTransport - keepAliveInterval', () => {
998+
let transport: WebStandardStreamableHTTPServerTransport;
999+
let mcpServer: McpServer;
1000+
1001+
beforeEach(() => {
1002+
vi.useFakeTimers();
1003+
});
1004+
1005+
afterEach(async () => {
1006+
vi.useRealTimers();
1007+
await transport.close();
1008+
});
1009+
1010+
async function setupTransport(keepAliveInterval?: number): Promise<string> {
1011+
mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' });
1012+
1013+
transport = new WebStandardStreamableHTTPServerTransport({
1014+
sessionIdGenerator: () => randomUUID(),
1015+
keepAliveInterval
1016+
});
1017+
1018+
await mcpServer.connect(transport);
1019+
1020+
const initReq = createRequest('POST', TEST_MESSAGES.initialize);
1021+
const initRes = await transport.handleRequest(initReq);
1022+
return initRes.headers.get('mcp-session-id') as string;
1023+
}
1024+
1025+
it('should send SSE keepalive comments periodically when keepAliveInterval is set', async () => {
1026+
const sessionId = await setupTransport(50);
1027+
1028+
const getReq = createRequest('GET', undefined, { sessionId });
1029+
const getRes = await transport.handleRequest(getReq);
1030+
1031+
expect(getRes.status).toBe(200);
1032+
expect(getRes.body).not.toBeNull();
1033+
1034+
const reader = getRes.body!.getReader();
1035+
1036+
// Advance past two intervals to accumulate keepalive comments
1037+
vi.advanceTimersByTime(120);
1038+
1039+
const { value } = await reader.read();
1040+
const text = new TextDecoder().decode(value);
1041+
expect(text).toContain(': keepalive');
1042+
});
1043+
1044+
it('should not send SSE comments when keepAliveInterval is not set', async () => {
1045+
const sessionId = await setupTransport(undefined);
1046+
1047+
const getReq = createRequest('GET', undefined, { sessionId });
1048+
const getRes = await transport.handleRequest(getReq);
1049+
1050+
expect(getRes.status).toBe(200);
1051+
expect(getRes.body).not.toBeNull();
1052+
1053+
const reader = getRes.body!.getReader();
1054+
1055+
// Advance time; no keepalive should be enqueued
1056+
vi.advanceTimersByTime(200);
1057+
1058+
// Close the transport to end the stream, then read whatever was buffered
1059+
await transport.close();
1060+
1061+
const chunks: string[] = [];
1062+
for (let result = await reader.read(); !result.done; result = await reader.read()) {
1063+
chunks.push(new TextDecoder().decode(result.value));
1064+
}
1065+
1066+
const allText = chunks.join('');
1067+
expect(allText).not.toContain(': keepalive');
1068+
});
1069+
1070+
it('should clear the keepalive interval when the transport is closed', async () => {
1071+
const sessionId = await setupTransport(50);
1072+
1073+
const getReq = createRequest('GET', undefined, { sessionId });
1074+
const getRes = await transport.handleRequest(getReq);
1075+
1076+
expect(getRes.status).toBe(200);
1077+
1078+
// Close the transport, which should clear the interval
1079+
await transport.close();
1080+
1081+
// Advancing timers after close should not throw
1082+
vi.advanceTimersByTime(200);
1083+
});
1084+
});
9961085
});

0 commit comments

Comments
 (0)