Skip to content

Commit bc086f3

Browse files
authored
Implement pub/sub in the toy server (#3027)
* likely looking pub/sub * pub/sub test fixes * cleanup on outbound write * fix publish return val * ping/subs * fix pub/sub command check * disable InProcPubSubTests for now, server is brittle
1 parent a098426 commit bc086f3

File tree

9 files changed

+673
-121
lines changed

9 files changed

+673
-121
lines changed

tests/StackExchange.Redis.Tests/InProcessTestServer.cs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,14 @@ public InProcessTestServer(ITestOutputHelper? log = null)
2525
Tunnel = new InProcTunnel(this);
2626
}
2727

28-
public Task<ConnectionMultiplexer> ConnectAsync(bool withPubSub = false, TextWriter? log = null)
29-
=> ConnectionMultiplexer.ConnectAsync(GetClientConfig(withPubSub), log);
28+
public Task<ConnectionMultiplexer> ConnectAsync(TextWriter? log = null)
29+
=> ConnectionMultiplexer.ConnectAsync(GetClientConfig(), log);
3030

31-
public ConfigurationOptions GetClientConfig(bool withPubSub = false)
31+
public ConfigurationOptions GetClientConfig()
3232
{
3333
var commands = GetCommands();
34-
if (!withPubSub)
35-
{
36-
commands.Remove(nameof(RedisCommand.SUBSCRIBE));
37-
commands.Remove(nameof(RedisCommand.PSUBSCRIBE));
38-
commands.Remove(nameof(RedisCommand.SSUBSCRIBE));
39-
commands.Remove(nameof(RedisCommand.UNSUBSCRIBE));
40-
commands.Remove(nameof(RedisCommand.PUNSUBSCRIBE));
41-
commands.Remove(nameof(RedisCommand.SUNSUBSCRIBE));
42-
commands.Remove(nameof(RedisCommand.PUBLISH));
43-
commands.Remove(nameof(RedisCommand.SPUBLISH));
44-
}
45-
// transactions don't work yet
34+
35+
// transactions don't work yet (needs v3 buffer features)
4636
commands.Remove(nameof(RedisCommand.MULTI));
4737
commands.Remove(nameof(RedisCommand.EXEC));
4838
commands.Remove(nameof(RedisCommand.DISCARD));
@@ -82,6 +72,22 @@ protected override void OnMoved(RedisClient client, int hashSlot, Node node)
8272
base.OnMoved(client, hashSlot, node);
8373
}
8474

75+
protected override void OnOutOfBand(RedisClient client, TypedRedisValue message)
76+
{
77+
if (message.IsAggregate
78+
&& message.Span is { IsEmpty: false } span
79+
&& !span[0].IsAggregate)
80+
{
81+
_log?.WriteLine($"Client {client.Id}: {span[0].AsRedisValue()} {message} ");
82+
}
83+
else
84+
{
85+
_log?.WriteLine($"Client {client.Id}: {message}");
86+
}
87+
88+
base.OnOutOfBand(client, message);
89+
}
90+
8591
public override TypedRedisValue OnUnknownCommand(in RedisClient client, in RedisRequest request, ReadOnlySpan<byte> command)
8692
{
8793
_log?.WriteLine($"[{client.Id}] unknown command: {Encoding.ASCII.GetString(command)}");

tests/StackExchange.Redis.Tests/PubSubTests.cs

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,31 @@
1111
namespace StackExchange.Redis.Tests;
1212

1313
[RunPerProtocol]
14-
public class PubSubTests(ITestOutputHelper output, SharedConnectionFixture fixture) : TestBase(output, fixture)
14+
public class PubSubTests(ITestOutputHelper output, SharedConnectionFixture fixture)
15+
: PubSubTestBase(output, fixture, null)
16+
{
17+
}
18+
19+
/*
20+
[RunPerProtocol]
21+
public class InProcPubSubTests(ITestOutputHelper output, InProcServerFixture fixture)
22+
: PubSubTestBase(output, null, fixture)
23+
{
24+
protected override bool UseDedicatedInProcessServer => false;
25+
}
26+
*/
27+
28+
[RunPerProtocol]
29+
public abstract class PubSubTestBase(
30+
ITestOutputHelper output,
31+
SharedConnectionFixture? connection,
32+
InProcServerFixture? server)
33+
: TestBase(output, connection, server)
1534
{
1635
[Fact]
1736
public async Task ExplicitPublishMode()
1837
{
19-
await using var conn = Create(channelPrefix: "foo:", log: Writer);
38+
await using var conn = ConnectFactory(channelPrefix: "foo:");
2039

2140
var pub = conn.GetSubscriber();
2241
int a = 0, b = 0, c = 0, d = 0;
@@ -54,9 +73,9 @@ await UntilConditionAsync(
5473
[InlineData("Foo:", true, "f")]
5574
public async Task TestBasicPubSub(string? channelPrefix, bool wildCard, string breaker)
5675
{
57-
await using var conn = Create(channelPrefix: channelPrefix, shared: false, log: Writer);
76+
await using var conn = ConnectFactory(channelPrefix: channelPrefix, shared: false);
5877

59-
var pub = GetAnyPrimary(conn);
78+
var pub = GetAnyPrimary(conn.DefaultClient);
6079
var sub = conn.GetSubscriber();
6180
await PingAsync(pub, sub).ForAwait();
6281
HashSet<string?> received = [];
@@ -139,10 +158,10 @@ public async Task TestBasicPubSub(string? channelPrefix, bool wildCard, string b
139158
[Fact]
140159
public async Task TestBasicPubSubFireAndForget()
141160
{
142-
await using var conn = Create(shared: false, log: Writer);
161+
await using var conn = ConnectFactory(shared: false);
143162

144-
var profiler = conn.AddProfiler();
145-
var pub = GetAnyPrimary(conn);
163+
var profiler = conn.DefaultClient.AddProfiler();
164+
var pub = GetAnyPrimary(conn.DefaultClient);
146165
var sub = conn.GetSubscriber();
147166

148167
RedisChannel key = RedisChannel.Literal(Me() + Guid.NewGuid());
@@ -214,9 +233,9 @@ private async Task PingAsync(IServer pub, ISubscriber sub, int times = 1)
214233
[Fact]
215234
public async Task TestPatternPubSub()
216235
{
217-
await using var conn = Create(shared: false, log: Writer);
236+
await using var conn = ConnectFactory(shared: false);
218237

219-
var pub = GetAnyPrimary(conn);
238+
var pub = GetAnyPrimary(conn.DefaultClient);
220239
var sub = conn.GetSubscriber();
221240

222241
HashSet<string?> received = [];
@@ -273,7 +292,7 @@ public async Task TestPatternPubSub()
273292
[Fact]
274293
public async Task TestPublishWithNoSubscribers()
275294
{
276-
await using var conn = Create();
295+
await using var conn = ConnectFactory();
277296

278297
var sub = conn.GetSubscriber();
279298
#pragma warning disable CS0618
@@ -285,7 +304,7 @@ public async Task TestPublishWithNoSubscribers()
285304
public async Task TestMassivePublishWithWithoutFlush_Local()
286305
{
287306
Skip.UnlessLongRunning();
288-
await using var conn = Create();
307+
await using var conn = ConnectFactory();
289308

290309
var sub = conn.GetSubscriber();
291310
TestMassivePublish(sub, Me(), "local");
@@ -335,7 +354,7 @@ private void TestMassivePublish(ISubscriber sub, string channel, string caption)
335354
[Fact]
336355
public async Task SubscribeAsyncEnumerable()
337356
{
338-
await using var conn = Create(syncTimeout: 20000, shared: false, log: Writer);
357+
await using var conn = ConnectFactory(shared: false);
339358

340359
var sub = conn.GetSubscriber();
341360
RedisChannel channel = RedisChannel.Literal(Me());
@@ -370,7 +389,7 @@ public async Task SubscribeAsyncEnumerable()
370389
[Fact]
371390
public async Task PubSubGetAllAnyOrder()
372391
{
373-
await using var conn = Create(syncTimeout: 20000, shared: false, log: Writer);
392+
await using var conn = ConnectFactory(shared: false);
374393

375394
var sub = conn.GetSubscriber();
376395
RedisChannel channel = RedisChannel.Literal(Me());
@@ -625,9 +644,10 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Async()
625644
[Fact]
626645
public async Task TestPublishWithSubscribers()
627646
{
628-
await using var connA = Create(shared: false, log: Writer);
629-
await using var connB = Create(shared: false, log: Writer);
630-
await using var connPub = Create();
647+
await using var pair = ConnectFactory(shared: false);
648+
await using var connA = pair.DefaultClient;
649+
await using var connB = pair.CreateClient();
650+
await using var connPub = pair.CreateClient();
631651

632652
var channel = Me();
633653
var listenA = connA.GetSubscriber();
@@ -652,9 +672,10 @@ public async Task TestPublishWithSubscribers()
652672
[Fact]
653673
public async Task TestMultipleSubscribersGetMessage()
654674
{
655-
await using var connA = Create(shared: false, log: Writer);
656-
await using var connB = Create(shared: false, log: Writer);
657-
await using var connPub = Create();
675+
await using var pair = ConnectFactory(shared: false);
676+
await using var connA = pair.DefaultClient;
677+
await using var connB = pair.CreateClient();
678+
await using var connPub = pair.CreateClient();
658679

659680
var channel = RedisChannel.Literal(Me());
660681
var listenA = connA.GetSubscriber();
@@ -682,7 +703,7 @@ public async Task TestMultipleSubscribersGetMessage()
682703
[Fact]
683704
public async Task Issue38()
684705
{
685-
await using var conn = Create(log: Writer);
706+
await using var conn = ConnectFactory();
686707

687708
var sub = conn.GetSubscriber();
688709
int count = 0;
@@ -717,9 +738,10 @@ public async Task Issue38()
717738
[Fact]
718739
public async Task TestPartialSubscriberGetMessage()
719740
{
720-
await using var connA = Create();
721-
await using var connB = Create();
722-
await using var connPub = Create();
741+
await using var pair = ConnectFactory();
742+
await using var connA = pair.DefaultClient;
743+
await using var connB = pair.CreateClient();
744+
await using var connPub = pair.CreateClient();
723745

724746
int gotA = 0, gotB = 0;
725747
var listenA = connA.GetSubscriber();
@@ -750,8 +772,9 @@ public async Task TestPartialSubscriberGetMessage()
750772
[Fact]
751773
public async Task TestSubscribeUnsubscribeAndSubscribeAgain()
752774
{
753-
await using var connPub = Create();
754-
await using var connSub = Create();
775+
await using var pair = ConnectFactory();
776+
await using var connPub = pair.DefaultClient;
777+
await using var connSub = pair.CreateClient();
755778

756779
var prefix = Me();
757780
var pub = connPub.GetSubscriber();

tests/StackExchange.Redis.Tests/TestBase.cs

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ namespace StackExchange.Redis.Tests;
1616

1717
public abstract class TestBase : IDisposable
1818
{
19-
private ITestOutputHelper Output { get; }
19+
protected ITestOutputHelper Output { get; }
2020
protected TextWriterOutputHelper Writer { get; }
2121
protected virtual string GetConfiguration()
2222
{
@@ -585,4 +585,74 @@ protected static async Task UntilConditionAsync(TimeSpan maxWaitTime, Func<bool>
585585
spent += wait;
586586
}
587587
}
588+
589+
// simplified usage to get an interchangeable dedicated vs shared in-process server, useful for debugging
590+
protected virtual bool UseDedicatedInProcessServer => false; // use the shared server by default
591+
internal ClientFactory ConnectFactory(bool allowAdmin = false, string? channelPrefix = null, bool shared = true)
592+
{
593+
if (UseDedicatedInProcessServer)
594+
{
595+
var server = new InProcessTestServer(Output);
596+
return new ClientFactory(this, allowAdmin, channelPrefix, shared, server);
597+
}
598+
return new ClientFactory(this, allowAdmin, channelPrefix, shared, null);
599+
}
600+
601+
internal sealed class ClientFactory : IDisposable, IAsyncDisposable
602+
{
603+
private readonly TestBase _testBase;
604+
private readonly bool _allowAdmin;
605+
private readonly string? _channelPrefix;
606+
private readonly bool _shared;
607+
private readonly InProcessTestServer? _server;
608+
private IInternalConnectionMultiplexer? _defaultClient;
609+
610+
internal ClientFactory(TestBase testBase, bool allowAdmin, string? channelPrefix, bool shared, InProcessTestServer? server)
611+
{
612+
_testBase = testBase;
613+
_allowAdmin = allowAdmin;
614+
_channelPrefix = channelPrefix;
615+
_shared = shared;
616+
_server = server;
617+
}
618+
619+
public IInternalConnectionMultiplexer DefaultClient => _defaultClient ??= CreateClient();
620+
621+
public InProcessTestServer? Server => _server;
622+
623+
public IInternalConnectionMultiplexer CreateClient()
624+
{
625+
if (_server is not null)
626+
{
627+
var config = _server.GetClientConfig();
628+
config.AllowAdmin = _allowAdmin;
629+
if (_channelPrefix is not null)
630+
{
631+
config.ChannelPrefix = RedisChannel.Literal(_channelPrefix);
632+
}
633+
return ConnectionMultiplexer.ConnectAsync(config).Result;
634+
}
635+
return _testBase.Create(allowAdmin: _allowAdmin, channelPrefix: _channelPrefix, shared: _shared);
636+
}
637+
638+
public IDatabase GetDatabase(int db = -1) => DefaultClient.GetDatabase(db);
639+
640+
public ISubscriber GetSubscriber() => DefaultClient.GetSubscriber();
641+
642+
public void Dispose()
643+
{
644+
_server?.Dispose();
645+
_defaultClient?.Dispose();
646+
}
647+
648+
public ValueTask DisposeAsync()
649+
{
650+
_server?.Dispose();
651+
if (_defaultClient is not null)
652+
{
653+
return _defaultClient.DisposeAsync();
654+
}
655+
return default;
656+
}
657+
}
588658
}

0 commit comments

Comments
 (0)