Skip to content

Commit 64a98ea

Browse files
committed
Handle MOVED error pointing to same endpoint by triggering reconnection before retrying the request.
1 parent 84b015e commit 64a98ea

4 files changed

Lines changed: 359 additions & 28 deletions

File tree

src/StackExchange.Redis/ResultProcessor.cs

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -259,43 +259,49 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
259259
if (Format.TryParseInt32(parts[1], out int hashSlot)
260260
&& Format.TryParseEndPoint(parts[2], out var endpoint))
261261
{
262-
// no point sending back to same server, and no point sending to a dead server
263-
if (!Equals(server?.EndPoint, endpoint))
262+
// Check if MOVED points to same endpoint
263+
bool isSameEndpoint = Equals(server?.EndPoint, endpoint);
264+
if (isSameEndpoint)
264265
{
265-
if (bridge is null)
266-
{
267-
// already toast
268-
}
269-
else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved))
266+
// MOVED to same endpoint detected.
267+
// This occurs when Redis/Valkey servers are behind DNS records, load balancers, or proxies.
268+
// The MOVED error signals that the client should reconnect to allow the DNS/proxy/load balancer
269+
// to route the connection to a different underlying server host, then retry the command.
270+
bridge?.TryConnect(null)?.Dispose();
271+
}
272+
if (bridge is null)
273+
{
274+
// already toast
275+
}
276+
else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved))
277+
{
278+
bridge.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK");
279+
return false;
280+
}
281+
else
282+
{
283+
if (isMoved && wasNoRedirect)
270284
{
271-
bridge.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK");
272-
return false;
285+
if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions)
286+
{
287+
err = $"Key has MOVED to Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. ";
288+
}
289+
else
290+
{
291+
err = "Key has MOVED but CommandFlags.NoRedirect was specified - redirect not followed. ";
292+
}
273293
}
274294
else
275295
{
276-
if (isMoved && wasNoRedirect)
296+
unableToConnectError = true;
297+
if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions)
277298
{
278-
if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions)
279-
{
280-
err = $"Key has MOVED to Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. ";
281-
}
282-
else
283-
{
284-
err = "Key has MOVED but CommandFlags.NoRedirect was specified - redirect not followed. ";
285-
}
299+
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "
300+
+ PerfCounterHelper.GetThreadPoolAndCPUSummary();
286301
}
287302
else
288303
{
289-
unableToConnectError = true;
290-
if (bridge.Multiplexer.RawConfig.IncludeDetailInExceptions)
291-
{
292-
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "
293-
+ PerfCounterHelper.GetThreadPoolAndCPUSummary();
294-
}
295-
else
296-
{
297-
err = "Endpoint is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. ";
298-
}
304+
err = "Endpoint is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. ";
299305
}
300306
}
301307
}
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Net;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using StackExchange.Redis.Server;
7+
8+
namespace StackExchange.Redis.Tests;
9+
10+
/// <summary>
/// Test Redis/Valkey server that simulates MOVED errors pointing to the same endpoint.
/// Used to verify client reconnection behavior when the server is behind DNS/load balancers/proxies.
/// When a MOVED error points to the same endpoint, it signals the client to reconnect before retrying the command,
/// allowing the DNS record/proxy/load balancer to route the connection to a different underlying server host.
/// </summary>
public class MovedTestServer : MemoryCacheRedisServer
{
    // Fallbacks used when the advertised endpoint string carries no explicit port.
    private const string DefaultHost = "127.0.0.1";
    private const int DefaultPort = 6379;

    // By convention the cluster bus listens on the data port + 10000.
    private const int ClusterBusPortOffset = 10000;
    private const int MaxPort = 65535;

    private const string NodeId = "test-node-id";

    private int _setCmdCount;
    private int _movedResponseCount;
    private int _connectionCount;
    private readonly Func<string> _getEndpoint;
    private readonly string _triggerKey;
    private readonly int _hashSlot;
    private EndPoint? _actualEndpoint;

    /// <summary>
    /// Creates the test server.
    /// </summary>
    /// <param name="getEndpoint">Callback returning the "host:port" string advertised in MOVED and CLUSTER responses.</param>
    /// <param name="triggerKey">Key whose first SET is answered with a MOVED error.</param>
    /// <param name="hashSlot">Hash slot number reported in the MOVED error.</param>
    /// <exception cref="ArgumentNullException"><paramref name="getEndpoint"/> is null.</exception>
    public MovedTestServer(Func<string> getEndpoint, string triggerKey = "testkey", int hashSlot = 12345)
    {
        // Fail fast here rather than with a NullReferenceException in the middle of a command handler.
        _getEndpoint = getEndpoint ?? throw new ArgumentNullException(nameof(getEndpoint));
        _triggerKey = triggerKey;
        _hashSlot = hashSlot;
    }

    /// <summary>
    /// Called when a new client connection is established. Increments the connection counter.
    /// </summary>
    public override RedisClient CreateClient()
    {
        // Log the value returned by the increment; re-reading the field could observe a later increment.
        int totalConnections = Interlocked.Increment(ref _connectionCount);
        Log($"New client connection established (total connections: {totalConnections}), endpoint: {_actualEndpoint}");
        return base.CreateClient();
    }

    /// <summary>
    /// Handles the INFO command, reporting cluster mode as enabled.
    /// </summary>
    protected override TypedRedisValue Info(RedisClient client, RedisRequest request)
    {
        // The optional second argument selects the INFO section.
        var section = request.Count >= 2 ? request.GetString(1) : null;

        // Every variant of the response advertises cluster_enabled:1.
        var infoResponse = string.Equals(section, "CLUSTER", StringComparison.OrdinalIgnoreCase)
            ? "# Cluster\r\ncluster_enabled:1\r\n"
            : "# Server\r\nredis_version:7.0.0\r\n# Cluster\r\ncluster_enabled:1\r\n";

        Log($"Returning INFO response (cluster_enabled:1), endpoint: {_actualEndpoint}");

        return TypedRedisValue.BulkString(infoResponse);
    }

    /// <summary>
    /// Handles CLUSTER commands, supporting the SLOTS and NODES subcommands for cluster mode simulation.
    /// Any other subcommand is answered with an error reply.
    /// </summary>
    protected override TypedRedisValue Cluster(RedisClient client, RedisRequest request)
    {
        if (request.Count < 2)
        {
            return TypedRedisValue.Error("ERR wrong number of arguments for 'cluster' command");
        }

        var subcommand = request.GetString(1);

        if (string.Equals(subcommand, "SLOTS", StringComparison.OrdinalIgnoreCase))
        {
            Log($"Returning CLUSTER SLOTS response, endpoint: {_actualEndpoint}");
            return GetClusterSlotsResponse();
        }

        if (string.Equals(subcommand, "NODES", StringComparison.OrdinalIgnoreCase))
        {
            Log($"Returning CLUSTER NODES response, endpoint: {_actualEndpoint}");
            return GetClusterNodesResponse();
        }

        return TypedRedisValue.Error($"ERR Unknown CLUSTER subcommand '{subcommand}'");
    }

    /// <summary>
    /// Handles SET commands. Returns a MOVED error on the first attempt for the trigger key,
    /// then processes normally on subsequent attempts.
    /// </summary>
    protected override TypedRedisValue Set(RedisClient client, RedisRequest request)
    {
        var key = request.GetKey(1);

        // Short-circuit evaluation: the counter is only advanced for the trigger key,
        // and only the very first such SET is answered with MOVED.
        if (key == _triggerKey && Interlocked.Increment(ref _setCmdCount) == 1)
        {
            Interlocked.Increment(ref _movedResponseCount);
            var endpoint = _getEndpoint();
            Log($"Returning MOVED {_hashSlot} {endpoint} for key '{key}', actual endpoint: {_actualEndpoint}");

            // Return MOVED error pointing to same endpoint.
            // Don't close the connection - let the client handle reconnection naturally.
            return TypedRedisValue.Error($"MOVED {_hashSlot} {endpoint}");
        }

        // Normal processing on retry or other keys
        Log($"Processing SET normally for key '{key}', endpoint: {_actualEndpoint}");
        return base.Set(client, request);
    }

    /// <summary>
    /// Returns a CLUSTER SLOTS response indicating this endpoint serves all slots (0-16383).
    /// Shape: <c>[[0, 16383, [host, port, node-id]]]</c>.
    /// Returns an error reply if the actual endpoint has not been set or the advertised endpoint cannot be parsed.
    /// </summary>
    private TypedRedisValue GetClusterSlotsResponse()
    {
        if (_actualEndpoint is null)
        {
            return TypedRedisValue.Error("ERR endpoint not set");
        }

        var endpoint = _getEndpoint();
        if (!TryParseHostAndPort(endpoint, out var host, out var port))
        {
            return TypedRedisValue.Error($"ERR invalid endpoint '{endpoint}'");
        }

        // Inner array: [host, port, node-id]
        var hostPortArray = TypedRedisValue.MultiBulk((ICollection<TypedRedisValue>)new[]
        {
            TypedRedisValue.BulkString(host),
            TypedRedisValue.Integer(port),
            TypedRedisValue.BulkString(NodeId),
        });

        // Slot range: [start_slot, end_slot, [host, port, node-id]]
        var slotRange = TypedRedisValue.MultiBulk((ICollection<TypedRedisValue>)new[]
        {
            TypedRedisValue.Integer(0), // start slot
            TypedRedisValue.Integer(16383), // end slot
            hostPortArray,
        });

        // Outer array containing the single slot range
        return TypedRedisValue.MultiBulk((ICollection<TypedRedisValue>)new[] { slotRange });
    }

    /// <summary>
    /// Returns a single-line CLUSTER NODES response describing this server as the only master.
    /// Format: node-id host:port@cport flags master - ping-sent pong-recv config-epoch link-state slot-range.
    /// Example: <c>test-node-id 127.0.0.1:6379@16379 myself,master - 0 0 1 connected 0-16383</c>.
    /// Returns an error reply if the actual endpoint has not been set or the advertised endpoint cannot be parsed.
    /// </summary>
    private TypedRedisValue GetClusterNodesResponse()
    {
        if (_actualEndpoint is null)
        {
            return TypedRedisValue.Error("ERR endpoint not set");
        }

        var endpoint = _getEndpoint();
        if (!TryParseHostAndPort(endpoint, out var host, out var port))
        {
            return TypedRedisValue.Error($"ERR invalid endpoint '{endpoint}'");
        }

        // Compute the bus port numerically. Prepending the character '1' to the port text only
        // matches port + 10000 for four-digit ports and yields values above 65535 for five-digit ones.
        // When the conventional bus port would be out of range, the "@cport" suffix is omitted.
        int busPort = port + ClusterBusPortOffset;
        string address = busPort <= MaxPort
            ? $"{host}:{port}@{busPort}"
            : $"{host}:{port}";

        var nodesInfo = $"{NodeId} {address} myself,master - 0 0 1 connected 0-16383\r\n";

        return TypedRedisValue.BulkString(nodesInfo);
    }

    /// <summary>
    /// Splits a "host:port" string into its parts.
    /// The split happens at the last ':' so that a host portion containing ':' is kept intact.
    /// When no ':' is present the whole string is taken as the host and the default port (6379) is used.
    /// </summary>
    /// <param name="endpoint">The endpoint text to parse.</param>
    /// <param name="host">Receives the host portion (or the default host on failure).</param>
    /// <param name="port">Receives the port (or the default port when none is specified).</param>
    /// <returns><c>true</c> if a non-empty host and a port in the range 1-65535 were obtained; otherwise <c>false</c>.</returns>
    private static bool TryParseHostAndPort(string endpoint, out string host, out int port)
    {
        host = DefaultHost;
        port = DefaultPort;

        if (string.IsNullOrWhiteSpace(endpoint))
        {
            return false;
        }

        int separator = endpoint.LastIndexOf(':');
        if (separator < 0)
        {
            host = endpoint;
            return true;
        }

        host = endpoint.Substring(0, separator);
        if (host.Length == 0)
        {
            return false;
        }

        // NumberStyles.None + invariant culture: digits only, independent of the machine's locale.
        bool parsed = int.TryParse(
            endpoint.Substring(separator + 1),
            System.Globalization.NumberStyles.None,
            System.Globalization.CultureInfo.InvariantCulture,
            out port);

        return parsed && port > 0 && port <= MaxPort;
    }

    /// <summary>
    /// Gets the number of SET commands received for the trigger key
    /// (SETs for other keys are not counted).
    /// </summary>
    public int SetCmdCount => Volatile.Read(ref _setCmdCount);

    /// <summary>
    /// Gets the number of times MOVED response was returned.
    /// </summary>
    public int MovedResponseCount => Volatile.Read(ref _movedResponseCount);

    /// <summary>
    /// Gets the number of client connections established.
    /// </summary>
    public int ConnectionCount => Volatile.Read(ref _connectionCount);

    /// <summary>
    /// Gets the actual endpoint the server is listening on.
    /// </summary>
    public EndPoint? ActualEndpoint => _actualEndpoint;

    /// <summary>
    /// Sets the actual endpoint the server is listening on.
    /// This should be called externally after the server starts; until it is called,
    /// CLUSTER SLOTS and CLUSTER NODES reply with "ERR endpoint not set".
    /// </summary>
    public void SetActualEndpoint(EndPoint endPoint)
    {
        _actualEndpoint = endPoint;
        Log($"MovedTestServer endpoint set to {endPoint}");
    }

    /// <summary>
    /// Resets all counters for test reusability.
    /// </summary>
    public void ResetCounters()
    {
        Interlocked.Exchange(ref _setCmdCount, 0);
        Interlocked.Exchange(ref _movedResponseCount, 0);
        Interlocked.Exchange(ref _connectionCount, 0);
    }
}

0 commit comments

Comments
 (0)