Skip to content

Commit 8a61dd2

Browse files
committed
fixes and stablization fror test
1 parent ff2f62f commit 8a61dd2

File tree

10 files changed

+202
-7
lines changed

10 files changed

+202
-7
lines changed

AGENTS.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ If no new rule is detected -> do not update the file.
109109
- run broader suites only when you have something real to verify (avoid re-running the same command without changes)
110110
- run coverage once per change (it is heavier than tests)
111111
- Run tests in layers: new/changed -> related suite -> broader regressions
112+
- Final verification for code changes requires the full `dotnet test -c Debug` suite to pass; targeted test runs are only for iteration, not for closing the task
112113
- After tests pass: run format
113114
- After format: run build (final check)
114115
- Summarize changes and test results before marking complete
@@ -185,6 +186,7 @@ If no new rule is detected -> do not update the file.
185186
- Avoid `this.` qualification; prefer predefined types (`int` over `Int32`)
186187
- Never use `ConfigureAwait(false)`
187188
- No magic literals - extract to constants, enums, config
189+
- In concurrency-sensitive paths like `OrleansHubLifetimeManager`, prefer the smallest behavior-preserving fix; avoid widening the async/concurrency shape unless tests prove it is necessary, because this code is easy to make unsafe
188190

189191
### Diagnostics
190192

ManagedCode.Orleans.SignalR.Client/Extensions/OrleansHubGroupExtensions.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ private static IOrleansGroupManager<THub> ResolveGroupManager<THub>(THub hub) wh
3434
{
3535
var serviceProvider = (hub.Context.GetHttpContext()?.RequestServices) ?? throw new InvalidOperationException("Unable to resolve SignalR services for the current hub connection.");
3636

37-
return serviceProvider.GetRequiredService<IOrleansGroupManager<THub>>();
37+
var groupManager = serviceProvider.GetService<IOrleansGroupManager<THub>>();
38+
if (groupManager is not null)
39+
{
40+
return groupManager;
41+
}
42+
43+
var lifetimeManager = serviceProvider.GetRequiredService<HubLifetimeManager<THub>>();
44+
return new OrleansGroupManager<THub>(lifetimeManager);
3845
}
3946
}

ManagedCode.Orleans.SignalR.Core/SignalR/OrleansHubLifetimeManager.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -371,15 +371,16 @@ public async Task AddToGroupsAsync(string connectionId, IReadOnlyList<string> gr
371371
if (_orleansSignalOptions.Value.GroupPartitionCount > 1)
372372
{
373373
var coordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain<THub>(_clusterClient);
374-
var partitionIds = await Task.Run(
375-
() => coordinatorGrain.AddConnectionToGroups(uniqueGroupNames, connectionId, subscription.Reference),
376-
cancellationToken);
377-
378-
foreach (var partitionId in partitionIds)
374+
foreach (var groupName in uniqueGroupNames)
379375
{
376+
var partitionId = await Task.Run(() => coordinatorGrain.GetPartitionForGroup(groupName), cancellationToken);
380377
var partitionGrain = NameHelperGenerator.GetGroupPartitionGrain<THub>(_clusterClient, partitionId);
381378
subscription.AddGrain(partitionGrain);
382379
}
380+
381+
await Task.Run(
382+
() => coordinatorGrain.AddConnectionToGroups(uniqueGroupNames, connectionId, subscription.Reference),
383+
cancellationToken);
383384
}
384385
else
385386
{

ManagedCode.Orleans.SignalR.Server/Extensions/OrleansDependencyInjectionExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public static ISignalRServerBuilder AddOrleans(this ISignalRServerBuilder signal
2424
signalrBuilder.Services.AddOptions<OrleansSignalROptions>().Configure(options);
2525
signalrBuilder.Services.AddSingleton(typeof(HubLifetimeManager<>), typeof(OrleansHubLifetimeManager<>));
2626
signalrBuilder.Services.AddSingleton(typeof(IOrleansHubContext<,>), typeof(OrleansHubContext<,>));
27+
signalrBuilder.Services.AddSingleton(typeof(IOrleansGroupManager<>), typeof(OrleansGroupManager<>));
2728

2829
return signalrBuilder;
2930
}

ManagedCode.Orleans.SignalR.Server/SignalRGroupCoordinatorGrain.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ private Dictionary<int, List<string>> GetPartitionsForGroups(string[] groupNames
408408
}
409409
else
410410
{
411-
continue;
411+
partition = PartitionHelper.GetPartitionId(groupName, (uint)_currentPartitionCount);
412412
}
413413

414414
ref var list = ref CollectionsMarshal.GetValueRefOrAddDefault(groupsByPartition, partition, out var exists);

ManagedCode.Orleans.SignalR.Tests/GrainPersistenceTests.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,37 @@ public async Task GroupCoordinatorBatchMethodsShouldTrackMembershipAcrossTouched
224224
}
225225
}
226226

227+
[Fact]
228+
public async Task GroupCoordinatorBatchRemoveShouldCleanupPartitionWhenAssignmentMetadataIsMissingAsync()
229+
{
230+
var client = _cluster.Cluster.Client;
231+
var coordinator = NameHelperGenerator.GetGroupCoordinatorGrain<SimpleTestHub>(client);
232+
var connectionId = $"group-cleanup-{Guid.NewGuid():N}";
233+
var groupName = $"group-drift-{Guid.NewGuid():N}";
234+
var observer = client.CreateObjectReference<ISignalRObserver>(new SignalRObserver(_ => Task.CompletedTask));
235+
236+
var partitionId = await coordinator.GetPartitionForGroup(groupName);
237+
var partition = NameHelperGenerator.GetGroupPartitionGrain<SimpleTestHub>(client, partitionId);
238+
239+
try
240+
{
241+
var addedPartitions = await coordinator.AddConnectionToGroups([groupName], connectionId, observer);
242+
addedPartitions.ShouldContain(partitionId);
243+
(await partition.HasConnection(connectionId)).ShouldBeTrue();
244+
245+
await coordinator.NotifyGroupRemoved(groupName);
246+
(await partition.HasConnection(connectionId)).ShouldBeTrue();
247+
248+
var removedPartitions = await coordinator.RemoveConnectionFromGroups([groupName], connectionId, observer);
249+
removedPartitions.ShouldContain(partitionId);
250+
(await partition.HasConnection(connectionId)).ShouldBeFalse();
251+
}
252+
finally
253+
{
254+
await coordinator.RemoveConnectionFromGroups([groupName], connectionId, observer);
255+
}
256+
}
257+
227258
private static async Task AssertRoutedAsync(Func<Task<bool>> sendAction, string reason)
228259
{
229260
var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(5);

ManagedCode.Orleans.SignalR.Tests/HubSmokeTests.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,52 @@ await WaitUntilAsync(() =>
8989
await DisposeAsync(connection1, connection2);
9090
}
9191

92+
[Fact]
93+
public async Task BatchGroupHelpersShouldWorkWithoutOrleansRegistrationAsync()
94+
{
95+
using var app = new TestWebApplication(_cluster, 8089, useOrleans: false, loggerAccessor: _loggerAccessor);
96+
var groupNames = new[]
97+
{
98+
$"plain-group-alpha-{Guid.NewGuid():N}",
99+
$"plain-group-beta-{Guid.NewGuid():N}"
100+
};
101+
102+
var received = new ConcurrentQueue<string>();
103+
var member = await StartConnectionAsync(app, received.Enqueue, null, _output);
104+
var sender = await StartConnectionAsync(app, _ => { }, null, _output);
105+
106+
try
107+
{
108+
await member.InvokeAsync("AddToGroups", groupNames);
109+
110+
foreach (var groupName in groupNames)
111+
{
112+
await sender.InvokeAsync("GroupSendAsync", groupName, $"before:{groupName}");
113+
}
114+
115+
await WaitUntilAsync(() => received.Count >= groupNames.Length, TimeSpan.FromSeconds(10));
116+
117+
received.Count.ShouldBe(groupNames.Length);
118+
119+
await member.InvokeAsync("RemoveFromGroups", groupNames);
120+
while (received.TryDequeue(out _))
121+
{
122+
}
123+
124+
foreach (var groupName in groupNames)
125+
{
126+
await sender.InvokeAsync("GroupSendAsync", groupName, $"after:{groupName}");
127+
}
128+
129+
await Task.Delay(500);
130+
received.ShouldBeEmpty();
131+
}
132+
finally
133+
{
134+
await DisposeAsync(member, sender);
135+
}
136+
}
137+
92138
[Fact]
93139
public async Task UserMessageIsDeliveredToSpecificUserAsync()
94140
{

ManagedCode.Orleans.SignalR.Tests/PartitioningTests.cs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,105 @@ public async Task BatchGroupMembershipShouldAddAndRemoveAcrossMultipleGroupsAsyn
425425
}
426426
}
427427

428+
[Fact]
429+
public async Task BatchGroupMembershipShouldCleanupTouchedPartitionsWhenDisconnectHappensMidJoinAsync()
430+
{
431+
var groupNames = Enumerable.Range(0, 512)
432+
.Select(index => $"batch-disconnect-group-{index}-{Guid.NewGuid():N}")
433+
.ToArray();
434+
435+
var connection = _apps[0].CreateSignalRClient(nameof(SimpleTestHub));
436+
437+
try
438+
{
439+
await connection.StartAsync();
440+
var connectionId = connection.ConnectionId ?? throw new InvalidOperationException("ConnectionId was not initialized.");
441+
442+
var coordinator = NameHelperGenerator.GetGroupCoordinatorGrain<SimpleTestHub>(_siloCluster.Cluster.Client);
443+
var partitionIds = (await Task.WhenAll(groupNames.Select(coordinator.GetPartitionForGroup)))
444+
.Distinct()
445+
.ToArray();
446+
partitionIds.Length.ShouldBeGreaterThan(0);
447+
448+
var partitions = partitionIds
449+
.Select(partitionId => NameHelperGenerator.GetGroupPartitionGrain<SimpleTestHub>(_siloCluster.Cluster.Client, partitionId))
450+
.ToArray();
451+
452+
var joinTask = connection.InvokeAsync("AddToGroups", groupNames);
453+
454+
var joinStarted = await WaitForBatchJoinToStartAsync(partitions, connectionId, joinTask, TimeSpan.FromSeconds(3));
455+
456+
joinStarted.ShouldBeTrue();
457+
458+
await connection.StopAsync();
459+
460+
try
461+
{
462+
await joinTask;
463+
}
464+
catch (Exception ex) when (ex is HubException or InvalidOperationException or TaskCanceledException)
465+
{
466+
_testOutputHelper.WriteLine($"Join invocation ended after disconnect: {ex.GetType().Name}: {ex.Message}");
467+
}
468+
469+
var released = await WaitUntilAsync(
470+
"all touched partitions to release disconnected connection after batch join",
471+
async () => await GetTrackedPartitionCountAsync(partitions, connectionId) == 0,
472+
progress: async () =>
473+
$"tracked={await GetTrackedPartitionCountAsync(partitions, connectionId)}/{partitions.Length}",
474+
timeout: TimeSpan.FromSeconds(15));
475+
476+
released.ShouldBeTrue();
477+
}
478+
finally
479+
{
480+
await connection.DisposeAsync();
481+
}
482+
}
483+
484+
private static async Task<int> GetTrackedPartitionCountAsync(
485+
IReadOnlyCollection<Core.Interfaces.ISignalRGroupPartitionGrain> partitions,
486+
string connectionId)
487+
{
488+
var tracked = 0;
489+
490+
foreach (var partition in partitions)
491+
{
492+
if (await partition.HasConnection(connectionId))
493+
{
494+
tracked++;
495+
}
496+
}
497+
498+
return tracked;
499+
}
500+
501+
private static async Task<bool> WaitForBatchJoinToStartAsync(
502+
IReadOnlyCollection<Core.Interfaces.ISignalRGroupPartitionGrain> partitions,
503+
string connectionId,
504+
Task joinTask,
505+
TimeSpan timeout)
506+
{
507+
var deadline = DateTime.UtcNow + timeout;
508+
509+
while (DateTime.UtcNow < deadline)
510+
{
511+
if (await GetTrackedPartitionCountAsync(partitions, connectionId) > 0 && !joinTask.IsCompleted)
512+
{
513+
return true;
514+
}
515+
516+
if (joinTask.IsCompleted)
517+
{
518+
return false;
519+
}
520+
521+
await Task.Delay(TimeSpan.FromMilliseconds(5));
522+
}
523+
524+
return await GetTrackedPartitionCountAsync(partitions, connectionId) > 0 && !joinTask.IsCompleted;
525+
}
526+
428527
private async Task<bool> WaitUntilAsync(
429528
string description,
430529
Func<Task<bool>> condition,

docs/Features/Group-Partitioning.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ Group operations are routed through a coordinator grain that assigns group names
2323
- [x] Add package-level batch group APIs for hub and host-service callers.
2424
- [x] Batch coordinator and partition updates so one request does not force sequential writes per group.
2525
- [x] Cover batch add/remove with integration tests and direct coordinator verification.
26+
- [x] Restore disconnect-safe subscription tracking for partitioned batch adds.
27+
- [x] Restore the hashed cleanup fallback when batch removals see missing coordinator assignments.
28+
- [x] Add regression tests for disconnect cleanup and degraded coordinator state.
2629

2730
## Main flow
2831

@@ -44,6 +47,8 @@ flowchart TD
4447
- Partition grains hold `group -> connection -> observer` mappings and emit fan-out to observers.
4548
- Empty groups trigger cleanup so partitions can shed state.
4649
- Batch membership operations collapse repeated coordinator writes into one persistence step per request and one partition write per touched partition.
50+
- Partitioned batch adds pre-register touched partitions in the connection subscription before the coordinator write finishes so disconnect cleanup still reaches every touched partition.
51+
- Batch removals fall back to the hashed partition when coordinator assignment metadata is missing, which preserves cleanup in degraded-state recovery scenarios.
4752

4853
## Configuration knobs
4954

docs/Features/Hub-Lifetime-Manager-Integration.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ The ASP.NET Core host swaps the default SignalR hub lifetime manager with `Orlea
1919
- [x] Restore fire-and-forget fan-out for multi-group and multi-user sends.
2020
- [x] Ensure per-target send failures are logged without blocking hub execution.
2121
- [x] Route package-specific batch group membership calls through the Orleans lifetime manager.
22+
- [x] Keep batch group helper calls usable when the host is running plain `AddSignalR()`.
23+
- [x] Add regression coverage for the batch helper path without Orleans registration.
2224

2325
## Main flow
2426

@@ -38,6 +40,7 @@ flowchart TD
3840
- `AddOrleans()` registers `OrleansHubLifetimeManager<THub>` as the `HubLifetimeManager` implementation.
3941
- The lifetime manager creates a per-connection `Subscription` and registers observers with connection/group/user grains.
4042
- Package-specific batch group operations (`AddToGroupsAsync` / `RemoveFromGroupsAsync`) also route through the lifetime manager instead of looping over sequential single-group writes.
43+
- Hub batch helpers fall back to the registered `HubLifetimeManager<THub>` when `IOrleansGroupManager<THub>` is not explicitly registered, so the API still works on plain `AddSignalR()` hosts.
4144
- Detailed batching behavior and partition persistence rules live in `docs/Features/Group-Partitioning.md`.
4245

4346
## Configuration knobs

0 commit comments

Comments
 (0)