-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy pathredisTransport.test.ts
More file actions
582 lines (454 loc) · 17.2 KB
/
Copy pathredisTransport.test.ts
File metadata and controls
582 lines (454 loc) · 17.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
import { jest } from '@jest/globals';
import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
import { MockRedisClient, setRedisClient } from '../../shared/redis.js';
import {
ServerRedisTransport,
redisRelayToMcpServer,
isLive,
shutdownSession,
setSessionOwner,
getSessionOwner,
validateSessionOwnership,
isSessionOwnedBy
} from './redisTransport.js';
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
describe('Redis Transport', () => {
let mockRedis: MockRedisClient;
beforeEach(() => {
mockRedis = new MockRedisClient();
setRedisClient(mockRedis);
jest.resetAllMocks();
});
afterEach(() => {
// Clear all Redis data and subscriptions
mockRedis.clear();
});
describe('ServerRedisTransport', () => {
let transport: ServerRedisTransport;
const sessionId = 'test-session-123';
beforeEach(() => {
transport = new ServerRedisTransport(sessionId);
});
afterEach(async () => {
if (transport) {
await transport.close();
}
});
it('should create transport with session ID', () => {
expect(transport).toBeInstanceOf(ServerRedisTransport);
});
it('should send response messages to request-specific channels', async () => {
const responseMessage: JSONRPCMessage = {
jsonrpc: '2.0',
id: 123,
result: { data: 'test response' }
};
const mockSubscriber = jest.fn();
await mockRedis.createSubscription(
`mcp:shttp:toclient:${sessionId}:123`,
mockSubscriber,
jest.fn()
);
await transport.send(responseMessage, { relatedRequestId: 123 });
expect(mockSubscriber).toHaveBeenCalledWith(
JSON.stringify({
type: 'mcp',
message: responseMessage,
options: { relatedRequestId: 123 }
})
);
});
it('should send notification messages to notification channel', async () => {
const notificationMessage: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'notifications/message',
params: { message: 'test notification' }
};
const mockSubscriber = jest.fn();
await mockRedis.createSubscription(
`mcp:shttp:toclient:${sessionId}:__GET_stream`,
mockSubscriber,
jest.fn()
);
await transport.send(notificationMessage);
expect(mockSubscriber).toHaveBeenCalledWith(
JSON.stringify({
type: 'mcp',
message: notificationMessage,
options: undefined
})
);
});
it('should handle close gracefully', async () => {
const onCloseMock = jest.fn();
transport.onclose = onCloseMock;
await transport.close();
expect(onCloseMock).toHaveBeenCalled();
});
it('should respond to shutdown control messages', async () => {
await transport.start();
const onCloseMock = jest.fn();
transport.onclose = onCloseMock;
// Send a shutdown control message
await shutdownSession(sessionId);
// Wait for async processing
await new Promise(resolve => setTimeout(resolve, 10));
expect(onCloseMock).toHaveBeenCalled();
});
it('should receive MCP messages from clients and call onmessage', async () => {
const onMessageMock = jest.fn();
transport.onmessage = onMessageMock;
await transport.start();
// Simulate client sending a message to server
const clientMessage: JSONRPCMessage = {
jsonrpc: '2.0',
id: 'test-req',
method: 'tools/list',
params: {}
};
await mockRedis.publish(
`mcp:shttp:toserver:${sessionId}`,
JSON.stringify({
type: 'mcp',
message: clientMessage,
extra: { authInfo: { token: 'test-token', clientId: 'test-client', scopes: [] } }
})
);
// Wait for async processing
await new Promise(resolve => setTimeout(resolve, 10));
expect(onMessageMock).toHaveBeenCalledWith(
clientMessage,
{ authInfo: { token: 'test-token', clientId: 'test-client', scopes: [] } }
);
await transport.close();
});
});
describe('redisRelayToMcpServer', () => {
let mockTransport: Transport;
const sessionId = 'test-session-456';
beforeEach(() => {
mockTransport = {
onmessage: undefined,
onclose: undefined,
onerror: undefined,
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
start: jest.fn(() => Promise.resolve())
};
});
it('should set up message relay from transport to server', async () => {
const cleanup = await redisRelayToMcpServer(sessionId, mockTransport);
// Simulate a message from the transport
const requestMessage: JSONRPCMessage = {
jsonrpc: '2.0',
id: 'req-123',
method: 'tools/list',
params: {}
};
// Trigger the onmessage handler
mockTransport.onmessage?.(requestMessage, { authInfo: { token: 'test-token', clientId: 'test-client', scopes: [] } });
// Wait a bit for async processing
await new Promise(resolve => setTimeout(resolve, 10));
// Check that message was published to server channel
const serverSubscriber = jest.fn();
await mockRedis.createSubscription(
`mcp:shttp:toserver:${sessionId}`,
serverSubscriber,
jest.fn()
);
// The message should have been published
expect(mockRedis.numsub(`mcp:shttp:toserver:${sessionId}`)).resolves.toBe(1);
await cleanup();
});
it('should subscribe to response channel for request messages', async () => {
const cleanup = await redisRelayToMcpServer(sessionId, mockTransport);
const requestMessage: JSONRPCMessage = {
jsonrpc: '2.0',
id: 'req-456',
method: 'tools/call',
params: { name: 'echo', arguments: { text: 'hello' } }
};
// Trigger the onmessage handler
mockTransport.onmessage?.(requestMessage, { authInfo: { token: 'test-token', clientId: 'test-client', scopes: [] } });
// Wait for subscription setup
await new Promise(resolve => setTimeout(resolve, 10));
// Now simulate a response from the server
const responseMessage: JSONRPCMessage = {
jsonrpc: '2.0',
id: 'req-456',
result: { content: [{ type: 'text', text: 'hello' }] }
};
await mockRedis.publish(
`mcp:shttp:toclient:${sessionId}:req-456`,
JSON.stringify({
type: 'mcp',
message: responseMessage,
options: undefined
})
);
// Check that the response was sent back to the transport
expect(mockTransport.send).toHaveBeenCalledWith(responseMessage, undefined);
await cleanup();
});
it('should not subscribe for notification messages (no id)', async () => {
const cleanup = await redisRelayToMcpServer(sessionId, mockTransport);
const notificationMessage: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'notifications/message',
params: { message: 'test' }
};
// Trigger the onmessage handler
mockTransport.onmessage?.(notificationMessage);
// Wait a bit
await new Promise(resolve => setTimeout(resolve, 10));
// Should not create any response channel subscriptions
expect(await mockRedis.numsub(`mcp:shttp:toclient:${sessionId}:undefined`)).toBe(0);
await cleanup();
});
});
describe('isLive', () => {
const sessionId = 'test-session-789';
it('should return true when session has active subscribers', async () => {
// Create a subscription to the server channel
await mockRedis.createSubscription(
`mcp:shttp:toserver:${sessionId}`,
jest.fn(),
jest.fn()
);
expect(await isLive(sessionId)).toBe(true);
});
it('should return false when session has no subscribers', async () => {
expect(await isLive(sessionId)).toBe(false);
});
});
describe('Session Ownership', () => {
const sessionId = 'test-session-ownership';
const userId = 'test-user-123';
it('should set and get session owner', async () => {
await setSessionOwner(sessionId, userId);
const owner = await getSessionOwner(sessionId);
expect(owner).toBe(userId);
});
it('should validate session ownership correctly', async () => {
await setSessionOwner(sessionId, userId);
expect(await validateSessionOwnership(sessionId, userId)).toBe(true);
expect(await validateSessionOwnership(sessionId, 'different-user')).toBe(false);
});
it('should set the ownership key with a TTL', async () => {
const setSpy = jest.spyOn(mockRedis, 'set');
await setSessionOwner(sessionId, userId);
expect(setSpy).toHaveBeenCalledWith(
`session:${sessionId}:owner`,
userId,
expect.objectContaining({ EX: expect.any(Number) })
);
});
it('should refresh the ownership TTL on successful validation only', async () => {
const expireSpy = jest.spyOn(mockRedis, 'expire');
await setSessionOwner(sessionId, userId);
expect(await validateSessionOwnership(sessionId, userId)).toBe(true);
expect(expireSpy).toHaveBeenCalledWith(`session:${sessionId}:owner`, expect.any(Number));
expireSpy.mockClear();
expect(await validateSessionOwnership(sessionId, 'different-user')).toBe(false);
expect(expireSpy).not.toHaveBeenCalled();
});
it('should remove the ownership key when the server transport closes', async () => {
await setSessionOwner(sessionId, userId);
expect(await getSessionOwner(sessionId)).toBe(userId);
const transport = new ServerRedisTransport(sessionId);
await transport.start();
await transport.close();
expect(await getSessionOwner(sessionId)).toBeNull();
});
it('should remove the ownership key when the session is shut down via control message', async () => {
await setSessionOwner(sessionId, userId);
const transport = new ServerRedisTransport(sessionId);
await transport.start();
await shutdownSession(sessionId);
await new Promise(resolve => setTimeout(resolve, 10));
expect(await getSessionOwner(sessionId)).toBeNull();
});
it('should check if session is owned by user including liveness', async () => {
// Session not live yet
expect(await isSessionOwnedBy(sessionId, userId)).toBe(false);
// Make session live
await mockRedis.createSubscription(
`mcp:shttp:toserver:${sessionId}`,
jest.fn(),
jest.fn()
);
// Still false because no owner set
expect(await isSessionOwnedBy(sessionId, userId)).toBe(false);
// Set owner
await setSessionOwner(sessionId, userId);
// Now should be true
expect(await isSessionOwnedBy(sessionId, userId)).toBe(true);
// False for different user
expect(await isSessionOwnedBy(sessionId, 'different-user')).toBe(false);
});
});
describe('Integration: Redis message flow', () => {
const sessionId = 'integration-test-session';
it('should relay messages between client and server through Redis', async () => {
// Set up client-side transport simulation
const clientTransport: Transport = {
onmessage: undefined,
onclose: undefined,
onerror: undefined,
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
start: jest.fn(() => Promise.resolve())
};
const cleanup = await redisRelayToMcpServer(sessionId, clientTransport);
// Client sends a request
const listToolsRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 'integration-req-1',
method: 'tools/list',
params: {}
};
// Set up subscription to simulate server receiving the message
const serverSubscriber = jest.fn();
await mockRedis.createSubscription(
`mcp:shttp:toserver:${sessionId}`,
serverSubscriber,
jest.fn()
);
// Simulate client sending request
clientTransport.onmessage?.(listToolsRequest);
// Wait for async processing
await new Promise(resolve => setTimeout(resolve, 10));
// Verify the message was published to server channel
expect(serverSubscriber).toHaveBeenCalledWith(
JSON.stringify({
type: 'mcp',
message: listToolsRequest,
extra: undefined,
options: undefined
})
);
// Simulate server sending response
const serverResponse: JSONRPCMessage = {
jsonrpc: '2.0',
id: 'integration-req-1',
result: { tools: [{ name: 'echo', description: 'Echo tool' }] }
};
await mockRedis.publish(
`mcp:shttp:toclient:${sessionId}:integration-req-1`,
JSON.stringify({
type: 'mcp',
message: serverResponse,
options: undefined
})
);
// Wait for response processing
await new Promise(resolve => setTimeout(resolve, 10));
// Verify the response was sent back to client
expect(clientTransport.send).toHaveBeenCalledWith(serverResponse, undefined);
await cleanup();
});
});
describe('Control Messages', () => {
const sessionId = 'test-control-session';
it('should send shutdown control messages', async () => {
const controlSubscriber = jest.fn();
await mockRedis.createSubscription(
`mcp:control:${sessionId}`,
controlSubscriber,
jest.fn()
);
await shutdownSession(sessionId);
const callArgs = controlSubscriber.mock.calls[0][0] as string;
const message = JSON.parse(callArgs);
expect(message.type).toBe('control');
expect(message.action).toBe('SHUTDOWN');
expect(typeof message.timestamp).toBe('number');
});
it('should properly shutdown server transport via control message', async () => {
const transport = new ServerRedisTransport(sessionId);
const onCloseMock = jest.fn();
transport.onclose = onCloseMock;
await transport.start();
// Send shutdown signal
await shutdownSession(sessionId);
// Wait for async processing
await new Promise(resolve => setTimeout(resolve, 10));
expect(onCloseMock).toHaveBeenCalled();
});
});
describe('Inactivity Timeout', () => {
const sessionId = 'test-inactivity-session';
beforeEach(() => {
jest.useFakeTimers({ doNotFake: ['setImmediate', 'nextTick'] });
});
afterEach(() => {
jest.useRealTimers();
});
it('should shutdown session after 5 minutes of inactivity', async () => {
const transport = new ServerRedisTransport(sessionId);
const shutdownSpy = jest.spyOn(mockRedis, 'publish');
await transport.start();
// Fast-forward time by 5 minutes
jest.advanceTimersByTime(5 * 60 * 1000);
// Should have published shutdown control message
expect(shutdownSpy).toHaveBeenCalledWith(
`mcp:control:${sessionId}`,
expect.stringContaining('"action":"SHUTDOWN"')
);
await transport.close();
});
it('should reset timeout when message is received', async () => {
const transport = new ServerRedisTransport(sessionId);
const onMessageMock = jest.fn();
transport.onmessage = onMessageMock;
await transport.start();
// Fast-forward 4 minutes
jest.advanceTimersByTime(4 * 60 * 1000);
// Manually publish a message to trigger the subscription handler
const testMessage = { jsonrpc: '2.0', method: 'ping' };
await mockRedis.publish(
`mcp:shttp:toserver:${sessionId}`,
JSON.stringify({
type: 'mcp',
message: testMessage
})
);
// Wait for message to be processed
await new Promise(resolve => setImmediate(resolve));
// Verify message was received
expect(onMessageMock).toHaveBeenCalledWith(testMessage, undefined);
// Clear the publish spy to check only future calls
const shutdownSpy = jest.spyOn(mockRedis, 'publish');
shutdownSpy.mockClear();
// Fast-forward 4 more minutes (total 8, but only 4 since last message)
jest.advanceTimersByTime(4 * 60 * 1000);
// Should not have shutdown yet
expect(shutdownSpy).not.toHaveBeenCalledWith(
`mcp:control:${sessionId}`,
expect.stringContaining('"action":"SHUTDOWN"')
);
// Fast-forward 2 more minutes to exceed timeout
jest.advanceTimersByTime(2 * 60 * 1000);
// Now should have shutdown
expect(shutdownSpy).toHaveBeenCalledWith(
`mcp:control:${sessionId}`,
expect.stringContaining('"action":"SHUTDOWN"')
);
await transport.close();
}, 10000);
it('should clear timeout on close', async () => {
const transport = new ServerRedisTransport(sessionId);
const shutdownSpy = jest.spyOn(mockRedis, 'publish');
await transport.start();
// Close transport before timeout
await transport.close();
// Fast-forward past timeout
jest.advanceTimersByTime(10 * 60 * 1000);
// Should not have triggered shutdown
expect(shutdownSpy).not.toHaveBeenCalledWith(
`mcp:control:${sessionId}`,
expect.stringContaining('"action":"SHUTDOWN"')
);
});
});
});