Skip to content

Commit 240ccc8

Browse files
committed
Use thread-safe data structures to track resource subscriptions
1 parent 5159f94 commit 240ccc8

2 files changed

Lines changed: 15 additions & 16 deletions

File tree

samples/EverythingServer/Program.cs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@
1111
using OpenTelemetry.Metrics;
1212
using OpenTelemetry.Resources;
1313
using OpenTelemetry.Trace;
14+
using System.Collections.Concurrent;
1415

1516
var builder = WebApplication.CreateBuilder(args);
1617

1718
// Subscriptions tracks resource URIs to McpServer instances
18-
Dictionary<string, List<IMcpServer>> subscriptions = new();
19+
// Use thread-safe data structures since handlers can run in parallel
20+
// even in the context of a single session.
21+
ConcurrentDictionary<string, ConcurrentBag<IMcpServer>> subscriptions = new();
1922

2023
builder.Services
2124
.AddMcpServer()
@@ -32,15 +35,10 @@
3235
.WithResources<SimpleResourceType>()
3336
.WithSubscribeToResourcesHandler(async (ctx, ct) =>
3437
{
35-
var uri = ctx.Params?.Uri;
36-
37-
if (uri is not null)
38+
if (ctx.Params?.Uri is { } uri)
3839
{
39-
if (!subscriptions.ContainsKey(uri))
40-
{
41-
subscriptions[uri] = new List<IMcpServer>();
42-
}
43-
subscriptions[uri].Add(ctx.Server);
40+
var bag = subscriptions.GetOrAdd(uri, _ => new ConcurrentBag<IMcpServer>());
41+
bag.Add(ctx.Server);
4442

4543
await ctx.Server.SampleAsync([
4644
new ChatMessage(ChatRole.System, "You are a helpful test server"),
@@ -58,13 +56,13 @@ await ctx.Server.SampleAsync([
5856
})
5957
.WithUnsubscribeFromResourcesHandler(async (ctx, ct) =>
6058
{
61-
var uri = ctx.Params?.Uri;
62-
if (uri is not null)
59+
if (ctx.Params?.Uri is { } uri)
6360
{
64-
if (subscriptions.ContainsKey(uri))
61+
if (subscriptions.TryGetValue(uri, out var bag))
6562
{
66-
// Remove ctx.Server from the subscription list
67-
subscriptions[uri].Remove(ctx.Server);
63+
// Remove ctx.Server from the subscription bag (ConcurrentBag does not support removal, so recreate)
64+
var newBag = new ConcurrentBag<IMcpServer>(bag.Where(s => s != ctx.Server));
65+
subscriptions[uri] = newBag;
6866
}
6967
}
7068
return new EmptyResult();
@@ -145,7 +143,7 @@ await ctx.Server.SampleAsync([
145143
.WithLogging(b => b.SetResourceBuilder(resource))
146144
.UseOtlpExporter();
147145

148-
builder.Services.AddSingleton<IDictionary<string, List<IMcpServer>>>(subscriptions);
146+
builder.Services.AddSingleton(subscriptions);
149147
builder.Services.AddHostedService<SubscriptionMessageSender>();
150148
builder.Services.AddHostedService<LoggingUpdateMessageSender>();
151149

samples/EverythingServer/SubscriptionMessageSender.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
using ModelContextProtocol;
22
using ModelContextProtocol.Server;
33

4-
internal class SubscriptionMessageSender(IDictionary<string, List<IMcpServer>> subscriptions) : BackgroundService
4+
using System.Collections.Concurrent;
5+
internal class SubscriptionMessageSender(ConcurrentDictionary<string, ConcurrentBag<IMcpServer>> subscriptions) : BackgroundService
56
{
67
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
78
{

0 commit comments

Comments
 (0)