Skip to content

Commit ea5ca54

Browse files
committed
Adds Akka.NET TCP client support
1 parent 810f609 commit ea5ca54

6 files changed

Lines changed: 334 additions & 1 deletion

File tree

src/TurboHttp.Tests/HpackTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.Text;
21
using TurboHttp.Protocol;
32

43
namespace TurboHttp.Tests;
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Akka.Actor;
6+
7+
namespace Servus.Akka.IO;
8+
9+
public sealed record CloseConnection
10+
{
11+
public static readonly CloseConnection Instance = new();
12+
}
13+
14+
internal static class TcpClientByteMover
15+
{
16+
internal static async Task MoveStreamToPipe(TcpClientState state, IActorRef runner, CancellationToken ct)
17+
{
18+
Exception? pipeError = null;
19+
try
20+
{
21+
while (!ct.IsCancellationRequested)
22+
{
23+
try
24+
{
25+
var bytesRead = await state.Stream.ReadAsync(state.GetWriteMemory(), ct).ConfigureAwait(false);
26+
if (bytesRead == 0)
27+
{
28+
runner.Tell(CloseConnection.Instance);
29+
return;
30+
}
31+
32+
state.Pipe.Writer.Advance(bytesRead);
33+
}
34+
catch (OperationCanceledException)
35+
{
36+
// no need to log here
37+
return;
38+
}
39+
catch (Exception ex)
40+
{
41+
pipeError = ex;
42+
runner.Tell(CloseConnection.Instance);
43+
return;
44+
}
45+
46+
// make data available to PipeReader
47+
var result = await state.Pipe.Writer.FlushAsync(ct);
48+
if (result.IsCompleted)
49+
{
50+
return;
51+
}
52+
}
53+
}
54+
finally
55+
{
56+
// Always complete the pipe writer on any exit path so that ReadFromPipeAsync
57+
// can detect writer completion via result.IsCompleted rather than depending
58+
// solely on CancellationToken callback timing. Without this, ReadFromPipeAsync
59+
// can stall indefinitely on a loaded CI system if the cancellation callback
60+
// dispatch is delayed by thread pool pressure.
61+
await state.Pipe.Writer.CompleteAsync(pipeError).ConfigureAwait(false);
62+
}
63+
}
64+
65+
internal static async Task MovePipeToChannel(TcpClientState state, IActorRef runner, CancellationToken ct)
66+
{
67+
while (!ct.IsCancellationRequested)
68+
{
69+
try
70+
{
71+
var result = await state.Pipe.Reader.ReadAsync(ct);
72+
if (result.IsCanceled)
73+
{
74+
// PipeReader.ReadAsync can return with IsCanceled=true when the token is
75+
// cancelled rather than throwing OperationCanceledException. In that case
76+
// the buffer is empty and we must not write a zero-length entry into
77+
// _readsFromTransport. Advance past the empty buffer and exit cleanly.
78+
state.Pipe.Reader.AdvanceTo(result.Buffer.Start);
79+
runner.Tell(CloseConnection.Instance);
80+
return;
81+
}
82+
83+
// consume this entire sequence by copying it into a pooled buffer
84+
var buffer = result.Buffer;
85+
var length = (int) buffer.Length;
86+
if (length > 0)
87+
{
88+
var pooled = MemoryPool<byte>.Shared.Rent(length);
89+
buffer.CopyTo(pooled.Memory.Span);
90+
state.InboundWriter.TryWrite((pooled, length));
91+
}
92+
93+
// tell the pipe we're done with this data
94+
state.Pipe.Reader.AdvanceTo(buffer.End);
95+
96+
if (result.IsCompleted)
97+
{
98+
runner.Tell(CloseConnection.Instance);
99+
return;
100+
}
101+
}
102+
catch (OperationCanceledException)
103+
{
104+
runner.Tell(CloseConnection.Instance);
105+
return;
106+
}
107+
catch (Exception)
108+
{
109+
// PipeWriter was completed with an exception (e.g. socket IOException propagated
110+
// through DoWriteToPipeAsync). The faulted pipe surfaces as an exception here
111+
// rather than as result.IsCompleted, so we must handle it explicitly to ensure
112+
// ReadFinished is always self-told and BackgroundTasksCompleted can fire.
113+
runner.Tell(CloseConnection.Instance);
114+
return;
115+
}
116+
}
117+
}
118+
119+
internal static async Task MoveChannelToStream(TcpClientState state, IActorRef runner, CancellationToken ct)
120+
{
121+
while (!state.OutboundReader.Completion.IsCompleted)
122+
{
123+
try
124+
{
125+
while (await state.OutboundReader.WaitToReadAsync(ct).ConfigureAwait(false))
126+
while (state.OutboundReader.TryRead(out var item))
127+
{
128+
var (buffer, readableBytes) = item;
129+
try
130+
{
131+
var workingBuffer = buffer.Memory;
132+
while (readableBytes > 0 && state.Stream is not null)
133+
{
134+
var slice = workingBuffer[..readableBytes];
135+
await state.Stream.WriteAsync(slice, ct).ConfigureAwait(false);
136+
readableBytes = 0;
137+
}
138+
}
139+
finally
140+
{
141+
// free the pooled buffer
142+
buffer.Dispose();
143+
}
144+
}
145+
}
146+
catch (OperationCanceledException)
147+
{
148+
// we're being shut down
149+
return;
150+
}
151+
catch (Exception ex)
152+
{
153+
return;
154+
}
155+
}
156+
157+
state.OutboundWriter.TryComplete(); // can't write anymore either
158+
}
159+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using System.Buffers;
2+
using System.Net;
3+
using System.Net.Sockets;
4+
using System.Threading;
5+
using System.Threading.Channels;
6+
using Akka.Actor;
7+
8+
namespace Servus.Akka.IO;
9+
10+
public class TcpClientRunner : ReceiveActor
11+
{
12+
private readonly TcpClient _client;
13+
private readonly IActorRef _handler;
14+
private readonly TcpClientState _state;
15+
private readonly CancellationTokenSource _cts = new();
16+
private readonly IActorRef _selfClosure;
17+
18+
public record TcpClientConnected(
19+
EndPoint RemoteEndPoint,
20+
ChannelReader<(IMemoryOwner<byte> buffer, int readableBytes)> InboundReader,
21+
ChannelWriter<(IMemoryOwner<byte> buffer, int readableBytes)> OutboundWriter);
22+
23+
public record TcpDisconnected(EndPoint RemoteEndPoint);
24+
25+
public TcpClientRunner(TcpClient client, int maxFrameSize, IActorRef handler,
26+
Channel<(IMemoryOwner<byte> buffer, int readableBytes)>? inboundChannel = null,
27+
Channel<(IMemoryOwner<byte> buffer, int readableBytes)>? outboundChannel = null)
28+
{
29+
_client = client;
30+
_handler = handler;
31+
32+
_state = new TcpClientState(maxFrameSize, _client.GetStream(), inboundChannel, outboundChannel);
33+
34+
_selfClosure = Context.Self;
35+
36+
Receive<CloseConnection>(_ =>
37+
{
38+
_cts.Cancel();
39+
_handler.Tell(new TcpDisconnected(_client.Client.RemoteEndPoint!));
40+
Context.Self.Tell(PoisonPill.Instance);
41+
});
42+
}
43+
44+
protected override void PreStart()
45+
{
46+
base.PreStart();
47+
48+
_handler.Tell(new TcpClientConnected(_client.Client.RemoteEndPoint!, _state.InboundReader,
49+
_state.OutboundWriter));
50+
51+
_ = TcpClientByteMover.MoveStreamToPipe(_state, _selfClosure, _cts.Token);
52+
_ = TcpClientByteMover.MovePipeToChannel(_state, _selfClosure, _cts.Token);
53+
_ = TcpClientByteMover.MoveChannelToStream(_state, _selfClosure, _cts.Token);
54+
}
55+
}

src/TurboHttp/IO/TcpClientState.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using System;
2+
using System.Buffers;
3+
using System.IO.Pipelines;
4+
using System.Net.Sockets;
5+
using System.Threading.Channels;
6+
7+
namespace Servus.Akka.IO;
8+
9+
internal sealed class TcpClientState
10+
{
11+
public int MaxFrameSize { get; }
12+
public NetworkStream Stream { get; }
13+
14+
private readonly Channel<(IMemoryOwner<byte> buffer, int readableBytes)> _inboundChannel;
15+
private readonly Channel<(IMemoryOwner<byte> buffer, int readableBytes)> _outboundChannel;
16+
17+
public ChannelReader<(IMemoryOwner<byte> buffer, int readableBytes)> OutboundReader => _outboundChannel.Reader;
18+
public ChannelWriter<(IMemoryOwner<byte> buffer, int readableBytes)> OutboundWriter => _outboundChannel.Writer;
19+
20+
public ChannelReader<(IMemoryOwner<byte> buffer, int readableBytes)> InboundReader => _inboundChannel.Reader;
21+
public ChannelWriter<(IMemoryOwner<byte> buffer, int readableBytes)> InboundWriter => _inboundChannel.Writer;
22+
public Pipe Pipe { get; }
23+
24+
public TcpClientState(int maxFrameSize,
25+
NetworkStream stream,
26+
Channel<(IMemoryOwner<byte> buffer, int readableBytes)>? inboundChannel,
27+
Channel<(IMemoryOwner<byte> buffer, int readableBytes)>? outboundChannel)
28+
{
29+
_inboundChannel = inboundChannel ?? Channel.CreateUnbounded<(IMemoryOwner<byte> buffer, int readableBytes)>();
30+
_outboundChannel = outboundChannel ?? Channel.CreateUnbounded<(IMemoryOwner<byte> buffer, int readableBytes)>();
31+
32+
MaxFrameSize = maxFrameSize;
33+
Stream = stream;
34+
Pipe = new Pipe(new PipeOptions(pauseWriterThreshold: GetBufferSize(),
35+
resumeWriterThreshold: GetBufferSize() / 2,
36+
useSynchronizationContext: false));
37+
}
38+
39+
40+
public Memory<byte> GetWriteMemory() => Pipe.Writer.GetMemory(MaxFrameSize / 4);
41+
42+
private int GetBufferSize()
43+
{
44+
return MaxFrameSize switch
45+
{
46+
// if the max frame size is under 128kb, scale it up to 512kb
47+
<= 128 * 1024 => 512 * 1024,
48+
// between 128kb and 1mb, scale it up to 2mb
49+
<= 1024 * 1024 => 2 * 1024 * 1024,
50+
// if the max frame size is above 1mb, 2x it
51+
_ => MaxFrameSize * 2
52+
};
53+
}
54+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Net.Sockets;
4+
using Akka.Actor;
5+
6+
namespace Servus.Akka.IO;
7+
8+
public sealed class TcpConnectionManagerActor : ReceiveActor
9+
{
10+
public sealed record OpenConnection(string Host, int Port, IActorRef Handler, int MaxFrameSize = 65536);
11+
12+
public sealed record ConnectionReady(IActorRef Runner, string Host, int Port);
13+
14+
public sealed record ConnectionFailed(string Host, int Port, Exception Reason);
15+
16+
private readonly Dictionary<IActorRef, (string Host, int Port)> _runners = new();
17+
private int _connCounter;
18+
19+
public TcpConnectionManagerActor()
20+
{
21+
Receive<OpenConnection>(HandleOpenConnection);
22+
Receive<TcpClientRunner.TcpDisconnected>(HandleDisconnected);
23+
}
24+
25+
private void HandleOpenConnection(OpenConnection msg)
26+
{
27+
var caller = Sender;
28+
try
29+
{
30+
var client = new TcpClient();
31+
client.Connect(msg.Host, msg.Port);
32+
33+
var connId = ++_connCounter;
34+
var runner = Context.ResolveChildActor<TcpClientRunner>(
35+
$"tcp-runner-{msg.Host.Replace(".", "-")}-{msg.Port}-{connId}", client, msg.Handler, msg.MaxFrameSize);
36+
_runners[runner] = (msg.Host, msg.Port);
37+
38+
caller.Tell(new ConnectionReady(runner, msg.Host, msg.Port));
39+
}
40+
catch (Exception ex)
41+
{
42+
caller.Tell(new ConnectionFailed(msg.Host, msg.Port, ex));
43+
}
44+
}
45+
46+
private void HandleDisconnected(TcpClientRunner.TcpDisconnected msg)
47+
{
48+
_runners.Remove(Sender, out _);
49+
}
50+
51+
protected override SupervisorStrategy SupervisorStrategy() =>
52+
new OneForOneStrategy(
53+
maxNrOfRetries: 3,
54+
withinTimeRange: TimeSpan.FromSeconds(30),
55+
decider: new Restart()
56+
);
57+
}
58+
59+
public class Restart : IDecider
60+
{
61+
public Directive Decide(Exception cause)
62+
{
63+
return Directive.Restart;
64+
}
65+
}

src/TurboHttp/TurboHttp.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
<ItemGroup>
1111
<PackageReference Include="Akka.Streams" Version="1.5.60" />
12+
<PackageReference Include="Servus.Akka" Version="0.3.10" />
1213
</ItemGroup>
1314

1415
</Project>

0 commit comments

Comments
 (0)