Skip to content

Commit 0bec44e

Browse files
halter73Copilot
andcommitted
Improve MRTR thread-safety, cancellation, and shutdown
Harden the MRTR (Multi Round-Trip Request) implementation to correctly handle cancellation across retries, clean shutdown, and handler lifecycle tracking. Thread-safety: - Replace mutable ExchangeTask property with immutable InitialExchangeTask and return-value data flow from ResetForNextExchange - Use Interlocked.CompareExchange in ResetForNextExchange to validate expected state, ensuring concurrent calls reliably fail - Use TrySetResult as the sole atomicity gate in RequestInputAsync, with explicit failure on concurrent exchanges - Store SourceTcs back-reference in MrtrExchange for CAS validation Cancellation: - Introduce a long-lived handler CTS (encapsulated in MrtrContinuation) that survives across retries, keeping the handler cancellable after the original request's combinedCts is disposed - Bridge each retry's cancellation to the handler CTS via CancellationTokenRegistration in AwaitMrtrHandlerAsync - Check TrySetResult/TrySetException return values on retry to detect already-cancelled exchanges - CTS is never disposed (like Kestrel's HttpContext.RequestAborted) to avoid deadlock risks from Cancel/Dispose inside synchronization primitives. CancelHandler() is the sole operation and is thread-safe. Shutdown: - Dispose session handler before iterating _mrtrContinuations so no new continuations can be created during the cleanup loop - Track MRTR handler tasks with inFlightCount + TCS drain pattern (matching McpSessionHandler.ProcessMessagesCoreAsync) so DisposeAsync waits for all handlers to complete before returning - Add ObserveHandlerCompletionAsync fire-and-forget observer that logs unhandled handler exceptions at Error level Logging: - Exclude IncompleteResultException from Error-level ToolCallError logging since it is normal MRTR control flow, not an error Simplifications: - Flow MrtrContext via JsonRpcMessageContext property instead of _pendingMrtrContexts ConcurrentDictionary with synchronous-before-await assumptions - MrtrContinuation is a lifecycle object created upfront, eliminating CTS disposal branching, orphanedCts tracking, and post-drain cleanup Tests (8 new): - ServerDisposal_CancelsHandlerCancellationToken_DuringMrtr - CancellationNotification_DuringInFlightMrtrRetry_CancelsHandler - CancellationNotification_ForExpiredRequestId_DoesNotAffectHandler - DisposeAsync_WaitsForMrtrHandler_BeforeReturning - HandlerException_DuringMrtr_IsLoggedAtErrorLevel - IncompleteResultException_IsNotLoggedAtErrorLevel Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 5487d0a commit 0bec44e

11 files changed

Lines changed: 598 additions & 122 deletions

File tree

src/ModelContextProtocol.Core/Client/McpClient.Methods.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System.Diagnostics.CodeAnalysis;
55
using System.Text.Json;
66
using System.Text.Json.Nodes;
7-
using System.Text.Json.Serialization.Metadata;
87

98
namespace ModelContextProtocol.Client;
109

src/ModelContextProtocol.Core/Client/McpClient.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,4 @@ protected McpClient()
7070
/// </para>
7171
/// </remarks>
7272
public abstract Task<ClientCompletionDetails> Completion { get; }
73-
7473
}

src/ModelContextProtocol.Core/Client/McpClientImpl.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using Microsoft.Extensions.Logging;
22
using Microsoft.Extensions.Logging.Abstractions;
33
using ModelContextProtocol.Protocol;
4-
using ModelContextProtocol.Server;
54
using System.Text.Json;
65
using System.Text.Json.Nodes;
76

@@ -488,7 +487,6 @@ private void RegisterTaskHandlers(RequestHandlers requestHandlers, IMcpTaskStore
488487

489488
// Advertise task capabilities
490489
_options.Capabilities ??= new();
491-
492490
var tasksCapability = _options.Capabilities.Tasks ??= new McpTasksCapability();
493491
tasksCapability.List ??= new ListMcpTasksCapability();
494492
tasksCapability.Cancel ??= new CancelMcpTasksCapability();

src/ModelContextProtocol.Core/Protocol/JsonRpcMessageContext.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,14 @@ public sealed class JsonRpcMessageContext
8585
/// to flow the protocol version header so the server can determine client capabilities.
8686
/// </remarks>
8787
public string? ProtocolVersion { get; set; }
88+
89+
/// <summary>
90+
/// Gets or sets the MRTR context for this request, if any.
91+
/// </summary>
92+
/// <remarks>
93+
/// Set by <see cref="McpServer"/> when an MRTR-aware handler invocation is in progress,
94+
/// so that the per-request <see cref="DestinationBoundMcpServer"/> can intercept
95+
/// server-to-client requests (e.g. ElicitAsync, SampleAsync) and route them through the MRTR mechanism.
96+
/// </remarks>
97+
internal MrtrContext? MrtrContext { get; set; }
8898
}

src/ModelContextProtocol.Core/Server/McpServer.Methods.cs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -280,13 +280,6 @@ public ValueTask<ListRootsResult> RequestRootsAsync(
280280
Throw.IfNull(requestParams);
281281
ThrowIfRootsUnsupported();
282282

283-
return RequestRootsCoreAsync(requestParams, cancellationToken);
284-
}
285-
286-
private ValueTask<ListRootsResult> RequestRootsCoreAsync(
287-
ListRootsRequestParams requestParams,
288-
CancellationToken cancellationToken)
289-
{
290283
return SendRequestAsync(
291284
RequestMethods.RootsList,
292285
requestParams,

src/ModelContextProtocol.Core/Server/McpServer.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,5 +87,4 @@ protected McpServer()
8787
/// Runs the server, listening for and handling client requests.
8888
/// </summary>
8989
public abstract Task RunAsync(CancellationToken cancellationToken = default);
90-
9190
}

src/ModelContextProtocol.Core/Server/McpServerImpl.cs

Lines changed: 164 additions & 85 deletions
Large diffs are not rendered by default.

src/ModelContextProtocol.Core/Server/MrtrContext.cs

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,46 @@ namespace ModelContextProtocol.Server;
77
/// When a handler calls <see cref="McpServer.ElicitAsync(ModelContextProtocol.Protocol.ElicitRequestParams, System.Threading.CancellationToken)"/> or
88
/// <see cref="McpServer.SampleAsync(ModelContextProtocol.Protocol.CreateMessageRequestParams, System.Threading.CancellationToken)"/>,
99
/// the handler sets the exchange TCS and suspends on a response TCS. The pipeline detects the exchange
10-
/// via <see cref="ExchangeTask"/>, sends an <see cref="IncompleteResult"/>, and later completes the
11-
/// response TCS when the retry arrives.
10+
/// via <see cref="InitialExchangeTask"/> or the task returned by <see cref="ResetForNextExchange"/>,
11+
/// sends an <see cref="IncompleteResult"/>, and later completes the response TCS when the retry arrives.
1212
/// </summary>
1313
internal sealed class MrtrContext
1414
{
1515
private TaskCompletionSource<MrtrExchange> _exchangeTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
16-
1716
private int _nextInputRequestId;
1817

1918
/// <summary>
20-
/// Gets a task that completes when the handler produces an exchange (calls ElicitAsync/SampleAsync/RequestRootsAsync).
19+
/// Gets the task for the initial MRTR exchange. Set once in the constructor and never changes.
20+
/// For subsequent exchanges after a retry, use the task returned by <see cref="ResetForNextExchange"/>.
2121
/// </summary>
22-
public Task<MrtrExchange> ExchangeTask => _exchangeTcs.Task;
22+
public Task<MrtrExchange> InitialExchangeTask { get; }
23+
24+
public MrtrContext()
25+
{
26+
InitialExchangeTask = _exchangeTcs.Task;
27+
}
28+
29+
/// <summary>
30+
/// Prepares the context for the next round of exchange after a retry arrives.
31+
/// Uses <see cref="Interlocked.CompareExchange{T}"/> to atomically validate that
32+
/// <see cref="_exchangeTcs"/> still references the TCS that produced <paramref name="previousExchange"/>,
33+
/// ensuring concurrent calls reliably fail.
34+
/// </summary>
35+
/// <param name="previousExchange">The exchange from the previous round whose
36+
/// response has been (or is about to be) completed.</param>
37+
/// <returns>A task that completes when the handler requests input via
38+
/// <see cref="RequestInputAsync"/>.</returns>
39+
/// <exception cref="InvalidOperationException">The context state was modified concurrently.</exception>
40+
public Task<MrtrExchange> ResetForNextExchange(MrtrExchange previousExchange)
41+
{
42+
var newTcs = new TaskCompletionSource<MrtrExchange>(TaskCreationOptions.RunContinuationsAsynchronously);
43+
if (Interlocked.CompareExchange(ref _exchangeTcs, newTcs, previousExchange.SourceTcs) != previousExchange.SourceTcs)
44+
{
45+
throw new InvalidOperationException("MrtrContext was modified concurrently.");
46+
}
47+
48+
return newTcs.Task;
49+
}
2350

2451
/// <summary>
2552
/// Called by <see cref="McpServer.ElicitAsync(ModelContextProtocol.Protocol.ElicitRequestParams, System.Threading.CancellationToken)"/>
@@ -32,25 +59,20 @@ internal sealed class MrtrContext
3259
/// <exception cref="InvalidOperationException">A concurrent server-to-client request is already pending.</exception>
3360
public async Task<InputResponse> RequestInputAsync(InputRequest inputRequest, CancellationToken cancellationToken)
3461
{
62+
var key = $"input_{Interlocked.Increment(ref _nextInputRequestId)}";
3563
var tcs = _exchangeTcs;
36-
if (tcs.Task.IsCompleted)
64+
var exchange = new MrtrExchange(key, inputRequest, tcs);
65+
66+
// TrySetResult is the sole atomicity gate. If it returns false,
67+
// the TCS was already completed by a prior call — concurrent exchanges
68+
// are not supported.
69+
if (!tcs.TrySetResult(exchange))
3770
{
38-
throw new InvalidOperationException("Concurrent server-to-client requests are not supported. Await each ElicitAsync, SampleAsync, or RequestRootsAsync call before making another.");
71+
throw new InvalidOperationException(
72+
"Concurrent server-to-client requests are not supported. " +
73+
"Await each ElicitAsync, SampleAsync, or RequestRootsAsync call before making another.");
3974
}
4075

41-
var key = $"input_{Interlocked.Increment(ref _nextInputRequestId)}";
42-
var exchange = new MrtrExchange(key, inputRequest);
43-
tcs.TrySetResult(exchange);
44-
4576
return await exchange.ResponseTcs.Task.WaitAsync(cancellationToken).ConfigureAwait(false);
4677
}
47-
48-
/// <summary>
49-
/// Prepares the context for the next round of exchange after a retry arrives.
50-
/// Must be called before completing the previous exchange's response TCS.
51-
/// </summary>
52-
public void ResetForNextExchange()
53-
{
54-
_exchangeTcs = new TaskCompletionSource<MrtrExchange>(TaskCreationOptions.RunContinuationsAsynchronously);
55-
}
5678
}

src/ModelContextProtocol.Core/Server/MrtrContinuation.cs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,27 @@
33
namespace ModelContextProtocol.Server;
44

55
/// <summary>
6-
/// Represents a continuation for a suspended MRTR handler, stored between round trips.
6+
/// Represents the lifecycle state for an MRTR handler invocation across retries.
7+
/// Created when the handler starts and stored in <c>_mrtrContinuations</c> when
8+
/// the handler suspends waiting for client input.
79
/// </summary>
810
internal sealed class MrtrContinuation
911
{
10-
public MrtrContinuation(Task<JsonNode?> handlerTask, MrtrContext mrtrContext, MrtrExchange pendingExchange)
12+
private readonly CancellationTokenSource _handlerCts;
13+
14+
public MrtrContinuation(CancellationTokenSource handlerCts, Task<JsonNode?> handlerTask, MrtrContext mrtrContext)
1115
{
16+
_handlerCts = handlerCts;
1217
HandlerTask = handlerTask;
1318
MrtrContext = mrtrContext;
14-
PendingExchange = pendingExchange;
1519
}
1620

21+
/// <summary>
22+
/// Gets a token that cancels when the handler should be aborted.
23+
/// Passed to the handler at creation and remains valid across retries.
24+
/// </summary>
25+
public CancellationToken HandlerToken => _handlerCts.Token;
26+
1727
/// <summary>
1828
/// The handler task that is suspended awaiting input.
1929
/// </summary>
@@ -26,6 +36,15 @@ public MrtrContinuation(Task<JsonNode?> handlerTask, MrtrContext mrtrContext, Mr
2636

2737
/// <summary>
2838
/// The exchange that is awaiting a response from the client.
39+
/// Set each time the handler suspends on a new exchange.
40+
/// </summary>
41+
public MrtrExchange? PendingExchange { get; set; }
42+
43+
/// <summary>
44+
/// Cancels the handler. Safe to call multiple times and concurrently —
45+
/// <see cref="CancellationTokenSource.Cancel()"/> is thread-safe with itself.
46+
/// The CTS is intentionally never disposed to avoid deadlock risks from
47+
/// calling Cancel/Dispose inside synchronization primitives.
2948
/// </summary>
30-
public MrtrExchange PendingExchange { get; }
49+
public void CancelHandler() => _handlerCts.Cancel();
3150
}

src/ModelContextProtocol.Core/Server/MrtrExchange.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ namespace ModelContextProtocol.Server;
99
/// </summary>
1010
internal sealed class MrtrExchange
1111
{
12-
public MrtrExchange(string key, InputRequest inputRequest)
12+
public MrtrExchange(string key, InputRequest inputRequest, TaskCompletionSource<MrtrExchange> sourceTcs)
1313
{
1414
Key = key;
1515
InputRequest = inputRequest;
16+
SourceTcs = sourceTcs;
1617
ResponseTcs = new TaskCompletionSource<InputResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
1718
}
1819

@@ -26,6 +27,13 @@ public MrtrExchange(string key, InputRequest inputRequest)
2627
/// </summary>
2728
public InputRequest InputRequest { get; }
2829

30+
/// <summary>
31+
/// The <see cref="TaskCompletionSource{TResult}"/> that this exchange was set as the result of.
32+
/// Used by <see cref="MrtrContext.ResetForNextExchange"/> on retry to validate
33+
/// the expected state via <see cref="Interlocked.CompareExchange{T}"/>.
34+
/// </summary>
35+
internal TaskCompletionSource<MrtrExchange> SourceTcs { get; }
36+
2937
/// <summary>
3038
/// The TCS that will be completed with the client's response.
3139
/// </summary>

0 commit comments

Comments
 (0)