Skip to content

Commit 27a381c

Browse files
frobion and Francois ROBION
authored
Allow heartbeat to restart the pipe thread with only sync commands (#2965)
* Allow heartbeat to restart the pipe thread with only sync commands. There is a thread looping in the method PhysicalConnection.ReadFromPipe to process responses from Redis, match them with the sent commands, and signal the completion of each message. If this thread has an exception, its catch block will call RecordConnectionFailed, which will proceed to start a new thread to continue reading Redis responses. However, if another exception occurs in the catch block before the new thread can be started (in a case of high memory pressure, OOM exceptions can happen anywhere), we are in a state where no one is reading the pipe of Redis responses, and all commands sent end in timeout. If at least one async command is sent, the heartbeat thread will detect the timeout in the OnBridgeHeartbeat method, and if no reads were performed for 4 heartbeats it will issue a connection failure. With this commit, this becomes true for sync commands as well. Therefore, it ensures we will not reach a state where all commands end in timeout. * Compare the count of timed-out sync commands with the sync timeout --------- Co-authored-by: Francois ROBION <francois.robion@esker.com>
1 parent b1a60d5 commit 27a381c

File tree

2 files changed

+18
-9
lines changed

2 files changed

+18
-9
lines changed

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
628628
Interlocked.Exchange(ref connectTimeoutRetryCount, 0);
629629
tmp.BridgeCouldBeNull?.ServerEndPoint?.ClearUnselectable(UnselectableFlags.DidNotRespond);
630630
}
631-
int timedOutThisHeartbeat = tmp.OnBridgeHeartbeat();
631+
tmp.OnBridgeHeartbeat(out int asyncTimeoutThisHeartbeat, out int syncTimeoutThisHeartbeat);
632632
int writeEverySeconds = ServerEndPoint.WriteEverySeconds;
633633
bool configCheckDue = ServerEndPoint.ConfigCheckSeconds > 0 && ServerEndPoint.LastInfoReplicationCheckSecondsAgo >= ServerEndPoint.ConfigCheckSeconds;
634634

@@ -677,14 +677,16 @@ internal void OnHeartbeat(bool ifConnectedOnly)
677677
}
678678

679679
// This is an "always" check - we always want to evaluate a dead connection from a non-responsive server regardless of the need to heartbeat above
680-
if (timedOutThisHeartbeat > 0
681-
&& tmp.LastReadSecondsAgo * 1_000 > (tmp.BridgeCouldBeNull?.Multiplexer.AsyncTimeoutMilliseconds * 4))
680+
var totalTimeoutThisHeartbeat = asyncTimeoutThisHeartbeat + syncTimeoutThisHeartbeat;
681+
bool deadConnectionOnAsync = asyncTimeoutThisHeartbeat > 0 && tmp.LastReadSecondsAgo * 1_000 > (tmp.BridgeCouldBeNull?.Multiplexer.AsyncTimeoutMilliseconds * 4);
682+
bool deadConnectionOnSync = syncTimeoutThisHeartbeat > 0 && tmp.LastReadSecondsAgo * 1_000 > (tmp.BridgeCouldBeNull?.Multiplexer.TimeoutMilliseconds * 4);
683+
if (deadConnectionOnAsync || deadConnectionOnSync)
682684
{
683685
// If we've received *NOTHING* on the pipe in 4 timeouts worth of time and we're timing out commands, issue a connection failure so that we reconnect
684686
// This is meant to address the scenario we see often in Linux configs where TCP retries will happen for 15 minutes.
685687
// To us as a client, we'll see the socket as green/open/fine when writing but we'll be getting nothing back.
686688
// Since we can't depend on the pipe to fail in that case, we want to error here based on the criteria above so we reconnect broken clients much faster.
687-
tmp.BridgeCouldBeNull?.Multiplexer.Logger?.LogWarningDeadSocketDetected(tmp.LastReadSecondsAgo, timedOutThisHeartbeat);
689+
tmp.BridgeCouldBeNull?.Multiplexer.Logger?.LogWarningDeadSocketDetected(tmp.LastReadSecondsAgo, totalTimeoutThisHeartbeat);
688690
OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out _, out State oldState);
689691
tmp.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely
690692
}

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -745,10 +745,12 @@ internal void GetStormLog(StringBuilder sb)
745745
/// <summary>
746746
/// Runs on every heartbeat for a bridge, timing out any commands that are overdue and returning an integer of how many we timed out.
747747
/// </summary>
748-
/// <returns>How many commands were overdue and threw timeout exceptions.</returns>
749-
internal int OnBridgeHeartbeat()
748+
/// <param name="asyncTimeoutDetected">How many async commands were overdue and threw timeout exceptions.</param>
749+
/// <param name="syncTimeoutDetected">How many sync commands were overdue. No exceptions are thrown for these commands here.</param>
750+
internal void OnBridgeHeartbeat(out int asyncTimeoutDetected, out int syncTimeoutDetected)
750751
{
751-
var result = 0;
752+
asyncTimeoutDetected = 0;
753+
syncTimeoutDetected = 0;
752754
var now = Environment.TickCount;
753755
Interlocked.Exchange(ref lastBeatTickCount, now);
754756

@@ -775,7 +777,13 @@ internal int OnBridgeHeartbeat()
775777
multiplexer.OnMessageFaulted(msg, timeoutEx);
776778
msg.SetExceptionAndComplete(timeoutEx, bridge); // tell the message that it is doomed
777779
multiplexer.OnAsyncTimeout();
778-
result++;
780+
asyncTimeoutDetected++;
781+
}
782+
else
783+
{
784+
// Only count how many sync timeouts we detect here.
785+
// The actual timeout is handled in ConnectionMultiplexer.ExecuteSyncImpl().
786+
syncTimeoutDetected++;
779787
}
780788
else if (msg.IsHandshakeCompletion)
781789
{
@@ -796,7 +804,6 @@ internal int OnBridgeHeartbeat()
796804
}
797805
}
798806
}
799-
return result;
800807
}
801808

802809
internal void OnInternalError(Exception exception, [CallerMemberName] string? origin = null)

0 commit comments

Comments
 (0)