Skip to content

Commit 1873c37

Browse files
committed
- simplify connecting to the test server
- include all commands and endpoints when connecting to the test server - better RESP3 aggregate support
1 parent 35be6c3 commit 1873c37

4 files changed

Lines changed: 101 additions & 52 deletions

File tree

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -931,11 +931,20 @@ internal static void WriteMultiBulkHeader(PipeWriter output, long count, ResultT
931931
var span = output.GetSpan(3 + Format.MaxInt32TextLen);
932932
span[0] = type switch
933933
{
934+
ResultType.Push => (byte)'>',
935+
ResultType.Attribute => (byte)'|',
934936
ResultType.Map => (byte)'%',
935937
ResultType.Set => (byte)'~',
936938
_ => (byte)'*',
937939
};
938-
if (type is ResultType.Map & count > 1) count >>= 1;
940+
if (type is ResultType.Map & count > 0)
941+
{
942+
if ((count & 1) != 0) Throw(count);
943+
count >>= 1;
944+
static void Throw(long count) => throw new ArgumentOutOfRangeException(
945+
paramName: nameof(count),
946+
message: $"Map data must be in pairs; got {count}");
947+
}
939948
int offset = WriteRaw(span, count, offset: 1);
940949
output.Advance(offset);
941950
}

tests/StackExchange.Redis.Tests/InProcessTestServer.cs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ namespace StackExchange.Redis.Tests;
1515

1616
public class InProcessTestServer : MemoryCacheRedisServer
1717
{
18-
public Tunnel Tunnel { get; }
19-
2018
private readonly ITestOutputHelper? _log;
2119
public InProcessTestServer(ITestOutputHelper? log = null)
2220
{
@@ -26,6 +24,44 @@ public InProcessTestServer(ITestOutputHelper? log = null)
2624
Tunnel = new InProcTunnel(this);
2725
}
2826

27+
public Task<ConnectionMultiplexer> ConnectAsync(bool withPubSub = false, TextWriter? log = null)
28+
=> ConnectionMultiplexer.ConnectAsync(GetClientConfig(withPubSub), log);
29+
30+
public ConfigurationOptions GetClientConfig(bool withPubSub = false)
31+
{
32+
var commands = GetCommands();
33+
if (!withPubSub)
34+
{
35+
commands.Remove(nameof(RedisCommand.SUBSCRIBE));
36+
commands.Remove(nameof(RedisCommand.PSUBSCRIBE));
37+
commands.Remove(nameof(RedisCommand.SSUBSCRIBE));
38+
commands.Remove(nameof(RedisCommand.UNSUBSCRIBE));
39+
commands.Remove(nameof(RedisCommand.PUNSUBSCRIBE));
40+
commands.Remove(nameof(RedisCommand.SUNSUBSCRIBE));
41+
commands.Remove(nameof(RedisCommand.PUBLISH));
42+
commands.Remove(nameof(RedisCommand.SPUBLISH));
43+
}
44+
var config = new ConfigurationOptions
45+
{
46+
CommandMap = CommandMap.Create(commands),
47+
ConfigurationChannel = "",
48+
TieBreaker = "",
49+
DefaultVersion = RedisVersion,
50+
ConnectTimeout = 10000,
51+
SyncTimeout = 5000,
52+
AsyncTimeout = 5000,
53+
AllowAdmin = true,
54+
Tunnel = Tunnel,
55+
};
56+
foreach (var endpoint in GetEndPoints())
57+
{
58+
config.EndPoints.Add(endpoint);
59+
}
60+
return config;
61+
}
62+
63+
public Tunnel Tunnel { get; }
64+
2965
public override void Log(string message)
3066
{
3167
_log?.WriteLine(message);

tests/StackExchange.Redis.Tests/MovedUnitTests.cs

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,6 @@ namespace StackExchange.Redis.Tests;
1313
/// </summary>
1414
public class MovedUnitTests(ITestOutputHelper log)
1515
{
16-
private static ConfigurationOptions GetMinimalConfig(InProcessTestServer server)
17-
{
18-
return new ConfigurationOptions
19-
{
20-
CommandMap = CommandMap.Create(new HashSet<string> { "SUBSCRIBE" }, false),
21-
ConfigurationChannel = "",
22-
TieBreaker = "",
23-
EndPoints = { server.DefaultEndPoint },
24-
ConnectTimeout = 10000,
25-
SyncTimeout = 5000,
26-
AsyncTimeout = 5000,
27-
AllowAdmin = true,
28-
Tunnel = server.Tunnel,
29-
};
30-
}
31-
3216
private RedisKey Me([CallerMemberName] string callerName = "") => callerName;
3317

3418
[Theory]
@@ -41,7 +25,8 @@ public async Task CrossSlotDisallowed(ServerType serverType)
4125
string keyA = "abc", keyB = "def"; // known to be on different slots
4226

4327
using var server = new InProcessTestServer(log) { ServerType = serverType };
44-
using var muxer = await ConnectionMultiplexer.ConnectAsync(GetMinimalConfig(server));
28+
await using var muxer = await server.ConnectAsync();
29+
4530
var db = muxer.GetDatabase();
4631
await db.StringSetAsync(keyA, "value", flags: CommandFlags.FireAndForget);
4732

@@ -71,7 +56,7 @@ public async Task KeyMigrationFollowed(bool allowFollowRedirects)
7156
using var server = new InProcessTestServer(log) { ServerType = ServerType.Cluster };
7257
var secondNode = server.AddEmptyNode();
7358

74-
using var muxer = await ConnectionMultiplexer.ConnectAsync(GetMinimalConfig(server));
59+
await using var muxer = await server.ConnectAsync();
7560
var db = muxer.GetDatabase();
7661

7762
await db.StringSetAsync(key, "value");
@@ -122,9 +107,7 @@ public async Task MovedToSameEndpoint_TriggersReconnectAndRetry_CommandSucceeds(
122107
log: log) { ServerType = serverType, };
123108

124109
// Act: Connect to the test server
125-
var config = GetMinimalConfig(testServer);
126-
127-
await using var conn = await ConnectionMultiplexer.ConnectAsync(config);
110+
await using var conn = await testServer.ConnectAsync();
128111
// Ping the server to ensure it's responsive
129112
var server = conn.GetServer(testServer.DefaultEndPoint);
130113

toys/StackExchange.Redis.Server/RespServer.cs

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,16 @@ protected RespServer(TextWriter output = null)
3131
_commands = BuildCommands(this);
3232
}
3333

34+
public HashSet<string> GetCommands()
35+
{
36+
var set = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
37+
foreach (var kvp in _commands)
38+
{
39+
set.Add(kvp.Key.ToString());
40+
}
41+
return set;
42+
}
43+
3444
private static Dictionary<CommandBytes, RespCommand> BuildCommands(RespServer server)
3545
{
3646
static RedisCommandAttribute CheckSignatureAndGetAttribute(MethodInfo method)
@@ -313,30 +323,37 @@ public virtual void Log(string message)
313323

314324
public static async ValueTask WriteResponseAsync(RedisClient client, PipeWriter output, TypedRedisValue value, RedisProtocol protocol)
315325
{
316-
static void WritePrefix(PipeWriter ooutput, char pprefix)
326+
static void WritePrefix(PipeWriter output, char prefix)
317327
{
318-
var span = ooutput.GetSpan(1);
319-
span[0] = (byte)pprefix;
320-
ooutput.Advance(1);
328+
var span = output.GetSpan(1);
329+
span[0] = (byte)prefix;
330+
output.Advance(1);
321331
}
322332

323333
if (value.IsNil) return; // not actually a request (i.e. empty/whitespace request)
324334
if (client != null && client.ShouldSkipResponse()) return; // intentionally skipping the result
325-
char prefix;
335+
326336
var type = value.Type;
327-
if (protocol is RedisProtocol.Resp2 & type is not ResultType.Null) type = type.ToResp2();
337+
if (protocol is RedisProtocol.Resp2 & type is not ResultType.Null)
338+
{
339+
if (type is ResultType.VerbatimString)
340+
{
341+
var s = (string)value.AsRedisValue();
342+
if (s is { Length: >= 4 } && s[3] == ':')
343+
value = TypedRedisValue.BulkString(s.Substring(4));
344+
}
345+
type = type.ToResp2();
346+
}
328347
RetryResp2:
329348
if (protocol is RedisProtocol.Resp3 && value.IsNullValueOrArray)
330349
{
331350
output.Write("_\r\n"u8);
332351
}
333352
else
334353
{
354+
char prefix;
335355
switch (type)
336356
{
337-
case ResultType.Null:
338-
output.Write("_\r\n"u8);
339-
break;
340357
case ResultType.Integer:
341358
PhysicalConnection.WriteInteger(output, (long)value.AsRedisValue());
342359
break;
@@ -355,30 +372,34 @@ static void WritePrefix(PipeWriter ooutput, char pprefix)
355372
case ResultType.BulkString:
356373
PhysicalConnection.WriteBulkString(value.AsRedisValue(), output);
357374
break;
375+
case ResultType.Null:
376+
case ResultType.Push when value.IsNullArray:
377+
case ResultType.Map when value.IsNullArray:
378+
case ResultType.Set when value.IsNullArray:
379+
case ResultType.Attribute when value.IsNullArray:
380+
output.Write("_\r\n"u8);
381+
break;
382+
case ResultType.Array when value.IsNullArray:
383+
PhysicalConnection.WriteMultiBulkHeader(output, -1, type);
384+
break;
358385
case ResultType.Push:
359386
case ResultType.Map:
360387
case ResultType.Array:
361-
if (value.IsNullArray)
362-
{
363-
PhysicalConnection.WriteMultiBulkHeader(output, -1, type);
364-
}
365-
else
388+
case ResultType.Set:
389+
case ResultType.Attribute:
390+
var segment = value.Segment;
391+
PhysicalConnection.WriteMultiBulkHeader(output, segment.Count, type);
392+
var arr = segment.Array;
393+
int offset = segment.Offset;
394+
for (int i = 0; i < segment.Count; i++)
366395
{
367-
var segment = value.Segment;
368-
PhysicalConnection.WriteMultiBulkHeader(output, segment.Count, type);
369-
var arr = segment.Array;
370-
int offset = segment.Offset;
371-
for (int i = 0; i < segment.Count; i++)
372-
{
373-
var item = arr[offset++];
374-
if (item.IsNil)
375-
throw new InvalidOperationException("Array element cannot be nil, index " + i);
376-
377-
// note: don't pass client down; this would impact SkipReplies
378-
await WriteResponseAsync(null, output, item, protocol);
379-
}
380-
}
396+
var item = arr[offset++];
397+
if (item.IsNil)
398+
throw new InvalidOperationException("Array element cannot be nil, index " + i);
381399

400+
// note: don't pass client down; this would impact SkipReplies
401+
await WriteResponseAsync(null, output, item, protocol);
402+
}
382403
break;
383404
default:
384405
// retry with RESP2

0 commit comments

Comments
 (0)