|
| 1 | +using Microsoft.Extensions.AI; |
| 2 | +using Microsoft.Extensions.DependencyInjection; |
| 3 | +using ModelContextProtocol.Client; |
| 4 | +using ModelContextProtocol.Protocol; |
| 5 | +using ModelContextProtocol.Server; |
| 6 | +using ModelContextProtocol.Tests.Utils; |
| 7 | +using System.Collections.Concurrent; |
| 8 | +using System.Text.Json.Nodes; |
| 9 | + |
| 10 | +namespace ModelContextProtocol.Tests.Client; |
| 11 | + |
| 12 | +/// <summary> |
| 13 | +/// Tests proving that outgoing message filters can track and limit per-session MRTR flows. |
| 14 | +/// This demonstrates that protocol-level sessions enable session-scoped resource governance |
| 15 | +/// that would not be possible without the Mcp-Session-Id routing mechanism. |
| 16 | +/// </summary> |
| 17 | +public class McpClientMrtrSessionLimitTests : ClientServerTestBase |
| 18 | +{ |
| 19 | + /// <summary> |
| 20 | + /// Tracks the number of pending MRTR flows per session. Incremented when an IncompleteResult |
| 21 | + /// is sent (outgoing filter), decremented when a retry with requestState arrives (incoming filter). |
| 22 | + /// </summary> |
| 23 | + private readonly ConcurrentDictionary<string, int> _pendingFlowsPerSession = new(); |
| 24 | + |
| 25 | + /// <summary> |
| 26 | + /// Records every (sessionId, pendingCount) observation from the outgoing filter, |
| 27 | + /// so the test can verify the tracking was correct. |
| 28 | + /// </summary> |
| 29 | + private readonly ConcurrentBag<(string SessionId, int PendingCount)> _observations = []; |
| 30 | + |
| 31 | + /// <summary> |
| 32 | + /// Maximum allowed concurrent MRTR flows per session. If exceeded, the outgoing filter |
| 33 | + /// replaces the IncompleteResult with an error response. |
| 34 | + /// </summary> |
| 35 | + private int _maxFlowsPerSession = int.MaxValue; |
| 36 | + |
| 37 | + /// <summary> |
| 38 | + /// Counts how many IncompleteResults were blocked by the per-session limit. |
| 39 | + /// </summary> |
| 40 | + private int _blockedFlowCount; |
| 41 | + |
| 42 | + public McpClientMrtrSessionLimitTests(ITestOutputHelper testOutputHelper) |
| 43 | + : base(testOutputHelper, startServer: false) |
| 44 | + { |
| 45 | + } |
| 46 | + |
| 47 | + protected override void ConfigureServices(ServiceCollection services, IMcpServerBuilder mcpServerBuilder) |
| 48 | + { |
| 49 | + services.Configure<McpServerOptions>(options => |
| 50 | + { |
| 51 | + options.ExperimentalProtocolVersion = "2026-06-XX"; |
| 52 | + |
| 53 | + // Outgoing filter: detect IncompleteResult responses and track per session. |
| 54 | + options.Filters.Message.OutgoingFilters.Add(next => async (context, cancellationToken) => |
| 55 | + { |
| 56 | + if (context.JsonRpcMessage is JsonRpcResponse response && |
| 57 | + response.Result is JsonObject resultObj && |
| 58 | + resultObj.TryGetPropertyValue("result_type", out var resultTypeNode) && |
| 59 | + resultTypeNode?.GetValue<string>() is "incomplete") |
| 60 | + { |
| 61 | + var sessionId = context.Server.SessionId ?? "unknown"; |
| 62 | + var newCount = _pendingFlowsPerSession.AddOrUpdate(sessionId, 1, (_, c) => c + 1); |
| 63 | + _observations.Add((sessionId, newCount)); |
| 64 | + |
| 65 | + // Enforce per-session limit: if exceeded, replace the IncompleteResult |
| 66 | + // with a JSON-RPC error. This prevents the client from receiving the |
| 67 | + // IncompleteResult and starting another retry cycle. |
| 68 | + if (newCount > _maxFlowsPerSession) |
| 69 | + { |
| 70 | + // Undo the increment since we're blocking this flow. |
| 71 | + _pendingFlowsPerSession.AddOrUpdate(sessionId, 0, (_, c) => Math.Max(0, c - 1)); |
| 72 | + Interlocked.Increment(ref _blockedFlowCount); |
| 73 | + |
| 74 | + // Replace the outgoing message with a JSON-RPC error. |
| 75 | + context.JsonRpcMessage = new JsonRpcError |
| 76 | + { |
| 77 | + Id = response.Id, |
| 78 | + Error = new JsonRpcErrorDetail |
| 79 | + { |
| 80 | + Code = (int)McpErrorCode.InvalidRequest, |
| 81 | + Message = $"Too many pending MRTR flows for this session (limit: {_maxFlowsPerSession}).", |
| 82 | + } |
| 83 | + }; |
| 84 | + } |
| 85 | + } |
| 86 | + |
| 87 | + await next(context, cancellationToken); |
| 88 | + }); |
| 89 | + |
| 90 | + // Incoming filter: detect retries (requests with requestState) and decrement. |
| 91 | + options.Filters.Message.IncomingFilters.Add(next => async (context, cancellationToken) => |
| 92 | + { |
| 93 | + if (context.JsonRpcMessage is JsonRpcRequest request && |
| 94 | + request.Params is JsonObject paramsObj && |
| 95 | + paramsObj.TryGetPropertyValue("requestState", out var stateNode) && |
| 96 | + stateNode is not null) |
| 97 | + { |
| 98 | + var sessionId = context.Server.SessionId ?? "unknown"; |
| 99 | + _pendingFlowsPerSession.AddOrUpdate(sessionId, 0, (_, c) => Math.Max(0, c - 1)); |
| 100 | + } |
| 101 | + |
| 102 | + await next(context, cancellationToken); |
| 103 | + }); |
| 104 | + }); |
| 105 | + |
| 106 | + mcpServerBuilder.WithTools([ |
| 107 | + McpServerTool.Create( |
| 108 | + async (string message, McpServer server, CancellationToken ct) => |
| 109 | + { |
| 110 | + var result = await server.ElicitAsync(new ElicitRequestParams |
| 111 | + { |
| 112 | + Message = message, |
| 113 | + RequestedSchema = new() |
| 114 | + }, ct); |
| 115 | + |
| 116 | + return $"{result.Action}"; |
| 117 | + }, |
| 118 | + new McpServerToolCreateOptions |
| 119 | + { |
| 120 | + Name = "elicit-tool", |
| 121 | + Description = "A tool that requests elicitation" |
| 122 | + }), |
| 123 | + ]); |
| 124 | + } |
| 125 | + |
| 126 | + [Fact] |
| 127 | + public async Task OutgoingFilter_TracksIncompleteResultsPerSession() |
| 128 | + { |
| 129 | + // Verify that an outgoing message filter can observe IncompleteResult responses |
| 130 | + // and track the pending MRTR flow count per session using context.Server.SessionId. |
| 131 | + StartServer(); |
| 132 | + var clientOptions = new McpClientOptions { ExperimentalProtocolVersion = "2026-06-XX" }; |
| 133 | + clientOptions.Handlers.ElicitationHandler = (request, ct) => |
| 134 | + new ValueTask<ElicitResult>(new ElicitResult { Action = "accept" }); |
| 135 | + |
| 136 | + await using var client = await CreateMcpClientForServer(clientOptions); |
| 137 | + |
| 138 | + // Call the tool — triggers one MRTR round-trip. |
| 139 | + var result = await client.CallToolAsync("elicit-tool", |
| 140 | + new Dictionary<string, object?> { ["message"] = "confirm?" }, |
| 141 | + cancellationToken: TestContext.Current.CancellationToken); |
| 142 | + |
| 143 | + Assert.Equal("accept", Assert.IsType<TextContentBlock>(Assert.Single(result.Content)).Text); |
| 144 | + |
| 145 | + // Verify the filter observed exactly one IncompleteResult and tracked it. |
| 146 | + Assert.Single(_observations); |
| 147 | + var (sessionId, pendingCount) = _observations.First(); |
| 148 | + Assert.NotNull(sessionId); |
| 149 | + Assert.Equal(1, pendingCount); |
| 150 | + |
| 151 | + // After the retry completed, the count should be back to 0. |
| 152 | + Assert.Equal(0, _pendingFlowsPerSession.GetValueOrDefault(sessionId)); |
| 153 | + } |
| 154 | + |
| 155 | + [Fact] |
| 156 | + public async Task OutgoingFilter_CanEnforcePerSessionMrtrLimit() |
| 157 | + { |
| 158 | + // Verify that an outgoing message filter can enforce a per-session MRTR flow limit |
| 159 | + // by replacing the IncompleteResult with a JSON-RPC error when the limit is exceeded. |
| 160 | + // Set the limit to 0 so the very first MRTR flow is blocked. |
| 161 | + _maxFlowsPerSession = 0; |
| 162 | + |
| 163 | + StartServer(); |
| 164 | + var clientOptions = new McpClientOptions { ExperimentalProtocolVersion = "2026-06-XX" }; |
| 165 | + clientOptions.Handlers.ElicitationHandler = (request, ct) => |
| 166 | + new ValueTask<ElicitResult>(new ElicitResult { Action = "accept" }); |
| 167 | + |
| 168 | + await using var client = await CreateMcpClientForServer(clientOptions); |
| 169 | + |
| 170 | + // The tool call should fail because the outgoing filter blocks the IncompleteResult. |
| 171 | + var ex = await Assert.ThrowsAsync<McpProtocolException>(async () => |
| 172 | + await client.CallToolAsync("elicit-tool", |
| 173 | + new Dictionary<string, object?> { ["message"] = "confirm?" }, |
| 174 | + cancellationToken: TestContext.Current.CancellationToken)); |
| 175 | + |
| 176 | + Assert.Contains("Too many pending MRTR flows", ex.Message); |
| 177 | + Assert.Equal(1, _blockedFlowCount); |
| 178 | + } |
| 179 | +} |
0 commit comments