forked from modelcontextprotocol/csharp-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathKestrelInMemoryConnection.cs
More file actions
124 lines (101 loc) · 5.4 KB
/
KestrelInMemoryConnection.cs
File metadata and controls
124 lines (101 loc) · 5.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
using System.IO.Pipelines;
namespace ModelContextProtocol.AspNetCore.Tests.Utils;
public sealed class KestrelInMemoryConnection : ConnectionContext
{
private readonly Pipe _clientToServerPipe = new();
private readonly Pipe _serverToClientPipe = new();
private readonly CancellationTokenSource _connectionClosedCts = new();
private readonly FeatureCollection _features = new();
public KestrelInMemoryConnection()
{
ConnectionClosed = _connectionClosedCts.Token;
Transport = new DuplexPipe
{
Input = _clientToServerPipe.Reader,
Output = _serverToClientPipe.Writer,
};
Application = new DuplexPipe
{
Input = _serverToClientPipe.Reader,
Output = _clientToServerPipe.Writer,
};
ClientStream = new DuplexStream(Application, _connectionClosedCts);
}
public IDuplexPipe Application { get; }
public Stream ClientStream { get; }
public override IDuplexPipe Transport { get; set; }
public override string ConnectionId { get; set; } = Guid.NewGuid().ToString("N");
public override IFeatureCollection Features => _features;
public override IDictionary<object, object?> Items { get; set; } = new Dictionary<object, object?>();
public override async ValueTask DisposeAsync()
{
// This is called by Kestrel. The client should dispose the DuplexStream which
// completes the other half of these pipes.
await _serverToClientPipe.Writer.CompleteAsync();
await _serverToClientPipe.Reader.CompleteAsync();
// Don't bother disposing the _connectionClosedCts, since this is just for testing,
// and it's annoying to synchronize with DuplexStream.
await base.DisposeAsync();
}
private class DuplexPipe : IDuplexPipe
{
public required PipeReader Input { get; init; }
public required PipeWriter Output { get; init; }
}
private class DuplexStream(IDuplexPipe duplexPipe, CancellationTokenSource connectionClosedCts) : Stream
{
private readonly Stream _readStream = duplexPipe.Input.AsStream();
private readonly Stream _writeStream = duplexPipe.Output.AsStream();
public override bool CanRead => true;
public override bool CanWrite => true;
public override bool CanSeek => false;
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
// Normally, Kestrel will trigger RequestAborted when the connectionClosedCts fires causing it to gracefully close
// the connection. However, there's currently a race condition that can cause this to get missed. This at least
// unblocks HttpConnection.SendAsync when it disposes the underlying connection stream while awaiting the _readAheadTask
// as would happen with a real socket. https://github.com/dotnet/aspnetcore/pull/62385
using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, connectionClosedCts.Token);
return await _readStream.ReadAsync(buffer, offset, count, linkedTokenSource.Token);
}
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, connectionClosedCts.Token);
return await _readStream.ReadAsync(buffer, linkedTokenSource.Token);
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> _writeStream.WriteAsync(buffer, offset, count, cancellationToken);
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
=> _writeStream.WriteAsync(buffer, cancellationToken);
public override Task FlushAsync(CancellationToken cancellationToken)
=> _writeStream.FlushAsync(cancellationToken);
protected override void Dispose(bool disposing)
{
// Signal to the server the the client has closed the connection, and dispose the client-half of the Pipes.
ThreadPool.UnsafeQueueUserWorkItem(static cts => ((CancellationTokenSource)cts!).Cancel(), connectionClosedCts);
duplexPipe.Input.Complete();
duplexPipe.Output.Complete();
}
// Unsupported stuff
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
// Don't bother supporting sync or APM methods. SocketsHttpHandler shouldn't use them.
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override void Flush() => throw new NotSupportedException();
}
}