Skip to content

Commit 38427f8

Browse files
committed
fix(quic): check RemoteEndPoint for connection migration instead of LocalEndPoint
1 parent 7647bdb commit 38427f8

15 files changed

Lines changed: 107 additions & 54 deletions

src/Servus.Akka.Tests/Transport/Quic/Client/QuicConnectionLeaseSpec.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ private QuicConnectionHandle CreateTestHandle() =>
1010
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
1111
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
1212
getLocalEndPoint: () => null,
13+
getRemoteEndPoint: () => null,
1314
dispose: () => ValueTask.CompletedTask);
1415

1516
[Fact(Timeout = 5000)]
@@ -223,6 +224,7 @@ public async Task DisposeAsync_should_dispose_handle()
223224
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
224225
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
225226
getLocalEndPoint: () => null,
227+
getRemoteEndPoint: () => null,
226228
dispose: () =>
227229
{
228230
disposeCalled = true;
@@ -248,6 +250,7 @@ public async Task DisposeAsync_should_be_idempotent()
248250
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
249251
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
250252
getLocalEndPoint: () => null,
253+
getRemoteEndPoint: () => null,
251254
dispose: () =>
252255
{
253256
disposeCount++;

src/Servus.Akka.Tests/Transport/Quic/Client/QuicConnectionMigrationSpec.cs

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,20 @@ public void Dispatch_MigrationDetected_should_push_ConnectionMigrationDetected()
4646

4747
[Fact(Timeout = 5000)]
4848
[Trait("RFC", "RFC9000-9")]
49-
public void CheckForConnectionMigration_should_detect_endpoint_change()
49+
public void CheckForConnectionMigration_should_detect_remote_endpoint_change_on_timer()
5050
{
5151
var ops = new StubOps();
5252
var sm = new QuicTransportStateMachine(ops, ActorRefs.Nobody, ActorRefs.Nobody);
5353

5454
var initialEp = new IPEndPoint(IPAddress.Parse("10.0.0.1"), 12345);
5555
var changedEp = new IPEndPoint(IPAddress.Parse("10.0.0.2"), 54321);
56-
var currentEp = initialEp;
56+
var currentRemoteEp = initialEp;
5757

5858
var handle = new QuicConnectionHandle(
5959
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
6060
acceptInboundStream: async ct => { await Task.Delay(Timeout.Infinite, ct); return null; },
61-
getLocalEndPoint: () => currentEp,
61+
getLocalEndPoint: () => new IPEndPoint(IPAddress.Loopback, 9999),
62+
getRemoteEndPoint: () => currentRemoteEp,
6263
dispose: () => ValueTask.CompletedTask);
6364

6465
var lease = new QuicConnectionLease(handle, 100);
@@ -68,39 +69,54 @@ public void CheckForConnectionMigration_should_detect_endpoint_change()
6869

6970
ops.PushedInbound.Clear();
7071

71-
var buf1 = TransportBuffer.Rent(4);
72-
buf1.Length = 4;
73-
sm.Dispatch(new InboundData(buf1, 0, 2));
72+
currentRemoteEp = changedEp;
73+
sm.OnTimer("migration-check");
7474

75-
var data1 = ops.PushedInbound.OfType<MultiplexedData>().FirstOrDefault();
76-
Assert.NotNull(data1);
77-
data1.Buffer.Dispose();
75+
Assert.Contains(ops.PushedInbound, i => i is ConnectionMigrationDetected);
76+
}
77+
78+
[Fact(Timeout = 5000)]
79+
[Trait("RFC", "RFC9000-9")]
80+
public void CheckForConnectionMigration_should_not_detect_when_remote_endpoint_unchanged()
81+
{
82+
var ops = new StubOps();
83+
var sm = new QuicTransportStateMachine(ops, ActorRefs.Nobody, ActorRefs.Nobody);
84+
85+
var stableEp = new IPEndPoint(IPAddress.Parse("10.0.0.1"), 12345);
86+
87+
var handle = new QuicConnectionHandle(
88+
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
89+
acceptInboundStream: async ct => { await Task.Delay(Timeout.Infinite, ct); return null; },
90+
getLocalEndPoint: () => new IPEndPoint(IPAddress.Loopback, 9999),
91+
getRemoteEndPoint: () => stableEp,
92+
dispose: () => ValueTask.CompletedTask);
93+
94+
var lease = new QuicConnectionLease(handle, 100);
95+
96+
sm.HandlePush(new ConnectTransport(new QuicTransportOptions { Host = "example.com", Port = 443 }));
97+
sm.Dispatch(new ConnectionLeaseAcquired(lease));
7898

7999
ops.PushedInbound.Clear();
80-
currentEp = changedEp;
81100

82-
var buf2 = TransportBuffer.Rent(4);
83-
buf2.Length = 4;
84-
sm.Dispatch(new InboundData(buf2, 0, 2));
101+
sm.OnTimer("migration-check");
85102

86-
var data2 = ops.PushedInbound.OfType<MultiplexedData>().FirstOrDefault();
87-
Assert.NotNull(data2);
88-
data2.Buffer.Dispose();
103+
Assert.DoesNotContain(ops.PushedInbound, i => i is ConnectionMigrationDetected);
89104
}
90105

91106
[Fact(Timeout = 5000)]
92107
[Trait("RFC", "RFC9000-9")]
93-
public void CheckForConnectionMigration_should_not_detect_when_endpoint_unchanged()
108+
public void InboundData_should_not_trigger_migration_check()
94109
{
95110
var ops = new StubOps();
96111
var sm = new QuicTransportStateMachine(ops, ActorRefs.Nobody, ActorRefs.Nobody);
97112

98-
var stableEp = new IPEndPoint(IPAddress.Parse("10.0.0.1"), 12345);
113+
var changedEp = new IPEndPoint(IPAddress.Parse("10.0.0.2"), 54321);
99114

100115
var handle = new QuicConnectionHandle(
101116
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
102117
acceptInboundStream: async ct => { await Task.Delay(Timeout.Infinite, ct); return null; },
103-
getLocalEndPoint: () => stableEp,
118+
getLocalEndPoint: () => new IPEndPoint(IPAddress.Loopback, 9999),
119+
getRemoteEndPoint: () => changedEp,
104120
dispose: () => ValueTask.CompletedTask);
105121

106122
var lease = new QuicConnectionLease(handle, 100);
@@ -110,26 +126,41 @@ public void CheckForConnectionMigration_should_not_detect_when_endpoint_unchange
110126

111127
ops.PushedInbound.Clear();
112128

113-
var buf1 = TransportBuffer.Rent(4);
114-
buf1.Length = 4;
115-
sm.Dispatch(new InboundData(buf1, 0, 2));
129+
var buf = TransportBuffer.Rent(4);
130+
buf.Length = 4;
131+
sm.Dispatch(new InboundData(buf, 0, 2));
116132

117133
Assert.DoesNotContain(ops.PushedInbound, i => i is ConnectionMigrationDetected);
118134

119135
var data = ops.PushedInbound.OfType<MultiplexedData>().FirstOrDefault();
120136
Assert.NotNull(data);
121137
data.Buffer.Dispose();
138+
}
122139

123-
ops.PushedInbound.Clear();
140+
[Fact(Timeout = 5000)]
141+
[Trait("RFC", "RFC9000-9")]
142+
public void Timer_should_reschedule_after_migration_check()
143+
{
144+
var ops = new StubOps();
145+
var sm = new QuicTransportStateMachine(ops, ActorRefs.Nobody, ActorRefs.Nobody);
124146

125-
var buf2 = TransportBuffer.Rent(4);
126-
buf2.Length = 4;
127-
sm.Dispatch(new InboundData(buf2, 0, 2));
147+
var stableEp = new IPEndPoint(IPAddress.Parse("10.0.0.1"), 12345);
128148

129-
Assert.DoesNotContain(ops.PushedInbound, i => i is ConnectionMigrationDetected);
149+
var handle = new QuicConnectionHandle(
150+
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
151+
acceptInboundStream: async ct => { await Task.Delay(Timeout.Infinite, ct); return null; },
152+
getLocalEndPoint: () => new IPEndPoint(IPAddress.Loopback, 9999),
153+
getRemoteEndPoint: () => stableEp,
154+
dispose: () => ValueTask.CompletedTask);
155+
156+
var lease = new QuicConnectionLease(handle, 100);
157+
158+
sm.HandlePush(new ConnectTransport(new QuicTransportOptions { Host = "example.com", Port = 443 }));
159+
sm.Dispatch(new ConnectionLeaseAcquired(lease));
160+
161+
ops.Timers.Clear();
162+
sm.OnTimer("migration-check");
130163

131-
var data2 = ops.PushedInbound.OfType<MultiplexedData>().FirstOrDefault();
132-
Assert.NotNull(data2);
133-
data2.Buffer.Dispose();
164+
Assert.True(ops.Timers.ContainsKey("migration-check"));
134165
}
135-
}
166+
}

src/Servus.Akka.Tests/Transport/Quic/Client/QuicTransportStateMachineSpec.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ private static QuicConnectionHandle CreateMockHandle()
2222
return null;
2323
},
2424
getLocalEndPoint: () => new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 12345),
25+
getRemoteEndPoint: () => null,
2526
dispose: () => ValueTask.CompletedTask);
2627
}
2728

src/Servus.Akka.Tests/Transport/Quic/Listener/QuicServerConnectionStageSpec.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public void QuicServerConnectionStage_should_have_flow_shape()
1515
openStream: (_, _) => Task.FromResult<(Stream, long)>((Stream.Null, 1)),
1616
acceptInboundStream: async ct => { await Task.Delay(Timeout.Infinite, ct); return null; },
1717
getLocalEndPoint: () => new IPEndPoint(IPAddress.Loopback, 5000),
18+
getRemoteEndPoint: () => null,
1819
dispose: () => default);
1920

2021
var connectionInfo = new ConnectionInfo(
@@ -36,6 +37,7 @@ public void QuicServerConnectionStage_shape_should_have_correct_port_names()
3637
openStream: (_, _) => Task.FromResult<(Stream, long)>((Stream.Null, 1)),
3738
acceptInboundStream: async ct => { await Task.Delay(Timeout.Infinite, ct); return null; },
3839
getLocalEndPoint: () => new IPEndPoint(IPAddress.Loopback, 5000),
40+
getRemoteEndPoint: () => null,
3941
dispose: () => default);
4042

4143
var connectionInfo = new ConnectionInfo(

src/Servus.Akka.Tests/Transport/Quic/Listener/QuicServerStateMachineSpec.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ private static QuicConnectionHandle CreateTestHandle()
2525
return null;
2626
},
2727
getLocalEndPoint: () => new IPEndPoint(IPAddress.Loopback, 5000),
28+
getRemoteEndPoint: () => null,
2829
dispose: () => default);
2930
}
3031

src/Servus.Akka.Tests/Transport/Quic/QuicConnectionHandleSpec.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public async Task OpenStreamAsync_should_delegate_to_factory()
2222
},
2323
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
2424
getLocalEndPoint: () => null,
25+
getRemoteEndPoint: () => null,
2526
dispose: () => ValueTask.CompletedTask);
2627

2728
var result = await handle.OpenStreamAsync(StreamDirection.Bidirectional, TestContext.Current.CancellationToken);
@@ -43,6 +44,7 @@ public async Task OpenStreamAsync_should_pass_direction_correctly()
4344
},
4445
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
4546
getLocalEndPoint: () => null,
47+
getRemoteEndPoint: () => null,
4648
dispose: () => ValueTask.CompletedTask);
4749

4850
await handle.OpenStreamAsync(StreamDirection.Bidirectional, TestContext.Current.CancellationToken);
@@ -67,6 +69,7 @@ public async Task OpenStreamAsync_should_pass_cancellation_token()
6769
},
6870
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
6971
getLocalEndPoint: () => null,
72+
getRemoteEndPoint: () => null,
7073
dispose: () => ValueTask.CompletedTask);
7174

7275
await handle.OpenStreamAsync(StreamDirection.Bidirectional, cts.Token);
@@ -82,6 +85,7 @@ public async Task AcceptInboundStreamAsync_should_return_null_when_no_streams()
8285
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
8386
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
8487
getLocalEndPoint: () => null,
88+
getRemoteEndPoint: () => null,
8589
dispose: () => ValueTask.CompletedTask);
8690

8791
var result = await handle.AcceptInboundStreamAsync(TestContext.Current.CancellationToken);
@@ -100,6 +104,7 @@ public async Task AcceptInboundStreamAsync_should_return_stream_when_available()
100104
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(
101105
(expectedStream, expectedStreamId)),
102106
getLocalEndPoint: () => null,
107+
getRemoteEndPoint: () => null,
103108
dispose: () => ValueTask.CompletedTask);
104109

105110
var result = await handle.AcceptInboundStreamAsync(TestContext.Current.CancellationToken);
@@ -123,6 +128,7 @@ public async Task AcceptInboundStreamAsync_should_pass_cancellation_token()
123128
return Task.FromResult<(Stream, long)?>(null);
124129
},
125130
getLocalEndPoint: () => null,
131+
getRemoteEndPoint: () => null,
126132
dispose: () => ValueTask.CompletedTask);
127133

128134
await handle.AcceptInboundStreamAsync(cts.Token);
@@ -145,6 +151,7 @@ public void LocalEndPoint_should_delegate_to_factory()
145151
getLocalEndPointCalled = true;
146152
return endPoint;
147153
},
154+
getRemoteEndPoint: () => null,
148155
dispose: () => ValueTask.CompletedTask);
149156

150157
var result = handle.LocalEndPoint();
@@ -160,6 +167,7 @@ public void LocalEndPoint_should_return_null_when_unavailable()
160167
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
161168
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
162169
getLocalEndPoint: () => null,
170+
getRemoteEndPoint: () => null,
163171
dispose: () => ValueTask.CompletedTask);
164172

165173
var result = handle.LocalEndPoint();
@@ -176,6 +184,7 @@ public async Task DisposeAsync_should_delegate_to_factory()
176184
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
177185
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
178186
getLocalEndPoint: () => null,
187+
getRemoteEndPoint: () => null,
179188
dispose: () =>
180189
{
181190
disposeCalled = true;
@@ -196,6 +205,7 @@ public async Task DisposeAsync_should_complete_successfully()
196205
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
197206
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
198207
getLocalEndPoint: () => null,
208+
getRemoteEndPoint: () => null,
199209
dispose: () => ValueTask.CompletedTask);
200210

201211
// Should not throw

src/Servus.Akka.Tests/Transport/Quic/QuicTransportEventSpec.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ private QuicConnectionHandle CreateTestConnectionHandle() =>
1313
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
1414
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
1515
getLocalEndPoint: () => null,
16+
getRemoteEndPoint: () => null,
1617
dispose: () => ValueTask.CompletedTask);
1718

1819
[Fact(Timeout = 5000)]

src/Servus.Akka.Tests/Utils/InMemoryQuicConnectionFactory.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ private static QuicConnectionHandle CreateMockHandle()
2727
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
2828
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
2929
getLocalEndPoint: () => null,
30+
getRemoteEndPoint: () => null,
3031
dispose: () => ValueTask.CompletedTask);
3132
}
3233
}

src/Servus.Akka.Tests/Utils/SlowTcpConnectionFactory.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public async Task<QuicConnectionLease> EstablishAsync(QuicTransportOptions optio
3030
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
3131
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
3232
getLocalEndPoint: () => null,
33+
getRemoteEndPoint: () => null,
3334
dispose: () => ValueTask.CompletedTask);
3435
return new QuicConnectionLease(handle, options.MaxBidirectionalStreams);
3536
}
@@ -60,6 +61,7 @@ public Task<QuicConnectionLease> EstablishAsync(QuicTransportOptions options, Ca
6061
openStream: (_, _) => Task.FromResult((Stream: (Stream)new MemoryStream(), StreamId: 0L)),
6162
acceptInboundStream: _ => Task.FromResult<(Stream, long)?>(null),
6263
getLocalEndPoint: () => null,
64+
getRemoteEndPoint: () => null,
6365
dispose: () => ValueTask.CompletedTask);
6466
return Task.FromResult(new QuicConnectionLease(handle, options.MaxBidirectionalStreams));
6567
}

src/Servus.Akka/Transport/Quic/Client/QuicClientProvider.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public QuicClientProvider(QuicTransportOptions options)
1616
}
1717

1818
public EndPoint? LocalEndPoint => _connection?.LocalEndPoint;
19+
public EndPoint? RemoteEndPoint => _connection?.RemoteEndPoint;
1920

2021
public async Task<Stream> GetStreamAsync(CancellationToken ct = default)
2122
{

0 commit comments

Comments
 (0)