Skip to content

Commit ff6372a

Browse files
halter73Copilot
andcommitted
Restore implicit MRTR machinery for stateful sessions
Reverts most of commit 18c0df7's removal of MrtrContext/MrtrContinuation/MrtrExchange, gating it to stateful sessions only. Tools calling ElicitAsync/SampleAsync/RequestRootsAsync under DRAFT-2026-v1 on stdio and stateful Streamable HTTP again transparently suspend the handler via TCS and emit InputRequiredResult to the client, with retries resumed via continuation lookup on requestState. Stateless Streamable HTTP still requires explicit InputRequiredException for MRTR: the WrapHandlerWithMrtr gate skips the implicit machinery when !IsStatefulSession() and routes through InvokeWithInputRequiredResultHandlingAsync, which already throws when the client doesn't support MRTR on stateless. Deferred-task related machinery (DeferredTask / DeferredTaskCreationResult / DeferTaskCreation / HandleDeferredTaskCreationAsync) is NOT restored. That work was superseded by SEP-2663 (PR #1579), which uses an entirely different API surface (McpServerOptions.TaskStore + per-request task metadata) and would just have to delete the restored SEP-1686 code during its rebase. Test coverage restored: MrtrIntegrationTests (Client), MrtrHandlerLifecycleTests / MrtrMessageFilterTests / MrtrSessionLimitTests (Server), plus the deleted SessionDelete_* + RetryWithInvalidRequestState_* tests in MrtrProtocolTests and the Mrtr_ParallelAwaits theory rows in MapMcpTests.Mrtr.cs. All previously wrapped methods (tools/call, prompts/get, resources/read) are wired through the MRTR interceptor again; updated InputRequiredResult XML doc accordingly. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent b0f1e30 commit ff6372a

12 files changed

Lines changed: 2053 additions & 33 deletions

File tree

src/ModelContextProtocol.Core/Protocol/InputRequiredResult.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@ namespace ModelContextProtocol.Protocol;
1212
/// An <see cref="InputRequiredResult"/> is returned in response to a client-initiated request when
1313
/// the server needs the client to fulfill one or more server-initiated requests before it can produce
1414
/// a final result. Per SEP-2322 the wire format is valid for <see cref="RequestMethods.ToolsCall"/>,
15-
/// <see cref="RequestMethods.PromptsGet"/>, and <c>resources/read</c>, but this SDK currently only wires
16-
/// the MRTR interceptor into <see cref="RequestMethods.ToolsCall"/>; throwing
17-
/// <see cref="InputRequiredException"/> from a prompts or resources handler will surface as an internal
18-
/// error until the other methods are opted in.
15+
/// <see cref="RequestMethods.PromptsGet"/>, and <c>resources/read</c>; this SDK wires the MRTR
16+
/// interceptor into all three methods.
1917
/// </para>
2018
/// <para>
2119
/// At least one of <see cref="InputRequests"/> or <see cref="RequestState"/> must be present.

src/ModelContextProtocol.Core/Server/DestinationBoundMcpServer.cs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
using ModelContextProtocol.Protocol;
2+
using System.Text.Json;
3+
using System.Text.Json.Nodes;
24

35
namespace ModelContextProtocol.Server;
46

@@ -14,6 +16,12 @@ internal sealed class DestinationBoundMcpServer(McpServerImpl server, ITransport
1416
public override IServiceProvider? Services => server.Services;
1517
public override LoggingLevel? LoggingLevel => server.LoggingLevel;
1618

19+
/// <summary>
20+
/// Gets or sets the MRTR context for the current request, if any.
21+
/// Set by <see cref="McpServerImpl.CreateDestinationBoundServer"/> when an MRTR-aware handler invocation is in progress.
22+
/// </summary>
23+
internal MrtrContext? ActiveMrtrContext { get; set; }
24+
1725
public override bool IsMrtrSupported => server.IsMrtrSupported;
1826

1927
public override ValueTask DisposeAsync() => server.DisposeAsync();
@@ -40,6 +48,16 @@ public override Task SendMessageAsync(JsonRpcMessage message, CancellationToken
4048

4149
public override Task<JsonRpcResponse> SendRequestAsync(JsonRpcRequest request, CancellationToken cancellationToken = default)
4250
{
51+
// When an MRTR context is active, intercept server-to-client requests (sampling, elicitation, roots)
52+
// and route them through the MRTR mechanism instead of sending them over the wire.
53+
// Task-augmented requests (SampleAsTaskAsync/ElicitAsTaskAsync) have a "task" property on their params
54+
// and expect a CreateTaskResult response, so they must bypass MRTR and go over the wire.
55+
if (ActiveMrtrContext is { } mrtrContext &&
56+
!(request.Params is JsonObject paramsObj && paramsObj.ContainsKey("task")))
57+
{
58+
return SendRequestViaMrtrAsync(mrtrContext, request, cancellationToken);
59+
}
60+
4361
if (request.Context is not null)
4462
{
4563
throw new ArgumentException("Only transports can provide a JsonRpcMessageContext.");
@@ -52,4 +70,23 @@ public override Task<JsonRpcResponse> SendRequestAsync(JsonRpcRequest request, C
5270

5371
return server.SendRequestAsync(request, cancellationToken);
5472
}
73+
74+
private static async Task<JsonRpcResponse> SendRequestViaMrtrAsync(
75+
MrtrContext mrtrContext, JsonRpcRequest request, CancellationToken cancellationToken)
76+
{
77+
var inputRequest = new InputRequest
78+
{
79+
Method = request.Method,
80+
Params = request.Params is { } paramsNode
81+
? JsonSerializer.Deserialize(paramsNode, McpJsonUtilities.JsonContext.Default.JsonElement)
82+
: null,
83+
};
84+
var inputResponse = await mrtrContext.RequestInputAsync(inputRequest, cancellationToken).ConfigureAwait(false);
85+
86+
return new JsonRpcResponse
87+
{
88+
Id = request.Id,
89+
Result = JsonSerializer.SerializeToNode(inputResponse.RawValue, McpJsonUtilities.JsonContext.Default.JsonElement),
90+
};
91+
}
5592
}

src/ModelContextProtocol.Core/Server/McpServerImpl.cs

Lines changed: 281 additions & 9 deletions
Large diffs are not rendered by default.
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
using ModelContextProtocol.Protocol;
2+
3+
namespace ModelContextProtocol.Server;
4+
5+
/// <summary>
6+
/// Manages the MRTR (Multi Round-Trip Request) coordination between a handler and the pipeline.
7+
/// When a handler calls <see cref="McpServer.ElicitAsync(ModelContextProtocol.Protocol.ElicitRequestParams, System.Threading.CancellationToken)"/> or
8+
/// <see cref="McpServer.SampleAsync(ModelContextProtocol.Protocol.CreateMessageRequestParams, System.Threading.CancellationToken)"/>,
9+
/// the handler sets the exchange TCS and suspends on a response TCS. The pipeline detects the exchange
10+
/// via <see cref="InitialExchangeTask"/> or the task returned by <see cref="ResetForNextExchange"/>,
11+
/// sends an <see cref="InputRequiredResult"/>, and later completes the response TCS when the retry arrives.
12+
/// </summary>
13+
internal sealed class MrtrContext
14+
{
15+
private TaskCompletionSource<MrtrExchange> _exchangeTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
16+
private int _nextInputRequestId;
17+
18+
/// <summary>
19+
/// Gets the task for the initial MRTR exchange. Set once in the constructor and never changes.
20+
/// For subsequent exchanges after a retry, use the task returned by <see cref="ResetForNextExchange"/>.
21+
/// </summary>
22+
public Task<MrtrExchange> InitialExchangeTask { get; }
23+
24+
public MrtrContext()
25+
{
26+
InitialExchangeTask = _exchangeTcs.Task;
27+
}
28+
29+
/// <summary>
30+
/// Prepares the context for the next round of exchange after a retry arrives.
31+
/// Uses <see cref="Interlocked.CompareExchange{T}"/> to atomically validate that
32+
/// <see cref="_exchangeTcs"/> still references the TCS that produced <paramref name="previousExchange"/>,
33+
/// ensuring concurrent calls reliably fail.
34+
/// </summary>
35+
/// <param name="previousExchange">The exchange from the previous round whose
36+
/// response has been (or is about to be) completed.</param>
37+
/// <returns>A task that completes when the handler requests input via
38+
/// <see cref="RequestInputAsync"/>.</returns>
39+
/// <exception cref="InvalidOperationException">The context state was modified concurrently.</exception>
40+
public Task<MrtrExchange> ResetForNextExchange(MrtrExchange previousExchange)
41+
{
42+
var newTcs = new TaskCompletionSource<MrtrExchange>(TaskCreationOptions.RunContinuationsAsynchronously);
43+
if (Interlocked.CompareExchange(ref _exchangeTcs, newTcs, previousExchange.SourceTcs) != previousExchange.SourceTcs)
44+
{
45+
throw new InvalidOperationException("MrtrContext was modified concurrently.");
46+
}
47+
48+
return newTcs.Task;
49+
}
50+
51+
/// <summary>
52+
/// Called by <see cref="McpServer.ElicitAsync(ModelContextProtocol.Protocol.ElicitRequestParams, System.Threading.CancellationToken)"/>
53+
/// or <see cref="McpServer.SampleAsync(ModelContextProtocol.Protocol.CreateMessageRequestParams, System.Threading.CancellationToken)"/>
54+
/// to request input from the client via the MRTR mechanism.
55+
/// </summary>
56+
/// <param name="inputRequest">The input request describing what the server needs.</param>
57+
/// <param name="cancellationToken">A token to cancel the wait for input.</param>
58+
/// <returns>The client's response to the input request.</returns>
59+
/// <exception cref="InvalidOperationException">A concurrent server-to-client request is already pending.</exception>
60+
public async Task<InputResponse> RequestInputAsync(InputRequest inputRequest, CancellationToken cancellationToken)
61+
{
62+
var key = $"input_{Interlocked.Increment(ref _nextInputRequestId)}";
63+
var tcs = _exchangeTcs;
64+
var exchange = new MrtrExchange(key, inputRequest, tcs);
65+
66+
// TrySetResult is the sole atomicity gate. If it returns false,
67+
// the TCS was already completed by a prior call ΓÇö concurrent exchanges
68+
// are not supported.
69+
if (!tcs.TrySetResult(exchange))
70+
{
71+
throw new InvalidOperationException(
72+
"Concurrent server-to-client requests are not supported. " +
73+
"Await each ElicitAsync, SampleAsync, or RequestRootsAsync call before making another.");
74+
}
75+
76+
return await exchange.ResponseTcs.Task.WaitAsync(cancellationToken).ConfigureAwait(false);
77+
}
78+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using System.Text.Json.Nodes;
2+
3+
namespace ModelContextProtocol.Server;
4+
5+
/// <summary>
6+
/// Represents the lifecycle state for an MRTR handler invocation across retries.
7+
/// Created when the handler starts and stored in <c>_mrtrContinuations</c> when
8+
/// the handler suspends waiting for client input.
9+
/// </summary>
10+
internal sealed class MrtrContinuation
11+
{
12+
private readonly CancellationTokenSource _handlerCts;
13+
14+
public MrtrContinuation(CancellationTokenSource handlerCts, Task<JsonNode?> handlerTask, MrtrContext mrtrContext)
15+
{
16+
_handlerCts = handlerCts;
17+
HandlerTask = handlerTask;
18+
MrtrContext = mrtrContext;
19+
}
20+
21+
/// <summary>
22+
/// Gets a token that cancels when the handler should be aborted.
23+
/// Passed to the handler at creation and remains valid across retries.
24+
/// </summary>
25+
public CancellationToken HandlerToken => _handlerCts.Token;
26+
27+
/// <summary>
28+
/// The handler task that is suspended awaiting input.
29+
/// </summary>
30+
public Task<JsonNode?> HandlerTask { get; }
31+
32+
/// <summary>
33+
/// The MRTR context for the handler's async flow.
34+
/// </summary>
35+
public MrtrContext MrtrContext { get; }
36+
37+
/// <summary>
38+
/// The exchange that is awaiting a response from the client.
39+
/// Set each time the handler suspends on a new exchange.
40+
/// </summary>
41+
public MrtrExchange? PendingExchange { get; set; }
42+
43+
/// <summary>
44+
/// Cancels the handler. Safe to call multiple times and concurrently ΓÇö
45+
/// <see cref="CancellationTokenSource.Cancel()"/> is thread-safe with itself.
46+
/// The CTS is intentionally never disposed to avoid deadlock risks from
47+
/// calling Cancel/Dispose inside synchronization primitives.
48+
/// </summary>
49+
public void CancelHandler() => _handlerCts.Cancel();
50+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
using ModelContextProtocol.Protocol;
2+
3+
namespace ModelContextProtocol.Server;
4+
5+
/// <summary>
6+
/// Represents a single exchange between the handler and the pipeline during an MRTR flow.
7+
/// The handler creates the exchange and awaits the response TCS. The pipeline reads the exchange,
8+
/// sends the <see cref="InputRequest"/> to the client, and completes the TCS when the response arrives.
9+
/// </summary>
10+
internal sealed class MrtrExchange
11+
{
12+
public MrtrExchange(string key, InputRequest inputRequest, TaskCompletionSource<MrtrExchange> sourceTcs)
13+
{
14+
Key = key;
15+
InputRequest = inputRequest;
16+
SourceTcs = sourceTcs;
17+
ResponseTcs = new TaskCompletionSource<InputResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
18+
}
19+
20+
/// <summary>
21+
/// The unique key identifying this exchange within the MRTR round trip.
22+
/// </summary>
23+
public string Key { get; }
24+
25+
/// <summary>
26+
/// The input request that needs to be fulfilled by the client.
27+
/// </summary>
28+
public InputRequest InputRequest { get; }
29+
30+
/// <summary>
31+
/// The <see cref="TaskCompletionSource{TResult}"/> that this exchange was set as the result of.
32+
/// Used by <see cref="MrtrContext.ResetForNextExchange"/> on retry to validate
33+
/// the expected state via <see cref="Interlocked.CompareExchange{T}"/>.
34+
/// </summary>
35+
internal TaskCompletionSource<MrtrExchange> SourceTcs { get; }
36+
37+
/// <summary>
38+
/// The TCS that will be completed with the client's response.
39+
/// </summary>
40+
public TaskCompletionSource<InputResponse> ResponseTcs { get; }
41+
}

tests/ModelContextProtocol.AspNetCore.Tests/MapMcpTests.Mrtr.cs

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -238,39 +238,72 @@ private static async Task<string> MrtrParallelAwait(McpServer server, Cancellati
238238
RequestedSchema = new()
239239
}, ct);
240240

241-
var sampleTask = server.SampleAsync(new CreateMessageRequestParams
241+
// Start the second await — with MRTR, this throws InvalidOperationException
242+
// because MrtrContext only supports one pending exchange at a time.
243+
try
242244
{
243-
Messages = [new SamplingMessage { Role = Role.User, Content = [new TextContentBlock { Text = "Parallel sample" }] }],
244-
MaxTokens = 100
245-
}, ct);
245+
var sampleTask = server.SampleAsync(new CreateMessageRequestParams
246+
{
247+
Messages = [new SamplingMessage { Role = Role.User, Content = [new TextContentBlock { Text = "Parallel sample" }] }],
248+
MaxTokens = 100
249+
}, ct);
246250

247-
var sampleResult = await sampleTask;
248-
var elicitResult = await elicitTask;
249-
return $"parallel-ok:{elicitResult.Action}:{sampleResult.Content.OfType<TextContentBlock>().First().Text}";
251+
// If we get here, both calls succeeded (non-MRTR path)
252+
var sampleResult = await sampleTask;
253+
var elicitResult = await elicitTask;
254+
return $"parallel-ok:{elicitResult.Action}:{sampleResult.Content.OfType<TextContentBlock>().First().Text}";
255+
}
256+
catch (InvalidOperationException ex)
257+
{
258+
return ex.Message;
259+
}
250260
}
251261

252-
[Fact]
253-
public async Task Mrtr_ParallelAwaits()
262+
[Theory]
263+
[InlineData(false)]
264+
[InlineData(true)]
265+
public async Task Mrtr_ParallelAwaits(bool experimentalClient)
254266
{
255-
// Server-side parallel ElicitAsync + SampleAsync awaits use the legacy server-to-client
256-
// request path on stateful sessions, which works the same under either negotiated revision
257-
// (the spec only removes those request methods from Streamable HTTP under draft, which is
258-
// stateless-only territory). Stateless servers can't issue server-to-client requests at all.
259-
Assert.SkipWhen(Stateless, "Server-side awaits require stateful server-to-client requests.");
267+
// Parallel awaits work with regular JSON-RPC but fail with MRTR because
268+
// MrtrContext only supports one exchange at a time (TrySetResult gate).
269+
Assert.SkipWhen(Stateless, "Await-style API requires handler suspension (stateful only).");
260270

261271
ConfigureServer(MrtrParallelAwait);
262272
await using var app = Builder.Build();
263273
app.MapMcp();
264274
await app.StartAsync(TestContext.Current.CancellationToken);
265275

266-
await using var client = await ConnectAsync(configureClient: ConfigureMrtrHandlers);
276+
Action<McpClientOptions> configureClient = experimentalClient
277+
? options => { ConfigureMrtrHandlers(options); options.ProtocolVersion = "DRAFT-2026-v1"; }
278+
: ConfigureMrtrHandlers;
267279

268-
var result = await client.CallToolAsync("mrtr-parallel-await",
269-
cancellationToken: TestContext.Current.CancellationToken);
280+
await using var client = await ConnectAsync(configureClient: configureClient);
270281

271-
var text = Assert.IsType<TextContentBlock>(Assert.Single(result.Content)).Text;
272-
Assert.StartsWith("parallel-ok:", text);
273-
Assert.True(result.IsError is not true);
282+
if (experimentalClient)
283+
{
284+
// MRTR active. Parallel awaits hit the MrtrContext concurrency gate and the second
285+
// call throws InvalidOperationException, which the tool catches and returns as text.
286+
Assert.Equal("DRAFT-2026-v1", client.NegotiatedProtocolVersion);
287+
288+
var result = await client.CallToolAsync("mrtr-parallel-await",
289+
cancellationToken: TestContext.Current.CancellationToken);
290+
291+
var text = Assert.IsType<TextContentBlock>(Assert.Single(result.Content)).Text;
292+
Assert.Contains("Concurrent server-to-client requests are not supported", text);
293+
Assert.True(result.IsError is not true);
294+
}
295+
else
296+
{
297+
// Non-MRTR: awaits go through regular JSON-RPC — concurrent calls work.
298+
Assert.Equal("2025-11-25", client.NegotiatedProtocolVersion);
299+
300+
var result = await client.CallToolAsync("mrtr-parallel-await",
301+
cancellationToken: TestContext.Current.CancellationToken);
302+
303+
var text = Assert.IsType<TextContentBlock>(Assert.Single(result.Content)).Text;
304+
Assert.StartsWith("parallel-ok:", text);
305+
Assert.True(result.IsError is not true);
306+
}
274307
}
275308

276309
[McpServerTool(Name = "mrtr-elicit")]

0 commit comments

Comments
 (0)