Skip to content

Commit 7921d01

Browse files
committed
refactored h3 and quic
1 parent da2e86b commit 7921d01

23 files changed

Lines changed: 600 additions & 274 deletions

src/Servus.Akka.Tests/Transport/MultiplexedMessagesSpec.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,38 @@ public void InboundStreamAccepted_should_carry_stream_id_and_type()
8787
Assert.Equal(8, msg.StreamId);
8888
Assert.Equal(0x01, msg.StreamType);
8989
}
90+
91+
[Fact(Timeout = 5000)]
92+
public void CompleteWrites_should_implement_ITransportOutbound()
93+
{
94+
ITransportOutbound msg = new CompleteWrites(42);
95+
var cw = Assert.IsType<CompleteWrites>(msg);
96+
Assert.Equal(42, cw.StreamId);
97+
}
98+
99+
[Fact(Timeout = 5000)]
100+
public void ResetStream_should_implement_ITransportOutbound()
101+
{
102+
ITransportOutbound msg = new ResetStream(7, 0x0104);
103+
var rs = Assert.IsType<ResetStream>(msg);
104+
Assert.Equal(7, rs.StreamId);
105+
Assert.Equal(0x0104, rs.ErrorCode);
106+
}
107+
108+
[Fact(Timeout = 5000)]
109+
public void ServerStreamAccepted_should_implement_ITransportInbound()
110+
{
111+
ITransportInbound msg = new ServerStreamAccepted(3, StreamDirection.Unidirectional);
112+
var ssa = Assert.IsType<ServerStreamAccepted>(msg);
113+
Assert.Equal(3, ssa.StreamId);
114+
Assert.Equal(StreamDirection.Unidirectional, ssa.Direction);
115+
}
116+
117+
[Fact(Timeout = 5000)]
118+
public void StreamReadCompleted_should_implement_ITransportInbound()
119+
{
120+
ITransportInbound msg = new StreamReadCompleted(0);
121+
var src = Assert.IsType<StreamReadCompleted>(msg);
122+
Assert.Equal(0, src.StreamId);
123+
}
90124
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
using Servus.Akka.Transport;
2+
using Servus.Akka.Transport.Quic;
3+
4+
namespace Servus.Akka.Tests.Transport.Quic;
5+
6+
[Collection("TransportBuffer")]
7+
public sealed class QuicStreamStateSpec
8+
{
9+
[Fact(Timeout = 5000)]
10+
public void New_state_should_be_Opening()
11+
{
12+
var state = new QuicStreamState(StreamDirection.Bidirectional);
13+
Assert.Equal(StreamPhase.Opening, state.Phase);
14+
Assert.False(state.HasHandle);
15+
}
16+
17+
[Fact(Timeout = 5000)]
18+
public void Write_in_Opening_should_buffer()
19+
{
20+
var state = new QuicStreamState(StreamDirection.Bidirectional);
21+
var buf = TransportBuffer.Rent(2);
22+
buf.FullMemory.Span[0] = 0x01;
23+
buf.FullMemory.Span[1] = 0x02;
24+
buf.Length = 2;
25+
26+
state.Write(buf);
27+
28+
Assert.Equal(StreamPhase.Opening, state.Phase);
29+
Assert.Equal(1, state.PendingWriteCount);
30+
}
31+
32+
[Fact(Timeout = 5000)]
33+
public void CompleteWrites_in_Opening_should_defer()
34+
{
35+
var state = new QuicStreamState(StreamDirection.Bidirectional);
36+
state.CompleteWrites();
37+
38+
Assert.Equal(StreamPhase.Opening, state.Phase);
39+
Assert.True(state.IsCompleteWritesDeferred);
40+
}
41+
42+
[Fact(Timeout = 5000)]
43+
public void AttachHandle_should_transition_to_Active()
44+
{
45+
var state = new QuicStreamState(StreamDirection.Bidirectional);
46+
var handle = new StreamHandle(new MemoryStream());
47+
48+
state.AttachHandle(handle);
49+
50+
Assert.Equal(StreamPhase.Active, state.Phase);
51+
Assert.True(state.HasHandle);
52+
}
53+
54+
[Fact(Timeout = 5000)]
55+
public void AttachHandle_should_flush_pending_writes()
56+
{
57+
var state = new QuicStreamState(StreamDirection.Bidirectional);
58+
var buf = TransportBuffer.Rent(2);
59+
buf.FullMemory.Span[0] = 0x01;
60+
buf.FullMemory.Span[1] = 0x02;
61+
buf.Length = 2;
62+
state.Write(buf);
63+
64+
var handle = new StreamHandle(new MemoryStream());
65+
state.AttachHandle(handle);
66+
67+
Assert.Equal(0, state.PendingWriteCount);
68+
}
69+
70+
[Fact(Timeout = 5000)]
71+
public void AttachHandle_with_deferred_CompleteWrites_should_transition_to_HalfClosedWrite()
72+
{
73+
var state = new QuicStreamState(StreamDirection.Bidirectional);
74+
state.CompleteWrites();
75+
76+
state.AttachHandle(new StreamHandle(new MemoryStream()));
77+
78+
Assert.Equal(StreamPhase.HalfClosedWrite, state.Phase);
79+
}
80+
81+
[Fact(Timeout = 5000)]
82+
public void CompleteWrites_in_Active_should_transition_to_HalfClosedWrite()
83+
{
84+
var state = new QuicStreamState(StreamDirection.Bidirectional);
85+
state.AttachHandle(new StreamHandle(new MemoryStream()));
86+
87+
state.CompleteWrites();
88+
89+
Assert.Equal(StreamPhase.HalfClosedWrite, state.Phase);
90+
}
91+
92+
[Fact(Timeout = 5000)]
93+
public void OnReadCompleted_in_HalfClosedWrite_should_transition_to_Closed()
94+
{
95+
var state = new QuicStreamState(StreamDirection.Bidirectional);
96+
state.AttachHandle(new StreamHandle(new MemoryStream()));
97+
state.CompleteWrites();
98+
99+
state.OnReadCompleted();
100+
101+
Assert.Equal(StreamPhase.Closed, state.Phase);
102+
}
103+
104+
[Fact(Timeout = 5000)]
105+
public void OnReadCompleted_in_Active_should_transition_to_HalfClosedRead()
106+
{
107+
var state = new QuicStreamState(StreamDirection.Bidirectional);
108+
state.AttachHandle(new StreamHandle(new MemoryStream()));
109+
110+
state.OnReadCompleted();
111+
112+
Assert.Equal(StreamPhase.HalfClosedRead, state.Phase);
113+
}
114+
115+
[Fact(Timeout = 5000)]
116+
public void Abort_should_transition_to_Closed()
117+
{
118+
var state = new QuicStreamState(StreamDirection.Bidirectional);
119+
state.AttachHandle(new StreamHandle(new MemoryStream()));
120+
121+
state.Abort(0);
122+
123+
Assert.Equal(StreamPhase.Closed, state.Phase);
124+
}
125+
}

src/Servus.Akka.Tests/Transport/Quic/QuicTransportStateMachineSpec.cs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public void HandlePush_ConnectTransport_should_schedule_connect_timeout()
3838
}
3939

4040
[Fact(Timeout = 5000)]
41-
public void HandlePush_OpenStream_should_enqueue_when_not_connected()
41+
public void HandlePush_OpenStream_should_reject_when_not_connected()
4242
{
4343
var ops = new StubOps();
4444
var sm = new QuicTransportStateMachine(ops, ActorRefs.Nobody, ActorRefs.Nobody);
@@ -72,4 +72,26 @@ public void HandlePush_MultiplexedData_should_signal_pull_when_no_stream()
7272

7373
Assert.True(ops.PullCount > 0);
7474
}
75+
76+
[Fact(Timeout = 5000)]
77+
public void HandlePush_CompleteWrites_should_signal_pull_when_no_stream()
78+
{
79+
var ops = new StubOps();
80+
var sm = new QuicTransportStateMachine(ops, ActorRefs.Nobody, ActorRefs.Nobody);
81+
82+
sm.HandlePush(new CompleteWrites(99));
83+
84+
Assert.True(ops.PullCount > 0);
85+
}
86+
87+
[Fact(Timeout = 5000)]
88+
public void HandlePush_ResetStream_should_signal_pull_when_no_stream()
89+
{
90+
var ops = new StubOps();
91+
var sm = new QuicTransportStateMachine(ops, ActorRefs.Nobody, ActorRefs.Nobody);
92+
93+
sm.HandlePush(new ResetStream(99));
94+
95+
Assert.True(ops.PullCount > 0);
96+
}
7597
}

src/Servus.Akka.Tests/Transport/Quic/StreamContextSpec.cs

Lines changed: 0 additions & 52 deletions
This file was deleted.

src/Servus.Akka.Tests/Transport/Quic/StreamHandleSpec.cs

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,17 @@ namespace Servus.Akka.Tests.Transport.Quic;
77
public sealed class StreamHandleSpec
88
{
99
[Fact(Timeout = 5000)]
10-
public async Task WriteAsync_should_write_buffer_to_stream()
10+
public void WriteAsync_should_write_buffer_to_stream()
1111
{
1212
var ms = new MemoryStream();
13-
var handle = new StreamHandle(ms, null);
13+
var handle = new StreamHandle(ms);
1414

1515
var buffer = TransportBuffer.Rent(16);
1616
buffer.FullMemory.Span[0] = 0xAA;
1717
buffer.FullMemory.Span[1] = 0xBB;
1818
buffer.Length = 2;
1919

20-
await handle.WriteAsync(buffer);
20+
handle.Write(buffer);
2121

2222
Assert.Equal(2, ms.Position);
2323
Assert.Equal(0xAA, ms.GetBuffer()[0]);
@@ -27,8 +27,8 @@ public async Task WriteAsync_should_write_buffer_to_stream()
2727
[Fact(Timeout = 5000)]
2828
public async Task ReadAsync_should_read_from_stream()
2929
{
30-
var ms = new MemoryStream(new byte[] { 0x01, 0x02, 0x03 });
31-
var handle = new StreamHandle(ms, null);
30+
var ms = new MemoryStream([0x01, 0x02, 0x03]);
31+
var handle = new StreamHandle(ms);
3232

3333
var buf = new byte[16];
3434
var read = await handle.ReadAsync(buf, CancellationToken.None);
@@ -38,20 +38,9 @@ public async Task ReadAsync_should_read_from_stream()
3838
}
3939

4040
[Fact(Timeout = 5000)]
41-
public void CompleteWrites_should_invoke_callback()
41+
public void CompleteWrites_should_not_throw()
4242
{
43-
var called = false;
44-
var handle = new StreamHandle(Stream.Null, () => called = true);
45-
46-
handle.CompleteWrites();
47-
48-
Assert.True(called);
49-
}
50-
51-
[Fact(Timeout = 5000)]
52-
public void CompleteWrites_should_not_throw_when_no_callback()
53-
{
54-
var handle = new StreamHandle(Stream.Null, null);
43+
var handle = new StreamHandle(Stream.Null);
5544
handle.CompleteWrites();
5645
}
57-
}
46+
}

src/Servus.Akka.Tests/Transport/Tcp/ClientByteMoverSpec.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public async Task ClientByteMover_should_write_data_to_inbound_channel()
2828
await ClientByteMover.MoveStreamToChannel(state, () => { }, cts.Token);
2929

3030
Assert.True(state.InboundReader.TryRead(out var buf));
31-
Assert.Equal(2, buf!.Length);
31+
Assert.Equal(2, buf.Length);
3232
Assert.Equal(0xAB, buf.Span[0]);
3333
Assert.Equal(0xCD, buf.Span[1]);
3434
buf.Dispose();

src/Servus.Akka.Tests/Transport/Tcp/ConnectionHandleSpec.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public void Write_should_send_buffer_to_outbound_channel()
2626
handle.Write(buf);
2727

2828
Assert.True(outbound.Reader.TryRead(out var received));
29-
Assert.Equal(0xAA, received!.Span[0]);
29+
Assert.Equal(0xAA, received.Span[0]);
3030
received.Dispose();
3131
cts.Dispose();
3232
}

src/Servus.Akka/Transport/ITransportInbound.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ public sealed record StreamOpened(long StreamId, StreamDirection Direction) : IT
1212

1313
public sealed record StreamClosed(long StreamId, DisconnectReason Reason) : ITransportInbound;
1414

15+
public sealed record StreamReadCompleted(long StreamId) : ITransportInbound;
16+
17+
public sealed record ServerStreamAccepted(long StreamId, StreamDirection Direction) : ITransportInbound;
18+
1519
public sealed record InboundStreamAccepted(long StreamId, long StreamType) : ITransportInbound;
1620

1721
public sealed record DataRejected(TransportBuffer Buffer) : ITransportInbound;

src/Servus.Akka/Transport/ITransportOutbound.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ public sealed record OpenStream(long StreamId, StreamDirection Direction) : ITra
1010

1111
public sealed record CloseStream(long StreamId) : ITransportOutbound;
1212

13+
public sealed record CompleteWrites(long StreamId) : ITransportOutbound;
14+
15+
public sealed record ResetStream(long StreamId, long ErrorCode = 0) : ITransportOutbound;
16+
1317
public sealed record TransportData(TransportBuffer Buffer) : ITransportOutbound, ITransportInbound;
1418

1519
public sealed record MultiplexedData(TransportBuffer Buffer, long StreamId) : ITransportOutbound, ITransportInbound;

src/Servus.Akka/Transport/Quic/QuicConnectionLease.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ namespace Servus.Akka.Transport.Quic;
33
public sealed class QuicConnectionLease : IAsyncDisposable
44
{
55
private readonly long _createdTicks = Environment.TickCount64;
6+
private readonly int _maxConcurrentStreams;
67
private bool _alive = true;
7-
private int _maxConcurrentStreams;
88

99
public QuicConnectionLease(QuicConnectionHandle handle, int maxConcurrentStreams)
1010
{

0 commit comments

Comments
 (0)