Skip to content

Commit bc7124d

Browse files
committed
fix toy server pub/sub (always respond)
1 parent fdf1c52 commit bc7124d

2 files changed

Lines changed: 44 additions & 12 deletions

File tree

tests/StackExchange.Redis.Tests/PubSubTests.cs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ public async Task Ping()
160160
await using var conn = ConnectFactory(shared: false);
161161
var pub = GetAnyPrimary(conn.DefaultClient);
162162
var sub = conn.GetSubscriber();
163+
164+
await PingAsync(pub, sub, 5).ForAwait();
163165
await sub.SubscribeAsync(RedisChannel.Literal(Me()), (_, __) => { }); // to ensure we're in subscriber mode
164166
await PingAsync(pub, sub, 5).ForAwait();
165167
}
@@ -232,10 +234,28 @@ private async Task PingAsync(IServer pub, ISubscriber sub, int times = 1)
232234
// way to prove that is to use TPL objects
233235
var subTask = sub.PingAsync();
234236
var pubTask = pub.PingAsync();
235-
await Task.WhenAll(subTask, pubTask).ForAwait();
237+
try
238+
{
239+
await Task.WhenAll(subTask, pubTask).ForAwait();
240+
}
241+
catch (TimeoutException ex)
242+
{
243+
throw new TimeoutException($"Timeout; sub: {GetState(subTask)}, pub: {GetState(pubTask)}", ex);
244+
}
245+
246+
Log($"sub: {GetState(subTask)}, pub: {GetState(pubTask)}");
236247

237-
Log($"Sub PING time: {subTask.Result.TotalMilliseconds} ms");
238-
Log($"Pub PING time: {pubTask.Result.TotalMilliseconds} ms");
248+
static string GetState(Task<TimeSpan> pending)
249+
{
250+
var status = pending.Status;
251+
return status switch
252+
{
253+
TaskStatus.RanToCompletion => $"{status} in {pending.Result.TotalMilliseconds:###,##0.0}ms)",
254+
TaskStatus.Faulted when pending.Exception is { InnerExceptions.Count:1 } ae => $"{status}: {ae.InnerExceptions[0].Message}",
255+
TaskStatus.Faulted => $"{status}: {pending.Exception?.Message}",
256+
_ => status.ToString(),
257+
};
258+
}
239259
}
240260
}
241261

toys/StackExchange.Redis.Server/RedisServer.PubSub.cs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -231,18 +231,32 @@ private void SendSubUnsubMessage(string kind, RedisChannel channel, int count)
231231
}
232232
}
233233

234+
private ref int GetCountField(RedisChannel channel)
235+
=> ref channel.IsSharded ? ref shardedCount
236+
: ref channel.IsPattern ? ref patternCount
237+
: ref simpleCount;
238+
234239
internal void Subscribe(RedisChannel channel)
235240
{
236241
Regex glob = channel.IsPattern ? BuildGlob(channel) : null;
237242
var subs = Subscriptions;
238243
int count;
244+
ref int field = ref GetCountField(channel);
239245
lock (subs)
240246
{
241-
if (subs.ContainsKey(channel)) return;
242-
subs.Add(channel, glob);
243-
count = channel.IsSharded ? ++shardedCount
244-
: channel.IsPattern ? ++patternCount
245-
: ++simpleCount;
247+
#if NET
248+
count = subs.TryAdd(channel, glob) ? ++field : field;
249+
#else
250+
if (subs.ContainsKey(channel))
251+
{
252+
count = field;
253+
}
254+
else
255+
{
256+
subs.Add(channel, glob);
257+
count = ++field;
258+
}
259+
#endif
246260
}
247261
SendSubUnsubMessage(
248262
channel.IsSharded ? "ssubscribe"
@@ -271,12 +285,10 @@ internal void Unsubscribe(RedisChannel channel)
271285
var subs = SubscriptionsIfAny;
272286
if (subs is null) return;
273287
int count;
288+
ref int field = ref GetCountField(channel);
274289
lock (subs)
275290
{
276-
if (!subs.Remove(channel)) return;
277-
count = channel.IsSharded ? --shardedCount
278-
: channel.IsPattern ? --patternCount
279-
: --simpleCount;
291+
count = subs.Remove(channel) ? --field : field;
280292
}
281293
SendSubUnsubMessage(
282294
channel.IsSharded ? "sunsubscribe"

0 commit comments

Comments
 (0)