Skip to content

Commit bcf41c2

Browse files
authored
add test infrastructure for rich in-proc dummy servers (#3021)
* - add test infrastructure for rich in-proc dummy servers, for simulating connection state - migrate the -MOVED/self test to the new infrastructure, and use better server/cluster/etc impl * log client id
1 parent b8ebdf7 commit bcf41c2

File tree

6 files changed

+209
-135
lines changed

6 files changed

+209
-135
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
using System;
2+
using System.IO;
3+
using System.IO.Pipelines;
4+
using System.Net;
5+
using System.Net.Sockets;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Pipelines.Sockets.Unofficial;
9+
using StackExchange.Redis.Configuration;
10+
using StackExchange.Redis.Server;
11+
using Xunit;
12+
13+
namespace StackExchange.Redis.Tests;
14+
15+
public class InProcessTestServer : MemoryCacheRedisServer
16+
{
17+
public Tunnel Tunnel { get; }
18+
19+
private readonly ITestOutputHelper? _log;
20+
public InProcessTestServer(ITestOutputHelper? log = null)
21+
{
22+
_log = log;
23+
// ReSharper disable once VirtualMemberCallInConstructor
24+
_log?.WriteLine($"Creating in-process server: {ToString()}");
25+
Tunnel = new InProcTunnel(this);
26+
}
27+
28+
private sealed class InProcTunnel(
29+
InProcessTestServer server,
30+
PipeOptions? pipeOptions = null) : Tunnel
31+
{
32+
public override ValueTask<EndPoint?> GetSocketConnectEndpointAsync(
33+
EndPoint endpoint,
34+
CancellationToken cancellationToken)
35+
{
36+
// server._log?.WriteLine($"Disabling client creation, requested endpoint: {Format.ToString(endpoint)}");
37+
return default;
38+
}
39+
40+
public override ValueTask<Stream?> BeforeAuthenticateAsync(
41+
EndPoint endpoint,
42+
ConnectionType connectionType,
43+
Socket? socket,
44+
CancellationToken cancellationToken)
45+
{
46+
server._log?.WriteLine($"Client intercepted, requested endpoint: {Format.ToString(endpoint)} for {connectionType} usage");
47+
var clientToServer = new Pipe(pipeOptions ?? PipeOptions.Default);
48+
var serverToClient = new Pipe(pipeOptions ?? PipeOptions.Default);
49+
var serverSide = new Duplex(clientToServer.Reader, serverToClient.Writer);
50+
_ = Task.Run(async () => await server.RunClientAsync(serverSide), cancellationToken);
51+
var clientSide = StreamConnection.GetDuplex(serverToClient.Reader, clientToServer.Writer);
52+
return new(clientSide);
53+
}
54+
55+
private sealed class Duplex(PipeReader input, PipeWriter output) : IDuplexPipe
56+
{
57+
public PipeReader Input => input;
58+
public PipeWriter Output => output;
59+
60+
public ValueTask Dispose()
61+
{
62+
input.Complete();
63+
output.Complete();
64+
return default;
65+
}
66+
}
67+
}
68+
/*
69+
70+
private readonly RespServer _server;
71+
public RespSocketServer(RespServer server)
72+
{
73+
_server = server ?? throw new ArgumentNullException(nameof(server));
74+
server.Shutdown.ContinueWith((_, o) => ((SocketServer)o).Dispose(), this);
75+
}
76+
protected override void OnStarted(EndPoint endPoint)
77+
=> _server.Log("Server is listening on " + endPoint);
78+
79+
protected override Task OnClientConnectedAsync(in ClientConnection client)
80+
=> _server.RunClientAsync(client.Transport);
81+
82+
protected override void Dispose(bool disposing)
83+
{
84+
if (disposing) _server.Dispose();
85+
}
86+
*/
87+
}

tests/StackExchange.Redis.Tests/MovedTestServer.cs

Lines changed: 21 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
44
using System.Net;
5+
using System.Net.Sockets;
6+
using System.Text;
57
using System.Threading;
68
using System.Threading.Tasks;
79
using StackExchange.Redis.Server;
10+
using Xunit;
811

912
namespace StackExchange.Redis.Tests;
1013

@@ -14,7 +17,7 @@ namespace StackExchange.Redis.Tests;
1417
/// When a MOVED error points to the same endpoint, it signals the client to reconnect before retrying the command,
1518
/// allowing the DNS record/proxy/load balancer to route the connection to a different underlying server host.
1619
/// </summary>
17-
public class MovedTestServer : MemoryCacheRedisServer
20+
public class MovedTestServer : InProcessTestServer
1821
{
1922
/// <summary>
2023
/// Represents the simulated server host state behind a proxy/load balancer.
@@ -34,19 +37,26 @@ private enum SimulatedHost
3437

3538
private int _setCmdCount = 0;
3639
private int _movedResponseCount = 0;
37-
private int _connectionCount = 0;
40+
3841
private SimulatedHost _currentServerHost = SimulatedHost.OldServer;
39-
private readonly ConcurrentDictionary<RedisClient, SimulatedHost> _clientHostAssignments = new();
42+
4043
private readonly Func<string> _getEndpoint;
4144
private readonly string _triggerKey;
4245
private readonly int _hashSlot;
4346
private EndPoint? _actualEndpoint;
4447

45-
public MovedTestServer(Func<string> getEndpoint, string triggerKey = "testkey", int hashSlot = 12345)
48+
public MovedTestServer(Func<string> getEndpoint, string triggerKey = "testkey", int hashSlot = 12345, ITestOutputHelper? log = null) : base(log)
4649
{
4750
_getEndpoint = getEndpoint;
4851
_triggerKey = triggerKey;
4952
_hashSlot = hashSlot;
53+
ServerType = ServerType.Cluster;
54+
RedisVersion = RedisFeatures.v7_2_0_rc1;
55+
}
56+
57+
private sealed class MovedTestClient(SimulatedHost assignedHost) : RedisClient
58+
{
59+
public SimulatedHost AssignedHost => assignedHost;
5060
}
5161

5262
/// <summary>
@@ -55,32 +65,11 @@ public MovedTestServer(Func<string> getEndpoint, string triggerKey = "testkey",
5565
/// </summary>
5666
public override RedisClient CreateClient()
5767
{
58-
var client = base.CreateClient();
59-
var assignedHost = _currentServerHost;
60-
_clientHostAssignments[client] = assignedHost;
61-
Interlocked.Increment(ref _connectionCount);
62-
Log($"New client connection established (assigned to {assignedHost}, total connections: {_connectionCount}), endpoint: {_actualEndpoint}");
68+
var client = new MovedTestClient(_currentServerHost);
69+
Log($"New client connection established (assigned to {client.AssignedHost}, total connections: {TotalClientCount}), endpoint: {_actualEndpoint}");
6370
return client;
6471
}
6572

66-
/// <summary>
67-
/// Handles the INFO command, reporting cluster mode as enabled.
68-
/// </summary>
69-
protected override TypedRedisValue Info(RedisClient client, RedisRequest request)
70-
{
71-
// Override INFO to report cluster mode enabled
72-
var section = request.Count >= 2 ? request.GetString(1) : null;
73-
74-
// Return cluster-enabled info
75-
var infoResponse = section?.Equals("CLUSTER", StringComparison.OrdinalIgnoreCase) == true
76-
? "# Cluster\r\ncluster_enabled:1\r\n"
77-
: "# Server\r\nredis_version:7.0.0\r\nredis_mode:cluster\r\n# Cluster\r\ncluster_enabled:1\r\n";
78-
79-
Log($"Returning INFO response (cluster_enabled:1), endpoint: {_actualEndpoint}");
80-
81-
return TypedRedisValue.BulkString(infoResponse);
82-
}
83-
8473
/// <summary>
8574
/// Handles CLUSTER commands, supporting SLOTS and NODES subcommands for cluster mode simulation.
8675
/// </summary>
@@ -122,10 +111,11 @@ protected override TypedRedisValue Set(RedisClient client, RedisRequest request)
122111
Interlocked.Increment(ref _setCmdCount);
123112

124113
// Get the client's assigned server host
125-
if (!_clientHostAssignments.TryGetValue(client, out var clientHost))
114+
if (client is not MovedTestClient movedClient)
126115
{
127-
throw new InvalidOperationException("Client host assignment not found - this indicates a test infrastructure error");
116+
throw new InvalidOperationException($"Client is not a {nameof(MovedTestClient)}");
128117
}
118+
var clientHost = movedClient.AssignedHost;
129119

130120
// Check if this is the trigger key from an old server client
131121
if (key == _triggerKey && clientHost == SimulatedHost.OldServer)
@@ -213,11 +203,6 @@ private TypedRedisValue GetClusterNodesResponse()
213203
/// </summary>
214204
public int MovedResponseCount => _movedResponseCount;
215205

216-
/// <summary>
217-
/// Gets the number of client connections established.
218-
/// </summary>
219-
public int ConnectionCount => _connectionCount;
220-
221206
/// <summary>
222207
/// Gets the actual endpoint the server is listening on.
223208
/// </summary>
@@ -236,10 +221,10 @@ public void SetActualEndpoint(EndPoint endPoint)
236221
/// <summary>
237222
/// Resets all counters for test reusability.
238223
/// </summary>
239-
public void ResetCounters()
224+
public override void ResetCounters()
240225
{
241226
Interlocked.Exchange(ref _setCmdCount, 0);
242227
Interlocked.Exchange(ref _movedResponseCount, 0);
243-
Interlocked.Exchange(ref _connectionCount, 0);
228+
base.ResetCounters();
244229
}
245230
}
Lines changed: 49 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
1-
using System;
21
using System.Net;
3-
using System.Net.Sockets;
42
using System.Threading.Tasks;
5-
using StackExchange.Redis.Server;
63
using Xunit;
7-
using Xunit.Sdk;
84

95
namespace StackExchange.Redis.Tests;
106

@@ -15,18 +11,6 @@ namespace StackExchange.Redis.Tests;
1511
/// </summary>
1612
public class MovedToSameEndpointTests(ITestOutputHelper log)
1713
{
18-
/// <summary>
19-
/// Gets a free port by temporarily binding to port 0 and retrieving the OS-assigned port.
20-
/// </summary>
21-
private static int GetFreePort()
22-
{
23-
var listener = new TcpListener(IPAddress.Loopback, 0);
24-
listener.Start();
25-
var port = ((IPEndPoint)listener.LocalEndpoint).Port;
26-
listener.Stop();
27-
return port;
28-
}
29-
3014
/// <summary>
3115
/// Integration test: Verifies that when a MOVED error points to the same endpoint,
3216
/// the client reconnects and successfully retries the operation.
@@ -49,73 +33,67 @@ private static int GetFreePort()
4933
public async Task MovedToSameEndpoint_TriggersReconnectAndRetry_CommandSucceeds()
5034
{
5135
var keyName = "MovedToSameEndpoint_TriggersReconnectAndRetry_CommandSucceeds";
52-
// Arrange: Get a free port to avoid conflicts when tests run in parallel
53-
var port = GetFreePort();
54-
var listenEndpoint = new IPEndPoint(IPAddress.Loopback, port);
5536

56-
var testServer = new MovedTestServer(
37+
var listenEndpoint = new IPEndPoint(IPAddress.Loopback, 6382);
38+
using var testServer = new MovedTestServer(
5739
getEndpoint: () => Format.ToString(listenEndpoint),
58-
triggerKey: keyName);
40+
triggerKey: keyName,
41+
log: log);
5942

60-
var socketServer = new RespSocketServer(testServer);
43+
testServer.SetActualEndpoint(listenEndpoint);
6144

62-
try
63-
{
64-
// Start listening on the free port
65-
socketServer.Listen(listenEndpoint);
66-
testServer.SetActualEndpoint(listenEndpoint);
45+
// Wait a moment for the server to fully start
46+
await Task.Delay(100);
6747

68-
// Wait a moment for the server to fully start
69-
await Task.Delay(100);
48+
// Act: Connect to the test server
49+
var config = new ConfigurationOptions
50+
{
51+
EndPoints = { listenEndpoint },
52+
ConnectTimeout = 10000,
53+
SyncTimeout = 5000,
54+
AsyncTimeout = 5000,
55+
AllowAdmin = true,
56+
Tunnel = testServer.Tunnel,
57+
};
7058

71-
// Act: Connect to the test server
72-
var config = new ConfigurationOptions
73-
{
74-
EndPoints = { listenEndpoint },
75-
ConnectTimeout = 10000,
76-
SyncTimeout = 5000,
77-
AsyncTimeout = 5000,
78-
AllowAdmin = true,
79-
};
59+
await using var conn = await ConnectionMultiplexer.ConnectAsync(config);
60+
// Ping the server to ensure it's responsive
61+
var server = conn.GetServer(listenEndpoint);
62+
log?.WriteLine((await server.InfoRawAsync()) ?? "");
63+
var id = await server.ExecuteAsync("client", "id");
64+
log?.WriteLine($"client id: {id}");
8065

81-
await using var conn = await ConnectionMultiplexer.ConnectAsync(config);
82-
// Ping the server to ensure it's responsive
83-
var server = conn.GetServer(listenEndpoint);
84-
log?.WriteLine($"info: {await server.InfoRawAsync()}");
85-
await server.PingAsync();
86-
// Verify server is detected as cluster mode
87-
Assert.Equal(ServerType.Cluster, server.ServerType);
88-
var db = conn.GetDatabase();
66+
await server.PingAsync();
67+
// Verify server is detected as cluster mode
68+
Assert.Equal(ServerType.Cluster, server.ServerType);
69+
var db = conn.GetDatabase();
8970

90-
// Record baseline counters after initial connection
91-
var initialSetCmdCount = testServer.SetCmdCount;
92-
var initialMovedResponseCount = testServer.MovedResponseCount;
93-
var initialConnectionCount = testServer.ConnectionCount;
94-
// Execute SET command: This should receive MOVED → reconnect → retry → succeed
95-
var setResult = await db.StringSetAsync(keyName, "testvalue");
71+
// Record baseline counters after initial connection
72+
var initialSetCmdCount = testServer.SetCmdCount;
73+
var initialMovedResponseCount = testServer.MovedResponseCount;
74+
var initialConnectionCount = testServer.TotalClientCount;
75+
// Execute SET command: This should receive MOVED → reconnect → retry → succeed
76+
var setResult = await db.StringSetAsync(keyName, "testvalue");
9677

97-
// Assert: Verify SET command succeeded
98-
Assert.True(setResult, "SET command should return true (OK)");
78+
// Assert: Verify SET command succeeded
79+
Assert.True(setResult, "SET command should return true (OK)");
9980

100-
// Verify the value was actually stored (proving retry succeeded)
101-
var retrievedValue = await db.StringGetAsync(keyName);
102-
Assert.Equal("testvalue", (string?)retrievedValue);
81+
// Verify the value was actually stored (proving retry succeeded)
82+
var retrievedValue = await db.StringGetAsync(keyName);
83+
Assert.Equal("testvalue", (string?)retrievedValue);
10384

104-
// Verify SET command was executed twice: once with MOVED response, once successfully
105-
var expectedSetCmdCount = initialSetCmdCount + 2;
106-
Assert.Equal(expectedSetCmdCount, testServer.SetCmdCount);
85+
// Verify SET command was executed twice: once with MOVED response, once successfully
86+
var expectedSetCmdCount = initialSetCmdCount + 2;
87+
Assert.Equal(expectedSetCmdCount, testServer.SetCmdCount);
10788

108-
// Verify MOVED response was returned exactly once
109-
var expectedMovedResponseCount = initialMovedResponseCount + 1;
110-
Assert.Equal(expectedMovedResponseCount, testServer.MovedResponseCount);
89+
// Verify MOVED response was returned exactly once
90+
var expectedMovedResponseCount = initialMovedResponseCount + 1;
91+
Assert.Equal(expectedMovedResponseCount, testServer.MovedResponseCount);
11192

112-
// Verify reconnection occurred: connection count should have increased by 1
113-
var expectedConnectionCount = initialConnectionCount + 1;
114-
Assert.Equal(expectedConnectionCount, testServer.ConnectionCount);
115-
}
116-
finally
117-
{
118-
socketServer?.Dispose();
119-
}
93+
// Verify reconnection occurred: connection count should have increased by 1
94+
var expectedConnectionCount = initialConnectionCount + 1;
95+
Assert.Equal(expectedConnectionCount, testServer.TotalClientCount);
96+
id = await server.ExecuteAsync("client", "id");
97+
log?.WriteLine($"client id: {id}");
12098
}
12199
}

toys/StackExchange.Redis.Server/RedisClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
namespace StackExchange.Redis.Server
66
{
7-
public sealed class RedisClient : IDisposable
7+
public class RedisClient : IDisposable
88
{
99
internal int SkipReplies { get; set; }
1010
internal bool ShouldSkipResponse()

0 commit comments

Comments
 (0)