Skip to content

Commit ac7ab7f

Browse files
authored
More powerful virtual test server for unit tests (#3022)
* wip * Multi-node test infrastructure * respect CROSSSLOT in the toy server * extensively use "in" in the toy server; lots of stack values * implement decrby (for desting from CI) * implement basic TTL to support basic-ops test in primary tests * basic resp3 * - simplify connecting to the test server - include all commands and endpoints when connecting to the test server - better RESP3 aggregate support * run basic tests on RESP2+3 * Fix remaining RESP3 handshake snafus * test fixture for in-proc; run with basic tests * implement everything in multi/exec except the serialize/deserialize (d'oh!) * also force attributes to be pairs (same semantics as maps) * don't buffer any of the transaction commands
1 parent 939cc85 commit ac7ab7f

21 files changed

+1683
-492
lines changed

src/StackExchange.Redis/ClusterConfiguration.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ private SlotRange(short from, short to)
4545
/// </summary>
4646
public int To => to;
4747

48+
internal bool IsSingleSlot => From == To;
49+
4850
internal const int MinSlot = 0, MaxSlot = 16383;
4951

5052
private static SlotRange[]? s_SharedAllSlots;

src/StackExchange.Redis/ConnectionMultiplexer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ internal void CheckMessage(Message message)
356356
}
357357
}
358358

359-
internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved)
359+
internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved, bool isSelf)
360360
{
361361
// If we're being told to re-send something because the hash slot moved, that means our topology is out of date
362362
// ...and we should re-evaluate what's what.
@@ -367,7 +367,7 @@ internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool i
367367
ReconfigureIfNeeded(endpoint, false, "MOVED encountered");
368368
}
369369

370-
return ServerSelectionStrategy.TryResend(hashSlot, message, endpoint, isMoved);
370+
return ServerSelectionStrategy.TryResend(hashSlot, message, endpoint, isMoved, isSelf);
371371
}
372372

373373
/// <summary>

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,30 @@ internal static void WriteMultiBulkHeader(PipeWriter output, long count)
925925
output.Advance(offset);
926926
}
927927

928+
internal static void WriteMultiBulkHeader(PipeWriter output, long count, ResultType type)
929+
{
930+
// *{count}\r\n = 3 + MaxInt32TextLen
931+
var span = output.GetSpan(3 + Format.MaxInt32TextLen);
932+
span[0] = type switch
933+
{
934+
ResultType.Push => (byte)'>',
935+
ResultType.Attribute => (byte)'|',
936+
ResultType.Map => (byte)'%',
937+
ResultType.Set => (byte)'~',
938+
_ => (byte)'*',
939+
};
940+
if ((type is ResultType.Map or ResultType.Attribute) & count > 0)
941+
{
942+
if ((count & 1) != 0) Throw(type, count);
943+
count >>= 1;
944+
static void Throw(ResultType type, long count) => throw new ArgumentOutOfRangeException(
945+
paramName: nameof(count),
946+
message: $"{type} data must be in pairs; got {count}");
947+
}
948+
int offset = WriteRaw(span, count, offset: 1);
949+
output.Advance(offset);
950+
}
951+
928952
[MethodImpl(MethodImplOptions.AggressiveInlining)]
929953
internal static int WriteCrlf(Span<byte> span, int offset)
930954
{

src/StackExchange.Redis/ResultProcessor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
274274
{
275275
// already toast
276276
}
277-
else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved))
277+
else if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved, isSameEndpoint))
278278
{
279279
bridge.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK");
280280
return false;

src/StackExchange.Redis/ServerSelectionStrategy.cs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal sealed class ServerSelectionStrategy
4646
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
4747
};
4848

49-
private readonly ConnectionMultiplexer multiplexer;
49+
private readonly ConnectionMultiplexer? multiplexer;
5050
private int anyStartOffset = SharedRandom.Next(); // initialize to a random value so routing isn't uniform
5151

5252
#if NET6_0_OR_GREATER
@@ -57,7 +57,7 @@ internal sealed class ServerSelectionStrategy
5757

5858
private ServerEndPoint[]? map;
5959

60-
public ServerSelectionStrategy(ConnectionMultiplexer multiplexer) => this.multiplexer = multiplexer;
60+
public ServerSelectionStrategy(ConnectionMultiplexer? multiplexer) => this.multiplexer = multiplexer;
6161

6262
public ServerType ServerType { get; set; } = ServerType.Standalone;
6363
internal static int TotalSlots => RedisClusterSlotCount;
@@ -96,6 +96,8 @@ public int HashSlot(in RedisKey key)
9696
}
9797
}
9898

99+
private byte[] ChannelPrefix => multiplexer?.ChannelPrefix ?? [];
100+
99101
/// <summary>
100102
/// Computes the hash-slot that would be used by the given channel.
101103
/// </summary>
@@ -106,7 +108,7 @@ public int HashSlot(in RedisChannel channel)
106108

107109
ReadOnlySpan<byte> routingSpan = channel.RoutingSpan;
108110
byte[] prefix;
109-
return channel.IgnoreChannelPrefix || (prefix = multiplexer.ChannelPrefix).Length == 0
111+
return channel.IgnoreChannelPrefix || (prefix = ChannelPrefix).Length == 0
110112
? GetClusterSlot(routingSpan) : GetClusterSlotWithPrefix(prefix, routingSpan);
111113

112114
static int GetClusterSlotWithPrefix(byte[] prefixRaw, ReadOnlySpan<byte> routingSpan)
@@ -133,15 +135,15 @@ static int GetClusterSlotWithPrefix(byte[] prefixRaw, ReadOnlySpan<byte> routing
133135
/// <remarks>
134136
/// HASH_SLOT = CRC16(key) mod 16384.
135137
/// </remarks>
136-
private static unsafe int GetClusterSlot(ReadOnlySpan<byte> blob)
138+
internal static unsafe int GetClusterSlot(ReadOnlySpan<byte> key)
137139
{
138140
unchecked
139141
{
140-
fixed (byte* ptr = blob)
142+
fixed (byte* ptr = key)
141143
{
142144
fixed (ushort* crc16tab = ServerSelectionStrategy.Crc16tab)
143145
{
144-
int offset = 0, count = blob.Length, start, end;
146+
int offset = 0, count = key.Length, start, end;
145147
if ((start = IndexOf(ptr, (byte)'{', 0, count - 1)) >= 0
146148
&& (end = IndexOf(ptr, (byte)'}', start + 1, count)) >= 0
147149
&& --end != start)
@@ -169,7 +171,7 @@ private static unsafe int GetClusterSlot(ReadOnlySpan<byte> blob)
169171
// the same, so this does a pretty good job of spotting illegal commands before sending them
170172
case ServerType.Twemproxy:
171173
slot = message.GetHashSlot(this);
172-
if (slot == MultipleSlots) throw ExceptionFactory.MultiSlot(multiplexer.RawConfig.IncludeDetailInExceptions, message);
174+
if (slot == MultipleSlots) throw ExceptionFactory.MultiSlot(multiplexer?.RawConfig?.IncludeDetailInExceptions ?? false, message);
173175
break;
174176
/* just shown for completeness
175177
case ServerType.Standalone: // don't use sharding
@@ -193,13 +195,13 @@ private static unsafe int GetClusterSlot(ReadOnlySpan<byte> blob)
193195
return Select(slot, command, flags, allowDisconnected);
194196
}
195197

196-
public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved)
198+
public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved, bool isSelf)
197199
{
198200
try
199201
{
200-
if (ServerType == ServerType.Standalone || hashSlot < 0 || hashSlot >= RedisClusterSlotCount) return false;
202+
if ((ServerType == ServerType.Standalone && !isSelf) || hashSlot < 0 || hashSlot >= RedisClusterSlotCount) return false;
201203

202-
ServerEndPoint server = multiplexer.GetServerEndPoint(endpoint);
204+
ServerEndPoint? server = multiplexer?.GetServerEndPoint(endpoint);
203205
if (server != null)
204206
{
205207
bool retry = false;
@@ -230,7 +232,7 @@ public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isM
230232
}
231233
if (resendVia == null)
232234
{
233-
multiplexer.Trace("Unable to resend to " + endpoint);
235+
multiplexer?.Trace("Unable to resend to " + endpoint);
234236
}
235237
else
236238
{
@@ -248,7 +250,7 @@ public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isM
248250
arr[hashSlot] = server;
249251
if (oldServer != server)
250252
{
251-
multiplexer.OnHashSlotMoved(hashSlot, oldServer?.EndPoint, endpoint);
253+
multiplexer?.OnHashSlotMoved(hashSlot, oldServer?.EndPoint, endpoint);
252254
}
253255
}
254256

@@ -305,7 +307,7 @@ private static unsafe int IndexOf(byte* ptr, byte value, int start, int end)
305307
}
306308

307309
private ServerEndPoint? Any(RedisCommand command, CommandFlags flags, bool allowDisconnected) =>
308-
multiplexer.AnyServer(ServerType, (uint)Interlocked.Increment(ref anyStartOffset), command, flags, allowDisconnected);
310+
multiplexer?.AnyServer(ServerType, (uint)Interlocked.Increment(ref anyStartOffset), command, flags, allowDisconnected);
309311

310312
private static ServerEndPoint? FindPrimary(ServerEndPoint endpoint, RedisCommand command)
311313
{

tests/StackExchange.Redis.Tests/BasicOpTests.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,21 @@
66

77
namespace StackExchange.Redis.Tests;
88

9-
public class BasicOpsTests(ITestOutputHelper output, SharedConnectionFixture fixture) : TestBase(output, fixture)
9+
[RunPerProtocol]
10+
public class BasicOpsTests(ITestOutputHelper output, SharedConnectionFixture fixture)
11+
: BasicOpsTestsBase(output, fixture, null)
12+
{
13+
}
14+
15+
[RunPerProtocol]
16+
public class InProcBasicOpsTests(ITestOutputHelper output, InProcServerFixture fixture)
17+
: BasicOpsTestsBase(output, null, fixture)
18+
{
19+
}
20+
21+
[RunPerProtocol]
22+
public abstract class BasicOpsTestsBase(ITestOutputHelper output, SharedConnectionFixture? connection, InProcServerFixture? server)
23+
: TestBase(output, connection, server)
1024
{
1125
[Fact]
1226
public async Task PingOnce()
@@ -471,6 +485,7 @@ public async Task WrappedDatabasePrefixIntegration()
471485
public async Task TransactionSync()
472486
{
473487
await using var conn = Create();
488+
Assert.SkipUnless(conn.RawConfig.CommandMap.IsAvailable(RedisCommand.MULTI), "MULTI is not available");
474489
var db = conn.GetDatabase();
475490

476491
RedisKey key = Me();
@@ -490,6 +505,8 @@ public async Task TransactionSync()
490505
public async Task TransactionAsync()
491506
{
492507
await using var conn = Create();
508+
Assert.SkipUnless(conn.RawConfig.CommandMap.IsAvailable(RedisCommand.MULTI), "MULTI is not available");
509+
493510
var db = conn.GetDatabase();
494511

495512
RedisKey key = Me();
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
using System;
2+
using StackExchange.Redis.Configuration;
3+
using Xunit;
4+
5+
[assembly: AssemblyFixture(typeof(StackExchange.Redis.Tests.InProcServerFixture))]
6+
7+
// ReSharper disable once CheckNamespace
8+
namespace StackExchange.Redis.Tests;
9+
10+
public class InProcServerFixture : IDisposable
11+
{
12+
private readonly InProcessTestServer _server = new();
13+
private readonly ConfigurationOptions _config;
14+
public InProcServerFixture()
15+
{
16+
_config = _server.GetClientConfig();
17+
Configuration = _config.ToString();
18+
}
19+
20+
public ConfigurationOptions Config => _config;
21+
22+
public string Configuration { get; }
23+
24+
public Tunnel? Tunnel => _server.Tunnel;
25+
26+
public void Dispose() => _server.Dispose();
27+
}

tests/StackExchange.Redis.Tests/HighIntegrityBasicOpsTests.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,8 @@ public class HighIntegrityBasicOpsTests(ITestOutputHelper output, SharedConnecti
66
{
77
internal override bool HighIntegrity => true;
88
}
9+
10+
public class InProcHighIntegrityBasicOpsTests(ITestOutputHelper output, InProcServerFixture fixture) : InProcBasicOpsTests(output, fixture)
11+
{
12+
internal override bool HighIntegrity => true;
13+
}

tests/StackExchange.Redis.Tests/InProcessTestServer.cs

Lines changed: 84 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.IO.Pipelines;
44
using System.Net;
55
using System.Net.Sockets;
6+
using System.Text;
67
using System.Threading;
78
using System.Threading.Tasks;
89
using Pipelines.Sockets.Unofficial;
@@ -14,17 +15,79 @@ namespace StackExchange.Redis.Tests;
1415

1516
public class InProcessTestServer : MemoryCacheRedisServer
1617
{
17-
public Tunnel Tunnel { get; }
18-
1918
private readonly ITestOutputHelper? _log;
2019
public InProcessTestServer(ITestOutputHelper? log = null)
2120
{
21+
RedisVersion = RedisFeatures.v6_0_0; // for client to expect RESP3
2222
_log = log;
2323
// ReSharper disable once VirtualMemberCallInConstructor
2424
_log?.WriteLine($"Creating in-process server: {ToString()}");
2525
Tunnel = new InProcTunnel(this);
2626
}
2727

28+
public Task<ConnectionMultiplexer> ConnectAsync(bool withPubSub = false, TextWriter? log = null)
29+
=> ConnectionMultiplexer.ConnectAsync(GetClientConfig(withPubSub), log);
30+
31+
public ConfigurationOptions GetClientConfig(bool withPubSub = false)
32+
{
33+
var commands = GetCommands();
34+
if (!withPubSub)
35+
{
36+
commands.Remove(nameof(RedisCommand.SUBSCRIBE));
37+
commands.Remove(nameof(RedisCommand.PSUBSCRIBE));
38+
commands.Remove(nameof(RedisCommand.SSUBSCRIBE));
39+
commands.Remove(nameof(RedisCommand.UNSUBSCRIBE));
40+
commands.Remove(nameof(RedisCommand.PUNSUBSCRIBE));
41+
commands.Remove(nameof(RedisCommand.SUNSUBSCRIBE));
42+
commands.Remove(nameof(RedisCommand.PUBLISH));
43+
commands.Remove(nameof(RedisCommand.SPUBLISH));
44+
}
45+
// transactions don't work yet
46+
commands.Remove(nameof(RedisCommand.MULTI));
47+
commands.Remove(nameof(RedisCommand.EXEC));
48+
commands.Remove(nameof(RedisCommand.DISCARD));
49+
commands.Remove(nameof(RedisCommand.WATCH));
50+
commands.Remove(nameof(RedisCommand.UNWATCH));
51+
52+
var config = new ConfigurationOptions
53+
{
54+
CommandMap = CommandMap.Create(commands),
55+
ConfigurationChannel = "",
56+
TieBreaker = "",
57+
DefaultVersion = RedisVersion,
58+
ConnectTimeout = 10000,
59+
SyncTimeout = 5000,
60+
AsyncTimeout = 5000,
61+
AllowAdmin = true,
62+
Tunnel = Tunnel,
63+
};
64+
foreach (var endpoint in GetEndPoints())
65+
{
66+
config.EndPoints.Add(endpoint);
67+
}
68+
return config;
69+
}
70+
71+
public Tunnel Tunnel { get; }
72+
73+
public override void Log(string message)
74+
{
75+
_log?.WriteLine(message);
76+
base.Log(message);
77+
}
78+
79+
protected override void OnMoved(RedisClient client, int hashSlot, Node node)
80+
{
81+
_log?.WriteLine($"Client {client.Id} being redirected: {hashSlot} to {node}");
82+
base.OnMoved(client, hashSlot, node);
83+
}
84+
85+
public override TypedRedisValue OnUnknownCommand(in RedisClient client, in RedisRequest request, ReadOnlySpan<byte> command)
86+
{
87+
_log?.WriteLine($"[{client.Id}] unknown command: {Encoding.ASCII.GetString(command)}");
88+
return base.OnUnknownCommand(in client, in request, command);
89+
}
90+
2891
private sealed class InProcTunnel(
2992
InProcessTestServer server,
3093
PipeOptions? pipeOptions = null) : Tunnel
@@ -33,8 +96,12 @@ private sealed class InProcTunnel(
3396
EndPoint endpoint,
3497
CancellationToken cancellationToken)
3598
{
36-
// server._log?.WriteLine($"Disabling client creation, requested endpoint: {Format.ToString(endpoint)}");
37-
return default;
99+
if (server.TryGetNode(endpoint, out _))
100+
{
101+
// server._log?.WriteLine($"Disabling client creation, requested endpoint: {Format.ToString(endpoint)}");
102+
return default;
103+
}
104+
return base.GetSocketConnectEndpointAsync(endpoint, cancellationToken);
38105
}
39106

40107
public override ValueTask<Stream?> BeforeAuthenticateAsync(
@@ -43,13 +110,18 @@ private sealed class InProcTunnel(
43110
Socket? socket,
44111
CancellationToken cancellationToken)
45112
{
46-
server._log?.WriteLine($"Client intercepted, requested endpoint: {Format.ToString(endpoint)} for {connectionType} usage");
47-
var clientToServer = new Pipe(pipeOptions ?? PipeOptions.Default);
48-
var serverToClient = new Pipe(pipeOptions ?? PipeOptions.Default);
49-
var serverSide = new Duplex(clientToServer.Reader, serverToClient.Writer);
50-
_ = Task.Run(async () => await server.RunClientAsync(serverSide), cancellationToken);
51-
var clientSide = StreamConnection.GetDuplex(serverToClient.Reader, clientToServer.Writer);
52-
return new(clientSide);
113+
if (server.TryGetNode(endpoint, out var node))
114+
{
115+
server._log?.WriteLine(
116+
$"Client intercepted, endpoint {Format.ToString(endpoint)} ({connectionType}) mapped to {server.ServerType} node {node}");
117+
var clientToServer = new Pipe(pipeOptions ?? PipeOptions.Default);
118+
var serverToClient = new Pipe(pipeOptions ?? PipeOptions.Default);
119+
var serverSide = new Duplex(clientToServer.Reader, serverToClient.Writer);
120+
_ = Task.Run(async () => await server.RunClientAsync(serverSide, node: node), cancellationToken);
121+
var clientSide = StreamConnection.GetDuplex(serverToClient.Reader, clientToServer.Writer);
122+
return new(clientSide);
123+
}
124+
return base.BeforeAuthenticateAsync(endpoint, connectionType, socket, cancellationToken);
53125
}
54126

55127
private sealed class Duplex(PipeReader input, PipeWriter output) : IDuplexPipe
@@ -65,6 +137,7 @@ public ValueTask Dispose()
65137
}
66138
}
67139
}
140+
68141
/*
69142
70143
private readonly RespServer _server;

0 commit comments

Comments
 (0)