Skip to content

Commit f9af64f

Browse files
authored
Improve detection of connect/handshake failures and the retry-policy (#3038)
* tests for args passed to retry-policy context: #3036 * more failure modes * detect timeout of critical tracer
1 parent bf7f846 commit f9af64f

File tree

11 files changed

+413
-121
lines changed

11 files changed

+413
-121
lines changed

src/StackExchange.Redis/Enums/CommandFlags.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,5 +109,7 @@ public enum CommandFlags
109109
// 1024: Removed - was used for async timeout checks; never user-specified, so not visible on the public API
110110

111111
// 2048: Use subscription connection type; never user-specified, so not visible on the public API
112+
113+
// 4096: Identifies handshake completion messages; never user-specified, so not visible on the public API
112114
}
113115
}

src/StackExchange.Redis/Message.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ internal abstract partial class Message : ICompletable
6161

6262
private const CommandFlags AskingFlag = (CommandFlags)32,
6363
ScriptUnavailableFlag = (CommandFlags)256,
64-
DemandSubscriptionConnection = (CommandFlags)2048;
64+
DemandSubscriptionConnection = (CommandFlags)2048,
65+
HandshakeCompletionFlag = (CommandFlags)4096;
6566

6667
private const CommandFlags MaskPrimaryServerPreference = CommandFlags.DemandMaster
6768
| CommandFlags.DemandReplica
@@ -720,6 +721,8 @@ internal void SetWriteTime()
720721

721722
public virtual string CommandString => Command.ToString();
722723

724+
/// <summary>
/// Indicates whether this message has the internal handshake-completion flag set.
/// </summary>
public bool IsHandshakeCompletion
{
    get { return (Flags & HandshakeCompletionFlag) != 0; }
}
725+
723726
/// <summary>
724727
/// Sends this command to the subscription connection rather than the interactive.
725728
/// </summary>
@@ -742,6 +745,8 @@ internal void SetAsking(bool value)
742745
else Flags &= ~AskingFlag; // and the bits taketh away
743746
}
744747

748+
/// <summary>
/// Marks this message as a handshake-completion message by setting the corresponding internal flag bit.
/// </summary>
internal void SetHandshakeCompletion()
{
    Flags = Flags | HandshakeCompletionFlag;
}
749+
745750
internal void SetNoRedirect() => Flags |= CommandFlags.NoRedirect;
746751

747752
internal void SetPreferPrimary() =>

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -617,13 +617,14 @@ internal void OnHeartbeat(bool ifConnectedOnly)
617617
// We need to time that out and cleanup the PhysicalConnection if needed, otherwise that reader and socket will remain open
618618
// for the lifetime of the application due to being orphaned, yet still referenced by the active task doing the pipe read.
619619
case (int)State.ConnectedEstablished:
620-
// Track that we should reset the count on the next disconnect, but not do so in a loop
621-
shouldResetConnectionRetryCount = true;
622620
var tmp = physical;
623621
if (tmp != null)
624622
{
625623
if (state == (int)State.ConnectedEstablished)
626624
{
625+
// Track that we should reset the count on the next disconnect, but not do so in a loop, reset
626+
// the connect-retry-count (used for backoff decay etc), and remove any non-responsive flag.
627+
shouldResetConnectionRetryCount = true;
627628
Interlocked.Exchange(ref connectTimeoutRetryCount, 0);
628629
tmp.BridgeCouldBeNull?.ServerEndPoint?.ClearUnselectable(UnselectableFlags.DidNotRespond);
629630
}

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,15 +282,15 @@ internal void SetProtocol(RedisProtocol value)
282282
}
283283

284284
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times", Justification = "Trust me yo")]
285-
internal void Shutdown()
285+
internal void Shutdown(ConnectionFailureType failureType = ConnectionFailureType.ConnectionDisposed)
286286
{
287287
var ioPipe = Interlocked.Exchange(ref _ioPipe, null); // compare to the critical read
288288
var socket = Interlocked.Exchange(ref _socket, null);
289289

290290
if (ioPipe != null)
291291
{
292292
Trace("Disconnecting...");
293-
try { BridgeCouldBeNull?.OnDisconnected(ConnectionFailureType.ConnectionDisposed, this, out _, out _); } catch { }
293+
try { BridgeCouldBeNull?.OnDisconnected(failureType, this, out _, out _); } catch { }
294294
try { ioPipe.Input?.CancelPendingRead(); } catch { }
295295
try { ioPipe.Input?.Complete(); } catch { }
296296
try { ioPipe.Output?.CancelPendingFlush(); } catch { }
@@ -777,6 +777,12 @@ internal int OnBridgeHeartbeat()
777777
multiplexer.OnAsyncTimeout();
778778
result++;
779779
}
780+
else if (msg.IsHandshakeCompletion)
781+
{
782+
// Critical handshake validation timed out; note that this doesn't have a result-box,
783+
// so doesn't get timed out via the above.
784+
Shutdown(ConnectionFailureType.UnableToConnect);
785+
}
780786
}
781787
else
782788
{

src/StackExchange.Redis/ServerEndPoint.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections;
33
using System.Collections.Generic;
4+
using System.Diagnostics;
45
using System.Linq;
56
using System.Net;
67
using System.Runtime.CompilerServices;
@@ -1068,8 +1069,10 @@ private async Task HandshakeAsync(PhysicalConnection connection, ILogger? log)
10681069
}
10691070

10701071
var tracer = GetTracerMessage(true);
1072+
tracer.SetHandshakeCompletion();
10711073
tracer = LoggingMessage.Create(log, tracer);
10721074
log?.LogInformationSendingCriticalTracer(new(this), tracer.CommandAndKey);
1075+
Debug.Assert(tracer.IsHandshakeCompletion, "Tracer message should identify as handshake completion");
10731076
await WriteDirectOrQueueFireAndForgetAsync(connection, tracer, ResultProcessor.EstablishConnection).ForAwait();
10741077

10751078
// Note: this **must** be the last thing on the subscription handshake, because after this

tests/StackExchange.Redis.Tests/InProcessTestServer.cs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,25 @@ public override void OnClientConnected(RedisClient client, object state)
173173
base.OnClientConnected(client, state);
174174
}
175175

176+
/// <summary>
/// Writes a line to the test log (when one is attached) saying whether the client
/// completed normally or faulted, then forwards the notification to the base implementation.
/// </summary>
/// <param name="client">The client that has completed.</param>
/// <param name="fault">The failure that ended the client, or <c>null</c> for a clean completion.</param>
public override void OnClientCompleted(RedisClient client, Exception? fault)
{
    // The message is only built when a log is attached (the null-conditional call short-circuits).
    _log?.WriteLine(
        fault is null
            ? $"[{client}] completed"
            : $"[{client}] faulted: {fault.Message} ({fault.GetType().Name})");

    base.OnClientCompleted(client, fault);
}
188+
189+
/// <summary>
/// Writes a "skipped reply" line for the client to the test log (when one is attached),
/// then forwards the notification to the base implementation.
/// </summary>
/// <param name="client">The client whose reply was skipped.</param>
protected override void OnSkippedReply(RedisClient client)
{
    var sink = _log;
    if (sink is not null)
    {
        sink.WriteLine($"[{client}] skipped reply");
    }

    base.OnSkippedReply(client);
}
194+
176195
private sealed class InProcTunnel(
177196
InProcessTestServer server,
178197
PipeOptions? pipeOptions = null) : Tunnel
@@ -189,14 +208,15 @@ private sealed class InProcTunnel(
189208
return base.GetSocketConnectEndpointAsync(endpoint, cancellationToken);
190209
}
191210

192-
public override ValueTask<Stream?> BeforeAuthenticateAsync(
211+
public override async ValueTask<Stream?> BeforeAuthenticateAsync(
193212
EndPoint endpoint,
194213
ConnectionType connectionType,
195214
Socket? socket,
196215
CancellationToken cancellationToken)
197216
{
198217
if (server.TryGetNode(endpoint, out var node))
199218
{
219+
await server.OnAcceptClientAsync(endpoint);
200220
var clientToServer = new Pipe(pipeOptions ?? PipeOptions.Default);
201221
var serverToClient = new Pipe(pipeOptions ?? PipeOptions.Default);
202222
var serverSide = new Duplex(clientToServer.Reader, serverToClient.Writer);
@@ -211,9 +231,9 @@ private sealed class InProcTunnel(
211231
var readStream = serverToClient.Reader.AsStream();
212232
var writeStream = clientToServer.Writer.AsStream();
213233
var clientSide = new DuplexStream(readStream, writeStream);
214-
return new(clientSide);
234+
return clientSide;
215235
}
216-
return base.BeforeAuthenticateAsync(endpoint, connectionType, socket, cancellationToken);
236+
return await base.BeforeAuthenticateAsync(endpoint, connectionType, socket, cancellationToken);
217237
}
218238

219239
private sealed class Duplex(PipeReader input, PipeWriter output) : IDuplexPipe
@@ -230,6 +250,8 @@ public ValueTask Dispose()
230250
}
231251
}
232252

253+
/// <summary>
/// Hook that is awaited for an endpoint before the in-process pipes for a new client connection
/// are created. The default implementation does nothing and completes immediately; derived servers
/// may override it (for example to delay or fault the connection attempt).
/// </summary>
/// <param name="endpoint">The endpoint the client is connecting to.</param>
protected virtual ValueTask OnAcceptClientAsync(EndPoint endpoint)
{
    return default;
}
254+
233255
/*
234256
235257
private readonly RespServer _server;
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Net;
6+
using System.Net.Sockets;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using StackExchange.Redis.Server;
10+
using Xunit;
11+
12+
namespace StackExchange.Redis.Tests;
13+
14+
/// <summary>
/// Checks which retry counts are handed to the reconnect retry-policy when an in-process
/// server is told to sabotage upcoming connection attempts in different ways.
/// </summary>
public class RetryPolicyUnitTests(ITestOutputHelper log)
{
    /// <summary>
    /// Connects to a healthy server, then kills the live connection while the next two
    /// connection attempts are armed to fail with <paramref name="failureMode"/>, and finally
    /// inspects the retry counts recorded by the policy.
    /// </summary>
    /// <param name="failureMode">How the armed connection attempts should misbehave.</param>
    [Theory]
    [InlineData(FailureMode.Success)]
    [InlineData(FailureMode.ConnectionRefused)]
    [InlineData(FailureMode.SlowNonConnect)]
    [InlineData(FailureMode.NoResponses)]
    [InlineData(FailureMode.GarbageResponses)]
    public async Task RetryPolicyFailureCases(FailureMode failureMode)
    {
        const int TimeoutMilliseconds = 1_000;
        const int MaxPingAttempts = 10;

        using var server = new NonResponsiveServer(log);
        var config = server.GetClientConfig(withPubSub: false);
        var recorder = new CountingRetryPolicy();
        config.ConnectRetry = 5;
        config.ConnectTimeout = TimeoutMilliseconds;
        config.AsyncTimeout = TimeoutMilliseconds;
        config.SyncTimeout = TimeoutMilliseconds;
        config.ReconnectRetryPolicy = recorder;

        // connect while the server is stable; the policy must not have been consulted yet
        await using var muxer = await ConnectionMultiplexer.ConnectAsync(config);
        var database = muxer.GetDatabase();
        database.Ping();
        Assert.Equal(0, recorder.Clear());

        // arm the server to sabotage the next 2 connection attempts, then drop the current clients
        server.FailNext(2, failureMode);
        server.ForAllClients(client => client.Kill());

        // keep pinging until one succeeds (or we run out of attempts), logging each failure
        var attempt = 0;
        while (attempt < MaxPingAttempts)
        {
            try
            {
                await database.PingAsync();
                break;
            }
            catch (Exception ex)
            {
                log.WriteLine($"{nameof(database.PingAsync)} attempt {attempt}: {ex.GetType().Name}: {ex.Message}");
            }

            attempt++;
        }

        var observed = recorder.GetRetryCounts();
        if (failureMode is not FailureMode.Success)
        {
            // expected: the policy is asked with retry-count 0, then 1
            Assert.Equal("0,1", string.Join(",", observed));
        }
        else
        {
            Assert.Empty(observed);
        }
    }

    /// <summary>
    /// Retry-policy that always allows a retry and records the arguments of every call.
    /// </summary>
    private sealed class CountingRetryPolicy : IReconnectRetryPolicy
    {
        /// <summary>The arguments of a single <see cref="ShouldRetry"/> call.</summary>
        private readonly struct RetryRequest
        {
            public RetryRequest(int currentRetryCount, int timeElapsedMillisecondsSinceLastRetry)
            {
                CurrentRetryCount = currentRetryCount;
                TimeElapsedMillisecondsSinceLastRetry = timeElapsedMillisecondsSinceLastRetry;
            }

            public int CurrentRetryCount { get; }

            public int TimeElapsedMillisecondsSinceLastRetry { get; }
        }

        // doubles as the lock object guarding itself
        private readonly List<RetryRequest> retryCounts = new();

        /// <summary>Discards all recorded calls.</summary>
        /// <returns>The number of calls that had been recorded before clearing.</returns>
        public int Clear()
        {
            lock (retryCounts)
            {
                var discarded = retryCounts.Count;
                retryCounts.Clear();
                return discarded;
            }
        }

        /// <summary>Snapshots the retry-count argument of every recorded call, in call order.</summary>
        public int[] GetRetryCounts()
        {
            lock (retryCounts)
            {
                return (from request in retryCounts select request.CurrentRetryCount).ToArray();
            }
        }

        /// <summary>Records the call and always answers "yes, retry".</summary>
        public bool ShouldRetry(long currentRetryCount, int timeElapsedMillisecondsSinceLastRetry)
        {
            // checked: a count that does not fit an int is treated as a test failure (overflow throws)
            var request = new RetryRequest(checked((int)currentRetryCount), timeElapsedMillisecondsSinceLastRetry);
            lock (retryCounts)
            {
                retryCounts.Add(request);
            }

            return true;
        }
    }

    /// <summary>The ways in which the test server can sabotage a connection attempt.</summary>
    public enum FailureMode
    {
        Success,
        SlowNonConnect,
        ConnectionRefused,
        NoResponses,
        GarbageResponses,
    }

    /// <summary>
    /// In-process server that can be armed to misbehave for a limited number of upcoming connections.
    /// </summary>
    private sealed class NonResponsiveServer(ITestOutputHelper log) : InProcessTestServer(log)
    {
        // number of upcoming connections that should still be sabotaged
        private int _failNext;

        // how those connections should be sabotaged
        private FailureMode _failureMode;

        /// <summary>Arms the server so that the next <paramref name="count"/> connections fail.</summary>
        public void FailNext(int count, FailureMode failureMode)
        {
            _failNext = count;
            _failureMode = failureMode;
        }

        /// <summary>
        /// For the connect-level failure modes, either leaves the connect pending (it eventually
        /// faults with a timeout) or rejects it outright; otherwise defers to the base behavior.
        /// </summary>
        protected override ValueTask OnAcceptClientAsync(EndPoint endpoint)
        {
            var mode = _failureMode;

            if (mode is FailureMode.SlowNonConnect && ShouldIgnoreClient())
            {
                Log($"(leaving pending connect to {endpoint})");
                return TimeoutEventually();
            }

            if (mode is FailureMode.ConnectionRefused && ShouldIgnoreClient())
            {
                Log($"(rejecting connection to {endpoint})");
                throw new SocketException((int)SocketError.ConnectionRefused);
            }

            return base.OnAcceptClientAsync(endpoint);

            // stays pending for far longer than the client's connect timeout, then faults
            static async ValueTask TimeoutEventually()
            {
                await Task.Delay(TimeSpan.FromMinutes(5)).ConfigureAwait(false);
                throw new TimeoutException();
            }
        }

        /// <summary>
        /// Atomically consumes one of the armed failures, if any remain.
        /// </summary>
        /// <returns><c>true</c> if a failure was consumed and this client should be sabotaged.</returns>
        private bool ShouldIgnoreClient()
        {
            int observed;
            do
            {
                observed = Volatile.Read(ref _failNext);
                if (observed <= 0)
                {
                    return false;
                }
            }
            while (Interlocked.CompareExchange(ref _failNext, observed - 1, observed) != observed);

            return true;
        }

        /// <summary>Client that answers every request with random bytes instead of a real reply.</summary>
        private sealed class GarbageClient(Node node) : RedisClient(node)
        {
            protected override void WriteResponse(
                IBufferWriter<byte> output,
                TypedRedisValue value,
                RedisProtocol protocol)
            {
#if NET
                var rng = Random.Shared;
#else
                var rng = new Random();
#endif
                // between 1 and 1023 bytes of noise
                var length = rng.Next(1, 1024);
                var rented = ArrayPool<byte>.Shared.Rent(length);
                var payload = rented.AsSpan(0, length);
                try
                {
#if NET
                    rng.NextBytes(payload);
#else
                    rng.NextBytes(rented);
#endif
                    output.Write(payload);
                }
                finally
                {
                    ArrayPool<byte>.Shared.Return(rented);
                }
            }
        }

        /// <summary>
        /// Creates the server-side client for a new connection, substituting a garbage-writing or
        /// reply-skipping client while the corresponding failure mode still has armed failures left.
        /// </summary>
        public override RedisClient CreateClient(Node node)
        {
            if (_failureMode is FailureMode.GarbageResponses && ShouldIgnoreClient())
            {
                RedisClient garbage = new GarbageClient(node);
                Log($"(accepting garbage-responsive connection to {node.Host}:{node.Port})");
                return garbage;
            }

            var created = base.CreateClient(node);
            var muteReplies = _failureMode is FailureMode.NoResponses && ShouldIgnoreClient();
            if (muteReplies)
            {
                Log($"(accepting non-responsive connection to {node.Host}:{node.Port})");
                created.SkipAllReplies();
            }
            else
            {
                Log($"(accepting responsive connection to {node.Host}:{node.Port})");
            }

            return created;
        }
    }
}

0 commit comments

Comments
 (0)