Skip to content

Commit 7ebdc4a

Browse files
committed
invent key implementation and validation using in-proc server (needed NX/EX support)
1 parent ff7f3d7 commit 7ebdc4a

7 files changed

Lines changed: 222 additions & 24 deletions

File tree

src/StackExchange.Redis/RedisServer.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,13 @@ public bool AllowReplicaWrites
5656
public RedisKey InventKey(RedisKey prefix = default)
5757
{
5858
var guid = Guid.NewGuid();
59-
throw new NotImplementedException();
59+
if (server.ServerType is ServerType.Cluster)
60+
{
61+
var hashTag = multiplexer.ServerSelectionStrategy.GetHashTag(server);
62+
if (string.IsNullOrEmpty(hashTag)) return RedisKey.Null;
63+
return prefix.Append($"{guid}:{{{hashTag}}}");
64+
}
65+
return prefix.Append(guid.ToString());
6066
}
6167

6268
public void ClientKill(EndPoint endpoint, CommandFlags flags = CommandFlags.None)
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
using System;
2+
using System.Diagnostics;
3+
using System.Text;
4+
5+
namespace StackExchange.Redis;
6+
7+
internal sealed partial class ServerSelectionStrategy
8+
{
9+
// pre-computed hash-tags for each slot
10+
private static class HashTags
11+
{
12+
private static readonly string[] Cache = Populate();
13+
public static ReadOnlySpan<string> Tags => Cache;
14+
public static string Get(int slot) => Cache[slot];
15+
16+
private static string[] Populate()
17+
{
18+
// Via testing, we know that 3 characters is sufficient to populate all slots
19+
// using a total of 48643 operations - this is acceptable (same order-of-magnitude as the slot count).
20+
var slots = new string?[TotalSlots];
21+
22+
// using an alphabet of the visible ASCII characters, excluding { and } (used to denote hash-tags)
23+
ReadOnlySpan<byte> alphabet = "!\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz|~"u8;
24+
Debug.WriteLine($"Alphabet: '{Encoding.ASCII.GetString(alphabet)}', {alphabet.Length} chars");
25+
26+
Span<byte> threeChars = stackalloc byte[3];
27+
Span<byte> twoChars = threeChars.Slice(0, 2);
28+
Span<byte> oneChar = threeChars.Slice(0, 1);
29+
30+
int operations = 0;
31+
int remaining = slots.Length;
32+
33+
bool Test(ReadOnlySpan<byte> span)
34+
{
35+
var slot = GetClusterSlot(span);
36+
operations++;
37+
if (slots[slot] is { } existing)
38+
{
39+
// prefer smaller tags (but doesn't change the outcome)
40+
if (span.Length < existing.Length)
41+
{
42+
slots[slot] = Encoding.ASCII.GetString(span);
43+
}
44+
}
45+
else
46+
{
47+
// new value for this slot
48+
slots[slot] = Encoding.ASCII.GetString(span);
49+
return --remaining == 0;
50+
}
51+
52+
return false;
53+
}
54+
for (int i = 0; i < alphabet.Length; i++)
55+
{
56+
oneChar[0] = alphabet[i];
57+
58+
// Test single character keys
59+
if (i == 0) Test(oneChar);
60+
61+
for (int j = 0; j < alphabet.Length; j++)
62+
{
63+
twoChars[1] = alphabet[j];
64+
65+
// Test two characters keys
66+
if (i == 0) Test(twoChars);
67+
68+
for (int k = 0; k < alphabet.Length; k++)
69+
{
70+
threeChars[2] = alphabet[k];
71+
72+
// Test three characters - we know this is the only possible exit location
73+
if (Test(threeChars))
74+
{
75+
Debug.WriteLine($"Populated all hash-tag slots in {operations} operations");
76+
return slots!;
77+
}
78+
}
79+
}
80+
}
81+
82+
throw new InvalidOperationException(
83+
$"Failed to populate hash-tag cache after {operations} operations, {remaining} slots remaining");
84+
}
85+
}
86+
}

src/StackExchange.Redis/ServerSelectionStrategy.cs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace StackExchange.Redis
88
{
9-
internal sealed class ServerSelectionStrategy
9+
internal sealed partial class ServerSelectionStrategy
1010
{
1111
public const int NoSlot = -1, MultipleSlots = -2;
1212
private const int RedisClusterSlotCount = 16384;
@@ -404,5 +404,29 @@ internal bool CanServeSlot(ServerEndPoint server, int slot)
404404
}
405405
return false;
406406
}
407+
408+
/// <summary>
409+
/// Gets a string that can be used as a hash-tag to reference a specific slot.
410+
/// </summary>
411+
internal string GetHashTag(ServerEndPoint endpoint)
412+
{
413+
if (map is { } arr)
414+
{
415+
// inefficient way of finding a slot for a given endpoint, but: it'll work
416+
for (int i = 0; i < arr.Length; i++)
417+
{
418+
if (arr[i] == endpoint)
419+
{
420+
return HashTags.Get(i);
421+
}
422+
}
423+
}
424+
return "";
425+
}
426+
427+
/// <summary>
428+
/// Gets a string that can be used as a hash-tag to reference a specific slot.
429+
/// </summary>
430+
internal static string GetHashTag(int slot) => slot < 0 ? "" : HashTags.Get(slot);
407431
}
408432
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using Xunit;
5+
6+
namespace StackExchange.Redis.Tests;
7+
8+
public class HashTagUnitTests
9+
{
10+
[Fact]
11+
public void TestHashTagCoverage()
12+
{
13+
HashSet<string> uniques = [];
14+
Assert.Equal("", ServerSelectionStrategy.GetHashTag(ServerSelectionStrategy.NoSlot));
15+
Assert.Equal("", ServerSelectionStrategy.GetHashTag(ServerSelectionStrategy.MultipleSlots));
16+
Span<byte> buffer = stackalloc byte[3];
17+
for (int i = 0; i < ServerSelectionStrategy.TotalSlots; i++)
18+
{
19+
var tag = ServerSelectionStrategy.GetHashTag(i);
20+
Assert.False(string.IsNullOrEmpty(tag));
21+
Assert.True(uniques.Add(tag));
22+
23+
var len = Encoding.ASCII.GetBytes(tag, buffer);
24+
var slot = ServerSelectionStrategy.GetClusterSlot(buffer.Slice(0, len));
25+
Assert.Equal(i, slot);
26+
}
27+
Assert.Equal(ServerSelectionStrategy.TotalSlots, uniques.Count);
28+
}
29+
}

tests/StackExchange.Redis.Tests/MultiGroupTests/BasicMultiGroupTests.cs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,23 +91,48 @@ public Task WriteSeen(string source, ChannelMessageQueue queue) =>
9191
}
9292
protected TextWriter Log { get; } = new TextWriterOutputHelper(log);
9393

94-
[Fact]
95-
public async Task SelectByWeight()
94+
public enum InbuiltProbe
95+
{
96+
IsConnected,
97+
Ping,
98+
StringSet,
99+
}
100+
101+
[Theory]
102+
[InlineData(InbuiltProbe.IsConnected, ServerType.Standalone)]
103+
[InlineData(InbuiltProbe.Ping, ServerType.Standalone)]
104+
[InlineData(InbuiltProbe.StringSet, ServerType.Standalone)]
105+
[InlineData(InbuiltProbe.IsConnected, ServerType.Cluster)]
106+
[InlineData(InbuiltProbe.Ping, ServerType.Cluster)]
107+
[InlineData(InbuiltProbe.StringSet, ServerType.Cluster)]
108+
public async Task SelectByWeight(InbuiltProbe probe, ServerType serverType)
96109
{
110+
var healthCheck = new HealthCheck
111+
{
112+
Probe = probe switch
113+
{
114+
InbuiltProbe.IsConnected => HealthCheck.HealthCheckProbe.IsConnected,
115+
InbuiltProbe.Ping => HealthCheck.HealthCheckProbe.Ping,
116+
InbuiltProbe.StringSet => HealthCheck.HealthCheckProbe.StringSet,
117+
_ => throw new ArgumentOutOfRangeException(nameof(probe)),
118+
},
119+
};
120+
97121
EndPoint germany = new DnsEndPoint("germany", 6379);
98122
EndPoint canada = new DnsEndPoint("canada", 6379);
99123
EndPoint tokyo = new DnsEndPoint("tokyo", 6379);
100124

101-
using var server0 = new InProcessTestServer(log, endpoint: germany);
102-
using var server1 = new InProcessTestServer(log, endpoint: canada);
103-
using var server2 = new InProcessTestServer(log, endpoint: tokyo);
125+
using var server0 = new InProcessTestServer(log, endpoint: germany) { ServerType = serverType };
126+
using var server1 = new InProcessTestServer(log, endpoint: canada) { ServerType = serverType };
127+
using var server2 = new InProcessTestServer(log, endpoint: tokyo) { ServerType = serverType };
104128

105129
ConnectionGroupMember[] members = [
106130
new(server0.GetClientConfig()) { Weight = 2 },
107131
new(server1.GetClientConfig()) { Weight = 9 },
108132
new(server2.GetClientConfig()) { Weight = 3 },
109133
];
110-
await using var conn = await ConnectionMultiplexer.ConnectGroupAsync(members);
134+
var options = new MultiGroupOptions { HealthCheck = healthCheck };
135+
await using var conn = await ConnectionMultiplexer.ConnectGroupAsync(members, options);
111136
Assert.True(conn.IsConnected);
112137
var typed = Assert.IsType<MultiGroupMultiplexer>(conn);
113138

toys/StackExchange.Redis.Server/MemoryCacheRedisServer.cs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,16 +145,26 @@ protected override RedisValue Get(int database, in RedisKey key)
145145
return RedisValue.Unbox(val);
146146
}
147147

148-
protected override void Set(int database, in RedisKey key, in RedisValue value)
149-
=> GetDb(database)[key] = value.Box();
150-
151-
protected override void SetEx(int database, in RedisKey key, TimeSpan expiration, in RedisValue value)
148+
protected override bool Set(int database, in RedisKey key, in RedisValue value, TimeSpan? expiration = null, SetFlags flags = SetFlags.None)
152149
{
153150
var db = GetDb(database);
151+
switch (flags & (SetFlags.NX | SetFlags.XX))
152+
{
153+
case SetFlags.NX when Exists(database, key): return false;
154+
case SetFlags.XX when !Exists(database, key): return false;
155+
case SetFlags.NX | SetFlags.XX: throw new ArgumentOutOfRangeException(nameof(flags));
156+
}
157+
158+
if (expiration is null)
159+
{
160+
db[key] = value.Box();
161+
return true;
162+
}
154163
var now = Time();
155-
var absolute = now + expiration;
164+
var absolute = now + expiration.Value;
156165
if (absolute <= now) db.Remove(key);
157166
else db[key] = new ExpiringValue(value.Box(), absolute);
167+
return true;
158168
}
159169

160170
protected override bool Del(int database, in RedisKey key)

toys/StackExchange.Redis.Server/RedisServer.cs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ protected virtual TypedRedisValue SetEx(RedisClient client, in RedisRequest requ
352352
RedisKey key = request.GetKey(1);
353353
int seconds = request.GetInt32(2);
354354
var value = request.GetValue(3);
355-
SetEx(client.Database, key, TimeSpan.FromSeconds(seconds), value);
355+
Set(client.Database, key, value, TimeSpan.FromSeconds(seconds));
356356
return TypedRedisValue.OK;
357357
}
358358

@@ -431,12 +431,6 @@ protected virtual TypedRedisValue Exec(RedisClient client, in RedisRequest reque
431431
return results;
432432
}
433433

434-
protected virtual void SetEx(int database, in RedisKey key, TimeSpan timeout, in RedisValue value)
435-
{
436-
Set(database, key, value);
437-
Expire(database, key, timeout);
438-
}
439-
440434
[RedisCommand(3, nameof(RedisCommand.CLIENT), "setname", LockFree = true)]
441435
protected virtual TypedRedisValue ClientSetname(RedisClient client, in RedisRequest request)
442436
{
@@ -967,13 +961,37 @@ protected virtual TypedRedisValue Get(RedisClient client, in RedisRequest reques
967961

968962
protected virtual RedisValue Get(int database, in RedisKey key) => throw new NotSupportedException();
969963

970-
[RedisCommand(3)]
964+
[RedisCommand(-3)]
971965
protected virtual TypedRedisValue Set(RedisClient client, in RedisRequest request)
972966
{
973-
Set(client.Database, request.GetKey(1), request.GetValue(2));
974-
return TypedRedisValue.OK;
967+
TimeSpan? expiry = null;
968+
var key = request.GetKey(1);
969+
var value = request.GetValue(2);
970+
SetFlags flags = SetFlags.None;
971+
for (int i = 3; i < request.Count; i++)
972+
{
973+
if (request.IsString(i, "nx"u8) || request.IsString(i, "NX"u8)) flags |= SetFlags.NX;
974+
else if (request.IsString(i, "xx"u8) || request.IsString(i, "XX"u8)) flags |= SetFlags.XX;
975+
else if (request.IsString(i, "ex"u8) || request.IsString(i, "EX"u8)) expiry = TimeSpan.FromSeconds(request.GetInt32(++i));
976+
else if (request.IsString(i, "px"u8) || request.IsString(i, "PX"u8)) expiry = TimeSpan.FromMilliseconds(request.GetInt32(++i));
977+
else return TypedRedisValue.Error("ERR syntax error");
978+
}
979+
const SetFlags BOTH = SetFlags.NX | SetFlags.XX;
980+
if ((flags & BOTH) == BOTH) return TypedRedisValue.Error("ERR Invalid flags combination");
981+
var result = Set(client.Database, request.GetKey(1), request.GetValue(2), expiry, flags);
982+
return result ? TypedRedisValue.OK : TypedRedisValue.BulkString(RedisValue.Null);
983+
}
984+
985+
[Flags]
986+
public enum SetFlags
987+
{
988+
None = 0,
989+
NX = 1,
990+
XX = 2,
975991
}
976-
protected virtual void Set(int database, in RedisKey key, in RedisValue value) => throw new NotSupportedException();
992+
993+
protected virtual bool Set(int database, in RedisKey key, in RedisValue value, TimeSpan? expiry = null, SetFlags flags = SetFlags.None) => throw new NotSupportedException();
994+
977995
[RedisCommand(1)]
978996
protected new virtual TypedRedisValue Shutdown(RedisClient client, in RedisRequest request)
979997
{

0 commit comments

Comments
 (0)