diff --git a/samples/KestrelHttp2DetachDemo/Http2Frame.cs b/samples/KestrelHttp2DetachDemo/Http2Frame.cs new file mode 100644 index 000000000..7c09a73f6 --- /dev/null +++ b/samples/KestrelHttp2DetachDemo/Http2Frame.cs @@ -0,0 +1,95 @@ +using System.Buffers; +using System.Text; + +namespace KestrelHttp2DetachDemo; + +internal enum Http2FrameType : byte +{ + Data = 0x0, + Settings = 0x4 +} + +internal enum Http2FrameFlags : byte +{ + None = 0x0, + Ack = 0x1, + EndStream = 0x1 +} + +internal sealed class Http2Frame +{ + public const int HeaderLength = 9; + + public Http2Frame(int length, Http2FrameType type, byte flags, int streamId, byte[] payload) + { + Length = length; + Type = type; + Flags = flags; + StreamId = streamId; + Payload = payload; + } + + public int Length { get; } + + public Http2FrameType Type { get; } + + public byte Flags { get; } + + public int StreamId { get; } + + public byte[] Payload { get; } + + public bool IsSettingsAck => Type == Http2FrameType.Settings && (Flags & (byte)Http2FrameFlags.Ack) != 0; + + public string PayloadText => Encoding.UTF8.GetString(Payload); + + public static Http2Frame Read(ReadOnlySequence buffer) + { + Span header = stackalloc byte[HeaderLength]; + buffer.Slice(0, HeaderLength).CopyTo(header); + + var length = ReadUInt24(header); + var type = (Http2FrameType)header[3]; + var flags = header[4]; + var streamId = ReadInt31(header.Slice(5, 4)); + var payload = buffer.Slice(HeaderLength, length).ToArray(); + + return new Http2Frame(length, type, flags, streamId, payload); + } + + public static byte[] Write(Http2FrameType type, byte flags, int streamId, ReadOnlySpan payload) + { + var frame = new byte[HeaderLength + payload.Length]; + WriteUInt24(frame, payload.Length); + frame[3] = (byte)type; + frame[4] = flags; + WriteInt31(frame.AsSpan(5, 4), streamId); + payload.CopyTo(frame.AsSpan(HeaderLength)); + return frame; + } + + private static int ReadUInt24(ReadOnlySpan source) + { + return (source[0] << 16) | (source[1] << 8) | source[2]; + } + + private static void WriteUInt24(Span destination, int value) + { + destination[0] = (byte)((value >> 16) & 0xff); + destination[1] = (byte)((value >> 8) & 0xff); + destination[2] = (byte)(value & 0xff); + } + + private static int ReadInt31(ReadOnlySpan source) + { + return ((source[0] & 0x7f) << 24) | (source[1] << 16) | (source[2] << 8) | source[3]; + } + + private static void WriteInt31(Span destination, int value) + { + destination[0] = (byte)((value >> 24) & 0x7f); + destination[1] = (byte)((value >> 16) & 0xff); + destination[2] = (byte)((value >> 8) & 0xff); + destination[3] = (byte)(value & 0xff); + } +} diff --git a/samples/KestrelHttp2DetachDemo/Http2PipelineFilter.cs b/samples/KestrelHttp2DetachDemo/Http2PipelineFilter.cs new file mode 100644 index 000000000..77652181c --- /dev/null +++ b/samples/KestrelHttp2DetachDemo/Http2PipelineFilter.cs @@ -0,0 +1,78 @@ +using System.Buffers; +using System.Text; +using SuperSocket.ProtoBase; + +namespace KestrelHttp2DetachDemo; + +internal sealed class Http2PipelineFilter : PipelineFilterBase +{ + private static readonly byte[] ClientPreface = Encoding.ASCII.GetBytes("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"); + + private readonly bool _expectClientPreface; + private bool _clientPrefaceRead; + private bool _foundHeader; + private int _totalSize; + + public Http2PipelineFilter(bool expectClientPreface = true) + { + _expectClientPreface = expectClientPreface; + } + + public override Http2Frame Filter(ref SequenceReader reader) + { + if (_expectClientPreface && !_clientPrefaceRead) + { + if (reader.Remaining < ClientPreface.Length) + return null!; + + Span preface = stackalloc byte[24]; + reader.Sequence.Slice(reader.Position, ClientPreface.Length).CopyTo(preface); + + if (!preface.SequenceEqual(ClientPreface)) + throw new ProtocolException("The client did not send a valid HTTP/2 connection preface."); + + reader.Advance(ClientPreface.Length); + _clientPrefaceRead = true; + } + + if (!_foundHeader) + { + if (reader.Remaining < Http2Frame.HeaderLength) + return null!; + + var header = reader.Sequence.Slice(reader.Position, Http2Frame.HeaderLength); + var bodyLength = GetBodyLengthFromHeader(header); + + if (bodyLength < 0) + throw new ProtocolException("Failed to get body length from the HTTP/2 frame header."); + + _foundHeader = true; + _totalSize = Http2Frame.HeaderLength + bodyLength; + } + + if (reader.Remaining < _totalSize) + return null!; + + var package = reader.Sequence.Slice(reader.Position, _totalSize); + var frame = Http2Frame.Read(package); + reader.Advance(_totalSize); + _foundHeader = false; + _totalSize = 0; + + return frame; + } + + public override void Reset() + { + base.Reset(); + _foundHeader = false; + _totalSize = 0; + } + + private static int GetBodyLengthFromHeader(ReadOnlySequence headerBuffer) + { + Span header = stackalloc byte[3]; + headerBuffer.Slice(0, 3).CopyTo(header); + return (header[0] << 16) | (header[1] << 8) | header[2]; + } +} diff --git a/samples/KestrelHttp2DetachDemo/KestrelHttp2DetachDemo.csproj b/samples/KestrelHttp2DetachDemo/KestrelHttp2DetachDemo.csproj new file mode 100644 index 000000000..44d7e96ca --- /dev/null +++ b/samples/KestrelHttp2DetachDemo/KestrelHttp2DetachDemo.csproj @@ -0,0 +1,18 @@ + + + Exe + $(SamplesTargetFramework) + false + enable + enable + Demonstrates a minimal HTTP/2 preface and SETTINGS handshake over SuperSocket.Kestrel, then DetachAsync and exact raw data transfer over the same Kestrel transport. + + + + + + + + + + diff --git a/samples/KestrelHttp2DetachDemo/Program.cs b/samples/KestrelHttp2DetachDemo/Program.cs new file mode 100644 index 000000000..739037135 --- /dev/null +++ b/samples/KestrelHttp2DetachDemo/Program.cs @@ -0,0 +1,707 @@ +using System.Buffers; +using System.IO; +using System.IO.Pipelines; +using System.Net; +using System.Net.Sockets; +using System.Text; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Server.Kestrel.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using SuperSocket.Connection; +using SuperSocket.Kestrel; +using SuperSocket.ProtoBase; + +namespace KestrelHttp2DetachDemo; + +internal static class Program +{ + private const int Port = 4052; + private const int TcpPort = 4053; + private static readonly byte[] ClientPreface = Encoding.ASCII.GetBytes("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"); + private static int _activeServerConnections; + private static TimeSpan _serverDetachDelay = TimeSpan.Zero; + private static TaskCompletionSource _serverHandshakeDetached = CreateSignal(); + + internal static int ActiveServerConnections => Volatile.Read(ref _activeServerConnections); + + private static async Task Main(string[] args) + { + Console.WriteLine("SuperSocket.Kestrel HTTP/2 detach demo"); + Console.WriteLine("Handshake uses SuperSocket.Kestrel; normal data is sent after DetachAsync."); + + var app = BuildApp(args); + await using var host = app; + + await app.StartAsync(); + + Console.WriteLine($"ASP.NET Core Kestrel server started on 127.0.0.1:{Port}."); + + try + { + if (args.Contains("--no-signal-wait", StringComparer.OrdinalIgnoreCase)) + { + await RunNoSignalWaitDetachDemoAsync(app.Services); + } + else + { + await RunCleanDetachDemoAsync(app.Services); + + if (args.Contains("--stress-early-raw", StringComparer.OrdinalIgnoreCase)) + { + await RunEarlyRawDataDuringDetachDemoAsync(app.Services); + } + else if (args.Contains("--compare-tcp", StringComparer.OrdinalIgnoreCase)) + { + await RunTcpEarlyRawComparisonDemoAsync(app.Services); + } + else + { + Console.WriteLine("Run with --no-signal-wait to verify Kestrel raw handoff without waiting for the in-process server detach signal."); + Console.WriteLine("Run with --stress-early-raw to reproduce the unsafe case where raw data arrives before the server wrapper starts detaching."); + Console.WriteLine("Run with --compare-tcp to compare the same no-wait handoff over ordinary TcpPipeConnection."); + Console.WriteLine("Demo complete."); + } + } + } + finally + { + await app.StopAsync(); + } + } + + private static WebApplication BuildApp(string[] args) + { + var builder = WebApplication.CreateBuilder(args); + + builder.Services.AddConnections(); + builder.Services.AddSocketConnectionFactory(); + builder.Services.AddSingleton(); + + builder.Logging.ClearProviders(); + builder.Logging.AddConsole(); + builder.Logging.SetMinimumLevel(LogLevel.Warning); + + builder.WebHost.ConfigureKestrel(options => + { + options.ListenLocalhost(Port, listenOptions => + { + listenOptions.UseConnectionHandler(); + }); + }); + + return builder.Build(); + } + + private static async Task RunCleanDetachDemoAsync(IServiceProvider services) + { + _serverDetachDelay = TimeSpan.Zero; + ResetServerHandshakeDetachedSignal(); + + var connectionFactory = services.GetRequiredService(); + var loggerFactory = services.GetRequiredService(); + + await using var context = await connectionFactory.ConnectAsync(new IPEndPoint(IPAddress.Loopback, Port)); + Console.WriteLine("Clean scenario: client connected with Kestrel IConnectionFactory."); + + var handshakeConnection = CreateKestrelConnection(context, loggerFactory, "client-handshake-wrapper"); + await RunClientHandshakeAndDetachAsync(context, handshakeConnection); + + Console.WriteLine($"After DetachAsync: wrapper IsClosed={handshakeConnection.IsClosed}, CloseReason={handshakeConnection.CloseReason?.ToString() ?? ""}"); + Console.WriteLine($"After DetachAsync: Kestrel context closed={context.ConnectionClosed.IsCancellationRequested}, active server connections={ActiveServerConnections}"); + await WaitForServerHandshakeDetachedAsync(); + Console.WriteLine("Server-side SuperSocket wrapper has detached; starting raw Kestrel data transfer."); + + await VerifyRawDataAfterDetachAsync(context); + + Console.WriteLine("Raw data after DetachAsync matched exactly; no byte shift or corruption was observed."); + } + + private static async Task RunNoSignalWaitDetachDemoAsync(IServiceProvider services) + { + _serverDetachDelay = TimeSpan.Zero; + ResetServerHandshakeDetachedSignal(); + + var connectionFactory = services.GetRequiredService(); + var loggerFactory = services.GetRequiredService(); + + await using var context = await connectionFactory.ConnectAsync(new IPEndPoint(IPAddress.Loopback, Port)); + Console.WriteLine("No-signal-wait scenario: client connected with Kestrel IConnectionFactory."); + + var handshakeConnection = CreateKestrelConnection(context, loggerFactory, "client-no-signal-handshake-wrapper"); + await RunClientHandshakeAndDetachAsync(context, handshakeConnection); + + Console.WriteLine("Client starts raw Kestrel data transfer immediately after client DetachAsync; no in-process server detach signal is awaited."); + await VerifyRawDataAfterDetachAsync(context); + + Console.WriteLine("No-signal-wait Kestrel raw handoff matched exactly; no byte shift or corruption was observed."); + Console.WriteLine("Demo complete."); + } + + private static async Task RunEarlyRawDataDuringDetachDemoAsync(IServiceProvider services) + { + _serverDetachDelay = TimeSpan.FromMilliseconds(250); + ResetServerHandshakeDetachedSignal(); + + var connectionFactory = services.GetRequiredService(); + var loggerFactory = services.GetRequiredService(); + + await using var context = await connectionFactory.ConnectAsync(new IPEndPoint(IPAddress.Loopback, Port)); + Console.WriteLine("Early-data scenario: client connected with Kestrel IConnectionFactory."); + + var handshakeConnection = CreateKestrelConnection(context, loggerFactory, "client-early-data-handshake-wrapper"); + await RunClientHandshakeAndDetachAsync(context, handshakeConnection); + + Console.WriteLine("Kestrel client writes raw data after client DetachAsync, without waiting for the server-side detach signal."); + await WriteRawMessageAsync(context.Transport.Output, "raw-arrived-before-server-detach"); + + const string expected = "raw-echo:raw-arrived-before-server-detach"; + + try + { + var response = await ReadRawMessageAsync(context.Transport.Input, context.ConnectionClosed).WaitAsync(TimeSpan.FromSeconds(2)); + Console.WriteLine($"Client received early raw response: {response}"); + + if (response.Equals(expected, StringComparison.Ordinal)) + throw new InvalidOperationException("The unsafe early-raw scenario unexpectedly succeeded; this should reproduce detach-time data corruption."); + + throw new InvalidOperationException($"Unexpected early raw response. Expected the unsafe scenario to fail, but got '{response}'."); + } + catch (TimeoutException) + { + await WaitForServerHandshakeDetachedAsync(); + Console.WriteLine("Expected unsafe case reproduced: early raw data was consumed by the handshake wrapper, so no valid raw echo was produced."); + } + catch (EndOfStreamException) + { + await WaitForServerHandshakeDetachedAsync(); + Console.WriteLine("Expected unsafe case reproduced: the raw stream ended after detach-time data corruption was detected."); + } + + Console.WriteLine("Demo complete."); + } + + private static async Task RunTcpEarlyRawComparisonDemoAsync(IServiceProvider services) + { + var loggerFactory = services.GetRequiredService(); + using var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + listener.Bind(new IPEndPoint(IPAddress.Loopback, TcpPort)); + listener.Listen(backlog: 1); + + var serverTask = TcpEarlyRawComparison.RunTcpEarlyRawServerAsync(listener, loggerFactory); + + using var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await clientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, TcpPort)); + Console.WriteLine("TCP comparison scenario: client connected with Socket + TcpPipeConnection."); + + var handshakeConnection = new TcpPipeConnection(clientSocket, new ConnectionOptions + { + Logger = loggerFactory.CreateLogger("tcp-client-handshake-wrapper"), + ReadAsDemand = false + }); + + var settingsAckReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var readTask = ReadClientHandshakeFramesAsync(handshakeConnection, settingsAckReceived); + + await handshakeConnection.SendAsync(ClientPreface.AsMemory()); + await handshakeConnection.SendAsync(Http2Frame.Write(Http2FrameType.Settings, (byte)Http2FrameFlags.None, 0, []).AsMemory()); + await settingsAckReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + Console.WriteLine("TCP client detaches handshake wrapper before writing raw data."); + await handshakeConnection.DetachAsync(); + await readTask.WaitAsync(TimeSpan.FromSeconds(5)); + + const string message = "tcp-raw-arrived-before-server-detach"; + Console.WriteLine("TCP client writes raw data after client DetachAsync, without waiting for an in-process server detach signal."); + await WriteRawMessageAsync(clientSocket, message); + + var response = await ReadRawMessageAsync(clientSocket, CancellationToken.None).WaitAsync(TimeSpan.FromSeconds(5)); + var expected = "raw-echo:" + message; + Console.WriteLine($"TCP client received raw response: {response}"); + + if (!response.Equals(expected, StringComparison.Ordinal)) + throw new InvalidOperationException($"TCP raw data mismatch. Expected '{expected}', got '{response}'."); + + await serverTask.WaitAsync(TimeSpan.FromSeconds(5)); + Console.WriteLine("TCP comparison succeeded without waiting for an in-process server detach signal."); + Console.WriteLine("Demo complete."); + } + + private static KestrelPipeConnection CreateKestrelConnection(ConnectionContext context, ILoggerFactory loggerFactory, string loggerName) + { + return new DetachableKestrelPipeConnection(context, new ConnectionOptions + { + Logger = loggerFactory.CreateLogger(loggerName), + ReadAsDemand = false + }); + } + + private static async Task RunClientHandshakeAndDetachAsync(ConnectionContext context, IConnection connection) + { + var settingsAckReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var readTask = ReadClientHandshakeFramesAsync(connection, settingsAckReceived); + + try + { + await connection.SendAsync(ClientPreface.AsMemory()); + await connection.SendAsync(Http2Frame.Write(Http2FrameType.Settings, (byte)Http2FrameFlags.None, 0, []).AsMemory()); + Console.WriteLine("Client sent HTTP/2 connection preface and SETTINGS through SuperSocket.Kestrel."); + + await settingsAckReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + } + finally + { + Console.WriteLine("Client handshake complete; detaching SuperSocket connection wrapper..."); + await connection.DetachAsync(); + await readTask.WaitAsync(TimeSpan.FromSeconds(5)); + } + } + + private static async Task ReadClientHandshakeFramesAsync( + IConnection connection, + TaskCompletionSource settingsAckReceived) + { + try + { + await foreach (var frame in connection.RunAsync(new Http2PipelineFilter(expectClientPreface: false))) + { + if (frame.Type == Http2FrameType.Settings) + { + Console.WriteLine(frame.IsSettingsAck + ? "Client received server SETTINGS ACK." + : "Client received server SETTINGS."); + + if (frame.IsSettingsAck) + settingsAckReceived.TrySetResult(); + + continue; + } + } + } + catch (OperationCanceledException) + { + // DetachAsync cancels the reader to stop this wrapper without closing the Kestrel transport. + } + catch (Exception ex) + { + settingsAckReceived.TrySetException(ex); + } + } + + private static async Task VerifyRawDataAfterDetachAsync(ConnectionContext context) + { + var messages = new[] + { + "normal-data-1", + "normal-data-after-detach-2", + "normal-data-with-unicode-汉字-🙂" + }; + + foreach (var message in messages) + { + await WriteRawMessageAsync(context.Transport.Output, message); + var response = await ReadRawMessageAsync(context.Transport.Input, context.ConnectionClosed); + var expected = "raw-echo:" + message; + + Console.WriteLine($"Client received raw response: {response}"); + + if (!response.Equals(expected, StringComparison.Ordinal)) + throw new InvalidOperationException($"Raw data mismatch. Expected '{expected}', got '{response}'."); + } + } + + internal static async Task WriteRawMessageAsync(PipeWriter writer, string message) + { + var payload = Encoding.UTF8.GetBytes(message); + var packet = new byte[sizeof(int) + payload.Length]; + WriteInt32BigEndian(packet.AsSpan(0, sizeof(int)), payload.Length); + payload.CopyTo(packet.AsSpan(sizeof(int))); + await writer.WriteAsync(packet.AsMemory()); + } + + private static async Task WriteRawMessageAsync(Socket socket, string message) + { + var payload = Encoding.UTF8.GetBytes(message); + var packet = new byte[sizeof(int) + payload.Length]; + WriteInt32BigEndian(packet.AsSpan(0, sizeof(int)), payload.Length); + payload.CopyTo(packet.AsSpan(sizeof(int))); + await SendAllAsync(socket, packet, CancellationToken.None); + } + + internal static async Task ReadRawMessageAsync(PipeReader reader, CancellationToken cancellationToken) + { + while (true) + { + var result = await reader.ReadAsync(cancellationToken); + var buffer = result.Buffer; + + try + { + if (TryReadRawMessage(buffer, out var message, out var consumed)) + { + reader.AdvanceTo(consumed); + return message; + } + + if (result.IsCompleted) + throw new EndOfStreamException("The transport ended before a complete raw message was received."); + + reader.AdvanceTo(buffer.Start, buffer.End); + } + catch + { + reader.AdvanceTo(buffer.Start, buffer.End); + throw; + } + } + } + + private static async Task ReadRawMessageAsync(Socket socket, CancellationToken cancellationToken) + { + var lengthBuffer = new byte[sizeof(int)]; + await ReceiveExactAsync(socket, lengthBuffer, cancellationToken); + var length = ReadInt32BigEndian(lengthBuffer); + + if (length is < 0 or > 1024) + throw new ProtocolException($"Invalid raw message length {length}."); + + var payload = new byte[length]; + await ReceiveExactAsync(socket, payload, cancellationToken); + return Encoding.UTF8.GetString(payload); + } + + private static async Task SendAllAsync(Socket socket, ReadOnlyMemory packet, CancellationToken cancellationToken) + { + while (!packet.IsEmpty) + { + var sent = await socket.SendAsync(packet, SocketFlags.None, cancellationToken); + + if (sent <= 0) + throw new IOException("Socket send completed without sending data."); + + packet = packet.Slice(sent); + } + } + + private static async Task ReceiveExactAsync(Socket socket, Memory buffer, CancellationToken cancellationToken) + { + while (!buffer.IsEmpty) + { + var received = await socket.ReceiveAsync(buffer, SocketFlags.None, cancellationToken); + + if (received == 0) + throw new EndOfStreamException("Socket closed before a complete raw message was received."); + + buffer = buffer.Slice(received); + } + } + + private static bool TryReadRawMessage(ReadOnlySequence buffer, out string message, out SequencePosition consumed) + { + var reader = new SequenceReader(buffer); + + if (!reader.TryReadBigEndian(out int length)) + { + message = string.Empty; + consumed = buffer.Start; + return false; + } + + if (length is < 0 or > 1024) + throw new ProtocolException($"Invalid raw message length {length}."); + + if (reader.Remaining < length) + { + message = string.Empty; + consumed = buffer.Start; + return false; + } + + message = Encoding.UTF8.GetString(reader.Sequence.Slice(reader.Position, length)); + reader.Advance(length); + consumed = reader.Position; + return true; + } + + private static void WriteInt32BigEndian(Span destination, int value) + { + destination[0] = (byte)((value >> 24) & 0xff); + destination[1] = (byte)((value >> 16) & 0xff); + destination[2] = (byte)((value >> 8) & 0xff); + destination[3] = (byte)(value & 0xff); + } + + private static int ReadInt32BigEndian(ReadOnlySpan source) + { + return (source[0] << 24) | (source[1] << 16) | (source[2] << 8) | source[3]; + } + + internal static TimeSpan ServerDetachDelay => _serverDetachDelay; + + internal static void NotifyServerHandshakeDetached() + { + _serverHandshakeDetached.TrySetResult(); + } + + private static TaskCompletionSource CreateSignal() + { + return new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + private static void ResetServerHandshakeDetachedSignal() + { + _serverHandshakeDetached = CreateSignal(); + } + + private static async Task WaitForServerHandshakeDetachedAsync() + { + await _serverHandshakeDetached.Task.WaitAsync(TimeSpan.FromSeconds(5)); + } + + internal static void IncrementActiveServerConnections() + { + Interlocked.Increment(ref _activeServerConnections); + } + + internal static void DecrementActiveServerConnections() + { + Interlocked.Decrement(ref _activeServerConnections); + } +} + +internal sealed class DetachableKestrelPipeConnection(ConnectionContext context, ConnectionOptions options) + : KestrelPipeConnection(context, options) +{ + // Demo-only wrapper: after DetachAsync the ASP.NET Core ConnectionContext keeps owning the transport pipes. + protected override ValueTask CompleteReaderAsync(PipeReader reader, bool isDetaching) + { + return ValueTask.CompletedTask; + } + + protected override ValueTask CompleteWriterAsync(PipeWriter writer, bool isDetaching) + { + return ValueTask.CompletedTask; + } +} + +internal sealed class Http2ConnectionHandler(ILoggerFactory loggerFactory) : ConnectionHandler +{ + public override async Task OnConnectedAsync(ConnectionContext context) + { + Program.IncrementActiveServerConnections(); + + var connection = new DetachableKestrelPipeConnection(context, new ConnectionOptions + { + Logger = loggerFactory.CreateLogger(), + ReadAsDemand = false + }); + + try + { + await RunServerHandshakeAndDetachAsync(connection); + await RunRawEchoAsync(context); + } + finally + { + Program.DecrementActiveServerConnections(); + } + } + + private static async Task RunServerHandshakeAndDetachAsync(IConnection connection) + { + var settingsReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var readTask = ReadServerHandshakeFramesAsync(connection, settingsReceived); + + try + { + await settingsReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + if (Program.ServerDetachDelay > TimeSpan.Zero) + await Task.Delay(Program.ServerDetachDelay); + } + finally + { + Console.WriteLine("Server handshake complete; detaching SuperSocket connection wrapper..."); + await connection.DetachAsync(); + await readTask.WaitAsync(TimeSpan.FromSeconds(5)); + Program.NotifyServerHandshakeDetached(); + } + } + + private static async Task ReadServerHandshakeFramesAsync(IConnection connection, TaskCompletionSource settingsReceived) + { + try + { + await foreach (var frame in connection.RunAsync(new Http2PipelineFilter())) + { + Console.WriteLine($"Server received handshake {frame.Type} frame on stream {frame.StreamId}."); + + if (frame.Type == Http2FrameType.Settings && !frame.IsSettingsAck) + { + await connection.SendAsync(Http2Frame.Write(Http2FrameType.Settings, (byte)Http2FrameFlags.None, 0, []).AsMemory()); + await connection.SendAsync(Http2Frame.Write(Http2FrameType.Settings, (byte)Http2FrameFlags.Ack, 0, []).AsMemory()); + settingsReceived.TrySetResult(); + } + } + } + catch (OperationCanceledException) + { + // DetachAsync cancels the SuperSocket reader; raw Kestrel transport remains open. + } + catch (Exception ex) + { + settingsReceived.TrySetException(ex); + } + } + + private static async Task RunRawEchoAsync(ConnectionContext context) + { + while (!context.ConnectionClosed.IsCancellationRequested) + { + string message; + + try + { + message = await Program.ReadRawMessageAsync(context.Transport.Input, context.ConnectionClosed); + } + catch (EndOfStreamException) + { + return; + } + catch (OperationCanceledException) + { + return; + } + catch (ProtocolException ex) + { + Console.WriteLine($"Server detected corrupted raw framing after detach: {ex.Message}"); + return; + } + + Console.WriteLine($"Server received raw data after detach: {message}"); + await Program.WriteRawMessageAsync(context.Transport.Output, "raw-echo:" + message); + } + } +} + +internal static class TcpEarlyRawComparison +{ + public static async Task RunTcpEarlyRawServerAsync(Socket listener, ILoggerFactory loggerFactory) + { + using var serverSocket = await listener.AcceptAsync(); + var connection = new TcpPipeConnection(serverSocket, new ConnectionOptions + { + Logger = loggerFactory.CreateLogger("tcp-server-handshake-wrapper"), + ReadAsDemand = false + }); + + var settingsReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var readTask = ReadServerHandshakeFramesAsync(connection, settingsReceived); + + try + { + await settingsReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + } + finally + { + Console.WriteLine("TCP server detaches handshake wrapper immediately after SETTINGS ACK."); + await connection.DetachAsync(); + await readTask.WaitAsync(TimeSpan.FromSeconds(5)); + } + + var message = await ReadRawMessageAsync(serverSocket, CancellationToken.None).WaitAsync(TimeSpan.FromSeconds(5)); + Console.WriteLine($"TCP server received raw data after detach: {message}"); + await WriteRawMessageAsync(serverSocket, "raw-echo:" + message); + } + + private static async Task WriteRawMessageAsync(Socket socket, string message) + { + var payload = Encoding.UTF8.GetBytes(message); + var packet = new byte[sizeof(int) + payload.Length]; + WriteInt32BigEndian(packet.AsSpan(0, sizeof(int)), payload.Length); + payload.CopyTo(packet.AsSpan(sizeof(int))); + await SendAllAsync(socket, packet, CancellationToken.None); + } + + private static async Task ReadRawMessageAsync(Socket socket, CancellationToken cancellationToken) + { + var lengthBuffer = new byte[sizeof(int)]; + await ReceiveExactAsync(socket, lengthBuffer, cancellationToken); + var length = ReadInt32BigEndian(lengthBuffer); + + if (length is < 0 or > 1024) + throw new ProtocolException($"Invalid raw message length {length}."); + + var payload = new byte[length]; + await ReceiveExactAsync(socket, payload, cancellationToken); + return Encoding.UTF8.GetString(payload); + } + + private static async Task SendAllAsync(Socket socket, ReadOnlyMemory packet, CancellationToken cancellationToken) + { + while (!packet.IsEmpty) + { + var sent = await socket.SendAsync(packet, SocketFlags.None, cancellationToken); + + if (sent <= 0) + throw new IOException("Socket send completed without sending data."); + + packet = packet.Slice(sent); + } + } + + private static async Task ReceiveExactAsync(Socket socket, Memory buffer, CancellationToken cancellationToken) + { + while (!buffer.IsEmpty) + { + var received = await socket.ReceiveAsync(buffer, SocketFlags.None, cancellationToken); + + if (received == 0) + throw new EndOfStreamException("Socket closed before a complete raw message was received."); + + buffer = buffer.Slice(received); + } + } + + private static void WriteInt32BigEndian(Span destination, int value) + { + destination[0] = (byte)((value >> 24) & 0xff); + destination[1] = (byte)((value >> 16) & 0xff); + destination[2] = (byte)((value >> 8) & 0xff); + destination[3] = (byte)(value & 0xff); + } + + private static int ReadInt32BigEndian(ReadOnlySpan source) + { + return (source[0] << 24) | (source[1] << 16) | (source[2] << 8) | source[3]; + } + + private static async Task ReadServerHandshakeFramesAsync(IConnection connection, TaskCompletionSource settingsReceived) + { + try + { + await foreach (var frame in connection.RunAsync(new Http2PipelineFilter())) + { + Console.WriteLine($"TCP server received handshake {frame.Type} frame on stream {frame.StreamId}."); + + if (frame.Type == Http2FrameType.Settings && !frame.IsSettingsAck) + { + await connection.SendAsync(Http2Frame.Write(Http2FrameType.Settings, (byte)Http2FrameFlags.None, 0, []).AsMemory()); + await connection.SendAsync(Http2Frame.Write(Http2FrameType.Settings, (byte)Http2FrameFlags.Ack, 0, []).AsMemory()); + settingsReceived.TrySetResult(); + } + } + } + catch (OperationCanceledException) + { + // DetachAsync cancels the SuperSocket reader; the socket remains open for raw reads. + } + catch (Exception ex) + { + settingsReceived.TrySetException(ex); + } + } +} diff --git a/samples/KestrelHttp2DetachDemo/ServiceCollectionExtensions.cs b/samples/KestrelHttp2DetachDemo/ServiceCollectionExtensions.cs new file mode 100644 index 000000000..85accdfa4 --- /dev/null +++ b/samples/KestrelHttp2DetachDemo/ServiceCollectionExtensions.cs @@ -0,0 +1,23 @@ +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets; + +namespace Microsoft.Extensions.DependencyInjection; + +internal static class ServiceCollectionExtensions +{ + private const string SocketConnectionFactoryTypeName = + "Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactory"; + + public static IServiceCollection AddSocketConnectionFactory(this IServiceCollection services) + { + var factoryType = FindSocketConnectionFactory(); + return services.AddSingleton(typeof(IConnectionFactory), factoryType); + } + + private static Type FindSocketConnectionFactory() + { + var assembly = typeof(SocketTransportOptions).Assembly; + var connectionFactoryType = assembly.GetType(SocketConnectionFactoryTypeName); + return connectionFactoryType ?? throw new NotSupportedException(SocketConnectionFactoryTypeName); + } +} diff --git a/samples/samples.sln b/samples/samples.sln index 6096e503e..232bd639a 100644 --- a/samples/samples.sln +++ b/samples/samples.sln @@ -23,6 +23,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LiveChat", "LiveChat\LiveCh EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CustomProtocol", "CustomProtocol\CustomProtocol.csproj", "{9EF45BFC-7479-4658-BA67-06D6C5DF85F4}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KestrelHttp2DetachDemo", "KestrelHttp2DetachDemo\KestrelHttp2DetachDemo.csproj", "{36B17720-B79F-48C3-93C2-96FCA0635FEE}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -32,9 +34,6 @@ Global Release|x64 = Release|x64 Release|x86 = Release|x86 EndGlobalSection - GlobalSection(SolutionProperties) = preSolution - HideSolutionNode = FALSE - EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution {B63528FF-96DA-4529-A598-F648C3FB63CB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {B63528FF-96DA-4529-A598-F648C3FB63CB}.Debug|Any CPU.Build.0 = Debug|Any CPU @@ -156,5 +155,20 @@ Global {9EF45BFC-7479-4658-BA67-06D6C5DF85F4}.Release|x64.Build.0 = Release|Any CPU {9EF45BFC-7479-4658-BA67-06D6C5DF85F4}.Release|x86.ActiveCfg = Release|Any CPU {9EF45BFC-7479-4658-BA67-06D6C5DF85F4}.Release|x86.Build.0 = Release|Any CPU + {36B17720-B79F-48C3-93C2-96FCA0635FEE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {36B17720-B79F-48C3-93C2-96FCA0635FEE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {36B17720-B79F-48C3-93C2-96FCA0635FEE}.Debug|x64.ActiveCfg = Debug|Any CPU + {36B17720-B79F-48C3-93C2-96FCA0635FEE}.Debug|x64.Build.0 = Debug|Any CPU + {36B17720-B79F-48C3-93C2-96FCA0635FEE}.Debug|x86.ActiveCfg = Debug|Any CPU + {36B17720-B79F-48C3-93C2-96FCA0635FEE}.Debug|x86.Build.0 = Debug|Any CPU + {36B17720-B79F-48C3-93C2-96FCA0635FEE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {36B17720-B79F-48C3-93C2-96FCA0635FEE}.Release|Any CPU.Build.0 = Release|Any CPU + {36B17720-B79F-48C3-93C2-96FCA0635FEE}.Release|x64.ActiveCfg = Release|Any CPU + {36B17720-B79F-48C3-93C2-96FCA0635FEE}.Release|x64.Build.0 = Release|Any CPU + {36B17720-B79F-48C3-93C2-96FCA0635FEE}.Release|x86.ActiveCfg = Release|Any CPU + {36B17720-B79F-48C3-93C2-96FCA0635FEE}.Release|x86.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE EndGlobalSection EndGlobal diff --git a/src/SuperSocket.Connection/PipeConnectionBase.cs b/src/SuperSocket.Connection/PipeConnectionBase.cs index 8f27f99a4..9e7994cbd 100644 --- a/src/SuperSocket.Connection/PipeConnectionBase.cs +++ b/src/SuperSocket.Connection/PipeConnectionBase.cs @@ -409,6 +409,16 @@ protected async IAsyncEnumerable ReadPipeAsync(PipeR if (buffer.Length > 0) { BufferFilterResult lastFilterResult = default; + var advanced = false; + + // Cancellation observed after ReadAsync already returned buffered data: + // advance to buffer.Start so unread bytes stay in the pipe for the next + // owner (e.g. a detach handoff) instead of being consumed and dropped here. + if (cancellationToken.IsCancellationRequested) + { + reader.AdvanceTo(buffer.Start, buffer.Start); + break; + } foreach (var bufferFilterResult in ReadBuffer(buffer, pipelineFilter)) { @@ -416,7 +426,18 @@ protected async IAsyncEnumerable ReadPipeAsync(PipeR if (bufferFilterResult.Package != null) { + advanced = true; yield return bufferFilterResult.Package; + + // Consumer may cancel between yields; treat that the same as a + // pipe-level cancellation so the outer loop exits while keeping + // the AdvanceTo performed below for the delivered package. + if (cancellationToken.IsCancellationRequested) + { + completedOrCancelled = true; + } + + break; } if (bufferFilterResult.Exception != null) @@ -431,10 +452,17 @@ protected async IAsyncEnumerable ReadPipeAsync(PipeR pipelineFilter = _pipelineFilter as IPipelineFilter; + // Single advance point for every exit of the loop above. + // Consumed > 0: the filter parsed up to Consumed bytes. When a package was + // delivered (advanced), keep examined at the consumed position so any bytes + // after it are re-read on the next iteration or preserved for the next + // pipeline owner (detach handoff). When no package was delivered, the whole + // buffer has been examined, so examine to End to wait for more data. + // Consumed == 0: nothing consumed; examine the whole buffer, consume nothing. if (lastFilterResult.Consumed > 0) { consumed = buffer.GetPosition(lastFilterResult.Consumed); - reader.AdvanceTo(consumed, buffer.End); + reader.AdvanceTo(consumed, advanced ? consumed : buffer.End); } else { diff --git a/test/SuperSocket.Tests/PipeConnectionBaseTest.cs b/test/SuperSocket.Tests/PipeConnectionBaseTest.cs new file mode 100644 index 000000000..429d42331 --- /dev/null +++ b/test/SuperSocket.Tests/PipeConnectionBaseTest.cs @@ -0,0 +1,88 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging.Abstractions; +using SuperSocket.Connection; +using SuperSocket.ProtoBase; +using Xunit; + +namespace SuperSocket.Tests; + +public class PipeConnectionBaseTest +{ + [Fact] + public async Task TestDetachPreservesBufferedDataAfterCurrentPackage() + { + var cancellationToken = TestContext.Current.CancellationToken; + var connection = new DetachablePipeConnection(); + var receivedPackages = new List(); + var firstPackageReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var continuePackageLoop = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var readTask = Task.Run(async () => + { + await foreach (var package in connection.RunAsync(new LinePipelineFilter())) + { + receivedPackages.Add(package.Text); + + if (receivedPackages.Count == 1) + { + firstPackageReceived.TrySetResult(); + await continuePackageLoop.Task; + } + } + }, cancellationToken); + + await connection.InputWriter.WriteAsync(Encoding.UTF8.GetBytes("READY\r\nRAW\r\n"), cancellationToken); + await firstPackageReceived.Task.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken); + + var detachTask = connection.DetachAsync().AsTask(); + continuePackageLoop.SetResult(); + + await detachTask.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken); + await readTask.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken); + + Assert.Collection(receivedPackages, package => Assert.Equal("READY", package)); + + var result = await connection.RawReader.ReadAsync(cancellationToken).AsTask().WaitAsync(TimeSpan.FromSeconds(5), cancellationToken); + Assert.Equal("RAW\r\n", Encoding.UTF8.GetString(result.Buffer.ToArray())); + connection.RawReader.AdvanceTo(result.Buffer.End); + } + + private sealed class DetachablePipeConnection : PipeConnectionBase + { + private readonly Pipe _input; + + public DetachablePipeConnection() + : this(new Pipe(), new Pipe()) + { + } + + private DetachablePipeConnection(Pipe input, Pipe output) + : base(input.Reader, output.Writer, new ConnectionOptions { Logger = NullLogger.Instance }) + { + _input = input; + } + + public PipeWriter InputWriter => _input.Writer; + + public PipeReader RawReader => _input.Reader; + + protected override void Close() + { + } + + protected override ValueTask CompleteReaderAsync(PipeReader reader, bool isDetaching) + { + return ValueTask.CompletedTask; + } + + protected override ValueTask CompleteWriterAsync(PipeWriter writer, bool isDetaching) + { + return ValueTask.CompletedTask; + } + } +}