Skip to content

Commit 8955d30

Browse files
committed
more failure modes
1 parent 7051aca commit 8955d30

4 files changed

Lines changed: 195 additions & 128 deletions

File tree

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
617617
// We need to time that out and cleanup the PhysicalConnection if needed, otherwise that reader and socket will remain open
618618
// for the lifetime of the application due to being orphaned, yet still referenced by the active task doing the pipe read.
619619
case (int)State.ConnectedEstablished:
620+
shouldResetConnectionRetryCount = true;
620621
var tmp = physical;
621622
if (tmp != null)
622623
{

tests/StackExchange.Redis.Tests/InProcessTestServer.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -208,15 +208,15 @@ private sealed class InProcTunnel(
208208
return base.GetSocketConnectEndpointAsync(endpoint, cancellationToken);
209209
}
210210

211-
public override ValueTask<Stream?> BeforeAuthenticateAsync(
211+
public override async ValueTask<Stream?> BeforeAuthenticateAsync(
212212
EndPoint endpoint,
213213
ConnectionType connectionType,
214214
Socket? socket,
215215
CancellationToken cancellationToken)
216216
{
217217
if (server.TryGetNode(endpoint, out var node))
218218
{
219-
server.OnAcceptClient(endpoint);
219+
await server.OnAcceptClientAsync(endpoint);
220220
var clientToServer = new Pipe(pipeOptions ?? PipeOptions.Default);
221221
var serverToClient = new Pipe(pipeOptions ?? PipeOptions.Default);
222222
var serverSide = new Duplex(clientToServer.Reader, serverToClient.Writer);
@@ -231,9 +231,9 @@ private sealed class InProcTunnel(
231231
var readStream = serverToClient.Reader.AsStream();
232232
var writeStream = clientToServer.Writer.AsStream();
233233
var clientSide = new DuplexStream(readStream, writeStream);
234-
return new(clientSide);
234+
return clientSide;
235235
}
236-
return base.BeforeAuthenticateAsync(endpoint, connectionType, socket, cancellationToken);
236+
return await base.BeforeAuthenticateAsync(endpoint, connectionType, socket, cancellationToken);
237237
}
238238

239239
private sealed class Duplex(PipeReader input, PipeWriter output) : IDuplexPipe
@@ -250,9 +250,7 @@ public ValueTask Dispose()
250250
}
251251
}
252252

253-
protected virtual void OnAcceptClient(EndPoint endpoint)
254-
{
255-
}
253+
protected virtual ValueTask OnAcceptClientAsync(EndPoint endpoint) => default;
256254

257255
/*
258256

tests/StackExchange.Redis.Tests/RetryPolicyUnitTests.cs

Lines changed: 87 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Buffers;
23
using System.Collections.Generic;
34
using System.Linq;
45
using System.Net;
@@ -13,9 +14,12 @@ namespace StackExchange.Redis.Tests;
1314
public class RetryPolicyUnitTests(ITestOutputHelper log)
1415
{
1516
[Theory]
16-
[InlineData(false)]
17-
[InlineData(true)]
18-
public async Task TestExponentialRetry(bool rejectConnection)
17+
[InlineData(FailureMode.Success)]
18+
[InlineData(FailureMode.ConnectionRefused)]
19+
[InlineData(FailureMode.SlowNonConnect)]
20+
[InlineData(FailureMode.NoResponses)]
21+
[InlineData(FailureMode.GarbageResponses)]
22+
public async Task TestExponentialRetry(FailureMode failureMode)
1923
{
2024
using var server = new NonResponsiveServer(log);
2125
var options = server.GetClientConfig(withPubSub: false);
@@ -31,7 +35,7 @@ public async Task TestExponentialRetry(bool rejectConnection)
3135
Assert.Equal(0, policy.Clear());
3236

3337
// now tell the server to become non-responsive to the next 2, and kill the current
34-
server.IgnoreNext(2, rejectConnection);
38+
server.FailNext(2, failureMode);
3539
server.ForAllClients(x => x.Kill());
3640

3741
for (int i = 0; i < 10; i++)
@@ -47,7 +51,14 @@ public async Task TestExponentialRetry(bool rejectConnection)
4751
}
4852
}
4953
var counts = policy.GetRetryCounts();
50-
Assert.Equal("0,1", string.Join(",", counts));
54+
if (failureMode is FailureMode.Success)
55+
{
56+
Assert.Empty(counts);
57+
}
58+
else
59+
{
60+
Assert.Equal("0,1", string.Join(",", counts));
61+
}
5162
}
5263

5364
private sealed class CountingRetryPolicy : IReconnectRetryPolicy
@@ -87,42 +98,99 @@ public bool ShouldRetry(long currentRetryCount, int timeElapsedMillisecondsSince
8798
}
8899
}
89100

101+
public enum FailureMode
102+
{
103+
Success,
104+
SlowNonConnect,
105+
ConnectionRefused,
106+
NoResponses,
107+
GarbageResponses,
108+
}
90109
private sealed class NonResponsiveServer(ITestOutputHelper log) : InProcessTestServer(log)
91110
{
92-
private int _ignoreNext;
93-
private bool _rejectConnection;
111+
private int _failNext;
112+
private FailureMode _failureMode;
94113

95-
public void IgnoreNext(int count, bool rejectConnection)
114+
public void FailNext(int count, FailureMode failureMode)
96115
{
97-
_ignoreNext = count;
98-
_rejectConnection = rejectConnection;
116+
_failNext = count;
117+
_failureMode = failureMode;
99118
}
100119

101-
protected override void OnAcceptClient(EndPoint endpoint)
120+
protected override ValueTask OnAcceptClientAsync(EndPoint endpoint)
102121
{
103-
if (_rejectConnection && ShouldIgnoreClient())
122+
switch (_failureMode)
123+
{
124+
case FailureMode.SlowNonConnect when ShouldIgnoreClient():
125+
Log($"(leaving pending connect to {endpoint})");
126+
return TimeoutEventually();
127+
case FailureMode.ConnectionRefused when ShouldIgnoreClient():
128+
Log($"(rejecting connection to {endpoint})");
129+
throw new SocketException((int)SocketError.ConnectionRefused);
130+
default:
131+
return base.OnAcceptClientAsync(endpoint);
132+
}
133+
134+
static async ValueTask TimeoutEventually()
104135
{
105-
Log($"(rejecting connection to {endpoint})");
106-
throw new SocketException((int)SocketError.ConnectionRefused);
136+
await Task.Delay(TimeSpan.FromMinutes(5)).ConfigureAwait(false);
137+
throw new TimeoutException();
107138
}
108-
base.OnAcceptClient(endpoint);
109139
}
110140

111141
private bool ShouldIgnoreClient()
112142
{
113143
while (true)
114144
{
115-
var oldValue = Volatile.Read(ref _ignoreNext);
145+
var oldValue = Volatile.Read(ref _failNext);
116146
if (oldValue <= 0) return false;
117147
var newValue = oldValue - 1;
118-
if (Interlocked.CompareExchange(ref _ignoreNext, newValue, oldValue) == oldValue) return true;
148+
if (Interlocked.CompareExchange(ref _failNext, newValue, oldValue) == oldValue) return true;
149+
}
150+
}
151+
152+
private sealed class GarbageClient(Node node) : RedisClient(node)
153+
{
154+
protected override void WriteResponse(
155+
IBufferWriter<byte> output,
156+
TypedRedisValue value,
157+
RedisProtocol protocol)
158+
{
159+
#if NET
160+
var rand = Random.Shared;
161+
#else
162+
var rand = new Random();
163+
#endif
164+
var len = rand.Next(1, 1024);
165+
var buffer = ArrayPool<byte>.Shared.Rent(len);
166+
var span = buffer.AsSpan(0, len);
167+
try
168+
{
169+
#if NET
170+
rand.NextBytes(span);
171+
#else
172+
rand.NextBytes(buffer);
173+
#endif
174+
output.Write(span);
175+
}
176+
finally
177+
{
178+
ArrayPool<byte>.Shared.Return(buffer);
179+
}
119180
}
120181
}
121182

122183
public override RedisClient CreateClient(Node node)
123184
{
124-
var client = base.CreateClient(node);
125-
if (!_rejectConnection && ShouldIgnoreClient())
185+
RedisClient client;
186+
if (_failureMode is FailureMode.GarbageResponses && ShouldIgnoreClient())
187+
{
188+
client = new GarbageClient(node);
189+
Log($"(accepting garbage-responsive connection to {node.Host}:{node.Port})");
190+
return client;
191+
}
192+
client = base.CreateClient(node);
193+
if (_failureMode is FailureMode.NoResponses && ShouldIgnoreClient())
126194
{
127195
Log($"(accepting non-responsive connection to {node.Host}:{node.Port})");
128196
client.SkipAllReplies();

0 commit comments

Comments
 (0)