Skip to content

Commit 014eaed

Browse files
committed
fix server merge
1 parent 6aa03d6 commit 014eaed

10 files changed

Lines changed: 121 additions & 85 deletions

File tree

src/StackExchange.Redis/RespReaderExtensions.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,10 @@ public string DebugReadTruncatedString(int maxChars)
3636
}
3737
}
3838

39-
public RedisKey ReadRedisKey()
40-
{
41-
reader.DemandScalar();
42-
return (RedisKey)reader.ReadByteArray();
43-
}
39+
public RedisKey ReadRedisKey() => (RedisKey)reader.ReadByteArray();
40+
41+
public RedisChannel ReadRedisChannel(RedisChannel.RedisChannelOptions options)
42+
=> new(reader.ReadByteArray(), options);
4443

4544
public string GetOverview()
4645
{

tests/StackExchange.Redis.Tests/InProcessTestServer.cs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,18 @@ public Task<ConnectionMultiplexer> ConnectAsync(bool withPubSub = false, WriteMo
3232
public override TypedRedisValue Execute(RedisClient client, in RedisRequest request)
3333
{
3434
var result = base.Execute(client, in request);
35-
Log($"[{client.Id}] {request.Command} => {(char)result.Type} ({result.Type})");
35+
if (result.IsNil)
36+
{
37+
Log($"[{client}] {request.Command} (no reply)");
38+
}
39+
else if (result.IsAggregate)
40+
{
41+
Log($"[{client}] {request.Command} => {(char)result.Type} ({result.Type}, {result.Span.Length})");
42+
}
43+
else
44+
{
45+
Log($"[{client}] {request.Command} => {(char)result.Type} ({result.Type})");
46+
}
3647
return result;
3748
}
3849

@@ -64,6 +75,7 @@ public ConfigurationOptions GetClientConfig(bool withPubSub = false, WriteMode w
6475
Tunnel = Tunnel,
6576
WriteMode = (BufferedStreamWriter.WriteMode)writeMode,
6677
};
78+
if (!string.IsNullOrEmpty(Password)) config.Password = Password;
6779

6880
/* useful for viewing *outbound* data in the log
6981
#if DEBUG
@@ -107,19 +119,19 @@ protected override void OnOutOfBand(RedisClient client, TypedRedisValue message)
107119
&& message.Span is { IsEmpty: false } span
108120
&& !span[0].IsAggregate)
109121
{
110-
_log?.WriteLine($"Client {client.Id}: {span[0].AsRedisValue()} {message} ");
122+
_log?.WriteLine($"[{client}] => {(char)message.Type} ({message.Type}, {message.Span.Length}): {span[0].AsRedisValue()}");
111123
}
112124
else
113125
{
114-
_log?.WriteLine($"Client {client.Id}: {message}");
126+
_log?.WriteLine($"[{client}] => {(char)message.Type} ({message.Type})");
115127
}
116128

117129
base.OnOutOfBand(client, message);
118130
}
119131

120132
public override TypedRedisValue OnUnknownCommand(in RedisClient client, in RedisRequest request, ReadOnlySpan<byte> command)
121133
{
122-
_log?.WriteLine($"[{client.Id}] unknown command: {Encoding.ASCII.GetString(command)}");
134+
_log?.WriteLine($"[{client}] unknown command: {Encoding.ASCII.GetString(command)}");
123135
return base.OnUnknownCommand(in client, in request, command);
124136
}
125137

tests/StackExchange.Redis.Tests/PubSubTests.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@ public class PubSubTests(ITestOutputHelper output, SharedConnectionFixture fixtu
1616
{
1717
}
1818

19-
/*
2019
[RunPerProtocol]
2120
public class InProcPubSubTests(ITestOutputHelper output, InProcServerFixture fixture)
2221
: PubSubTestBase(output, null, fixture)
2322
{
24-
protected override bool UseDedicatedInProcessServer => false;
23+
protected override bool UseDedicatedInProcessServer => true;
24+
protected override bool UseInProcessServerPubSub => true;
2525
}
26-
*/
2726

2827
[RunPerProtocol]
2928
public abstract class PubSubTestBase(

tests/StackExchange.Redis.Tests/TestBase.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,7 @@ protected static async Task UntilConditionAsync(TimeSpan maxWaitTime, Func<bool>
586586

587587
// simplified usage to get an interchangeable dedicated vs shared in-process server, useful for debugging
588588
protected virtual bool UseDedicatedInProcessServer => false; // use the shared server by default
589+
protected virtual bool UseInProcessServerPubSub => false;
589590
internal ClientFactory ConnectFactory(bool allowAdmin = false, string? channelPrefix = null, bool shared = true)
590591
{
591592
if (UseDedicatedInProcessServer)
@@ -622,8 +623,9 @@ public IInternalConnectionMultiplexer CreateClient()
622623
{
623624
if (_server is not null)
624625
{
625-
var config = _server.GetClientConfig();
626+
var config = _server.GetClientConfig(withPubSub: _testBase.UseInProcessServerPubSub);
626627
config.AllowAdmin = _allowAdmin;
628+
config.Protocol = TestContext.Current.GetProtocol();
627629
if (_channelPrefix is not null)
628630
{
629631
config.ChannelPrefix = RedisChannel.Literal(_channelPrefix);

toys/StackExchange.Redis.Server/RedisClient.Output.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System;
2-
using System.Buffers;
32
using System.IO.Pipelines;
43
using System.Threading;
54
using System.Threading.Channels;
@@ -85,13 +84,18 @@ public async Task WriteOutputAsync(PipeWriter writer, CancellationToken cancella
8584
var reader = _replies.Reader;
8685
do
8786
{
87+
int count = 0;
8888
while (reader.TryRead(out var message))
8989
{
90-
await RespServer.WriteResponseAsync(this, writer, message, Protocol);
90+
RespServer.WriteResponse(this, writer, message, Protocol);
9191
message.Recycle();
92+
count++;
9293
}
9394

94-
await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
95+
if (count != 0)
96+
{
97+
await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
98+
}
9599
}
96100
// await more data
97101
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));

toys/StackExchange.Redis.Server/RedisClient.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,57 @@
99
namespace StackExchange.Redis.Server
1010
{
1111
public partial class RedisClient(RedisServer.Node node) : IDisposable
12+
#pragma warning disable SA1001
13+
#if NET6_0_OR_GREATER
14+
, ISpanFormattable
15+
#else
16+
, IFormattable
17+
#endif
18+
#pragma warning restore SA1001
1219
{
1320
private RespScanState _readState;
1421

22+
public override string ToString()
23+
{
24+
if (Protocol is RedisProtocol.Resp2)
25+
{
26+
return IsSubscriber ? $"{Id}:sub" : Id.ToString();
27+
}
28+
return $"{Id}:r3";
29+
}
30+
31+
string IFormattable.ToString(string format, IFormatProvider formatProvider) => ToString();
32+
#if NET6_0_OR_GREATER
33+
public bool TryFormat(Span<char> destination, out int charsWritten, ReadOnlySpan<char> format, IFormatProvider provider)
34+
{
35+
if (!Id.TryFormat(destination, out charsWritten))
36+
{
37+
return false;
38+
}
39+
destination = destination.Slice(charsWritten);
40+
if (Protocol is RedisProtocol.Resp2)
41+
{
42+
if (IsSubscriber)
43+
{
44+
if (!":sub".AsSpan().TryCopyTo(destination))
45+
{
46+
return false;
47+
}
48+
charsWritten += 4;
49+
}
50+
}
51+
else
52+
{
53+
if (!":r3".AsSpan().TryCopyTo(destination))
54+
{
55+
return false;
56+
}
57+
charsWritten += 3;
58+
}
59+
return true;
60+
}
61+
#endif
62+
1563
public bool TryReadRequest(ReadOnlySequence<byte> data, out long consumed)
1664
{
1765
// skip past data we've already read

toys/StackExchange.Redis.Server/RedisRequest.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,16 @@ internal RedisRequest(scoped in RespReader reader, ref byte[] commandLease)
7676
if (Count == 0)
7777
{
7878
Command = s_EmptyCommand;
79+
KnownCommand = RedisCommand.UNKNOWN;
7980
}
8081
else
8182
{
8283
local.MoveNextScalar();
84+
unsafe
85+
{
86+
KnownCommand = local.TryParseScalar(&RedisCommandMetadata.TryParseCI, out RedisCommand cmd)
87+
? cmd : RedisCommand.UNKNOWN;
88+
}
8389
var len = local.ScalarLength();
8490
if (len > commandLease.Length)
8591
{
@@ -94,6 +100,8 @@ internal RedisRequest(scoped in RespReader reader, ref byte[] commandLease)
94100
}
95101
}
96102

103+
internal RedisCommand KnownCommand { get; }
104+
97105
internal static byte[] GetLease() => ArrayPool<byte>.Shared.Rent(16);
98106
internal static void ReleaseLease(ref byte[] commandLease)
99107
{
@@ -123,7 +131,7 @@ public RedisKey GetKey(int index, KeyFlags flags = KeyFlags.None)
123131
}
124132

125133
internal RedisChannel GetChannel(int index, RedisChannel.RedisChannelOptions options)
126-
=> throw new NotImplementedException();
134+
=> GetReader(index).ReadRedisChannel(options);
127135

128136
internal RedisRequest(ReadOnlySpan<byte> payload, ref byte[] commandLease) : this(new RespReader(payload), ref commandLease) { }
129137
internal RedisRequest(in ReadOnlySequence<byte> payload, ref byte[] commandLease) : this(new RespReader(payload), ref commandLease) { }

toys/StackExchange.Redis.Server/RedisServer.PubSub.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Collections.Generic;
44
using System.Text.RegularExpressions;
55
using System.Threading;
6+
using RESPite.Messages;
67

78
namespace StackExchange.Redis.Server;
89

@@ -182,7 +183,7 @@ public int Publish(in RedisChannel channel, in RedisValue value)
182183
// we can do simple and sharded equality lookups directly
183184
if ((simpleCount + shardedCount) != 0 && subs.TryGetValue(channel, out _))
184185
{
185-
var msg = TypedRedisValue.Rent(3, out var span, ResultType.Push);
186+
var msg = TypedRedisValue.Rent(3, out var span, PushKind);
186187
span[0] = TypedRedisValue.BulkString(channel.IsSharded ? "smessage" : "message");
187188
span[1] = TypedRedisValue.BulkString(channel);
188189
span[2] = TypedRedisValue.BulkString(value);
@@ -198,7 +199,7 @@ public int Publish(in RedisChannel channel, in RedisValue value)
198199
{
199200
if (pair.Key.IsPattern && pair.Value is { } glob && glob.IsMatch(channelName))
200201
{
201-
var msg = TypedRedisValue.Rent(4, out var span, ResultType.Push);
202+
var msg = TypedRedisValue.Rent(4, out var span, PushKind);
202203
span[0] = TypedRedisValue.BulkString("pmessage");
203204
span[1] = TypedRedisValue.BulkString(pair.Key);
204205
span[2] = TypedRedisValue.BulkString(channel);
@@ -213,11 +214,15 @@ public int Publish(in RedisChannel channel, in RedisValue value)
213214
return count;
214215
}
215216

216-
private void SendMessage(string kind, RedisChannel channel, int count)
217+
public bool IsResp2 => Protocol is RedisProtocol.Resp2;
218+
219+
public RespPrefix PushKind => IsResp2 ? RespPrefix.Array : RespPrefix.Push;
220+
221+
private void SendSubUnsubMessage(string kind, RedisChannel channel, int count)
217222
{
218223
if (Node is { } node)
219224
{
220-
var reply = TypedRedisValue.Rent(3, out var span, ResultType.Push);
225+
var reply = TypedRedisValue.Rent(3, out var span, PushKind);
221226
span[0] = TypedRedisValue.BulkString(kind);
222227
span[1] = TypedRedisValue.BulkString((byte[])channel);
223228
span[2] = TypedRedisValue.Integer(count);
@@ -239,7 +244,7 @@ internal void Subscribe(RedisChannel channel)
239244
: channel.IsPattern ? ++patternCount
240245
: ++simpleCount;
241246
}
242-
SendMessage(
247+
SendSubUnsubMessage(
243248
channel.IsSharded ? "ssubscribe"
244249
: channel.IsPattern ? "psubscribe"
245250
: "subscribe",
@@ -273,7 +278,7 @@ internal void Unsubscribe(RedisChannel channel)
273278
: channel.IsPattern ? --patternCount
274279
: --simpleCount;
275280
}
276-
SendMessage(
281+
SendSubUnsubMessage(
277282
channel.IsSharded ? "sunsubscribe"
278283
: channel.IsPattern ? "punsubscribe"
279284
: "unsubscribe",
@@ -332,7 +337,7 @@ internal void UnsubscribeAll(RedisCommand cmd)
332337
}
333338
foreach (var key in remove.AsSpan(0, count))
334339
{
335-
SendMessage(msg, key, 0);
340+
SendSubUnsubMessage(msg, key, 0);
336341
}
337342
ArrayPool<RedisChannel>.Shared.Return(remove);
338343
}

0 commit comments

Comments
 (0)