Skip to content

Commit d4b2f39

Browse files
barshaulmgravell
andauthored
Handle MOVED error pointing to same endpoint. (#3003)
* Handle MOVED error pointing to same endpoint by triggering reconnection before retrying the request. * Better stimulate proxy/LB * Increase timeout * Fixed key name to prevent collisions * Add NeedsReconnect flag to defer reconnection to reader loop for MOVED-to-same-endpoint * drop unstable test --------- Co-authored-by: Marc Gravell <marc.gravell@gmail.com>
1 parent 11af75b commit d4b2f39

7 files changed

Lines changed: 419 additions & 33 deletions

File tree

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ internal sealed class PhysicalBridge : IDisposable
4848
private int failConnectCount = 0;
4949
private volatile bool isDisposed;
5050
private volatile bool shouldResetConnectionRetryCount;
51+
private bool _needsReconnect;
5152
private long nonPreferredEndpointCount;
5253

5354
// private volatile int missedHeartbeats;
@@ -131,6 +132,16 @@ public enum State : byte
131132
private RedisProtocol _protocol; // note starts at zero, not RESP2
132133
internal void SetProtocol(RedisProtocol protocol) => _protocol = protocol;
133134

135+
/// <summary>
136+
/// Indicates whether the bridge needs to reconnect.
137+
/// </summary>
138+
internal bool NeedsReconnect => Volatile.Read(ref _needsReconnect);
139+
140+
/// <summary>
141+
/// Marks that the bridge needs to reconnect.
142+
/// </summary>
143+
internal void MarkNeedsReconnect() => Volatile.Write(ref _needsReconnect, true);
144+
134145
public void Dispose()
135146
{
136147
isDisposed = true;
@@ -210,7 +221,7 @@ private WriteResult FailDueToNoConnection(Message message)
210221
public WriteResult TryWriteSync(Message message, bool isReplica)
211222
{
212223
if (isDisposed) throw new ObjectDisposedException(Name);
213-
if (!IsConnected) return QueueOrFailMessage(message);
224+
if (!IsConnected || NeedsReconnect) return QueueOrFailMessage(message);
214225

215226
var physical = this.physical;
216227
if (physical == null)
@@ -234,7 +245,7 @@ public WriteResult TryWriteSync(Message message, bool isReplica)
234245
public ValueTask<WriteResult> TryWriteAsync(Message message, bool isReplica, bool bypassBacklog = false)
235246
{
236247
if (isDisposed) throw new ObjectDisposedException(Name);
237-
if (!IsConnected && !bypassBacklog) return new ValueTask<WriteResult>(QueueOrFailMessage(message));
248+
if ((!IsConnected || NeedsReconnect) && !bypassBacklog) return new ValueTask<WriteResult>(QueueOrFailMessage(message));
238249

239250
var physical = this.physical;
240251
if (physical == null)
@@ -1478,6 +1489,8 @@ private bool ChangeState(State oldState, State newState)
14781489
Multiplexer.Trace("Connecting...", Name);
14791490
if (ChangeState(State.Disconnected, State.Connecting))
14801491
{
1492+
// Clear the reconnect flag as we're starting a new connection
1493+
Volatile.Write(ref _needsReconnect, false);
14811494
Interlocked.Increment(ref socketCount);
14821495
Interlocked.Exchange(ref connectStartTicks, Environment.TickCount);
14831496
// separate creation and connection for case when connection completes synchronously

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2091,9 +2091,9 @@ private async Task ReadFromPipe()
20912091
Trace($"Processed {handled} messages");
20922092
input.AdvanceTo(buffer.Start, buffer.End);
20932093

2094-
if (handled == 0 && readResult.IsCompleted)
2094+
if ((handled == 0 && readResult.IsCompleted) || BridgeCouldBeNull?.NeedsReconnect == true)
20952095
{
2096-
break; // no more data, or trailing incomplete messages
2096+
break; // no more data, trailing incomplete messages, or reconnection required
20972097
}
20982098
}
20992099
Trace("EOF");

src/StackExchange.Redis/ResultProcessor.cs

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -259,43 +259,50 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
259259
if (Format.TryParseInt32(parts[1], out int hashSlot)
260260
&& Format.TryParseEndPoint(parts[2], out var endpoint))
261261
{
262-
// no point sending back to same server, and no point sending to a dead server
263-
if (!Equals(server?.EndPoint, endpoint))
262+
// Check if MOVED points to same endpoint
263+
bool isSameEndpoint = Equals(server?.EndPoint, endpoint);
264+
if (isSameEndpoint && isMoved)
264265
{
265-
if (bridge is null)
266-
{
267-
// already toast
268-
}
269-
else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved))
266+
// MOVED to same endpoint detected.
267+
// This occurs when Redis/Valkey servers are behind DNS records, load balancers, or proxies.
268+
// The MOVED error signals that the client should reconnect to allow the DNS/proxy/load balancer
269+
// to route the connection to a different underlying server host, then retry the command.
270+
// Mark the bridge to reconnect - reader loop will handle disconnection and reconnection.
271+
bridge?.MarkNeedsReconnect();
272+
}
273+
if (bridge is null)
274+
{
275+
// already toast
276+
}
277+
else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved))
278+
{
279+
bridge.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK");
280+
return false;
281+
}
282+
else
283+
{
284+
if (isMoved && wasNoRedirect)
270285
{
271-
bridge.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK");
272-
return false;
286+
if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions)
287+
{
288+
err = $"Key has MOVED to Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. ";
289+
}
290+
else
291+
{
292+
err = "Key has MOVED but CommandFlags.NoRedirect was specified - redirect not followed. ";
293+
}
273294
}
274295
else
275296
{
276-
if (isMoved && wasNoRedirect)
297+
unableToConnectError = true;
298+
if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions)
277299
{
278-
if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions)
279-
{
280-
err = $"Key has MOVED to Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. ";
281-
}
282-
else
283-
{
284-
err = "Key has MOVED but CommandFlags.NoRedirect was specified - redirect not followed. ";
285-
}
300+
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "
301+
+ PerfCounterHelper.GetThreadPoolAndCPUSummary();
286302
}
287303
else
288304
{
289-
unableToConnectError = true;
290-
if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions)
291-
{
292-
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "
293-
+ PerfCounterHelper.GetThreadPoolAndCPUSummary();
294-
}
295-
else
296-
{
297-
err = "Endpoint is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. ";
298-
}
305+
err = "Endpoint is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. ";
299306
}
300307
}
301308
}
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.Net;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using StackExchange.Redis.Server;
8+
9+
namespace StackExchange.Redis.Tests;
10+
11+
/// <summary>
12+
/// Test Redis/Valkey server that simulates MOVED errors pointing to the same endpoint.
13+
/// Used to verify client reconnection behavior when the server is behind DNS/load balancers/proxies.
14+
/// When a MOVED error points to the same endpoint, it signals the client to reconnect before retrying the command,
15+
/// allowing the DNS record/proxy/load balancer to route the connection to a different underlying server host.
16+
/// </summary>
17+
public class MovedTestServer : MemoryCacheRedisServer
18+
{
19+
/// <summary>
20+
/// Represents the simulated server host state behind a proxy/load balancer.
21+
/// </summary>
22+
private enum SimulatedHost
23+
{
24+
/// <summary>
25+
/// Old server that returns MOVED errors for the trigger key (pre-migration state).
26+
/// </summary>
27+
OldServer,
28+
29+
/// <summary>
30+
/// New server that handles requests normally (post-migration state).
31+
/// </summary>
32+
NewServer,
33+
}
34+
35+
private int _setCmdCount = 0;
36+
private int _movedResponseCount = 0;
37+
private int _connectionCount = 0;
38+
private SimulatedHost _currentServerHost = SimulatedHost.OldServer;
39+
private readonly ConcurrentDictionary<RedisClient, SimulatedHost> _clientHostAssignments = new();
40+
private readonly Func<string> _getEndpoint;
41+
private readonly string _triggerKey;
42+
private readonly int _hashSlot;
43+
private EndPoint? _actualEndpoint;
44+
45+
public MovedTestServer(Func<string> getEndpoint, string triggerKey = "testkey", int hashSlot = 12345)
46+
{
47+
_getEndpoint = getEndpoint;
48+
_triggerKey = triggerKey;
49+
_hashSlot = hashSlot;
50+
}
51+
52+
/// <summary>
53+
/// Called when a new client connection is established.
54+
/// Assigns the client to the current server host state (simulating proxy/load balancer routing).
55+
/// </summary>
56+
public override RedisClient CreateClient()
57+
{
58+
var client = base.CreateClient();
59+
var assignedHost = _currentServerHost;
60+
_clientHostAssignments[client] = assignedHost;
61+
Interlocked.Increment(ref _connectionCount);
62+
Log($"New client connection established (assigned to {assignedHost}, total connections: {_connectionCount}), endpoint: {_actualEndpoint}");
63+
return client;
64+
}
65+
66+
/// <summary>
67+
/// Handles the INFO command, reporting cluster mode as enabled.
68+
/// </summary>
69+
protected override TypedRedisValue Info(RedisClient client, RedisRequest request)
70+
{
71+
// Override INFO to report cluster mode enabled
72+
var section = request.Count >= 2 ? request.GetString(1) : null;
73+
74+
// Return cluster-enabled info
75+
var infoResponse = section?.Equals("CLUSTER", StringComparison.OrdinalIgnoreCase) == true
76+
? "# Cluster\r\ncluster_enabled:1\r\n"
77+
: "# Server\r\nredis_version:7.0.0\r\n# Cluster\r\ncluster_enabled:1\r\n";
78+
79+
Log($"Returning INFO response (cluster_enabled:1), endpoint: {_actualEndpoint}");
80+
81+
return TypedRedisValue.BulkString(infoResponse);
82+
}
83+
84+
/// <summary>
85+
/// Handles CLUSTER commands, supporting SLOTS and NODES subcommands for cluster mode simulation.
86+
/// </summary>
87+
protected override TypedRedisValue Cluster(RedisClient client, RedisRequest request)
88+
{
89+
if (request.Count < 2)
90+
{
91+
return TypedRedisValue.Error("ERR wrong number of arguments for 'cluster' command");
92+
}
93+
94+
var subcommand = request.GetString(1);
95+
96+
// Handle CLUSTER SLOTS command to support cluster mode
97+
if (subcommand.Equals("SLOTS", StringComparison.OrdinalIgnoreCase))
98+
{
99+
Log($"Returning CLUSTER SLOTS response, endpoint: {_actualEndpoint}");
100+
return GetClusterSlotsResponse();
101+
}
102+
103+
// Handle CLUSTER NODES command
104+
if (subcommand.Equals("NODES", StringComparison.OrdinalIgnoreCase))
105+
{
106+
Log($"Returning CLUSTER NODES response, endpoint: {_actualEndpoint}");
107+
return GetClusterNodesResponse();
108+
}
109+
110+
return TypedRedisValue.Error($"ERR Unknown CLUSTER subcommand '{subcommand}'");
111+
}
112+
113+
/// <summary>
114+
/// Handles SET commands. Returns MOVED error for the trigger key when requested by clients
115+
/// connected to the old server, simulating a server migration behind a proxy/load balancer.
116+
/// </summary>
117+
protected override TypedRedisValue Set(RedisClient client, RedisRequest request)
118+
{
119+
var key = request.GetKey(1);
120+
121+
// Increment SET command counter for every SET call
122+
Interlocked.Increment(ref _setCmdCount);
123+
124+
// Get the client's assigned server host
125+
if (!_clientHostAssignments.TryGetValue(client, out var clientHost))
126+
{
127+
throw new InvalidOperationException("Client host assignment not found - this indicates a test infrastructure error");
128+
}
129+
130+
// Check if this is the trigger key from an old server client
131+
if (key == _triggerKey && clientHost == SimulatedHost.OldServer)
132+
{
133+
// Transition server to new host (so future connections route to new server)
134+
_currentServerHost = SimulatedHost.NewServer;
135+
136+
Interlocked.Increment(ref _movedResponseCount);
137+
var endpoint = _getEndpoint();
138+
Log($"Returning MOVED {_hashSlot} {endpoint} for key '{key}' from {clientHost} client, server transitioned to {SimulatedHost.NewServer}, actual endpoint: {_actualEndpoint}");
139+
140+
// Return MOVED error pointing to same endpoint
141+
return TypedRedisValue.Error($"MOVED {_hashSlot} {endpoint}");
142+
}
143+
144+
// Normal processing for new server clients or other keys
145+
Log($"Processing SET normally for key '{key}' from {clientHost} client, endpoint: {_actualEndpoint}");
146+
return base.Set(client, request);
147+
}
148+
149+
/// <summary>
150+
/// Returns a CLUSTER SLOTS response indicating this endpoint serves all slots (0-16383).
151+
/// </summary>
152+
private TypedRedisValue GetClusterSlotsResponse()
153+
{
154+
// Return a minimal CLUSTER SLOTS response indicating this endpoint serves all slots (0-16383)
155+
// Format: Array of slot ranges, each containing:
156+
// [start_slot, end_slot, [host, port, node_id]]
157+
if (_actualEndpoint == null)
158+
{
159+
return TypedRedisValue.Error("ERR endpoint not set");
160+
}
161+
162+
var endpoint = _getEndpoint();
163+
var parts = endpoint.Split(':');
164+
var host = parts.Length > 0 ? parts[0] : "127.0.0.1";
165+
var port = parts.Length > 1 ? parts[1] : "6379";
166+
167+
// Build response: [[0, 16383, [host, port, node-id]]]
168+
// Inner array: [host, port, node-id]
169+
var hostPortArray = TypedRedisValue.MultiBulk((ICollection<TypedRedisValue>)new[]
170+
{
171+
TypedRedisValue.BulkString(host),
172+
TypedRedisValue.Integer(int.Parse(port)),
173+
TypedRedisValue.BulkString("test-node-id"),
174+
});
175+
// Slot range: [start_slot, end_slot, [host, port, node-id]]
176+
var slotRange = TypedRedisValue.MultiBulk((ICollection<TypedRedisValue>)new[]
177+
{
178+
TypedRedisValue.Integer(0), // start slot
179+
TypedRedisValue.Integer(16383), // end slot
180+
hostPortArray,
181+
});
182+
183+
// Outer array containing the single slot range
184+
return TypedRedisValue.MultiBulk((ICollection<TypedRedisValue>)new[] { slotRange });
185+
}
186+
187+
/// <summary>
188+
/// Returns a CLUSTER NODES response.
189+
/// </summary>
190+
private TypedRedisValue GetClusterNodesResponse()
191+
{
192+
// Return CLUSTER NODES response
193+
// Format: node-id host:port@cport flags master - ping-sent pong-recv config-epoch link-state slot-range
194+
// Example: test-node-id 127.0.0.1:6379@16379 myself,master - 0 0 1 connected 0-16383
195+
if (_actualEndpoint == null)
196+
{
197+
return TypedRedisValue.Error("ERR endpoint not set");
198+
}
199+
200+
var endpoint = _getEndpoint();
201+
var nodesInfo = $"test-node-id {endpoint}@1{endpoint.Split(':')[1]} myself,master - 0 0 1 connected 0-16383\r\n";
202+
203+
return TypedRedisValue.BulkString(nodesInfo);
204+
}
205+
206+
/// <summary>
207+
/// Gets the number of SET commands executed.
208+
/// </summary>
209+
public int SetCmdCount => _setCmdCount;
210+
211+
/// <summary>
212+
/// Gets the number of times MOVED response was returned.
213+
/// </summary>
214+
public int MovedResponseCount => _movedResponseCount;
215+
216+
/// <summary>
217+
/// Gets the number of client connections established.
218+
/// </summary>
219+
public int ConnectionCount => _connectionCount;
220+
221+
/// <summary>
222+
/// Gets the actual endpoint the server is listening on.
223+
/// </summary>
224+
public EndPoint? ActualEndpoint => _actualEndpoint;
225+
226+
/// <summary>
227+
/// Sets the actual endpoint the server is listening on.
228+
/// This should be called externally after the server starts.
229+
/// </summary>
230+
public void SetActualEndpoint(EndPoint endPoint)
231+
{
232+
_actualEndpoint = endPoint;
233+
Log($"MovedTestServer endpoint set to {endPoint}");
234+
}
235+
236+
/// <summary>
237+
/// Resets all counters for test reusability.
238+
/// </summary>
239+
public void ResetCounters()
240+
{
241+
Interlocked.Exchange(ref _setCmdCount, 0);
242+
Interlocked.Exchange(ref _movedResponseCount, 0);
243+
Interlocked.Exchange(ref _connectionCount, 0);
244+
}
245+
}

0 commit comments

Comments
 (0)