Skip to content

Commit c2119ed

Browse files
WIP shttp changes
1 parent 8579d44 commit c2119ed

4 files changed

Lines changed: 88 additions & 58 deletions

File tree

src/handlers/shttp.ts

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
import { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js";
22
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
33
import { Request, Response } from "express";
4-
import { getFirstShttpTransport, getShttpTransport, isLive, startServerListeningToRedis } from "../services/redisTransport.js";
4+
import { getFirstShttpTransport, getShttpTransport, isLive, RedisTransport, startServerListeningToRedis } from "../services/redisTransport.js";
55
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
66
import { randomUUID } from "crypto";
77
import { createMcpServer } from "../services/mcp.js";
8-
import getRawBody from "raw-body";
98

109

1110
declare module "express-serve-static-core" {
@@ -22,7 +21,8 @@ export async function handleStreamableHTTP(req: Request, res: Response) {
2221
console.log('Received MCP request:', JSON.stringify(req.body, null, 2));
2322
console.log('Request headers:', JSON.stringify(req.headers, null, 2));
2423

25-
let transport: StreamableHTTPServerTransport | undefined = undefined;
24+
let shttpTransport: StreamableHTTPServerTransport | undefined = undefined;
25+
let redisTransport: RedisTransport | undefined = undefined;
2626
try {
2727
// Check for existing session ID
2828
const sessionId = req.headers['mcp-session-id'] as string | undefined;
@@ -31,7 +31,11 @@ export async function handleStreamableHTTP(req: Request, res: Response) {
3131
if (sessionId && await isLive(sessionId)) {
3232
console.log('Session is live, reusing existing transport for session:', sessionId);
3333
// Reuse existing transport
34-
transport = await getShttpTransport(sessionId)
34+
// const { shttpTransport, redisTransport } = await getShttpTransport(sessionId);
35+
// destructuring to get both transports
36+
({ shttpTransport, redisTransport } = await getShttpTransport(sessionId));
37+
38+
console.log('Created transport for session:', sessionId);
3539
console.log('Retrieved transport from Redis for session:', sessionId);
3640
} else if (!sessionId && isInitializeRequest(req.body)) {
3741
console.log('New initialization request detected, creating new session');
@@ -45,20 +49,9 @@ export async function handleStreamableHTTP(req: Request, res: Response) {
4549
startServerListeningToRedis(server.server, sessionId)
4650
console.log('Started server listening to Redis for session:', sessionId);
4751

48-
transport = await getFirstShttpTransport(sessionId);
52+
({ shttpTransport, redisTransport } = await getFirstShttpTransport(sessionId));
4953
console.log('Retrieved first transport for session:', sessionId);
50-
console.log('Transport object:', transport.constructor.name, 'sessionId:', transport.sessionId);
51-
52-
// Connect the transport to the MCP server BEFORE handling the request
53-
console.log('Connecting transport to MCP server...');
54-
await server.server.connect(transport);
55-
console.log('Transport connected successfully');
56-
57-
console.log('Handling initialization request...');
58-
await transport.handleRequest(req, res, req.body);
59-
console.log('Initialization request handled successfully');
60-
console.log('=== handleStreamableHTTP END (initialization) ===');
61-
return; // Already handled
54+
console.log('Transport object:', shttpTransport.constructor.name, 'sessionId:', shttpTransport.sessionId);
6255
} else {
6356
console.log('Invalid request - no session ID and not an initialization request');
6457
console.log('Session ID present:', !!sessionId);
@@ -79,9 +72,9 @@ export async function handleStreamableHTTP(req: Request, res: Response) {
7972

8073
// Handle the request with existing transport - no need to reconnect
8174
console.log('Handling request with existing transport for session:', sessionId);
82-
console.log('Transport object:', transport.constructor.name, 'sessionId:', transport.sessionId);
83-
console.log('Request body:', JSON.stringify(req.body, null, 2));
84-
await transport.handleRequest(req, res, req.body);
75+
console.log('Transport object:', shttpTransport.constructor.name, 'sessionId:', shttpTransport.sessionId);
76+
console.log('Request body:', JSON.stringify(req.body, null, 2));
77+
await shttpTransport.handleRequest(req, res, req.body);
8578
console.log('Request handled successfully');
8679
} catch (error) {
8780
console.error('=== ERROR in handleStreamableHTTP ===');
@@ -104,12 +97,13 @@ export async function handleStreamableHTTP(req: Request, res: Response) {
10497
console.log('Response headers already sent, cannot send error response');
10598
}
10699
} finally {
107-
// if (transport) {
108-
// console.log('Closing transport in finally block');
109-
// // Close transports because they are ephemeral in this setup.
110-
// transport.close();
111-
// console.log('Transport closed');
112-
// }
100+
// Set up cleanup when response is complete
101+
res.on('finish', async () => {
102+
console.log('HTTP response finished, closing transport');
103+
await shttpTransport?.close();
104+
// await redisTransport?.close();
105+
console.log('Transport closed after response');
106+
});
113107
console.log('=== handleStreamableHTTP END ===');
114108
}
115109
}

src/index.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { requireBearerAuth } from "@modelcontextprotocol/sdk/server/auth/middleware/bearerAuth.js";
1+
import { BearerAuthMiddlewareOptions, requireBearerAuth } from "@modelcontextprotocol/sdk/server/auth/middleware/bearerAuth.js";
22
import { AuthRouterOptions, mcpAuthRouter } from "@modelcontextprotocol/sdk/server/auth/router.js";
33
import cors from "cors";
44
import express from "express";
@@ -9,6 +9,7 @@ import { handleFakeAuthorize, handleFakeAuthorizeRedirect } from "./handlers/fak
99
import { handleStreamableHTTP } from "./handlers/shttp.js";
1010
import { handleMessage, handleSSEConnection } from "./handlers/sse.js";
1111
import { redisClient } from "./redis.js";
12+
import { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js";
1213

1314
const app = express();
1415

@@ -78,6 +79,8 @@ app.use(baseSecurityHeaders);
7879
// Enable CORS pre-flight requests
7980
app.options('*', cors(corsOptions));
8081

82+
83+
const authProvider = new EverythingAuthProvider();
8184
// Auth configuration
8285
const options: AuthRouterOptions = {
8386
provider: new EverythingAuthProvider(),
@@ -89,8 +92,16 @@ const options: AuthRouterOptions = {
8992
}
9093
}
9194
};
95+
96+
const dearerAuthMiddlewareOptions: BearerAuthMiddlewareOptions = {
97+
// verifyAccessToken(token: string): Promise<AuthInfo>;
98+
verifier: {
99+
verifyAccessToken: authProvider.verifyAccessToken.bind(authProvider),
100+
}
101+
}
102+
92103
app.use(mcpAuthRouter(options));
93-
const bearerAuth = requireBearerAuth(options);
104+
const bearerAuth = requireBearerAuth(dearerAuthMiddlewareOptions);
94105

95106
// MCP routes (legacy SSE transport)
96107
app.get("/sse", cors(corsOptions), bearerAuth, authContext, sseHeaders, handleSSEConnection);

src/services/mcp.ts

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@ import {
1515
SetLevelRequestSchema,
1616
SubscribeRequestSchema,
1717
Tool,
18-
ToolSchema,
1918
UnsubscribeRequestSchema,
2019
} from "@modelcontextprotocol/sdk/types.js";
2120
import { z } from "zod";
2221
import { zodToJsonSchema } from "zod-to-json-schema";
2322

24-
type ToolInput = z.infer<typeof ToolSchema.shape.inputSchema>;
23+
type ToolInput = {
24+
type: "object";
25+
properties?: Record<string, unknown>;
26+
required?: string[];
27+
};
2528

2629
/* Input schemas for tools implemented in this server */
2730
const EchoSchema = z.object({
@@ -109,18 +112,6 @@ export const createMcpServer = () => {
109112
}
110113
);
111114

112-
// Add logging to track all incoming messages
113-
const originalSetRequestHandler = server.setRequestHandler.bind(server);
114-
server.setRequestHandler = function(schema: any, handler: any) {
115-
const wrappedHandler = async (request: any) => {
116-
console.log(`[MCP Server] Handling request: ${request.method}`);
117-
const result = await handler(request);
118-
console.log(`[MCP Server] Request ${request.method} completed`);
119-
return result;
120-
};
121-
return originalSetRequestHandler(schema, wrappedHandler);
122-
};
123-
124115
const subscriptions: Set<string> = new Set();
125116

126117
// Set up update interval for subscribed resources

src/services/redisTransport.ts

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js";
55
import { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
66
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
77

8+
let redisTransportCounter = 0;
89

910
function getToServerChannel(sessionId: string): string {
1011
return `mcp:shttp:toserver:${sessionId}`;
@@ -24,6 +25,7 @@ export async function isLive(sessionId: string): Promise<boolean> {
2425

2526
export class RedisTransport implements Transport {
2627
private redisCleanup: (() => Promise<void>) | undefined;
28+
private counter: number;
2729

2830
onclose?: (() => void) | undefined;
2931
onerror?: ((error: Error) => void) | undefined;
@@ -34,6 +36,7 @@ export class RedisTransport implements Transport {
3436
private recvChannel: string,
3537
private isLiveKey: string | undefined = undefined
3638
) {
39+
this.counter = redisTransportCounter++;
3740
this.sendChannel = sendChannel;
3841
this.recvChannel = recvChannel;
3942
this.isLiveKey = isLiveKey;
@@ -42,18 +45,26 @@ export class RedisTransport implements Transport {
4245

4346

4447
async start(): Promise<void> {
45-
console.log(`[RedisTransport.start] Starting transport - send: ${this.sendChannel}, recv: ${this.recvChannel}`);
48+
console.log(`[RedisTransport.${this.counter}.start] Starting transport - send: ${this.sendChannel}, recv: ${this.recvChannel}`);
4649
if (this.redisCleanup) {
4750
throw new Error(`Redis transport already started for channels ${this.sendChannel} and ${this.recvChannel}`);
4851
}
4952

53+
// Log when onmessage is set
54+
console.log(`[RedisTransport.${this.counter}.start] onmessage handler is ${this.onmessage ? 'SET' : 'NOT SET'}`);
55+
5056
this.redisCleanup = await redisClient.createSubscription(
5157
this.recvChannel,
5258
(json) => {
53-
console.log(`[RedisTransport] Received message on ${this.recvChannel}:`, json.substring(0, 100));
59+
console.log(`[RedisTransport.${this.counter}] Received message on ${this.recvChannel}:`, json.substring(0, 100));
5460
const message = JSON.parse(json);
5561
const extra = popExtra(message);
56-
this.onmessage?.(message, extra)
62+
if (this.onmessage) {
63+
console.log(`[RedisTransport.${this.counter}] Calling onmessage handler for ${this.recvChannel}`);
64+
this.onmessage(message, extra)
65+
} else {
66+
console.log(`[RedisTransport.${this.counter}] WARNING: No onmessage handler for ${this.recvChannel}!`);
67+
}
5768
},
5869
(error) => {
5970
console.error(
@@ -63,33 +74,37 @@ export class RedisTransport implements Transport {
6374
this.close()
6475
},
6576
);
66-
console.log(`[RedisTransport.start] Successfully subscribed to ${this.recvChannel}`);
77+
console.log(`[RedisTransport.${this.counter}.start] Successfully subscribed to ${this.recvChannel}`);
6778
}
6879

69-
send(message: JSONRPCMessage, options?: TransportSendOptions): Promise<void> {
80+
async send(message: JSONRPCMessage, options?: TransportSendOptions): Promise<void> {
7081
if (options) {
7182
setOptions(message, options);
7283
}
7384
const messageStr = JSON.stringify(message);
74-
console.log(`[RedisTransport.send] Publishing to ${this.sendChannel}:`, messageStr.substring(0, 100));
75-
return redisClient.publish(this.sendChannel, messageStr);
85+
console.log(`[RedisTransport.${this.counter}.send] Publishing to ${this.sendChannel}:`, messageStr.substring(0, 100));
86+
console.log(`[RedisTransport.${this.counter}.send] Full message:`, messageStr);
87+
await redisClient.publish(this.sendChannel, messageStr);
88+
console.log(`[RedisTransport.${this.counter}.send] Published successfully to ${this.sendChannel}`);
7689
}
7790

7891

7992
async cleanup(): Promise<void> {
8093
if (this.redisCleanup) {
94+
console.log(`[RedisTransport.${this.counter}.cleanup] Unsubscribing from ${this.recvChannel}`);
8195
await this.redisCleanup()
8296
this.redisCleanup = undefined;
97+
console.log(`[RedisTransport.${this.counter}.cleanup] Successfully unsubscribed from ${this.recvChannel}`);
8398
}
8499
if (this.isLiveKey) {
85100
await redisClient.del(this.isLiveKey);
86101
}
87102
}
88103

89104
async close(): Promise<void> {
90-
console.log(`[RedisTransport.close] Closing transport - send: ${this.sendChannel}, recv: ${this.recvChannel}`);
105+
console.log(`[RedisTransport.${this.counter}.close] Closing transport - send: ${this.sendChannel}, recv: ${this.recvChannel}`);
91106
this.onclose?.();
92-
this.cleanup()
107+
await this.cleanup()
93108
}
94109
}
95110

@@ -186,18 +201,36 @@ function relayTransports(transport1: Transport, transport2: Transport): void {
186201
transport1.onerror?.(error);
187202
};
188203

204+
// Prevent circular close calls
205+
let closing = false;
206+
189207
transport1.onclose = () => {
190-
transport2.close().catch(console.error);
208+
if (!closing) {
209+
closing = true;
210+
transport2.close().catch(console.error);
211+
}
191212
};
192213
transport2.onclose = () => {
193-
transport1.close().catch(console.error);
214+
if (!closing) {
215+
closing = true;
216+
transport1.close().catch(console.error);
217+
}
194218
};
195219
}
196220

197221

198222
export async function startServerListeningToRedis(server: Server, sessionId: string) {
199223
console.log(`[startServerListeningToRedis] Starting background server for session ${sessionId}`);
200224
const serverRedisTransport = createBackgroundTaskSideRedisTransport(sessionId)
225+
226+
// Log all messages sent by the server
227+
const originalSend = serverRedisTransport.send.bind(serverRedisTransport);
228+
serverRedisTransport.send = async (message, options) => {
229+
console.log(`[MCP Server -> Redis] Sending response:`, JSON.stringify(message).substring(0, 200));
230+
return originalSend(message, options);
231+
};
232+
233+
// The server.connect() will call start() on the transport
201234
await server.connect(serverRedisTransport)
202235
console.log(`[startServerListeningToRedis] Background server connected for session ${sessionId}`);
203236
}
@@ -216,7 +249,7 @@ function createBackgroundTaskSideRedisTransport(sessionId: string): RedisTranspo
216249
);
217250
}
218251

219-
export async function getFirstShttpTransport(sessionId: string): Promise<StreamableHTTPServerTransport> {
252+
export async function getFirstShttpTransport(sessionId: string): Promise<{shttpTransport: StreamableHTTPServerTransport, redisTransport: RedisTransport}> {
220253
const transport = new StreamableHTTPServerTransport({
221254
sessionIdGenerator: () => sessionId,
222255
enableJsonResponse: true, // Enable JSON response mode
@@ -227,14 +260,15 @@ export async function getFirstShttpTransport(sessionId: string): Promise<Streama
227260
// When shttpTransport closes, so does the redisTransport
228261
relayTransports(redisTransport, transport);
229262
await redisTransport.start()
230-
return transport;
263+
return { shttpTransport: transport, redisTransport };
231264
}
232265

233-
export async function getShttpTransport(sessionId: string): Promise<StreamableHTTPServerTransport> {
266+
export async function getShttpTransport(sessionId: string): Promise<{shttpTransport: StreamableHTTPServerTransport, redisTransport: RedisTransport}> {
234267
// Giving undefined here and setting the sessionId means the
235268
// transport wont try to create a new session.
236269
const shttpTransport = new StreamableHTTPServerTransport({
237-
sessionIdGenerator: undefined
270+
sessionIdGenerator: undefined,
271+
enableJsonResponse: true, // Use JSON response mode for all requests
238272
})
239273
shttpTransport.sessionId = sessionId;
240274

@@ -243,5 +277,5 @@ export async function getShttpTransport(sessionId: string): Promise<StreamableHT
243277
// When shttpTransport closes, so does the redisTransport
244278
relayTransports(redisTransport, shttpTransport);
245279
await redisTransport.start()
246-
return shttpTransport;
280+
return { shttpTransport, redisTransport };
247281
}

0 commit comments

Comments
 (0)