Skip to content

Commit d59c2c6

Browse files
committed
refactor(transport): introduce StreamTarget and extract ServerStreamResolver
1 parent 15dbf1a commit d59c2c6

17 files changed

Lines changed: 479 additions & 522 deletions

src/Servus.Akka.TestKit.Tests/TestConnectionStageBuilderExtensionsSpec.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public async Task OnData_should_invoke_handler_on_TransportData()
2424

2525
var stage = new TestConnectionStageBuilder()
2626
.AutoConnect()
27-
.OnData((data, ctx) =>
27+
.OnData((_, ctx) =>
2828
{
2929
handlerInvoked = true;
3030
ctx.Push(new TransportData(new byte[] { 0xFF }));
@@ -89,7 +89,7 @@ public async Task OnOpenStream_should_invoke_handler_on_OpenStream()
8989
Assert.True(handlerInvoked, "OnOpenStream handler should have been invoked");
9090
Assert.IsType<TransportConnected>(inbound[0]);
9191
var opened = Assert.IsType<StreamOpened>(inbound[1]);
92-
Assert.Equal(42L, opened.StreamId);
92+
Assert.Equal(42L, opened.Id.Value);
9393
}
9494

9595
[Fact(Timeout = 5000)]
@@ -105,7 +105,7 @@ public async Task OnMultiplexedData_should_invoke_handler_on_MultiplexedData()
105105

106106
var stage = new TestConnectionStageBuilder()
107107
.AutoConnect()
108-
.OnMultiplexedData((data, ctx) =>
108+
.OnMultiplexedData((_, _) =>
109109
{
110110
handlerInvoked.TrySetResult();
111111
})
@@ -169,7 +169,7 @@ public async Task AutoStreamOpened_should_respond_with_StreamOpened_for_matching
169169

170170
var stage = new TestConnectionStageBuilder()
171171
.AutoConnect()
172-
.AutoStreamOpened(42, StreamDirection.Bidirectional)
172+
.AutoStreamOpened(42)
173173
.Build();
174174

175175
_ = Source.From<ITransportOutbound>([
@@ -190,7 +190,7 @@ public async Task AutoStreamOpened_should_respond_with_StreamOpened_for_matching
190190

191191
Assert.IsType<TransportConnected>(inbound[0]);
192192
var opened = Assert.IsType<StreamOpened>(inbound[1]);
193-
Assert.Equal(42L, opened.StreamId);
193+
Assert.Equal(42L, (long)opened.Id);
194194
Assert.Equal(StreamDirection.Bidirectional, opened.Direction);
195195
}
196196

@@ -270,7 +270,7 @@ public async Task EchoMultiplexedData_should_echo_back_data()
270270

271271
Assert.IsType<TransportConnected>(inbound[0]);
272272
var echo = Assert.IsType<MultiplexedData>(inbound[1]);
273-
Assert.Equal(7L, echo.StreamId);
273+
Assert.Equal(7L, (long)echo.StreamId);
274274
Assert.Equal(3, echo.Buffer.Length);
275275
Assert.Equal(0x11, echo.Buffer.Span[0]);
276276
Assert.Equal(0x22, echo.Buffer.Span[1]);
@@ -287,7 +287,7 @@ public async Task OnCompleteWrites_should_invoke_handler_on_CompleteWrites()
287287

288288
var stage = new TestConnectionStageBuilder()
289289
.AutoConnect()
290-
.OnCompleteWrites((complete, ctx) =>
290+
.OnCompleteWrites((_, ctx) =>
291291
{
292292
handlerInvoked = true;
293293
ctx.Push(new TransportDisconnected(DisconnectReason.Graceful));
@@ -335,7 +335,7 @@ public async Task OnResetStream_should_invoke_handler_on_ResetStream()
335335

336336
_ = Source.From<ITransportOutbound>([
337337
new ConnectTransport(new TcpTransportOptions { Host = "localhost", Port = 80 }),
338-
new ResetStream(99, 0)
338+
new ResetStream(99)
339339
])
340340
.Via(stage.AsFlow())
341341
.RunWith(Sink.ForEach<ITransportInbound>(msg =>
@@ -352,7 +352,7 @@ public async Task OnResetStream_should_invoke_handler_on_ResetStream()
352352
Assert.True(handlerInvoked, "OnResetStream handler should have been invoked");
353353
Assert.IsType<TransportConnected>(inbound[0]);
354354
var closed = Assert.IsType<StreamClosed>(inbound[1]);
355-
Assert.Equal(99L, closed.StreamId);
355+
Assert.Equal(99L, (long)closed.Id);
356356
Assert.Equal(DisconnectReason.Error, closed.Reason);
357357
}
358358
}

src/Servus.Akka.TestKit.Tests/TestConnectionStageExtensionsSpec.cs

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public async Task PushData_bytes_should_deliver_TransportData_inbound()
2525
.AutoConnect()
2626
.Build();
2727

28-
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions { Host = "localhost", Port = 80 }))
28+
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions
29+
{ Host = "localhost", Port = 80 }))
2930
.Via(stage.AsFlow())
3031
.RunWith(Sink.ForEach<ITransportInbound>(msg =>
3132
{
@@ -57,7 +58,8 @@ public async Task PushData_string_should_deliver_TransportData_inbound()
5758
.AutoConnect()
5859
.Build();
5960

60-
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions { Host = "localhost", Port = 80 }))
61+
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions
62+
{ Host = "localhost", Port = 80 }))
6163
.Via(stage.AsFlow())
6264
.RunWith(Sink.ForEach<ITransportInbound>(msg =>
6365
{
@@ -88,7 +90,8 @@ public async Task PushStreamOpened_should_deliver_StreamOpened_inbound()
8890
.AutoConnect()
8991
.Build();
9092

91-
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions { Host = "localhost", Port = 80 }))
93+
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions
94+
{ Host = "localhost", Port = 80 }))
9295
.Via(stage.AsFlow())
9396
.RunWith(Sink.ForEach<ITransportInbound>(msg =>
9497
{
@@ -100,13 +103,13 @@ public async Task PushStreamOpened_should_deliver_StreamOpened_inbound()
100103
}), _materializer);
101104

102105
await stage.WaitForOutbound(ct);
103-
stage.PushStreamOpened(42, StreamDirection.Bidirectional);
106+
stage.PushStreamOpened(42);
104107

105108
await tcs.Task.WaitAsync(ct);
106109

107110
Assert.IsType<TransportConnected>(inbound[0]);
108111
var opened = Assert.IsType<StreamOpened>(inbound[1]);
109-
Assert.Equal(42L, opened.StreamId);
112+
Assert.Equal(42L, (long)opened.Id);
110113
Assert.Equal(StreamDirection.Bidirectional, opened.Direction);
111114
}
112115

@@ -121,7 +124,8 @@ public async Task PushMultiplexedData_should_deliver_MultiplexedData_inbound()
121124
.AutoConnect()
122125
.Build();
123126

124-
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions { Host = "localhost", Port = 80 }))
127+
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions
128+
{ Host = "localhost", Port = 80 }))
125129
.Via(stage.AsFlow())
126130
.RunWith(Sink.ForEach<ITransportInbound>(msg =>
127131
{
@@ -139,7 +143,7 @@ public async Task PushMultiplexedData_should_deliver_MultiplexedData_inbound()
139143

140144
Assert.IsType<TransportConnected>(inbound[0]);
141145
var mux = Assert.IsType<MultiplexedData>(inbound[1]);
142-
Assert.Equal(7L, mux.StreamId);
146+
Assert.Equal(7L, (long)mux.StreamId);
143147
Assert.Equal(2, mux.Buffer.Length);
144148
}
145149

@@ -154,7 +158,8 @@ public async Task SimulateInboundStream_should_push_full_lifecycle()
154158
.AutoConnect()
155159
.Build();
156160

157-
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions { Host = "localhost", Port = 80 }))
161+
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions
162+
{ Host = "localhost", Port = 80 }))
158163
.Via(stage.AsFlow())
159164
.RunWith(Sink.ForEach<ITransportInbound>(msg =>
160165
{
@@ -172,7 +177,7 @@ public async Task SimulateInboundStream_should_push_full_lifecycle()
172177

173178
Assert.IsType<TransportConnected>(inbound[0]);
174179
var accepted = Assert.IsType<ServerStreamAccepted>(inbound[1]);
175-
Assert.Equal(5L, accepted.StreamId);
180+
Assert.Equal(5L, (long)accepted.Id);
176181
Assert.Equal(StreamDirection.Unidirectional, accepted.Direction);
177182
Assert.IsType<MultiplexedData>(inbound[2]);
178183
Assert.IsType<MultiplexedData>(inbound[3]);
@@ -190,7 +195,8 @@ public async Task PushDisconnected_should_push_TransportDisconnected()
190195
.AutoConnect()
191196
.Build();
192197

193-
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions { Host = "localhost", Port = 80 }))
198+
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions
199+
{ Host = "localhost", Port = 80 }))
194200
.Via(stage.AsFlow())
195201
.RunWith(Sink.ForEach<ITransportInbound>(msg =>
196202
{
@@ -246,7 +252,7 @@ public async Task WaitForOpenStreamAsync_should_skip_non_open_messages()
246252
.RunWith(Sink.Ignore<ITransportInbound>().MapMaterializedValue(_ => NotUsed.Instance), _materializer);
247253

248254
var open = await stage.WaitForOpenStreamAsync(ct);
249-
Assert.Equal(1L, open.StreamId);
255+
Assert.Equal(1L, (long)open.StreamId);
250256
Assert.Equal(StreamDirection.Bidirectional, open.Direction);
251257
}
252258

@@ -261,7 +267,8 @@ public async Task PushStreamClosed_should_deliver_StreamClosed_inbound()
261267
.AutoConnect()
262268
.Build();
263269

264-
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions { Host = "localhost", Port = 80 }))
270+
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions
271+
{ Host = "localhost", Port = 80 }))
265272
.Via(stage.AsFlow())
266273
.RunWith(Sink.ForEach<ITransportInbound>(msg =>
267274
{
@@ -279,7 +286,7 @@ public async Task PushStreamClosed_should_deliver_StreamClosed_inbound()
279286

280287
Assert.IsType<TransportConnected>(inbound[0]);
281288
var closed = Assert.IsType<StreamClosed>(inbound[1]);
282-
Assert.Equal(99L, closed.StreamId);
289+
Assert.Equal(99L, (long)closed.Id);
283290
Assert.Equal(DisconnectReason.Error, closed.Reason);
284291
}
285292

@@ -294,7 +301,8 @@ public async Task PushConnectionMigration_should_deliver_ConnectionMigrationDete
294301
.AutoConnect()
295302
.Build();
296303

297-
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions { Host = "localhost", Port = 80 }))
304+
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions
305+
{ Host = "localhost", Port = 80 }))
298306
.Via(stage.AsFlow())
299307
.RunWith(Sink.ForEach<ITransportInbound>(msg =>
300308
{
@@ -335,7 +343,7 @@ public async Task WaitForMultiplexedDataAsync_should_skip_non_multiplexed_messag
335343
.RunWith(Sink.Ignore<ITransportInbound>().MapMaterializedValue(_ => NotUsed.Instance), _materializer);
336344

337345
var mux = await stage.WaitForMultiplexedDataAsync(ct);
338-
Assert.Equal(1L, mux.StreamId);
346+
Assert.Equal(1L, (long)mux.StreamId);
339347
}
340348

341349
[Fact(Timeout = 5000)]
@@ -349,7 +357,8 @@ public async Task PushStreamReadCompleted_should_deliver_StreamReadCompleted_inb
349357
.AutoConnect()
350358
.Build();
351359

352-
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions { Host = "localhost", Port = 80 }))
360+
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions
361+
{ Host = "localhost", Port = 80 }))
353362
.Via(stage.AsFlow())
354363
.RunWith(Sink.ForEach<ITransportInbound>(msg =>
355364
{
@@ -367,7 +376,7 @@ public async Task PushStreamReadCompleted_should_deliver_StreamReadCompleted_inb
367376

368377
Assert.IsType<TransportConnected>(inbound[0]);
369378
var completed = Assert.IsType<StreamReadCompleted>(inbound[1]);
370-
Assert.Equal(42L, completed.StreamId);
379+
Assert.Equal(42L, (long)completed.Id);
371380
}
372381

373382
[Fact(Timeout = 5000)]
@@ -381,7 +390,8 @@ public async Task PushStreamClosed_with_error_reason_should_deliver_error()
381390
.AutoConnect()
382391
.Build();
383392

384-
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions { Host = "localhost", Port = 80 }))
393+
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions
394+
{ Host = "localhost", Port = 80 }))
385395
.Via(stage.AsFlow())
386396
.RunWith(Sink.ForEach<ITransportInbound>(msg =>
387397
{
@@ -399,7 +409,7 @@ public async Task PushStreamClosed_with_error_reason_should_deliver_error()
399409

400410
Assert.IsType<TransportConnected>(inbound[0]);
401411
var closed = Assert.IsType<StreamClosed>(inbound[1]);
402-
Assert.Equal(55L, closed.StreamId);
412+
Assert.Equal(55L, (long)closed.Id);
403413
Assert.Equal(DisconnectReason.Error, closed.Reason);
404414
}
405415

@@ -414,7 +424,8 @@ public async Task PushDisconnected_default_reason_should_be_graceful()
414424
.AutoConnect()
415425
.Build();
416426

417-
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions { Host = "localhost", Port = 80 }))
427+
_ = Source.Single<ITransportOutbound>(new ConnectTransport(new TcpTransportOptions
428+
{ Host = "localhost", Port = 80 }))
418429
.Via(stage.AsFlow())
419430
.RunWith(Sink.ForEach<ITransportInbound>(msg =>
420431
{
@@ -434,4 +445,4 @@ public async Task PushDisconnected_default_reason_should_be_graceful()
434445
var disconnected = Assert.IsType<TransportDisconnected>(inbound[1]);
435446
Assert.Equal(DisconnectReason.Graceful, disconnected.Reason);
436447
}
437-
}
448+
}

src/Servus.Akka.Tests/Transport/MultiplexedMessagesSpec.cs

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public void OpenStream_should_carry_stream_id_and_direction()
1717
{
1818
var msg = new OpenStream(7, StreamDirection.Unidirectional);
1919

20-
Assert.Equal(7, msg.StreamId);
20+
Assert.Equal(new StreamTarget(7), msg.StreamId);
2121
Assert.Equal(StreamDirection.Unidirectional, msg.Direction);
2222
}
2323

@@ -34,7 +34,7 @@ public void CloseStream_should_carry_stream_id()
3434
{
3535
var msg = new CloseStream(55);
3636

37-
Assert.Equal(55, msg.StreamId);
37+
Assert.Equal(new StreamTarget(55), msg.StreamId);
3838
}
3939

4040
[Fact(Timeout = 5000)]
@@ -50,7 +50,7 @@ public void StreamOpened_should_carry_stream_id_and_direction()
5050
{
5151
var msg = new StreamOpened(3, StreamDirection.Unidirectional);
5252

53-
Assert.Equal(3, msg.StreamId);
53+
Assert.Equal(new StreamTarget(3), msg.Id);
5454
Assert.Equal(StreamDirection.Unidirectional, msg.Direction);
5555
}
5656

@@ -67,41 +67,24 @@ public void StreamClosed_should_carry_stream_id_and_reason()
6767
{
6868
var msg = new StreamClosed(22, DisconnectReason.Error);
6969

70-
Assert.Equal(22, msg.StreamId);
70+
Assert.Equal(new StreamTarget(22), msg.Id);
7171
Assert.Equal(DisconnectReason.Error, msg.Reason);
7272
}
7373

74-
[Fact(Timeout = 5000)]
75-
public void InboundStreamAccepted_should_implement_ITransportInbound()
76-
{
77-
ITransportInbound msg = new InboundStreamAccepted(5, 0x00);
78-
79-
Assert.IsType<InboundStreamAccepted>(msg);
80-
}
81-
82-
[Fact(Timeout = 5000)]
83-
public void InboundStreamAccepted_should_carry_stream_id_and_type()
84-
{
85-
var msg = new InboundStreamAccepted(8, 0x01);
86-
87-
Assert.Equal(8, msg.StreamId);
88-
Assert.Equal(0x01, msg.StreamType);
89-
}
90-
9174
[Fact(Timeout = 5000)]
9275
public void CompleteWrites_should_implement_ITransportOutbound()
9376
{
9477
ITransportOutbound msg = new CompleteWrites(42);
9578
var cw = Assert.IsType<CompleteWrites>(msg);
96-
Assert.Equal(42, cw.StreamId);
79+
Assert.Equal(new StreamTarget(42), cw.StreamId);
9780
}
9881

9982
[Fact(Timeout = 5000)]
10083
public void ResetStream_should_implement_ITransportOutbound()
10184
{
10285
ITransportOutbound msg = new ResetStream(7, 0x0104);
10386
var rs = Assert.IsType<ResetStream>(msg);
104-
Assert.Equal(7, rs.StreamId);
87+
Assert.Equal(new StreamTarget(7), rs.StreamId);
10588
Assert.Equal(0x0104, rs.ErrorCode);
10689
}
10790

@@ -110,7 +93,7 @@ public void ServerStreamAccepted_should_implement_ITransportInbound()
11093
{
11194
ITransportInbound msg = new ServerStreamAccepted(3, StreamDirection.Unidirectional);
11295
var ssa = Assert.IsType<ServerStreamAccepted>(msg);
113-
Assert.Equal(3, ssa.StreamId);
96+
Assert.Equal(new StreamTarget(3), ssa.Id);
11497
Assert.Equal(StreamDirection.Unidirectional, ssa.Direction);
11598
}
11699

@@ -119,6 +102,6 @@ public void StreamReadCompleted_should_implement_ITransportInbound()
119102
{
120103
ITransportInbound msg = new StreamReadCompleted(0);
121104
var src = Assert.IsType<StreamReadCompleted>(msg);
122-
Assert.Equal(0, src.StreamId);
105+
Assert.Equal(new StreamTarget(0), src.Id);
123106
}
124107
}

0 commit comments

Comments
 (0)