Skip to content

Commit 2d62179

Browse files
committed
fix: minor fixes
better release
1 parent b9dc6b4 commit 2d62179

5 files changed

Lines changed: 108 additions & 14 deletions

File tree

src/Servus.Akka/Transport/Quic/Client/QuicTransportStateMachine.cs

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,15 @@ internal void Dispatch(IQuicTransportEvent evt)
7171

7272
break;
7373
case InboundPumpFailed e:
74-
OnInboundComplete(DisconnectReason.Error, e.StreamId);
74+
if (IsConnectionLevelError(e.Error))
75+
{
76+
HandleConnectionFailure(DisconnectReason.Error);
77+
}
78+
else
79+
{
80+
OnInboundComplete(DisconnectReason.Error, e.StreamId);
81+
}
82+
7583
break;
7684
case OutboundWriteDone:
7785
_ops.OnSignalPullOutbound();
@@ -313,29 +321,30 @@ private void OnAcquisitionFailed(Exception ex)
313321
_ops.OnCancelTimer(ConnectTimerKey);
314322
Tracing.For("Connection").Warning(this, "QUIC acquisition failed: {0}", ex.Message);
315323

316-
if (_pendingConnect is null)
324+
if (_pendingConnect is not null)
317325
{
326+
_pendingConnect = null;
327+
_ops.OnPushInbound(new TransportDisconnected(DisconnectReason.Error));
328+
_ops.OnSignalPullOutbound();
318329
return;
319330
}
320331

321-
_pendingConnect = null;
322-
_ops.OnPushInbound(new TransportDisconnected(DisconnectReason.Error));
323-
_ops.OnSignalPullOutbound();
332+
HandleConnectionFailure(DisconnectReason.Error);
324333
}
325334

326335
private void HandleConnectionFailure(DisconnectReason reason)
327336
{
328337
Tracing.For("Connection").Debug(this, "QUIC disconnected: {0}", reason);
329-
foreach (var (streamId, state) in _streams)
330-
{
331-
_ops.OnPushInbound(new StreamClosed(streamId, reason));
332-
_ = state.DisposeAsync();
333-
}
334-
335-
_streams.Clear();
336338

337339
if (_autoReconnect && !_upstreamFinished)
338340
{
341+
foreach (var (_, state) in _streams)
342+
{
343+
_ = state.DisposeAsync();
344+
}
345+
346+
_streams.Clear();
347+
339348
_ops.OnPushInbound(new TransportDisconnected(DisconnectReason.Transient));
340349
_isReconnecting = true;
341350
_pumpManager?.StopAll();
@@ -346,6 +355,14 @@ private void HandleConnectionFailure(DisconnectReason reason)
346355
return;
347356
}
348357

358+
foreach (var (streamId, state) in _streams)
359+
{
360+
_ops.OnPushInbound(new StreamClosed(streamId, reason));
361+
_ = state.DisposeAsync();
362+
}
363+
364+
_streams.Clear();
365+
349366
_ops.OnPushInbound(new TransportDisconnected(reason));
350367
_pumpManager?.StopAll();
351368
ReturnConnectionToPool(false);
@@ -439,6 +456,18 @@ private void CleanupTransport()
439456
_connectionHandle = null;
440457
_connectionLease = null;
441458
}
459+
private static bool IsConnectionLevelError(Exception ex)
460+
{
461+
if (ex is System.Net.Quic.QuicException qe)
462+
{
463+
return qe.QuicError is System.Net.Quic.QuicError.ConnectionAborted
464+
or System.Net.Quic.QuicError.ConnectionIdle
465+
or System.Net.Quic.QuicError.ConnectionRefused
466+
or System.Net.Quic.QuicError.ConnectionTimeout;
467+
}
468+
469+
return ex is ObjectDisposedException;
470+
}
442471
}
443472

444473
#pragma warning restore CA1416

src/TurboHTTP/Internal/OptionsFactory.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ internal static TransportOptions Build(RequestEndpoint endpoint, TurboClientOpti
3939
AllowConnectionMigration = clientOptions.Http3.AllowConnectionMigration,
4040
AllowEarlyData = clientOptions.Http3.AllowEarlyData,
4141
ApplicationProtocols = alpn,
42-
AutoReconnect = true
42+
AutoReconnect = true,
43+
ConnectionLifetime = clientOptions.PooledConnectionLifetime
4344
};
4445
}
4546

src/TurboHTTP/Protocol/Http3/StateMachine.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ internal sealed class StateMachine : IDisposable
6969
/// <summary>Whether there are in-flight requests awaiting responses.</summary>
7070
public bool HasInFlightRequests => _streamManager.HasInFlightRequests;
7171

72+
/// <summary>
73+
/// Fails an in-flight request on the given stream due to a transport error.
74+
/// Returns true if a correlated request was found and failed.
75+
/// </summary>
76+
public bool FailInflightRequest(long streamId, Exception exception) =>
77+
_streamManager.FailInflightRequest(streamId, exception);
78+
7279
/// <summary>The current connection endpoint.</summary>
7380
public RequestEndpoint Endpoint { get; private set; }
7481

@@ -392,6 +399,14 @@ public void OnConnectionLost()
392399
TableSync.Reset();
393400
_serverStreamMap.Clear();
394401
_pendingStreamType.Clear();
402+
403+
if (_transportOptions is not null)
404+
{
405+
_ops.OnOutbound(new OpenStream(-2, StreamDirection.Unidirectional));
406+
_ops.OnOutbound(new OpenStream(-3, StreamDirection.Unidirectional));
407+
_ops.OnOutbound(new OpenStream(-4, StreamDirection.Unidirectional));
408+
_ops.OnOutbound(new ConnectTransport(_transportOptions));
409+
}
395410
}
396411

397412
/// <summary>
@@ -535,6 +550,11 @@ private void ReplayBufferedFrames()
535550
}
536551

537552
ArrayPool<Http3Frame>.Shared.Return(replayArray, true);
553+
554+
for (var i = correlationIndex; i < oldCorrelations.Count; i++)
555+
{
556+
EncodeAndEmit(oldCorrelations[i]);
557+
}
538558
}
539559

540560
private void EmitSerializedFrame(Http3Frame frame, long streamId = -1)

src/TurboHTTP/Protocol/Http3/StreamManager.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,41 @@ public void FlushPendingResponse(long streamId)
9595
}
9696
}
9797

98+
/// <summary>
99+
/// Fails an in-flight request on the given stream due to a transport error.
100+
/// Removes the correlation and stream state, and completes the <see cref="PendingRequest"/>
101+
/// with an exception so the caller's <c>SendAsync</c> or <c>ReadAsStringAsync</c> throws.
102+
/// Returns true if a correlated request was found and failed.
103+
/// </summary>
104+
public bool FailInflightRequest(long streamId, Exception exception)
105+
{
106+
if (_streams.TryGetValue(streamId, out var state))
107+
{
108+
state.Reset();
109+
if (_statePool.Count < MaxPoolSize)
110+
{
111+
_statePool.Push(state);
112+
}
113+
114+
_streams.Remove(streamId);
115+
}
116+
117+
if (!_correlationMap.Remove(streamId, out var request))
118+
{
119+
return false;
120+
}
121+
122+
OnStreamClosedCallback?.Invoke(streamId);
123+
ReturnDecoder(streamId);
124+
125+
if (request.Options.TryGetValue(TcsCorrelation.Key, out var pending))
126+
{
127+
pending.TrySetException(exception);
128+
}
129+
130+
return true;
131+
}
132+
98133
/// <summary>
99134
/// Completes all in-progress response assemblies (upstream finish / connection close).
100135
/// </summary>

src/TurboHTTP/Streams/Stages/Http30ConnectionStage.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,16 @@ private void HandleSignalItem(ITransportInbound item)
217217
}
218218
case StreamClosed { StreamId: >= 0 } streamClosed:
219219
{
220-
_sm.FlushPendingResponse(streamClosed.StreamId);
220+
if (streamClosed.Reason == DisconnectReason.Error)
221+
{
222+
_sm.FailInflightRequest(streamClosed.StreamId,
223+
new HttpRequestException("HTTP/3 stream aborted by transport."));
224+
}
225+
else
226+
{
227+
_sm.FlushPendingResponse(streamClosed.StreamId);
228+
}
229+
221230
FlushResponses();
222231
TryPullRequest();
223232
return;

0 commit comments

Comments
 (0)