/// <summary>
/// Scans <paramref name="args"/> for the shutdown-timeout option and returns its value in seconds,
/// or <paramref name="defaultSeconds"/> if the option is absent or invalid.
/// This lightweight pre-parse avoids a full CommandLineParser pass before the host is built.
/// </summary>
/// <remarks>
/// Recognised spellings are <c>--shutdown-timeout N</c>, <c>-shutdown-timeout N</c> and the inline
/// <c>--shutdown-timeout=N</c> form. The first occurrence carrying a strictly positive integer wins;
/// occurrences with a missing, non-numeric or non-positive value are skipped and scanning continues.
/// NOTE(review): the inline form is handled on the assumption that the main option parser accepts
/// <c>--name=value</c>; confirm against the parser configuration.
/// </remarks>
/// <param name="args">Raw command-line arguments; may be null or empty.</param>
/// <param name="defaultSeconds">Value returned when no usable option is found.</param>
/// <returns>The parsed timeout in seconds, or <paramref name="defaultSeconds"/>.</returns>
private static int ParseShutdownTimeoutSeconds(string[] args, int defaultSeconds)
{
    const string InlinePrefix = "--shutdown-timeout=";

    if (args is null)
    {
        return defaultSeconds;
    }

    for (var i = 0; i < args.Length; i++)
    {
        var arg = args[i];
        if (arg is null)
        {
            continue;
        }

        string candidate = null;
        if (arg is "--shutdown-timeout" or "-shutdown-timeout")
        {
            // Space-separated form: the value is the next argument, if there is one.
            if (i + 1 < args.Length)
            {
                candidate = args[i + 1];
            }
        }
        else if (arg.StartsWith(InlinePrefix, StringComparison.Ordinal))
        {
            // Inline form: the value follows the '=' inside the same argument.
            candidate = arg.Substring(InlinePrefix.Length);
        }

        if (candidate is not null && int.TryParse(candidate, out var seconds) && seconds > 0)
        {
            return seconds;
        }
    }

    return defaultSeconds;
}
/// <summary>
/// Connection-drain timeout, in seconds, used during graceful shutdown.
/// Bound to the <c>--shutdown-timeout</c> command-line option and validated to the range
/// 1..<see cref="int.MaxValue"/> by the attribute below.
/// </summary>
[IntRangeValidation(1, int.MaxValue)]
[Option("shutdown-timeout", Required = false, HelpText = "Timeout in seconds to wait for active connections to drain during graceful shutdown. " +
    "The Windows SCM default pre-kill wait is 5 seconds, so values below 5 are not recommended when running as a Windows service.")]
public int ShutdownTimeoutSeconds { get; set; }
/// <summary>
/// Performs a graceful shutdown of the server.
/// The sequence is: put every server into quiesce mode, quiesce pub/sub fan-out, stop accepting new
/// connections, wait (bounded) for active connections to drain, then persist data unless
/// <paramref name="noSave"/> is set.
/// </summary>
/// <param name="timeout">
/// Maximum time to wait for active connections to drain. When null, the configured
/// <c>ShutdownTimeoutSeconds</c> server option is used (30 seconds if that option is not positive).
/// </param>
/// <param name="noSave">If true, skip data persistence (AOF commit and checkpoint) during shutdown.</param>
/// <param name="token">
/// Cancels the connection-drain wait only; data finalization runs under its own 15-second budget.
/// </param>
/// <returns>Task representing the async shutdown operation. Failures are logged rather than thrown.</returns>
public async Task ShutdownAsync(TimeSpan? timeout = null, bool noSave = false, CancellationToken token = default)
{
    // Honour the configured drain timeout instead of a hard-coded default, so the
    // shutdown-timeout option takes effect for callers that do not pass a value.
    var shutdownTimeout = timeout
        ?? TimeSpan.FromSeconds(opts.ShutdownTimeoutSeconds > 0 ? opts.ShutdownTimeoutSeconds : 30);

    try
    {
        // Quiesce existing sessions first: they will reject the next incoming message
        // and close themselves, so FinalizeDataAsync runs with no concurrent writers.
        if (servers != null)
        {
            foreach (var server in servers)
                server.BeginQuiesce();
        }

        // Quiesce pub/sub fan-out so no new messages are delivered after this point.
        subscribeBroker?.BeginQuiesce();

        // Stop accepting new connections.
        StopListening();

        // Wait for existing connections to complete (cancellable)
        try
        {
            await WaitForActiveConnectionsAsync(shutdownTimeout, token).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            logger?.LogWarning("Connection draining was cancelled. Proceeding with data finalization...");
        }
    }
    catch (Exception ex)
    {
        logger?.LogError(ex, "Error during graceful shutdown");
    }
    finally
    {
        if (!noSave)
        {
            // Attempt AOF commit or checkpoint as best-effort,
            // even if connection draining was cancelled or failed.
            // Use a bounded timeout instead of the caller's token to ensure completion.
            using var finalizeCts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
            try
            {
                await FinalizeDataAsync(finalizeCts.Token).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                logger?.LogError(ex, "Error during data finalization");
            }
        }
        else
        {
            logger?.LogInformation("Shutdown with noSave flag - skipping data persistence.");
        }
    }
}

/// <summary>
/// Stop all servers from accepting new connections.
/// A failure on one listener is logged and does not prevent the remaining listeners from being stopped.
/// </summary>
private void StopListening()
{
    if (servers == null) return;

    logger?.LogDebug("Stopping listeners to prevent new connections...");
    foreach (var server in servers)
    {
        try
        {
            server?.StopListening();
        }
        catch (Exception ex)
        {
            logger?.LogWarning(ex, "Error stopping listener");
        }
    }
}
/// <summary>
/// Waits for active connections to complete within the specified timeout.
/// </summary>
/// <remarks>
/// Polls the active-connection count with a short back-off (50 ms, 300 ms, then every second).
/// Returns normally when the count reaches zero or when <paramref name="timeout"/> elapses
/// (a warning is logged in the latter case).
/// </remarks>
/// <param name="timeout">Upper bound on the total wait.</param>
/// <param name="token">Caller cancellation; aborts the wait early.</param>
/// <exception cref="OperationCanceledException">
/// Thrown when <paramref name="token"/> is cancelled while connections are still active.
/// </exception>
private async Task WaitForActiveConnectionsAsync(TimeSpan timeout, CancellationToken token)
{
    if (servers == null) return;

    // Linked token: fires on either caller cancellation or timeout expiry.
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
    cts.CancelAfter(timeout);

    var delays = new[] { 50, 300, 1000 };
    var delayIndex = 0;

    try
    {
        // Cancellation is observed only through Task.Delay, which throws even when the token
        // has already fired. Every exit other than "all connections closed" therefore goes
        // through the catch clauses below and is either logged or propagated.
        while (true)
        {
            var activeConnections = GetActiveConnectionCount();
            if (activeConnections == 0)
            {
                logger?.LogInformation("All connections have been closed gracefully.");
                return;
            }

            logger?.LogInformation("Waiting for {ActiveConnections} active connections to complete...", activeConnections);

            var currentDelay = delays[delayIndex];
            if (delayIndex < delays.Length - 1) delayIndex++;

            await Task.Delay(currentDelay, cts.Token).ConfigureAwait(false);
        }
    }
    catch (OperationCanceledException) when (token.IsCancellationRequested)
    {
        // Caller-requested cancellation: let the caller decide how to proceed.
        throw;
    }
    catch (OperationCanceledException)
    {
        // timeout reached error logging
        logger?.LogWarning("Timeout reached after {TimeoutSeconds} seconds. Some connections may still be active.",
            timeout.TotalSeconds);
    }
    catch (Exception ex)
    {
        // Best effort: a failure while counting connections must not block shutdown.
        logger?.LogWarning(ex, "Error checking active connections");
    }
}

/// <summary>
/// Gets the current number of active connections directly from server instances.
/// Only servers deriving from <c>GarnetServerBase</c> contribute to the total.
/// </summary>
private long GetActiveConnectionCount()
{
    long count = 0;
    if (servers != null)
    {
        foreach (var garnetServer in servers)
        {
            if (garnetServer is GarnetServerBase garnetServerBase)
            {
                count += garnetServerBase.get_conn_active();
            }
        }
    }
    return count;
}
/// <summary>
/// Persists data during shutdown using AOF or checkpoint based on configuration.
/// </summary>
/// <remarks>
/// With AOF enabled only an AOF commit is attempted. Otherwise, if the storage tier is enabled,
/// a foreground checkpoint is taken. With neither enabled the method does nothing.
/// Failures are logged and never propagated.
/// </remarks>
/// <param name="token">Bounds the commit / checkpoint operation.</param>
private async Task FinalizeDataAsync(CancellationToken token)
{
    if (!opts.EnableAOF)
    {
        if (opts.EnableStorageTier)
        {
            logger?.LogDebug("Taking checkpoint for tiered storage...");
            try
            {
                var taken = await Store.TakeCheckpointAsync(background: false, token: token).ConfigureAwait(false);
                if (!taken)
                {
                    logger?.LogInformation("Checkpoint skipped (another checkpoint in progress or replica mode).");
                }
                else
                {
                    logger?.LogDebug("Checkpoint completed successfully.");
                }
            }
            catch (Exception checkpointError)
            {
                logger?.LogError(checkpointError, "Error taking checkpoint during shutdown");
            }
        }

        return;
    }

    // AOF takes precedence: when it is enabled no checkpoint is attempted.
    logger?.LogDebug("Committing AOF before shutdown...");
    try
    {
        var committed = await Store.CommitAOFAsync(token).ConfigureAwait(false);
        if (!committed)
        {
            logger?.LogInformation("AOF commit skipped (another commit in progress or replica mode).");
        }
        else
        {
            logger?.LogDebug("AOF committed successfully.");
        }
    }
    catch (Exception commitError)
    {
        logger?.LogError(commitError, "Error committing AOF during shutdown");
    }
}
/// <summary>
/// Broadcasts a key/value pair to subscribers by delegating to <c>Broadcast</c>.
/// Nothing is delivered once the broker is quiescing or while no subscription tables exist.
/// </summary>
/// <param name="key">Key passed through to the broadcast.</param>
/// <param name="value">Value passed through to the broadcast.</param>
/// <returns>Number of subscribers notified; 0 when the call is dropped.</returns>
public unsafe int PublishNow(ArgSlice key, ArgSlice value)
{
    // The quiesce flag is tested first, exactly as before, so shutdown wins over delivery.
    if (isQuiescing || (subscriptions == null && patternSubscriptions == null))
    {
        return 0;
    }

    return Broadcast(key, value);
}
/// <summary>
/// Backing field for <see cref="IsQuiescing"/>: 0 while serving normally, 1 once quiescing has begun.
/// Stored as an int so it can be updated through <see cref="Interlocked"/>.
/// </summary>
int isQuiescing = 0;

/// <summary>
/// True after <see cref="BeginQuiesce"/> has been called.
/// Read with acquire semantics because the flag is set on the shutdown path and polled by sessions.
/// </summary>
public bool IsQuiescing => Volatile.Read(ref isQuiescing) != 0;

/// <summary>
/// Marks this server as quiescing. The flag is never reset; repeated calls have no further effect.
/// </summary>
public void BeginQuiesce() => Interlocked.Exchange(ref isQuiescing, 1);
/// <summary>
/// Stops accepting new connections by delegating to <see cref="Close"/>, which closes the listen socket.
/// Never throws: any failure is logged at debug level.
/// </summary>
/// <remarks>
/// NOTE(review): the original comment states that a pending accept then completes with
/// OperationAborted and is handled as a clean-shutdown error in HandleAcceptError; that handler
/// is not visible here, so confirm before relying on it.
/// </remarks>
public override void StopListening()
{
    try
    {
        Close();
    }
    catch (Exception closeError)
    {
        logger?.LogDebug(closeError, "Error closing listen socket on {endpoint}", EndPoint);
        return;
    }

    try
    {
        logger?.LogDebug("Stopped accepting new connections on {endpoint}", EndPoint);
    }
    catch (Exception logError)
    {
        // Mirrors the original single try block, where a failure in the success log was also caught.
        logger?.LogDebug(logError, "Error closing listen socket on {endpoint}", EndPoint);
    }
}
/// <summary>
/// Take checkpoint for all active databases
/// </summary>
/// <param name="background">True if method can return before checkpoint is taken</param>
/// <param name="token">Cancellation token</param>
/// <returns>
/// false if the role-change guard could not be acquired or this node is a replica;
/// otherwise the result reported by the store wrapper's checkpoint call.
/// </returns>
public async ValueTask<bool> TakeCheckpointAsync(bool background = false, CancellationToken token = default)
{
    // Hold off role changes for the duration of the checkpoint request.
    using (PreventRoleChange(out var acquired))
    {
        if (!acquired || IsReplica)
        {
            return false;
        }

        return await storeWrapper.TakeCheckpointAsync(background, logger: null, token: token).ConfigureAwait(false);
    }
}

/// <summary>
/// Check if storage tier is enabled
/// </summary>
public bool IsStorageTierEnabled => storeWrapper.serverOptions.EnableStorageTier;

/// <summary>
/// Check if AOF is enabled
/// </summary>
public bool IsAOFEnabled => storeWrapper.serverOptions.EnableAOF;
/// diff --git a/main/GarnetServer/Program.cs b/main/GarnetServer/Program.cs index 7b2673ebc41..b52a4cd658f 100644 --- a/main/GarnetServer/Program.cs +++ b/main/GarnetServer/Program.cs @@ -10,7 +10,7 @@ namespace Garnet /// public class Program { - static void Main(string[] args) + static async Task Main(string[] args) { try { @@ -22,7 +22,52 @@ static void Main(string[] args) // Start the server server.Start(); - Thread.Sleep(Timeout.Infinite); + using var cts = new CancellationTokenSource(); + + ConsoleCancelEventHandler cancelKeyPressHandler = (sender, e) => + { + e.Cancel = true; // Prevent the process from terminating immediately + if (!cts.IsCancellationRequested) + cts.Cancel(); + }; + + EventHandler processExitHandler = (sender, e) => + { + try + { + if (!cts.IsCancellationRequested) + cts.Cancel(); + } + catch (ObjectDisposedException) + { + // The cancellation source may already be disposed during process teardown. + } + }; + + // Signal cancellation on Ctrl+C; avoid blocking the event handler with async work + Console.CancelKeyPress += cancelKeyPressHandler; + + // Signal cancellation on SIGTERM (e.g., container orchestrators, systemd) + AppDomain.CurrentDomain.ProcessExit += processExitHandler; + + // Wait until a shutdown signal is received + try + { + try + { + await Task.Delay(Timeout.Infinite, cts.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // Graceful shutdown: drain connections, commit AOF, take checkpoint + await server.ShutdownAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); + } + } + finally + { + Console.CancelKeyPress -= cancelKeyPressHandler; + AppDomain.CurrentDomain.ProcessExit -= processExitHandler; + } } catch (Exception ex) { diff --git a/test/Garnet.test/GarnetServerTcpTests.cs b/test/Garnet.test/GarnetServerTcpTests.cs new file mode 100644 index 00000000000..9591243a938 --- /dev/null +++ b/test/Garnet.test/GarnetServerTcpTests.cs @@ -0,0 +1,209 @@ +// Copyright (c) Microsoft Corporation. 
// Licensed under the MIT license.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Allure.NUnit;
using Garnet.server;
using NUnit.Framework;
using NUnit.Framework.Legacy;
using StackExchange.Redis;

namespace Garnet.test
{
    /// <summary>
    /// Tests for stopping the TCP listeners and for <c>GarnetServer.ShutdownAsync</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): generic type arguments were not present in the reviewed text. They are restored
    /// here as <c>OfType&lt;GarnetServerTcp&gt;</c>, <c>List&lt;Task&gt;</c> and
    /// <c>Assert.Throws&lt;RedisConnectionException&gt;</c>; confirm against the real file.
    /// </remarks>
    [AllureNUnit]
    [TestFixture, NonParallelizable]
    public class GarnetServerTcpTests : AllureTestBase
    {
        private GarnetServer server;

        [SetUp]
        public void Setup()
        {
            TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
            server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir);
            server.Start();
        }

        [TearDown]
        public void TearDown()
        {
            server?.Dispose();
            TestUtils.DeleteDirectory(TestUtils.MethodTestDir);
        }

        /// <summary>Calls StopListening on every TCP server owned by the test server.</summary>
        private void StopAllListeners()
        {
            foreach (var tcpServer in server.Provider.StoreWrapper.Servers.OfType<GarnetServerTcp>())
            {
                tcpServer.StopListening();
            }
        }

        /// <summary>Asserts that a brand-new client connection cannot be established.</summary>
        private static void AssertNewConnectionFails()
        {
            Assert.Throws<RedisConnectionException>(() =>
            {
                using var redis2 = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
                redis2.GetDatabase(0).Ping();
            });
        }

        [Test]
        public void StopListeningPreventsNewConnections()
        {
            // Arrange - Establish a working connection first
            using var redis1 = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
            var db1 = redis1.GetDatabase(0);
            db1.StringSet("test", "value");
            ClassicAssert.AreEqual("value", (string)db1.StringGet("test"));

            // Act - Stop listening on all servers
            StopAllListeners();

            Thread.Sleep(100); // Brief delay to ensure socket is closed

            // Assert - New connections should fail
            AssertNewConnectionFails();

            // Existing connection should still work
            ClassicAssert.AreEqual("value", (string)db1.StringGet("test"));
        }

        [Test]
        public void StopListeningIdempotent()
        {
            // Arrange
            StopAllListeners();

            // Act & Assert - Calling StopListening again should not throw
            Assert.DoesNotThrow(StopAllListeners);
        }

        [Test]
        public async Task StopListeningDuringActiveConnectionAttempts()
        {
            // Arrange - Start multiple connection attempts
            var connectionTasks = new System.Collections.Generic.List<Task>();
            using var cts = new CancellationTokenSource();
            var stopToken = cts.Token;

            for (int i = 0; i < 10; i++)
            {
                // The token is deliberately NOT passed to Task.Run: a work item that has not started
                // yet when the token fires would complete as Canceled and make WhenAll throw,
                // failing the test for a reason unrelated to StopListening.
                connectionTasks.Add(Task.Run(async () =>
                {
                    while (!stopToken.IsCancellationRequested)
                    {
                        try
                        {
                            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
                            await redis.GetDatabase(0).PingAsync();
                            await Task.Delay(10);
                        }
                        catch
                        {
                            // Connection failures are expected after StopListening
                        }
                    }
                }));
            }

            await Task.Delay(50); // Let some connections establish

            // Act
            StopAllListeners();

            await Task.Delay(100);
            cts.Cancel();

            // Assert - All tasks should complete without unhandled exceptions
            // (awaiting directly surfaces any exception as a test failure).
            await Task.WhenAll(connectionTasks);
        }

        [Test]
        public async Task ShutdownAsyncCompletesGracefully()
        {
            // Arrange - Write data and then close the connection
            using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
            {
                var db = redis.GetDatabase(0);
                db.StringSet("shutdown-test", "data");
                ClassicAssert.AreEqual("data", (string)db.StringGet("shutdown-test"));
            }

            // Act - Graceful shutdown (no active connections)
            await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(5)).ConfigureAwait(false);

            // Assert - New connections should fail after shutdown
            AssertNewConnectionFails();
        }

        [Test]
        public async Task ShutdownAsyncRespectsTimeout()
        {
            // Arrange - Establish a connection that will stay open
            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
            var db = redis.GetDatabase(0);
            db.Ping();

            // Act - Shutdown with a very short timeout
            var sw = System.Diagnostics.Stopwatch.StartNew();
            await server.ShutdownAsync(timeout: TimeSpan.FromMilliseconds(200)).ConfigureAwait(false);
            sw.Stop();

            // Assert - Should complete without hanging indefinitely
            // Allow generous upper bound for CI environments
            ClassicAssert.Less(sw.ElapsedMilliseconds, 10_000,
                "ShutdownAsync should complete within a reasonable time even with active connections");
        }

        [Test]
        public async Task ShutdownAsyncRespectsCancellation()
        {
            // Arrange
            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
            redis.GetDatabase(0).Ping();

            using var cts = new CancellationTokenSource();

            // Act - Cancel immediately
            cts.Cancel();

            // Assert - awaiting directly: any exception escaping ShutdownAsync fails the test.
            await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(30), token: cts.Token).ConfigureAwait(false);
        }

        [Test]
        public async Task ShutdownAsyncWithAofCommit()
        {
            // Arrange - Create server with AOF enabled
            server?.Dispose();
            TestUtils.DeleteDirectory(TestUtils.MethodTestDir);
            server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, enableAOF: true);
            server.Start();

            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
            var db = redis.GetDatabase(0);

            // Write some data
            for (int i = 0; i < 100; i++)
            {
                db.StringSet($"aof-key-{i}", $"value-{i}");
            }

            // Act & Assert - Shutdown should commit AOF without errors
            await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(5)).ConfigureAwait(false);
        }
    }
}
+ public async Task ShutdownAsyncStopsAcceptingNewConnections() + { + // Arrange - write data then close connection + using (var redis1 = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db1 = redis1.GetDatabase(0); + db1.StringSet("test", "value"); + } + + // Act - Initiate shutdown + await server.ShutdownAsync(TimeSpan.FromSeconds(5)); + + // Assert - New connections should fail + Assert.Throws(() => + { + using var redis2 = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + redis2.GetDatabase(0).Ping(); + }); + } + + [Test] + public async Task ShutdownAsyncWaitsForActiveConnections() + { + // Arrange - keep connection open to test timeout behavior + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + db.StringSet("key1", "value1"); + + // Act - Start shutdown with short timeout; active connection will force timeout + var sw = System.Diagnostics.Stopwatch.StartNew(); + await server.ShutdownAsync(TimeSpan.FromMilliseconds(500)); + sw.Stop(); + + // Assert - Should wait approximately the timeout duration before proceeding + ClassicAssert.GreaterOrEqual(sw.ElapsedMilliseconds, 400, + "Shutdown should wait for the timeout duration when connections are active"); + ClassicAssert.Less(sw.ElapsedMilliseconds, 5_000, + "Shutdown should not hang beyond a reasonable bound"); + } + + [Test] + public async Task ShutdownAsyncCommitsAOF() + { + // Arrange + server.Dispose(); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, enableAOF: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true))) + { + var db = redis.GetDatabase(0); + db.StringSet("aofKey", "aofValue"); + } + + // Act - Shutdown which should commit AOF + await server.ShutdownAsync(TimeSpan.FromSeconds(5)); + server.Dispose(false); + + // Assert - Recover and verify data persisted + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, enableAOF: true, 
tryRecover: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + var recoveredValue = db.StringGet("aofKey"); + ClassicAssert.AreEqual("aofValue", recoveredValue.ToString()); + } + } + + [Test] + public async Task ShutdownAsyncTakesCheckpointWhenStorageTierEnabled() + { + // Arrange + server.Dispose(); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true))) + { + var db = redis.GetDatabase(0); + db.StringSet("checkpointKey", "checkpointValue"); + } + + // Act - Shutdown which should take checkpoint + await server.ShutdownAsync(TimeSpan.FromSeconds(5)); + server.Dispose(false); + + // Assert - Recover from checkpoint + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + var recoveredValue = db.StringGet("checkpointKey"); + ClassicAssert.AreEqual("checkpointValue", recoveredValue.ToString()); + } + } + + [Test] + public async Task ShutdownAsyncRespectsTimeout() + { + // Arrange - keep connection open to force timeout path + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + db.StringSet("key", "value"); + + // Act - Shutdown with very short timeout + var sw = System.Diagnostics.Stopwatch.StartNew(); + await server.ShutdownAsync(TimeSpan.FromMilliseconds(100)); + sw.Stop(); + + // Assert - Should complete within reasonable time + ClassicAssert.Less(sw.ElapsedMilliseconds, 5_000, + $"Shutdown should complete within reasonable time. 
Actual: {sw.ElapsedMilliseconds}ms"); + } + #endregion } } \ No newline at end of file diff --git a/test/Garnet.test/ShutdownDataConsistencyTests.cs b/test/Garnet.test/ShutdownDataConsistencyTests.cs new file mode 100644 index 00000000000..236b808470a --- /dev/null +++ b/test/Garnet.test/ShutdownDataConsistencyTests.cs @@ -0,0 +1,436 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Allure.NUnit; +using NUnit.Framework; +using NUnit.Framework.Legacy; +using StackExchange.Redis; + +namespace Garnet.test +{ + /// + /// Tests to investigate data consistency under different shutdown finalization sequences: + /// 1. Checkpoint first, then AOF commit + /// 2. AOF commit first, then Checkpoint (current production order) + /// 3. AOF commit only + /// 4. Checkpoint only + /// + /// Each test writes data to main store (string keys) and object store (sorted sets), + /// performs the finalization sequence, disposes without cleanup, recovers, and verifies data. + /// + [AllureNUnit] + [TestFixture] + public class ShutdownDataConsistencyTests : AllureTestBase + { + private GarnetServer server; + + private const int KeyCount = 50; + private const string MainStoreKeyPrefix = "shutdowntest:key:"; + private const string MainStoreValuePrefix = "shutdowntest:value:"; + private const string ObjectStoreKeyPrefix = "shutdowntest:zset:"; + + [SetUp] + public void Setup() + { + TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true); + } + + [TearDown] + public void TearDown() + { + server?.Dispose(); + TestUtils.DeleteDirectory(TestUtils.MethodTestDir); + } + + /// + /// Creates a server with both AOF and storage tier enabled (low memory forces spill to disk). 
+ /// + private GarnetServer CreateServerWithAofAndStorage(bool tryRecover = false) + { + return TestUtils.CreateGarnetServer( + TestUtils.MethodTestDir, + enableAOF: true, + lowMemory: true, + tryRecover: tryRecover); + } + + /// + /// Populates main store with string key-value pairs and object store with sorted sets. + /// + private void PopulateData() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true)); + var db = redis.GetDatabase(0); + + // Main store: string keys + for (var i = 0; i < KeyCount; i++) + { + db.StringSet($"{MainStoreKeyPrefix}{i}", $"{MainStoreValuePrefix}{i}"); + } + + // Object store: sorted sets + for (var i = 0; i < KeyCount; i++) + { + var entries = new SortedSetEntry[] + { + new($"member_a_{i}", i * 10), + new($"member_b_{i}", i * 10 + 1), + new($"member_c_{i}", i * 10 + 2), + }; + db.SortedSetAdd($"{ObjectStoreKeyPrefix}{i}", entries); + } + } + + /// + /// Verifies all main store and object store data is recovered correctly. + /// Returns (mainStoreRecovered, objectStoreRecovered) counts. 
+ /// + private (int mainStoreRecovered, int objectStoreRecovered) VerifyRecoveredData() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + + var mainStoreRecovered = 0; + for (var i = 0; i < KeyCount; i++) + { + var value = db.StringGet($"{MainStoreKeyPrefix}{i}"); + if (value.HasValue) + { + ClassicAssert.AreEqual($"{MainStoreValuePrefix}{i}", value.ToString(), + $"Main store key {MainStoreKeyPrefix}{i} has wrong value after recovery"); + mainStoreRecovered++; + } + } + + var objectStoreRecovered = 0; + for (var i = 0; i < KeyCount; i++) + { + var members = db.SortedSetRangeByScoreWithScores($"{ObjectStoreKeyPrefix}{i}"); + if (members.Length > 0) + { + ClassicAssert.AreEqual(3, members.Length, + $"Object store key {ObjectStoreKeyPrefix}{i} should have 3 members"); + ClassicAssert.AreEqual($"member_a_{i}", members[0].Element.ToString()); + ClassicAssert.AreEqual(i * 10, members[0].Score); + ClassicAssert.AreEqual($"member_b_{i}", members[1].Element.ToString()); + ClassicAssert.AreEqual(i * 10 + 1, members[1].Score); + ClassicAssert.AreEqual($"member_c_{i}", members[2].Element.ToString()); + ClassicAssert.AreEqual(i * 10 + 2, members[2].Score); + objectStoreRecovered++; + } + } + + return (mainStoreRecovered, objectStoreRecovered); + } + + /// + /// Scenario 1: Checkpoint → AOF commit sequence. + /// Takes checkpoint first, then commits AOF. 
+ /// + [Test] + public async Task CheckpointThenAofCommit_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + PopulateData(); + + // Create CancellationTokenSource with a long timeout to prevent hanging indefinitely in case of issues during shutdown + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + TestContext.Progress.WriteLine($"Created CancellationTokenSource with 5 minute timeout for test shutdown"); + + // Sequence: Checkpoint first, then AOF commit + await server.Store.TakeCheckpointAsync(background: false); + await server.Store.CommitAOFAsync(cts.Token); + + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + var (mainRecovered, objRecovered) = VerifyRecoveredData(); + + TestContext.Progress.WriteLine( + $"[Checkpoint→AOF] Main store: {mainRecovered}/{KeyCount}, Object store: {objRecovered}/{KeyCount}"); + + ClassicAssert.AreEqual(KeyCount, mainRecovered, + "Checkpoint→AOF: Not all main store keys recovered"); + ClassicAssert.AreEqual(KeyCount, objRecovered, + "Checkpoint→AOF: Not all object store keys recovered"); + } + + /// + /// Scenario 2: AOF commit → Checkpoint sequence (current production order). + /// Commits AOF first, then takes checkpoint. 
+ /// + [Test] + public async Task AofCommitThenCheckpoint_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + PopulateData(); + + // Create CancellationTokenSource with a long timeout to prevent hanging indefinitely in case of issues during shutdown + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + TestContext.Progress.WriteLine($"Created CancellationTokenSource with 5 minute timeout for test shutdown"); + + // Sequence: AOF commit first, then Checkpoint (matches current FinalizeDataAsync order) + await server.Store.CommitAOFAsync(cts.Token); + await server.Store.TakeCheckpointAsync(background: false); + + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + var (mainRecovered, objRecovered) = VerifyRecoveredData(); + + TestContext.Progress.WriteLine( + $"[AOF→Checkpoint] Main store: {mainRecovered}/{KeyCount}, Object store: {objRecovered}/{KeyCount}"); + + ClassicAssert.AreEqual(KeyCount, mainRecovered, + "AOF→Checkpoint: Not all main store keys recovered"); + ClassicAssert.AreEqual(KeyCount, objRecovered, + "AOF→Checkpoint: Not all object store keys recovered"); + } + + /// + /// Scenario 3: AOF commit only (no checkpoint). + /// Only commits AOF before shutdown. 
+ /// + [Test] + public async Task AofCommitOnly_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + PopulateData(); + + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + + // Sequence: AOF commit only + await server.Store.CommitAOFAsync(cts.Token); + + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + var (mainRecovered, objRecovered) = VerifyRecoveredData(); + + TestContext.Progress.WriteLine( + $"[AOF Only] Main store: {mainRecovered}/{KeyCount}, Object store: {objRecovered}/{KeyCount}"); + + ClassicAssert.AreEqual(KeyCount, mainRecovered, + "AOF Only: Not all main store keys recovered"); + ClassicAssert.AreEqual(KeyCount, objRecovered, + "AOF Only: Not all object store keys recovered"); + } + + /// + /// Scenario 4: Checkpoint only (no AOF commit). + /// Only takes checkpoint before shutdown. + /// + [Test] + public async Task CheckpointOnly_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + PopulateData(); + + // Sequence: Checkpoint only (no AOF commit) + await server.Store.TakeCheckpointAsync(background: false); + + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + var (mainRecovered, objRecovered) = VerifyRecoveredData(); + + TestContext.Progress.WriteLine( + $"[Checkpoint Only] Main store: {mainRecovered}/{KeyCount}, Object store: {objRecovered}/{KeyCount}"); + + // Note: With checkpoint only (no AOF commit), data written after the last + // checkpoint but before the AOF commit may be lost. This test documents + // the actual behavior for investigation purposes. 
+ ClassicAssert.AreEqual(KeyCount, mainRecovered, + "Checkpoint Only: Not all main store keys recovered"); + ClassicAssert.AreEqual(KeyCount, objRecovered, + "Checkpoint Only: Not all object store keys recovered"); + } + + /// + /// Scenario 5: No finalization at all (baseline - result is configuration-dependent). + /// Neither AOF commit nor checkpoint before shutdown. + /// Recovery results may vary if data was already persisted by background AOF or storage tier spill. + /// + [Test] + public void NoFinalization_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + PopulateData(); + + // No explicit finalization before shutdown + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + var (mainRecovered, objRecovered) = VerifyRecoveredData(); + + TestContext.Progress.WriteLine( + $"[No Finalization] Main store: {mainRecovered}/{KeyCount}, Object store: {objRecovered}/{KeyCount}"); + + // Baseline observation only: recovery depends on prior persistence behavior. + TestContext.Progress.WriteLine( + $"[No Finalization] Data loss: main store={KeyCount - mainRecovered}, object store={KeyCount - objRecovered}"); + } + + /// + /// Scenario 6: Interleaved writes with checkpoint then additional writes with AOF commit. + /// Simulates the case where new writes happen between checkpoint and AOF commit. 
+ /// + [Test] + public async Task CheckpointThenMoreWritesThenAofCommit_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + // Phase 1: Initial data + PopulateData(); + + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + + // Take checkpoint + await server.Store.TakeCheckpointAsync(background: false); + + // Phase 2: Write additional data AFTER checkpoint + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + for (var i = KeyCount; i < KeyCount * 2; i++) + { + db.StringSet($"{MainStoreKeyPrefix}{i}", $"{MainStoreValuePrefix}{i}"); + } + } + + // Now commit AOF (should capture phase 2 writes) + await server.Store.CommitAOFAsync(cts.Token); + + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + + var phase1Recovered = 0; + for (var i = 0; i < KeyCount; i++) + { + var value = db.StringGet($"{MainStoreKeyPrefix}{i}"); + if (value.HasValue && value.ToString() == $"{MainStoreValuePrefix}{i}") + phase1Recovered++; + } + + var phase2Recovered = 0; + for (var i = KeyCount; i < KeyCount * 2; i++) + { + var value = db.StringGet($"{MainStoreKeyPrefix}{i}"); + if (value.HasValue && value.ToString() == $"{MainStoreValuePrefix}{i}") + phase2Recovered++; + } + + TestContext.Progress.WriteLine( + $"[Checkpoint→Writes→AOF] Phase1: {phase1Recovered}/{KeyCount}, Phase2 (post-checkpoint): {phase2Recovered}/{KeyCount}"); + + ClassicAssert.AreEqual(KeyCount, phase1Recovered, + "Checkpoint→Writes→AOF: Not all phase 1 keys recovered"); + ClassicAssert.AreEqual(KeyCount, phase2Recovered, + "Checkpoint→Writes→AOF: Not all phase 2 (post-checkpoint) keys recovered"); + } + } + + /// + /// Scenario 7: AOF commit then additional writes then checkpoint. 
+ /// Simulates the case where new writes happen between AOF commit and checkpoint. + /// + [Test] + public async Task AofCommitThenMoreWritesThenCheckpoint_DataConsistencyTest() + { + server = CreateServerWithAofAndStorage(); + server.Start(); + + // Phase 1: Initial data + PopulateData(); + + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + + // Commit AOF + await server.Store.CommitAOFAsync(cts.Token); + + // Phase 2: Write additional data AFTER AOF commit + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + for (var i = KeyCount; i < KeyCount * 2; i++) + { + db.StringSet($"{MainStoreKeyPrefix}{i}", $"{MainStoreValuePrefix}{i}"); + } + } + + // Now take checkpoint (should capture phase 2 writes) + await server.Store.TakeCheckpointAsync(background: false); + + server.Dispose(false); + + // Recover and verify + server = CreateServerWithAofAndStorage(tryRecover: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + + var phase1Recovered = 0; + for (var i = 0; i < KeyCount; i++) + { + var value = db.StringGet($"{MainStoreKeyPrefix}{i}"); + if (value.HasValue && value.ToString() == $"{MainStoreValuePrefix}{i}") + phase1Recovered++; + } + + var phase2Recovered = 0; + for (var i = KeyCount; i < KeyCount * 2; i++) + { + var value = db.StringGet($"{MainStoreKeyPrefix}{i}"); + if (value.HasValue && value.ToString() == $"{MainStoreValuePrefix}{i}") + phase2Recovered++; + } + + TestContext.Progress.WriteLine( + $"[AOF→Writes→Checkpoint] Phase1: {phase1Recovered}/{KeyCount}, Phase2 (post-AOF): {phase2Recovered}/{KeyCount}"); + + ClassicAssert.AreEqual(KeyCount, phase1Recovered, + "AOF→Writes→Checkpoint: Not all phase 1 keys recovered"); + ClassicAssert.AreEqual(KeyCount, phase2Recovered, + "AOF→Writes→Checkpoint: Not all phase 2 (post-AOF) keys recovered"); + } + } + } +} \ No newline at end of file