Skip to content

Commit ff2f62f

Browse files
committed
#7 Add & Remove Multiple SignalR Groups before StateLock
1 parent a1bc529 commit ff2f62f

16 files changed

Lines changed: 675 additions & 124 deletions

File tree

Directory.Build.props

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
<RepositoryUrl>https://github.com/managedcode/Orleans.SignalR</RepositoryUrl>
2727
<PackageProjectUrl>https://github.com/managedcode/Orleans.SignalR</PackageProjectUrl>
2828
<Product>Managed Code - Orleans SignalR</Product>
29-
<Version>10.0.5</Version>
30-
<PackageVersion>10.0.5</PackageVersion>
29+
<Version>10.1.0</Version>
30+
<PackageVersion>10.1.0</PackageVersion>
3131

3232
</PropertyGroup>
3333
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">

ManagedCode.Orleans.SignalR.Client/Extensions/OrleansDependencyInjectionExtensions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using ManagedCode.Orleans.SignalR.Core.Config;
3+
using ManagedCode.Orleans.SignalR.Core.HubContext;
34
using ManagedCode.Orleans.SignalR.Core.SignalR;
45
using Microsoft.AspNetCore.SignalR;
56
using Microsoft.Extensions.DependencyInjection;
@@ -21,6 +22,7 @@ public static ISignalRServerBuilder AddOrleans(this ISignalRServerBuilder signal
2122
{
2223
signalrBuilder.Services.AddOptions<OrleansSignalROptions>().Configure(options);
2324
signalrBuilder.Services.AddSingleton(typeof(HubLifetimeManager<>), typeof(OrleansHubLifetimeManager<>));
25+
signalrBuilder.Services.AddSingleton(typeof(IOrleansGroupManager<>), typeof(OrleansGroupManager<>));
2426
return signalrBuilder;
2527
}
2628
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using ManagedCode.Orleans.SignalR.Core.HubContext;
6+
using Microsoft.AspNetCore.SignalR;
7+
using Microsoft.Extensions.DependencyInjection;
8+
9+
namespace ManagedCode.Orleans.SignalR.Client.Extensions;
10+
11+
public static class OrleansHubGroupExtensions
12+
{
13+
public static Task AddToGroupsAsync<THub>(this THub hub, IReadOnlyList<string> groupNames, CancellationToken cancellationToken = default)
14+
where THub : Hub
15+
{
16+
ArgumentNullException.ThrowIfNull(hub);
17+
ArgumentNullException.ThrowIfNull(groupNames);
18+
19+
var groupManager = ResolveGroupManager(hub);
20+
return groupManager.AddToGroupsAsync(hub.Context.ConnectionId, groupNames, cancellationToken);
21+
}
22+
23+
public static Task RemoveFromGroupsAsync<THub>(this THub hub, IReadOnlyList<string> groupNames, CancellationToken cancellationToken = default)
24+
where THub : Hub
25+
{
26+
ArgumentNullException.ThrowIfNull(hub);
27+
ArgumentNullException.ThrowIfNull(groupNames);
28+
29+
var groupManager = ResolveGroupManager(hub);
30+
return groupManager.RemoveFromGroupsAsync(hub.Context.ConnectionId, groupNames, cancellationToken);
31+
}
32+
33+
private static IOrleansGroupManager<THub> ResolveGroupManager<THub>(THub hub) where THub : Hub
34+
{
35+
var serviceProvider = (hub.Context.GetHttpContext()?.RequestServices) ?? throw new InvalidOperationException("Unable to resolve SignalR services for the current hub connection.");
36+
37+
return serviceProvider.GetRequiredService<IOrleansGroupManager<THub>>();
38+
}
39+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System.Collections.Generic;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Microsoft.AspNetCore.SignalR;
5+
6+
namespace ManagedCode.Orleans.SignalR.Core.HubContext;
7+
8+
public interface IOrleansGroupManager<THub> : IGroupManager where THub : Hub
9+
{
10+
Task AddToGroupsAsync(string connectionId, IReadOnlyList<string> groupNames, CancellationToken cancellationToken = default);
11+
12+
Task RemoveFromGroupsAsync(string connectionId, IReadOnlyList<string> groupNames, CancellationToken cancellationToken = default);
13+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using ManagedCode.Orleans.SignalR.Core.SignalR;
7+
using Microsoft.AspNetCore.SignalR;
8+
9+
namespace ManagedCode.Orleans.SignalR.Core.HubContext;
10+
11+
public sealed class OrleansGroupManager<THub>(HubLifetimeManager<THub> lifetimeManager) : IOrleansGroupManager<THub> where THub : Hub
12+
{
13+
public Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
14+
{
15+
return lifetimeManager.AddToGroupAsync(connectionId, groupName, cancellationToken);
16+
}
17+
18+
public Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
19+
{
20+
return lifetimeManager.RemoveFromGroupAsync(connectionId, groupName, cancellationToken);
21+
}
22+
23+
public Task AddToGroupsAsync(string connectionId, IReadOnlyList<string> groupNames, CancellationToken cancellationToken = default)
24+
{
25+
ArgumentNullException.ThrowIfNull(groupNames);
26+
27+
if (lifetimeManager is OrleansHubLifetimeManager<THub> orleansLifetimeManager)
28+
{
29+
return orleansLifetimeManager.AddToGroupsAsync(connectionId, groupNames, cancellationToken);
30+
}
31+
32+
return Task.WhenAll(groupNames.Select(groupName => lifetimeManager.AddToGroupAsync(connectionId, groupName, cancellationToken)));
33+
}
34+
35+
public Task RemoveFromGroupsAsync(string connectionId, IReadOnlyList<string> groupNames, CancellationToken cancellationToken = default)
36+
{
37+
ArgumentNullException.ThrowIfNull(groupNames);
38+
39+
if (lifetimeManager is OrleansHubLifetimeManager<THub> orleansLifetimeManager)
40+
{
41+
return orleansLifetimeManager.RemoveFromGroupsAsync(connectionId, groupNames, cancellationToken);
42+
}
43+
44+
return Task.WhenAll(groupNames.Select(groupName => lifetimeManager.RemoveFromGroupAsync(connectionId, groupName, cancellationToken)));
45+
}
46+
}

ManagedCode.Orleans.SignalR.Core/Interfaces/ISignalRGroupCoordinatorGrain.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,15 @@ public interface ISignalRGroupCoordinatorGrain : IGrainWithStringKey
3030
[AlwaysInterleave]
3131
Task AddConnectionToGroup(string groupName, string connectionId, ISignalRObserver observer);
3232

33+
[AlwaysInterleave]
34+
Task<int[]> AddConnectionToGroups(string[] groupNames, string connectionId, ISignalRObserver observer);
35+
3336
[AlwaysInterleave]
3437
Task RemoveConnectionFromGroup(string groupName, string connectionId, ISignalRObserver observer);
3538

39+
[AlwaysInterleave]
40+
Task<int[]> RemoveConnectionFromGroups(string[] groupNames, string connectionId, ISignalRObserver observer);
41+
3642
[AlwaysInterleave]
3743
Task NotifyGroupRemoved(string groupName);
3844
}

ManagedCode.Orleans.SignalR.Core/Interfaces/ISignalRGroupPartitionGrain.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,15 @@ public interface ISignalRGroupPartitionGrain : IGrainWithIntegerKey, IObserverCo
1818
[AlwaysInterleave]
1919
Task AddConnectionToGroup(string groupName, string connectionId, ISignalRObserver observer);
2020

21+
[AlwaysInterleave]
22+
Task<string[]> AddConnectionToGroups(string connectionId, string[] groupNames, ISignalRObserver observer);
23+
2124
[AlwaysInterleave]
2225
Task RemoveConnectionFromGroup(string groupName, string connectionId, ISignalRObserver observer);
2326

27+
[AlwaysInterleave]
28+
Task<string[]> RemoveConnectionFromGroups(string connectionId, string[] groupNames, ISignalRObserver observer);
29+
2430
[ReadOnly]
2531
[AlwaysInterleave]
2632
Task<bool> HasConnection(string connectionId);

ManagedCode.Orleans.SignalR.Core/SignalR/OrleansHubLifetimeManager.cs

Lines changed: 104 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -349,68 +349,124 @@ public override Task SendUsersAsync(IReadOnlyList<string> userIds, string method
349349

350350
public override async Task AddToGroupAsync(string connectionId, string groupName,
351351
CancellationToken cancellationToken = new())
352+
{
353+
await AddToGroupsAsync(connectionId, [groupName], cancellationToken);
354+
}
355+
356+
public async Task AddToGroupsAsync(string connectionId, IReadOnlyList<string> groupNames,
357+
CancellationToken cancellationToken = default)
352358
{
353359
var subscription = GetSubscription(connectionId);
354360
if (subscription is null)
355361
{
356362
return;
357363
}
358364

365+
var uniqueGroupNames = GetUniqueGroupNames(groupNames);
366+
if (uniqueGroupNames.Length == 0)
367+
{
368+
return;
369+
}
370+
359371
if (_orleansSignalOptions.Value.GroupPartitionCount > 1)
360372
{
361373
var coordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain<THub>(_clusterClient);
362-
var partitionId = await Task.Run(() => coordinatorGrain.GetPartitionForGroup(groupName), cancellationToken);
363-
var partitionGrain = NameHelperGenerator.GetGroupPartitionGrain<THub>(_clusterClient, partitionId);
364-
var hubKey = NameHelperGenerator.CleanString(typeof(THub).FullName!);
374+
var partitionIds = await Task.Run(
375+
() => coordinatorGrain.AddConnectionToGroups(uniqueGroupNames, connectionId, subscription.Reference),
376+
cancellationToken);
365377

366-
await Task.Run(() => partitionGrain.EnsureInitialized(hubKey), cancellationToken);
367-
368-
subscription.AddGrain(partitionGrain);
369-
await Task.Run(() => partitionGrain.AddConnection(connectionId, subscription.Reference), cancellationToken);
370-
await Task.Run(() => coordinatorGrain.AddConnectionToGroup(groupName, connectionId, subscription.Reference), cancellationToken);
378+
foreach (var partitionId in partitionIds)
379+
{
380+
var partitionGrain = NameHelperGenerator.GetGroupPartitionGrain<THub>(_clusterClient, partitionId);
381+
subscription.AddGrain(partitionGrain);
382+
}
371383
}
372384
else
373385
{
374-
var groupGrain = NameHelperGenerator.GetSignalRGroupGrain<THub>(_clusterClient, groupName);
375-
await Task.Run(() => groupGrain.AddConnection(connectionId, subscription.Reference), cancellationToken);
376-
subscription.AddGrain(groupGrain);
386+
var groupGrains = uniqueGroupNames
387+
.Select(groupName => NameHelperGenerator.GetSignalRGroupGrain<THub>(_clusterClient, groupName))
388+
.Distinct()
389+
.ToArray();
390+
391+
foreach (var groupGrain in groupGrains)
392+
{
393+
subscription.AddGrain(groupGrain);
394+
}
395+
396+
var tasks = groupGrains
397+
.Select(groupGrain => Task.Run(() => groupGrain.AddConnection(connectionId, subscription.Reference), cancellationToken))
398+
.ToArray();
399+
400+
if (tasks.Length > 0)
401+
{
402+
await Task.WhenAll(tasks);
403+
}
377404
}
378405

379406
await UpdateConnectionHeartbeatAsync(connectionId, subscription);
380407
}
381408

382409
public override async Task RemoveFromGroupAsync(string connectionId, string groupName,
383410
CancellationToken cancellationToken = new())
411+
{
412+
await RemoveFromGroupsAsync(connectionId, [groupName], cancellationToken);
413+
}
414+
415+
public async Task RemoveFromGroupsAsync(string connectionId, IReadOnlyList<string> groupNames,
416+
CancellationToken cancellationToken = default)
384417
{
385418
var subscription = GetSubscription(connectionId);
386419
if (subscription is null)
387420
{
388421
return;
389422
}
390423

424+
var uniqueGroupNames = GetUniqueGroupNames(groupNames);
425+
if (uniqueGroupNames.Length == 0)
426+
{
427+
return;
428+
}
429+
391430
if (_orleansSignalOptions.Value.GroupPartitionCount > 1)
392431
{
393432
var coordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain<THub>(_clusterClient);
394-
var partitionId = await Task.Run(() => coordinatorGrain.GetPartitionForGroup(groupName), cancellationToken);
395-
var partitionGrain = NameHelperGenerator.GetGroupPartitionGrain<THub>(_clusterClient, partitionId);
396-
var hubKey = NameHelperGenerator.CleanString(typeof(THub).FullName!);
397-
398-
await Task.Run(() => partitionGrain.EnsureInitialized(hubKey), cancellationToken);
399-
400-
await Task.Run(() => coordinatorGrain.RemoveConnectionFromGroup(groupName, connectionId, subscription.Reference), cancellationToken);
433+
var partitionIds = await Task.Run(
434+
() => coordinatorGrain.RemoveConnectionFromGroups(uniqueGroupNames, connectionId, subscription.Reference),
435+
cancellationToken);
401436

402-
var stillTracked = await Task.Run(() => partitionGrain.HasConnection(connectionId), cancellationToken);
403-
if (!stillTracked)
437+
foreach (var partitionId in partitionIds)
404438
{
405-
subscription.RemoveGrain(partitionGrain);
406-
await UpdateConnectionHeartbeatAsync(connectionId, subscription);
439+
var partitionGrain = NameHelperGenerator.GetGroupPartitionGrain<THub>(_clusterClient, partitionId);
440+
var stillTracked = await Task.Run(() => partitionGrain.HasConnection(connectionId), cancellationToken);
441+
if (!stillTracked)
442+
{
443+
subscription.RemoveGrain(partitionGrain);
444+
}
407445
}
446+
447+
await UpdateConnectionHeartbeatAsync(connectionId, subscription);
408448
}
409449
else
410450
{
411-
var groupGrain = NameHelperGenerator.GetSignalRGroupGrain<THub>(_clusterClient, groupName);
412-
await Task.Run(() => groupGrain.RemoveConnection(connectionId, subscription.Reference), cancellationToken);
413-
subscription.RemoveGrain(groupGrain);
451+
var groupGrains = uniqueGroupNames
452+
.Select(groupName => NameHelperGenerator.GetSignalRGroupGrain<THub>(_clusterClient, groupName))
453+
.Distinct()
454+
.ToArray();
455+
456+
var tasks = groupGrains
457+
.Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscription.Reference), cancellationToken))
458+
.ToArray();
459+
460+
if (tasks.Length > 0)
461+
{
462+
await Task.WhenAll(tasks);
463+
}
464+
465+
foreach (var groupGrain in groupGrains)
466+
{
467+
subscription.RemoveGrain(groupGrain);
468+
}
469+
414470
await UpdateConnectionHeartbeatAsync(connectionId, subscription);
415471
}
416472
}
@@ -643,6 +699,29 @@ private static string GenerateInvocationId()
643699
return connection?.Features.Get<Subscription>();
644700
}
645701

702+
private static string[] GetUniqueGroupNames(IReadOnlyList<string> groupNames)
703+
{
704+
ArgumentNullException.ThrowIfNull(groupNames);
705+
706+
if (groupNames.Count == 0)
707+
{
708+
return [];
709+
}
710+
711+
var unique = new HashSet<string>(StringComparer.Ordinal);
712+
var ordered = new List<string>(groupNames.Count);
713+
714+
foreach (var groupName in groupNames)
715+
{
716+
if (unique.Add(groupName))
717+
{
718+
ordered.Add(groupName);
719+
}
720+
}
721+
722+
return ordered.ToArray();
723+
}
724+
646725
private Task UpdateConnectionHeartbeatAsync(string connectionId, Subscription subscription)
647726
{
648727
if (!_orleansSignalOptions.Value.KeepEachConnectionAlive || string.IsNullOrEmpty(subscription.HubKey))

0 commit comments

Comments
 (0)