diff --git a/benchmark/BDN.benchmark/BDN.benchmark.csproj b/benchmark/BDN.benchmark/BDN.benchmark.csproj index 8e6466597a5..f3bda355d6b 100644 --- a/benchmark/BDN.benchmark/BDN.benchmark.csproj +++ b/benchmark/BDN.benchmark/BDN.benchmark.csproj @@ -1,32 +1,32 @@  - - Exe - enable - true - ../../Garnet.snk - false - + + Exe + enable + true + ../../Garnet.snk + false + - - + + - - - - - - - - + + + + + + + + - - - - - - + + + + + + - + \ No newline at end of file diff --git a/benchmark/BDN.benchmark/Cluster/ClusterContext.cs b/benchmark/BDN.benchmark/Cluster/ClusterContext.cs index e428ca60a0f..d39a87a3798 100644 --- a/benchmark/BDN.benchmark/Cluster/ClusterContext.cs +++ b/benchmark/BDN.benchmark/Cluster/ClusterContext.cs @@ -29,16 +29,28 @@ public void Dispose() server.Dispose(); } - public void SetupSingleInstance(bool disableSlotVerification = false) + public void SetupSingleInstance(ClusterParams clusterParams) { var opt = new GarnetServerOptions { QuietMode = true, - EnableCluster = !disableSlotVerification, + EnableCluster = !clusterParams.disableSlotVerification, EndPoints = [new IPEndPoint(IPAddress.Loopback, port)], CleanClusterConfig = true, - ClusterAnnounceEndpoint = new IPEndPoint(IPAddress.Loopback, port) + ClusterAnnounceEndpoint = new IPEndPoint(IPAddress.Loopback, port), + EnableAOF = clusterParams.enableAof, }; + + if (clusterParams.enableAof) + { + opt.EnableAOF = true; + opt.UseAofNullDevice = true; + opt.FastAofTruncate = true; + opt.CommitFrequencyMs = -1; + opt.AofPageSize = "128m"; + opt.AofMemorySize = "256m"; + } + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) opt.CheckpointDir = "/tmp"; server = new EmbeddedRespServer(opt); @@ -168,5 +180,4 @@ public void CreateCTXNSET(int keySize = 8, int batchSize = 100) public void Consume(byte* ptr, int length) => session.TryConsumeMessages(ptr, length); } - } \ No newline at end of file diff --git a/benchmark/BDN.benchmark/Cluster/ClusterMigrate.cs b/benchmark/BDN.benchmark/Cluster/ClusterMigrate.cs index 532989937c4..45cd0689d75 100644 --- a/benchmark/BDN.benchmark/Cluster/ClusterMigrate.cs +++ b/benchmark/BDN.benchmark/Cluster/ClusterMigrate.cs @@ -25,7 +25,7 @@ public unsafe class ClusterMigrate /// public IEnumerable ClusterParamsProvider() { - yield return new(false); + yield return new(false, false); } ClusterContext cc; @@ -34,7 +34,7 @@ public IEnumerable ClusterParamsProvider() public void GlobalSetup() { cc = new ClusterContext(); - cc.SetupSingleInstance(); + cc.SetupSingleInstance(Params); cc.AddSlotRange([(0, 16383)]); cc.CreateGetSet(); cc.CreateMGetMSet(); diff --git a/benchmark/BDN.benchmark/Cluster/ClusterOperations.cs b/benchmark/BDN.benchmark/Cluster/ClusterOperations.cs index 2799885ae92..9da411d87bd 100644 --- a/benchmark/BDN.benchmark/Cluster/ClusterOperations.cs +++ b/benchmark/BDN.benchmark/Cluster/ClusterOperations.cs @@ -22,8 +22,9 @@ public unsafe class ClusterOperations /// public IEnumerable ClusterParamsProvider() { - yield return new(false); - yield return new(true); + yield return new(false, false); + yield return new(true, false); + yield return new(false, true); } ClusterContext cc; @@ -32,7 +33,7 @@ public IEnumerable ClusterParamsProvider() public virtual void GlobalSetup() { cc = new ClusterContext(); - cc.SetupSingleInstance(Params.disableSlotVerification); + cc.SetupSingleInstance(Params); cc.AddSlotRange([(0, 16383)]); cc.CreateGetSet(); cc.CreateMGetMSet(); diff --git a/benchmark/BDN.benchmark/Cluster/ClusterParams.cs b/benchmark/BDN.benchmark/Cluster/ClusterParams.cs index a47a86568bc..68b3e5abe98 100644 --- a/benchmark/BDN.benchmark/Cluster/ClusterParams.cs +++ b/benchmark/BDN.benchmark/Cluster/ClusterParams.cs @@ -13,12 +13,18 @@ public struct ClusterParams /// public bool disableSlotVerification; + /// + /// Whether to enable AOF + /// + public bool enableAof; + /// /// Constructor /// - public ClusterParams(bool disableSlotVerification) + public ClusterParams(bool disableSlotVerification, bool enableAof) { this.disableSlotVerification = disableSlotVerification; + this.enableAof = enableAof; } /// @@ -26,12 +32,16 @@ public ClusterParams(bool disableSlotVerification) /// public override string ToString() { - if (!disableSlotVerification) + if (!disableSlotVerification && !enableAof) return "None"; var ret = ""; if (disableSlotVerification) ret += "DSV"; + + if (enableAof) + ret += ret.Length == 0 ? "AOF" : "+AOF"; + return ret; } } diff --git a/libs/client/ClientSession/GarnetClientSession.cs b/libs/client/ClientSession/GarnetClientSession.cs index 66865626170..8b9161c762d 100644 --- a/libs/client/ClientSession/GarnetClientSession.cs +++ b/libs/client/ClientSession/GarnetClientSession.cs @@ -216,7 +216,7 @@ private async Task ConnectSendSocketAsync(int millisecondsTimeout = 0, C NoDelay = true }; - if (await TryConnectSocketAsync(socket, endpoint, millisecondsTimeout, cancellationToken)) + if (await TryConnectSocketAsync(socket, endpoint, millisecondsTimeout, cancellationToken).ConfigureAwait(false)) return socket; } } @@ -226,7 +226,7 @@ private async Task ConnectSendSocketAsync(int millisecondsTimeout = 0, C if (EndPoint is not UnixDomainSocketEndPoint) socket.NoDelay = true; - if (await TryConnectSocketAsync(socket, EndPoint, millisecondsTimeout, cancellationToken)) + if (await TryConnectSocketAsync(socket, EndPoint, millisecondsTimeout, cancellationToken).ConfigureAwait(false)) return socket; } @@ -251,12 +251,12 @@ private async Task TryConnectSocketAsync(Socket socket, EndPoint endpoint, using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); var connectTask = socket.ConnectAsync(endpoint, timeoutCts.Token).AsTask(); - if (await Task.WhenAny(connectTask, Task.Delay(millisecondsTimeout, timeoutCts.Token)) == connectTask) + if (await Task.WhenAny(connectTask, Task.Delay(millisecondsTimeout, timeoutCts.Token)).ConfigureAwait(false) == connectTask) { // Task completed within timeout. // Consider that the task may have faulted or been canceled. // We re-await the task so that any exceptions/cancellation is rethrown. - await connectTask; + await connectTask.ConfigureAwait(false); } else { diff --git a/libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs b/libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs index 32c70732510..d81713ba3a7 100644 --- a/libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs +++ b/libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs @@ -31,7 +31,7 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo /// /// /// - /// + /// public Task ExecuteReplicaSync(string nodeId, string primary_replid, byte[] checkpointEntryData, long aofBeginAddress, long aofTailAddress) { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -113,7 +113,7 @@ public Task ExecuteReplicaSync(string nodeId, string primary_replid, byt /// /// /// - /// + /// public Task ExecuteSendCkptMetadata(Memory fileTokenBytes, int fileType, Memory data) { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -180,7 +180,7 @@ public Task ExecuteSendCkptMetadata(Memory fileTokenBytes, int fil /// /// /// - /// + /// public Task ExecuteSendFileSegments(Memory fileTokenBytes, int fileType, long startAddress, Span data, int segmentId = -1) { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -266,7 +266,7 @@ public Task ExecuteSendFileSegments(Memory fileTokenBytes, int fil /// /// /// - /// + /// public Task ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool replayAOF, string primary_replid, byte[] checkpointEntryData, long beginAddress, long tailAddress) { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -355,7 +355,7 @@ public Task ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool re /// /// /// - /// + /// public Task ExecuteAttachSync(byte[] syncMetadata) { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -403,7 +403,7 @@ public Task ExecuteAttachSync(byte[] syncMetadata) /// Set CLUSTER SYNC header info /// /// - /// + /// public void SetClusterSyncHeader(string sourceNodeId) { // Unlike Migration, where we don't know at the time of header initialization if we have a record or not, in Replication diff --git a/libs/client/GarnetClient.cs b/libs/client/GarnetClient.cs index 36cef49fb9f..c41fcff620c 100644 --- a/libs/client/GarnetClient.cs +++ b/libs/client/GarnetClient.cs @@ -277,7 +277,21 @@ public async Task ConnectAsync(CancellationToken token = default) } catch (Exception e) { - logger?.LogError(e, "AUTH returned error"); + logger?.LogError(e, "AUTH returned error!"); + throw; + } + + try + { + if (clientName != null) + { + _ = await ExecuteForStringResultAsync(CLIENT, SETINFO).ConfigureAwait(false); + _ = await ExecuteForStringResultAsync(CLIENT, clientName).ConfigureAwait(false); + } + } + catch (Exception e) + { + logger?.LogError(e, "Client set info returned error"); throw; } @@ -316,7 +330,7 @@ private async Task ConnectSendSocketAsync(int millisecondsTimeout = 0, C NoDelay = true }; - if (await TryConnectSocketAsync(socket, endpoint, millisecondsTimeout, cancellationToken)) + if (await TryConnectSocketAsync(socket, endpoint, millisecondsTimeout, cancellationToken).ConfigureAwait(false)) return socket; } } @@ -326,7 +340,7 @@ private async Task ConnectSendSocketAsync(int millisecondsTimeout = 0, C if (EndPoint is not UnixDomainSocketEndPoint) socket.NoDelay = true; - if (await TryConnectSocketAsync(socket, EndPoint, millisecondsTimeout, cancellationToken)) + if (await TryConnectSocketAsync(socket, EndPoint, millisecondsTimeout, cancellationToken).ConfigureAwait(false)) return socket; } @@ -351,12 +365,12 @@ private async Task TryConnectSocketAsync(Socket socket, EndPoint endpoint, using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); var connectTask = socket.ConnectAsync(endpoint, timeoutCts.Token).AsTask(); - if (await Task.WhenAny(connectTask, Task.Delay(millisecondsTimeout, timeoutCts.Token)) == connectTask) + if (await Task.WhenAny(connectTask, Task.Delay(millisecondsTimeout, timeoutCts.Token)).ConfigureAwait(false) == connectTask) { // Task completed within timeout. // Consider that the task may have faulted or been canceled. // We re-await the task so that any exceptions/cancellation is rethrown. - await connectTask; + await connectTask.ConfigureAwait(false); } else { @@ -394,7 +408,7 @@ async Task TimeoutChecker() var _tcsOffset = tcsOffset; var _tailAddress = networkWriter.GetTailAddress(); - await Task.Delay(timeoutMilliseconds, token); + await Task.Delay(timeoutMilliseconds, token).ConfigureAwait(false); // Check if no new tasks added + no new results processed var _newTcsOffset = tcsOffset; var _newNextTaskId = networkWriter.GetNextTaskId(); @@ -447,7 +461,7 @@ public async Task ReconnectAsync(CancellationToken token = default) networkWriter?.Dispose(); } catch { } - await ConnectAsync(token); + await ConnectAsync(token).ConfigureAwait(false); } /// @@ -525,7 +539,7 @@ async ValueTask InputGateAsync(CancellationToken token = default) { if (PipelineLength() < maxOutstandingTasks) break; - await Task.Delay(delayMs, token); + await Task.Delay(delayMs, token).ConfigureAwait(false); if (delayMs == 0) delayMs = 1; else delayMs *= 2; if (delayMs > 4096) delayMs = 4096; @@ -598,7 +612,7 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory op, string par totalLen += 1 + NumUtils.CountDigits(arraySize) + 2; CheckLength(totalLen, tcs); - await InputGateAsync(token); + await InputGateAsync(token).ConfigureAwait(false); try { @@ -660,7 +674,7 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory op, string par try { networkWriter.epoch.Suspend(); - await AwaitPreviousTaskAsync(taskId); // does not take token, as task is not cancelable at this point + await AwaitPreviousTaskAsync(taskId).ConfigureAwait(false); // does not take token, as task is not cancelable at this point } finally { @@ -711,7 +725,7 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory op, Memory op, Memory respOp, IColle } CheckLength(totalLen, tcs); - await InputGateAsync(token); + await InputGateAsync(token).ConfigureAwait(false); try { @@ -1092,7 +1106,7 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory respOp, IColle try { networkWriter.epoch.Suspend(); - await AwaitPreviousTaskAsync(taskId); // does not take token, as task is not cancelable at this point + await AwaitPreviousTaskAsync(taskId).ConfigureAwait(false); // does not take token, as task is not cancelable at this point } finally { diff --git a/libs/client/GarnetClientAPI/GarnetClientBasicRespCommands.cs b/libs/client/GarnetClientAPI/GarnetClientBasicRespCommands.cs index abb58d6a334..4bf9983a45c 100644 --- a/libs/client/GarnetClientAPI/GarnetClientBasicRespCommands.cs +++ b/libs/client/GarnetClientAPI/GarnetClientBasicRespCommands.cs @@ -572,7 +572,7 @@ public async Task StringDecrement(Memory key, long value) /// Value /// public async Task StringDecrement(Memory key, long value, CancellationToken token) - => long.Parse(await ExecuteForStringResultWithCancellationAsync(DECRBY, key, Encoding.ASCII.GetBytes(value.ToString()), token)); + => long.Parse(await ExecuteForStringResultWithCancellationAsync(DECRBY, key, Encoding.ASCII.GetBytes(value.ToString()), token).ConfigureAwait(false)); /// /// Decrement number stored at key by value. diff --git a/libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs b/libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs index dbed29ec64e..ea715ad651f 100644 --- a/libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs +++ b/libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs @@ -149,13 +149,13 @@ public async Task ExecuteForStringResultWithCancellationAsync(Memory ExecuteForStringResultWithCancellationAsync(Memory ExecuteForStringResultWithCancellationAsync(string op, using (token.Register(TokenRegistrationStringCallback, tcs.stringTcs)) { var _ = InternalExecuteAsync(tcs, op, args, token); - return await tcs.stringTcs.Task; + return await tcs.stringTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, op, args, token); - return await tcs.stringTcs.Task; + return await tcs.stringTcs.Task.ConfigureAwait(false); } } @@ -377,13 +377,13 @@ public async Task> ExecuteForMemoryResultWithCancellationAsyn using (token.Register(TokenRegistrationMemoryResultCallback, tcs.memoryByteTcs)) { var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token); - return await tcs.memoryByteTcs.Task; + return await tcs.memoryByteTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token); - return await tcs.memoryByteTcs.Task; + return await tcs.memoryByteTcs.Task.ConfigureAwait(false); } } @@ -403,13 +403,13 @@ public async Task> ExecuteForMemoryResultWithCancellationAsyn using (token.Register(TokenRegistrationMemoryResultCallback, tcs.memoryByteTcs)) { var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token); - return await tcs.memoryByteTcs.Task; + return await tcs.memoryByteTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token); - return await tcs.memoryByteTcs.Task; + return await tcs.memoryByteTcs.Task.ConfigureAwait(false); } } @@ -428,13 +428,13 @@ public async Task> ExecuteForMemoryResultWithCancellationAsyn using (token.Register(TokenRegistrationMemoryResultCallback, tcs.memoryByteTcs)) { var _ = InternalExecuteAsync(tcs, op, args, token); - return await tcs.memoryByteTcs.Task; + return await tcs.memoryByteTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, op, args, token); - return await tcs.memoryByteTcs.Task; + return await tcs.memoryByteTcs.Task.ConfigureAwait(false); } } @@ -452,13 +452,13 @@ public async Task> ExecuteForMemoryResultWithCancellationAsyn using (token.Register(TokenRegistrationMemoryResultCallback, tcs.memoryByteTcs)) { var _ = InternalExecuteAsync(tcs, respOp, args, token); - return await tcs.memoryByteTcs.Task; + return await tcs.memoryByteTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, respOp, args, token); - return await tcs.memoryByteTcs.Task; + return await tcs.memoryByteTcs.Task.ConfigureAwait(false); } } @@ -601,13 +601,13 @@ public async Task ExecuteForStringArrayResultWithCancellationAsync(Mem using (token.Register(TokenRegistrationStringArrayCallback, tcs.stringArrayTcs)) { var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token); - return await tcs.stringArrayTcs.Task; + return await tcs.stringArrayTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token); - return await tcs.stringArrayTcs.Task; + return await tcs.stringArrayTcs.Task.ConfigureAwait(false); } } @@ -628,13 +628,13 @@ public async Task ExecuteForStringArrayResultWithCancellationAsync(Mem using (token.Register(TokenRegistrationStringArrayCallback, tcs.stringArrayTcs)) { var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token); - return await tcs.stringArrayTcs.Task; + return await tcs.stringArrayTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token); - return await tcs.stringArrayTcs.Task; + return await tcs.stringArrayTcs.Task.ConfigureAwait(false); } } @@ -654,13 +654,13 @@ public async Task ExecuteForStringArrayResultWithCancellationAsync(str using (token.Register(TokenRegistrationStringArrayCallback, tcs.stringArrayTcs)) { var _ = InternalExecuteAsync(tcs, op, args, token); - return await tcs.stringArrayTcs.Task; + return await tcs.stringArrayTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, op, args, token); - return await tcs.stringArrayTcs.Task; + return await tcs.stringArrayTcs.Task.ConfigureAwait(false); } } @@ -679,13 +679,13 @@ public async Task ExecuteForStringArrayResultWithCancellationAsync(Mem using (token.Register(TokenRegistrationStringArrayCallback, tcs.stringArrayTcs)) { var _ = InternalExecuteAsync(tcs, respOp, args, token); - return await tcs.stringArrayTcs.Task; + return await tcs.stringArrayTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, respOp, args, token); - return await tcs.stringArrayTcs.Task; + return await tcs.stringArrayTcs.Task.ConfigureAwait(false); } } @@ -829,13 +829,13 @@ public async Task[]> ExecuteForMemoryResultArrayWithCancellat using (token.Register(TokenRegistrationMemoryResultArrayCallback, tcs.memoryByteArrayTcs)) { var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token); - return await tcs.memoryByteArrayTcs.Task; + return await tcs.memoryByteArrayTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token); - return await tcs.memoryByteArrayTcs.Task; + return await tcs.memoryByteArrayTcs.Task.ConfigureAwait(false); } } @@ -856,13 +856,13 @@ public async Task[]> ExecuteForMemoryResultArrayWithCancellat using (token.Register(TokenRegistrationMemoryResultArrayCallback, tcs.memoryByteArrayTcs)) { var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token); - return await tcs.memoryByteArrayTcs.Task; + return await tcs.memoryByteArrayTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token); - return await tcs.memoryByteArrayTcs.Task; + return await tcs.memoryByteArrayTcs.Task.ConfigureAwait(false); } } @@ -883,13 +883,13 @@ public async Task[]> ExecuteForMemoryResultArrayWithCancellat using (token.Register(TokenRegistrationMemoryResultArrayCallback, tcs.memoryByteArrayTcs)) { var _ = InternalExecuteAsync(tcs, op, args, token); - return await tcs.memoryByteArrayTcs.Task; + return await tcs.memoryByteArrayTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, op, args, token); - return await tcs.memoryByteArrayTcs.Task; + return await tcs.memoryByteArrayTcs.Task.ConfigureAwait(false); } } @@ -1034,13 +1034,13 @@ public async Task ExecuteForLongResultWithCancellationAsync(string op, ICo using (token.Register(TokenRegistrationLongCallback, tcs.longTcs)) { var _ = InternalExecuteAsync(tcs, op, args, token); - return await tcs.longTcs.Task; + return await tcs.longTcs.Task.ConfigureAwait(false); } } else { var _ = InternalExecuteAsync(tcs, op, args, token); - return await tcs.longTcs.Task; + return await tcs.longTcs.Task.ConfigureAwait(false); } } diff --git a/libs/client/LightEpoch.cs b/libs/client/LightEpoch.cs index d54e254cdf4..c88acb1e64b 100644 --- a/libs/client/LightEpoch.cs +++ b/libs/client/LightEpoch.cs @@ -193,7 +193,7 @@ int SelectInstance() if (kInvalidIndex == Interlocked.CompareExchange(ref entry, 1, kInvalidIndex)) return i; } - throw new InvalidOperationException("Exceeded maximum number of active LightEpoch instances"); + throw new InvalidOperationException($"Exceeded maximum number of active LightEpoch instances {ActiveInstanceCount()} {InstanceIndexBuffer.MaxInstances}"); } /// diff --git a/libs/client/Utility.cs b/libs/client/Utility.cs index 3e008e66d40..45965a86a7e 100644 --- a/libs/client/Utility.cs +++ b/libs/client/Utility.cs @@ -192,14 +192,14 @@ private static async Task SlowWithCancellationAsync(Task task, Cancella var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); using (token.Register(s => ((TaskCompletionSource)s).TrySetResult(true), tcs, useSynchronizationContext)) { - if (task != await Task.WhenAny(task, tcs.Task)) + if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false)) { token.ThrowIfCancellationRequested(); } } // make sure any exceptions in the task get unwrapped and exposed to the caller. - return await task; + return await task.ConfigureAwait(false); } } } \ No newline at end of file diff --git a/libs/cluster/Server/ClusterConfig.cs b/libs/cluster/Server/ClusterConfig.cs index e3bb967d44f..55f7ba64e4d 100644 --- a/libs/cluster/Server/ClusterConfig.cs +++ b/libs/cluster/Server/ClusterConfig.cs @@ -1176,7 +1176,7 @@ public ClusterConfig MergeSlotMap(ClusterConfig senderConfig, ILogger logger = n // will not be able to claim the slot without outside intervention if (currentOwnerNodeId != null && currentOwnerNodeId.Equals(senderConfig.LocalNodeId, StringComparison.OrdinalIgnoreCase)) { - logger?.LogWarning("MergeReset: {senderConfig.LocalNodeIdShort} > {i} > {LocalNodeIdShort}", senderConfig.LocalNodeIdShort, i, LocalNodeIdShort); + // logger?.LogWarning("MergeReset: {senderConfig.LocalNodeIdShort} > {i} > {LocalNodeIdShort}", senderConfig.LocalNodeIdShort, i, LocalNodeIdShort); newSlotMap[i]._workerId = RESERVED_WORKER_ID; newSlotMap[i]._state = SlotState.OFFLINE; } diff --git a/libs/cluster/Server/Failover/FailoverManager.cs b/libs/cluster/Server/Failover/FailoverManager.cs index 9dd98b26513..f25989c5b00 100644 --- a/libs/cluster/Server/Failover/FailoverManager.cs +++ b/libs/cluster/Server/Failover/FailoverManager.cs @@ -89,7 +89,7 @@ public bool TryStartReplicaFailover(FailoverOption option, TimeSpan failoverTime logger: logger); _ = Task.Run(async () => { - var success = await currentFailoverSession.BeginAsyncReplicaFailover(); + var success = await currentFailoverSession.BeginAsyncReplicaFailover().ConfigureAwait(false); lastFailoverStatus = success ? FailoverStatus.FAILOVER_COMPLETED : FailoverStatus.FAILOVER_ABORTED; Reset(); }); @@ -121,7 +121,7 @@ public bool TryStartPrimaryFailover(string replicaAddress, int replicaPort, Fail logger: logger); _ = Task.Run(async () => { - _ = await currentFailoverSession.BeginAsyncPrimaryFailover(); + _ = await currentFailoverSession.BeginAsyncPrimaryFailover().ConfigureAwait(false); Reset(); }); return true; diff --git a/libs/cluster/Server/Failover/PrimaryFailoverSession.cs b/libs/cluster/Server/Failover/PrimaryFailoverSession.cs index e1f5eedd0da..84161f4f328 100644 --- a/libs/cluster/Server/Failover/PrimaryFailoverSession.cs +++ b/libs/cluster/Server/Failover/PrimaryFailoverSession.cs @@ -18,7 +18,7 @@ private Task CheckReplicaSync(GarnetClient gclient) if (!gclient.IsConnected) gclient.Connect(); - return gclient.failreplicationoffset(clusterProvider.replicationManager.ReplicationOffset).WaitAsync(clusterTimeout, cts.Token); + return gclient.ExecuteClusterFailReplicationOffset(clusterProvider.replicationManager.ReplicationOffset).WaitAsync(clusterTimeout, cts.Token); } catch (Exception ex) { @@ -38,7 +38,7 @@ private async Task WaitForFirstReplicaSync() tasks[tcount++] = CheckReplicaSync(_gclient); tasks[clients.Length] = Task.Delay(failoverTimeout).ContinueWith(_ => default(long)); - var completedTask = await Task.WhenAny(tasks); + var completedTask = await Task.WhenAny(tasks).ConfigureAwait(false); // No replica was able to catch up with primary so timeout if (completedTask == tasks[clients.Length]) @@ -59,7 +59,7 @@ private async Task WaitForFirstReplicaSync() { var syncTask = CheckReplicaSync(clients[0]); var timeoutTask = Task.Delay(failoverTimeout, cts.Token); - var completedTask = await Task.WhenAny(syncTask, timeoutTask); + var completedTask = await Task.WhenAny(syncTask, timeoutTask).ConfigureAwait(false); // Replica trying to failover did not caught up on time so timeout if (completedTask == timeoutTask) @@ -82,7 +82,7 @@ private async Task InitiateReplicaTakeOver(GarnetClient gclient) if (!gclient.IsConnected) gclient.Connect(); - return await gclient.Failover(FailoverOption.TAKEOVER).WaitAsync(clusterTimeout, cts.Token); + return await gclient.Failover(FailoverOption.TAKEOVER).WaitAsync(clusterTimeout, cts.Token).ConfigureAwait(false); } catch (Exception ex) { @@ -104,11 +104,11 @@ public async Task BeginAsyncPrimaryFailover() _ = clusterProvider.BumpAndWaitForEpochTransition(); status = FailoverStatus.WAITING_FOR_SYNC; - var newPrimary = await WaitForFirstReplicaSync(); + var newPrimary = await WaitForFirstReplicaSync().ConfigureAwait(false); if (newPrimary == null) return false; status = FailoverStatus.TAKING_OVER_AS_PRIMARY; - var success = await InitiateReplicaTakeOver(newPrimary); + var success = await InitiateReplicaTakeOver(newPrimary).ConfigureAwait(false); if (!success) return false; } catch (Exception ex) diff --git a/libs/cluster/Server/Failover/ReplicaFailoverSession.cs b/libs/cluster/Server/Failover/ReplicaFailoverSession.cs index 9fc01ffe6b2..44f6abb343c 100644 --- a/libs/cluster/Server/Failover/ReplicaFailoverSession.cs +++ b/libs/cluster/Server/Failover/ReplicaFailoverSession.cs @@ -42,7 +42,7 @@ private async Task CreateConnectionAsync(string nodeId) try { if (!client.IsConnected) - await client.ReconnectAsync().WaitAsync(failoverTimeout, cts.Token); + await client.ReconnectAsync().WaitAsync(failoverTimeout, cts.Token).ConfigureAwait(false); return client; } @@ -84,7 +84,7 @@ private async Task PauseWritesAndWaitForSync() // Issue stop writes to the primary status = FailoverStatus.ISSUING_PAUSE_WRITES; var localIdBytes = Encoding.ASCII.GetBytes(oldConfig.LocalNodeId); - var primaryReplicationOffset = await client.failstopwrites(localIdBytes).WaitAsync(failoverTimeout, cts.Token); + var primaryReplicationOffset = await client.failstopwrites(localIdBytes).WaitAsync(failoverTimeout, cts.Token).ConfigureAwait(false); // Wait for replica to catch up status = FailoverStatus.WAITING_FOR_SYNC; @@ -172,7 +172,7 @@ private async Task BroadcastConfigAndRequestAttach(string replicaId, byte[] conf { var oldPrimaryId = oldConfig.LocalNodePrimaryId; var newConfig = clusterProvider.clusterManager.CurrentConfig; - var client = oldPrimaryId.Equals(replicaId) ? primaryClient : await GetConnectionAsync(replicaId); + var client = oldPrimaryId.Equals(replicaId) ? primaryClient : await GetConnectionAsync(replicaId).ConfigureAwait(false); try { @@ -211,13 +211,13 @@ await client.Gossip(configByteArray).ContinueWith(t => { resp.Dispose(); } - }, TaskContinuationOptions.RunContinuationsAsynchronously).WaitAsync(failoverTimeout, cts.Token); + }, TaskContinuationOptions.RunContinuationsAsynchronously).WaitAsync(failoverTimeout, cts.Token).ConfigureAwait(false); var localAddress = oldConfig.LocalNodeIp; var localPort = oldConfig.LocalNodePort; // Ask replica to attach and sync - var replicaOfResp = await client.ReplicaOf(localAddress, localPort).WaitAsync(failoverTimeout, cts.Token); + var replicaOfResp = await client.ReplicaOf(localAddress, localPort).WaitAsync(failoverTimeout, cts.Token).ConfigureAwait(false); // Check if response for attach succeeded if (!replicaOfResp.Equals("OK")) @@ -254,7 +254,7 @@ private async Task IssueAttachReplicas() { try { - attachReplicaTasks.Add(Task.Run(async () => await BroadcastConfigAndRequestAttach(replicaId, configByteArray))); + attachReplicaTasks.Add(Task.Run(async () => await BroadcastConfigAndRequestAttach(replicaId, configByteArray).ConfigureAwait(false))); } catch (Exception ex) { @@ -297,7 +297,7 @@ public async Task BeginAsyncReplicaFailover() try { // Issue stop writes and on ack wait for replica to catch up - if (option is FailoverOption.DEFAULT && !await PauseWritesAndWaitForSync()) + if (option is FailoverOption.DEFAULT && !await PauseWritesAndWaitForSync().ConfigureAwait(false)) { return false; } diff --git a/libs/cluster/Server/Gossip/GarnetClientExtensions.cs b/libs/cluster/Server/Gossip/GarnetClientExtensions.cs index 0cf3053a0f5..5ba714bc05a 100644 --- a/libs/cluster/Server/Gossip/GarnetClientExtensions.cs +++ b/libs/cluster/Server/Gossip/GarnetClientExtensions.cs @@ -11,7 +11,7 @@ namespace Garnet.cluster { - internal static partial class GarnetClientExtensions + internal static class GarnetClientExtensions { static readonly Memory GOSSIP = "GOSSIP"u8.ToArray(); static readonly Memory WITHMEET = "WITHMEET"u8.ToArray(); @@ -56,7 +56,7 @@ public static async Task failstopwrites(this GarnetClient client, Memory /// /// - public static async Task failreplicationoffset(this GarnetClient client, long primaryReplicationOffset, CancellationToken cancellationToken = default) + public static async Task ExecuteClusterFailReplicationOffset(this GarnetClient client, long primaryReplicationOffset, CancellationToken cancellationToken = default) { var args = new Memory[] { CmdStrings.failreplicationoffset.ToArray(), @@ -65,7 +65,16 @@ public static async Task failreplicationoffset(this GarnetClient client, l return await client.ExecuteForLongResultWithCancellationAsync(GarnetClient.CLUSTER, args, cancellationToken).ConfigureAwait(false); } - public static void ClusterPublishNoResponse(this GarnetClient client, RespCommand cmd, ref Span channel, ref Span message, CancellationToken cancellationToken = default) + /// + /// Publishes a message to a specified channel in a clustered Garnet environment without waiting for a server + /// response. + /// + /// The Garnet client instance used to send the publish command. + /// The RESP command to execute. Must be either PUBLISH or SPUBLISH. + /// A span containing the channel name to which the message will be published. + /// A span containing the message to publish to the channel. + /// A cancellation token that can be used to cancel the operation. + public static void ExecuteClusterPublishNoResponse(this GarnetClient client, RespCommand cmd, ref Span channel, ref Span message, CancellationToken cancellationToken = default) => client.ExecuteNoResponse(GarnetClient.CLUSTER, RespCommand.PUBLISH == cmd ? GarnetClient.PUBLISH : GarnetClient.SPUBLISH, ref channel, ref message, cancellationToken); } } \ No newline at end of file diff --git a/libs/cluster/Server/Gossip/GarnetClusterConnectionStore.cs b/libs/cluster/Server/Gossip/GarnetClusterConnectionStore.cs index 90eaff8be8b..5f75f5c7b8f 100644 --- a/libs/cluster/Server/Gossip/GarnetClusterConnectionStore.cs +++ b/libs/cluster/Server/Gossip/GarnetClusterConnectionStore.cs @@ -152,7 +152,7 @@ public async Task AddConnection(GarnetServerNode conn) { try { - await AcquireWriteLockAsync(); + await AcquireWriteLockAsync().ConfigureAwait(false); if (_disposed) return false; @@ -182,7 +182,7 @@ public async Task AddConnection(GarnetServerNode conn) GarnetServerNode conn = null; try { - await AcquireWriteLockAsync(); + await AcquireWriteLockAsync().ConfigureAwait(false); // Fail on disposed if (_disposed) return (false, conn); @@ -215,7 +215,7 @@ public async Task TryRemoveConnection(string nodeId) { try { - await AcquireWriteLockAsync(); + await AcquireWriteLockAsync().ConfigureAwait(false); // Fail on disposed if (_disposed) return false; diff --git a/libs/cluster/Server/Gossip/GarnetServerNode.cs b/libs/cluster/Server/Gossip/GarnetServerNode.cs index 3b61e7c0e44..e9ed7c85bb5 100644 --- a/libs/cluster/Server/Gossip/GarnetServerNode.cs +++ b/libs/cluster/Server/Gossip/GarnetServerNode.cs @@ -79,7 +79,8 @@ public GarnetServerNode(ClusterProvider clusterProvider, EndPoint endpoint, SslC this.clusterProvider = clusterProvider; this.EndPoint = endpoint; this.gc = new GarnetClient( - endpoint, tlsOptions, + endpoint, + tlsOptions, sendPageSize: opts.DisablePubSub ? defaultSendPageSize : Math.Max(defaultSendPageSize, (int)opts.PubSubPageSizeBytes()), maxOutstandingTasks: defaultMaxOutstandingTask, timeoutMilliseconds: opts.ClusterTimeout <= 0 ? 0 : TimeSpan.FromSeconds(opts.ClusterTimeout).Milliseconds, @@ -178,7 +179,7 @@ byte[] GetMostRecentConfig() /// private Task Gossip(byte[] configByteArray) { - return gc.Gossip(configByteArray).ContinueWith(t => + return gc.Gossip(configByteArray, internalCts.Token).ContinueWith(t => { try { @@ -212,7 +213,7 @@ private Task Gossip(byte[] configByteArray) public async Task> TryMeetAsync(byte[] configByteArray) { UpdateGossipSend(); - var resp = await gc.GossipWithMeet(configByteArray).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token); + var resp = await gc.GossipWithMeet(configByteArray, internalCts.Token).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token).ConfigureAwait(false); return resp; } @@ -297,7 +298,7 @@ public void TryClusterPublish(RespCommand cmd, ref Span channel, ref Span< } locked = true; - gc.ClusterPublishNoResponse(cmd, ref channel, ref message); + gc.ExecuteClusterPublishNoResponse(cmd, ref channel, ref message); } finally { diff --git a/libs/cluster/Server/Migration/MigrateSessionSlots.cs b/libs/cluster/Server/Migration/MigrateSessionSlots.cs index f4e55e02bae..292866a80e1 100644 --- a/libs/cluster/Server/Migration/MigrateSessionSlots.cs +++ b/libs/cluster/Server/Migration/MigrateSessionSlots.cs @@ -86,7 +86,7 @@ public async Task MigrateSlotsDriverInline() // Send store logger?.LogWarning("Store migrate scan range [{storeBeginAddress}, {storeTailAddress}]", storeBeginAddress, storeTailAddress); - var success = await CreateAndRunMigrateTasks(storeBeginAddress, storeTailAddress, storePageSize); + var success = await CreateAndRunMigrateTasks(storeBeginAddress, storeTailAddress, storePageSize).ConfigureAwait(false); if (!success) return false; return true; diff --git a/libs/cluster/Server/Migration/MigrationDriver.cs b/libs/cluster/Server/Migration/MigrationDriver.cs index 5da3d36a1e3..eeec905ab08 100644 --- a/libs/cluster/Server/Migration/MigrationDriver.cs +++ b/libs/cluster/Server/Migration/MigrationDriver.cs @@ -83,7 +83,7 @@ private async Task BeginAsyncMigrationTask() // If we have any namespaces, that implies Vector Sets, and if we have any of THOSE // we need to reserve destination sets on the other side - if ((_namespaces?.Count ?? 0) > 0 && !await ReserveDestinationVectorSetsAsync()) + if ((_namespaces?.Count ?? 0) > 0 && !await ReserveDestinationVectorSetsAsync().ConfigureAwait(false)) { logger?.LogError("Failed to reserve destination vector sets, migration failed"); TryRecoverFromFailure(); @@ -93,7 +93,7 @@ private async Task BeginAsyncMigrationTask() #region migrateData // Migrate actual data - if (!await MigrateSlotsDriverInline()) + if (!await MigrateSlotsDriverInline().ConfigureAwait(false)) { logger?.LogError("MigrateSlotsDriver failed"); TryRecoverFromFailure(); @@ -106,7 +106,7 @@ private async Task BeginAsyncMigrationTask() // Lock config merge to avoid a background epoch bump clusterProvider.clusterManager.SuspendConfigMerge(); configResumed = false; - await clusterProvider.clusterManager.TryMeetAsync(_targetAddress, _targetPort, acquireLock: false); + await clusterProvider.clusterManager.TryMeetAsync(_targetAddress, _targetPort, acquireLock: false).ConfigureAwait(false); // Change ownership of slots to target node. if (!TrySetSlotRanges(GetTargetNodeId, MigrateState.NODE)) @@ -127,7 +127,7 @@ private async Task BeginAsyncMigrationTask() } // Gossip again to ensure that source and target agree on the slot exchange - await clusterProvider.clusterManager.TryMeetAsync(_targetAddress, _targetPort, acquireLock: false); + await clusterProvider.clusterManager.TryMeetAsync(_targetAddress, _targetPort, acquireLock: false).ConfigureAwait(false); #endregion // Enqueue success log diff --git a/libs/cluster/Session/RespClusterBasicCommands.cs b/libs/cluster/Session/RespClusterBasicCommands.cs index d5aaec9680d..44429aba8c4 100644 --- a/libs/cluster/Session/RespClusterBasicCommands.cs +++ b/libs/cluster/Session/RespClusterBasicCommands.cs @@ -492,6 +492,7 @@ private bool NetworkClusterReset(out bool invalidParameters) /// /// /// + /// private bool NetworkClusterPublish(out bool invalidParameters) { invalidParameters = false; diff --git a/libs/cluster/Session/RespClusterReplicationCommands.cs b/libs/cluster/Session/RespClusterReplicationCommands.cs index 7966343bd59..8600c93960b 100644 --- a/libs/cluster/Session/RespClusterReplicationCommands.cs +++ b/libs/cluster/Session/RespClusterReplicationCommands.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Diagnostics; using System.Text; using Garnet.client; using Garnet.cluster.Server.Replication; @@ -242,6 +243,8 @@ private bool NetworkClusterAppendLog(out bool invalidParameters) return true; } + LogPrimaryStream(previousAddress, currentAddress, nextAddress, logger); + var sbRecord = parseState.GetArgSliceByRef(4); var currentConfig = clusterProvider.clusterManager.CurrentConfig; @@ -263,6 +266,18 @@ private bool NetworkClusterAppendLog(out bool invalidParameters) previousAddress, currentAddress, nextAddress); } + [Conditional("DEBUG")] + static void LogPrimaryStream(long previousAddress, long currentAddress, long nextAddress, ILogger logger) + { + var state = new GarnetTestLoggingEvent() + { + Type = GarnetTestLoggingEventType.LogPrimaryStreamType, + Message = $"previousAddress: {previousAddress}, currentAddress: {currentAddress}, nextAddress: {nextAddress}", + }; + + logger?.LogTesting(state); + } + return true; } diff --git a/libs/common/Format.cs b/libs/common/Format.cs index 345987fdb34..268df06ab9d 100644 --- a/libs/common/Format.cs +++ b/libs/common/Format.cs @@ -117,7 +117,7 @@ public static async Task TryCreateEndpoint(string singleAddressOrHos foreach (var entry in ipAddresses) { var endpoint = new IPEndPoint(entry, port); - var IsListening = await TryConnect(endpoint); + var IsListening = await TryConnect(endpoint).ConfigureAwait(false); if (IsListening) return [endpoint]; } } @@ -149,7 +149,7 @@ async Task TryConnect(IPEndPoint endpoint) { try { - await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port); + await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port).ConfigureAwait(false); logger?.LogTrace("Reachable {ip} {port}", endpoint.Address, endpoint.Port); return true; } diff --git a/libs/common/LightClient.cs b/libs/common/LightClient.cs index b72b85214fa..0eb8c199cde 100644 --- a/libs/common/LightClient.cs +++ b/libs/common/LightClient.cs @@ -136,7 +136,7 @@ private async Task ConnectSendSocketAsync(CancellationToken cancellation NoDelay = true }; - if (await TryConnectSocketAsync(socket, endpoint, cancellationToken)) + if (await TryConnectSocketAsync(socket, endpoint, cancellationToken).ConfigureAwait(false)) return socket; } } @@ -146,7 +146,7 @@ private async Task ConnectSendSocketAsync(CancellationToken cancellation if (endpoint is not UnixDomainSocketEndPoint) socket.NoDelay = true; - if (await TryConnectSocketAsync(socket, endpoint, cancellationToken)) + if (await TryConnectSocketAsync(socket, endpoint, cancellationToken).ConfigureAwait(false)) return socket; } diff --git a/libs/common/Networking/TcpNetworkHandlerBase.cs b/libs/common/Networking/TcpNetworkHandlerBase.cs index 8a7fefe1c5d..8fc763f77a6 100644 --- a/libs/common/Networking/TcpNetworkHandlerBase.cs +++ b/libs/common/Networking/TcpNetworkHandlerBase.cs @@ -246,7 +246,7 @@ private async ValueTask HandleReceiveWithTLSAsync(object sender, SocketAsyncEven var receiveTask = OnNetworkReceiveWithTLSAsync(e.BytesTransferred); if (!receiveTask.IsCompletedSuccessfully) { - await receiveTask; + await receiveTask.ConfigureAwait(false); } e.SetBuffer(networkReceiveBuffer, networkBytesRead, networkReceiveBuffer.Length - networkBytesRead); } while (!e.AcceptSocket.ReceiveAsync(e)); diff --git a/libs/common/ExceptionInjectionHelper.cs b/libs/common/Testing/ExceptionInjectionHelper.cs similarity index 100% rename from libs/common/ExceptionInjectionHelper.cs rename to libs/common/Testing/ExceptionInjectionHelper.cs diff --git a/libs/common/ExceptionInjectionType.cs b/libs/common/Testing/ExceptionInjectionType.cs similarity index 100% rename from libs/common/ExceptionInjectionType.cs rename to libs/common/Testing/ExceptionInjectionType.cs diff --git a/libs/common/Testing/GarnetTestLoggingEventType.cs b/libs/common/Testing/GarnetTestLoggingEventType.cs new file mode 100644 index 00000000000..b2f67f3fb26 --- /dev/null +++ b/libs/common/Testing/GarnetTestLoggingEventType.cs @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using Microsoft.Extensions.Logging; + +namespace Garnet.common +{ + public enum GarnetTestLoggingEventType : int + { + LogPrimaryStreamType + }; + + public struct GarnetTestLoggingEvent + { + public GarnetTestLoggingEventType Type; + public string Message; + + public override string ToString() => $"<{Type}>: {Message}"; + } + + public static class LoggingExtensions + { + public static void LogTesting(this ILogger logger, GarnetTestLoggingEvent state) + { + logger?.Log(LogLevel.Critical, + eventId: default, + state: state, + exception: null, + formatter: static (state, _) => $"{state}"); + } + } +} \ No newline at end of file diff --git a/libs/server/Metrics/GarnetServerMonitor.cs b/libs/server/Metrics/GarnetServerMonitor.cs index ac820054946..9a34734f721 100644 --- a/libs/server/Metrics/GarnetServerMonitor.cs +++ b/libs/server/Metrics/GarnetServerMonitor.cs @@ -230,7 +230,7 @@ private async void MainMonitorTask(CancellationToken token) { while (true) { - await Task.Delay(monitorSamplingFrequency, token); + await Task.Delay(monitorSamplingFrequency, token).ConfigureAwait(false); // Reset the session level latency metrics for the prior version, as we are // about to make that the current version. diff --git a/libs/server/Metrics/Info/InfoCommand.cs b/libs/server/Metrics/Info/InfoCommand.cs index 0722fdd3b9e..77de500aff1 100644 --- a/libs/server/Metrics/Info/InfoCommand.cs +++ b/libs/server/Metrics/Info/InfoCommand.cs @@ -80,7 +80,6 @@ private bool NetworkINFO() } } return true; - } private void GetHelpMessage() diff --git a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs index 6feaa72ac2c..b9dff74b45d 100644 --- a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs +++ b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs @@ -94,7 +94,7 @@ internal async Task GetCollectionItemAsync(RespCommand com RespServerSession session, double timeoutInSeconds, PinnedSpanByte[] cmdArgs = null) { var observer = new CollectionItemObserver(session, command, cmdArgs); - return await GetCollectionItemAsync(observer, keys, timeoutInSeconds); + return await GetCollectionItemAsync(observer, keys, timeoutInSeconds).ConfigureAwait(false); } /// @@ -111,7 +111,7 @@ internal async Task MoveCollectionItemAsync(RespCommand co RespServerSession session, double timeoutInSeconds, PinnedSpanByte[] cmdArgs) { var observer = new CollectionItemObserver(session, command, cmdArgs); - return await GetCollectionItemAsync(observer, [srcKey], timeoutInSeconds); + return await GetCollectionItemAsync(observer, [srcKey], timeoutInSeconds).ConfigureAwait(false); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -143,7 +143,7 @@ private async Task GetCollectionItemAsync(CollectionItemOb try { // Wait for either the result found notification or the timeout to expire - await observer.ResultFoundSemaphore.WaitAsync(timeout, observer.CancellationTokenSource.Token); + await observer.ResultFoundSemaphore.WaitAsync(timeout, observer.CancellationTokenSource.Token).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -693,7 +693,7 @@ private async Task Start() // once event is dequeued successfully, call handler method try { - nextEvent = await brokerEventsQueue.DequeueAsync(cts.Token); + nextEvent = await brokerEventsQueue.DequeueAsync(cts.Token).ConfigureAwait(false); } catch (OperationCanceledException) { diff --git a/libs/server/Resp/AsyncProcessor.cs b/libs/server/Resp/AsyncProcessor.cs index 66d5466b875..404599fba6b 100644 --- a/libs/server/Resp/AsyncProcessor.cs +++ b/libs/server/Resp/AsyncProcessor.cs @@ -62,7 +62,7 @@ void NetworkGETPending(ref TGarnetApi storageApi) RunContinuationsAsynchronously = true }; var _storageApi = storageApi; - _ = Task.Run(async () => await AsyncGetProcessor(_storageApi)); + _ = Task.Run(async () => await AsyncGetProcessor(_storageApi).ConfigureAwait(false)); } else { @@ -138,7 +138,7 @@ async Task AsyncGetProcessor(TGarnetApi storageApi) // Wait for next async operation // We do not need to cancel the wait - it should get garbage collected when the session ends - await asyncWaiter.WaitAsync(); + await asyncWaiter.WaitAsync().ConfigureAwait(false); } } } diff --git a/libs/server/Servers/StoreApi.cs b/libs/server/Servers/StoreApi.cs index 5ff169c9fd5..418bbad3099 100644 --- a/libs/server/Servers/StoreApi.cs +++ b/libs/server/Servers/StoreApi.cs @@ -71,7 +71,7 @@ public async ValueTask WaitForCommitAsync(CancellationToken token = defaul return false; } - return await storeWrapper.WaitForCommitAsync(token: token); + return await storeWrapper.WaitForCommitAsync(token: token).ConfigureAwait(false); } } @@ -105,7 +105,7 @@ public async ValueTask CommitAOFAsync(CancellationToken token) return false; } - return await storeWrapper.CommitAOFAsync(token: token); + return await storeWrapper.CommitAOFAsync(token: token).ConfigureAwait(false); } } diff --git a/libs/server/Storage/Session/MainStore/AdvancedOps.cs b/libs/server/Storage/Session/MainStore/AdvancedOps.cs index a28c3b75b6d..5ffc97f4c7e 100644 --- a/libs/server/Storage/Session/MainStore/AdvancedOps.cs +++ b/libs/server/Storage/Session/MainStore/AdvancedOps.cs @@ -98,7 +98,6 @@ public GarnetStatus Read_MainStore(ReadOnlySpan key, ref S return GarnetStatus.NOTFOUND; } - public void ReadWithPrefetch(ref TBatch batch, ref TContext context, long userContext = default) where TBatch : IReadArgBatch #if NET9_0_OR_GREATER diff --git a/libs/server/TaskManager/TaskManager.cs b/libs/server/TaskManager/TaskManager.cs index 7545296601e..9ed3be7225c 100644 --- a/libs/server/TaskManager/TaskManager.cs +++ b/libs/server/TaskManager/TaskManager.cs @@ -97,7 +97,7 @@ public bool RegisterAndRun(TaskType taskType, Func task // Execute task factory if (cleanupOnCompletion) - taskMetadata.Task = taskFactory(taskMetadata.Cts.Token).ContinueWith(async _ => await Cancel(taskType)).Unwrap(); + taskMetadata.Task = taskFactory(taskMetadata.Cts.Token).ContinueWith(async _ => await Cancel(taskType).ConfigureAwait(false)).Unwrap(); else taskMetadata.Task = taskFactory(taskMetadata.Cts.Token); } @@ -131,7 +131,7 @@ public async Task Cancel(TaskType taskType) { taskMetadata.Cts.Cancel(); if (taskMetadata.Task != null) - await taskMetadata.Task.WaitAsync(disposed ? default : cts.Token); + await taskMetadata.Task.WaitAsync(disposed ? default : cts.Token).ConfigureAwait(false); } } catch (Exception ex) @@ -149,7 +149,7 @@ public async Task Cancel(TaskType taskType) public async Task Cancel(TaskPlacementCategory taskPlacementCategory) { foreach (var taskType in TaskTypeExtensions.GetTaskTypes(taskPlacementCategory)) - await Cancel(taskType); + await Cancel(taskType).ConfigureAwait(false); } /// @@ -171,7 +171,7 @@ public async Task WaitAsync(TaskType taskType, CancellationToken token = d { if (registry.TryGetValue(taskType, out var taskInfo)) { - await taskInfo.Task.WaitAsync(token); + await taskInfo.Task.WaitAsync(token).ConfigureAwait(false); return true; } return false; diff --git a/libs/storage/Tsavorite/cs/src/core/Device/RandomAccessLocalStorageDevice.cs b/libs/storage/Tsavorite/cs/src/core/Device/RandomAccessLocalStorageDevice.cs index 423f5fd20c7..0d2d85d4407 100644 --- a/libs/storage/Tsavorite/cs/src/core/Device/RandomAccessLocalStorageDevice.cs +++ b/libs/storage/Tsavorite/cs/src/core/Device/RandomAccessLocalStorageDevice.cs @@ -239,7 +239,7 @@ async ValueTask WriteWorkerAsync(IntPtr sourceAddress, int segmentId, ulong dest { storageAccessContext.memoryManager.SetDestination((byte*)sourceAddress, (int)numBytesToWrite); } - await RandomAccess.WriteAsync(storageAccessContext.handle.SafeFileHandle, storageAccessContext.memoryManager.Memory, (long)destinationAddress); + await RandomAccess.WriteAsync(storageAccessContext.handle.SafeFileHandle, storageAccessContext.memoryManager.Memory, (long)destinationAddress).ConfigureAwait(false); } catch (Exception ex) { diff --git a/libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs b/libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs index ca186c6a5f8..5a340e42615 100644 --- a/libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs +++ b/libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs @@ -193,7 +193,7 @@ int SelectInstance() if (kInvalidIndex == Interlocked.CompareExchange(ref entry, 1, kInvalidIndex)) return i; } - throw new InvalidOperationException("Exceeded maximum number of active LightEpoch instances"); + throw new InvalidOperationException($"Exceeded maximum number of active LightEpoch instances {ActiveInstanceCount()} {InstanceIndexBuffer.MaxInstances}"); } /// diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs index 571ef07911d..f06a926ae76 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs @@ -262,7 +262,7 @@ public async Task WaitForStateChange(SystemState currentState) var _waitForTransitionOut = waitForTransitionOut; if (SystemState.Equal(currentState, systemState)) { - await _waitForTransitionOut.WaitAsync(); + await _waitForTransitionOut.WaitAsync().ConfigureAwait(false); } } @@ -273,12 +273,12 @@ public async Task WaitForStateChange(SystemState currentState) /// public async Task WaitForCompletion(SystemState currentState) { - await WaitForStateChange(currentState); + await WaitForStateChange(currentState).ConfigureAwait(false); currentState = systemState; var _waitForTransitionIn = waitForTransitionIn; if (SystemState.Equal(currentState, systemState)) { - await _waitForTransitionIn.WaitAsync(); + await _waitForTransitionIn.WaitAsync().ConfigureAwait(false); } } @@ -304,7 +304,7 @@ void MakeTransitionWorker(SystemState nextState) async Task ProcessWaitingListAsync(CancellationToken token = default) { - await waitForTransitionIn.WaitAsync(token); + await waitForTransitionIn.WaitAsync(token).ConfigureAwait(false); if (waitForTransitionInException != null) { throw waitForTransitionInException; @@ -332,7 +332,7 @@ async Task RunStateMachine(CancellationToken token = default) do { GlobalStateMachineStep(systemState); - await ProcessWaitingListAsync(token); + await ProcessWaitingListAsync(token).ConfigureAwait(false); } while (systemState.Phase != Phase.REST); } catch (Exception e) diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Common/LogSizeTracker.cs b/libs/storage/Tsavorite/cs/src/core/Index/Common/LogSizeTracker.cs index 057c2372bda..beaefe8cf1b 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Common/LogSizeTracker.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Common/LogSizeTracker.cs @@ -241,7 +241,7 @@ async Task ResizerTask(CancellationToken cancellationToken) // these calls to WaitAsync will be lost. ResizeIfNeeded retries as long as we are over budget, // but there is still a chance we'll miss a growth+signal between that check and the next WaitAsync. // The timeout mitigates this but it would be better to find an awaitable ManualResetEvent. - await resizeTaskEvent.WaitAsync(TimeSpan.FromSeconds(ResizeTaskDelaySeconds), cancellationToken); + await resizeTaskEvent.WaitAsync(TimeSpan.FromSeconds(ResizeTaskDelaySeconds), cancellationToken).ConfigureAwait(false); if (runState == (int)RunState.Running) ResizeIfNeeded(cancellationToken); if (runState != (int)RunState.Running) diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs index 6060ab5f21e..360b16e71f8 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs @@ -67,7 +67,6 @@ internal async ValueTask IsIndexFuzzyCheckpointCompletedAsync(CancellationToken await t2.ConfigureAwait(false); } - // Implementation of an asynchronous checkpointing scheme // for main hash index of Tsavorite private int mainIndexCheckpointCallbackCount; @@ -166,7 +165,7 @@ private async ValueTask IsMainIndexCheckpointCompletedAsync(CancellationToken to await mainIndexCheckpointTcs.Task.WaitAsync(token).ConfigureAwait(false); } - private unsafe void AsyncPageFlushCallback(uint errorCode, uint numBytes, object context) + private void AsyncPageFlushCallback(uint errorCode, uint numBytes, object context) { // Set the page status to flushed var mem = ((HashIndexPageAsyncFlushResult)context).mem; diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs index a540ab3e300..8f148c16f50 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs @@ -761,7 +761,7 @@ private async Task TrimLogMemorySizeAsync(RecoveryStatus recoveryStatus, l var pageIndex = hlogBase.GetPageIndexForPage(page); if (hlogBase.IsAllocated(pageIndex)) { - await recoveryStatus.WaitFlushAsync(pageIndex, cancellationToken); + await recoveryStatus.WaitFlushAsync(pageIndex, cancellationToken).ConfigureAwait(false); hlogBase.EvictPageForRecovery(page); lastFreedPage = page; } @@ -898,7 +898,7 @@ private async ValueTask RecoverHybridLogAsync(long scanFromAddress, long r // Trim the log memory again in case we read large objects on the current page. Add 1 to tailPage so that // when the BufferSize subtraction wraps around the buffer it won't try to evict the page we just added. // Decrease trimPageReadCount as we process each page so we don't over-prune. - freedPage = await TrimLogMemorySizeAsync(recoveryStatus, tailPage: p + 1, trimPageReadCount--, cancellationToken); + freedPage = await TrimLogMemorySizeAsync(recoveryStatus, tailPage: p + 1, trimPageReadCount--, cancellationToken).ConfigureAwait(false); if (freedPage != NoPageFreed) lastFreedPage = freedPage; } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/Revivification/CheckEmptyWorker.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/Revivification/CheckEmptyWorker.cs index 2d6546c9773..1376454f77b 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/Revivification/CheckEmptyWorker.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/Revivification/CheckEmptyWorker.cs @@ -54,7 +54,7 @@ private async void LaunchWorker() { try { - await Task.Delay(1000, cts.Token); + await Task.Delay(1000, cts.Token).ConfigureAwait(false); if (disposed) break; recordPool.ScanForEmpty(cts.Token); diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs index 619eabf2d66..6ebc2261abd 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs @@ -495,7 +495,7 @@ public async ValueTask CompleteCheckpointAsync(CancellationToken token = default token.ThrowIfCancellationRequested(); try { - await stateMachineDriver.CompleteAsync(token); + await stateMachineDriver.CompleteAsync(token).ConfigureAwait(false); } catch { @@ -831,7 +831,7 @@ public async Task GrowIndexAsync() var indexResizeTask = new IndexResizeSMTask(this); var indexResizeSM = new IndexResizeSM(indexResizeTask); - return await stateMachineDriver.RunAsync(indexResizeSM); + return await stateMachineDriver.RunAsync(indexResizeSM).ConfigureAwait(false); } /// diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/LogCommitPolicy.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/LogCommitPolicy.cs index a82cc6df0f0..933021d10fe 100644 --- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/LogCommitPolicy.cs +++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/LogCommitPolicy.cs @@ -175,7 +175,7 @@ public override bool AdmitCommit(long currentTail, bool commitRequired) { Task.Run(async () => { - await Task.Delay(TimeSpan.FromMilliseconds(thresholdMilli)); + await Task.Delay(TimeSpan.FromMilliseconds(thresholdMilli)).ConfigureAwait(false); shouldRetry = 0; log.Commit(); }); diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs index 4bb9cf5dff1..93434dafc46 100644 --- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs +++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs @@ -174,9 +174,9 @@ public sealed class TsavoriteLog : IDisposable /// Create new log instance /// /// Log settings - /// Log settings + /// User provided logger instance public TsavoriteLog(TsavoriteLogSettings logSettings, ILogger logger = null) - : this(logSettings, logSettings.TryRecoverLatest, logger) + : this(logSettings, logSettings.TryRecoverLatest, logger: logger) { } /// @@ -1600,7 +1600,6 @@ public async ValueTask WaitUncommittedAsync(long nextAddress, Cancellation /// If true, spin-wait until commit completes. Otherwise, issue commit and return immediately. /// /// whether there is anything to commit. - public void Commit(bool spinWait = false, byte[] cookie = null) { // Take a lower-bound of the content of this commit in case our request is filtered but we need to spin @@ -1650,6 +1649,8 @@ public bool CommitStrongly(out long commitTail, out long actualCommitNum, bool s /// complete the commit. Throws exception if this or any /// ongoing commit fails. /// + /// + /// /// public async ValueTask CommitAsync(byte[] cookie = null, CancellationToken token = default) { @@ -2529,7 +2530,7 @@ public async ValueTask RecoverReadOnlyAsync(CancellationToken cancellationToken private void SignalWaitingROIterators() { - // One RecoverReadOnly use case is to allow a TsavoriteLogIterator to continuously read a mirror TsavoriteLog (over the same log storage) of a primary TsavoriteLog. + // One RecoverReadOnly use case is to allow a TsavoriteLogScanIterator to continuously read a mirror TsavoriteLog (over the same log storage) of a primary TsavoriteLog. // In this scenario, when the iterator arrives at the tail after a previous call to RestoreReadOnly, it will wait asynchronously until more data // is committed and read by a subsequent call to RecoverReadOnly. Here, we signal iterators that we have completed recovery. var _commitTcs = commitTcs; @@ -3053,7 +3054,6 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool ongoingCommitRequests.Enqueue((commitTail, info)); } - // As an optimization, if a concurrent flush has already advanced FlushedUntilAddress // past this commit, we can manually trigger a commit callback for safety, and return. if (commitTail <= FlushedUntilAddress) diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogRecoveryInfo.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogRecoveryInfo.cs index e54cd22bb21..fe3e4c760d0 100644 --- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogRecoveryInfo.cs +++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogRecoveryInfo.cs @@ -44,7 +44,7 @@ public struct TsavoriteLogRecoveryInfo public bool FastForwardAllowed; /// - /// callback to invoke when commit is presistent + /// callback to invoke when commit is persisted /// public Action Callback; diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogScanIterator.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogScanIterator.cs index 392e4cf02b7..ce94a4ec3ab 100644 --- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogScanIterator.cs +++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogScanIterator.cs @@ -461,6 +461,9 @@ public unsafe bool TryConsumeNext(T consumer) where T : ILogEntryConsumer /// whether a next entry is present public unsafe bool TryBulkConsumeNext(T consumer, int maxChunkSize = 0) where T : IBulkLogEntryConsumer { + // Throttle and implicitly check for consumer liveness + consumer.Throttle(); + if (maxChunkSize == 0) maxChunkSize = allocator.PageSize; if (disposed) diff --git a/libs/storage/Tsavorite/cs/src/core/Utilities/Utility.cs b/libs/storage/Tsavorite/cs/src/core/Utilities/Utility.cs index 47d233b4068..4e71c5a52c0 100644 --- a/libs/storage/Tsavorite/cs/src/core/Utilities/Utility.cs +++ b/libs/storage/Tsavorite/cs/src/core/Utilities/Utility.cs @@ -434,14 +434,14 @@ private static async Task SlowWithCancellationAsync(Task task, Cancella var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); using (token.Register(s => ((TaskCompletionSource)s).TrySetResult(true), tcs, useSynchronizationContext)) { - if (task != await Task.WhenAny(task, tcs.Task)) + if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false)) { token.ThrowIfCancellationRequested(); } } // make sure any exceptions in the task get unwrapped and exposed to the caller. - return await task; + return await task.ConfigureAwait(false); } /// diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs index 93d8eb6c99e..67b4b952c17 100644 --- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs +++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs @@ -197,7 +197,7 @@ await BlobManager.PerformWithRetriesAsync( pageResults = page.Values; continuationToken = page.ContinuationToken; return page.Values.Count; // not accurate, in terms of bytes, but still useful for tracing purposes - }); + }).ConfigureAwait(false); foreach (var item in pageResults) { @@ -217,7 +217,7 @@ await BlobManager.PerformWithRetriesAsync( while (!string.IsNullOrEmpty(continuationToken)); // make sure we did not lose the lease while iterating to find the blobs - await BlobManager.ConfirmLeaseIsGoodForAWhileAsync(); + await BlobManager.ConfirmLeaseIsGoodForAWhileAsync().ConfigureAwait(false); StorageErrorHandler.Token.ThrowIfCancellationRequested(); @@ -384,7 +384,7 @@ public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAs async (numAttempts) => { var client = (numAttempts > 1) ? entry.PageBlob.Default : entry.PageBlob.Aggressive; - await client.DeleteAsync(cancellationToken: StorageErrorHandler.Token); + await client.DeleteAsync(cancellationToken: StorageErrorHandler.Token).ConfigureAwait(false); return 1; }); } @@ -419,7 +419,7 @@ Task Delete(BlobEntry entry) async (numAttempts) => { var client = (numAttempts > 1) ? entry.PageBlob.Default : entry.PageBlob.Aggressive; - await client.DeleteAsync(cancellationToken: StorageErrorHandler.Token); + await client.DeleteAsync(cancellationToken: StorageErrorHandler.Token).ConfigureAwait(false); return 1; }); } @@ -585,7 +585,7 @@ await BlobManager.PerformWithRetriesAsync( }, async () => { - var response = await blobEntry.PageBlob.Default.GetPropertiesAsync(); + var response = await blobEntry.PageBlob.Default.GetPropertiesAsync().ConfigureAwait(false); blobEntry.ETag = response.Value.ETag; }).ConfigureAwait(false); @@ -642,7 +642,7 @@ await BlobManager.PerformWithRetriesAsync( } return length; - }); + }).ConfigureAwait(false); readLength -= length; offset += length; @@ -666,34 +666,34 @@ unsafe void WriteToBlobAsync(BlobEntry blobEntry, IntPtr sourceAddress, ulong de { WriteToBlobAsync(blobEntry, sourceAddress, (long)destinationAddress, numBytesToWrite, id) .ContinueWith((Task t) => + { + if (pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request)) { - if (pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request)) + if (t.IsFaulted) { - if (t.IsFaulted) - { - BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} (Failure)"); - request.Callback(uint.MaxValue, request.NumBytes, request.Context); - } - else - { - BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id}"); - request.Callback(0, request.NumBytes, request.Context); - } + BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} (Failure)"); + request.Callback(uint.MaxValue, request.NumBytes, request.Context); } - - if (underLease) + else { - InitialWriterSemaphore.Release(); + BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id}"); + request.Callback(0, request.NumBytes, request.Context); } + } - }, TaskContinuationOptions.ExecuteSynchronously); + if (underLease) + { + InitialWriterSemaphore.Release(); + } + + }, TaskContinuationOptions.ExecuteSynchronously); } async Task WriteToBlobAsync(BlobEntry blobEntry, IntPtr sourceAddress, long destinationAddress, uint numBytesToWrite, long id) { if (underLease) { - await InitialWriterSemaphore.WaitAsync(); + await InitialWriterSemaphore.WaitAsync().ConfigureAwait(false); } long offset = 0; diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobEntry.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobEntry.cs index 9068b8543a8..f201d322e3c 100644 --- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobEntry.cs +++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobEntry.cs @@ -71,16 +71,16 @@ await azureStorageDevice.BlobManager.PerformWithRetriesAsync( var response = await client.CreateAsync( size: size, conditions: new Azure.Storage.Blobs.Models.PageBlobRequestConditions() { IfNoneMatch = Azure.ETag.All }, - cancellationToken: azureStorageDevice.StorageErrorHandler.Token); + cancellationToken: azureStorageDevice.StorageErrorHandler.Token).ConfigureAwait(false); ETag = response.Value.ETag; return 1; }, async () => { - var response = await pageBlob.Default.GetPropertiesAsync(); + var response = await pageBlob.Default.GetPropertiesAsync().ConfigureAwait(false); ETag = response.Value.ETag; - }); + }).ConfigureAwait(false); // At this point the blob is fully created. After this line all consequent writers will write immediately. We just // need to clear the queue of pending writers. diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobManager.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobManager.cs index c4fc8ef4368..e6758171179 100644 --- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobManager.cs +++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobManager.cs @@ -107,7 +107,7 @@ async Task StartAsync() { leaseBlob = leaseBlobDirectory.GetBlockBlobClient(LeaseBlobName); leaseClient = leaseBlob.WithRetries.GetBlobLeaseClient(); - await AcquireOwnership(); + await AcquireOwnership().ConfigureAwait(false); } /// @@ -130,7 +130,7 @@ public async Task StopAsync() { shutDownOrTermination.Cancel(); // has no effect if already cancelled - await LeaseMaintenanceLoopTask; // wait for loop to terminate cleanly + await LeaseMaintenanceLoopTask.ConfigureAwait(false); // wait for loop to terminate cleanly } /// @@ -186,7 +186,7 @@ await leaseClient.AcquireAsync( // the previous owner has not released the lease yet, // try again until it becomes available, should be relatively soon // as the transport layer is supposed to shut down the previous owner when starting this - await Task.Delay(TimeSpan.FromSeconds(1), StorageErrorHandler.Token); + await Task.Delay(TimeSpan.FromSeconds(1), StorageErrorHandler.Token).ConfigureAwait(false); continue; } @@ -207,7 +207,7 @@ await PerformWithRetriesAsync( try { var client = numAttempts > 2 ? leaseBlob.Default : leaseBlob.Aggressive; - await client.UploadAsync(new MemoryStream()); + await client.UploadAsync(new MemoryStream()).ConfigureAwait(false); } catch (Azure.RequestFailedException ex2) when (BlobUtilsV12.LeaseConflictOrExpired(ex2)) { @@ -216,7 +216,7 @@ await PerformWithRetriesAsync( } return 1; - }); + }).ConfigureAwait(false); continue; } @@ -241,7 +241,7 @@ await PerformWithRetriesAsync( { TimeSpan nextRetryIn = GetDelayBetweenRetries(numAttempts); TraceHelper.TsavoritePerfWarning($"Lease acquisition failed transiently, retrying in {nextRetryIn}"); - await Task.Delay(nextRetryIn); + await Task.Delay(nextRetryIn).ConfigureAwait(false); } continue; } @@ -303,7 +303,7 @@ public async Task MaintenanceLoopAsync() } // wait for successful renewal, or exit the loop as this throws - await NextLeaseRenewalTask; + await NextLeaseRenewalTask.ConfigureAwait(false); } } catch (OperationCanceledException) @@ -332,7 +332,7 @@ public async Task MaintenanceLoopAsync() && !StorageErrorHandler.IsTerminated && (leaseTimer?.Elapsed < LeaseDuration)) { - await Task.Delay(20); // give storage accesses that are in progress and require the lease a chance to complete + await Task.Delay(20).ConfigureAwait(false); // give storage accesses that are in progress and require the lease a chance to complete } TraceHelper.LeaseProgress("Waited for lease users to complete"); diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobUtilsV12.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobUtilsV12.cs index 9109dfa73c3..c0a4a55f787 100644 --- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobUtilsV12.cs +++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobUtilsV12.cs @@ -203,7 +203,7 @@ public static async Task ForceDeleteAsync(BlobContainerClient containerCli try { - await blob.DeleteAsync(); + await blob.DeleteAsync().ConfigureAwait(false); return true; } catch (Azure.RequestFailedException e) when (BlobDoesNotExist(e)) @@ -215,7 +215,7 @@ public static async Task ForceDeleteAsync(BlobContainerClient containerCli try { var leaseClient = new BlobLeaseClient(blob); - await leaseClient.BreakAsync(TimeSpan.Zero); + await leaseClient.BreakAsync(TimeSpan.Zero).ConfigureAwait(false); } catch { diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageErrorHandler.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageErrorHandler.cs index 484503ab158..1425f56a818 100644 --- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageErrorHandler.cs +++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageErrorHandler.cs @@ -145,7 +145,7 @@ void Shutdown() public async Task WaitForTermination(TimeSpan timeout) { Task timeoutTask = Task.Delay(timeout); - var first = await Task.WhenAny(timeoutTask, shutdownComplete.Task); + var first = await Task.WhenAny(timeoutTask, shutdownComplete.Task).ConfigureAwait(false); return first == shutdownComplete.Task; } } diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageOperations.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageOperations.cs index 9c2d8e12820..009f5830c4b 100644 --- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageOperations.cs +++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageOperations.cs @@ -27,7 +27,7 @@ public async Task PerformWithRetriesAsync( { if (semaphore != null) { - await semaphore.WaitAsync(); + await semaphore.WaitAsync().ConfigureAwait(false); } Stopwatch stopwatch = new(); @@ -49,7 +49,7 @@ public async Task PerformWithRetriesAsync( } Interlocked.Increment(ref LeaseUsers); - await ConfirmLeaseIsGoodForAWhileAsync(); + await ConfirmLeaseIsGoodForAWhileAsync().ConfigureAwait(false); } StorageErrorHandler.Token.ThrowIfCancellationRequested(); diff --git a/test/Garnet.test.cluster/ClusterTestContext.cs b/test/Garnet.test.cluster/ClusterTestContext.cs index fce61b07f95..94ea171cb48 100644 --- a/test/Garnet.test.cluster/ClusterTestContext.cs +++ b/test/Garnet.test.cluster/ClusterTestContext.cs @@ -10,6 +10,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; +using Garnet.common; using Garnet.server; using Garnet.server.Auth.Settings; using Microsoft.Extensions.Logging; @@ -30,6 +31,7 @@ public class ClusterTestContext public EndPointCollection endpoints; public TextWriter logTextWriter = TestContext.Progress; public ILoggerFactory loggerFactory; + public NUnitLoggerProvider loggerProvider; public ILogger logger; public int defaultShards = 3; @@ -44,6 +46,12 @@ public class ClusterTestContext public CancellationTokenSource cts; + public void EnableGarnetLoggingEvents(GarnetTestLoggingEventType[] events) + { + foreach (var e in events) + loggerProvider.GarnetTestLoggingEvents[(int)e] = true; + } + public void Setup(Dictionary monitorTests, int testTimeoutSeconds = 60) { cts = new CancellationTokenSource(TimeSpan.FromSeconds(testTimeoutSeconds)); @@ -52,7 +60,7 @@ public void Setup(Dictionary monitorTests, int testTimeoutSeco var logLevel = LogLevel.Error; if (!string.IsNullOrEmpty(TestContext.CurrentContext.Test.MethodName) && monitorTests.TryGetValue(TestContext.CurrentContext.Test.MethodName, out var value)) logLevel = value; - loggerFactory = TestUtils.CreateLoggerFactoryInstance(logTextWriter, logLevel, scope: TestContext.CurrentContext.Test.FullName); + (loggerFactory, loggerProvider) = TestUtils.CreateLoggerFactoryInstance(logTextWriter, logLevel, scope: TestContext.CurrentContext.Test.FullName); logger = loggerFactory.CreateLogger(TestContext.CurrentContext.Test.FullName); logger.LogDebug("0. Setup >>>>>>>>>>>>"); r = new Random(674386); diff --git a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs index 9fabda2bf8b..d691f745b6f 100644 --- a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs +++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs @@ -152,6 +152,9 @@ public void ClusterSRTest([Values] bool disableObjects) [Category("REPLICATION")] public void ClusterSRNoCheckpointRestartSecondary([Values] bool performRMW, [Values] bool disableObjects) { + if (useTLS) + context.EnableGarnetLoggingEvents([GarnetTestLoggingEventType.LogPrimaryStreamType]); + var replica_count = 1;// Per primary var primary_count = 1; var primaryIndex = 0; diff --git a/test/Garnet.test/Extensions/BulkIncrementBy.cs b/test/Garnet.test/Extensions/BulkIncrementBy.cs index c6231d11bf4..361bfe2927f 100644 --- a/test/Garnet.test/Extensions/BulkIncrementBy.cs +++ b/test/Garnet.test/Extensions/BulkIncrementBy.cs @@ -9,8 +9,9 @@ namespace Garnet { sealed class BulkIncrementBy : CustomTransactionProcedure { - // BULKINCRBY 2 a 10 [b 15] [c 25] ... + // BULKINCRBY k1 incrby1 [k2 incrby2 [k3 incrby3 ...]] public static readonly RespCommandsInfo CommandInfo = new() { Arity = -4 }; + public static readonly string Name = "BULKINCRBY"; public override bool Prepare(TGarnetReadApi api, ref CustomProcedureInput procInput) { @@ -23,7 +24,7 @@ public override bool Prepare(TGarnetReadApi api, ref CustomProce for (var i = 0; i < count; i++) { AddKey(GetNextArg(ref procInput, ref offset), LockType.Exclusive, storeType: StoreType.Main); - GetNextArg(ref procInput, ref offset); + _ = GetNextArg(ref procInput, ref offset); } return true; diff --git a/test/Garnet.test/Extensions/BulkRead.cs b/test/Garnet.test/Extensions/BulkRead.cs index 7b938be6e8d..f6072a79921 100644 --- a/test/Garnet.test/Extensions/BulkRead.cs +++ b/test/Garnet.test/Extensions/BulkRead.cs @@ -9,8 +9,9 @@ namespace Garnet { sealed class BulkRead : CustomTransactionProcedure { - // BULKREAD 3 a [b] [c] + // BULKREAD a [b] [c] public static readonly RespCommandsInfo CommandInfo = new() { Arity = -3 }; + public static readonly string Name = "BULKREAD"; public override bool Prepare(TGarnetReadApi api, ref CustomProcedureInput procInput) { diff --git a/test/Garnet.test/NUnitLoggerProvider.cs b/test/Garnet.test/NUnitLoggerProvider.cs index 8d223958330..401bad77b8e 100644 --- a/test/Garnet.test/NUnitLoggerProvider.cs +++ b/test/Garnet.test/NUnitLoggerProvider.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.IO; +using System.Linq; using Garnet.common; using Microsoft.Extensions.Logging; @@ -18,6 +19,11 @@ public class NUnitLoggerProvider : ILoggerProvider private readonly bool matchLevel; private readonly LogLevel logLevel; + /// + /// Array of enabled test logging flag types + /// + public bool[] GarnetTestLoggingEvents = [.. Enum.GetValues().Select(_ => false)]; + static readonly string[] lvl = [ "trce", @@ -38,13 +44,14 @@ public NUnitLoggerProvider(TextWriter textWriter, string scope = "", HashSet new NUnitLogger(categoryName, textWriter, scope, skipCmd, recvOnly: recvOnly, matchLevel: matchLevel, logLevel: logLevel); + public ILogger CreateLogger(string categoryName) => new NUnitLogger(this, categoryName, textWriter, scope, skipCmd, recvOnly: recvOnly, matchLevel: matchLevel, logLevel: logLevel); public void Dispose() { } private class NUnitLogger : ILogger { + private readonly NUnitLoggerProvider provider; private readonly string categoryName; private readonly TextWriter textWriter; private readonly string scope; @@ -53,8 +60,9 @@ private class NUnitLogger : ILogger private readonly bool matchLevel; private readonly LogLevel logLevel; - public NUnitLogger(string categoryName, TextWriter textWriter, string scope, HashSet skipCmd = null, bool recvOnly = false, bool matchLevel = false, LogLevel logLevel = LogLevel.None) + public NUnitLogger(NUnitLoggerProvider provider, string categoryName, TextWriter textWriter, string scope, HashSet skipCmd = null, bool recvOnly = false, bool matchLevel = false, LogLevel logLevel = LogLevel.None) { + this.provider = provider; this.categoryName = categoryName; this.textWriter = textWriter; this.scope = scope; @@ -77,11 +85,26 @@ public void Log( Exception exception, Func formatter) { - if ((matchLevel && logLevel == this.logLevel) || !matchLevel) + if (state is GarnetTestLoggingEvent _state) + { + if (provider.GarnetTestLoggingEvents[(int)_state.Type]) + { + var msg = string.Format("[{0:d1}.{1}.({2})] |{3}| <{4}> {5} ^{6}^", + eventId.Id, + LogFormatter.FormatTime(DateTime.UtcNow), + GetLevelStr(logLevel), + scope, + categoryName, + exception, + formatter(state, exception)); + textWriter.Write(msg); + } + } + else if ((matchLevel && logLevel == this.logLevel) || !matchLevel) { - var msg = string.Format("[{0:D3}.{1}.({2})] |{3}| <{4}> {5} ^{6}^", + var msg = string.Format("[{0:d1}.{1}.({2})] |{3}| <{4}> {5} ^{6}^", eventId.Id, - LogFormatter.FormatDate(DateTime.UtcNow), + LogFormatter.FormatTime(DateTime.UtcNow), GetLevelStr(logLevel), scope, categoryName, diff --git a/test/Garnet.test/RespMetricsTest.cs b/test/Garnet.test/RespMetricsTest.cs index 1ab26aa6ece..6c07fb22939 100644 --- a/test/Garnet.test/RespMetricsTest.cs +++ b/test/Garnet.test/RespMetricsTest.cs @@ -33,7 +33,7 @@ public void Setup() server = null; r = new Random(674386); TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true); - loggerFactory = TestUtils.CreateLoggerFactoryInstance(TestContext.Progress, LogLevel.Error); + (loggerFactory, _) = TestUtils.CreateLoggerFactoryInstance(TestContext.Progress, LogLevel.Error); } [TearDown] diff --git a/test/Garnet.test/TestUtils.cs b/test/Garnet.test/TestUtils.cs index f58aa882332..e362ba0deac 100644 --- a/test/Garnet.test/TestUtils.cs +++ b/test/Garnet.test/TestUtils.cs @@ -455,13 +455,20 @@ public static GarnetServer CreateGarnetServer( /// /// /// - public static ILoggerFactory CreateLoggerFactoryInstance(TextWriter textWriter, LogLevel logLevel, string scope = "", HashSet skipCmd = null, bool recvOnly = false, bool matchLevel = false) + public static (ILoggerFactory, NUnitLoggerProvider) CreateLoggerFactoryInstance( + TextWriter textWriter, + LogLevel logLevel, + string scope = "", + HashSet skipCmd = null, + bool recvOnly = false, + bool matchLevel = false) { - return LoggerFactory.Create(builder => + var provider = new NUnitLoggerProvider(textWriter, scope, skipCmd, recvOnly, matchLevel, logLevel); + return (LoggerFactory.Create(builder => { - builder.AddProvider(new NUnitLoggerProvider(textWriter, scope, skipCmd, recvOnly, matchLevel, logLevel)); + builder.AddProvider(provider); builder.SetMinimumLevel(logLevel); - }); + }), provider); } public static (GarnetServer[] Nodes, GarnetServerOptions[] Options) CreateGarnetCluster(