-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Expand file tree
/
Copy pathInProcessTestServer.cs
More file actions
166 lines (146 loc) · 5.7 KB
/
InProcessTestServer.cs
File metadata and controls
166 lines (146 loc) · 5.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
using System;
using System.IO;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;
using StackExchange.Redis.Configuration;
using StackExchange.Redis.Server;
using Xunit;
namespace StackExchange.Redis.Tests;
public class InProcessTestServer : MemoryCacheRedisServer
{
private readonly ITestOutputHelper? _log;
public InProcessTestServer(ITestOutputHelper? log = null)
{
RedisVersion = RedisFeatures.v6_0_0; // for client to expect RESP3
_log = log;
// ReSharper disable once VirtualMemberCallInConstructor
_log?.WriteLine($"Creating in-process server: {ToString()}");
Tunnel = new InProcTunnel(this);
}
public Task<ConnectionMultiplexer> ConnectAsync(TextWriter? log = null)
=> ConnectionMultiplexer.ConnectAsync(GetClientConfig(), log);
public ConfigurationOptions GetClientConfig()
{
var commands = GetCommands();
// transactions don't work yet (needs v3 buffer features)
commands.Remove(nameof(RedisCommand.MULTI));
commands.Remove(nameof(RedisCommand.EXEC));
commands.Remove(nameof(RedisCommand.DISCARD));
commands.Remove(nameof(RedisCommand.WATCH));
commands.Remove(nameof(RedisCommand.UNWATCH));
var config = new ConfigurationOptions
{
CommandMap = CommandMap.Create(commands),
ConfigurationChannel = "",
TieBreaker = "",
DefaultVersion = RedisVersion,
ConnectTimeout = 10000,
SyncTimeout = 5000,
AsyncTimeout = 5000,
AllowAdmin = true,
Tunnel = Tunnel,
};
foreach (var endpoint in GetEndPoints())
{
config.EndPoints.Add(endpoint);
}
return config;
}
public Tunnel Tunnel { get; }
public override void Log(string message)
{
_log?.WriteLine(message);
base.Log(message);
}
protected override void OnMoved(RedisClient client, int hashSlot, Node node)
{
_log?.WriteLine($"Client {client.Id} being redirected: {hashSlot} to {node}");
base.OnMoved(client, hashSlot, node);
}
protected override void OnOutOfBand(RedisClient client, TypedRedisValue message)
{
if (message.IsAggregate
&& message.Span is { IsEmpty: false } span
&& !span[0].IsAggregate)
{
_log?.WriteLine($"Client {client.Id}: {span[0].AsRedisValue()} {message} ");
}
else
{
_log?.WriteLine($"Client {client.Id}: {message}");
}
base.OnOutOfBand(client, message);
}
public override TypedRedisValue OnUnknownCommand(in RedisClient client, in RedisRequest request, ReadOnlySpan<byte> command)
{
_log?.WriteLine($"[{client.Id}] unknown command: {Encoding.ASCII.GetString(command)}");
return base.OnUnknownCommand(in client, in request, command);
}
private sealed class InProcTunnel(
InProcessTestServer server,
PipeOptions? pipeOptions = null) : Tunnel
{
public override ValueTask<EndPoint?> GetSocketConnectEndpointAsync(
EndPoint endpoint,
CancellationToken cancellationToken)
{
if (server.TryGetNode(endpoint, out _))
{
// server._log?.WriteLine($"Disabling client creation, requested endpoint: {Format.ToString(endpoint)}");
return default;
}
return base.GetSocketConnectEndpointAsync(endpoint, cancellationToken);
}
public override ValueTask<Stream?> BeforeAuthenticateAsync(
EndPoint endpoint,
ConnectionType connectionType,
Socket? socket,
CancellationToken cancellationToken)
{
if (server.TryGetNode(endpoint, out var node))
{
server._log?.WriteLine(
$"Client intercepted, endpoint {Format.ToString(endpoint)} ({connectionType}) mapped to {server.ServerType} node {node}");
var clientToServer = new Pipe(pipeOptions ?? PipeOptions.Default);
var serverToClient = new Pipe(pipeOptions ?? PipeOptions.Default);
var serverSide = new Duplex(clientToServer.Reader, serverToClient.Writer);
_ = Task.Run(async () => await server.RunClientAsync(serverSide, node: node), cancellationToken);
var clientSide = StreamConnection.GetDuplex(serverToClient.Reader, clientToServer.Writer);
return new(clientSide);
}
return base.BeforeAuthenticateAsync(endpoint, connectionType, socket, cancellationToken);
}
private sealed class Duplex(PipeReader input, PipeWriter output) : IDuplexPipe
{
public PipeReader Input => input;
public PipeWriter Output => output;
public ValueTask Dispose()
{
input.Complete();
output.Complete();
return default;
}
}
}
/*
private readonly RespServer _server;
public RespSocketServer(RespServer server)
{
_server = server ?? throw new ArgumentNullException(nameof(server));
server.Shutdown.ContinueWith((_, o) => ((SocketServer)o).Dispose(), this);
}
protected override void OnStarted(EndPoint endPoint)
=> _server.Log("Server is listening on " + endPoint);
protected override Task OnClientConnectedAsync(in ClientConnection client)
=> _server.RunClientAsync(client.Transport);
protected override void Dispose(bool disposing)
{
if (disposing) _server.Dispose();
}
*/
}