Skip to content

Commit 3ef0120

Browse files
committed
Add NeedsReconnect flag to defer reconnection to reader loop for MOVED-to-same-endpoint
1 parent 93609b0 commit 3ef0120

4 files changed

Lines changed: 24 additions & 5 deletions

File tree

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ internal sealed class PhysicalBridge : IDisposable
4848
private int failConnectCount = 0;
4949
private volatile bool isDisposed;
5050
private volatile bool shouldResetConnectionRetryCount;
51+
private bool _needsReconnect;
5152
private long nonPreferredEndpointCount;
5253

5354
// private volatile int missedHeartbeats;
@@ -131,6 +132,16 @@ public enum State : byte
131132
private RedisProtocol _protocol; // note starts at zero, not RESP2
132133
internal void SetProtocol(RedisProtocol protocol) => _protocol = protocol;
133134

135+
/// <summary>
136+
/// Indicates whether the bridge needs to reconnect.
137+
/// </summary>
138+
internal bool NeedsReconnect => Volatile.Read(ref _needsReconnect);
139+
140+
/// <summary>
141+
/// Marks that the bridge needs to reconnect.
142+
/// </summary>
143+
internal void MarkNeedsReconnect() => Volatile.Write(ref _needsReconnect, true);
144+
134145
public void Dispose()
135146
{
136147
isDisposed = true;
@@ -210,7 +221,7 @@ private WriteResult FailDueToNoConnection(Message message)
210221
public WriteResult TryWriteSync(Message message, bool isReplica)
211222
{
212223
if (isDisposed) throw new ObjectDisposedException(Name);
213-
if (!IsConnected) return QueueOrFailMessage(message);
224+
if (!IsConnected || NeedsReconnect) return QueueOrFailMessage(message);
214225

215226
var physical = this.physical;
216227
if (physical == null)
@@ -234,7 +245,7 @@ public WriteResult TryWriteSync(Message message, bool isReplica)
234245
public ValueTask<WriteResult> TryWriteAsync(Message message, bool isReplica, bool bypassBacklog = false)
235246
{
236247
if (isDisposed) throw new ObjectDisposedException(Name);
237-
if (!IsConnected && !bypassBacklog) return new ValueTask<WriteResult>(QueueOrFailMessage(message));
248+
if ((!IsConnected || NeedsReconnect) && !bypassBacklog) return new ValueTask<WriteResult>(QueueOrFailMessage(message));
238249

239250
var physical = this.physical;
240251
if (physical == null)
@@ -1458,6 +1469,8 @@ private bool ChangeState(State oldState, State newState)
14581469
Multiplexer.Trace("Connecting...", Name);
14591470
if (ChangeState(State.Disconnected, State.Connecting))
14601471
{
1472+
// Clear the reconnect flag as we're starting a new connection
1473+
Volatile.Write(ref _needsReconnect, false);
14611474
Interlocked.Increment(ref socketCount);
14621475
Interlocked.Exchange(ref connectStartTicks, Environment.TickCount);
14631476
// separate creation and connection for case when connection completes synchronously

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2091,9 +2091,9 @@ private async Task ReadFromPipe()
20912091
Trace($"Processed {handled} messages");
20922092
input.AdvanceTo(buffer.Start, buffer.End);
20932093

2094-
if (handled == 0 && readResult.IsCompleted)
2094+
if ((handled == 0 && readResult.IsCompleted) || BridgeCouldBeNull?.NeedsReconnect == true)
20952095
{
2096-
break; // no more data, or trailing incomplete messages
2096+
break; // no more data, trailing incomplete messages, or reconnection required
20972097
}
20982098
}
20992099
Trace("EOF");

src/StackExchange.Redis/ResultProcessor.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,8 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
267267
// This occurs when Redis/Valkey servers are behind DNS records, load balancers, or proxies.
268268
// The MOVED error signals that the client should reconnect to allow the DNS/proxy/load balancer
269269
// to route the connection to a different underlying server host, then retry the command.
270-
bridge?.TryConnect(null)?.Dispose();
270+
// Mark the bridge to reconnect - reader loop will handle disconnection and reconnection.
271+
bridge?.MarkNeedsReconnect();
271272
}
272273
if (bridge is null)
273274
{

tests/StackExchange.Redis.Tests/MovedToSameEndpointTests.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ public async Task MovedToSameEndpoint_TriggersReconnectAndRetry_CommandSucceeds(
7777
};
7878

7979
await using var conn = await ConnectionMultiplexer.ConnectAsync(config);
80+
// Ping the server to ensure it's responsive
81+
var server = conn.GetServer(listenEndpoint);
82+
await server.PingAsync();
83+
// Verify server is detected as cluster mode
84+
Assert.Equal(ServerType.Cluster, server.ServerType);
8085
var db = conn.GetDatabase();
8186

8287
// Record baseline counters after initial connection

0 commit comments

Comments
 (0)