diff --git a/benchmark/BDN.benchmark/BDN.benchmark.csproj b/benchmark/BDN.benchmark/BDN.benchmark.csproj
index 8e6466597a5..f3bda355d6b 100644
--- a/benchmark/BDN.benchmark/BDN.benchmark.csproj
+++ b/benchmark/BDN.benchmark/BDN.benchmark.csproj
@@ -1,32 +1,32 @@
-
- Exe
- enable
- true
- ../../Garnet.snk
- false
-
+
+ Exe
+ enable
+ true
+ ../../Garnet.snk
+ false
+
-
-
+
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
-
-
-
-
-
-
+
+
+
+
+
+
-
+
\ No newline at end of file
diff --git a/benchmark/BDN.benchmark/Cluster/ClusterContext.cs b/benchmark/BDN.benchmark/Cluster/ClusterContext.cs
index e428ca60a0f..d39a87a3798 100644
--- a/benchmark/BDN.benchmark/Cluster/ClusterContext.cs
+++ b/benchmark/BDN.benchmark/Cluster/ClusterContext.cs
@@ -29,16 +29,28 @@ public void Dispose()
server.Dispose();
}
- public void SetupSingleInstance(bool disableSlotVerification = false)
+ public void SetupSingleInstance(ClusterParams clusterParams)
{
var opt = new GarnetServerOptions
{
QuietMode = true,
- EnableCluster = !disableSlotVerification,
+ EnableCluster = !clusterParams.disableSlotVerification,
EndPoints = [new IPEndPoint(IPAddress.Loopback, port)],
CleanClusterConfig = true,
- ClusterAnnounceEndpoint = new IPEndPoint(IPAddress.Loopback, port)
+ ClusterAnnounceEndpoint = new IPEndPoint(IPAddress.Loopback, port),
+ EnableAOF = clusterParams.enableAof,
};
+
+ if (clusterParams.enableAof)
+ {
+ opt.EnableAOF = true;
+ opt.UseAofNullDevice = true;
+ opt.FastAofTruncate = true;
+ opt.CommitFrequencyMs = -1;
+ opt.AofPageSize = "128m";
+ opt.AofMemorySize = "256m";
+ }
+
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
opt.CheckpointDir = "/tmp";
server = new EmbeddedRespServer(opt);
@@ -168,5 +180,4 @@ public void CreateCTXNSET(int keySize = 8, int batchSize = 100)
public void Consume(byte* ptr, int length)
=> session.TryConsumeMessages(ptr, length);
}
-
}
\ No newline at end of file
diff --git a/benchmark/BDN.benchmark/Cluster/ClusterMigrate.cs b/benchmark/BDN.benchmark/Cluster/ClusterMigrate.cs
index 532989937c4..45cd0689d75 100644
--- a/benchmark/BDN.benchmark/Cluster/ClusterMigrate.cs
+++ b/benchmark/BDN.benchmark/Cluster/ClusterMigrate.cs
@@ -25,7 +25,7 @@ public unsafe class ClusterMigrate
///
public IEnumerable ClusterParamsProvider()
{
- yield return new(false);
+ yield return new(false, false);
}
ClusterContext cc;
@@ -34,7 +34,7 @@ public IEnumerable ClusterParamsProvider()
public void GlobalSetup()
{
cc = new ClusterContext();
- cc.SetupSingleInstance();
+ cc.SetupSingleInstance(Params);
cc.AddSlotRange([(0, 16383)]);
cc.CreateGetSet();
cc.CreateMGetMSet();
diff --git a/benchmark/BDN.benchmark/Cluster/ClusterOperations.cs b/benchmark/BDN.benchmark/Cluster/ClusterOperations.cs
index 2799885ae92..9da411d87bd 100644
--- a/benchmark/BDN.benchmark/Cluster/ClusterOperations.cs
+++ b/benchmark/BDN.benchmark/Cluster/ClusterOperations.cs
@@ -22,8 +22,9 @@ public unsafe class ClusterOperations
///
public IEnumerable ClusterParamsProvider()
{
- yield return new(false);
- yield return new(true);
+ yield return new(false, false);
+ yield return new(true, false);
+ yield return new(false, true);
}
ClusterContext cc;
@@ -32,7 +33,7 @@ public IEnumerable ClusterParamsProvider()
public virtual void GlobalSetup()
{
cc = new ClusterContext();
- cc.SetupSingleInstance(Params.disableSlotVerification);
+ cc.SetupSingleInstance(Params);
cc.AddSlotRange([(0, 16383)]);
cc.CreateGetSet();
cc.CreateMGetMSet();
diff --git a/benchmark/BDN.benchmark/Cluster/ClusterParams.cs b/benchmark/BDN.benchmark/Cluster/ClusterParams.cs
index a47a86568bc..68b3e5abe98 100644
--- a/benchmark/BDN.benchmark/Cluster/ClusterParams.cs
+++ b/benchmark/BDN.benchmark/Cluster/ClusterParams.cs
@@ -13,12 +13,18 @@ public struct ClusterParams
///
public bool disableSlotVerification;
+ ///
+ /// Whether to enable AOF
+ ///
+ public bool enableAof;
+
///
/// Constructor
///
- public ClusterParams(bool disableSlotVerification)
+ public ClusterParams(bool disableSlotVerification, bool enableAof)
{
this.disableSlotVerification = disableSlotVerification;
+ this.enableAof = enableAof;
}
///
@@ -26,12 +32,16 @@ public ClusterParams(bool disableSlotVerification)
///
public override string ToString()
{
- if (!disableSlotVerification)
+ if (!disableSlotVerification && !enableAof)
return "None";
var ret = "";
if (disableSlotVerification)
ret += "DSV";
+
+ if (enableAof)
+ ret += ret.Length == 0 ? "AOF" : "+AOF";
+
return ret;
}
}
diff --git a/libs/client/ClientSession/GarnetClientSession.cs b/libs/client/ClientSession/GarnetClientSession.cs
index 66865626170..8b9161c762d 100644
--- a/libs/client/ClientSession/GarnetClientSession.cs
+++ b/libs/client/ClientSession/GarnetClientSession.cs
@@ -216,7 +216,7 @@ private async Task ConnectSendSocketAsync(int millisecondsTimeout = 0, C
NoDelay = true
};
- if (await TryConnectSocketAsync(socket, endpoint, millisecondsTimeout, cancellationToken))
+ if (await TryConnectSocketAsync(socket, endpoint, millisecondsTimeout, cancellationToken).ConfigureAwait(false))
return socket;
}
}
@@ -226,7 +226,7 @@ private async Task ConnectSendSocketAsync(int millisecondsTimeout = 0, C
if (EndPoint is not UnixDomainSocketEndPoint)
socket.NoDelay = true;
- if (await TryConnectSocketAsync(socket, EndPoint, millisecondsTimeout, cancellationToken))
+ if (await TryConnectSocketAsync(socket, EndPoint, millisecondsTimeout, cancellationToken).ConfigureAwait(false))
return socket;
}
@@ -251,12 +251,12 @@ private async Task TryConnectSocketAsync(Socket socket, EndPoint endpoint,
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var connectTask = socket.ConnectAsync(endpoint, timeoutCts.Token).AsTask();
- if (await Task.WhenAny(connectTask, Task.Delay(millisecondsTimeout, timeoutCts.Token)) == connectTask)
+ if (await Task.WhenAny(connectTask, Task.Delay(millisecondsTimeout, timeoutCts.Token)).ConfigureAwait(false) == connectTask)
{
// Task completed within timeout.
// Consider that the task may have faulted or been canceled.
// We re-await the task so that any exceptions/cancellation is rethrown.
- await connectTask;
+ await connectTask.ConfigureAwait(false);
}
else
{
diff --git a/libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs b/libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs
index 32c70732510..d81713ba3a7 100644
--- a/libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs
+++ b/libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs
@@ -31,7 +31,7 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
///
///
///
- ///
+ ///
public Task ExecuteReplicaSync(string nodeId, string primary_replid, byte[] checkpointEntryData, long aofBeginAddress, long aofTailAddress)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -113,7 +113,7 @@ public Task ExecuteReplicaSync(string nodeId, string primary_replid, byt
///
///
///
- ///
+ ///
public Task ExecuteSendCkptMetadata(Memory fileTokenBytes, int fileType, Memory data)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -180,7 +180,7 @@ public Task ExecuteSendCkptMetadata(Memory fileTokenBytes, int fil
///
///
///
- ///
+ ///
public Task ExecuteSendFileSegments(Memory fileTokenBytes, int fileType, long startAddress, Span data, int segmentId = -1)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -266,7 +266,7 @@ public Task ExecuteSendFileSegments(Memory fileTokenBytes, int fil
///
///
///
- ///
+ ///
public Task ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool replayAOF, string primary_replid, byte[] checkpointEntryData, long beginAddress, long tailAddress)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -355,7 +355,7 @@ public Task ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool re
///
///
///
- ///
+ ///
public Task ExecuteAttachSync(byte[] syncMetadata)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -403,7 +403,7 @@ public Task ExecuteAttachSync(byte[] syncMetadata)
/// Set CLUSTER SYNC header info
///
///
- ///
+ ///
public void SetClusterSyncHeader(string sourceNodeId)
{
// Unlike Migration, where we don't know at the time of header initialization if we have a record or not, in Replication
diff --git a/libs/client/GarnetClient.cs b/libs/client/GarnetClient.cs
index 36cef49fb9f..c41fcff620c 100644
--- a/libs/client/GarnetClient.cs
+++ b/libs/client/GarnetClient.cs
@@ -277,7 +277,21 @@ public async Task ConnectAsync(CancellationToken token = default)
}
catch (Exception e)
{
- logger?.LogError(e, "AUTH returned error");
+ logger?.LogError(e, "AUTH returned error!");
+ throw;
+ }
+
+ try
+ {
+ if (clientName != null)
+ {
+ _ = await ExecuteForStringResultAsync(CLIENT, SETINFO).ConfigureAwait(false);
+ _ = await ExecuteForStringResultAsync(CLIENT, clientName).ConfigureAwait(false);
+ }
+ }
+ catch (Exception e)
+ {
+ logger?.LogError(e, "Client set info returned error");
throw;
}
@@ -316,7 +330,7 @@ private async Task ConnectSendSocketAsync(int millisecondsTimeout = 0, C
NoDelay = true
};
- if (await TryConnectSocketAsync(socket, endpoint, millisecondsTimeout, cancellationToken))
+ if (await TryConnectSocketAsync(socket, endpoint, millisecondsTimeout, cancellationToken).ConfigureAwait(false))
return socket;
}
}
@@ -326,7 +340,7 @@ private async Task ConnectSendSocketAsync(int millisecondsTimeout = 0, C
if (EndPoint is not UnixDomainSocketEndPoint)
socket.NoDelay = true;
- if (await TryConnectSocketAsync(socket, EndPoint, millisecondsTimeout, cancellationToken))
+ if (await TryConnectSocketAsync(socket, EndPoint, millisecondsTimeout, cancellationToken).ConfigureAwait(false))
return socket;
}
@@ -351,12 +365,12 @@ private async Task TryConnectSocketAsync(Socket socket, EndPoint endpoint,
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var connectTask = socket.ConnectAsync(endpoint, timeoutCts.Token).AsTask();
- if (await Task.WhenAny(connectTask, Task.Delay(millisecondsTimeout, timeoutCts.Token)) == connectTask)
+ if (await Task.WhenAny(connectTask, Task.Delay(millisecondsTimeout, timeoutCts.Token)).ConfigureAwait(false) == connectTask)
{
// Task completed within timeout.
// Consider that the task may have faulted or been canceled.
// We re-await the task so that any exceptions/cancellation is rethrown.
- await connectTask;
+ await connectTask.ConfigureAwait(false);
}
else
{
@@ -394,7 +408,7 @@ async Task TimeoutChecker()
var _tcsOffset = tcsOffset;
var _tailAddress = networkWriter.GetTailAddress();
- await Task.Delay(timeoutMilliseconds, token);
+ await Task.Delay(timeoutMilliseconds, token).ConfigureAwait(false);
// Check if no new tasks added + no new results processed
var _newTcsOffset = tcsOffset;
var _newNextTaskId = networkWriter.GetNextTaskId();
@@ -447,7 +461,7 @@ public async Task ReconnectAsync(CancellationToken token = default)
networkWriter?.Dispose();
}
catch { }
- await ConnectAsync(token);
+ await ConnectAsync(token).ConfigureAwait(false);
}
///
@@ -525,7 +539,7 @@ async ValueTask InputGateAsync(CancellationToken token = default)
{
if (PipelineLength() < maxOutstandingTasks)
break;
- await Task.Delay(delayMs, token);
+ await Task.Delay(delayMs, token).ConfigureAwait(false);
if (delayMs == 0) delayMs = 1;
else delayMs *= 2;
if (delayMs > 4096) delayMs = 4096;
@@ -598,7 +612,7 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory op, string par
totalLen += 1 + NumUtils.CountDigits(arraySize) + 2;
CheckLength(totalLen, tcs);
- await InputGateAsync(token);
+ await InputGateAsync(token).ConfigureAwait(false);
try
{
@@ -660,7 +674,7 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory op, string par
try
{
networkWriter.epoch.Suspend();
- await AwaitPreviousTaskAsync(taskId); // does not take token, as task is not cancelable at this point
+ await AwaitPreviousTaskAsync(taskId).ConfigureAwait(false); // does not take token, as task is not cancelable at this point
}
finally
{
@@ -711,7 +725,7 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory op, Memory op, Memory respOp, IColle
}
CheckLength(totalLen, tcs);
- await InputGateAsync(token);
+ await InputGateAsync(token).ConfigureAwait(false);
try
{
@@ -1092,7 +1106,7 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory respOp, IColle
try
{
networkWriter.epoch.Suspend();
- await AwaitPreviousTaskAsync(taskId); // does not take token, as task is not cancelable at this point
+ await AwaitPreviousTaskAsync(taskId).ConfigureAwait(false); // does not take token, as task is not cancelable at this point
}
finally
{
diff --git a/libs/client/GarnetClientAPI/GarnetClientBasicRespCommands.cs b/libs/client/GarnetClientAPI/GarnetClientBasicRespCommands.cs
index abb58d6a334..4bf9983a45c 100644
--- a/libs/client/GarnetClientAPI/GarnetClientBasicRespCommands.cs
+++ b/libs/client/GarnetClientAPI/GarnetClientBasicRespCommands.cs
@@ -572,7 +572,7 @@ public async Task StringDecrement(Memory key, long value)
/// Value
///
public async Task StringDecrement(Memory key, long value, CancellationToken token)
- => long.Parse(await ExecuteForStringResultWithCancellationAsync(DECRBY, key, Encoding.ASCII.GetBytes(value.ToString()), token));
+ => long.Parse(await ExecuteForStringResultWithCancellationAsync(DECRBY, key, Encoding.ASCII.GetBytes(value.ToString()), token).ConfigureAwait(false));
///
/// Decrement number stored at key by value.
diff --git a/libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs b/libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs
index dbed29ec64e..ea715ad651f 100644
--- a/libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs
+++ b/libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs
@@ -149,13 +149,13 @@ public async Task ExecuteForStringResultWithCancellationAsync(Memory ExecuteForStringResultWithCancellationAsync(Memory ExecuteForStringResultWithCancellationAsync(string op,
using (token.Register(TokenRegistrationStringCallback, tcs.stringTcs))
{
var _ = InternalExecuteAsync(tcs, op, args, token);
- return await tcs.stringTcs.Task;
+ return await tcs.stringTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, op, args, token);
- return await tcs.stringTcs.Task;
+ return await tcs.stringTcs.Task.ConfigureAwait(false);
}
}
@@ -377,13 +377,13 @@ public async Task> ExecuteForMemoryResultWithCancellationAsyn
using (token.Register(TokenRegistrationMemoryResultCallback, tcs.memoryByteTcs))
{
var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token);
- return await tcs.memoryByteTcs.Task;
+ return await tcs.memoryByteTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token);
- return await tcs.memoryByteTcs.Task;
+ return await tcs.memoryByteTcs.Task.ConfigureAwait(false);
}
}
@@ -403,13 +403,13 @@ public async Task> ExecuteForMemoryResultWithCancellationAsyn
using (token.Register(TokenRegistrationMemoryResultCallback, tcs.memoryByteTcs))
{
var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token);
- return await tcs.memoryByteTcs.Task;
+ return await tcs.memoryByteTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token);
- return await tcs.memoryByteTcs.Task;
+ return await tcs.memoryByteTcs.Task.ConfigureAwait(false);
}
}
@@ -428,13 +428,13 @@ public async Task> ExecuteForMemoryResultWithCancellationAsyn
using (token.Register(TokenRegistrationMemoryResultCallback, tcs.memoryByteTcs))
{
var _ = InternalExecuteAsync(tcs, op, args, token);
- return await tcs.memoryByteTcs.Task;
+ return await tcs.memoryByteTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, op, args, token);
- return await tcs.memoryByteTcs.Task;
+ return await tcs.memoryByteTcs.Task.ConfigureAwait(false);
}
}
@@ -452,13 +452,13 @@ public async Task> ExecuteForMemoryResultWithCancellationAsyn
using (token.Register(TokenRegistrationMemoryResultCallback, tcs.memoryByteTcs))
{
var _ = InternalExecuteAsync(tcs, respOp, args, token);
- return await tcs.memoryByteTcs.Task;
+ return await tcs.memoryByteTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, respOp, args, token);
- return await tcs.memoryByteTcs.Task;
+ return await tcs.memoryByteTcs.Task.ConfigureAwait(false);
}
}
@@ -601,13 +601,13 @@ public async Task ExecuteForStringArrayResultWithCancellationAsync(Mem
using (token.Register(TokenRegistrationStringArrayCallback, tcs.stringArrayTcs))
{
var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token);
- return await tcs.stringArrayTcs.Task;
+ return await tcs.stringArrayTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token);
- return await tcs.stringArrayTcs.Task;
+ return await tcs.stringArrayTcs.Task.ConfigureAwait(false);
}
}
@@ -628,13 +628,13 @@ public async Task ExecuteForStringArrayResultWithCancellationAsync(Mem
using (token.Register(TokenRegistrationStringArrayCallback, tcs.stringArrayTcs))
{
var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token);
- return await tcs.stringArrayTcs.Task;
+ return await tcs.stringArrayTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token);
- return await tcs.stringArrayTcs.Task;
+ return await tcs.stringArrayTcs.Task.ConfigureAwait(false);
}
}
@@ -654,13 +654,13 @@ public async Task ExecuteForStringArrayResultWithCancellationAsync(str
using (token.Register(TokenRegistrationStringArrayCallback, tcs.stringArrayTcs))
{
var _ = InternalExecuteAsync(tcs, op, args, token);
- return await tcs.stringArrayTcs.Task;
+ return await tcs.stringArrayTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, op, args, token);
- return await tcs.stringArrayTcs.Task;
+ return await tcs.stringArrayTcs.Task.ConfigureAwait(false);
}
}
@@ -679,13 +679,13 @@ public async Task ExecuteForStringArrayResultWithCancellationAsync(Mem
using (token.Register(TokenRegistrationStringArrayCallback, tcs.stringArrayTcs))
{
var _ = InternalExecuteAsync(tcs, respOp, args, token);
- return await tcs.stringArrayTcs.Task;
+ return await tcs.stringArrayTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, respOp, args, token);
- return await tcs.stringArrayTcs.Task;
+ return await tcs.stringArrayTcs.Task.ConfigureAwait(false);
}
}
@@ -829,13 +829,13 @@ public async Task[]> ExecuteForMemoryResultArrayWithCancellat
using (token.Register(TokenRegistrationMemoryResultArrayCallback, tcs.memoryByteArrayTcs))
{
var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token);
- return await tcs.memoryByteArrayTcs.Task;
+ return await tcs.memoryByteArrayTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token);
- return await tcs.memoryByteArrayTcs.Task;
+ return await tcs.memoryByteArrayTcs.Task.ConfigureAwait(false);
}
}
@@ -856,13 +856,13 @@ public async Task[]> ExecuteForMemoryResultArrayWithCancellat
using (token.Register(TokenRegistrationMemoryResultArrayCallback, tcs.memoryByteArrayTcs))
{
var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token);
- return await tcs.memoryByteArrayTcs.Task;
+ return await tcs.memoryByteArrayTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, respOp, param1, param2, token);
- return await tcs.memoryByteArrayTcs.Task;
+ return await tcs.memoryByteArrayTcs.Task.ConfigureAwait(false);
}
}
@@ -883,13 +883,13 @@ public async Task[]> ExecuteForMemoryResultArrayWithCancellat
using (token.Register(TokenRegistrationMemoryResultArrayCallback, tcs.memoryByteArrayTcs))
{
var _ = InternalExecuteAsync(tcs, op, args, token);
- return await tcs.memoryByteArrayTcs.Task;
+ return await tcs.memoryByteArrayTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, op, args, token);
- return await tcs.memoryByteArrayTcs.Task;
+ return await tcs.memoryByteArrayTcs.Task.ConfigureAwait(false);
}
}
@@ -1034,13 +1034,13 @@ public async Task ExecuteForLongResultWithCancellationAsync(string op, ICo
using (token.Register(TokenRegistrationLongCallback, tcs.longTcs))
{
var _ = InternalExecuteAsync(tcs, op, args, token);
- return await tcs.longTcs.Task;
+ return await tcs.longTcs.Task.ConfigureAwait(false);
}
}
else
{
var _ = InternalExecuteAsync(tcs, op, args, token);
- return await tcs.longTcs.Task;
+ return await tcs.longTcs.Task.ConfigureAwait(false);
}
}
diff --git a/libs/client/LightEpoch.cs b/libs/client/LightEpoch.cs
index d54e254cdf4..c88acb1e64b 100644
--- a/libs/client/LightEpoch.cs
+++ b/libs/client/LightEpoch.cs
@@ -193,7 +193,7 @@ int SelectInstance()
if (kInvalidIndex == Interlocked.CompareExchange(ref entry, 1, kInvalidIndex))
return i;
}
- throw new InvalidOperationException("Exceeded maximum number of active LightEpoch instances");
+ throw new InvalidOperationException($"Exceeded maximum number of active LightEpoch instances {ActiveInstanceCount()} {InstanceIndexBuffer.MaxInstances}");
}
///
diff --git a/libs/client/Utility.cs b/libs/client/Utility.cs
index 3e008e66d40..45965a86a7e 100644
--- a/libs/client/Utility.cs
+++ b/libs/client/Utility.cs
@@ -192,14 +192,14 @@ private static async Task SlowWithCancellationAsync(Task task, Cancella
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
using (token.Register(s => ((TaskCompletionSource)s).TrySetResult(true), tcs, useSynchronizationContext))
{
- if (task != await Task.WhenAny(task, tcs.Task))
+ if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false))
{
token.ThrowIfCancellationRequested();
}
}
// make sure any exceptions in the task get unwrapped and exposed to the caller.
- return await task;
+ return await task.ConfigureAwait(false);
}
}
}
\ No newline at end of file
diff --git a/libs/cluster/Server/ClusterConfig.cs b/libs/cluster/Server/ClusterConfig.cs
index e3bb967d44f..55f7ba64e4d 100644
--- a/libs/cluster/Server/ClusterConfig.cs
+++ b/libs/cluster/Server/ClusterConfig.cs
@@ -1176,7 +1176,7 @@ public ClusterConfig MergeSlotMap(ClusterConfig senderConfig, ILogger logger = n
// will not be able to claim the slot without outside intervention
if (currentOwnerNodeId != null && currentOwnerNodeId.Equals(senderConfig.LocalNodeId, StringComparison.OrdinalIgnoreCase))
{
- logger?.LogWarning("MergeReset: {senderConfig.LocalNodeIdShort} > {i} > {LocalNodeIdShort}", senderConfig.LocalNodeIdShort, i, LocalNodeIdShort);
+ // logger?.LogWarning("MergeReset: {senderConfig.LocalNodeIdShort} > {i} > {LocalNodeIdShort}", senderConfig.LocalNodeIdShort, i, LocalNodeIdShort);
newSlotMap[i]._workerId = RESERVED_WORKER_ID;
newSlotMap[i]._state = SlotState.OFFLINE;
}
diff --git a/libs/cluster/Server/Failover/FailoverManager.cs b/libs/cluster/Server/Failover/FailoverManager.cs
index 9dd98b26513..f25989c5b00 100644
--- a/libs/cluster/Server/Failover/FailoverManager.cs
+++ b/libs/cluster/Server/Failover/FailoverManager.cs
@@ -89,7 +89,7 @@ public bool TryStartReplicaFailover(FailoverOption option, TimeSpan failoverTime
logger: logger);
_ = Task.Run(async () =>
{
- var success = await currentFailoverSession.BeginAsyncReplicaFailover();
+ var success = await currentFailoverSession.BeginAsyncReplicaFailover().ConfigureAwait(false);
lastFailoverStatus = success ? FailoverStatus.FAILOVER_COMPLETED : FailoverStatus.FAILOVER_ABORTED;
Reset();
});
@@ -121,7 +121,7 @@ public bool TryStartPrimaryFailover(string replicaAddress, int replicaPort, Fail
logger: logger);
_ = Task.Run(async () =>
{
- _ = await currentFailoverSession.BeginAsyncPrimaryFailover();
+ _ = await currentFailoverSession.BeginAsyncPrimaryFailover().ConfigureAwait(false);
Reset();
});
return true;
diff --git a/libs/cluster/Server/Failover/PrimaryFailoverSession.cs b/libs/cluster/Server/Failover/PrimaryFailoverSession.cs
index e1f5eedd0da..84161f4f328 100644
--- a/libs/cluster/Server/Failover/PrimaryFailoverSession.cs
+++ b/libs/cluster/Server/Failover/PrimaryFailoverSession.cs
@@ -18,7 +18,7 @@ private Task CheckReplicaSync(GarnetClient gclient)
if (!gclient.IsConnected)
gclient.Connect();
- return gclient.failreplicationoffset(clusterProvider.replicationManager.ReplicationOffset).WaitAsync(clusterTimeout, cts.Token);
+ return gclient.ExecuteClusterFailReplicationOffset(clusterProvider.replicationManager.ReplicationOffset).WaitAsync(clusterTimeout, cts.Token);
}
catch (Exception ex)
{
@@ -38,7 +38,7 @@ private async Task WaitForFirstReplicaSync()
tasks[tcount++] = CheckReplicaSync(_gclient);
tasks[clients.Length] = Task.Delay(failoverTimeout).ContinueWith(_ => default(long));
- var completedTask = await Task.WhenAny(tasks);
+ var completedTask = await Task.WhenAny(tasks).ConfigureAwait(false);
// No replica was able to catch up with primary so timeout
if (completedTask == tasks[clients.Length])
@@ -59,7 +59,7 @@ private async Task WaitForFirstReplicaSync()
{
var syncTask = CheckReplicaSync(clients[0]);
var timeoutTask = Task.Delay(failoverTimeout, cts.Token);
- var completedTask = await Task.WhenAny(syncTask, timeoutTask);
+ var completedTask = await Task.WhenAny(syncTask, timeoutTask).ConfigureAwait(false);
// Replica trying to failover did not caught up on time so timeout
if (completedTask == timeoutTask)
@@ -82,7 +82,7 @@ private async Task InitiateReplicaTakeOver(GarnetClient gclient)
if (!gclient.IsConnected)
gclient.Connect();
- return await gclient.Failover(FailoverOption.TAKEOVER).WaitAsync(clusterTimeout, cts.Token);
+ return await gclient.Failover(FailoverOption.TAKEOVER).WaitAsync(clusterTimeout, cts.Token).ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -104,11 +104,11 @@ public async Task BeginAsyncPrimaryFailover()
_ = clusterProvider.BumpAndWaitForEpochTransition();
status = FailoverStatus.WAITING_FOR_SYNC;
- var newPrimary = await WaitForFirstReplicaSync();
+ var newPrimary = await WaitForFirstReplicaSync().ConfigureAwait(false);
if (newPrimary == null) return false;
status = FailoverStatus.TAKING_OVER_AS_PRIMARY;
- var success = await InitiateReplicaTakeOver(newPrimary);
+ var success = await InitiateReplicaTakeOver(newPrimary).ConfigureAwait(false);
if (!success) return false;
}
catch (Exception ex)
diff --git a/libs/cluster/Server/Failover/ReplicaFailoverSession.cs b/libs/cluster/Server/Failover/ReplicaFailoverSession.cs
index 9fc01ffe6b2..44f6abb343c 100644
--- a/libs/cluster/Server/Failover/ReplicaFailoverSession.cs
+++ b/libs/cluster/Server/Failover/ReplicaFailoverSession.cs
@@ -42,7 +42,7 @@ private async Task CreateConnectionAsync(string nodeId)
try
{
if (!client.IsConnected)
- await client.ReconnectAsync().WaitAsync(failoverTimeout, cts.Token);
+ await client.ReconnectAsync().WaitAsync(failoverTimeout, cts.Token).ConfigureAwait(false);
return client;
}
@@ -84,7 +84,7 @@ private async Task PauseWritesAndWaitForSync()
// Issue stop writes to the primary
status = FailoverStatus.ISSUING_PAUSE_WRITES;
var localIdBytes = Encoding.ASCII.GetBytes(oldConfig.LocalNodeId);
- var primaryReplicationOffset = await client.failstopwrites(localIdBytes).WaitAsync(failoverTimeout, cts.Token);
+ var primaryReplicationOffset = await client.failstopwrites(localIdBytes).WaitAsync(failoverTimeout, cts.Token).ConfigureAwait(false);
// Wait for replica to catch up
status = FailoverStatus.WAITING_FOR_SYNC;
@@ -172,7 +172,7 @@ private async Task BroadcastConfigAndRequestAttach(string replicaId, byte[] conf
{
var oldPrimaryId = oldConfig.LocalNodePrimaryId;
var newConfig = clusterProvider.clusterManager.CurrentConfig;
- var client = oldPrimaryId.Equals(replicaId) ? primaryClient : await GetConnectionAsync(replicaId);
+ var client = oldPrimaryId.Equals(replicaId) ? primaryClient : await GetConnectionAsync(replicaId).ConfigureAwait(false);
try
{
@@ -211,13 +211,13 @@ await client.Gossip(configByteArray).ContinueWith(t =>
{
resp.Dispose();
}
- }, TaskContinuationOptions.RunContinuationsAsynchronously).WaitAsync(failoverTimeout, cts.Token);
+ }, TaskContinuationOptions.RunContinuationsAsynchronously).WaitAsync(failoverTimeout, cts.Token).ConfigureAwait(false);
var localAddress = oldConfig.LocalNodeIp;
var localPort = oldConfig.LocalNodePort;
// Ask replica to attach and sync
- var replicaOfResp = await client.ReplicaOf(localAddress, localPort).WaitAsync(failoverTimeout, cts.Token);
+ var replicaOfResp = await client.ReplicaOf(localAddress, localPort).WaitAsync(failoverTimeout, cts.Token).ConfigureAwait(false);
// Check if response for attach succeeded
if (!replicaOfResp.Equals("OK"))
@@ -254,7 +254,7 @@ private async Task IssueAttachReplicas()
{
try
{
- attachReplicaTasks.Add(Task.Run(async () => await BroadcastConfigAndRequestAttach(replicaId, configByteArray)));
+ attachReplicaTasks.Add(Task.Run(async () => await BroadcastConfigAndRequestAttach(replicaId, configByteArray).ConfigureAwait(false)));
}
catch (Exception ex)
{
@@ -297,7 +297,7 @@ public async Task BeginAsyncReplicaFailover()
try
{
// Issue stop writes and on ack wait for replica to catch up
- if (option is FailoverOption.DEFAULT && !await PauseWritesAndWaitForSync())
+ if (option is FailoverOption.DEFAULT && !await PauseWritesAndWaitForSync().ConfigureAwait(false))
{
return false;
}
diff --git a/libs/cluster/Server/Gossip/GarnetClientExtensions.cs b/libs/cluster/Server/Gossip/GarnetClientExtensions.cs
index 0cf3053a0f5..5ba714bc05a 100644
--- a/libs/cluster/Server/Gossip/GarnetClientExtensions.cs
+++ b/libs/cluster/Server/Gossip/GarnetClientExtensions.cs
@@ -11,7 +11,7 @@
namespace Garnet.cluster
{
- internal static partial class GarnetClientExtensions
+ internal static class GarnetClientExtensions
{
static readonly Memory GOSSIP = "GOSSIP"u8.ToArray();
static readonly Memory WITHMEET = "WITHMEET"u8.ToArray();
@@ -56,7 +56,7 @@ public static async Task failstopwrites(this GarnetClient client, Memory
///
///
- public static async Task failreplicationoffset(this GarnetClient client, long primaryReplicationOffset, CancellationToken cancellationToken = default)
+ public static async Task ExecuteClusterFailReplicationOffset(this GarnetClient client, long primaryReplicationOffset, CancellationToken cancellationToken = default)
{
var args = new Memory[] {
CmdStrings.failreplicationoffset.ToArray(),
@@ -65,7 +65,16 @@ public static async Task failreplicationoffset(this GarnetClient client, l
return await client.ExecuteForLongResultWithCancellationAsync(GarnetClient.CLUSTER, args, cancellationToken).ConfigureAwait(false);
}
- public static void ClusterPublishNoResponse(this GarnetClient client, RespCommand cmd, ref Span channel, ref Span message, CancellationToken cancellationToken = default)
+ ///
+ /// Publishes a message to a specified channel in a clustered Garnet environment without waiting for a server
+ /// response.
+ ///
+ /// The Garnet client instance used to send the publish command.
+ /// The RESP command to execute. Must be either PUBLISH or SPUBLISH.
+ /// A span containing the channel name to which the message will be published.
+ /// A span containing the message to publish to the channel.
+ /// A cancellation token that can be used to cancel the operation.
+ public static void ExecuteClusterPublishNoResponse(this GarnetClient client, RespCommand cmd, ref Span channel, ref Span message, CancellationToken cancellationToken = default)
=> client.ExecuteNoResponse(GarnetClient.CLUSTER, RespCommand.PUBLISH == cmd ? GarnetClient.PUBLISH : GarnetClient.SPUBLISH, ref channel, ref message, cancellationToken);
}
}
\ No newline at end of file
diff --git a/libs/cluster/Server/Gossip/GarnetClusterConnectionStore.cs b/libs/cluster/Server/Gossip/GarnetClusterConnectionStore.cs
index 90eaff8be8b..5f75f5c7b8f 100644
--- a/libs/cluster/Server/Gossip/GarnetClusterConnectionStore.cs
+++ b/libs/cluster/Server/Gossip/GarnetClusterConnectionStore.cs
@@ -152,7 +152,7 @@ public async Task AddConnection(GarnetServerNode conn)
{
try
{
- await AcquireWriteLockAsync();
+ await AcquireWriteLockAsync().ConfigureAwait(false);
if (_disposed) return false;
@@ -182,7 +182,7 @@ public async Task AddConnection(GarnetServerNode conn)
GarnetServerNode conn = null;
try
{
- await AcquireWriteLockAsync();
+ await AcquireWriteLockAsync().ConfigureAwait(false);
// Fail on disposed
if (_disposed) return (false, conn);
@@ -215,7 +215,7 @@ public async Task TryRemoveConnection(string nodeId)
{
try
{
- await AcquireWriteLockAsync();
+ await AcquireWriteLockAsync().ConfigureAwait(false);
// Fail on disposed
if (_disposed) return false;
diff --git a/libs/cluster/Server/Gossip/GarnetServerNode.cs b/libs/cluster/Server/Gossip/GarnetServerNode.cs
index 3b61e7c0e44..e9ed7c85bb5 100644
--- a/libs/cluster/Server/Gossip/GarnetServerNode.cs
+++ b/libs/cluster/Server/Gossip/GarnetServerNode.cs
@@ -79,7 +79,8 @@ public GarnetServerNode(ClusterProvider clusterProvider, EndPoint endpoint, SslC
this.clusterProvider = clusterProvider;
this.EndPoint = endpoint;
this.gc = new GarnetClient(
- endpoint, tlsOptions,
+ endpoint,
+ tlsOptions,
sendPageSize: opts.DisablePubSub ? defaultSendPageSize : Math.Max(defaultSendPageSize, (int)opts.PubSubPageSizeBytes()),
maxOutstandingTasks: defaultMaxOutstandingTask,
timeoutMilliseconds: opts.ClusterTimeout <= 0 ? 0 : TimeSpan.FromSeconds(opts.ClusterTimeout).Milliseconds,
@@ -178,7 +179,7 @@ byte[] GetMostRecentConfig()
///
private Task Gossip(byte[] configByteArray)
{
- return gc.Gossip(configByteArray).ContinueWith(t =>
+ return gc.Gossip(configByteArray, internalCts.Token).ContinueWith(t =>
{
try
{
@@ -212,7 +213,7 @@ private Task Gossip(byte[] configByteArray)
public async Task> TryMeetAsync(byte[] configByteArray)
{
UpdateGossipSend();
- var resp = await gc.GossipWithMeet(configByteArray).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token);
+ var resp = await gc.GossipWithMeet(configByteArray, internalCts.Token).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token).ConfigureAwait(false);
return resp;
}
@@ -297,7 +298,7 @@ public void TryClusterPublish(RespCommand cmd, ref Span channel, ref Span<
}
locked = true;
- gc.ClusterPublishNoResponse(cmd, ref channel, ref message);
+ gc.ExecuteClusterPublishNoResponse(cmd, ref channel, ref message);
}
finally
{
diff --git a/libs/cluster/Server/Migration/MigrateSessionSlots.cs b/libs/cluster/Server/Migration/MigrateSessionSlots.cs
index f4e55e02bae..292866a80e1 100644
--- a/libs/cluster/Server/Migration/MigrateSessionSlots.cs
+++ b/libs/cluster/Server/Migration/MigrateSessionSlots.cs
@@ -86,7 +86,7 @@ public async Task MigrateSlotsDriverInline()
// Send store
logger?.LogWarning("Store migrate scan range [{storeBeginAddress}, {storeTailAddress}]", storeBeginAddress, storeTailAddress);
- var success = await CreateAndRunMigrateTasks(storeBeginAddress, storeTailAddress, storePageSize);
+ var success = await CreateAndRunMigrateTasks(storeBeginAddress, storeTailAddress, storePageSize).ConfigureAwait(false);
if (!success) return false;
return true;
diff --git a/libs/cluster/Server/Migration/MigrationDriver.cs b/libs/cluster/Server/Migration/MigrationDriver.cs
index 5da3d36a1e3..eeec905ab08 100644
--- a/libs/cluster/Server/Migration/MigrationDriver.cs
+++ b/libs/cluster/Server/Migration/MigrationDriver.cs
@@ -83,7 +83,7 @@ private async Task BeginAsyncMigrationTask()
// If we have any namespaces, that implies Vector Sets, and if we have any of THOSE
// we need to reserve destination sets on the other side
- if ((_namespaces?.Count ?? 0) > 0 && !await ReserveDestinationVectorSetsAsync())
+ if ((_namespaces?.Count ?? 0) > 0 && !await ReserveDestinationVectorSetsAsync().ConfigureAwait(false))
{
logger?.LogError("Failed to reserve destination vector sets, migration failed");
TryRecoverFromFailure();
@@ -93,7 +93,7 @@ private async Task BeginAsyncMigrationTask()
#region migrateData
// Migrate actual data
- if (!await MigrateSlotsDriverInline())
+ if (!await MigrateSlotsDriverInline().ConfigureAwait(false))
{
logger?.LogError("MigrateSlotsDriver failed");
TryRecoverFromFailure();
@@ -106,7 +106,7 @@ private async Task BeginAsyncMigrationTask()
// Lock config merge to avoid a background epoch bump
clusterProvider.clusterManager.SuspendConfigMerge();
configResumed = false;
- await clusterProvider.clusterManager.TryMeetAsync(_targetAddress, _targetPort, acquireLock: false);
+ await clusterProvider.clusterManager.TryMeetAsync(_targetAddress, _targetPort, acquireLock: false).ConfigureAwait(false);
// Change ownership of slots to target node.
if (!TrySetSlotRanges(GetTargetNodeId, MigrateState.NODE))
@@ -127,7 +127,7 @@ private async Task BeginAsyncMigrationTask()
}
// Gossip again to ensure that source and target agree on the slot exchange
- await clusterProvider.clusterManager.TryMeetAsync(_targetAddress, _targetPort, acquireLock: false);
+ await clusterProvider.clusterManager.TryMeetAsync(_targetAddress, _targetPort, acquireLock: false).ConfigureAwait(false);
#endregion
// Enqueue success log
diff --git a/libs/cluster/Session/RespClusterBasicCommands.cs b/libs/cluster/Session/RespClusterBasicCommands.cs
index d5aaec9680d..44429aba8c4 100644
--- a/libs/cluster/Session/RespClusterBasicCommands.cs
+++ b/libs/cluster/Session/RespClusterBasicCommands.cs
@@ -492,6 +492,7 @@ private bool NetworkClusterReset(out bool invalidParameters)
///
///
///
+ ///
private bool NetworkClusterPublish(out bool invalidParameters)
{
invalidParameters = false;
diff --git a/libs/cluster/Session/RespClusterReplicationCommands.cs b/libs/cluster/Session/RespClusterReplicationCommands.cs
index 7966343bd59..8600c93960b 100644
--- a/libs/cluster/Session/RespClusterReplicationCommands.cs
+++ b/libs/cluster/Session/RespClusterReplicationCommands.cs
@@ -2,6 +2,7 @@
// Licensed under the MIT license.
using System;
+using System.Diagnostics;
using System.Text;
using Garnet.client;
using Garnet.cluster.Server.Replication;
@@ -242,6 +243,8 @@ private bool NetworkClusterAppendLog(out bool invalidParameters)
return true;
}
+ LogPrimaryStream(previousAddress, currentAddress, nextAddress, logger);
+
var sbRecord = parseState.GetArgSliceByRef(4);
var currentConfig = clusterProvider.clusterManager.CurrentConfig;
@@ -263,6 +266,18 @@ private bool NetworkClusterAppendLog(out bool invalidParameters)
previousAddress, currentAddress, nextAddress);
}
+ [Conditional("DEBUG")]
+ static void LogPrimaryStream(long previousAddress, long currentAddress, long nextAddress, ILogger logger)
+ {
+ var state = new GarnetTestLoggingEvent()
+ {
+ Type = GarnetTestLoggingEventType.LogPrimaryStreamType,
+ Message = $"previousAddress: {previousAddress}, currentAddress: {currentAddress}, nextAddress: {nextAddress}",
+ };
+
+ logger?.LogTesting(state);
+ }
+
return true;
}
diff --git a/libs/common/Format.cs b/libs/common/Format.cs
index 345987fdb34..268df06ab9d 100644
--- a/libs/common/Format.cs
+++ b/libs/common/Format.cs
@@ -117,7 +117,7 @@ public static async Task TryCreateEndpoint(string singleAddressOrHos
foreach (var entry in ipAddresses)
{
var endpoint = new IPEndPoint(entry, port);
- var IsListening = await TryConnect(endpoint);
+ var IsListening = await TryConnect(endpoint).ConfigureAwait(false);
if (IsListening) return [endpoint];
}
}
@@ -149,7 +149,7 @@ async Task TryConnect(IPEndPoint endpoint)
{
try
{
- await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port);
+ await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port).ConfigureAwait(false);
logger?.LogTrace("Reachable {ip} {port}", endpoint.Address, endpoint.Port);
return true;
}
diff --git a/libs/common/LightClient.cs b/libs/common/LightClient.cs
index b72b85214fa..0eb8c199cde 100644
--- a/libs/common/LightClient.cs
+++ b/libs/common/LightClient.cs
@@ -136,7 +136,7 @@ private async Task ConnectSendSocketAsync(CancellationToken cancellation
NoDelay = true
};
- if (await TryConnectSocketAsync(socket, endpoint, cancellationToken))
+ if (await TryConnectSocketAsync(socket, endpoint, cancellationToken).ConfigureAwait(false))
return socket;
}
}
@@ -146,7 +146,7 @@ private async Task ConnectSendSocketAsync(CancellationToken cancellation
if (endpoint is not UnixDomainSocketEndPoint)
socket.NoDelay = true;
- if (await TryConnectSocketAsync(socket, endpoint, cancellationToken))
+ if (await TryConnectSocketAsync(socket, endpoint, cancellationToken).ConfigureAwait(false))
return socket;
}
diff --git a/libs/common/Networking/TcpNetworkHandlerBase.cs b/libs/common/Networking/TcpNetworkHandlerBase.cs
index 8a7fefe1c5d..8fc763f77a6 100644
--- a/libs/common/Networking/TcpNetworkHandlerBase.cs
+++ b/libs/common/Networking/TcpNetworkHandlerBase.cs
@@ -246,7 +246,7 @@ private async ValueTask HandleReceiveWithTLSAsync(object sender, SocketAsyncEven
var receiveTask = OnNetworkReceiveWithTLSAsync(e.BytesTransferred);
if (!receiveTask.IsCompletedSuccessfully)
{
- await receiveTask;
+ await receiveTask.ConfigureAwait(false);
}
e.SetBuffer(networkReceiveBuffer, networkBytesRead, networkReceiveBuffer.Length - networkBytesRead);
} while (!e.AcceptSocket.ReceiveAsync(e));
diff --git a/libs/common/ExceptionInjectionHelper.cs b/libs/common/Testing/ExceptionInjectionHelper.cs
similarity index 100%
rename from libs/common/ExceptionInjectionHelper.cs
rename to libs/common/Testing/ExceptionInjectionHelper.cs
diff --git a/libs/common/ExceptionInjectionType.cs b/libs/common/Testing/ExceptionInjectionType.cs
similarity index 100%
rename from libs/common/ExceptionInjectionType.cs
rename to libs/common/Testing/ExceptionInjectionType.cs
diff --git a/libs/common/Testing/GarnetTestLoggingEventType.cs b/libs/common/Testing/GarnetTestLoggingEventType.cs
new file mode 100644
index 00000000000..b2f67f3fb26
--- /dev/null
+++ b/libs/common/Testing/GarnetTestLoggingEventType.cs
@@ -0,0 +1,32 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using Microsoft.Extensions.Logging;
+
+namespace Garnet.common
+{
+ public enum GarnetTestLoggingEventType : int
+ {
+ LogPrimaryStreamType
+ };
+
+ public struct GarnetTestLoggingEvent
+ {
+ public GarnetTestLoggingEventType Type;
+ public string Message;
+
+ public override string ToString() => $"<{Type}>: {Message}";
+ }
+
+ public static class LoggingExtensions
+ {
+ public static void LogTesting(this ILogger logger, GarnetTestLoggingEvent state)
+ {
+ logger?.Log(LogLevel.Critical,
+ eventId: default,
+ state: state,
+ exception: null,
+ formatter: static (state, _) => $"{state}");
+ }
+ }
+}
\ No newline at end of file
diff --git a/libs/server/Metrics/GarnetServerMonitor.cs b/libs/server/Metrics/GarnetServerMonitor.cs
index ac820054946..9a34734f721 100644
--- a/libs/server/Metrics/GarnetServerMonitor.cs
+++ b/libs/server/Metrics/GarnetServerMonitor.cs
@@ -230,7 +230,7 @@ private async void MainMonitorTask(CancellationToken token)
{
while (true)
{
- await Task.Delay(monitorSamplingFrequency, token);
+ await Task.Delay(monitorSamplingFrequency, token).ConfigureAwait(false);
// Reset the session level latency metrics for the prior version, as we are
// about to make that the current version.
diff --git a/libs/server/Metrics/Info/InfoCommand.cs b/libs/server/Metrics/Info/InfoCommand.cs
index 0722fdd3b9e..77de500aff1 100644
--- a/libs/server/Metrics/Info/InfoCommand.cs
+++ b/libs/server/Metrics/Info/InfoCommand.cs
@@ -80,7 +80,6 @@ private bool NetworkINFO()
}
}
return true;
-
}
private void GetHelpMessage()
diff --git a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs
index 6feaa72ac2c..b9dff74b45d 100644
--- a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs
+++ b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs
@@ -94,7 +94,7 @@ internal async Task GetCollectionItemAsync(RespCommand com
RespServerSession session, double timeoutInSeconds, PinnedSpanByte[] cmdArgs = null)
{
var observer = new CollectionItemObserver(session, command, cmdArgs);
- return await GetCollectionItemAsync(observer, keys, timeoutInSeconds);
+ return await GetCollectionItemAsync(observer, keys, timeoutInSeconds).ConfigureAwait(false);
}
///
@@ -111,7 +111,7 @@ internal async Task MoveCollectionItemAsync(RespCommand co
RespServerSession session, double timeoutInSeconds, PinnedSpanByte[] cmdArgs)
{
var observer = new CollectionItemObserver(session, command, cmdArgs);
- return await GetCollectionItemAsync(observer, [srcKey], timeoutInSeconds);
+ return await GetCollectionItemAsync(observer, [srcKey], timeoutInSeconds).ConfigureAwait(false);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -143,7 +143,7 @@ private async Task GetCollectionItemAsync(CollectionItemOb
try
{
// Wait for either the result found notification or the timeout to expire
- await observer.ResultFoundSemaphore.WaitAsync(timeout, observer.CancellationTokenSource.Token);
+ await observer.ResultFoundSemaphore.WaitAsync(timeout, observer.CancellationTokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -693,7 +693,7 @@ private async Task Start()
// once event is dequeued successfully, call handler method
try
{
- nextEvent = await brokerEventsQueue.DequeueAsync(cts.Token);
+ nextEvent = await brokerEventsQueue.DequeueAsync(cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
diff --git a/libs/server/Resp/AsyncProcessor.cs b/libs/server/Resp/AsyncProcessor.cs
index 66d5466b875..404599fba6b 100644
--- a/libs/server/Resp/AsyncProcessor.cs
+++ b/libs/server/Resp/AsyncProcessor.cs
@@ -62,7 +62,7 @@ void NetworkGETPending(ref TGarnetApi storageApi)
RunContinuationsAsynchronously = true
};
var _storageApi = storageApi;
- _ = Task.Run(async () => await AsyncGetProcessor(_storageApi));
+ _ = Task.Run(async () => await AsyncGetProcessor(_storageApi).ConfigureAwait(false));
}
else
{
@@ -138,7 +138,7 @@ async Task AsyncGetProcessor(TGarnetApi storageApi)
// Wait for next async operation
// We do not need to cancel the wait - it should get garbage collected when the session ends
- await asyncWaiter.WaitAsync();
+ await asyncWaiter.WaitAsync().ConfigureAwait(false);
}
}
}
diff --git a/libs/server/Servers/StoreApi.cs b/libs/server/Servers/StoreApi.cs
index 5ff169c9fd5..418bbad3099 100644
--- a/libs/server/Servers/StoreApi.cs
+++ b/libs/server/Servers/StoreApi.cs
@@ -71,7 +71,7 @@ public async ValueTask WaitForCommitAsync(CancellationToken token = defaul
return false;
}
- return await storeWrapper.WaitForCommitAsync(token: token);
+ return await storeWrapper.WaitForCommitAsync(token: token).ConfigureAwait(false);
}
}
@@ -105,7 +105,7 @@ public async ValueTask CommitAOFAsync(CancellationToken token)
return false;
}
- return await storeWrapper.CommitAOFAsync(token: token);
+ return await storeWrapper.CommitAOFAsync(token: token).ConfigureAwait(false);
}
}
diff --git a/libs/server/Storage/Session/MainStore/AdvancedOps.cs b/libs/server/Storage/Session/MainStore/AdvancedOps.cs
index a28c3b75b6d..5ffc97f4c7e 100644
--- a/libs/server/Storage/Session/MainStore/AdvancedOps.cs
+++ b/libs/server/Storage/Session/MainStore/AdvancedOps.cs
@@ -98,7 +98,6 @@ public GarnetStatus Read_MainStore(ReadOnlySpan key, ref S
return GarnetStatus.NOTFOUND;
}
-
public void ReadWithPrefetch(ref TBatch batch, ref TContext context, long userContext = default)
where TBatch : IReadArgBatch
#if NET9_0_OR_GREATER
diff --git a/libs/server/TaskManager/TaskManager.cs b/libs/server/TaskManager/TaskManager.cs
index 7545296601e..9ed3be7225c 100644
--- a/libs/server/TaskManager/TaskManager.cs
+++ b/libs/server/TaskManager/TaskManager.cs
@@ -97,7 +97,7 @@ public bool RegisterAndRun(TaskType taskType, Func task
// Execute task factory
if (cleanupOnCompletion)
- taskMetadata.Task = taskFactory(taskMetadata.Cts.Token).ContinueWith(async _ => await Cancel(taskType)).Unwrap();
+ taskMetadata.Task = taskFactory(taskMetadata.Cts.Token).ContinueWith(async _ => await Cancel(taskType).ConfigureAwait(false)).Unwrap();
else
taskMetadata.Task = taskFactory(taskMetadata.Cts.Token);
}
@@ -131,7 +131,7 @@ public async Task Cancel(TaskType taskType)
{
taskMetadata.Cts.Cancel();
if (taskMetadata.Task != null)
- await taskMetadata.Task.WaitAsync(disposed ? default : cts.Token);
+ await taskMetadata.Task.WaitAsync(disposed ? default : cts.Token).ConfigureAwait(false);
}
}
catch (Exception ex)
@@ -149,7 +149,7 @@ public async Task Cancel(TaskType taskType)
public async Task Cancel(TaskPlacementCategory taskPlacementCategory)
{
foreach (var taskType in TaskTypeExtensions.GetTaskTypes(taskPlacementCategory))
- await Cancel(taskType);
+ await Cancel(taskType).ConfigureAwait(false);
}
///
@@ -171,7 +171,7 @@ public async Task WaitAsync(TaskType taskType, CancellationToken token = d
{
if (registry.TryGetValue(taskType, out var taskInfo))
{
- await taskInfo.Task.WaitAsync(token);
+ await taskInfo.Task.WaitAsync(token).ConfigureAwait(false);
return true;
}
return false;
diff --git a/libs/storage/Tsavorite/cs/src/core/Device/RandomAccessLocalStorageDevice.cs b/libs/storage/Tsavorite/cs/src/core/Device/RandomAccessLocalStorageDevice.cs
index 423f5fd20c7..0d2d85d4407 100644
--- a/libs/storage/Tsavorite/cs/src/core/Device/RandomAccessLocalStorageDevice.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Device/RandomAccessLocalStorageDevice.cs
@@ -239,7 +239,7 @@ async ValueTask WriteWorkerAsync(IntPtr sourceAddress, int segmentId, ulong dest
{
storageAccessContext.memoryManager.SetDestination((byte*)sourceAddress, (int)numBytesToWrite);
}
- await RandomAccess.WriteAsync(storageAccessContext.handle.SafeFileHandle, storageAccessContext.memoryManager.Memory, (long)destinationAddress);
+ await RandomAccess.WriteAsync(storageAccessContext.handle.SafeFileHandle, storageAccessContext.memoryManager.Memory, (long)destinationAddress).ConfigureAwait(false);
}
catch (Exception ex)
{
diff --git a/libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs b/libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs
index ca186c6a5f8..5a340e42615 100644
--- a/libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs
@@ -193,7 +193,7 @@ int SelectInstance()
if (kInvalidIndex == Interlocked.CompareExchange(ref entry, 1, kInvalidIndex))
return i;
}
- throw new InvalidOperationException("Exceeded maximum number of active LightEpoch instances");
+ throw new InvalidOperationException($"Exceeded maximum number of active LightEpoch instances {ActiveInstanceCount()} {InstanceIndexBuffer.MaxInstances}");
}
///
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs
index 571ef07911d..f06a926ae76 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs
@@ -262,7 +262,7 @@ public async Task WaitForStateChange(SystemState currentState)
var _waitForTransitionOut = waitForTransitionOut;
if (SystemState.Equal(currentState, systemState))
{
- await _waitForTransitionOut.WaitAsync();
+ await _waitForTransitionOut.WaitAsync().ConfigureAwait(false);
}
}
@@ -273,12 +273,12 @@ public async Task WaitForStateChange(SystemState currentState)
///
public async Task WaitForCompletion(SystemState currentState)
{
- await WaitForStateChange(currentState);
+ await WaitForStateChange(currentState).ConfigureAwait(false);
currentState = systemState;
var _waitForTransitionIn = waitForTransitionIn;
if (SystemState.Equal(currentState, systemState))
{
- await _waitForTransitionIn.WaitAsync();
+ await _waitForTransitionIn.WaitAsync().ConfigureAwait(false);
}
}
@@ -304,7 +304,7 @@ void MakeTransitionWorker(SystemState nextState)
async Task ProcessWaitingListAsync(CancellationToken token = default)
{
- await waitForTransitionIn.WaitAsync(token);
+ await waitForTransitionIn.WaitAsync(token).ConfigureAwait(false);
if (waitForTransitionInException != null)
{
throw waitForTransitionInException;
@@ -332,7 +332,7 @@ async Task RunStateMachine(CancellationToken token = default)
do
{
GlobalStateMachineStep(systemState);
- await ProcessWaitingListAsync(token);
+ await ProcessWaitingListAsync(token).ConfigureAwait(false);
} while (systemState.Phase != Phase.REST);
}
catch (Exception e)
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Common/LogSizeTracker.cs b/libs/storage/Tsavorite/cs/src/core/Index/Common/LogSizeTracker.cs
index 057c2372bda..beaefe8cf1b 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Common/LogSizeTracker.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Common/LogSizeTracker.cs
@@ -241,7 +241,7 @@ async Task ResizerTask(CancellationToken cancellationToken)
// these calls to WaitAsync will be lost. ResizeIfNeeded retries as long as we are over budget,
// but there is still a chance we'll miss a growth+signal between that check and the next WaitAsync.
// The timeout mitigates this but it would be better to find an awaitable ManualResetEvent.
- await resizeTaskEvent.WaitAsync(TimeSpan.FromSeconds(ResizeTaskDelaySeconds), cancellationToken);
+ await resizeTaskEvent.WaitAsync(TimeSpan.FromSeconds(ResizeTaskDelaySeconds), cancellationToken).ConfigureAwait(false);
if (runState == (int)RunState.Running)
ResizeIfNeeded(cancellationToken);
if (runState != (int)RunState.Running)
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs
index 6060ab5f21e..360b16e71f8 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs
@@ -67,7 +67,6 @@ internal async ValueTask IsIndexFuzzyCheckpointCompletedAsync(CancellationToken
await t2.ConfigureAwait(false);
}
-
// Implementation of an asynchronous checkpointing scheme
// for main hash index of Tsavorite
private int mainIndexCheckpointCallbackCount;
@@ -166,7 +165,7 @@ private async ValueTask IsMainIndexCheckpointCompletedAsync(CancellationToken to
await mainIndexCheckpointTcs.Task.WaitAsync(token).ConfigureAwait(false);
}
- private unsafe void AsyncPageFlushCallback(uint errorCode, uint numBytes, object context)
+ private void AsyncPageFlushCallback(uint errorCode, uint numBytes, object context)
{
// Set the page status to flushed
var mem = ((HashIndexPageAsyncFlushResult)context).mem;
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs
index a540ab3e300..8f148c16f50 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs
@@ -761,7 +761,7 @@ private async Task TrimLogMemorySizeAsync(RecoveryStatus recoveryStatus, l
var pageIndex = hlogBase.GetPageIndexForPage(page);
if (hlogBase.IsAllocated(pageIndex))
{
- await recoveryStatus.WaitFlushAsync(pageIndex, cancellationToken);
+ await recoveryStatus.WaitFlushAsync(pageIndex, cancellationToken).ConfigureAwait(false);
hlogBase.EvictPageForRecovery(page);
lastFreedPage = page;
}
@@ -898,7 +898,7 @@ private async ValueTask RecoverHybridLogAsync(long scanFromAddress, long r
// Trim the log memory again in case we read large objects on the current page. Add 1 to tailPage so that
// when the BufferSize subtraction wraps around the buffer it won't try to evict the page we just added.
// Decrease trimPageReadCount as we process each page so we don't over-prune.
- freedPage = await TrimLogMemorySizeAsync(recoveryStatus, tailPage: p + 1, trimPageReadCount--, cancellationToken);
+ freedPage = await TrimLogMemorySizeAsync(recoveryStatus, tailPage: p + 1, trimPageReadCount--, cancellationToken).ConfigureAwait(false);
if (freedPage != NoPageFreed)
lastFreedPage = freedPage;
}
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/Revivification/CheckEmptyWorker.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/Revivification/CheckEmptyWorker.cs
index 2d6546c9773..1376454f77b 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/Revivification/CheckEmptyWorker.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/Revivification/CheckEmptyWorker.cs
@@ -54,7 +54,7 @@ private async void LaunchWorker()
{
try
{
- await Task.Delay(1000, cts.Token);
+ await Task.Delay(1000, cts.Token).ConfigureAwait(false);
if (disposed)
break;
recordPool.ScanForEmpty(cts.Token);
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs
index 619eabf2d66..6ebc2261abd 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs
@@ -495,7 +495,7 @@ public async ValueTask CompleteCheckpointAsync(CancellationToken token = default
token.ThrowIfCancellationRequested();
try
{
- await stateMachineDriver.CompleteAsync(token);
+ await stateMachineDriver.CompleteAsync(token).ConfigureAwait(false);
}
catch
{
@@ -831,7 +831,7 @@ public async Task GrowIndexAsync()
var indexResizeTask = new IndexResizeSMTask(this);
var indexResizeSM = new IndexResizeSM(indexResizeTask);
- return await stateMachineDriver.RunAsync(indexResizeSM);
+ return await stateMachineDriver.RunAsync(indexResizeSM).ConfigureAwait(false);
}
///
diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/LogCommitPolicy.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/LogCommitPolicy.cs
index a82cc6df0f0..933021d10fe 100644
--- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/LogCommitPolicy.cs
+++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/LogCommitPolicy.cs
@@ -175,7 +175,7 @@ public override bool AdmitCommit(long currentTail, bool commitRequired)
{
Task.Run(async () =>
{
- await Task.Delay(TimeSpan.FromMilliseconds(thresholdMilli));
+ await Task.Delay(TimeSpan.FromMilliseconds(thresholdMilli)).ConfigureAwait(false);
shouldRetry = 0;
log.Commit();
});
diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs
index 4bb9cf5dff1..93434dafc46 100644
--- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs
+++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs
@@ -174,9 +174,9 @@ public sealed class TsavoriteLog : IDisposable
/// Create new log instance
///
/// Log settings
- /// Log settings
+ /// User provided logger instance
public TsavoriteLog(TsavoriteLogSettings logSettings, ILogger logger = null)
- : this(logSettings, logSettings.TryRecoverLatest, logger)
+ : this(logSettings, logSettings.TryRecoverLatest, logger: logger)
{ }
///
@@ -1600,7 +1600,6 @@ public async ValueTask WaitUncommittedAsync(long nextAddress, Cancellation
/// If true, spin-wait until commit completes. Otherwise, issue commit and return immediately.
///
/// whether there is anything to commit.
-
public void Commit(bool spinWait = false, byte[] cookie = null)
{
// Take a lower-bound of the content of this commit in case our request is filtered but we need to spin
@@ -1650,6 +1649,8 @@ public bool CommitStrongly(out long commitTail, out long actualCommitNum, bool s
/// complete the commit. Throws exception if this or any
/// ongoing commit fails.
///
+ ///
+ ///
///
public async ValueTask CommitAsync(byte[] cookie = null, CancellationToken token = default)
{
@@ -2529,7 +2530,7 @@ public async ValueTask RecoverReadOnlyAsync(CancellationToken cancellationToken
private void SignalWaitingROIterators()
{
- // One RecoverReadOnly use case is to allow a TsavoriteLogIterator to continuously read a mirror TsavoriteLog (over the same log storage) of a primary TsavoriteLog.
+ // One RecoverReadOnly use case is to allow a TsavoriteLogScanIterator to continuously read a mirror TsavoriteLog (over the same log storage) of a primary TsavoriteLog.
// In this scenario, when the iterator arrives at the tail after a previous call to RestoreReadOnly, it will wait asynchronously until more data
// is committed and read by a subsequent call to RecoverReadOnly. Here, we signal iterators that we have completed recovery.
var _commitTcs = commitTcs;
@@ -3053,7 +3054,6 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
ongoingCommitRequests.Enqueue((commitTail, info));
}
-
// As an optimization, if a concurrent flush has already advanced FlushedUntilAddress
// past this commit, we can manually trigger a commit callback for safety, and return.
if (commitTail <= FlushedUntilAddress)
diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogRecoveryInfo.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogRecoveryInfo.cs
index e54cd22bb21..fe3e4c760d0 100644
--- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogRecoveryInfo.cs
+++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogRecoveryInfo.cs
@@ -44,7 +44,7 @@ public struct TsavoriteLogRecoveryInfo
public bool FastForwardAllowed;
///
- /// callback to invoke when commit is presistent
+ /// callback to invoke when commit is persisted
///
public Action Callback;
diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogScanIterator.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogScanIterator.cs
index 392e4cf02b7..ce94a4ec3ab 100644
--- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogScanIterator.cs
+++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogScanIterator.cs
@@ -461,6 +461,9 @@ public unsafe bool TryConsumeNext(T consumer) where T : ILogEntryConsumer
/// whether a next entry is present
public unsafe bool TryBulkConsumeNext(T consumer, int maxChunkSize = 0) where T : IBulkLogEntryConsumer
{
+ // Throttle and implicitly check for consumer liveness
+ consumer.Throttle();
+
if (maxChunkSize == 0) maxChunkSize = allocator.PageSize;
if (disposed)
diff --git a/libs/storage/Tsavorite/cs/src/core/Utilities/Utility.cs b/libs/storage/Tsavorite/cs/src/core/Utilities/Utility.cs
index 47d233b4068..4e71c5a52c0 100644
--- a/libs/storage/Tsavorite/cs/src/core/Utilities/Utility.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Utilities/Utility.cs
@@ -434,14 +434,14 @@ private static async Task SlowWithCancellationAsync(Task task, Cancella
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
using (token.Register(s => ((TaskCompletionSource)s).TrySetResult(true), tcs, useSynchronizationContext))
{
- if (task != await Task.WhenAny(task, tcs.Task))
+ if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false))
{
token.ThrowIfCancellationRequested();
}
}
// make sure any exceptions in the task get unwrapped and exposed to the caller.
- return await task;
+ return await task.ConfigureAwait(false);
}
///
diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
index 93d8eb6c99e..67b4b952c17 100644
--- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
+++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
@@ -197,7 +197,7 @@ await BlobManager.PerformWithRetriesAsync(
pageResults = page.Values;
continuationToken = page.ContinuationToken;
return page.Values.Count; // not accurate, in terms of bytes, but still useful for tracing purposes
- });
+ }).ConfigureAwait(false);
foreach (var item in pageResults)
{
@@ -217,7 +217,7 @@ await BlobManager.PerformWithRetriesAsync(
while (!string.IsNullOrEmpty(continuationToken));
// make sure we did not lose the lease while iterating to find the blobs
- await BlobManager.ConfirmLeaseIsGoodForAWhileAsync();
+ await BlobManager.ConfirmLeaseIsGoodForAWhileAsync().ConfigureAwait(false);
StorageErrorHandler.Token.ThrowIfCancellationRequested();
@@ -384,7 +384,7 @@ public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAs
async (numAttempts) =>
{
var client = (numAttempts > 1) ? entry.PageBlob.Default : entry.PageBlob.Aggressive;
- await client.DeleteAsync(cancellationToken: StorageErrorHandler.Token);
+ await client.DeleteAsync(cancellationToken: StorageErrorHandler.Token).ConfigureAwait(false);
return 1;
});
}
@@ -419,7 +419,7 @@ Task Delete(BlobEntry entry)
async (numAttempts) =>
{
var client = (numAttempts > 1) ? entry.PageBlob.Default : entry.PageBlob.Aggressive;
- await client.DeleteAsync(cancellationToken: StorageErrorHandler.Token);
+ await client.DeleteAsync(cancellationToken: StorageErrorHandler.Token).ConfigureAwait(false);
return 1;
});
}
@@ -585,7 +585,7 @@ await BlobManager.PerformWithRetriesAsync(
},
async () =>
{
- var response = await blobEntry.PageBlob.Default.GetPropertiesAsync();
+ var response = await blobEntry.PageBlob.Default.GetPropertiesAsync().ConfigureAwait(false);
blobEntry.ETag = response.Value.ETag;
}).ConfigureAwait(false);
@@ -642,7 +642,7 @@ await BlobManager.PerformWithRetriesAsync(
}
return length;
- });
+ }).ConfigureAwait(false);
readLength -= length;
offset += length;
@@ -666,34 +666,34 @@ unsafe void WriteToBlobAsync(BlobEntry blobEntry, IntPtr sourceAddress, ulong de
{
WriteToBlobAsync(blobEntry, sourceAddress, (long)destinationAddress, numBytesToWrite, id)
.ContinueWith((Task t) =>
+ {
+ if (pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
{
- if (pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
+ if (t.IsFaulted)
{
- if (t.IsFaulted)
- {
- BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} (Failure)");
- request.Callback(uint.MaxValue, request.NumBytes, request.Context);
- }
- else
- {
- BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id}");
- request.Callback(0, request.NumBytes, request.Context);
- }
+ BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} (Failure)");
+ request.Callback(uint.MaxValue, request.NumBytes, request.Context);
}
-
- if (underLease)
+ else
{
- InitialWriterSemaphore.Release();
+ BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id}");
+ request.Callback(0, request.NumBytes, request.Context);
}
+ }
- }, TaskContinuationOptions.ExecuteSynchronously);
+ if (underLease)
+ {
+ InitialWriterSemaphore.Release();
+ }
+
+ }, TaskContinuationOptions.ExecuteSynchronously);
}
async Task WriteToBlobAsync(BlobEntry blobEntry, IntPtr sourceAddress, long destinationAddress, uint numBytesToWrite, long id)
{
if (underLease)
{
- await InitialWriterSemaphore.WaitAsync();
+ await InitialWriterSemaphore.WaitAsync().ConfigureAwait(false);
}
long offset = 0;
diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobEntry.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobEntry.cs
index 9068b8543a8..f201d322e3c 100644
--- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobEntry.cs
+++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobEntry.cs
@@ -71,16 +71,16 @@ await azureStorageDevice.BlobManager.PerformWithRetriesAsync(
var response = await client.CreateAsync(
size: size,
conditions: new Azure.Storage.Blobs.Models.PageBlobRequestConditions() { IfNoneMatch = Azure.ETag.All },
- cancellationToken: azureStorageDevice.StorageErrorHandler.Token);
+ cancellationToken: azureStorageDevice.StorageErrorHandler.Token).ConfigureAwait(false);
ETag = response.Value.ETag;
return 1;
},
async () =>
{
- var response = await pageBlob.Default.GetPropertiesAsync();
+ var response = await pageBlob.Default.GetPropertiesAsync().ConfigureAwait(false);
ETag = response.Value.ETag;
- });
+ }).ConfigureAwait(false);
// At this point the blob is fully created. After this line all consequent writers will write immediately. We just
// need to clear the queue of pending writers.
diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobManager.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobManager.cs
index c4fc8ef4368..e6758171179 100644
--- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobManager.cs
+++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobManager.cs
@@ -107,7 +107,7 @@ async Task StartAsync()
{
leaseBlob = leaseBlobDirectory.GetBlockBlobClient(LeaseBlobName);
leaseClient = leaseBlob.WithRetries.GetBlobLeaseClient();
- await AcquireOwnership();
+ await AcquireOwnership().ConfigureAwait(false);
}
///
@@ -130,7 +130,7 @@ public async Task StopAsync()
{
shutDownOrTermination.Cancel(); // has no effect if already cancelled
- await LeaseMaintenanceLoopTask; // wait for loop to terminate cleanly
+ await LeaseMaintenanceLoopTask.ConfigureAwait(false); // wait for loop to terminate cleanly
}
///
@@ -186,7 +186,7 @@ await leaseClient.AcquireAsync(
// the previous owner has not released the lease yet,
// try again until it becomes available, should be relatively soon
// as the transport layer is supposed to shut down the previous owner when starting this
- await Task.Delay(TimeSpan.FromSeconds(1), StorageErrorHandler.Token);
+ await Task.Delay(TimeSpan.FromSeconds(1), StorageErrorHandler.Token).ConfigureAwait(false);
continue;
}
@@ -207,7 +207,7 @@ await PerformWithRetriesAsync(
try
{
var client = numAttempts > 2 ? leaseBlob.Default : leaseBlob.Aggressive;
- await client.UploadAsync(new MemoryStream());
+ await client.UploadAsync(new MemoryStream()).ConfigureAwait(false);
}
catch (Azure.RequestFailedException ex2) when (BlobUtilsV12.LeaseConflictOrExpired(ex2))
{
@@ -216,7 +216,7 @@ await PerformWithRetriesAsync(
}
return 1;
- });
+ }).ConfigureAwait(false);
continue;
}
@@ -241,7 +241,7 @@ await PerformWithRetriesAsync(
{
TimeSpan nextRetryIn = GetDelayBetweenRetries(numAttempts);
TraceHelper.TsavoritePerfWarning($"Lease acquisition failed transiently, retrying in {nextRetryIn}");
- await Task.Delay(nextRetryIn);
+ await Task.Delay(nextRetryIn).ConfigureAwait(false);
}
continue;
}
@@ -303,7 +303,7 @@ public async Task MaintenanceLoopAsync()
}
// wait for successful renewal, or exit the loop as this throws
- await NextLeaseRenewalTask;
+ await NextLeaseRenewalTask.ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -332,7 +332,7 @@ public async Task MaintenanceLoopAsync()
&& !StorageErrorHandler.IsTerminated
&& (leaseTimer?.Elapsed < LeaseDuration))
{
- await Task.Delay(20); // give storage accesses that are in progress and require the lease a chance to complete
+ await Task.Delay(20).ConfigureAwait(false); // give storage accesses that are in progress and require the lease a chance to complete
}
TraceHelper.LeaseProgress("Waited for lease users to complete");
diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobUtilsV12.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobUtilsV12.cs
index 9109dfa73c3..c0a4a55f787 100644
--- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobUtilsV12.cs
+++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/BlobUtilsV12.cs
@@ -203,7 +203,7 @@ public static async Task ForceDeleteAsync(BlobContainerClient containerCli
try
{
- await blob.DeleteAsync();
+ await blob.DeleteAsync().ConfigureAwait(false);
return true;
}
catch (Azure.RequestFailedException e) when (BlobDoesNotExist(e))
@@ -215,7 +215,7 @@ public static async Task ForceDeleteAsync(BlobContainerClient containerCli
try
{
var leaseClient = new BlobLeaseClient(blob);
- await leaseClient.BreakAsync(TimeSpan.Zero);
+ await leaseClient.BreakAsync(TimeSpan.Zero).ConfigureAwait(false);
}
catch
{
diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageErrorHandler.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageErrorHandler.cs
index 484503ab158..1425f56a818 100644
--- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageErrorHandler.cs
+++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageErrorHandler.cs
@@ -145,7 +145,7 @@ void Shutdown()
public async Task WaitForTermination(TimeSpan timeout)
{
Task timeoutTask = Task.Delay(timeout);
- var first = await Task.WhenAny(timeoutTask, shutdownComplete.Task);
+ var first = await Task.WhenAny(timeoutTask, shutdownComplete.Task).ConfigureAwait(false);
return first == shutdownComplete.Task;
}
}
diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageOperations.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageOperations.cs
index 9c2d8e12820..009f5830c4b 100644
--- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageOperations.cs
+++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/StorageOperations.cs
@@ -27,7 +27,7 @@ public async Task PerformWithRetriesAsync(
{
if (semaphore != null)
{
- await semaphore.WaitAsync();
+ await semaphore.WaitAsync().ConfigureAwait(false);
}
Stopwatch stopwatch = new();
@@ -49,7 +49,7 @@ public async Task PerformWithRetriesAsync(
}
Interlocked.Increment(ref LeaseUsers);
- await ConfirmLeaseIsGoodForAWhileAsync();
+ await ConfirmLeaseIsGoodForAWhileAsync().ConfigureAwait(false);
}
StorageErrorHandler.Token.ThrowIfCancellationRequested();
diff --git a/test/Garnet.test.cluster/ClusterTestContext.cs b/test/Garnet.test.cluster/ClusterTestContext.cs
index fce61b07f95..94ea171cb48 100644
--- a/test/Garnet.test.cluster/ClusterTestContext.cs
+++ b/test/Garnet.test.cluster/ClusterTestContext.cs
@@ -10,6 +10,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using Garnet.common;
using Garnet.server;
using Garnet.server.Auth.Settings;
using Microsoft.Extensions.Logging;
@@ -30,6 +31,7 @@ public class ClusterTestContext
public EndPointCollection endpoints;
public TextWriter logTextWriter = TestContext.Progress;
public ILoggerFactory loggerFactory;
+ public NUnitLoggerProvider loggerProvider;
public ILogger logger;
public int defaultShards = 3;
@@ -44,6 +46,12 @@ public class ClusterTestContext
public CancellationTokenSource cts;
+ public void EnableGarnetLoggingEvents(GarnetTestLoggingEventType[] events)
+ {
+ foreach (var e in events)
+ loggerProvider.GarnetTestLoggingEvents[(int)e] = true;
+ }
+
public void Setup(Dictionary monitorTests, int testTimeoutSeconds = 60)
{
cts = new CancellationTokenSource(TimeSpan.FromSeconds(testTimeoutSeconds));
@@ -52,7 +60,7 @@ public void Setup(Dictionary monitorTests, int testTimeoutSeco
var logLevel = LogLevel.Error;
if (!string.IsNullOrEmpty(TestContext.CurrentContext.Test.MethodName) && monitorTests.TryGetValue(TestContext.CurrentContext.Test.MethodName, out var value))
logLevel = value;
- loggerFactory = TestUtils.CreateLoggerFactoryInstance(logTextWriter, logLevel, scope: TestContext.CurrentContext.Test.FullName);
+ (loggerFactory, loggerProvider) = TestUtils.CreateLoggerFactoryInstance(logTextWriter, logLevel, scope: TestContext.CurrentContext.Test.FullName);
logger = loggerFactory.CreateLogger(TestContext.CurrentContext.Test.FullName);
logger.LogDebug("0. Setup >>>>>>>>>>>>");
r = new Random(674386);
diff --git a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs
index 9fabda2bf8b..d691f745b6f 100644
--- a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs
+++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs
@@ -152,6 +152,9 @@ public void ClusterSRTest([Values] bool disableObjects)
[Category("REPLICATION")]
public void ClusterSRNoCheckpointRestartSecondary([Values] bool performRMW, [Values] bool disableObjects)
{
+ if (useTLS)
+ context.EnableGarnetLoggingEvents([GarnetTestLoggingEventType.LogPrimaryStreamType]);
+
var replica_count = 1;// Per primary
var primary_count = 1;
var primaryIndex = 0;
diff --git a/test/Garnet.test/Extensions/BulkIncrementBy.cs b/test/Garnet.test/Extensions/BulkIncrementBy.cs
index c6231d11bf4..361bfe2927f 100644
--- a/test/Garnet.test/Extensions/BulkIncrementBy.cs
+++ b/test/Garnet.test/Extensions/BulkIncrementBy.cs
@@ -9,8 +9,9 @@ namespace Garnet
{
sealed class BulkIncrementBy : CustomTransactionProcedure
{
- // BULKINCRBY 2 a 10 [b 15] [c 25] ...
+ // BULKINCRBY k1 incrby1 [k2 incrby2 [k3 incrby3 ...]]
public static readonly RespCommandsInfo CommandInfo = new() { Arity = -4 };
+ public static readonly string Name = "BULKINCRBY";
public override bool Prepare(TGarnetReadApi api, ref CustomProcedureInput procInput)
{
@@ -23,7 +24,7 @@ public override bool Prepare(TGarnetReadApi api, ref CustomProce
for (var i = 0; i < count; i++)
{
AddKey(GetNextArg(ref procInput, ref offset), LockType.Exclusive, storeType: StoreType.Main);
- GetNextArg(ref procInput, ref offset);
+ _ = GetNextArg(ref procInput, ref offset);
}
return true;
diff --git a/test/Garnet.test/Extensions/BulkRead.cs b/test/Garnet.test/Extensions/BulkRead.cs
index 7b938be6e8d..f6072a79921 100644
--- a/test/Garnet.test/Extensions/BulkRead.cs
+++ b/test/Garnet.test/Extensions/BulkRead.cs
@@ -9,8 +9,9 @@ namespace Garnet
{
sealed class BulkRead : CustomTransactionProcedure
{
- // BULKREAD 3 a [b] [c]
+ // BULKREAD a [b] [c]
public static readonly RespCommandsInfo CommandInfo = new() { Arity = -3 };
+ public static readonly string Name = "BULKREAD";
public override bool Prepare(TGarnetReadApi api, ref CustomProcedureInput procInput)
{
diff --git a/test/Garnet.test/NUnitLoggerProvider.cs b/test/Garnet.test/NUnitLoggerProvider.cs
index 8d223958330..401bad77b8e 100644
--- a/test/Garnet.test/NUnitLoggerProvider.cs
+++ b/test/Garnet.test/NUnitLoggerProvider.cs
@@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.IO;
+using System.Linq;
using Garnet.common;
using Microsoft.Extensions.Logging;
@@ -18,6 +19,11 @@ public class NUnitLoggerProvider : ILoggerProvider
private readonly bool matchLevel;
private readonly LogLevel logLevel;
+ ///
+ /// Array of enabled test logging flag types
+ ///
+ public bool[] GarnetTestLoggingEvents = [.. Enum.GetValues().Select(_ => false)];
+
static readonly string[] lvl =
[
"trce",
@@ -38,13 +44,14 @@ public NUnitLoggerProvider(TextWriter textWriter, string scope = "", HashSet new NUnitLogger(categoryName, textWriter, scope, skipCmd, recvOnly: recvOnly, matchLevel: matchLevel, logLevel: logLevel);
+ public ILogger CreateLogger(string categoryName) => new NUnitLogger(this, categoryName, textWriter, scope, skipCmd, recvOnly: recvOnly, matchLevel: matchLevel, logLevel: logLevel);
public void Dispose()
{ }
private class NUnitLogger : ILogger
{
+ private readonly NUnitLoggerProvider provider;
private readonly string categoryName;
private readonly TextWriter textWriter;
private readonly string scope;
@@ -53,8 +60,9 @@ private class NUnitLogger : ILogger
private readonly bool matchLevel;
private readonly LogLevel logLevel;
- public NUnitLogger(string categoryName, TextWriter textWriter, string scope, HashSet skipCmd = null, bool recvOnly = false, bool matchLevel = false, LogLevel logLevel = LogLevel.None)
+ public NUnitLogger(NUnitLoggerProvider provider, string categoryName, TextWriter textWriter, string scope, HashSet skipCmd = null, bool recvOnly = false, bool matchLevel = false, LogLevel logLevel = LogLevel.None)
{
+ this.provider = provider;
this.categoryName = categoryName;
this.textWriter = textWriter;
this.scope = scope;
@@ -77,11 +85,26 @@ public void Log(
Exception exception,
Func formatter)
{
- if ((matchLevel && logLevel == this.logLevel) || !matchLevel)
+ if (state is GarnetTestLoggingEvent _state)
+ {
+ if (provider.GarnetTestLoggingEvents[(int)_state.Type])
+ {
+ var msg = string.Format("[{0:d1}.{1}.({2})] |{3}| <{4}> {5} ^{6}^",
+ eventId.Id,
+ LogFormatter.FormatTime(DateTime.UtcNow),
+ GetLevelStr(logLevel),
+ scope,
+ categoryName,
+ exception,
+ formatter(state, exception));
+ textWriter.Write(msg);
+ }
+ }
+ else if ((matchLevel && logLevel == this.logLevel) || !matchLevel)
{
- var msg = string.Format("[{0:D3}.{1}.({2})] |{3}| <{4}> {5} ^{6}^",
+ var msg = string.Format("[{0:d1}.{1}.({2})] |{3}| <{4}> {5} ^{6}^",
eventId.Id,
- LogFormatter.FormatDate(DateTime.UtcNow),
+ LogFormatter.FormatTime(DateTime.UtcNow),
GetLevelStr(logLevel),
scope,
categoryName,
diff --git a/test/Garnet.test/RespMetricsTest.cs b/test/Garnet.test/RespMetricsTest.cs
index 1ab26aa6ece..6c07fb22939 100644
--- a/test/Garnet.test/RespMetricsTest.cs
+++ b/test/Garnet.test/RespMetricsTest.cs
@@ -33,7 +33,7 @@ public void Setup()
server = null;
r = new Random(674386);
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
- loggerFactory = TestUtils.CreateLoggerFactoryInstance(TestContext.Progress, LogLevel.Error);
+ (loggerFactory, _) = TestUtils.CreateLoggerFactoryInstance(TestContext.Progress, LogLevel.Error);
}
[TearDown]
diff --git a/test/Garnet.test/TestUtils.cs b/test/Garnet.test/TestUtils.cs
index f58aa882332..e362ba0deac 100644
--- a/test/Garnet.test/TestUtils.cs
+++ b/test/Garnet.test/TestUtils.cs
@@ -455,13 +455,20 @@ public static GarnetServer CreateGarnetServer(
///
///
///
- public static ILoggerFactory CreateLoggerFactoryInstance(TextWriter textWriter, LogLevel logLevel, string scope = "", HashSet skipCmd = null, bool recvOnly = false, bool matchLevel = false)
+ public static (ILoggerFactory, NUnitLoggerProvider) CreateLoggerFactoryInstance(
+ TextWriter textWriter,
+ LogLevel logLevel,
+ string scope = "",
+ HashSet skipCmd = null,
+ bool recvOnly = false,
+ bool matchLevel = false)
{
- return LoggerFactory.Create(builder =>
+ var provider = new NUnitLoggerProvider(textWriter, scope, skipCmd, recvOnly, matchLevel, logLevel);
+ return (LoggerFactory.Create(builder =>
{
- builder.AddProvider(new NUnitLoggerProvider(textWriter, scope, skipCmd, recvOnly, matchLevel, logLevel));
+ builder.AddProvider(provider);
builder.SetMinimumLevel(logLevel);
- });
+ }), provider);
}
public static (GarnetServer[] Nodes, GarnetServerOptions[] Options) CreateGarnetCluster(