Skip to content

Commit c3e32fe

Browse files
ericstjCopilot
andcommitted
Use lock-based sweep to eliminate pending request race condition
Replace the volatile flag + double-sweep approach with a lock that makes the completion flag set + sweep in ProcessMessagesCoreAsync mutually exclusive with the flag check in SendRequestAsync. This ensures that either the sweep finds the TCS, or SendRequestAsync sees the flag, even if ConcurrentDictionary's non-atomic iteration (which traverses buckets one-by-one without locks) races with a concurrent add. The lock also provides full memory barriers on all architectures, eliminating any potential store/load visibility concerns that the volatile-only approach may have had under certain timing conditions. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 9d9a281 commit c3e32fe

1 file changed

Lines changed: 23 additions & 17 deletions

File tree

src/ModelContextProtocol.Core/McpSessionHandler.cs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,13 @@ internal static bool SupportsPrimingEvent(string? protocolVersion)
9191

9292
private CancellationTokenSource? _messageProcessingCts;
9393
private Task? _messageProcessingTask;
94-
private volatile bool _messageProcessingComplete;
94+
95+
// Gate used to make the completion flag + sweep in ProcessMessagesCoreAsync mutually exclusive
96+
// with the flag check in SendRequestAsync. This ensures that either the sweep finds the TCS or
97+
// SendRequestAsync sees the flag, even if ConcurrentDictionary's non-atomic iteration races
98+
// with a concurrent add. The lock also provides full memory barriers on all architectures.
99+
private readonly object _completionSweepGate = new();
100+
private bool _messageProcessingComplete;
95101

96102
/// <summary>
97103
/// Initializes a new instance of the <see cref="McpSessionHandler"/> class.
@@ -325,11 +331,6 @@ ex is OperationCanceledException &&
325331
await allHandlersCompleted.Task.ConfigureAwait(false);
326332
}
327333

328-
// Fail any pending requests, as they'll never be satisfied.
329-
// Set the flag before sweeping so that any request registered concurrently
330-
// via SendRequestAsync after the sweep will see it and fail itself.
331-
_messageProcessingComplete = true;
332-
333334
// If the transport's channel was completed with a ClientTransportClosedException,
334335
// propagate it so callers can access the structured completion details.
335336
Exception pendingException =
@@ -338,13 +339,14 @@ ex is OperationCanceledException &&
338339
? innerException
339340
: new IOException("The server shut down unexpectedly.");
340341

341-
// ConcurrentDictionary.GetEnumerator() is non-atomic: it traverses buckets
342-
// one-by-one without locks. An entry added to an already-traversed bucket
343-
// during iteration can be missed. Sweep twice so the second pass catches any
344-
// entries the first pass skipped. Entries registered after the flag is set are
345-
// self-handled by SendRequestAsync's flag check.
346-
for (int pass = 0; pass < 2; pass++)
342+
// Fail any pending requests, as they'll never be satisfied.
343+
// The lock ensures mutual exclusion with SendRequestAsync's flag check:
344+
// either our sweep finds the TCS, or SendRequestAsync sees the flag — even if
345+
// ConcurrentDictionary's non-atomic iteration misses a concurrent add.
346+
lock (_completionSweepGate)
347347
{
348+
_messageProcessingComplete = true;
349+
348350
foreach (var entry in _pendingRequests)
349351
{
350352
entry.Value.TrySetException(pendingException);
@@ -592,13 +594,17 @@ public async Task<JsonRpcResponse> SendRequestAsync(JsonRpcRequest request, Canc
592594
_pendingRequests[request.Id] = tcs;
593595

594596
// If message processing has already completed (transport closed), fail the request
595-
// immediately. This handles the race where the reading loop's cleanup sweep ran
596-
// before this request was registered, so it was missed by the sweep.
597-
if (_messageProcessingComplete)
597+
// immediately. The lock ensures mutual exclusion with ProcessMessagesCoreAsync's
598+
// sweep: either the sweep found this TCS, or we see the flag here. This eliminates
599+
// the race where ConcurrentDictionary's non-atomic iteration misses a concurrent add.
600+
lock (_completionSweepGate)
598601
{
599-
if (_pendingRequests.TryRemove(request.Id, out var removed))
602+
if (_messageProcessingComplete)
600603
{
601-
removed.TrySetException(new IOException("The server shut down unexpectedly."));
604+
if (_pendingRequests.TryRemove(request.Id, out var removed))
605+
{
606+
removed.TrySetException(new IOException("The server shut down unexpectedly."));
607+
}
602608
}
603609
}
604610

0 commit comments

Comments
 (0)