Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions src/ModelContextProtocol.Core/Client/McpClient.Methods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -610,15 +610,15 @@ public Task SubscribeToResourceAsync(
/// Notifications for other resources are filtered out automatically.
/// </para>
/// </remarks>
public async Task<IAsyncDisposable> SubscribeToResourceAsync(
public Task<IAsyncDisposable> SubscribeToResourceAsync(
Uri uri,
Func<ResourceUpdatedNotificationParams, CancellationToken, ValueTask> handler,
RequestOptions? options = null,
CancellationToken cancellationToken = default)
{
Throw.IfNull(uri);

return await SubscribeToResourceAsync(uri.AbsoluteUri, handler, options, cancellationToken).ConfigureAwait(false);
return SubscribeToResourceAsync(uri.AbsoluteUri, handler, options, cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -654,21 +654,30 @@ public async Task<IAsyncDisposable> SubscribeToResourceAsync(
Throw.IfNullOrWhiteSpace(uri);
Throw.IfNull(handler);

// Subscribe to the resource
await SubscribeToResourceAsync(uri, options, cancellationToken).ConfigureAwait(false);

// Register a notification handler that filters for this specific resource
IAsyncDisposable handlerRegistration = RegisterNotificationHandler(
Comment thread
stephentoub marked this conversation as resolved.
NotificationMethods.ResourceUpdatedNotification,
async (notification, ct) =>
{
if (JsonSerializer.Deserialize(notification.Params, McpJsonUtilities.JsonContext.Default.ResourceUpdatedNotificationParams) is { } resourceUpdate &&
string.Equals(resourceUpdate.Uri, uri, StringComparison.Ordinal))
UriTemplate.UriTemplateComparer.Instance.Equals(resourceUpdate.Uri, uri))
{
await handler(resourceUpdate, ct).ConfigureAwait(false);
}
});

try
{
// Subscribe to the resource
await SubscribeToResourceAsync(uri, options, cancellationToken).ConfigureAwait(false);
}
catch
{
// If subscription fails, unregister the handler before propagating the exception
await handlerRegistration.DisposeAsync().ConfigureAwait(false);
throw;
}

// Return a disposable that unsubscribes and removes the handler
return new ResourceSubscription(this, uri, handlerRegistration, options);
}
Expand All @@ -694,18 +703,18 @@ public ResourceSubscription(McpClient client, string uri, IAsyncDisposable handl

public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _disposed, 1) == 0)
if (Interlocked.Exchange(ref _disposed, 1) != 0)
{
// Unsubscribe from the resource
try
{
await _client.UnsubscribeFromResourceAsync(_uri, _options, CancellationToken.None).ConfigureAwait(false);
}
catch
{
// Swallow exceptions during unsubscribe to ensure handler is still disposed
}
return;
}

try
{
// Unsubscribe from the resource
await _client.UnsubscribeFromResourceAsync(_uri, _options, CancellationToken.None).ConfigureAwait(false);
}
finally
{
// Dispose the notification handler registration
await _handlerRegistration.DisposeAsync().ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,74 @@ public async Task SubscribeToResourceAsync_DisposalIsIdempotent()
// Assert - no exception should be thrown
Assert.True(true);
}

[Fact]
public async Task SubscribeToResourceAsync_MultipleHandlersSameUri_BothReceiveNotifications()
{
// Arrange
await using McpClient client = await CreateMcpClientForServer();
const string resourceUri = "test://resource/1";
var handler1Called = new TaskCompletionSource<bool>();
var handler2Called = new TaskCompletionSource<bool>();
var handler1Count = 0;
var handler2Count = 0;

// Act - Create two subscriptions to the same URI
await using var subscription1 = await client.SubscribeToResourceAsync(
resourceUri,
(notification, ct) =>
{
Interlocked.Increment(ref handler1Count);
handler1Called.TrySetResult(true);
return default;
},
cancellationToken: TestContext.Current.CancellationToken);

await using var subscription2 = await client.SubscribeToResourceAsync(
resourceUri,
(notification, ct) =>
{
Interlocked.Increment(ref handler2Count);
handler2Called.TrySetResult(true);
return default;
},
cancellationToken: TestContext.Current.CancellationToken);

// Send a single notification
await Server.SendNotificationAsync(
NotificationMethods.ResourceUpdatedNotification,
new ResourceUpdatedNotificationParams { Uri = resourceUri },
cancellationToken: TestContext.Current.CancellationToken);

// Assert - Both handlers should be invoked
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var combined = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, TestContext.Current.CancellationToken);
await Task.WhenAll(
handler1Called.Task.WaitAsync(combined.Token),
handler2Called.Task.WaitAsync(combined.Token));

Assert.Equal(1, handler1Count);
Assert.Equal(1, handler2Count);

// Dispose one subscription
await subscription1.DisposeAsync();

// Reset the second handler's task completion
var handler2CalledAgain = new TaskCompletionSource<bool>();
Comment thread
stephentoub marked this conversation as resolved.
Outdated

// Send another notification
await Server.SendNotificationAsync(
NotificationMethods.ResourceUpdatedNotification,
new ResourceUpdatedNotificationParams { Uri = resourceUri },
cancellationToken: TestContext.Current.CancellationToken);

// Wait a bit to see if handler2 gets called again
await Task.Delay(100, TestContext.Current.CancellationToken);
Comment thread
stephentoub marked this conversation as resolved.
Outdated

// Assert - Only the second handler should still receive notifications
// Handler1 should not have been called again (still 1)
Assert.Equal(1, handler1Count);
// Handler2 should have been called again (now 2)
Assert.Equal(2, handler2Count);
}
}
Loading