Skip to content

Commit e7b4e7c

Browse files
authored
Merge branch 'marc/v3' into marc/de-psu
2 parents 0198044 + da98bae commit e7b4e7c

14 files changed

Lines changed: 546 additions & 111 deletions

File tree

docs/ReleaseNotes.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ Current package versions:
1010

1111
- (none)
1212

13+
## 2.12.4
14+
15+
- Fix RESP3 client handshakes on non-RESP3 servers by ([#3037 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3037))
16+
- Improve detection of connect/handshake failures and how that impacts the retry-policy ([#3038 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3038))
17+
18+
1319
## 2.12.1
1420

1521
- Add missing `LCS` outputs and missing `RedisType.VectorSet` ([#3028 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3028))

src/StackExchange.Redis/Enums/CommandFlags.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,5 +109,7 @@ public enum CommandFlags
109109
// 1024: used for "no flush"; never user-specified, so not visible on the public API
110110

111111
// 2048: Use subscription connection type; never user-specified, so not visible on the public API
112+
113+
// 4096: Identifies handshake completion messages; never user-specified, so not visible on the public API
112114
}
113115
}

src/StackExchange.Redis/Message.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,9 @@ internal const CommandFlags
6363
protected RedisCommand command;
6464

6565
private const CommandFlags AskingFlag = (CommandFlags)32,
66-
ScriptUnavailableFlag = (CommandFlags)256,
67-
DemandSubscriptionConnection = (CommandFlags)2048;
66+
ScriptUnavailableFlag = (CommandFlags)256,
67+
DemandSubscriptionConnection = (CommandFlags)2048,
68+
HandshakeCompletionFlag = (CommandFlags)4096;
6869

6970
private const CommandFlags MaskPrimaryServerPreference = CommandFlags.DemandMaster
7071
| CommandFlags.DemandReplica
@@ -802,6 +803,8 @@ internal void SetWriteTime()
802803
// for sync to skip flush, we need *both* NoFlush and FireAndForget; we absolutely need to flush if someone is doing a sync call
803804
internal bool IsFlushRequiredSync => (Flags & (NoFlushFlag | CommandFlags.FireAndForget)) != (NoFlushFlag | CommandFlags.FireAndForget);
804805

806+
public bool IsHandshakeCompletion => (Flags & HandshakeCompletionFlag) != 0;
807+
805808
/// <summary>
806809
/// Sends this command to the subscription connection rather than the interactive.
807810
/// </summary>
@@ -824,6 +827,8 @@ internal void SetAsking(bool value)
824827
else Flags &= ~AskingFlag; // and the bits taketh away
825828
}
826829

830+
internal void SetHandshakeCompletion() => Flags |= HandshakeCompletionFlag;
831+
827832
internal void SetNoRedirect() => Flags |= CommandFlags.NoRedirect;
828833

829834
internal void SetPreferPrimary() =>

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -600,13 +600,14 @@ internal void OnHeartbeat(bool ifConnectedOnly)
600600
// We need to time that out and cleanup the PhysicalConnection if needed, otherwise that reader and socket will remain open
601601
// for the lifetime of the application due to being orphaned, yet still referenced by the active task doing the pipe read.
602602
case (int)State.ConnectedEstablished:
603-
// Track that we should reset the count on the next disconnect, but not do so in a loop
604-
shouldResetConnectionRetryCount = true;
605603
var tmp = physical;
606604
if (tmp != null)
607605
{
608606
if (state == (int)State.ConnectedEstablished)
609607
{
608+
// Track that we should reset the count on the next disconnect, but not do so in a loop, reset
609+
// the connect-retry-count (used for backoff decay etc), and remove any non-responsive flag.
610+
shouldResetConnectionRetryCount = true;
610611
Interlocked.Exchange(ref connectTimeoutRetryCount, 0);
611612
tmp.BridgeCouldBeNull?.ServerEndPoint?.ClearUnselectable(UnselectableFlags.DidNotRespond);
612613
}

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,15 +322,15 @@ public void SetProtocol(RedisProtocol value)
322322
}
323323

324324
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times", Justification = "Trust me yo")]
325-
internal void Shutdown()
325+
internal void Shutdown(ConnectionFailureType failureType = ConnectionFailureType.ConnectionDisposed)
326326
{
327327
var output = Interlocked.Exchange(ref _output, null); // compare to the critical read
328328
var socket = Interlocked.Exchange(ref _socket, null);
329329

330330
if (output != null)
331331
{
332332
Trace("Disconnecting...");
333-
try { BridgeCouldBeNull?.OnDisconnected(ConnectionFailureType.ConnectionDisposed, this, out _, out _); } catch { }
333+
try { BridgeCouldBeNull?.OnDisconnected(failureType, this, out _, out _); } catch { }
334334
try { output.Complete(); } catch { }
335335
}
336336

@@ -782,6 +782,12 @@ internal int OnBridgeHeartbeat()
782782
multiplexer.OnAsyncTimeout();
783783
result++;
784784
}
785+
else if (msg.IsHandshakeCompletion)
786+
{
787+
// Critical handshake validation timed out; note that this doesn't have a result-box,
788+
// so doesn't get timed out via the above.
789+
Shutdown(ConnectionFailureType.UnableToConnect);
790+
}
785791
}
786792
else
787793
{

src/StackExchange.Redis/ResultProcessor.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3196,10 +3196,18 @@ public override bool SetResult(PhysicalConnection connection, Message message, r
31963196

31973197
if (connection.Protocol is null)
31983198
{
3199-
// if we didn't get a valid response from HELLO, then we have to assume RESP2 at some point
3199+
// If we didn't get a valid response from HELLO, then we have to assume RESP2 at some point.
3200+
// We need the protocol assigned before OnFullyEstablished so that the
3201+
// protocol is reliably known *before* we do next-steps.
32003202
connection.SetProtocol(RedisProtocol.Resp2);
32013203
}
32023204

3205+
if (final & establishConnection)
3206+
{
3207+
// This is what ultimately brings us to complete a connection, by advancing the state forward from a successful tracer after connection.
3208+
connection.BridgeCouldBeNull?.OnFullyEstablished(connection, $"From command: {message.Command}");
3209+
}
3210+
32033211
return final;
32043212
}
32053213

@@ -3251,11 +3259,6 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
32513259
}
32523260
if (happy)
32533261
{
3254-
if (establishConnection)
3255-
{
3256-
// This is what ultimately brings us to complete a connection, by advancing the state forward from a successful tracer after connection.
3257-
connection.BridgeCouldBeNull?.OnFullyEstablished(connection, $"From command: {message.Command}");
3258-
}
32593262
SetResult(message, happy);
32603263
return true;
32613264
}

src/StackExchange.Redis/ServerEndPoint.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections;
33
using System.Collections.Generic;
4+
using System.Diagnostics;
45
using System.Linq;
56
using System.Net;
67
using System.Runtime.CompilerServices;
@@ -695,14 +696,20 @@ internal void OnFullyEstablished(PhysicalConnection connection, string source)
695696
// Clear the unselectable flag ASAP since we are open for business
696697
ClearUnselectable(UnselectableFlags.DidNotRespond);
697698

698-
bool isResp3 = KnowOrAssumeResp3();
699+
// is *this specific* connection using RESP3? (without reference to config preferences)
700+
bool isResp3 = connection?.Protocol is >= RedisProtocol.Resp3;
699701
if (bridge == subscription || isResp3)
700702
{
701703
// Note: this MUST be fire and forget, because we might be in the middle of a Sync processing
702704
// TracerProcessor which is executing this line inside a SetResultCore().
703705
// Since we're issuing commands inside a SetResult path in a message, we'd create a deadlock by waiting.
704706
Multiplexer.EnsureSubscriptions(CommandFlags.FireAndForget);
705707
}
708+
else if (SupportsSubscriptions && Multiplexer.RawConfig.Protocol > RedisProtocol.Resp2)
709+
{
710+
// interactive, and we wanted RESP3+, but we didn't get it; spin up pub/sub
711+
Activate(ConnectionType.Subscription, null);
712+
}
706713
if (IsConnected && (IsSubscriberConnected || !SupportsSubscriptions || isResp3))
707714
{
708715
// Only connect on the second leg - we can accomplish this by checking both
@@ -1069,8 +1076,10 @@ private async Task HandshakeAsync(PhysicalConnection connection, ILogger? log)
10691076

10701077
// note that the final messages *are* flushed (no Message.NoFlushFlag)
10711078
var tracer = GetTracerMessage(true);
1079+
tracer.SetHandshakeCompletion();
10721080
tracer = LoggingMessage.Create(log, tracer);
10731081
log?.LogInformationSendingCriticalTracer(new(this), tracer.CommandAndKey);
1082+
Debug.Assert(tracer.IsHandshakeCompletion, "Tracer message should identify as handshake completion");
10741083
await WriteDirectOrQueueFireAndForgetAsync(connection, tracer, ResultProcessor.EstablishConnection).ForAwait();
10751084

10761085
// Note: this **must** be the last thing on the subscription handshake, because after this

tests/StackExchange.Redis.Tests/InProcessTestServer.cs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ namespace StackExchange.Redis.Tests;
1515
public class InProcessTestServer : MemoryCacheRedisServer
1616
{
1717
private readonly ITestOutputHelper? _log;
18-
public InProcessTestServer(ITestOutputHelper? log = null)
18+
public InProcessTestServer(ITestOutputHelper? log = null, EndPoint? endpoint = null)
19+
: base(endpoint)
1920
{
2021
RedisVersion = RedisFeatures.v6_0_0; // for client to expect RESP3
2122
_log = log;
@@ -172,6 +173,25 @@ public override void OnClientConnected(RedisClient client, object state)
172173
base.OnClientConnected(client, state);
173174
}
174175

176+
public override void OnClientCompleted(RedisClient client, Exception? fault)
177+
{
178+
if (fault is null)
179+
{
180+
_log?.WriteLine($"[{client}] completed");
181+
}
182+
else
183+
{
184+
_log?.WriteLine($"[{client}] faulted: {fault.Message} ({fault.GetType().Name})");
185+
}
186+
base.OnClientCompleted(client, fault);
187+
}
188+
189+
protected override void OnSkippedReply(RedisClient client)
190+
{
191+
_log?.WriteLine($"[{client}] skipped reply");
192+
base.OnSkippedReply(client);
193+
}
194+
175195
private sealed class InProcTunnel(
176196
InProcessTestServer server,
177197
PipeOptions? pipeOptions = null) : Tunnel
@@ -188,14 +208,15 @@ private sealed class InProcTunnel(
188208
return base.GetSocketConnectEndpointAsync(endpoint, cancellationToken);
189209
}
190210

191-
public override ValueTask<Stream?> BeforeAuthenticateAsync(
211+
public override async ValueTask<Stream?> BeforeAuthenticateAsync(
192212
EndPoint endpoint,
193213
ConnectionType connectionType,
194214
Socket? socket,
195215
CancellationToken cancellationToken)
196216
{
197217
if (server.TryGetNode(endpoint, out var node))
198218
{
219+
await server.OnAcceptClientAsync(endpoint);
199220
var clientToServer = new Pipe(pipeOptions ?? PipeOptions.Default);
200221
var serverToClient = new Pipe(pipeOptions ?? PipeOptions.Default);
201222
var serverSide = new Duplex(clientToServer.Reader, serverToClient.Writer);
@@ -210,9 +231,9 @@ private sealed class InProcTunnel(
210231
var readStream = serverToClient.Reader.AsStream();
211232
var writeStream = clientToServer.Writer.AsStream();
212233
var clientSide = new DuplexStream(readStream, writeStream);
213-
return new(clientSide);
234+
return clientSide;
214235
}
215-
return base.BeforeAuthenticateAsync(endpoint, connectionType, socket, cancellationToken);
236+
return await base.BeforeAuthenticateAsync(endpoint, connectionType, socket, cancellationToken);
216237
}
217238

218239
private sealed class Duplex(PipeReader input, PipeWriter output) : IDuplexPipe
@@ -229,6 +250,8 @@ public ValueTask Dispose()
229250
}
230251
}
231252

253+
protected virtual ValueTask OnAcceptClientAsync(EndPoint endpoint) => default;
254+
232255
/*
233256
234257
private readonly RespServer _server;
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.Net;
5+
using System.Threading.Tasks;
6+
using StackExchange.Redis.Server;
7+
using Xunit;
8+
9+
namespace StackExchange.Redis.Tests;
10+
11+
public class Resp3HandshakeTests(ITestOutputHelper log)
12+
{
13+
public enum ServerResponse
14+
{
15+
Resp3, // up-level server style
16+
Resp2, // DMC hybrid style, i.e. we know about it, but: "no, you'll take RESP2"
17+
UnknownCommand, // down-level server style
18+
}
19+
20+
[Flags]
21+
public enum HandshakeFlags
22+
{
23+
None = 0,
24+
Authenticated = 1 << 0,
25+
TieBreaker = 1 << 1,
26+
ConfigChannel = 1 << 2,
27+
UsePubSub = 1 << 3,
28+
UseDatabase = 1 << 4,
29+
}
30+
31+
private static readonly int HandshakeFlagsCount = Enum.GetValues(typeof(HandshakeFlags)).Length - 1;
32+
public static IEnumerable<object[]> GetHandshakeParameters()
33+
{
34+
// all client protocols, all server-response modes; all flag permutations
35+
var clients = (RedisProtocol[])Enum.GetValues(typeof(RedisProtocol));
36+
var servers = (ServerResponse[])Enum.GetValues(typeof(ServerResponse));
37+
foreach (var client in clients)
38+
{
39+
foreach (var server in servers)
40+
{
41+
if (client is RedisProtocol.Resp2 & server is not ServerResponse.Resp2)
42+
{
43+
// we don't issue HELLO for this, nothing to test
44+
}
45+
else
46+
{
47+
int count = 1 << HandshakeFlagsCount;
48+
for (int i = 0; i < count; i++)
49+
{
50+
yield return [client, server, (HandshakeFlags)i];
51+
}
52+
}
53+
}
54+
}
55+
}
56+
57+
[Theory]
58+
[MemberData(nameof(GetHandshakeParameters))]
59+
public async Task Handshake(RedisProtocol client, ServerResponse server, HandshakeFlags flags)
60+
{
61+
using var serverObj = new HandshakeServer(server, log);
62+
serverObj.Password = (flags & HandshakeFlags.Authenticated) == 0 ? null : "mypassword";
63+
var config = serverObj.GetClientConfig();
64+
config.Protocol = client;
65+
config.TieBreaker = (flags & HandshakeFlags.TieBreaker) == 0 ? "" : "tiebreaker_key";
66+
config.ConfigurationChannel = (flags & HandshakeFlags.ConfigChannel) == 0 ? "" : "broadcast_channel";
67+
68+
using var clientObj = await ConnectionMultiplexer.ConnectAsync(config);
69+
70+
var sub = clientObj.GetSubscriber();
71+
var db = clientObj.GetDatabase();
72+
ConcurrentBag<string> received = [];
73+
RedisChannel channel = RedisChannel.Literal("mychannel");
74+
RedisKey key = "mykey";
75+
bool useDatabase = (flags & HandshakeFlags.UseDatabase) != 0;
76+
bool usePubSub = (flags & HandshakeFlags.UsePubSub) != 0;
77+
78+
if (usePubSub)
79+
{
80+
await sub.SubscribeAsync(channel, (x, y) => received.Add(y!));
81+
}
82+
if (useDatabase)
83+
{
84+
await db.StringSetAsync(key, "myvalue");
85+
}
86+
if (usePubSub)
87+
{
88+
await sub.PublishAsync(channel, "msg payload");
89+
for (int i = 0; i < 5 && received.IsEmpty; i++)
90+
{
91+
await Task.Delay(10, TestContext.Current.CancellationToken);
92+
await sub.PingAsync();
93+
}
94+
Assert.Equal("msg payload", Assert.Single(received));
95+
}
96+
97+
if (useDatabase)
98+
{
99+
Assert.Equal("myvalue", await db.StringGetAsync(key));
100+
}
101+
}
102+
103+
private static readonly EndPoint EP = new DnsEndPoint("home", 8000);
104+
private sealed class HandshakeServer(ServerResponse response, ITestOutputHelper log)
105+
: InProcessTestServer(log, EP)
106+
{
107+
protected override RedisProtocol MaxProtocol => response switch
108+
{
109+
ServerResponse.Resp3 => RedisProtocol.Resp3,
110+
_ => RedisProtocol.Resp2,
111+
};
112+
113+
protected override TypedRedisValue Hello(RedisClient client, in RedisRequest request)
114+
=> response is ServerResponse.UnknownCommand
115+
? request.CommandNotFound()
116+
: base.Hello(client, in request);
117+
}
118+
}

0 commit comments

Comments
 (0)