Skip to content

Commit ed07fdb

Browse files
authored
Add diagnostics for messages dropped on the GET SSE stream (#1609)
1 parent acb4cc9 commit ed07fdb

3 files changed

Lines changed: 255 additions & 4 deletions

File tree

src/ModelContextProtocol.Core/Server/McpServer.Methods.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,18 @@ public static McpServer Create(
5454
/// <exception cref="InvalidOperationException">The client does not support sampling.</exception>
5555
/// <exception cref="McpException">The request failed or the client returned an error response.</exception>
5656
/// <remarks>
57+
/// <para>
58+
/// When the server is using the Streamable HTTP transport, prefer calling this method on the
59+
/// <see cref="McpServer"/> instance available via <c>RequestContext</c> from inside a tool, prompt,
60+
/// or resource handler. That routes the request through the originating POST response stream via
61+
/// <see cref="JsonRpcMessageContext.RelatedTransport"/>, which is always open for the duration of
62+
/// the request, rather than relying on the optional standalone GET SSE stream.
63+
/// </para>
64+
/// <para>
5765
/// When called during task-augmented tool execution, this method automatically updates the task
5866
/// status to <see cref="McpTaskStatus.InputRequired"/> while waiting for the client response,
5967
/// then returns to <see cref="McpTaskStatus.Working"/> when the response is received.
68+
/// </para>
6069
/// </remarks>
6170
public async ValueTask<CreateMessageResult> SampleAsync(
6271
CreateMessageRequestParams requestParams,
@@ -252,6 +261,13 @@ public async Task<ChatResponse> SampleAsync(
252261
/// <param name="serializerOptions">The <see cref="JsonSerializerOptions"/> to use for serialization. If <see langword="null"/>, <see cref="McpJsonUtilities.DefaultOptions"/> is used.</param>
253262
/// <returns>The <see cref="IChatClient"/> that can be used to issue sampling requests to the client.</returns>
254263
/// <exception cref="InvalidOperationException">The client does not support sampling.</exception>
264+
/// <remarks>
265+
/// When the server is using the Streamable HTTP transport, prefer obtaining this chat client from the
266+
/// <see cref="McpServer"/> instance available via <c>RequestContext</c> from inside a tool, prompt,
267+
/// or resource handler. That routes sampling requests through the originating POST response stream via
268+
/// <see cref="JsonRpcMessageContext.RelatedTransport"/>, which is always open for the duration of
269+
/// the request, rather than relying on the optional standalone GET SSE stream.
270+
/// </remarks>
255271
public IChatClient AsSamplingChatClient(JsonSerializerOptions? serializerOptions = null)
256272
{
257273
ThrowIfSamplingUnsupported();
@@ -273,6 +289,13 @@ public ILoggerProvider AsClientLoggerProvider() =>
273289
/// <exception cref="ArgumentNullException"><paramref name="requestParams"/> is <see langword="null"/>.</exception>
274290
/// <exception cref="InvalidOperationException">The client does not support roots.</exception>
275291
/// <exception cref="McpException">The request failed or the client returned an error response.</exception>
292+
/// <remarks>
293+
/// When the server is using the Streamable HTTP transport, prefer calling this method on the
294+
/// <see cref="McpServer"/> instance available via <c>RequestContext</c> from inside a tool, prompt,
295+
/// or resource handler. That routes the request through the originating POST response stream via
296+
/// <see cref="JsonRpcMessageContext.RelatedTransport"/>, which is always open for the duration of
297+
/// the request, rather than relying on the optional standalone GET SSE stream.
298+
/// </remarks>
276299
public ValueTask<ListRootsResult> RequestRootsAsync(
277300
ListRootsRequestParams requestParams,
278301
CancellationToken cancellationToken = default)
@@ -298,9 +321,18 @@ public ValueTask<ListRootsResult> RequestRootsAsync(
298321
/// <exception cref="InvalidOperationException">The client does not support elicitation.</exception>
299322
/// <exception cref="McpException">The request failed or the client returned an error response.</exception>
300323
/// <remarks>
324+
/// <para>
325+
/// When the server is using the Streamable HTTP transport, prefer calling this method on the
326+
/// <see cref="McpServer"/> instance available via <c>RequestContext</c> from inside a tool, prompt,
327+
/// or resource handler. That routes the request through the originating POST response stream via
328+
/// <see cref="JsonRpcMessageContext.RelatedTransport"/>, which is always open for the duration of
329+
/// the request, rather than relying on the optional standalone GET SSE stream.
330+
/// </para>
331+
/// <para>
301332
/// When called during task-augmented tool execution, this method automatically updates the task
302333
/// status to <see cref="McpTaskStatus.InputRequired"/> while waiting for user input,
303334
/// then returns to <see cref="McpTaskStatus.Working"/> when the response is received.
335+
/// </para>
304336
/// </remarks>
305337
public async ValueTask<ElicitResult> ElicitAsync(
306338
ElicitRequestParams requestParams,

src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,34 @@ public async Task<bool> HandlePostRequestAsync(JsonRpcMessage message, Stream re
221221
}
222222

223223
/// <inheritdoc/>
224+
/// <remarks>
225+
/// <para>
226+
/// This method sends server-to-client messages via the standalone SSE stream opened by an
227+
/// optional HTTP GET request (see <see cref="HandleGetRequestAsync(Stream, CancellationToken)"/>).
228+
/// </para>
229+
/// <para>
230+
/// <strong>This is generally the wrong channel for server-to-client requests.</strong> Requests
231+
/// sent via the GET stream depend on the client keeping a long-lived GET open, have no per-request
232+
/// correlation to a caller, and race with GET startup and teardown. When called from inside a
233+
/// tool, prompt, or resource handler, use the <see cref="McpServer"/> instance available via
234+
/// <c>RequestContext</c> instead — it routes through the originating POST response stream via
235+
/// <see cref="JsonRpcMessageContext.RelatedTransport"/>, which is always open for the duration of
236+
/// the request. A <see cref="LogLevel.Warning"/> diagnostic is emitted whenever a
237+
/// <see cref="JsonRpcRequest"/> is sent through this method.
238+
/// </para>
239+
/// <para>
240+
/// If no GET SSE stream has yet been opened on this session, behavior depends on the message kind:
241+
/// <see cref="JsonRpcRequest"/> messages throw <see cref="InvalidOperationException"/> because the
242+
/// awaiting caller has no way to receive a response; <see cref="JsonRpcNotification"/> messages are
243+
/// dropped (notifications are best-effort and the spec does not require clients to issue a GET)
244+
/// and a <see cref="LogLevel.Debug"/> diagnostic is logged; other messages are dropped and a
245+
/// <see cref="LogLevel.Warning"/> diagnostic is logged.
246+
/// </para>
247+
/// </remarks>
248+
/// <exception cref="InvalidOperationException">
249+
/// <see cref="Stateless"/> is <see langword="true"/>, or <paramref name="message"/> is a
250+
/// <see cref="JsonRpcRequest"/> and no GET SSE stream has been opened on this session.
251+
/// </exception>
224252
public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
225253
{
226254
Throw.IfNull(message);
@@ -234,9 +262,32 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
234262

235263
if (!_getHttpRequestStarted)
236264
{
237-
// Clients are not required to make a GET request for unsolicited messages.
238-
// If no GET request has been made, drop the message.
239-
return;
265+
switch (message)
266+
{
267+
case JsonRpcRequest request:
268+
throw new InvalidOperationException(
269+
$"Cannot send server-to-client JSON-RPC request '{request.Method}' because no GET SSE stream has been opened on this session " +
270+
$"(SessionId: '{SessionId}'). " +
271+
"Inside a tool, prompt, or resource handler, use the IMcpServer instance from RequestContext (or any IMcpServer obtained via DI from a request-scoped service provider) so the request is routed through the originating POST response stream via JsonRpcMessageContext.RelatedTransport. " +
272+
"The standalone GET SSE stream is optional for clients and is not a reliable channel for server-to-client requests.");
273+
274+
case JsonRpcNotification notification:
275+
// Clients are not required to make a GET request for unsolicited messages.
276+
// If no GET request has been made, drop the notification (best-effort).
277+
LogNotificationDroppedNoGetStream(notification.Method, SessionId ?? string.Empty);
278+
return;
279+
280+
default:
281+
// JsonRpcResponse / JsonRpcError generally flow through the originating POST response
282+
// stream, so receiving one here without a GET is unexpected. Log loudly and drop.
283+
LogMessageDroppedNoGetStream(message.GetType().Name, GetMessageId(message), SessionId ?? string.Empty);
284+
return;
285+
}
286+
}
287+
288+
if (message is JsonRpcRequest openRequest)
289+
{
290+
LogServerRequestOverGetStream(openRequest.Method, SessionId ?? string.Empty);
240291
}
241292

242293
Debug.Assert(_httpResponseTcs is not null);
@@ -263,6 +314,9 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
263314
}
264315
}
265316

317+
private static string GetMessageId(JsonRpcMessage message) =>
318+
message is JsonRpcMessageWithId withId ? withId.Id.ToString() : string.Empty;
319+
266320
/// <inheritdoc/>
267321
public async ValueTask DisposeAsync()
268322
{
@@ -323,4 +377,18 @@ public async ValueTask DisposeAsync()
323377

324378
return sseEventStreamWriter;
325379
}
380+
381+
[LoggerMessage(Level = LogLevel.Warning,
382+
Message = "Sending server-to-client JSON-RPC request '{Method}' over the standalone GET SSE stream (SessionId: '{SessionId}'). " +
383+
"Consider using the IMcpServer instance from RequestContext inside a tool, prompt, or resource handler so the request is routed through the originating POST response stream via JsonRpcMessageContext.RelatedTransport, which is more reliable than the optional GET SSE stream.")]
384+
private partial void LogServerRequestOverGetStream(string method, string sessionId);
385+
386+
[LoggerMessage(Level = LogLevel.Debug,
387+
Message = "Dropping server-to-client JSON-RPC notification '{Method}' because no GET SSE stream has been opened on this session (SessionId: '{SessionId}').")]
388+
private partial void LogNotificationDroppedNoGetStream(string method, string sessionId);
389+
390+
[LoggerMessage(Level = LogLevel.Warning,
391+
Message = "Dropping unexpected server-to-client {MessageType} (Id: '{MessageId}') because no GET SSE stream has been opened on this session (SessionId: '{SessionId}'). " +
392+
"Responses normally flow through the originating POST response stream via JsonRpcMessageContext.RelatedTransport.")]
393+
private partial void LogMessageDroppedNoGetStream(string messageType, string messageId, string sessionId);
326394
}

tests/ModelContextProtocol.AspNetCore.Tests/StreamableHttpServerConformanceTests.cs

Lines changed: 152 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,8 @@ async Task<string> GetFirstNotificationAsync()
384384
public async Task SendNotificationAsync_DoesNotThrow_WhenNoGetRequestHasBeenMade()
385385
{
386386
// Clients are not required to make a GET request for unsolicited messages.
387-
// If no GET request has been made, the messages should be dropped rather than throwing.
387+
// If no GET request has been made, the messages should be dropped rather than throwing,
388+
// and the drop should be visible as a Debug-level log so it can be diagnosed.
388389
McpServer? server = null;
389390

390391
Builder.Services.AddMcpServer()
@@ -409,6 +410,156 @@ public async Task SendNotificationAsync_DoesNotThrow_WhenNoGetRequestHasBeenMade
409410
var exception = await Record.ExceptionAsync(() =>
410411
server.SendNotificationAsync("test-method", TestContext.Current.CancellationToken));
411412
Assert.Null(exception);
413+
414+
Assert.Contains(MockLoggerProvider.LogMessages, log =>
415+
log.Category == typeof(StreamableHttpServerTransport).FullName &&
416+
log.LogLevel == LogLevel.Debug &&
417+
log.Message.Contains("test-method") &&
418+
log.Message.Contains("no GET SSE stream"));
419+
}
420+
421+
[Fact]
422+
public async Task SendRequestAsync_Throws_WhenNoGetRequestHasBeenMade()
423+
{
424+
// A server-to-client request sent before any GET SSE stream is opened can never
425+
// receive a response, so the transport should fail fast with InvalidOperationException
426+
// instead of silently dropping the message and leaving the caller hanging on the TCS
427+
// registered by SendRequestAsync.
428+
McpServer? server = null;
429+
430+
Builder.Services.AddMcpServer()
431+
.WithHttpTransport(options =>
432+
{
433+
#pragma warning disable MCPEXP002 // RunSessionHandler is experimental
434+
options.RunSessionHandler = (httpContext, mcpServer, cancellationToken) =>
435+
{
436+
server = mcpServer;
437+
return mcpServer.RunAsync(cancellationToken);
438+
};
439+
#pragma warning restore MCPEXP002
440+
});
441+
442+
await StartAsync();
443+
444+
await CallInitializeAndValidateAsync();
445+
Assert.NotNull(server);
446+
447+
var request = new JsonRpcRequest
448+
{
449+
Method = "roots/list",
450+
Id = new RequestId(42),
451+
};
452+
453+
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() =>
454+
server.SendRequestAsync(request, TestContext.Current.CancellationToken));
455+
456+
Assert.Contains("roots/list", ex.Message);
457+
Assert.Contains("no GET SSE stream", ex.Message);
458+
Assert.Contains("RequestContext", ex.Message);
459+
Assert.Contains("RelatedTransport", ex.Message);
460+
}
461+
462+
[Fact]
463+
public async Task SendMessageAsync_LogsWarning_OnUnexpectedResponse_WhenNoGetRequestHasBeenMade()
464+
{
465+
// Responses normally ride the originating POST response stream via RelatedTransport, so
466+
// receiving one through the GET path without an open GET is unexpected. The message is
467+
// dropped (preserving best-effort semantics) but a warning is logged so the situation is
468+
// visible.
469+
McpServer? server = null;
470+
471+
Builder.Services.AddMcpServer()
472+
.WithHttpTransport(options =>
473+
{
474+
#pragma warning disable MCPEXP002 // RunSessionHandler is experimental
475+
options.RunSessionHandler = (httpContext, mcpServer, cancellationToken) =>
476+
{
477+
server = mcpServer;
478+
return mcpServer.RunAsync(cancellationToken);
479+
};
480+
#pragma warning restore MCPEXP002
481+
});
482+
483+
await StartAsync();
484+
485+
await CallInitializeAndValidateAsync();
486+
Assert.NotNull(server);
487+
488+
var response = new JsonRpcResponse
489+
{
490+
Id = new RequestId(7),
491+
Result = new JsonObject(),
492+
};
493+
494+
var exception = await Record.ExceptionAsync(() =>
495+
server.SendMessageAsync(response, TestContext.Current.CancellationToken));
496+
Assert.Null(exception);
497+
498+
Assert.Contains(MockLoggerProvider.LogMessages, log =>
499+
log.Category == typeof(StreamableHttpServerTransport).FullName &&
500+
log.LogLevel == LogLevel.Warning &&
501+
log.Message.Contains(nameof(JsonRpcResponse)) &&
502+
log.Message.Contains("no GET SSE stream"));
503+
}
504+
505+
[Fact]
506+
public async Task SendRequestAsync_LogsWarning_WhenGetRequestIsOpen()
507+
{
508+
// Even when the GET SSE stream is open and the request is delivered, server-to-client
509+
// requests sent via the GET path are fragile (no per-request correlation, depend on a
510+
// long-lived GET, race with startup/teardown). A warning is logged to direct callers at
511+
// the RequestContext.RelatedTransport channel instead, without changing behavior.
512+
McpServer? server = null;
513+
514+
Builder.Services.AddMcpServer()
515+
.WithHttpTransport(options =>
516+
{
517+
#pragma warning disable MCPEXP002 // RunSessionHandler is experimental
518+
options.RunSessionHandler = (httpContext, mcpServer, cancellationToken) =>
519+
{
520+
server = mcpServer;
521+
return mcpServer.RunAsync(cancellationToken);
522+
};
523+
#pragma warning restore MCPEXP002
524+
});
525+
526+
await StartAsync();
527+
528+
await CallInitializeAndValidateAsync();
529+
Assert.NotNull(server);
530+
531+
using var getResponse = await HttpClient.GetAsync("", HttpCompletionOption.ResponseHeadersRead, TestContext.Current.CancellationToken);
532+
Assert.Equal(HttpStatusCode.OK, getResponse.StatusCode);
533+
534+
// Send a request via the GET stream and assert it lands on the wire (proving behavior is unchanged).
535+
// SendRequestAsync awaits a response that the test never produces, so use a CTS to cancel after
536+
// confirming wire delivery.
537+
var request = new JsonRpcRequest
538+
{
539+
Method = "roots/list",
540+
Id = new RequestId(99),
541+
};
542+
543+
using var requestCts = CancellationTokenSource.CreateLinkedTokenSource(TestContext.Current.CancellationToken);
544+
var sendTask = server.SendRequestAsync(request, requestCts.Token);
545+
546+
await foreach (var sseEvent in ReadSseAsync(getResponse.Content))
547+
{
548+
var received = JsonSerializer.Deserialize(sseEvent, GetJsonTypeInfo<JsonRpcRequest>());
549+
Assert.NotNull(received);
550+
Assert.Equal("roots/list", received.Method);
551+
break;
552+
}
553+
554+
// Cancel the awaited response so SendRequestAsync completes — the wire delivery has already happened.
555+
requestCts.Cancel();
556+
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => sendTask);
557+
558+
Assert.Contains(MockLoggerProvider.LogMessages, log =>
559+
log.Category == typeof(StreamableHttpServerTransport).FullName &&
560+
log.LogLevel == LogLevel.Warning &&
561+
log.Message.Contains("roots/list") &&
562+
log.Message.Contains("RequestContext"));
412563
}
413564

414565
[Fact]

0 commit comments

Comments
 (0)