Skip to content

Commit 423ed12

Browse files
committed
fixes and code style
1 parent 7841f95 commit 423ed12

File tree

18 files changed

+658
-144
lines changed

18 files changed

+658
-144
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System.Threading.Tasks;
2+
using ManagedCode.Orleans.SignalR.Core.Models;
3+
using Orleans;
4+
5+
namespace ManagedCode.Orleans.SignalR.Core.Interfaces;
6+
7+
public interface ISignalRConnectionHeartbeatGrain : IGrainWithStringKey
8+
{
9+
Task Start(ConnectionHeartbeatRegistration registration);
10+
Task Stop();
11+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System;
2+
using ManagedCode.Orleans.SignalR.Core.Interfaces;
3+
using Orleans;
4+
5+
namespace ManagedCode.Orleans.SignalR.Core.Models;
6+
7+
[GenerateSerializer]
8+
public sealed record ConnectionHeartbeatRegistration(
9+
[property: Id(0)] string HubKey,
10+
[property: Id(1)] bool UsePartitioning,
11+
[property: Id(2)] int PartitionId,
12+
[property: Id(3)] ISignalRObserver Observer,
13+
[property: Id(4)] TimeSpan Interval);

ManagedCode.Orleans.SignalR.Core/SignalR/NameHelperGenerator.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ public static ISignalRConnectionHolderGrain GetConnectionHolderGrain<THub>(IGrai
1717
{
1818
return grainFactory.GetGrain<ISignalRConnectionHolderGrain>(CleanString(typeof(THub).FullName!));
1919
}
20+
21+
public static ISignalRConnectionHolderGrain GetConnectionHolderGrain(IGrainFactory grainFactory, string hubKey)
22+
{
23+
return grainFactory.GetGrain<ISignalRConnectionHolderGrain>(CleanString(hubKey));
24+
}
2025

2126
public static ISignalRConnectionCoordinatorGrain GetConnectionCoordinatorGrain<THub>(IGrainFactory grainFactory)
2227
{
@@ -77,6 +82,13 @@ public static ISignalRGroupPartitionGrain GetGroupPartitionGrain(IGrainFactory g
7782
return grainFactory.GetGrain<ISignalRGroupPartitionGrain>(key);
7883
}
7984

85+
public static ISignalRConnectionHeartbeatGrain GetConnectionHeartbeatGrain(IGrainFactory grainFactory, string hubKey, string connectionId)
86+
{
87+
var normalizedConnection = CleanString(connectionId);
88+
var key = $"{CleanString(hubKey)}::{normalizedConnection}";
89+
return grainFactory.GetGrain<ISignalRConnectionHeartbeatGrain>(key);
90+
}
91+
8092
public static string CleanString(string input)
8193
{
8294
var builder = new System.Text.StringBuilder();
Lines changed: 10 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,37 @@
11
using System;
22
using System.Collections.Generic;
3-
using System.Threading;
4-
using System.Threading.Tasks;
53
using ManagedCode.Orleans.SignalR.Core.Interfaces;
64

75
namespace ManagedCode.Orleans.SignalR.Core.SignalR.Observers;
86

97
public class Subscription : IDisposable
108
{
11-
private readonly CancellationTokenSource _cts = new();
129
private readonly HashSet<IObserverConnectionManager> _grains = new();
1310
private readonly SignalRObserver _observer;
14-
private readonly IDisposable _timer;
11+
private bool _disposed;
1512

16-
public Subscription(SignalRObserver observer, TimeSpan pingTime)
13+
public Subscription(SignalRObserver observer)
1714
{
1815
_observer = observer;
19-
_timer = new Timer(Callback, this, pingTime, pingTime);
2016
}
2117

2218
~Subscription()
2319
{
2420
Dispose();
2521
}
2622

27-
public ISignalRObserver Reference { get; private set; }
23+
public ISignalRObserver Reference { get; private set; } = default!;
2824

2925
public IReadOnlyCollection<IObserverConnectionManager> Grains => _grains;
3026

3127
public void Dispose()
3228
{
33-
_cts?.Cancel();
34-
_timer?.Dispose();
29+
if (_disposed)
30+
{
31+
return;
32+
}
33+
34+
_disposed = true;
3535
_observer?.Dispose();
3636
_grains?.Clear();
3737
Reference = null!;
@@ -47,32 +47,6 @@ public void RemoveGrain(IObserverConnectionManager grain)
4747
_grains.Remove(grain);
4848
}
4949

50-
private void Callback(object? state)
51-
{
52-
var token = _cts.Token;
53-
_ = Task.Run(() => Callback(token), _cts.Token).ConfigureAwait(false);
54-
}
55-
56-
private async Task Callback(CancellationToken token)
57-
{
58-
if (!_observer.IsExist || token.IsCancellationRequested)
59-
{
60-
Dispose();
61-
return;
62-
}
63-
64-
foreach (var grain in _grains)
65-
{
66-
if (token.IsCancellationRequested)
67-
return;
68-
69-
await grain.Ping(Reference).ConfigureAwait(false);
70-
71-
if (token.IsCancellationRequested)
72-
return;
73-
}
74-
}
75-
7650
public void SetReference(ISignalRObserver reference)
7751
{
7852
Reference = reference;
@@ -82,4 +56,4 @@ public SignalRObserver GetObserver()
8256
{
8357
return _observer;
8458
}
85-
}
59+
}

ManagedCode.Orleans.SignalR.Core/SignalR/OrleansHubLifetimeManager.cs

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,22 +47,36 @@ public override async Task OnConnectedAsync(HubConnectionContext connection)
4747
_connections.Add(connection);
4848
var subscription = CreateConnectionObserver(connection);
4949

50+
var hubKey = NameHelperGenerator.CleanString(typeof(THub).FullName!);
51+
var usePartitions = _orleansSignalOptions.Value.ConnectionPartitionCount > 1;
52+
var partitionId = 0;
53+
54+
if (usePartitions)
55+
{
56+
var coordinatorGrain = NameHelperGenerator.GetConnectionCoordinatorGrain<THub>(_clusterClient);
57+
partitionId = await coordinatorGrain.GetPartitionForConnection(connection.ConnectionId);
58+
var partitionGrain = NameHelperGenerator.GetConnectionPartitionGrain<THub>(_clusterClient, partitionId);
59+
subscription.AddGrain(partitionGrain);
60+
await Task.Run(() => partitionGrain.AddConnection(connection.ConnectionId, subscription.Reference));
61+
}
62+
else
63+
{
64+
var connectionHolderGrain = NameHelperGenerator.GetConnectionHolderGrain<THub>(_clusterClient);
65+
subscription.AddGrain(connectionHolderGrain);
66+
await Task.Run(() => connectionHolderGrain.AddConnection(connection.ConnectionId, subscription.Reference));
67+
}
68+
5069
if (_orleansSignalOptions.Value.KeepEachConnectionAlive)
5170
{
52-
if (_orleansSignalOptions.Value.ConnectionPartitionCount > 1)
53-
{
54-
var coordinatorGrain = NameHelperGenerator.GetConnectionCoordinatorGrain<THub>(_clusterClient);
55-
var partitionId = await coordinatorGrain.GetPartitionForConnection(connection.ConnectionId);
56-
var partitionGrain = NameHelperGenerator.GetConnectionPartitionGrain<THub>(_clusterClient, partitionId);
57-
subscription.AddGrain(partitionGrain);
58-
await Task.Run(() => partitionGrain.AddConnection(connection.ConnectionId, subscription.Reference));
59-
}
60-
else
61-
{
62-
var connectionHolderGrain = NameHelperGenerator.GetConnectionHolderGrain<THub>(_clusterClient);
63-
subscription.AddGrain(connectionHolderGrain);
64-
await Task.Run(() => connectionHolderGrain.AddConnection(connection.ConnectionId, subscription.Reference));
65-
}
71+
var heartbeatInterval = TimeIntervalHelper.GetClientTimeoutInterval(_orleansSignalOptions, _globalHubOptions, _hubOptions);
72+
var heartbeatGrain = NameHelperGenerator.GetConnectionHeartbeatGrain(_clusterClient, hubKey, connection.ConnectionId);
73+
var registration = new ConnectionHeartbeatRegistration(
74+
hubKey,
75+
usePartitions,
76+
partitionId,
77+
subscription.Reference,
78+
heartbeatInterval);
79+
await Task.Run(() => heartbeatGrain.Start(registration));
6680
}
6781

6882
if (!string.IsNullOrEmpty(connection.UserIdentifier))
@@ -94,6 +108,13 @@ public override async Task OnDisconnectedAsync(HubConnectionContext connection)
94108

95109
await Task.Run(() => NameHelperGenerator.GetConnectionCoordinatorGrain<THub>(_clusterClient)
96110
.NotifyConnectionRemoved(connection.ConnectionId));
111+
112+
if (_orleansSignalOptions.Value.KeepEachConnectionAlive)
113+
{
114+
var hubKey = NameHelperGenerator.CleanString(typeof(THub).FullName!);
115+
var heartbeatGrain = NameHelperGenerator.GetConnectionHeartbeatGrain(_clusterClient, hubKey, connection.ConnectionId);
116+
await Task.Run(() => heartbeatGrain.Stop());
117+
}
97118
}
98119

99120
public override Task SendAllAsync(string methodName, object?[] args, CancellationToken cancellationToken = new())
@@ -399,8 +420,7 @@ private Subscription CreateConnectionObserver(HubConnectionContext connection)
399420

400421
private Subscription CreateSubscription(Func<HubMessage, Task> onNextAction)
401422
{
402-
var timeSpan = TimeIntervalHelper.GetClientTimeoutInterval(_orleansSignalOptions, _globalHubOptions, _hubOptions);
403-
var subscription = new Subscription(new SignalRObserver(onNextAction), timeSpan);
423+
var subscription = new Subscription(new SignalRObserver(onNextAction));
404424
var reference = _clusterClient.CreateObjectReference<ISignalRObserver>(subscription.GetObserver());
405425
subscription.SetReference(reference);
406426
return subscription;
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using ManagedCode.Orleans.SignalR.Core.Interfaces;
5+
using ManagedCode.Orleans.SignalR.Core.Models;
6+
using ManagedCode.Orleans.SignalR.Core.SignalR;
7+
using Microsoft.Extensions.Logging;
8+
using Orleans;
9+
using Orleans.Concurrency;
10+
11+
namespace ManagedCode.Orleans.SignalR.Server;
12+
13+
[Reentrant]
14+
[GrainType($"ManagedCode.{nameof(SignalRConnectionHeartbeatGrain)}")]
15+
public class SignalRConnectionHeartbeatGrain(
16+
ILogger<SignalRConnectionHeartbeatGrain> logger) : Grain, ISignalRConnectionHeartbeatGrain
17+
{
18+
private readonly ILogger<SignalRConnectionHeartbeatGrain> _logger = logger;
19+
private ConnectionHeartbeatRegistration? _registration;
20+
private IDisposable? _timer;
21+
22+
public Task Start(ConnectionHeartbeatRegistration registration)
23+
{
24+
_registration = registration;
25+
ResetTimer(registration.Interval);
26+
_logger.LogDebug("Heartbeat started for connection grain {Key} (partitioned={Partitioned}, partitionId={PartitionId}).",
27+
this.GetPrimaryKeyString(), registration.UsePartitioning, registration.PartitionId);
28+
return Task.CompletedTask;
29+
}
30+
31+
public Task Stop()
32+
{
33+
ResetTimer(null);
34+
_registration = null;
35+
_logger.LogDebug("Heartbeat stopped for connection grain {Key}.", this.GetPrimaryKeyString());
36+
return Task.CompletedTask;
37+
}
38+
39+
public override Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
40+
{
41+
ResetTimer(null);
42+
_registration = null;
43+
return base.OnDeactivateAsync(reason, cancellationToken);
44+
}
45+
46+
private void ResetTimer(TimeSpan? interval)
47+
{
48+
_timer?.Dispose();
49+
_timer = null;
50+
51+
if (interval is { } period && period > TimeSpan.Zero)
52+
{
53+
var dueTime = TimeSpan.FromMilliseconds(Math.Max(500, period.TotalMilliseconds / 2));
54+
_timer = RegisterTimer(OnTimerTickAsync, null, dueTime, dueTime);
55+
}
56+
}
57+
58+
private async Task OnTimerTickAsync(object? _)
59+
{
60+
if (_registration is null)
61+
{
62+
return;
63+
}
64+
65+
try
66+
{
67+
if (_registration.UsePartitioning)
68+
{
69+
var partition = NameHelperGenerator.GetConnectionPartitionGrain(GrainFactory, _registration.HubKey, _registration.PartitionId);
70+
await partition.Ping(_registration.Observer);
71+
}
72+
else
73+
{
74+
var holder = NameHelperGenerator.GetConnectionHolderGrain(GrainFactory, _registration.HubKey);
75+
await holder.Ping(_registration.Observer);
76+
}
77+
}
78+
catch (Exception ex)
79+
{
80+
_logger.LogDebug(ex, "Heartbeat ping failed for connection grain {Key}.", this.GetPrimaryKeyString());
81+
}
82+
}
83+
}

ManagedCode.Orleans.SignalR.Tests/HubLoadTests.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
using System.Collections.Concurrent;
22
using System.Linq;
3-
using Shouldly;
43
using ManagedCode.Orleans.SignalR.Tests.Cluster;
4+
using ManagedCode.Orleans.SignalR.Tests.Infrastructure.Logging;
55
using ManagedCode.Orleans.SignalR.Tests.TestApp;
66
using ManagedCode.Orleans.SignalR.Tests.TestApp.Hubs;
77
using Microsoft.AspNetCore.SignalR.Client;
8+
using Shouldly;
89
using Xunit;
910
using Xunit.Abstractions;
1011

@@ -19,6 +20,7 @@ public class HubLoadTests
1920
private readonly TestWebApplication _firstApp;
2021
private readonly TestWebApplication _secondApp;
2122
private readonly ITestOutputHelper _output;
23+
private readonly TestOutputHelperAccessor _loggerAccessor = new();
2224

2325
private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30);
2426
private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(100);
@@ -28,8 +30,9 @@ public HubLoadTests(LoadClusterFixture cluster, ITestOutputHelper output)
2830
{
2931
_cluster = cluster;
3032
_output = output;
31-
_firstApp = new TestWebApplication(_cluster, 8081);
32-
_secondApp = new TestWebApplication(_cluster, 8082);
33+
_loggerAccessor.Output = output;
34+
_firstApp = new TestWebApplication(_cluster, 8081, loggerAccessor: _loggerAccessor);
35+
_secondApp = new TestWebApplication(_cluster, 8082, loggerAccessor: _loggerAccessor);
3336
}
3437

3538
[Fact]

ManagedCode.Orleans.SignalR.Tests/HubSmokeTests.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
using System.Collections.Concurrent;
2-
using Shouldly;
32
using ManagedCode.Orleans.SignalR.Tests.Cluster;
3+
using ManagedCode.Orleans.SignalR.Tests.Infrastructure.Logging;
44
using ManagedCode.Orleans.SignalR.Tests.TestApp;
55
using ManagedCode.Orleans.SignalR.Tests.TestApp.Hubs;
66
using Microsoft.AspNetCore.Http.Connections.Client;
77
using Microsoft.AspNetCore.SignalR.Client;
8+
using Shouldly;
89
using Xunit;
910
using Xunit.Abstractions;
1011

@@ -18,6 +19,7 @@ public class HubSmokeTests
1819
private readonly TestWebApplication _secondApp;
1920
private readonly SmokeClusterFixture _cluster;
2021
private readonly ITestOutputHelper _output;
22+
private readonly TestOutputHelperAccessor _loggerAccessor = new();
2123

2224
private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(5);
2325
private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(50);
@@ -26,8 +28,9 @@ public HubSmokeTests(SmokeClusterFixture cluster, ITestOutputHelper output)
2628
{
2729
_cluster = cluster;
2830
_output = output;
29-
_firstApp = new TestWebApplication(_cluster, 8081);
30-
_secondApp = new TestWebApplication(_cluster, 8082);
31+
_loggerAccessor.Output = output;
32+
_firstApp = new TestWebApplication(_cluster, 8081, loggerAccessor: _loggerAccessor);
33+
_secondApp = new TestWebApplication(_cluster, 8082, loggerAccessor: _loggerAccessor);
3134
}
3235

3336
[Fact]
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
using Xunit.Abstractions;
2+
3+
namespace ManagedCode.Orleans.SignalR.Tests.Infrastructure.Logging;
4+
5+
public interface ITestOutputHelperAccessor
6+
{
7+
ITestOutputHelper? Output { get; set; }
8+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using System.Threading;
2+
using Xunit.Abstractions;
3+
4+
namespace ManagedCode.Orleans.SignalR.Tests.Infrastructure.Logging;
5+
6+
internal sealed class TestOutputHelperAccessor : ITestOutputHelperAccessor
7+
{
8+
private readonly AsyncLocal<ITestOutputHelper?> _current = new();
9+
10+
public ITestOutputHelper? Output
11+
{
12+
get => _current.Value;
13+
set => _current.Value = value;
14+
}
15+
}

0 commit comments

Comments
 (0)