Skip to content

Commit 1b01747

Browse files
committed
feat(streams): add NegotiatingServerEngine
1 parent befd348 commit 1b01747

6 files changed

Lines changed: 91 additions & 7 deletions

File tree

src/Servus.Akka/Transport/Tcp/Listener/TcpListenerStage.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,22 @@ private async Task InitializeConnectionAsync(TcpClient client)
167167
var localEndPoint = client.Client.LocalEndPoint!;
168168
var remoteEndPoint = client.Client.RemoteEndPoint!;
169169

170+
SecurityInfo? security = null;
171+
var protocol = TransportProtocol.Tcp;
172+
173+
if (stream is SslStream sslStream)
174+
{
175+
security = new SecurityInfo(
176+
sslStream.SslProtocol,
177+
sslStream.NegotiatedApplicationProtocol);
178+
protocol = TransportProtocol.Tls;
179+
}
180+
170181
var connectionInfo = new ConnectionInfo(
171182
localEndPoint,
172183
remoteEndPoint,
173-
TransportProtocol.Tcp);
184+
protocol,
185+
security);
174186

175187
var connectionFlow = Flow.FromGraph(
176188
new TcpServerConnectionStage(stream, connectionInfo));

src/TurboHTTP.Tests/Streams/ProtocolRouterSpec.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,13 @@ public void ResolveEngine_should_return_http11_for_unknown_version()
7979
Assert.NotNull(engine);
8080
Assert.IsType<Http11ServerEngine>(engine);
8181
}
82+
83+
[Fact(Timeout = 5000)]
84+
public void ResolveNegotiating_should_return_negotiating_engine()
85+
{
86+
var engine = ProtocolRouter.ResolveNegotiating(DefaultOptions);
87+
88+
Assert.NotNull(engine);
89+
Assert.IsType<NegotiatingServerEngine>(engine);
90+
}
8291
}

src/TurboHTTP/Streams/Lifecycle/ListenerActor.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,7 @@ private IServerProtocolEngine ResolveEngineForListener()
182182
return ProtocolRouter.ResolveEngine(new Version(3, 0), _serverOptions);
183183
}
184184

185-
if (_listenerOptions is TcpListenerOptions { ApplicationProtocols: [var preferred, ..] })
186-
{
187-
return ProtocolRouter.ResolveEngine(preferred, _serverOptions);
188-
}
189-
190-
return ProtocolRouter.ResolveEngine(new Version(1, 1), _serverOptions);
185+
return ProtocolRouter.ResolveNegotiating(_serverOptions);
191186
}
192187

193188
public static Props Create(
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using Akka;
2+
using Akka.Streams;
3+
using Akka.Streams.Dsl;
4+
using Servus.Akka.Transport;
5+
using TurboHTTP.Server;
6+
using TurboHTTP.Streams.Stages.Server;
7+
8+
namespace TurboHTTP.Streams;
9+
10+
internal sealed class NegotiatingServerEngine : IServerProtocolEngine
11+
{
12+
private readonly TurboServerOptions _options;
13+
14+
public NegotiatingServerEngine(TurboServerOptions options)
15+
{
16+
_options = options;
17+
}
18+
19+
public BidiFlow<ITransportInbound, HttpRequestMessage, HttpResponseMessage, ITransportOutbound, NotUsed> CreateFlow()
20+
{
21+
return BidiFlow.FromGraph(GraphDsl.Create(b =>
22+
{
23+
var connection = b.Add(new ProtocolNegotiatorConnectionStage(_options));
24+
25+
return new BidiShape<
26+
ITransportInbound,
27+
HttpRequestMessage,
28+
HttpResponseMessage,
29+
ITransportOutbound>(
30+
connection.InNetwork,
31+
connection.OutRequest,
32+
connection.InResponse,
33+
connection.OutNetwork);
34+
}));
35+
}
36+
}

src/TurboHTTP/Streams/ProtocolRouter.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,9 @@ internal static IServerProtocolEngine ResolveEngine(Version version, TurboServer
2323
_ => new Http11ServerEngine(options)
2424
};
2525
}
26+
27+
internal static IServerProtocolEngine ResolveNegotiating(TurboServerOptions options)
28+
{
29+
return new NegotiatingServerEngine(options);
30+
}
2631
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
using Akka.Streams;
2+
using Akka.Streams.Stage;
3+
using Servus.Akka.Transport;
4+
using TurboHTTP.Protocol;
5+
using TurboHTTP.Server;
6+
7+
namespace TurboHTTP.Streams.Stages.Server;
8+
9+
internal sealed class ProtocolNegotiatorConnectionStage : GraphStage<ServerConnectionShape>
10+
{
11+
private readonly Inlet<ITransportInbound> _inNetwork = new("NegotiatorConnection.In.Network");
12+
private readonly Outlet<HttpRequestMessage> _outRequest = new("NegotiatorConnection.Out.Request");
13+
private readonly Inlet<HttpResponseMessage> _inResponse = new("NegotiatorConnection.In.Response");
14+
private readonly Outlet<ITransportOutbound> _outNetwork = new("NegotiatorConnection.Out.Network");
15+
private readonly TurboServerOptions _options;
16+
17+
public ProtocolNegotiatorConnectionStage(TurboServerOptions options)
18+
{
19+
_options = options;
20+
}
21+
22+
public override ServerConnectionShape Shape => new(_inNetwork, _outRequest, _inResponse, _outNetwork);
23+
24+
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
25+
=> new HttpConnectionServerStageLogic<ProtocolNegotiatingStateMachine>(this,
26+
ops => new ProtocolNegotiatingStateMachine(_options, ops));
27+
}

0 commit comments

Comments
 (0)