Skip to content

Commit d62847e

Browse files
Part way through refactoring the redis transport
1 parent c2119ed commit d62847e

1 file changed

Lines changed: 58 additions & 116 deletions

File tree

src/services/redisTransport.ts

Lines changed: 58 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,30 @@ import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/
22
import { redisClient } from "../redis.js";
33
import { Transport, TransportSendOptions } from "@modelcontextprotocol/sdk/shared/transport.js";
44
import { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js";
5-
import { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
5+
import { JSONRPCMessage, MessageExtraInfo } from "@modelcontextprotocol/sdk/types.js";
66
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
77

88
let redisTransportCounter = 0;
99

10+
interface RedisMessage {
11+
mcpMessage: JSONRPCMessage;
12+
extra?: MessageExtraInfo;
13+
options?: TransportSendOptions;
14+
}
15+
16+
function sendToMcpServer(sessionId: string, message: JSONRPCMessage, extra?: { authInfo?: AuthInfo; }, options?: TransportSendOptions): Promise<void> {
17+
const toServerChannel = getToServerChannel(sessionId);
18+
const redisMessage: RedisMessage = { mcpMessage: message, extra, options };
19+
console.log(`[sendToServerChannel] Publishing to ${toServerChannel}:`, JSON.stringify(redisMessage).substring(0, 100));
20+
return redisClient.publish(toServerChannel, JSON.stringify(redisMessage));
21+
}
22+
1023
function getToServerChannel(sessionId: string): string {
1124
return `mcp:shttp:toserver:${sessionId}`;
1225
}
1326

14-
function getToClientChannel(sessionId: string): string {
15-
return `mcp:shttp:toclient:${sessionId}`;
27+
function getToClientChannel(sessionId: string, relatedRequestId: string): string {
28+
return `mcp:shttp:toclient:${sessionId}:${relatedRequestId}`;
1629
}
1730

1831
export async function isLive(sessionId: string): Promise<boolean> {
@@ -23,6 +36,48 @@ export async function isLive(sessionId: string): Promise<boolean> {
2336
}
2437

2538

39+
export function redisRelayToMcpServer(sessionId: string, transport: Transport): () => Promise<void> {
40+
let redisCleanup: (() => Promise<void>) | undefined = undefined;
41+
const cleanup = async () => {
42+
// TODO: solve race conditions where we call cleanup while the subscription is being created / before it is created
43+
if (redisCleanup) {
44+
await redisCleanup();
45+
}
46+
}
47+
48+
new Promise<JSONRPCMessage>((resolve) => {
49+
transport.onmessage = async (message, extra) => {
50+
await sendToMcpServer(sessionId, message, extra);
51+
resolve(message);
52+
}
53+
}).then(async (message) => {
54+
// check for request id in the message
55+
if (!("id" in message)) {
56+
// if no id, it's a notification, so we return
57+
return cleanup;
58+
}
59+
// otherwise we subscribe to the response channel
60+
const toClientChannel = getToClientChannel(sessionId, message.id.toString());
61+
62+
console.log(`[redisRelayToMcpServer] Subscribing to ${toClientChannel} for response to request ${message.id}`);
63+
64+
redisCleanup = await redisClient.createSubscription(toClientChannel, async (redisMessageJson) => {
65+
const redisMessage = JSON.parse(redisMessageJson) as RedisMessage;
66+
await transport.send(redisMessage.mcpMessage, redisMessage.options);
67+
}, (error) => {
68+
console.error(`[redisRelayToMcpServer] Error in Redis subscriber for ${toClientChannel}:`, error);
69+
transport.onerror?.(error);
70+
});
71+
}).catch((error) => {
72+
console.error(`[redisRelayToMcpServer] Error setting up Redis relay for session ${sessionId}:`, error);
73+
transport.onerror?.(error);
74+
cleanup();
75+
});
76+
77+
return cleanup;
78+
}
79+
80+
2681
export class RedisTransport implements Transport {
2782
private redisCleanup: (() => Promise<void>) | undefined;
2883
private counter: number;
@@ -42,8 +97,6 @@ export class RedisTransport implements Transport {
4297
this.isLiveKey = isLiveKey;
4398
}
4499

45-
46-
47100
async start(): Promise<void> {
48101
console.log(`[RedisTransport.${this.counter}.start] Starting transport - send: ${this.sendChannel}, recv: ${this.recvChannel}`);
49102
if (this.redisCleanup) {
@@ -108,117 +161,6 @@ export class RedisTransport implements Transport {
108161
}
109162
}
110163

111-
112-
function setExtra(message: JSONRPCMessage, extra: { authInfo?: AuthInfo; } | undefined): void {
113-
if (!extra) {
114-
return;
115-
}
116-
if ("result" in message && typeof message.result === 'object') {
117-
if (!message.result._meta) {
118-
message.result._meta = {};
119-
}
120-
message.result._meta.extra = extra;
121-
}
122-
if ("params" in message && typeof message.params === 'object') {
123-
if (!message.params._meta) {
124-
message.params._meta = {};
125-
}
126-
message.params._meta.extra = extra;
127-
}
128-
}
129-
130-
function setOptions(message: JSONRPCMessage, options: TransportSendOptions): void {
131-
if ("result" in message && typeof message.result === 'object') {
132-
if (!message.result._meta) {
133-
message.result._meta = {};
134-
}
135-
message.result._meta.options = options;
136-
}
137-
if ("params" in message && typeof message.params === 'object') {
138-
if (!message.params._meta) {
139-
message.params._meta = {};
140-
}
141-
message.params._meta.options = options;
142-
}
143-
}
144-
145-
function popExtra(message: JSONRPCMessage): { authInfo?: AuthInfo; } | undefined {
146-
if ("params" in message && typeof message.params === 'object' && message.params._meta) {
147-
const extra = message.params._meta.extra as { authInfo?: AuthInfo; } | undefined;
148-
if (extra) {
149-
delete message.params._meta.extra;
150-
return extra;
151-
}
152-
}
153-
if ("result" in message && typeof message.result === 'object' && message.result._meta) {
154-
const extra = message.result._meta.extra as { authInfo?: AuthInfo; } | undefined;
155-
if (extra) {
156-
delete message.result._meta.extra;
157-
return extra;
158-
}
159-
}
160-
return undefined;
161-
}
162-
163-
function popOptions(message: JSONRPCMessage): TransportSendOptions | undefined {
164-
if ("params" in message && typeof message.params === 'object' && message.params._meta) {
165-
const options = message.params._meta.options as TransportSendOptions | undefined;
166-
if (options) {
167-
delete message.params._meta.options;
168-
return options;
169-
}
170-
}
171-
if ("result" in message && typeof message.result === 'object' && message.result._meta) {
172-
const options = message.result._meta.options as TransportSendOptions | undefined;
173-
if (options) {
174-
delete message.result._meta.options;
175-
return options;
176-
}
177-
}
178-
return undefined;
179-
}
180-
181-
182-
function relayTransports(transport1: Transport, transport2: Transport): void {
183-
console.log(`[relayTransports] Setting up relay between transports`);
184-
transport1.onmessage = (message, extra) => {
185-
console.log(`[relay] transport1 -> transport2:`, JSON.stringify(message).substring(0, 100));
186-
setExtra(message, extra);
187-
const options = popOptions(message);
188-
transport2.send(message, options)
189-
};
190-
transport2.onmessage = (message, extra) => {
191-
console.log(`[relay] transport2 -> transport1:`, JSON.stringify(message).substring(0, 100));
192-
setExtra(message, extra);
193-
const options = popOptions(message);
194-
transport1.send(message, options)
195-
};
196-
197-
transport1.onerror = (error) => {
198-
transport2.onerror?.(error);
199-
};
200-
transport2.onerror = (error) => {
201-
transport1.onerror?.(error);
202-
};
203-
204-
// Prevent circular close calls
205-
let closing = false;
206-
207-
transport1.onclose = () => {
208-
if (!closing) {
209-
closing = true;
210-
transport2.close().catch(console.error);
211-
}
212-
};
213-
transport2.onclose = () => {
214-
if (!closing) {
215-
closing = true;
216-
transport1.close().catch(console.error);
217-
}
218-
};
219-
}
220-
221-
222164
export async function startServerListeningToRedis(server: Server, sessionId: string) {
223165
console.log(`[startServerListeningToRedis] Starting background server for session ${sessionId}`);
224166
const serverRedisTransport = createBackgroundTaskSideRedisTransport(sessionId)

0 commit comments

Comments
 (0)