-
Notifications
You must be signed in to change notification settings - Fork 680
Expand file tree
/
Copy pathMcpClientResourceSubscriptionTests.cs
More file actions
366 lines (311 loc) · 14.3 KB
/
McpClientResourceSubscriptionTests.cs
File metadata and controls
366 lines (311 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
using Microsoft.Extensions.DependencyInjection;
using ModelContextProtocol.Client;
using ModelContextProtocol.Protocol;
using ModelContextProtocol.Server;
using System.ComponentModel;
namespace ModelContextProtocol.Tests.Client;
public class McpClientResourceSubscriptionTests : ClientServerTestBase
{
/// <summary>
/// Creates the test class, passing the test output helper straight through to the
/// <see cref="ClientServerTestBase"/> constructor.
/// </summary>
/// <param name="outputHelper">Test output helper forwarded to the base class.</param>
public McpClientResourceSubscriptionTests(ITestOutputHelper outputHelper) : base(outputHelper) { }
/// <summary>
/// Registers <see cref="SubscribableResources"/> as a resource type on the supplied server builder.
/// </summary>
/// <param name="services">Service collection for the test host; not used by this override.</param>
/// <param name="mcpServerBuilder">Builder on which the resource type is registered.</param>
protected override void ConfigureServices(ServiceCollection services, IMcpServerBuilder mcpServerBuilder)
    => mcpServerBuilder.WithResources<SubscribableResources>();
/// <summary>
/// Resource type used by these tests; it declares a single templated resource at
/// <c>test://resource/{id}</c>.
/// </summary>
[McpServerResourceType]
private sealed class SubscribableResources
{
    /// <summary>
    /// Produces the textual content for the resource identified by <paramref name="id"/>.
    /// </summary>
    /// <param name="id">Value bound from the <c>{id}</c> segment of the URI template.</param>
    /// <returns>The string <c>Resource content: </c> followed by <paramref name="id"/>.</returns>
    [McpServerResource(UriTemplate = "test://resource/{id}")]
    [Description("A subscribable test resource")]
    public static string GetResource(string id)
    {
        return $"Resource content: {id}";
    }
}
/// <summary>
/// Verifies that a handler registered through the string-URI overload of
/// <c>SubscribeToResourceAsync</c> is invoked with the notification payload when the
/// server sends a resource-updated notification for the subscribed URI.
/// </summary>
[Fact]
public async Task SubscribeToResourceAsync_WithHandler_ReceivesNotifications()
{
    // Arrange
    await using McpClient client = await CreateMcpClientForServer();
    const string resourceUri = "test://resource/1";

    // RunContinuationsAsynchronously keeps the awaiting test code from resuming inline
    // on whichever thread invokes the notification handler.
    var notificationReceived = new TaskCompletionSource<ResourceUpdatedNotificationParams>(
        TaskCreationOptions.RunContinuationsAsynchronously);

    // Act
    await using var subscription = await client.SubscribeToResourceAsync(
        resourceUri,
        (notification, ct) =>
        {
            notificationReceived.TrySetResult(notification);
            return default(ValueTask);
        },
        cancellationToken: TestContext.Current.CancellationToken);

    // Send a notification from the server
    await Server.SendNotificationAsync(
        NotificationMethods.ResourceUpdatedNotification,
        new ResourceUpdatedNotificationParams { Uri = resourceUri },
        cancellationToken: TestContext.Current.CancellationToken);

    // Assert
    // Bound the wait to five seconds while still observing cancellation of the test run.
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    using var combined = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, TestContext.Current.CancellationToken);
    var receivedNotification = await notificationReceived.Task.WaitAsync(combined.Token);

    Assert.NotNull(receivedNotification);
    Assert.Equal(resourceUri, receivedNotification.Uri);
}
/// <summary>
/// Verifies that a handler subscribed to one URI is invoked exactly once when the server
/// sends one notification for a different URI followed by one for the subscribed URI.
/// </summary>
[Fact]
public async Task SubscribeToResourceAsync_WithHandler_FiltersNotificationsByUri()
{
    // Arrange
    await using McpClient client = await CreateMcpClientForServer();
    const string subscribedUri = "test://resource/1";
    const string otherUri = "test://resource/2";
    var notificationCount = 0;

    // RunContinuationsAsynchronously keeps the awaiting test code from resuming inline
    // on whichever thread invokes the notification handler.
    var correctNotificationReceived = new TaskCompletionSource<bool>(
        TaskCreationOptions.RunContinuationsAsynchronously);

    // Act
    await using var subscription = await client.SubscribeToResourceAsync(
        subscribedUri,
        (notification, ct) =>
        {
            // Count every invocation, including any for an unexpected URI.
            Interlocked.Increment(ref notificationCount);
            if (notification.Uri == subscribedUri)
            {
                correctNotificationReceived.TrySetResult(true);
            }

            return default(ValueTask);
        },
        cancellationToken: TestContext.Current.CancellationToken);

    // Send notifications for different resources
    await Server.SendNotificationAsync(
        NotificationMethods.ResourceUpdatedNotification,
        new ResourceUpdatedNotificationParams { Uri = otherUri },
        cancellationToken: TestContext.Current.CancellationToken);
    await Server.SendNotificationAsync(
        NotificationMethods.ResourceUpdatedNotification,
        new ResourceUpdatedNotificationParams { Uri = subscribedUri },
        cancellationToken: TestContext.Current.CancellationToken);

    // Assert
    // Bound the wait to five seconds while still observing cancellation of the test run.
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    using var combined = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, TestContext.Current.CancellationToken);
    await correctNotificationReceived.Task.WaitAsync(combined.Token);

    // Grace period: a negative ("was not called") check cannot be awaited, so give any
    // stray handler invocation a moment to surface before inspecting the counter.
    await Task.Delay(100, TestContext.Current.CancellationToken);

    // Should only receive the notification for the subscribed URI
    Assert.Equal(1, Volatile.Read(ref notificationCount));
}
/// <summary>
/// Verifies that disposing the subscription stops handler invocations: a notification sent
/// before disposal is counted, and one sent after disposal is not.
/// </summary>
[Fact]
public async Task SubscribeToResourceAsync_WithHandler_DisposalUnsubscribes()
{
    // Arrange
    await using McpClient client = await CreateMcpClientForServer();
    const string resourceUri = "test://resource/1";
    var notificationCount = 0;

    // Signals arrival of the first notification so the test does not depend on a fixed
    // sleep; continuations run asynchronously so they never execute inline in the handler.
    var firstNotificationReceived = new TaskCompletionSource<bool>(
        TaskCreationOptions.RunContinuationsAsynchronously);

    // Act
    // 'await using' guarantees cleanup if the test fails before the explicit disposal below.
    // The second disposal at scope exit mirrors the pattern already used by
    // SubscribeToResourceAsync_MultipleHandlersSameUri_BothReceiveNotifications.
    await using var subscription = await client.SubscribeToResourceAsync(
        resourceUri,
        (notification, ct) =>
        {
            Interlocked.Increment(ref notificationCount);
            firstNotificationReceived.TrySetResult(true);
            return default(ValueTask);
        },
        cancellationToken: TestContext.Current.CancellationToken);

    // Send a notification - should be received
    await Server.SendNotificationAsync(
        NotificationMethods.ResourceUpdatedNotification,
        new ResourceUpdatedNotificationParams { Uri = resourceUri },
        cancellationToken: TestContext.Current.CancellationToken);

    // Wait until the handler has actually observed the first notification (bounded to five
    // seconds) so a slow delivery cannot slip past the disposal and skew the count.
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    using var combined = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, TestContext.Current.CancellationToken);
    await firstNotificationReceived.Task.WaitAsync(combined.Token);

    // Dispose the subscription
    await subscription.DisposeAsync();

    // Send another notification - should NOT be received
    await Server.SendNotificationAsync(
        NotificationMethods.ResourceUpdatedNotification,
        new ResourceUpdatedNotificationParams { Uri = resourceUri },
        cancellationToken: TestContext.Current.CancellationToken);

    // Grace period: a negative check cannot be awaited, so allow time for a stray invocation.
    await Task.Delay(100, TestContext.Current.CancellationToken);

    // Assert - only the first notification should have been received
    Assert.Equal(1, Volatile.Read(ref notificationCount));
}
/// <summary>
/// Verifies that a handler registered through the <see cref="Uri"/> overload of
/// <c>SubscribeToResourceAsync</c> is invoked when the server sends a resource-updated
/// notification whose URI equals the subscribed URI's <see cref="Uri.AbsoluteUri"/>.
/// </summary>
[Fact]
public async Task SubscribeToResourceAsync_WithHandler_UriOverload_ReceivesNotifications()
{
    // Arrange
    await using McpClient client = await CreateMcpClientForServer();
    var resourceUri = new Uri("test://resource/1");

    // RunContinuationsAsynchronously keeps the awaiting test code from resuming inline
    // on whichever thread invokes the notification handler.
    var notificationReceived = new TaskCompletionSource<ResourceUpdatedNotificationParams>(
        TaskCreationOptions.RunContinuationsAsynchronously);

    // Act
    await using var subscription = await client.SubscribeToResourceAsync(
        resourceUri,
        (notification, ct) =>
        {
            notificationReceived.TrySetResult(notification);
            return default(ValueTask);
        },
        cancellationToken: TestContext.Current.CancellationToken);

    // Send a notification from the server
    await Server.SendNotificationAsync(
        NotificationMethods.ResourceUpdatedNotification,
        new ResourceUpdatedNotificationParams { Uri = resourceUri.AbsoluteUri },
        cancellationToken: TestContext.Current.CancellationToken);

    // Assert
    // Bound the wait to five seconds while still observing cancellation of the test run.
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    using var combined = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, TestContext.Current.CancellationToken);
    var receivedNotification = await notificationReceived.Task.WaitAsync(combined.Token);

    Assert.NotNull(receivedNotification);
    Assert.Equal(resourceUri.AbsoluteUri, receivedNotification.Uri);
}
/// <summary>
/// Verifies that passing a null handler to the string-URI overload of
/// <c>SubscribeToResourceAsync</c> results in an <see cref="ArgumentNullException"/>.
/// </summary>
[Fact]
public async Task SubscribeToResourceAsync_WithNullHandler_ThrowsArgumentNullException()
{
    // Arrange
    await using McpClient client = await CreateMcpClientForServer();

    // Act
    async Task SubscribeWithNullHandler()
    {
        await client.SubscribeToResourceAsync(
            "test://resource/1",
            handler: null!,
            cancellationToken: TestContext.Current.CancellationToken);
    }

    // Assert
    await Assert.ThrowsAsync<ArgumentNullException>(SubscribeWithNullHandler);
}
/// <summary>
/// Verifies that passing a null <see cref="Uri"/> to <c>SubscribeToResourceAsync</c>
/// results in an <see cref="ArgumentNullException"/>.
/// </summary>
[Fact]
public async Task SubscribeToResourceAsync_WithNullUri_ThrowsArgumentNullException()
{
    // Arrange
    await using McpClient client = await CreateMcpClientForServer();

    // Act
    async Task SubscribeWithNullUri()
    {
        // The cast selects the Uri overload; a bare null would not identify one.
        await client.SubscribeToResourceAsync(
            uri: (Uri)null!,
            handler: (update, token) => default,
            cancellationToken: TestContext.Current.CancellationToken);
    }

    // Assert
    await Assert.ThrowsAsync<ArgumentNullException>(SubscribeWithNullUri);
}
/// <summary>
/// Verifies that passing an empty string URI to <c>SubscribeToResourceAsync</c>
/// results in an <see cref="ArgumentException"/>.
/// </summary>
[Fact]
public async Task SubscribeToResourceAsync_WithEmptyUri_ThrowsArgumentException()
{
    // Arrange
    await using McpClient client = await CreateMcpClientForServer();

    // Act
    async Task SubscribeWithEmptyUri()
    {
        await client.SubscribeToResourceAsync(
            uri: "",
            handler: (update, token) => default,
            cancellationToken: TestContext.Current.CancellationToken);
    }

    // Assert
    await Assert.ThrowsAsync<ArgumentException>(SubscribeWithEmptyUri);
}
/// <summary>
/// Verifies that two subscriptions on different URIs each have their handler invoked for
/// the notification matching their own URI.
/// </summary>
[Fact]
public async Task SubscribeToResourceAsync_MultipleSubscriptions_BothReceiveNotifications()
{
    // Arrange
    await using McpClient client = await CreateMcpClientForServer();
    const string uri1 = "test://resource/1";
    const string uri2 = "test://resource/2";

    // RunContinuationsAsynchronously keeps the awaiting test code from resuming inline
    // on whichever thread invokes the notification handlers.
    var notification1Received = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    var notification2Received = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Act
    await using var subscription1 = await client.SubscribeToResourceAsync(
        uri1,
        (notification, ct) =>
        {
            if (notification.Uri == uri1)
            {
                notification1Received.TrySetResult(true);
            }

            return default(ValueTask);
        },
        cancellationToken: TestContext.Current.CancellationToken);
    await using var subscription2 = await client.SubscribeToResourceAsync(
        uri2,
        (notification, ct) =>
        {
            if (notification.Uri == uri2)
            {
                notification2Received.TrySetResult(true);
            }

            return default(ValueTask);
        },
        cancellationToken: TestContext.Current.CancellationToken);

    // Send notifications
    await Server.SendNotificationAsync(
        NotificationMethods.ResourceUpdatedNotification,
        new ResourceUpdatedNotificationParams { Uri = uri1 },
        cancellationToken: TestContext.Current.CancellationToken);
    await Server.SendNotificationAsync(
        NotificationMethods.ResourceUpdatedNotification,
        new ResourceUpdatedNotificationParams { Uri = uri2 },
        cancellationToken: TestContext.Current.CancellationToken);

    // Assert
    // Both token sources are disposed at scope exit; the linked source was previously leaked.
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    using var combined = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, TestContext.Current.CancellationToken);
    await Task.WhenAll(
        notification1Received.Task.WaitAsync(combined.Token),
        notification2Received.Task.WaitAsync(combined.Token));

    Assert.True(await notification1Received.Task);
    Assert.True(await notification2Received.Task);
}
/// <summary>
/// Verifies that disposing the same subscription three times in a row completes without
/// throwing.
/// </summary>
[Fact]
public async Task SubscribeToResourceAsync_DisposalIsIdempotent()
{
    // Arrange
    await using McpClient client = await CreateMcpClientForServer();
    const string resourceUri = "test://resource/1";
    const int disposeAttempts = 3;
    var subscription = await client.SubscribeToResourceAsync(
        resourceUri,
        (update, token) => default,
        cancellationToken: TestContext.Current.CancellationToken);

    // Act - dispose repeatedly; any exception propagates and fails the test
    for (var attempt = 0; attempt < disposeAttempts; attempt++)
    {
        await subscription.DisposeAsync();
    }

    // Assert - reaching this point means no disposal threw
    Assert.True(true);
}
/// <summary>
/// Verifies that two handlers subscribed to the same URI are both invoked for a single
/// notification, and that after the first subscription is disposed only the second handler
/// is invoked for a subsequent notification.
/// </summary>
[Fact]
public async Task SubscribeToResourceAsync_MultipleHandlersSameUri_BothReceiveNotifications()
{
    // Arrange
    await using McpClient client = await CreateMcpClientForServer();
    const string resourceUri = "test://resource/1";

    // RunContinuationsAsynchronously keeps the awaiting test code from resuming inline
    // on whichever thread invokes the notification handlers.
    var handler1Called = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    var handler2Called = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Completed by handler 2 on its second invocation, so the follow-up notification can be
    // awaited deterministically instead of relying on a fixed sleep.
    var handler2CalledAgain = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    var handler1Count = 0;
    var handler2Count = 0;

    // Act - Create two subscriptions to the same URI
    await using var subscription1 = await client.SubscribeToResourceAsync(
        resourceUri,
        (notification, ct) =>
        {
            Interlocked.Increment(ref handler1Count);
            handler1Called.TrySetResult(true);
            return default;
        },
        cancellationToken: TestContext.Current.CancellationToken);
    await using var subscription2 = await client.SubscribeToResourceAsync(
        resourceUri,
        (notification, ct) =>
        {
            // The first invocation completes handler2Called; any later one completes handler2CalledAgain.
            if (Interlocked.Increment(ref handler2Count) == 1)
            {
                handler2Called.TrySetResult(true);
            }
            else
            {
                handler2CalledAgain.TrySetResult(true);
            }

            return default;
        },
        cancellationToken: TestContext.Current.CancellationToken);

    // Send a single notification
    await Server.SendNotificationAsync(
        NotificationMethods.ResourceUpdatedNotification,
        new ResourceUpdatedNotificationParams { Uri = resourceUri },
        cancellationToken: TestContext.Current.CancellationToken);

    // Assert - Both handlers should be invoked
    using var firstTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    using var firstCombined = CancellationTokenSource.CreateLinkedTokenSource(firstTimeout.Token, TestContext.Current.CancellationToken);
    await Task.WhenAll(
        handler1Called.Task.WaitAsync(firstCombined.Token),
        handler2Called.Task.WaitAsync(firstCombined.Token));
    Assert.Equal(1, Volatile.Read(ref handler1Count));
    Assert.Equal(1, Volatile.Read(ref handler2Count));

    // Dispose one subscription
    await subscription1.DisposeAsync();

    // Send another notification
    await Server.SendNotificationAsync(
        NotificationMethods.ResourceUpdatedNotification,
        new ResourceUpdatedNotificationParams { Uri = resourceUri },
        cancellationToken: TestContext.Current.CancellationToken);

    // Wait (bounded to five seconds) for handler 2 to observe the second notification.
    using var secondTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    using var secondCombined = CancellationTokenSource.CreateLinkedTokenSource(secondTimeout.Token, TestContext.Current.CancellationToken);
    await handler2CalledAgain.Task.WaitAsync(secondCombined.Token);

    // Grace period: the "handler 1 was not called again" check cannot be awaited, so give a
    // stray invocation a moment to surface before inspecting the counters.
    await Task.Delay(100, TestContext.Current.CancellationToken);

    // Assert - Only the second handler should still receive notifications
    // Handler1 should not have been called again (still 1)
    Assert.Equal(1, Volatile.Read(ref handler1Count));
    // Handler2 should have been called again (now 2)
    Assert.Equal(2, Volatile.Read(ref handler2Count));
}
}