diff --git a/Directory.Build.props b/Directory.Build.props
index 58334de25..d06415e90 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -10,7 +10,7 @@
true
$(MSBuildThisFileDirectory)Shared.ruleset
NETSDK1069
- $(NoWarn);NU5105;NU1507;SER001;SER002;SER003;SER004;SER005;SER006
+ $(NoWarn);NU5105;NU1507;SER001;SER002;SER003;SER004;SER005;SER006;SER007
https://stackexchange.github.io/StackExchange.Redis/ReleaseNotes
https://stackexchange.github.io/StackExchange.Redis/
MIT
diff --git a/Directory.Packages.props b/Directory.Packages.props
index 9767a0ab1..f1f070b85 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -32,6 +32,7 @@
+
diff --git a/docs/ActiveActive.md b/docs/ActiveActive.md
new file mode 100644
index 000000000..b58edd656
--- /dev/null
+++ b/docs/ActiveActive.md
@@ -0,0 +1,269 @@
+# Active:Active
+
+## Overview
+
+The Active:Active feature provides automatic failover and intelligent routing across multiple Redis deployments. The library
+automatically selects the best available endpoint based on:
+
+1. **Availability** - Connected endpoints are always preferred over disconnected ones
+2. **Weight** - User-defined preference values (higher is better)
+3. **Latency** - Measured response times (lower is better)
+
+This enables scenarios such as:
+- Multi-datacenter deployments with automatic failover
+- Geographic routing to the nearest Redis instance
+- Graceful degradation during maintenance or outages
+- Load distribution across multiple Redis clusters
+
+## Basic Usage
+
+### Connecting to Multiple Groups
+
+To create an Active:Active connection, use `ConnectionMultiplexer.ConnectGroupAsync()` with an array of `ConnectionGroupMember` instances:
+
+```csharp
+using StackExchange.Redis;
+
+// Define your Redis endpoints
+ConnectionGroupMember[] members = [
+ new("us-east.redis.example.com:6379", name: "US East"),
+ new("us-west.redis.example.com:6379", name: "US West"),
+ new("eu-central.redis.example.com:6379", name: "EU Central")
+];
+
+// Connect to all members
+await using var conn = await ConnectionMultiplexer.ConnectGroupAsync(members);
+
+// Use the connection normally
+var db = conn.GetDatabase();
+await db.StringSetAsync("mykey", "myvalue");
+var value = await db.StringGetAsync("mykey");
+```
+
+### Using ConfigurationOptions
+
+You can also use `ConfigurationOptions` for more advanced configuration:
+
+```csharp
+var eastConfig = new ConfigurationOptions
+{
+ EndPoints = { "us-east-1.redis.example.com:6379", "us-east-2.redis.example.com:6379" },
+ Password = "your-password",
+ Ssl = true,
+};
+
+var westConfig = new ConfigurationOptions
+{
+ EndPoints = { "us-west-1.redis.example.com:6379", "us-west-2.redis.example.com:6379" },
+ Password = "another-different-password",
+ Ssl = true,
+};
+
+ConnectionGroupMember[] members = [
+ new(eastConfig, name: "US East"),
+ new(westConfig, name: "US West")
+];
+
+await using var conn = await ConnectionMultiplexer.ConnectGroupAsync(members);
+```
+
+## Configuring Weights
+
+Weights allow you to express preference for specific endpoints. Higher weights are preferred when multiple endpoints are available:
+
+```csharp
+ConnectionGroupMember[] members = [
+ new("local-dc.redis.example.com:6379") { Weight = 10 }, // Strongly preferred
+ new("nearby-dc.redis.example.com:6379") { Weight = 5 }, // Moderately preferred
+ new("remote-dc.redis.example.com:6379") { Weight = 1 } // Fallback option
+];
+
+await using var conn = await ConnectionMultiplexer.ConnectGroupAsync(members);
+```
+
+Weights can be adjusted dynamically:
+
+```csharp
+// Adjust weight based on runtime conditions
+members[0].Weight = 1; // Reduce preference for local DC
+members[2].Weight = 10; // Increase preference for remote DC
+```
+
+## Working with IDatabase
+
+The `IDatabase` interface works transparently with Active:Active connections. All operations are automatically routed to the currently selected endpoint:
+
+```csharp
+var db = conn.GetDatabase();
+
+// String operations
+await db.StringSetAsync("user:1:name", "Alice");
+var name = await db.StringGetAsync("user:1:name");
+
+// Hash operations
+await db.HashSetAsync("user:1", new HashEntry[] {
+ new("name", "Alice"),
+ new("email", "alice@example.com")
+});
+
+// List operations
+await db.ListRightPushAsync("queue:tasks", "task1");
+var task = await db.ListLeftPopAsync("queue:tasks");
+
+// Set operations
+await db.SetAddAsync("tags", new RedisValue[] { "redis", "cache", "database" });
+var members = await db.SetMembersAsync("tags");
+
+// Sorted set operations
+await db.SortedSetAddAsync("leaderboard", "player1", 100);
+var rank = await db.SortedSetRankAsync("leaderboard", "player1");
+
+// Transactions
+var tran = db.CreateTransaction();
+var t1 = tran.StringSetAsync("key1", "value1");
+var t2 = tran.StringSetAsync("key2", "value2");
+if (await tran.ExecuteAsync())
+{
+ await t1;
+ await t2;
+}
+
+// Batches
+var batch = db.CreateBatch();
+var b1 = batch.StringSetAsync("key1", "value1");
+var b2 = batch.StringSetAsync("key2", "value2");
+batch.Execute();
+await Task.WhenAll(b1, b2);
+```
+
+## Working with ISubscriber
+
+Pub/Sub operations work across all connected endpoints. When you subscribe to a channel, the subscription is established against *all* endpoints (for immediate pickup
+during failover events), and received messages are filtered in the library so only the messages for the *active* endpoint are observed. Message publishing
+occurs only to the *active* endpoint. The effect of this is that pub/sub works transparently as though
+you were only talking to the *active* endpoint:
+
+```csharp
+var subscriber = conn.GetSubscriber();
+
+// Subscribe to a channel
+await subscriber.SubscribeAsync(RedisChannel.Literal("notifications"), (channel, message) =>
+{
+ Console.WriteLine($"Received: {message}");
+});
+
+// Publish to a channel
+await subscriber.PublishAsync(RedisChannel.Literal("notifications"), "Hello, World!");
+
+// Pattern-based subscriptions
+await subscriber.SubscribeAsync(RedisChannel.Pattern("events:*"), (channel, message) =>
+{
+ Console.WriteLine($"Event on {channel}: {message}");
+});
+
+// Unsubscribe
+await subscriber.UnsubscribeAsync(RedisChannel.Literal("notifications"));
+```
+
+**Note:** When the active endpoint changes (due to failover), subscriptions are automatically re-established on the new endpoint.
+
+## Monitoring Connection Changes
+
+You can monitor when the active connection changes using the `ConnectionChanged` event:
+
+```csharp
+conn.ConnectionChanged += (sender, args) =>
+{
+ Console.WriteLine($"Connection changed: {args.Type}");
+ Console.WriteLine($"Previous: {args.PreviousGroup?.Name ?? "(none)"}");
+ Console.WriteLine($"Current: {args.Group.Name}");
+};
+```
+
+## Monitoring Member Status
+
+Each `ConnectionGroupMember` provides status information:
+
+```csharp
+foreach (var member in conn.GetMembers())
+{
+ Console.WriteLine($"{member.Name}:");
+ Console.WriteLine($" Connected: {member.IsConnected}");
+ Console.WriteLine($" Weight: {member.Weight}");
+ Console.WriteLine($" Latency: {member.Latency}");
+}
+```
+
+These are the same instances that were passed into `ConnectGroupAsync`.
+
+## Dynamic Member Management
+
+You can add or remove members dynamically using the `IConnectionGroup` interface:
+
+```csharp
+// Cast to IConnectionGroup to access dynamic member management
+var group = (IConnectionGroup)conn;
+
+// Add a new member at runtime
+var newMember = new ConnectionGroupMember("new-dc.redis.example.com:6379", name: "New Datacenter")
+{
+ Weight = 5
+};
+await group.AddAsync(newMember);
+Console.WriteLine($"Added {newMember.Name} to the group");
+
+// Remove a member
+var memberToRemove = members[2]; // Reference to an existing member
+if (group.Remove(memberToRemove))
+{
+ Console.WriteLine($"Removed {memberToRemove.Name} from the group");
+}
+else
+{
+ Console.WriteLine($"Failed to remove {memberToRemove.Name} - member not found");
+}
+
+// Check current members
+var currentMembers = group.GetMembers();
+Console.WriteLine($"Current member count: {currentMembers.Length}");
+foreach (var member in currentMembers)
+{
+ Console.WriteLine($" - {member.Name} (Weight: {member.Weight}, Connected: {member.IsConnected})");
+}
+```
+
+### Adding Members During Maintenance
+
+Add a new datacenter before removing an old one for zero-downtime migrations:
+
+```csharp
+var group = (IConnectionGroup)conn;
+
+// Add the new datacenter
+var newDC = new ConnectionGroupMember("new-location.redis.example.com:6379", name: "New Location")
+{
+ Weight = 10 // High weight to prefer the new location
+};
+await group.AddAsync(newDC);
+
+// Wait for the new member to be fully connected and healthy
+await Task.Delay(TimeSpan.FromSeconds(5));
+
+if (newDC.IsConnected)
+{
+ Console.WriteLine("New datacenter is online and healthy");
+
+ // Reduce weight of old datacenter
+ var oldDC = members[0];
+ oldDC.Weight = 1;
+
+ // Wait for traffic to shift
+ await Task.Delay(TimeSpan.FromSeconds(10));
+
+ // Remove the old datacenter
+ if (group.Remove(oldDC))
+ {
+ Console.WriteLine("Old datacenter removed successfully");
+ }
+}
+```
diff --git a/docs/exp/SER004.md b/docs/exp/SER004.md
new file mode 100644
index 000000000..ee5c3017b
--- /dev/null
+++ b/docs/exp/SER004.md
@@ -0,0 +1,17 @@
+# RESPite
+
+RESPite is an experimental library that provides high-performance low-level RESP (Redis, etc) parsing and serialization.
+It is used as the IO core for StackExchange.Redis v3+. You should not (yet) use it directly unless you have a very
+good reason to do so.
+
+To suppress this message, add the following to your `csproj` file:
+
+```xml
+$(NoWarn);SER004
+```
+
+or more granularly / locally in C#:
+
+``` c#
+#pragma warning disable SER004
+```
\ No newline at end of file
diff --git a/docs/exp/SER005.md b/docs/exp/SER005.md
new file mode 100644
index 000000000..f0d29b742
--- /dev/null
+++ b/docs/exp/SER005.md
@@ -0,0 +1,23 @@
+# Unit Testing
+
+Unit testing is great! Yay, do more of that!
+
+This type is provided for external unit testing, in particular by people using modules or server features
+not directly implemented by SE.Redis - for example to verify messsage parsing or formatting without
+talking to a RESP server.
+
+These types are considered slightly more... *mercurial*. We encourage you to use them, but *occasionally*
+(not just for fun) you might need to update your test code if we tweak something. This should not impact
+"real" library usage.
+
+To suppress this message, add the following to your `csproj` file:
+
+```xml
+$(NoWarn);SER005
+```
+
+or more granularly / locally in C#:
+
+``` c#
+#pragma warning disable SER005
+```
\ No newline at end of file
diff --git a/docs/exp/SER007.md b/docs/exp/SER007.md
new file mode 100644
index 000000000..7d7f64f5a
--- /dev/null
+++ b/docs/exp/SER007.md
@@ -0,0 +1,15 @@
+# Active:Active
+
+This feature is typically used to provide geo-redundant services; please see [full docs](/ActiveActive).
+
+To suppress this message, add the following to your `csproj` file:
+
+```xml
+$(NoWarn);SER007
+```
+
+or more granularly / locally in C#:
+
+``` c#
+#pragma warning disable SER007
+```
\ No newline at end of file
diff --git a/docs/index.md b/docs/index.md
index 0a2e6c721..144a5d7fe 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -35,6 +35,7 @@ Documentation
- [Basic Usage](Basics) - getting started and basic usage
- [Async Timeouts](AsyncTimeouts) - async timeouts and cancellation
- [Configuration](Configuration) - options available when connecting to redis
+- [Active:Active](ActiveActive) - connecting to multiple Redis endpoints for high availability
- [Pipelines and Multiplexers](PipelinesMultiplexers) - what is a multiplexer?
- [Keys, Values and Channels](KeysValues) - discusses the data-types used on the API
- [Transactions](Transactions) - how atomic transactions work in redis
diff --git a/src/RESPite/Shared/Experiments.cs b/src/RESPite/Shared/Experiments.cs
index 1d9a091fa..617934f66 100644
--- a/src/RESPite/Shared/Experiments.cs
+++ b/src/RESPite/Shared/Experiments.cs
@@ -13,6 +13,7 @@ internal static class Experiments
public const string Respite = "SER004";
public const string UnitTesting = "SER005";
public const string Server_8_8 = "SER006";
+ public const string ActiveActive = "SER007";
// ReSharper restore InconsistentNaming
}
}
diff --git a/src/StackExchange.Redis/ChannelMessageQueue.cs b/src/StackExchange.Redis/ChannelMessageQueue.cs
index 65cd170b9..55147d6b0 100644
--- a/src/StackExchange.Redis/ChannelMessageQueue.cs
+++ b/src/StackExchange.Redis/ChannelMessageQueue.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
@@ -34,7 +35,7 @@ public sealed class ChannelMessageQueue : IAsyncEnumerable
///
public Task Completion => _queue.Reader.Completion;
- internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber parent)
+ internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber? parent)
{
Channel = redisChannel;
_parent = parent;
@@ -48,8 +49,22 @@ internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber paren
private void Write(in RedisChannel channel, in RedisValue value)
{
- var writer = _queue.Writer;
- writer.TryWrite(new ChannelMessage(this, channel, value));
+ try
+ {
+ _queue.Writer.TryWrite(new ChannelMessage(this, channel, value));
+ }
+ catch (Exception ex)
+ {
+ Debug.WriteLine("pub/sub ChannelWrite.TryWrite failed: " + ex.Message);
+ }
+ }
+
+ internal void SynchronizedWrite(in RedisChannel channel, in RedisValue value)
+ {
+ lock (this)
+ {
+ Write(channel, value);
+ }
}
///
@@ -326,4 +341,7 @@ public async IAsyncEnumerator GetAsyncEnumerator(CancellationTok
}
}
#endif
+
+ internal ValueTask WaitToReadAsync(CancellationToken cancellationToken = default)
+ => _queue.Reader.WaitToReadAsync(cancellationToken);
}
diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs
index 641fccc95..e370051fa 100644
--- a/src/StackExchange.Redis/ConfigurationOptions.cs
+++ b/src/StackExchange.Redis/ConfigurationOptions.cs
@@ -40,6 +40,12 @@ public static int ParseInt32(string key, string value, int minValue = int.MinVal
return tmp;
}
+ public static float ParseSingle(string key, string value)
+ {
+ if (!Format.TryParseDouble(value, out double tmp)) throw new ArgumentOutOfRangeException(key, $"Keyword '{key}' requires a numeric value; the value '{value}' is not recognised.");
+ return (float)tmp;
+ }
+
internal static bool ParseBoolean(string key, string value)
{
if (!Format.TryParseBoolean(value, out bool tmp)) throw new ArgumentOutOfRangeException(key, $"Keyword '{key}' requires a boolean value; the value '{value}' is not recognised.");
@@ -944,9 +950,9 @@ public string ToString(bool includePassword)
};
}
- private static void Append(StringBuilder sb, object value)
+ private static void Append(StringBuilder sb, object? value)
{
- if (value == null) return;
+ if (value is null) return;
string s = Format.ToString(value);
if (!string.IsNullOrWhiteSpace(s))
{
@@ -957,7 +963,8 @@ private static void Append(StringBuilder sb, object value)
private static void Append(StringBuilder sb, string prefix, object? value)
{
- string? s = value?.ToString();
+ if (value is null) return;
+ string? s = value.ToString();
if (!string.IsNullOrWhiteSpace(s))
{
if (sb.Length != 0) sb.Append(',');
diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs
index 36f459f14..fd898befb 100644
--- a/src/StackExchange.Redis/ConnectionMultiplexer.cs
+++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs
@@ -1038,7 +1038,7 @@ public void UnRoot(int token)
}
}
- private void OnHeartbeat()
+ internal void OnHeartbeat()
{
try
{
@@ -1131,7 +1131,7 @@ public IDatabase GetDatabase(int db = -1, object? asyncState = null)
}
// DB zero is stored separately, since 0-only is a massively common use-case
- private const int MaxCachedDatabaseInstance = 16; // 17 items - [0,16]
+ internal const int MaxCachedDatabaseInstance = 16; // 17 items - [0,16]
// Side note: "databases 16" is the default in redis.conf; happy to store one extra to get nice alignment etc
private IDatabase? dbCacheZero;
private IDatabase[]? dbCacheLow;
@@ -1284,6 +1284,8 @@ public long OperationCount
}
}
+ internal uint LatencyTicks { get; private set; } = uint.MaxValue;
+
// note that the RedisChannel->byte[] converter is always direct, so this is not an alloc
// (we deal with channels far less frequently, so pay the encoding cost up-front)
internal byte[] ChannelPrefix => ((byte[]?)RawConfig.ChannelPrefix) ?? [];
@@ -2363,5 +2365,29 @@ private Task[] QuitAllServers()
long? IInternalConnectionMultiplexer.GetConnectionId(EndPoint endpoint, ConnectionType type)
=> GetServerEndPoint(endpoint)?.GetBridge(type)?.ConnectionId;
+
+ internal uint UpdateLatency()
+ {
+ var snapshot = GetServerSnapshot();
+ uint max = uint.MaxValue;
+ foreach (var server in snapshot)
+ {
+ if (server.IsConnected)
+ {
+ var latency = server.LatencyTicks;
+ if (max is uint.MaxValue || latency > max)
+ {
+ max = latency;
+ }
+ }
+ }
+
+ if (max != uint.MaxValue)
+ {
+ LatencyTicks = max;
+ }
+
+ return LatencyTicks;
+ }
}
}
diff --git a/src/StackExchange.Redis/HealthCheck.Execute.cs b/src/StackExchange.Redis/HealthCheck.Execute.cs
new file mode 100644
index 000000000..0f234f776
--- /dev/null
+++ b/src/StackExchange.Redis/HealthCheck.Execute.cs
@@ -0,0 +1,62 @@
+using System;
+using System.Diagnostics;
+using System.Net;
+using System.Threading.Tasks;
+
+namespace StackExchange.Redis;
+
+public sealed partial class HealthCheck
+{
+ ///
+ /// Evaluate the health of an endpoint.
+ ///
+ public async Task CheckHealthAsync(IConnectionMultiplexer multiplexer, EndPoint endpoint)
+ {
+ try
+ {
+ int timeout = (int)ProbeTimeout.TotalMilliseconds, success = 0, failure = 0, remaining = ProbeCount;
+ while (remaining > 0)
+ {
+ HealthCheckResult probeResult;
+ try
+ {
+ var pendingProbe = Probe.CheckHealthAsync(this, multiplexer, endpoint);
+ probeResult = await pendingProbe.TimeoutAfter(timeout).ForAwait()
+ ? await pendingProbe.ForAwait() // completed
+ : HealthCheckResult.Unhealthy; // timeout
+ }
+ catch
+ {
+ probeResult = HealthCheckResult.Unhealthy;
+ }
+
+ // update success/failure counts
+ switch (probeResult)
+ {
+ case HealthCheckResult.Healthy: success++; break;
+ case HealthCheckResult.Unhealthy: failure++; break;
+ }
+ HealthCheckProbeContext ctx = new(success, failure, --remaining);
+
+ // evaluate the policy
+ var policyResult = ProbePolicy.Evaluate(ctx);
+ if (policyResult != HealthCheckResult.Inconclusive) return policyResult;
+
+ if (probeResult is HealthCheckResult.Unhealthy && remaining > 0)
+ {
+ // delay if appropriate
+ await Task.Delay(ProbeInterval).ConfigureAwait(false);
+ }
+ }
+
+ // we got here without a result
+ return HealthCheckResult.Inconclusive;
+ }
+ catch (Exception ex)
+ {
+ // if the health check utterly fails: that isn't a good sign
+ Debug.WriteLine(ex.Message);
+ return HealthCheckResult.Unhealthy;
+ }
+ }
+}
diff --git a/src/StackExchange.Redis/HealthCheck.HealthCheckProbe.cs b/src/StackExchange.Redis/HealthCheck.HealthCheckProbe.cs
new file mode 100644
index 000000000..f397b84d4
--- /dev/null
+++ b/src/StackExchange.Redis/HealthCheck.HealthCheckProbe.cs
@@ -0,0 +1,70 @@
+using System.Diagnostics;
+using System.Net;
+using System.Threading.Tasks;
+
+namespace StackExchange.Redis;
+
+public sealed partial class HealthCheck
+{
+ ///
+ /// Describes an operation to perform as part of a health check.
+ ///
+ public abstract partial class HealthCheckProbe
+ {
+ ///
+ /// Check the health of the specified endpoint.
+ ///
+ public abstract Task CheckHealthAsync(HealthCheck healthCheck, IConnectionMultiplexer multiplexer, EndPoint endpoint);
+
+ private static Task? _inconclusive;
+
+ ///
+ /// Reports a probe that was skipped without being evaluated.
+ ///
+ protected static Task Inconclusive => _inconclusive ??= Task.FromResult(HealthCheckResult.Inconclusive);
+ }
+
+ ///
+ /// Describes a key-based (write) operation to perform as part of a health check.
+ ///
+ public abstract class KeyWriteHealthCheckProbe : HealthCheckProbe
+ {
+ ///
+ public override Task CheckHealthAsync(HealthCheck healthCheck, IConnectionMultiplexer multiplexer, EndPoint endpoint)
+ {
+ var server = multiplexer.GetServer(endpoint);
+ if (server.IsReplica) return Inconclusive;
+
+ RedisKey key = server.InventKey("health-check/"u8);
+ if (key.IsNull) return Inconclusive;
+ Debug.Assert(multiplexer.GetServer(key).EndPoint == endpoint, "Key was not routed to the correct endpoint");
+ return CheckHealthAsync(healthCheck, multiplexer.GetDatabase(), key);
+ }
+
+ ///
+ /// Check the health of the specified database using the provided key.
+ ///
+ public abstract Task CheckHealthAsync(HealthCheck healthCheck, IDatabaseAsync database, RedisKey key);
+ }
+
+ ///
+ /// Indicates the result of a health check.
+ ///
+ public enum HealthCheckResult
+ {
+ ///
+ /// The health check was skipped or could not be determined.
+ ///
+ Inconclusive,
+
+ ///
+ /// The health check was successful.
+ ///
+ Healthy,
+
+ ///
+ /// The health check failed.
+ ///
+ Unhealthy,
+ }
+}
diff --git a/src/StackExchange.Redis/HealthCheck.HealthCheckProbeContext.cs b/src/StackExchange.Redis/HealthCheck.HealthCheckProbeContext.cs
new file mode 100644
index 000000000..59667db8d
--- /dev/null
+++ b/src/StackExchange.Redis/HealthCheck.HealthCheckProbeContext.cs
@@ -0,0 +1,28 @@
+namespace StackExchange.Redis;
+
+public sealed partial class HealthCheck
+{
+ ///
+ /// Represents the context of a health check probe.
+ ///
+ public readonly struct HealthCheckProbeContext(int success, int failure, int remaining)
+ {
+ ///
+ public override string ToString() => $"Success: {Success}, Failure: {Failure}, Remaining: {Remaining}";
+
+ ///
+ /// Gets the number of successful health checks.
+ ///
+ public int Success => success;
+
+ ///
+ /// Gets the number of failed health checks.
+ ///
+ public int Failure => failure;
+
+ ///
+ /// Gets the number of remaining health checks.
+ ///
+ public int Remaining => remaining;
+ }
+}
diff --git a/src/StackExchange.Redis/HealthCheck.HealthCheckProbePolicy.cs b/src/StackExchange.Redis/HealthCheck.HealthCheckProbePolicy.cs
new file mode 100644
index 000000000..cb900735f
--- /dev/null
+++ b/src/StackExchange.Redis/HealthCheck.HealthCheckProbePolicy.cs
@@ -0,0 +1,107 @@
+namespace StackExchange.Redis;
+
+public sealed partial class HealthCheck
+{
+ ///
+ /// Attempt to evaluate the outcome of a series of health check operations.
+ ///
+ public abstract class HealthCheckProbePolicy
+ {
+ ///
+ /// Attempt to evaluate the policy given the current context.
+ ///
+ /// The state of the probes so far.
+ /// The result of the policy evaluation.
+ public abstract HealthCheckResult Evaluate(in HealthCheckProbeContext context);
+
+ ///
+ /// Require all probes to succeed.
+ ///
+ public static HealthCheckProbePolicy AllSuccess => AllSuccessHealthCheckProbePolicy.Instance;
+
+ ///
+ /// Require at least one probe to succeed.
+ ///
+ public static HealthCheckProbePolicy AnySuccess => AnySuccessHealthCheckProbePolicy.Instance;
+
+ ///
+ /// Require a majority of probes to succeed.
+ ///
+ public static HealthCheckProbePolicy MajoritySuccess => MajoritySuccessHealthCheckProbePolicy.Instance;
+
+ private sealed class AllSuccessHealthCheckProbePolicy : HealthCheckProbePolicy
+ {
+ public static readonly AllSuccessHealthCheckProbePolicy Instance = new();
+ private AllSuccessHealthCheckProbePolicy() { }
+
+ public override HealthCheckResult Evaluate(in HealthCheckProbeContext context)
+ {
+ // Fail as soon as we have any failure
+ if (context.Failure > 0)
+ {
+ return HealthCheckResult.Unhealthy;
+ }
+
+ // Succeed only when all probes have succeeded (no remaining)
+ if (context.Remaining == 0)
+ {
+ return HealthCheckResult.Healthy;
+ }
+
+ // Can't determine yet
+ return HealthCheckResult.Inconclusive;
+ }
+ }
+
+ private sealed class AnySuccessHealthCheckProbePolicy : HealthCheckProbePolicy
+ {
+ public static readonly AnySuccessHealthCheckProbePolicy Instance = new();
+ private AnySuccessHealthCheckProbePolicy() { }
+
+ public override HealthCheckResult Evaluate(in HealthCheckProbeContext context)
+ {
+ // Succeed as soon as we have any success
+ if (context.Success > 0)
+ {
+ return HealthCheckResult.Healthy;
+ }
+
+ // Fail only when all probes have failed (no remaining)
+ if (context.Remaining == 0)
+ {
+ return HealthCheckResult.Unhealthy;
+ }
+
+ // Can't determine yet
+ return HealthCheckResult.Inconclusive;
+ }
+ }
+
+ private sealed class MajoritySuccessHealthCheckProbePolicy : HealthCheckProbePolicy
+ {
+ public static readonly MajoritySuccessHealthCheckProbePolicy Instance = new();
+ private MajoritySuccessHealthCheckProbePolicy() { }
+
+ public override HealthCheckResult Evaluate(in HealthCheckProbeContext context)
+ {
+ int total = context.Success + context.Failure + context.Remaining;
+ int majority = (total / 2) + 1;
+
+ // Succeed as soon as we have enough successes for a majority
+ if (context.Success >= majority)
+ {
+ return HealthCheckResult.Healthy;
+ }
+
+ // Fail as soon as we have enough failures to make a majority impossible
+ if (context.Failure >= majority)
+ {
+ return HealthCheckResult.Unhealthy;
+ }
+
+ // Can't determine yet
+ return HealthCheckResult.Inconclusive;
+ }
+ }
+ }
+}
diff --git a/src/StackExchange.Redis/HealthCheck.PingProbe.cs b/src/StackExchange.Redis/HealthCheck.PingProbe.cs
new file mode 100644
index 000000000..822010235
--- /dev/null
+++ b/src/StackExchange.Redis/HealthCheck.PingProbe.cs
@@ -0,0 +1,28 @@
+using System.Net;
+using System.Threading.Tasks;
+
+namespace StackExchange.Redis;
+
+public sealed partial class HealthCheck
+{
+ public partial class HealthCheckProbe
+ {
+ ///
+ /// Verify that the server is responsive by sending a PING command.
+ ///
+ public static HealthCheckProbe Ping => PingProbe.Instance;
+ }
+
+ private sealed class PingProbe : HealthCheckProbe
+ {
+ public static PingProbe Instance { get; } = new();
+ private PingProbe() { }
+
+ public override async Task CheckHealthAsync(HealthCheck healthCheck, IConnectionMultiplexer multiplexer, EndPoint endpoint)
+ {
+ var server = multiplexer.GetServer(endpoint);
+ await server.PingAsync();
+ return HealthCheckResult.Healthy;
+ }
+ }
+}
diff --git a/src/StackExchange.Redis/HealthCheck.StringSetProbe.cs b/src/StackExchange.Redis/HealthCheck.StringSetProbe.cs
new file mode 100644
index 000000000..256ddc055
--- /dev/null
+++ b/src/StackExchange.Redis/HealthCheck.StringSetProbe.cs
@@ -0,0 +1,59 @@
+using System;
+using System.Buffers;
+using System.Threading.Tasks;
+
+namespace StackExchange.Redis;
+
+public sealed partial class HealthCheck
+{
+ public partial class HealthCheckProbe
+ {
+ ///
+ /// Verify that a string can be successfully set and retrieved.
+ ///
+ public static HealthCheckProbe StringSet => StringSetProbe.Instance;
+ }
+
+ internal sealed class StringSetProbe : KeyWriteHealthCheckProbe
+ {
+ public static StringSetProbe Instance { get; } = new();
+ private StringSetProbe() { }
+
+#if !NET
+ private static Random SharedRandom { get; } = new();
+#endif
+
+ public override async Task CheckHealthAsync(HealthCheck healthCheck, IDatabaseAsync database, RedisKey key)
+ {
+ // note we use the lock API here because that can selectively choose between appropriate strategies for
+ // different server versions, including DELEX
+ const int LEN = 16;
+ var pooled = ArrayPool.Shared.Rent(LEN);
+#if NET
+ Random.Shared.NextBytes(pooled.AsSpan(0, LEN));
+#else
+ SharedRandom.NextBytes(pooled);
+#endif
+ var payload = (RedisValue)pooled.AsMemory(0, LEN);
+ Lease? lease = null;
+ try
+ {
+ // write a value to the db
+ await database.LockTakeAsync(
+ key: key,
+ value: payload,
+ expiry: healthCheck.ProbeTimeout,
+ flags: CommandFlags.FireAndForget).ForAwait();
+
+ // release from the db if matches (otherwise, we have no clue what happened, so: leave alone)
+ var success = await database.LockReleaseAsync(key, payload).ForAwait();
+ return success ? HealthCheckResult.Healthy : HealthCheckResult.Unhealthy;
+ }
+ finally
+ {
+ ArrayPool.Shared.Return(pooled);
+ lease?.Dispose();
+ }
+ }
+ }
+}
diff --git a/src/StackExchange.Redis/HealthCheck.cs b/src/StackExchange.Redis/HealthCheck.cs
new file mode 100644
index 000000000..48217b68b
--- /dev/null
+++ b/src/StackExchange.Redis/HealthCheck.cs
@@ -0,0 +1,103 @@
+using System;
+
+namespace StackExchange.Redis;
+
+///
+/// Describes a health check to perform against instances.
+///
+public sealed partial class HealthCheck
+{
+ internal HealthCheck(
+ TimeSpan interval,
+ int probeCount,
+ TimeSpan probeTimeout,
+ TimeSpan probeInterval,
+ HealthCheckProbe probe,
+ HealthCheckProbePolicy healthCheckProbePolicy)
+ {
+ Interval = interval;
+ ProbeCount = probeCount;
+ ProbeTimeout = probeTimeout;
+ ProbeInterval = probeInterval;
+ Probe = probe;
+ ProbePolicy = healthCheckProbePolicy;
+ }
+
+ ///
+ /// Gets the interval at which health checks should be performed.
+ ///
+ public TimeSpan Interval { get; }
+
+ ///
+ /// Gets the number of probes to perform for this health check.
+ ///
+ public int ProbeCount { get; }
+
+ ///
+ /// Gets the time that should be allowed for an individual probe to complete.
+ ///
+ public TimeSpan ProbeTimeout { get; }
+
+ ///
+ /// Gets the interval between failed probes.
+ ///
+ public TimeSpan ProbeInterval { get; }
+
+ ///
+ /// Gets the probe to use for this health check.
+ ///
+ public HealthCheckProbe Probe { get; }
+
+ ///
+ /// Gets the policy to use for this health check.
+ ///
+ public HealthCheckProbePolicy ProbePolicy { get; }
+
+ ///
+ /// Create a builder base on this health check.
+ ///
+ public HealthCheckBuilder Builder() => new()
+ {
+ Interval = Interval,
+ ProbeCount = ProbeCount,
+ ProbeTimeout = ProbeTimeout,
+ ProbeInterval = ProbeInterval,
+ Probe = Probe,
+ ProbePolicy = ProbePolicy,
+ };
+
+ ///
+ /// Allows configuration of a .
+ ///
+ public class HealthCheckBuilder
+ {
+ ///
+ /// Create a from this builder.
+ ///
+ public HealthCheck Build() => new(
+ Interval,
+ ProbeCount,
+ ProbeTimeout,
+ ProbeInterval,
+ Probe,
+ ProbePolicy);
+
+ ///
+ public TimeSpan Interval { get; set; } = TimeSpan.FromSeconds(10);
+
+ ///
+ public int ProbeCount { get; set; } = 3;
+
+ ///
+ public TimeSpan ProbeTimeout { get; set; } = TimeSpan.FromSeconds(2);
+
+ ///
+ public TimeSpan ProbeInterval { get; set; } = TimeSpan.FromSeconds(1);
+
+ ///
+ public HealthCheckProbe Probe { get; set; } = HealthCheckProbe.Ping;
+
+ ///
+ public HealthCheckProbePolicy ProbePolicy { get; set; } = HealthCheckProbePolicy.AnySuccess;
+ }
+}
diff --git a/src/StackExchange.Redis/Interfaces/IConnectionGroup.cs b/src/StackExchange.Redis/Interfaces/IConnectionGroup.cs
new file mode 100644
index 000000000..9d2477257
--- /dev/null
+++ b/src/StackExchange.Redis/Interfaces/IConnectionGroup.cs
@@ -0,0 +1,110 @@
+using System;
+using System.Diagnostics.CodeAnalysis;
+using System.IO;
+using System.Text;
+using System.Threading.Tasks;
+using RESPite;
+
+// ReSharper disable once CheckNamespace
+namespace StackExchange.Redis;
+
+///
+/// A group of connections to redis servers, that manages connections to multiple
+/// servers, routing traffic based on the availability of the servers and their
+/// relative .
+///
+[Experimental(Experiments.ActiveActive, UrlFormat = Experiments.UrlFormat)]
+public interface IConnectionGroup : IConnectionMultiplexer
+{
+ ///
+ /// A change occured to one of the connection groups.
+ ///
+ event EventHandler? ConnectionChanged;
+
+ ///
+ /// Adds a new member to the group.
+ ///
+ Task AddAsync(ConnectionGroupMember group, TextWriter? log = null);
+
+ ///
+ /// Removes a member from the group.
+ ///
+ bool Remove(ConnectionGroupMember group);
+
+ ///
+ /// Get the members of the group.
+ ///
+ ReadOnlySpan GetMembers();
+}
+
+///
+/// Represents a change to a connection group.
+///
+[Experimental(Experiments.ActiveActive, UrlFormat = Experiments.UrlFormat)]
+public class GroupConnectionChangedEventArgs(GroupConnectionChangedEventArgs.ChangeType type, ConnectionGroupMember group, ConnectionGroupMember? previousGroup = null) : EventArgs, ICompletable
+{
+ ///
+ /// The group relating to the change. For , this is the new group.
+ ///
+ public ConnectionGroupMember Group => group;
+
+ ///
+ /// The previous group relating to the change, if applicable.
+ ///
+ public ConnectionGroupMember? PreviousGroup => previousGroup;
+
+ ///
+ /// The type of change that occurred.
+ ///
+ public ChangeType Type => type;
+
+ private EventHandler? _handler;
+ private object? _sender;
+
+ ///
+ /// The type of change that occurred.
+ ///
+ public enum ChangeType
+ {
+ ///
+ /// Unused.
+ ///
+ Unknown = 0,
+
+ ///
+ /// A new connection group was added.
+ ///
+ Added = 1,
+
+ ///
+ /// A connection group was removed.
+ ///
+ Removed = 2,
+
+ ///
+ /// A connection group became disconnected.
+ ///
+ Disconnected = 3,
+
+ ///
+ /// A connection group became reconnected.
+ ///
+ Reconnected = 4,
+
+ ///
+ /// The active connection group changed, changing how traffic is routed.
+ ///
+ ActiveChanged = 5,
+ }
+
+ internal void CompleteAsWorker(EventHandler handler, object sender)
+ {
+ _handler = handler;
+ _sender = sender;
+ ConnectionMultiplexer.CompleteAsWorker(this);
+ }
+
+ void ICompletable.AppendStormLog(StringBuilder sb) { }
+
+ bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(_handler, _sender!, this, isAsync);
+}
diff --git a/src/StackExchange.Redis/MultiGroupDatabase.Async.cs b/src/StackExchange.Redis/MultiGroupDatabase.Async.cs
new file mode 100644
index 000000000..03706f355
--- /dev/null
+++ b/src/StackExchange.Redis/MultiGroupDatabase.Async.cs
@@ -0,0 +1,66 @@
+using System;
+using System.Net;
+using System.Threading.Tasks;
+
+namespace StackExchange.Redis;
+
+internal sealed partial class MultiGroupDatabase
+{
+ // Async methods - Core operations
+ public Task DebugObjectAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
+ => GetActiveDatabase().DebugObjectAsync(key, flags);
+
+ public Task IdentifyEndpointAsync(RedisKey key = default, CommandFlags flags = CommandFlags.None)
+ => GetActiveDatabase().IdentifyEndpointAsync(key, flags);
+
+ public Task KeyMigrateAsync(RedisKey key, EndPoint toServer, int toDatabase = 0, int timeoutMilliseconds = 0, MigrateOptions migrateOptions = MigrateOptions.None, CommandFlags flags = CommandFlags.None)
+ => GetActiveDatabase().KeyMigrateAsync(key, toServer, toDatabase, timeoutMilliseconds, migrateOptions, flags);
+
+ public Task PingAsync(CommandFlags flags = CommandFlags.None)
+ => GetActiveDatabase().PingAsync(flags);
+
+ public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
+ => GetActiveDatabase().PublishAsync(channel, message, flags);
+
+ public Task ExecuteAsync(string command, params object[] args)
+ => GetActiveDatabase().ExecuteAsync(command, args);
+
+ public Task ExecuteAsync(string command, System.Collections.Generic.ICollection