Skip to content

Commit a58ea06

Browse files
committed
feat(examples): restore SSE transport support to all servers
Add back the legacy SSE transport endpoints (/sse and /messages) that were lost when server-utils.ts was simplified. This enables older clients like the Kotlin SDK to connect to example servers. Endpoints: - /mcp: Streamable HTTP transport (stateless mode) - /sse: Legacy SSE transport stream endpoint - /messages: Legacy SSE transport message endpoint
1 parent 2dfdb57 commit a58ea06

File tree

13 files changed

+2170
-10
lines changed

13 files changed

+2170
-10
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/**
2+
* Shared utilities for running MCP servers with various transports.
3+
*
4+
* Supports:
5+
* - Stdio transport (--stdio flag)
6+
* - Streamable HTTP transport (/mcp) - stateless mode
7+
* - Legacy SSE transport (/sse, /messages) - for older clients (e.g., Kotlin SDK)
8+
*/
9+
10+
import { createMcpExpressApp } from "@modelcontextprotocol/sdk/server/express.js";
11+
import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
12+
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
13+
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
14+
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
15+
import cors from "cors";
16+
import type { Request, Response } from "express";
17+
18+
/** Active SSE sessions: sessionId -> { server, transport } */
19+
const sseSessions = new Map<
20+
string,
21+
{ server: McpServer; transport: SSEServerTransport }
22+
>();
23+
24+
/**
25+
* Starts an MCP server using the appropriate transport based on command-line arguments.
26+
*
27+
* If `--stdio` is passed, uses stdio transport. Otherwise, uses HTTP transports.
28+
*
29+
* @param createServer - Factory function that creates a new McpServer instance.
30+
*/
31+
export async function startServer(
32+
createServer: () => McpServer,
33+
): Promise<void> {
34+
try {
35+
if (process.argv.includes("--stdio")) {
36+
await startStdioServer(createServer);
37+
} else {
38+
await startHttpServer(createServer);
39+
}
40+
} catch (e) {
41+
console.error(e);
42+
process.exit(1);
43+
}
44+
}
45+
46+
/**
47+
* Starts an MCP server with stdio transport.
48+
*
49+
* @param createServer - Factory function that creates a new McpServer instance.
50+
*/
51+
export async function startStdioServer(
52+
createServer: () => McpServer,
53+
): Promise<void> {
54+
await createServer().connect(new StdioServerTransport());
55+
}
56+
57+
/**
58+
* Starts an MCP server with HTTP transports (Streamable HTTP + legacy SSE).
59+
*
60+
* Provides:
61+
* - /mcp (GET/POST/DELETE): Streamable HTTP transport (stateless mode)
62+
* - /sse (GET) + /messages (POST): Legacy SSE transport for older clients
63+
*
64+
* @param createServer - Factory function that creates a new McpServer instance.
65+
*/
66+
export async function startHttpServer(
67+
createServer: () => McpServer,
68+
): Promise<void> {
69+
const port = parseInt(process.env.PORT ?? "3001", 10);
70+
71+
// Express app - bind to all interfaces for development/testing
72+
const expressApp = createMcpExpressApp({ host: "0.0.0.0" });
73+
expressApp.use(cors());
74+
75+
// Streamable HTTP transport (stateless mode)
76+
expressApp.all("/mcp", async (req: Request, res: Response) => {
77+
// Create fresh server and transport for each request (stateless mode)
78+
const server = createServer();
79+
const transport = new StreamableHTTPServerTransport({
80+
sessionIdGenerator: undefined,
81+
});
82+
83+
// Clean up when response ends
84+
res.on("close", () => {
85+
transport.close().catch(() => {});
86+
server.close().catch(() => {});
87+
});
88+
89+
try {
90+
await server.connect(transport);
91+
await transport.handleRequest(req, res, req.body);
92+
} catch (error) {
93+
console.error("MCP error:", error);
94+
if (!res.headersSent) {
95+
res.status(500).json({
96+
jsonrpc: "2.0",
97+
error: { code: -32603, message: "Internal server error" },
98+
id: null,
99+
});
100+
}
101+
}
102+
});
103+
104+
// Legacy SSE transport - stream endpoint
105+
expressApp.get("/sse", async (_req: Request, res: Response) => {
106+
try {
107+
const server = createServer();
108+
const transport = new SSEServerTransport("/messages", res);
109+
sseSessions.set(transport.sessionId, { server, transport });
110+
111+
res.on("close", () => {
112+
sseSessions.delete(transport.sessionId);
113+
transport.close().catch(() => {});
114+
server.close().catch(() => {});
115+
});
116+
117+
await server.connect(transport);
118+
} catch (error) {
119+
console.error("SSE error:", error);
120+
if (!res.headersSent) res.status(500).end();
121+
}
122+
});
123+
124+
// Legacy SSE transport - message endpoint
125+
expressApp.post("/messages", async (req: Request, res: Response) => {
126+
try {
127+
const sessionId = req.query.sessionId as string;
128+
const session = sseSessions.get(sessionId);
129+
130+
if (!session) {
131+
return res.status(404).json({
132+
jsonrpc: "2.0",
133+
error: { code: -32001, message: "Session not found" },
134+
id: null,
135+
});
136+
}
137+
138+
await session.transport.handlePostMessage(req, res, req.body);
139+
} catch (error) {
140+
console.error("Message error:", error);
141+
if (!res.headersSent) {
142+
res.status(500).json({
143+
jsonrpc: "2.0",
144+
error: { code: -32603, message: "Internal server error" },
145+
id: null,
146+
});
147+
}
148+
}
149+
});
150+
151+
const { promise, resolve, reject } = Promise.withResolvers<void>();
152+
153+
const httpServer = expressApp.listen(port, (err?: Error) => {
154+
if (err) return reject(err);
155+
console.log(`Server listening on http://localhost:${port}/mcp`);
156+
console.log(` SSE endpoint: http://localhost:${port}/sse`);
157+
resolve();
158+
});
159+
160+
const shutdown = () => {
161+
console.log("\nShutting down...");
162+
// Clean up all SSE sessions
163+
sseSessions.forEach(({ server, transport }) => {
164+
transport.close().catch(() => {});
165+
server.close().catch(() => {});
166+
});
167+
sseSessions.clear();
168+
httpServer.close(() => process.exit(0));
169+
};
170+
171+
process.on("SIGINT", shutdown);
172+
process.on("SIGTERM", shutdown);
173+
174+
return promise;
175+
}
176+
177+
/**
178+
* @deprecated Use startHttpServer instead
179+
*/
180+
export const startStreamableHttpServer = startHttpServer;
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/**
2+
* Shared utilities for running MCP servers with various transports.
3+
*
4+
* Supports:
5+
* - Stdio transport (--stdio flag)
6+
* - Streamable HTTP transport (/mcp) - stateless mode
7+
* - Legacy SSE transport (/sse, /messages) - for older clients (e.g., Kotlin SDK)
8+
*/
9+
10+
import { createMcpExpressApp } from "@modelcontextprotocol/sdk/server/express.js";
11+
import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
12+
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
13+
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
14+
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
15+
import cors from "cors";
16+
import type { Request, Response } from "express";
17+
18+
/** Active SSE sessions: sessionId -> { server, transport } */
19+
const sseSessions = new Map<
20+
string,
21+
{ server: McpServer; transport: SSEServerTransport }
22+
>();
23+
24+
/**
25+
* Starts an MCP server using the appropriate transport based on command-line arguments.
26+
*
27+
* If `--stdio` is passed, uses stdio transport. Otherwise, uses HTTP transports.
28+
*
29+
* @param createServer - Factory function that creates a new McpServer instance.
30+
*/
31+
export async function startServer(
32+
createServer: () => McpServer,
33+
): Promise<void> {
34+
try {
35+
if (process.argv.includes("--stdio")) {
36+
await startStdioServer(createServer);
37+
} else {
38+
await startHttpServer(createServer);
39+
}
40+
} catch (e) {
41+
console.error(e);
42+
process.exit(1);
43+
}
44+
}
45+
46+
/**
47+
* Starts an MCP server with stdio transport.
48+
*
49+
* @param createServer - Factory function that creates a new McpServer instance.
50+
*/
51+
export async function startStdioServer(
52+
createServer: () => McpServer,
53+
): Promise<void> {
54+
await createServer().connect(new StdioServerTransport());
55+
}
56+
57+
/**
58+
* Starts an MCP server with HTTP transports (Streamable HTTP + legacy SSE).
59+
*
60+
* Provides:
61+
* - /mcp (GET/POST/DELETE): Streamable HTTP transport (stateless mode)
62+
* - /sse (GET) + /messages (POST): Legacy SSE transport for older clients
63+
*
64+
* @param createServer - Factory function that creates a new McpServer instance.
65+
*/
66+
export async function startHttpServer(
67+
createServer: () => McpServer,
68+
): Promise<void> {
69+
const port = parseInt(process.env.PORT ?? "3001", 10);
70+
71+
// Express app - bind to all interfaces for development/testing
72+
const expressApp = createMcpExpressApp({ host: "0.0.0.0" });
73+
expressApp.use(cors());
74+
75+
// Streamable HTTP transport (stateless mode)
76+
expressApp.all("/mcp", async (req: Request, res: Response) => {
77+
// Create fresh server and transport for each request (stateless mode)
78+
const server = createServer();
79+
const transport = new StreamableHTTPServerTransport({
80+
sessionIdGenerator: undefined,
81+
});
82+
83+
// Clean up when response ends
84+
res.on("close", () => {
85+
transport.close().catch(() => {});
86+
server.close().catch(() => {});
87+
});
88+
89+
try {
90+
await server.connect(transport);
91+
await transport.handleRequest(req, res, req.body);
92+
} catch (error) {
93+
console.error("MCP error:", error);
94+
if (!res.headersSent) {
95+
res.status(500).json({
96+
jsonrpc: "2.0",
97+
error: { code: -32603, message: "Internal server error" },
98+
id: null,
99+
});
100+
}
101+
}
102+
});
103+
104+
// Legacy SSE transport - stream endpoint
105+
expressApp.get("/sse", async (_req: Request, res: Response) => {
106+
try {
107+
const server = createServer();
108+
const transport = new SSEServerTransport("/messages", res);
109+
sseSessions.set(transport.sessionId, { server, transport });
110+
111+
res.on("close", () => {
112+
sseSessions.delete(transport.sessionId);
113+
transport.close().catch(() => {});
114+
server.close().catch(() => {});
115+
});
116+
117+
await server.connect(transport);
118+
} catch (error) {
119+
console.error("SSE error:", error);
120+
if (!res.headersSent) res.status(500).end();
121+
}
122+
});
123+
124+
// Legacy SSE transport - message endpoint
125+
expressApp.post("/messages", async (req: Request, res: Response) => {
126+
try {
127+
const sessionId = req.query.sessionId as string;
128+
const session = sseSessions.get(sessionId);
129+
130+
if (!session) {
131+
return res.status(404).json({
132+
jsonrpc: "2.0",
133+
error: { code: -32001, message: "Session not found" },
134+
id: null,
135+
});
136+
}
137+
138+
await session.transport.handlePostMessage(req, res, req.body);
139+
} catch (error) {
140+
console.error("Message error:", error);
141+
if (!res.headersSent) {
142+
res.status(500).json({
143+
jsonrpc: "2.0",
144+
error: { code: -32603, message: "Internal server error" },
145+
id: null,
146+
});
147+
}
148+
}
149+
});
150+
151+
const { promise, resolve, reject } = Promise.withResolvers<void>();
152+
153+
const httpServer = expressApp.listen(port, (err?: Error) => {
154+
if (err) return reject(err);
155+
console.log(`Server listening on http://localhost:${port}/mcp`);
156+
console.log(` SSE endpoint: http://localhost:${port}/sse`);
157+
resolve();
158+
});
159+
160+
const shutdown = () => {
161+
console.log("\nShutting down...");
162+
// Clean up all SSE sessions
163+
sseSessions.forEach(({ server, transport }) => {
164+
transport.close().catch(() => {});
165+
server.close().catch(() => {});
166+
});
167+
sseSessions.clear();
168+
httpServer.close(() => process.exit(0));
169+
};
170+
171+
process.on("SIGINT", shutdown);
172+
process.on("SIGTERM", shutdown);
173+
174+
return promise;
175+
}
176+
177+
/**
178+
* @deprecated Use startHttpServer instead
179+
*/
180+
export const startStreamableHttpServer = startHttpServer;

0 commit comments

Comments
 (0)