Skip to content

Commit 2a1fdf1

Browse files
authored
Merge pull request #8 from managedcode/codex/fix-batch-group-cleanup
fix: harden batch group cleanup
2 parents 8a61dd2 + f2e440b commit 2a1fdf1

File tree

7 files changed

+142
-46
lines changed

7 files changed

+142
-46
lines changed

AGENTS.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,10 @@ If no new rule is detected -> do not update the file.
186186
- Avoid `this.` qualification; prefer predefined types (`int` over `Int32`)
187187
- Never use `ConfigureAwait(false)`
188188
- No magic literals - extract to constants, enums, config
189+
- Prefer the simplest clear implementation over incidental complexity, especially on hot paths; if two fixes are correct, choose the one that is easier to reason about and keeps group operations fast
189190
- In concurrency-sensitive paths like `OrleansHubLifetimeManager`, prefer the smallest behavior-preserving fix; avoid widening the async/concurrency shape unless tests prove it is necessary, because this code is easy to make unsafe
191+
- Never introduce explicit `lock`/`Monitor` synchronization in Orleans-related paths; fix races via ordering, idempotent cleanup, or Orleans/concurrent primitives instead of `_syncRoot`-style locking
192+
- For Orleans-facing changes, follow the request scheduling model explicitly: prefer grain-aligned ordering/idempotency fixes over host-side synchronization, because group operations must stay fast and consistent with Orleans execution semantics
190193

191194
### Diagnostics
192195

ManagedCode.Orleans.SignalR.Core/SignalR/Observers/Subscription.cs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Concurrent;
23
using System.Collections.Generic;
34
using System.Collections.Immutable;
45
using ManagedCode.Orleans.SignalR.Core.Interfaces;
@@ -8,8 +9,9 @@ namespace ManagedCode.Orleans.SignalR.Core.SignalR.Observers;
89

910
public sealed class Subscription(SignalRObserver observer) : IDisposable
1011
{
11-
private readonly HashSet<IObserverConnectionManager> _grains = new();
12-
private readonly HashSet<GrainId> _heartbeatGrainIds = new();
12+
// Use ConcurrentDictionary as a concurrent hash-set because batch group mutations can overlap disconnect cleanup.
13+
private readonly ConcurrentDictionary<IObserverConnectionManager, bool> _grains = new();
14+
private readonly ConcurrentDictionary<GrainId, bool> _heartbeatGrainIds = new();
1315
private bool _disposed;
1416

1517
public ISignalRObserver Reference { get; private set; } = default!;
@@ -20,7 +22,7 @@ public sealed class Subscription(SignalRObserver observer) : IDisposable
2022

2123
public int PartitionId { get; private set; }
2224

23-
public IReadOnlyCollection<IObserverConnectionManager> Grains => _grains;
25+
public IReadOnlyCollection<IObserverConnectionManager> Grains => _grains.IsEmpty ? [] : [.. _grains.Keys];
2426

2527
public void Dispose()
2628
{
@@ -30,6 +32,7 @@ public void Dispose()
3032
}
3133

3234
_disposed = true;
35+
3336
observer?.Dispose();
3437
_grains.Clear();
3538
_heartbeatGrainIds.Clear();
@@ -41,14 +44,14 @@ public void Dispose()
4144

4245
public void AddGrain(IObserverConnectionManager grain)
4346
{
44-
_grains.Add(grain);
45-
_heartbeatGrainIds.Add(((GrainReference)grain).GrainId);
47+
_grains.TryAdd(grain, true);
48+
_heartbeatGrainIds.TryAdd(((GrainReference)grain).GrainId, true);
4649
}
4750

4851
public void RemoveGrain(IObserverConnectionManager grain)
4952
{
50-
_grains.Remove(grain);
51-
_heartbeatGrainIds.Remove(((GrainReference)grain).GrainId);
53+
_grains.TryRemove(grain, out _);
54+
_heartbeatGrainIds.TryRemove(((GrainReference)grain).GrainId, out _);
5255
}
5356

5457
public void ClearGrains()
@@ -76,13 +79,13 @@ public SignalRObserver GetObserver()
7679

7780
public ImmutableArray<GrainId> GetHeartbeatGrainIds()
7881
{
79-
if (_heartbeatGrainIds.Count == 0)
82+
if (_heartbeatGrainIds.IsEmpty)
8083
{
8184
return ImmutableArray<GrainId>.Empty;
8285
}
8386

8487
var builder = ImmutableArray.CreateBuilder<GrainId>(_heartbeatGrainIds.Count);
85-
foreach (var grainId in _heartbeatGrainIds)
88+
foreach (var grainId in _heartbeatGrainIds.Keys)
8689
{
8790
builder.Add(grainId);
8891
}

ManagedCode.Orleans.SignalR.Core/SignalR/OrleansHubLifetimeManager.cs

Lines changed: 87 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -368,19 +368,42 @@ public async Task AddToGroupsAsync(string connectionId, IReadOnlyList<string> gr
368368
return;
369369
}
370370

371+
var subscriptionReference = subscription.Reference;
372+
371373
if (_orleansSignalOptions.Value.GroupPartitionCount > 1)
372374
{
373375
var coordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain<THub>(_clusterClient);
374-
foreach (var groupName in uniqueGroupNames)
376+
var partitionIds = await Task.Run(
377+
() => coordinatorGrain.AddConnectionToGroups(uniqueGroupNames, connectionId, subscriptionReference),
378+
cancellationToken);
379+
380+
if (IsConnectionDisconnected(connectionId))
381+
{
382+
await CleanupDisconnectedBatchPartitionMembershipAsync(
383+
coordinatorGrain,
384+
uniqueGroupNames,
385+
connectionId,
386+
subscriptionReference,
387+
cancellationToken);
388+
return;
389+
}
390+
391+
foreach (var partitionId in partitionIds)
375392
{
376-
var partitionId = await Task.Run(() => coordinatorGrain.GetPartitionForGroup(groupName), cancellationToken);
377393
var partitionGrain = NameHelperGenerator.GetGroupPartitionGrain<THub>(_clusterClient, partitionId);
378394
subscription.AddGrain(partitionGrain);
379395
}
380396

381-
await Task.Run(
382-
() => coordinatorGrain.AddConnectionToGroups(uniqueGroupNames, connectionId, subscription.Reference),
383-
cancellationToken);
397+
if (IsConnectionDisconnected(connectionId))
398+
{
399+
await CleanupDisconnectedBatchPartitionMembershipAsync(
400+
coordinatorGrain,
401+
uniqueGroupNames,
402+
connectionId,
403+
subscriptionReference,
404+
cancellationToken);
405+
return;
406+
}
384407
}
385408
else
386409
{
@@ -389,19 +412,47 @@ await Task.Run(
389412
.Distinct()
390413
.ToArray();
391414

392-
foreach (var groupGrain in groupGrains)
393-
{
394-
subscription.AddGrain(groupGrain);
395-
}
396-
397415
var tasks = groupGrains
398-
.Select(groupGrain => Task.Run(() => groupGrain.AddConnection(connectionId, subscription.Reference), cancellationToken))
416+
.Select(groupGrain => Task.Run(() => groupGrain.AddConnection(connectionId, subscriptionReference), cancellationToken))
399417
.ToArray();
400418

401419
if (tasks.Length > 0)
402420
{
403421
await Task.WhenAll(tasks);
404422
}
423+
424+
if (IsConnectionDisconnected(connectionId))
425+
{
426+
var cleanupTasks = groupGrains
427+
.Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscriptionReference), cancellationToken))
428+
.ToArray();
429+
430+
if (cleanupTasks.Length > 0)
431+
{
432+
await Task.WhenAll(cleanupTasks);
433+
}
434+
435+
return;
436+
}
437+
438+
foreach (var groupGrain in groupGrains)
439+
{
440+
subscription.AddGrain(groupGrain);
441+
}
442+
443+
if (IsConnectionDisconnected(connectionId))
444+
{
445+
var cleanupTasks = groupGrains
446+
.Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscriptionReference), cancellationToken))
447+
.ToArray();
448+
449+
if (cleanupTasks.Length > 0)
450+
{
451+
await Task.WhenAll(cleanupTasks);
452+
}
453+
454+
return;
455+
}
405456
}
406457

407458
await UpdateConnectionHeartbeatAsync(connectionId, subscription);
@@ -428,11 +479,13 @@ public async Task RemoveFromGroupsAsync(string connectionId, IReadOnlyList<strin
428479
return;
429480
}
430481

482+
var subscriptionReference = subscription.Reference;
483+
431484
if (_orleansSignalOptions.Value.GroupPartitionCount > 1)
432485
{
433486
var coordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain<THub>(_clusterClient);
434487
var partitionIds = await Task.Run(
435-
() => coordinatorGrain.RemoveConnectionFromGroups(uniqueGroupNames, connectionId, subscription.Reference),
488+
() => coordinatorGrain.RemoveConnectionFromGroups(uniqueGroupNames, connectionId, subscriptionReference),
436489
cancellationToken);
437490

438491
foreach (var partitionId in partitionIds)
@@ -455,7 +508,7 @@ public async Task RemoveFromGroupsAsync(string connectionId, IReadOnlyList<strin
455508
.ToArray();
456509

457510
var tasks = groupGrains
458-
.Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscription.Reference), cancellationToken))
511+
.Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscriptionReference), cancellationToken))
459512
.ToArray();
460513

461514
if (tasks.Length > 0)
@@ -723,9 +776,29 @@ private static string[] GetUniqueGroupNames(IReadOnlyList<string> groupNames)
723776
return ordered.ToArray();
724777
}
725778

779+
private static async Task CleanupDisconnectedBatchPartitionMembershipAsync(
780+
ISignalRGroupCoordinatorGrain coordinatorGrain,
781+
string[] groupNames,
782+
string connectionId,
783+
ISignalRObserver subscriptionReference,
784+
CancellationToken cancellationToken)
785+
{
786+
await Task.Run(
787+
() => coordinatorGrain.RemoveConnectionFromGroups(groupNames, connectionId, subscriptionReference),
788+
cancellationToken);
789+
}
790+
791+
private bool IsConnectionDisconnected(string connectionId)
792+
{
793+
var connection = _connections[connectionId];
794+
return connection is null || connection.ConnectionAborted.IsCancellationRequested;
795+
}
796+
726797
private Task UpdateConnectionHeartbeatAsync(string connectionId, Subscription subscription)
727798
{
728-
if (!_orleansSignalOptions.Value.KeepEachConnectionAlive || string.IsNullOrEmpty(subscription.HubKey))
799+
if (!_orleansSignalOptions.Value.KeepEachConnectionAlive ||
800+
string.IsNullOrEmpty(subscription.HubKey) ||
801+
IsConnectionDisconnected(connectionId))
729802
{
730803
return Task.CompletedTask;
731804
}

ManagedCode.Orleans.SignalR.Server/SignalRObserverGrainBase.cs

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,7 @@ protected void TrackConnection(string connectionId, ISignalRObserver observer)
121121

122122
if (_gracePeriodEnabled && HealthTracker.IsInGracePeriod(connectionId))
123123
{
124-
// Critical: do NOT replay buffered SignalR messages on the Orleans scheduler.
125-
_ = Task.Run(() => RestoreObserverFromGracePeriodAsync(connectionId, observer));
124+
_ = RestoreObserverFromGracePeriodAsync(connectionId, observer);
126125
}
127126
}
128127

@@ -149,8 +148,7 @@ protected void TouchObserver(ISignalRObserver observer)
149148
if (_gracePeriodEnabled && _observerToConnectionId.TryGetValue(observer, out var connectionId) &&
150149
HealthTracker.IsInGracePeriod(connectionId))
151150
{
152-
// Critical: do NOT replay buffered SignalR messages on the Orleans scheduler.
153-
_ = Task.Run(() => RestoreObserverFromGracePeriodAsync(connectionId, observer));
151+
_ = RestoreObserverFromGracePeriodAsync(connectionId, observer);
154152
}
155153
}
156154

@@ -462,23 +460,7 @@ protected async Task<int> RestoreObserverFromGracePeriodAsync(string connectionI
462460
connectionId,
463461
bufferedMessages.Count);
464462

465-
var replayedCount = 0;
466-
foreach (var message in bufferedMessages)
467-
{
468-
try
469-
{
470-
await observer.OnNextAsync(message);
471-
replayedCount++;
472-
}
473-
catch (Exception ex)
474-
{
475-
Logger.LogWarning(
476-
ex,
477-
"Failed to replay buffered message to connection {ConnectionId}. Stopping replay.",
478-
connectionId);
479-
break;
480-
}
481-
}
463+
var replayedCount = await ReplayBufferedMessagesAsync(connectionId, observer, bufferedMessages);
482464

483465
if (replayedCount > 0)
484466
{
@@ -497,6 +479,36 @@ protected async Task<int> RestoreObserverFromGracePeriodAsync(string connectionI
497479
return replayedCount;
498480
}
499481

482+
private async Task<int> ReplayBufferedMessagesAsync(
483+
string connectionId,
484+
ISignalRObserver observer,
485+
IReadOnlyList<HubMessage> bufferedMessages)
486+
{
487+
// Critical: do NOT replay buffered SignalR messages on the Orleans scheduler.
488+
return await Task.Run(async () =>
489+
{
490+
var replayedCount = 0;
491+
foreach (var message in bufferedMessages)
492+
{
493+
try
494+
{
495+
await observer.OnNextAsync(message);
496+
replayedCount++;
497+
}
498+
catch (Exception ex)
499+
{
500+
Logger.LogWarning(
501+
ex,
502+
"Failed to replay buffered message to connection {ConnectionId}. Stopping replay.",
503+
connectionId);
504+
break;
505+
}
506+
}
507+
508+
return replayedCount;
509+
});
510+
}
511+
500512
/// <summary>
501513
/// Called when grace periods expire for observers.
502514
/// Override to implement custom cleanup logic.

docs/Features/Group-Partitioning.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ Group operations are routed through a coordinator grain that assigns group names
2323
- [x] Add package-level batch group APIs for hub and host-service callers.
2424
- [x] Batch coordinator and partition updates so one request does not force sequential writes per group.
2525
- [x] Cover batch add/remove with integration tests and direct coordinator verification.
26-
- [x] Restore disconnect-safe subscription tracking for partitioned batch adds.
26+
- [x] Restore disconnect-safe cleanup for partitioned batch adds.
2727
- [x] Restore the hashed cleanup fallback when batch removals see missing coordinator assignments.
2828
- [x] Add regression tests for disconnect cleanup and degraded coordinator state.
29+
- [x] Re-run partition cleanup when a disconnected batch join finishes late so late partition adds cannot outlive the connection.
2930

3031
## Main flow
3132

@@ -47,7 +48,7 @@ flowchart TD
4748
- Partition grains hold `group -> connection -> observer` mappings and emit fan-out to observers.
4849
- Empty groups trigger cleanup so partitions can shed state.
4950
- Batch membership operations collapse repeated coordinator writes into one persistence step per request and one partition write per touched partition.
50-
- Partitioned batch adds pre-register touched partitions in the connection subscription before the coordinator write finishes so disconnect cleanup still reaches every touched partition.
51+
- If that coordinator write finishes after the connection has already disconnected, the lifetime manager immediately issues a compensating batch remove through the coordinator so late partition adds do not recreate stale membership.
5152
- Batch removals fall back to the hashed partition when coordinator assignment metadata is missing, which preserves cleanup in degraded-state recovery scenarios.
5253

5354
## Configuration knobs

docs/Features/Hub-Lifetime-Manager-Integration.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ The ASP.NET Core host swaps the default SignalR hub lifetime manager with `Orlea
2121
- [x] Route package-specific batch group membership calls through the Orleans lifetime manager.
2222
- [x] Keep batch group helper calls usable when the host is running plain `AddSignalR()`.
2323
- [x] Add regression coverage for the batch helper path without Orleans registration.
24+
- [x] Re-run batch partition cleanup after disconnect when a late coordinator write finishes.
2425

2526
## Main flow
2627

@@ -41,6 +42,7 @@ flowchart TD
4142
- The lifetime manager creates a per-connection `Subscription` and registers observers with connection/group/user grains.
4243
- Package-specific batch group operations (`AddToGroupsAsync` / `RemoveFromGroupsAsync`) also route through the lifetime manager instead of looping over sequential single-group writes.
4344
- Hub batch helpers fall back to the registered `HubLifetimeManager<THub>` when `IOrleansGroupManager<THub>` is not explicitly registered, so the API still works on plain `AddSignalR()` hosts.
45+
- If a partitioned batch join finishes after the connection has already disconnected, the lifetime manager immediately routes a compensating batch remove through the coordinator before returning.
4446
- Detailed batching behavior and partition persistence rules live in `docs/Features/Group-Partitioning.md`.
4547

4648
## Configuration knobs

docs/Features/Observer-Health-and-Circuit-Breaker.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Observer delivery is protected by health tracking, circuit breaker logic, and op
2020
- [x] Route grace-period expiration to observer cleanup and health-state removal.
2121
- [x] Offload observer notifications from the Orleans scheduler and document why it is critical.
2222
- [x] Add tests for circuit-breaker threshold behavior (enabled vs disabled).
23+
- [x] Keep grace-period state transitions on the Orleans scheduler and offload only observer replay I/O.
2324

2425
## Main flow
2526

@@ -41,6 +42,7 @@ flowchart TD
4142
- Failed deliveries are recorded in a rolling failure window.
4243
- When thresholds are exceeded, the circuit opens and delivery is skipped.
4344
- If a grace period is configured, messages are buffered and replayed on recovery; expired grace periods remove observers.
45+
- Grace-period recovery mutates `ObserverHealthTracker` on the grain scheduler first, then replays buffered observer callbacks off-scheduler so Orleans-owned state is never touched from a thread-pool turn.
4446
- Without a grace period, reaching the failure threshold removes the observer immediately.
4547

4648
## Configuration knobs

0 commit comments

Comments
 (0)