Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions src/ModelContextProtocol.Core/Client/McpClient.Methods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,18 @@ public Task SubscribeToResourceAsync(string uri, RequestOptions? options = null,
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>The result of the request.</returns>
/// <exception cref="ArgumentNullException"><paramref name="requestParams"/> is <see langword="null"/>.</exception>
/// <remarks>
/// <para>
/// This method subscribes to resource update notifications but does not register a handler.
/// To receive notifications, you must separately call <see cref="McpSession.RegisterNotificationHandler(string, Func{JsonRpcNotification, CancellationToken, ValueTask})"/>
/// with <see cref="NotificationMethods.ResourceUpdatedNotification"/> and filter for the specific resource URI.
/// To unsubscribe, call <see cref="UnsubscribeFromResourceAsync(UnsubscribeRequestParams, CancellationToken)"/> and dispose the handler registration.
/// </para>
/// <para>
/// For a simpler API that handles both subscription and notification registration in a single call,
/// use <see cref="SubscribeToResourceAsync(Uri, Func{ResourceUpdatedNotificationParams, CancellationToken, ValueTask}, RequestOptions?, CancellationToken)"/>.
/// </para>
/// </remarks>
public Task SubscribeToResourceAsync(
SubscribeRequestParams requestParams,
CancellationToken cancellationToken = default)
Expand All @@ -575,6 +587,140 @@ public Task SubscribeToResourceAsync(
cancellationToken: cancellationToken).AsTask();
}

/// <summary>
/// Subscribes to a resource on the server and registers a handler for notifications when it changes.
/// </summary>
/// <param name="uri">The URI of the resource to which to subscribe.</param>
/// <param name="handler">The handler to invoke when the resource is updated. It receives <see cref="ResourceUpdatedNotificationParams"/> for the subscribed resource.</param>
/// <param name="options">Optional request options including metadata, serialization settings, and progress tracking.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>
/// A task that completes with an <see cref="IAsyncDisposable"/> that, when disposed, unsubscribes from the resource
/// and removes the notification handler.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="uri"/> or <paramref name="handler"/> is <see langword="null"/>.</exception>
/// <remarks>
/// <para>
/// This method provides a convenient way to subscribe to resource updates and handle notifications in a single call.
/// The returned <see cref="IAsyncDisposable"/> manages both the subscription and the notification handler registration.
/// When disposed, it automatically unsubscribes from the resource and removes the handler.
/// </para>
/// <para>
/// The handler will only be invoked for notifications related to the specified resource URI.
/// Notifications for other resources are filtered out automatically.
/// </para>
/// </remarks>
public Task<IAsyncDisposable> SubscribeToResourceAsync(
Uri uri,
Func<ResourceUpdatedNotificationParams, CancellationToken, ValueTask> handler,
RequestOptions? options = null,
CancellationToken cancellationToken = default)
{
Throw.IfNull(uri);

return SubscribeToResourceAsync(uri.AbsoluteUri, handler, options, cancellationToken);
}

/// <summary>
/// Subscribes to a resource on the server and registers a handler for notifications when it changes.
/// </summary>
/// <param name="uri">The URI of the resource to which to subscribe.</param>
/// <param name="handler">The handler to invoke when the resource is updated. It receives <see cref="ResourceUpdatedNotificationParams"/> for the subscribed resource.</param>
/// <param name="options">Optional request options including metadata, serialization settings, and progress tracking.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>
/// A task that completes with an <see cref="IAsyncDisposable"/> that, when disposed, unsubscribes from the resource
/// and removes the notification handler.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="uri"/> or <paramref name="handler"/> is <see langword="null"/>.</exception>
/// <exception cref="ArgumentException"><paramref name="uri"/> is empty or composed entirely of whitespace.</exception>
/// <remarks>
/// <para>
/// This method provides a convenient way to subscribe to resource updates and handle notifications in a single call.
/// The returned <see cref="IAsyncDisposable"/> manages both the subscription and the notification handler registration.
/// When disposed, it automatically unsubscribes from the resource and removes the handler.
/// </para>
/// <para>
/// The handler will only be invoked for notifications related to the specified resource URI.
/// Notifications for other resources are filtered out automatically.
/// </para>
/// </remarks>
public async Task<IAsyncDisposable> SubscribeToResourceAsync(
string uri,
Func<ResourceUpdatedNotificationParams, CancellationToken, ValueTask> handler,
RequestOptions? options = null,
CancellationToken cancellationToken = default)
{
Throw.IfNullOrWhiteSpace(uri);
Throw.IfNull(handler);

// Register a notification handler that filters for this specific resource
IAsyncDisposable handlerRegistration = RegisterNotificationHandler(
Comment thread
stephentoub marked this conversation as resolved.
NotificationMethods.ResourceUpdatedNotification,
async (notification, ct) =>
{
if (JsonSerializer.Deserialize(notification.Params, McpJsonUtilities.JsonContext.Default.ResourceUpdatedNotificationParams) is { } resourceUpdate &&
UriTemplate.UriTemplateComparer.Instance.Equals(resourceUpdate.Uri, uri))
{
await handler(resourceUpdate, ct).ConfigureAwait(false);
}
});

try
{
// Subscribe to the resource
await SubscribeToResourceAsync(uri, options, cancellationToken).ConfigureAwait(false);
}
catch
{
// If subscription fails, unregister the handler before propagating the exception
await handlerRegistration.DisposeAsync().ConfigureAwait(false);
throw;
}

// Return a disposable that unsubscribes and removes the handler
return new ResourceSubscription(this, uri, handlerRegistration, options);
}

/// <summary>
/// Manages a resource subscription, handling both unsubscription and handler disposal.
/// </summary>
private sealed class ResourceSubscription : IAsyncDisposable
{
private readonly McpClient _client;
private readonly string _uri;
private readonly IAsyncDisposable _handlerRegistration;
private readonly RequestOptions? _options;
private int _disposed;

public ResourceSubscription(McpClient client, string uri, IAsyncDisposable handlerRegistration, RequestOptions? options)
{
_client = client;
_uri = uri;
_handlerRegistration = handlerRegistration;
_options = options;
}

public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _disposed, 1) != 0)
{
return;
}

try
{
// Unsubscribe from the resource
await _client.UnsubscribeFromResourceAsync(_uri, _options, CancellationToken.None).ConfigureAwait(false);
}
finally
{
// Dispose the notification handler registration
await _handlerRegistration.DisposeAsync().ConfigureAwait(false);
}
}
}

/// <summary>
/// Unsubscribes from a resource on the server to stop receiving notifications about its changes.
/// </summary>
Expand Down
Loading
Loading