Skip to content

Commit bb74010

Browse files
committed
detect timout of critical tracer
1 parent 8955d30 commit bb74010

6 files changed

Lines changed: 20 additions & 5 deletions

File tree

src/StackExchange.Redis/Enums/CommandFlags.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,5 +109,7 @@ public enum CommandFlags
109109
// 1024: Removed - was used for async timeout checks; never user-specified, so not visible on the public API
110110

111111
// 2048: Use subscription connection type; never user-specified, so not visible on the public API
112+
113+
// 4096: Identifies handshake completion messages; never user-specified, so not visible on the public API
112114
}
113115
}

src/StackExchange.Redis/Message.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ internal abstract partial class Message : ICompletable
6161

6262
private const CommandFlags AskingFlag = (CommandFlags)32,
6363
ScriptUnavailableFlag = (CommandFlags)256,
64-
DemandSubscriptionConnection = (CommandFlags)2048;
64+
DemandSubscriptionConnection = (CommandFlags)2048,
65+
HandshakeCompletionFlag = (CommandFlags)4096;
6566

6667
private const CommandFlags MaskPrimaryServerPreference = CommandFlags.DemandMaster
6768
| CommandFlags.DemandReplica
@@ -720,6 +721,8 @@ internal void SetWriteTime()
720721

721722
public virtual string CommandString => Command.ToString();
722723

724+
public bool IsHandshakeCompletion => (Flags & HandshakeCompletionFlag) != 0;
725+
723726
/// <summary>
724727
/// Sends this command to the subscription connection rather than the interactive.
725728
/// </summary>
@@ -742,6 +745,8 @@ internal void SetAsking(bool value)
742745
else Flags &= ~AskingFlag; // and the bits taketh away
743746
}
744747

748+
internal void SetHandshakeCompletion() => Flags |= HandshakeCompletionFlag;
749+
745750
internal void SetNoRedirect() => Flags |= CommandFlags.NoRedirect;
746751

747752
internal void SetPreferPrimary() =>

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,6 @@ internal void OnHeartbeat(bool ifConnectedOnly)
617617
// We need to time that out and cleanup the PhysicalConnection if needed, otherwise that reader and socket will remain open
618618
// for the lifetime of the application due to being orphaned, yet still referenced by the active task doing the pipe read.
619619
case (int)State.ConnectedEstablished:
620-
shouldResetConnectionRetryCount = true;
621620
var tmp = physical;
622621
if (tmp != null)
623622
{

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,15 +282,15 @@ internal void SetProtocol(RedisProtocol value)
282282
}
283283

284284
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times", Justification = "Trust me yo")]
285-
internal void Shutdown()
285+
internal void Shutdown(ConnectionFailureType failureType = ConnectionFailureType.ConnectionDisposed)
286286
{
287287
var ioPipe = Interlocked.Exchange(ref _ioPipe, null); // compare to the critical read
288288
var socket = Interlocked.Exchange(ref _socket, null);
289289

290290
if (ioPipe != null)
291291
{
292292
Trace("Disconnecting...");
293-
try { BridgeCouldBeNull?.OnDisconnected(ConnectionFailureType.ConnectionDisposed, this, out _, out _); } catch { }
293+
try { BridgeCouldBeNull?.OnDisconnected(failureType, this, out _, out _); } catch { }
294294
try { ioPipe.Input?.CancelPendingRead(); } catch { }
295295
try { ioPipe.Input?.Complete(); } catch { }
296296
try { ioPipe.Output?.CancelPendingFlush(); } catch { }
@@ -777,6 +777,12 @@ internal int OnBridgeHeartbeat()
777777
multiplexer.OnAsyncTimeout();
778778
result++;
779779
}
780+
else if (msg.IsHandshakeCompletion)
781+
{
782+
// Critical handshake validation timed out; note that this doesn't have a result-box,
783+
// so doesn't get timed out via the above.
784+
Shutdown(ConnectionFailureType.UnableToConnect);
785+
}
780786
}
781787
else
782788
{

src/StackExchange.Redis/ServerEndPoint.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections;
33
using System.Collections.Generic;
4+
using System.Diagnostics;
45
using System.Linq;
56
using System.Net;
67
using System.Runtime.CompilerServices;
@@ -1068,8 +1069,10 @@ private async Task HandshakeAsync(PhysicalConnection connection, ILogger? log)
10681069
}
10691070

10701071
var tracer = GetTracerMessage(true);
1072+
tracer.SetHandshakeCompletion();
10711073
tracer = LoggingMessage.Create(log, tracer);
10721074
log?.LogInformationSendingCriticalTracer(new(this), tracer.CommandAndKey);
1075+
Debug.Assert(tracer.IsHandshakeCompletion, "Tracer message should identify as handshake completion");
10731076
await WriteDirectOrQueueFireAndForgetAsync(connection, tracer, ResultProcessor.EstablishConnection).ForAwait();
10741077

10751078
// Note: this **must** be the last thing on the subscription handshake, because after this

tests/StackExchange.Redis.Tests/RetryPolicyUnitTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class RetryPolicyUnitTests(ITestOutputHelper log)
1919
[InlineData(FailureMode.SlowNonConnect)]
2020
[InlineData(FailureMode.NoResponses)]
2121
[InlineData(FailureMode.GarbageResponses)]
22-
public async Task TestExponentialRetry(FailureMode failureMode)
22+
public async Task RetryPolicyFailureCases(FailureMode failureMode)
2323
{
2424
using var server = new NonResponsiveServer(log);
2525
var options = server.GetClientConfig(withPubSub: false);

0 commit comments

Comments
 (0)