Skip to content

Commit 5a925e1

Browse files
rcdaileyfelixweinberger
authored andcommitted
feat(server): add keepAliveInterval to standalone GET SSE stream
Adds an opt-in keepAliveInterval option to WebStandardStreamableHTTPServerTransportOptions that sends periodic SSE comments (`: keepalive`) on the standalone GET SSE stream. Reverse proxies commonly close connections that are idle for 30-60s. With no server-initiated messages, the GET SSE stream has no traffic during quiet periods, causing silent disconnections. This option lets operators send harmless SSE comments at a configurable cadence to keep the connection alive. The timer is cleared on close(), closeStandaloneSSEStream(), and on stream cancellation. Disabled by default; no behavior change for existing deployments. Addresses upstream #28, #876.
1 parent fcde488 commit 5a925e1

File tree

3 files changed

+130
-0
lines changed

3 files changed

+130
-0
lines changed

.changeset/add-sse-keepalive.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@modelcontextprotocol/server': minor
3+
---
4+
5+
Add optional `keepAliveInterval` to `WebStandardStreamableHTTPServerTransportOptions` that sends periodic SSE comments on the standalone GET stream to prevent reverse proxy idle timeout disconnections.

packages/server/src/server/streamableHttp.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,15 @@ export interface WebStandardStreamableHTTPServerTransportOptions {
141141
*/
142142
retryInterval?: number;
143143

144+
/**
145+
* Interval in milliseconds for sending SSE keepalive comments on the standalone
146+
* GET SSE stream. When set, the transport sends periodic SSE comments
147+
* (`: keepalive`) to prevent reverse proxies from closing idle connections.
148+
*
149+
* Disabled by default (no keepalive comments are sent).
150+
*/
151+
keepAliveInterval?: number;
152+
144153
/**
145154
* List of protocol versions that this transport will accept.
146155
* Used to validate the `mcp-protocol-version` header in incoming requests.
@@ -238,6 +247,8 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
238247
private _allowedOrigins?: string[];
239248
private _enableDnsRebindingProtection: boolean;
240249
private _retryInterval?: number;
250+
private _keepAliveInterval?: number;
251+
private _keepAliveTimer?: ReturnType<typeof setInterval>;
241252
private _supportedProtocolVersions: string[];
242253

243254
sessionId?: string;
@@ -255,6 +266,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
255266
this._allowedOrigins = options.allowedOrigins;
256267
this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false;
257268
this._retryInterval = options.retryInterval;
269+
this._keepAliveInterval = options.keepAliveInterval;
258270
this._supportedProtocolVersions = options.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS;
259271
}
260272

@@ -445,6 +457,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
445457
},
446458
cancel: () => {
447459
// Stream was cancelled by client
460+
this._clearKeepAliveTimer();
448461
this._streamMapping.delete(this._standaloneSseStreamId);
449462
}
450463
});
@@ -474,6 +487,19 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
474487
}
475488
});
476489

490+
// Start keepalive timer to send periodic SSE comments that prevent
491+
// reverse proxies from closing the connection due to idle timeouts
492+
if (this._keepAliveInterval !== undefined) {
493+
this._keepAliveTimer = setInterval(() => {
494+
try {
495+
streamController!.enqueue(encoder.encode(': keepalive\n\n'));
496+
} catch {
497+
// Controller is closed or errored, stop sending keepalives
498+
this._clearKeepAliveTimer();
499+
}
500+
}, this._keepAliveInterval);
501+
}
502+
477503
return new Response(readable, { headers });
478504
}
479505

@@ -899,7 +925,16 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
899925
return undefined;
900926
}
901927

928+
private _clearKeepAliveTimer(): void {
929+
if (this._keepAliveTimer !== undefined) {
930+
clearInterval(this._keepAliveTimer);
931+
this._keepAliveTimer = undefined;
932+
}
933+
}
934+
902935
async close(): Promise<void> {
936+
this._clearKeepAliveTimer();
937+
903938
// Close all SSE connections
904939
for (const { cleanup } of this._streamMapping.values()) {
905940
cleanup();
@@ -931,6 +966,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
931966
* Use this to implement polling behavior for server-initiated notifications.
932967
*/
933968
closeStandaloneSSEStream(): void {
969+
this._clearKeepAliveTimer();
934970
const stream = this._streamMapping.get(this._standaloneSseStreamId);
935971
if (stream) {
936972
stream.cleanup();

packages/server/test/server/streamableHttp.test.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,4 +956,93 @@ describe('Zod v4', () => {
956956
expect(error?.message).toContain('Unsupported protocol version');
957957
});
958958
});
959+
960+
describe('HTTPServerTransport - keepAliveInterval', () => {
961+
let transport: WebStandardStreamableHTTPServerTransport;
962+
let mcpServer: McpServer;
963+
964+
beforeEach(() => {
965+
vi.useFakeTimers();
966+
});
967+
968+
afterEach(async () => {
969+
vi.useRealTimers();
970+
await transport.close();
971+
});
972+
973+
async function setupTransport(keepAliveInterval?: number): Promise<string> {
974+
mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' });
975+
976+
transport = new WebStandardStreamableHTTPServerTransport({
977+
sessionIdGenerator: () => randomUUID(),
978+
keepAliveInterval
979+
});
980+
981+
await mcpServer.connect(transport);
982+
983+
const initReq = createRequest('POST', TEST_MESSAGES.initialize);
984+
const initRes = await transport.handleRequest(initReq);
985+
return initRes.headers.get('mcp-session-id') as string;
986+
}
987+
988+
it('should send SSE keepalive comments periodically when keepAliveInterval is set', async () => {
989+
const sessionId = await setupTransport(50);
990+
991+
const getReq = createRequest('GET', undefined, { sessionId });
992+
const getRes = await transport.handleRequest(getReq);
993+
994+
expect(getRes.status).toBe(200);
995+
expect(getRes.body).not.toBeNull();
996+
997+
const reader = getRes.body!.getReader();
998+
999+
// Advance past two intervals to accumulate keepalive comments
1000+
vi.advanceTimersByTime(120);
1001+
1002+
const { value } = await reader.read();
1003+
const text = new TextDecoder().decode(value);
1004+
expect(text).toContain(': keepalive');
1005+
});
1006+
1007+
it('should not send SSE comments when keepAliveInterval is not set', async () => {
1008+
const sessionId = await setupTransport(undefined);
1009+
1010+
const getReq = createRequest('GET', undefined, { sessionId });
1011+
const getRes = await transport.handleRequest(getReq);
1012+
1013+
expect(getRes.status).toBe(200);
1014+
expect(getRes.body).not.toBeNull();
1015+
1016+
const reader = getRes.body!.getReader();
1017+
1018+
// Advance time; no keepalive should be enqueued
1019+
vi.advanceTimersByTime(200);
1020+
1021+
// Close the transport to end the stream, then read whatever was buffered
1022+
await transport.close();
1023+
1024+
const chunks: string[] = [];
1025+
for (let result = await reader.read(); !result.done; result = await reader.read()) {
1026+
chunks.push(new TextDecoder().decode(result.value));
1027+
}
1028+
1029+
const allText = chunks.join('');
1030+
expect(allText).not.toContain(': keepalive');
1031+
});
1032+
1033+
it('should clear the keepalive interval when the transport is closed', async () => {
1034+
const sessionId = await setupTransport(50);
1035+
1036+
const getReq = createRequest('GET', undefined, { sessionId });
1037+
const getRes = await transport.handleRequest(getReq);
1038+
1039+
expect(getRes.status).toBe(200);
1040+
1041+
// Close the transport, which should clear the interval
1042+
await transport.close();
1043+
1044+
// Advancing timers after close should not throw
1045+
vi.advanceTimersByTime(200);
1046+
});
1047+
});
9591048
});

0 commit comments

Comments
 (0)