Skip to content

Commit 9bf314d

Browse files
committed
keep messages for Users
1 parent 9e63685 commit 9bf314d

13 files changed

Lines changed: 190 additions & 19 deletions

File tree

Directory.Build.props

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
<RepositoryUrl>https://github.com/managedcode/Orleans.SignalR</RepositoryUrl>
1818
<PackageProjectUrl>https://github.com/managedcode/Orleans.SignalR</PackageProjectUrl>
1919
<Product>Managed Code - Orleans SignalR</Product>
20-
<Version>7.1.1</Version>
21-
<PackageVersion>7.1.1</PackageVersion>
20+
<Version>7.1.2</Version>
21+
<PackageVersion>7.1.2</PackageVersion>
2222

2323
</PropertyGroup>
2424
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">

ManagedCode.Orleans.SignalR.Core/Config/OrleansSignalROptions.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,11 @@ namespace ManagedCode.Orleans.SignalR.Core.Config;
44

55
public class OrleansSignalROptions
66
{
7-
public const string DefaultSignalRStreamProvider = "OrleansSignalRStreamProvider";
87
public const string OrleansSignalRStorage = "OrleansSignalRStorage";
98

109
/// <summary>
11-
/// Gets or sets the time window clients have to send a message before the server closes the connection. The default
12-
/// timeout is 30 seconds.
10+
/// Gets or sets the time window clients have to send a message before the server closes the connection.
11+
/// The default timeout is 30 seconds.
1312
/// </summary>
1413
public TimeSpan ClientTimeoutInterval { get; set; } = TimeSpan.FromSeconds(30);
1514

@@ -20,4 +19,10 @@ public class OrleansSignalROptions
2019
/// Set to false only if you don't want to send messages to the specific connectionId.
2120
/// </summary>
2221
public bool KeepEachConnectionAlive { get; set; } = true;
22+
23+
/// <summary>
24+
/// This property determines the duration for which messages are stored when a client is disconnected.
25+
/// The default timeout is 1.1 minute.
26+
/// </summary>
27+
public TimeSpan KeepMessageInterval { get; set; } = TimeSpan.FromMinutes(1.1);
2328
}

ManagedCode.Orleans.SignalR.Core/ManagedCode.Orleans.SignalR.Core.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
</PropertyGroup>
1818

1919
<ItemGroup>
20-
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="7.0.0"/>
21-
<PackageReference Include="Microsoft.Orleans.Sdk" Version="7.1.2"/>
22-
<PackageReference Include="Microsoft.Orleans.Serialization" Version="7.1.2"/>
20+
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="7.0.0" />
21+
<PackageReference Include="Microsoft.Orleans.Sdk" Version="7.1.2" />
22+
<PackageReference Include="Microsoft.Orleans.Serialization" Version="7.1.2" />
2323
</ItemGroup>
2424

2525
</Project>
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using Microsoft.AspNetCore.SignalR.Protocol;
4+
using Orleans;
5+
6+
namespace ManagedCode.Orleans.SignalR.Core.Models;
7+
8+
[Immutable]
9+
[GenerateSerializer]
10+
public class HubMessageState
11+
{
12+
[Id(0)]
13+
public Dictionary<HubMessage, DateTime> Messages { get; set; } = new();
14+
}

ManagedCode.Orleans.SignalR.Core/SignalR/OrleansHubLifetimeManager.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,7 @@ private Subscription CreateConnectionObserver(HubConnectionContext connection)
296296

297297
private Subscription CreateSubscription(Func<HubMessage, Task>? onNextAction)
298298
{
299-
var timeSpan =
300-
TimeIntervalHelper.GetClientTimeoutInterval(_orleansSignalOptions, _globalHubOptions, _hubOptions);
299+
var timeSpan = TimeIntervalHelper.GetClientTimeoutInterval(_orleansSignalOptions, _globalHubOptions, _hubOptions);
301300
var subscription = new Subscription(new SignalRObserver(onNextAction), timeSpan * 0.8);
302301
var reference = _clusterClient.CreateObjectReference<ISignalRObserver>(subscription.GetObserver());
303302
subscription.SetReference(reference);

ManagedCode.Orleans.SignalR.Server/Extensions/OrleansDependencyInjectionExtensions.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
using System;
2+
using System.Reflection;
23
using ManagedCode.Orleans.SignalR.Core.Config;
34
using ManagedCode.Orleans.SignalR.Core.HubContext;
45
using ManagedCode.Orleans.SignalR.Core.SignalR;
56
using Microsoft.AspNetCore.SignalR;
67
using Microsoft.Extensions.DependencyInjection;
8+
using Orleans;
9+
using Orleans.Configuration;
10+
using Orleans.Hosting;
11+
using Orleans.Runtime;
712

813
namespace ManagedCode.Orleans.SignalR.Server.Extensions;
914

@@ -27,4 +32,27 @@ public static ISignalRServerBuilder AddOrleans(this ISignalRServerBuilder signal
2732

2833
return signalrBuilder;
2934
}
35+
36+
public static ISiloBuilder ConfigureOrleansSignalR(this ISiloBuilder siloBuilder)
37+
{
38+
var timeSpan = TimeSpan.FromMinutes(5);
39+
40+
void SetSpecificCollectionAge<T>(GrainCollectionOptions options)
41+
{
42+
var attribute = typeof(T).GetCustomAttribute<GrainTypeAttribute>();
43+
if (attribute is not null)
44+
{
45+
var grainType = attribute.GetGrainType(null, null).ToString();
46+
options.ClassSpecificCollectionAge[grainType!] = timeSpan;
47+
}
48+
}
49+
50+
return siloBuilder.Configure<GrainCollectionOptions>(options =>
51+
{
52+
SetSpecificCollectionAge<SignalRConnectionHolderGrain>(options);
53+
SetSpecificCollectionAge<SignalRGroupGrain>(options);
54+
SetSpecificCollectionAge<SignalRInvocationGrain>(options);
55+
SetSpecificCollectionAge<SignalRUserGrain>(options);
56+
});
57+
}
3058
}

ManagedCode.Orleans.SignalR.Server/SignalRUserGrain.cs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System;
2+
using System.Linq;
13
using System.Threading;
24
using System.Threading.Tasks;
35
using ManagedCode.Orleans.SignalR.Core.Config;
@@ -20,16 +22,22 @@ namespace ManagedCode.Orleans.SignalR.Server;
2022
public class SignalRUserGrain : Grain, ISignalRUserGrain
2123
{
2224
private readonly ILogger<SignalRUserGrain> _logger;
25+
private readonly IOptions<OrleansSignalROptions> _orleansSignalOptions;
2326
private readonly ObserverManager<ISignalRObserver> _observerManager;
2427
private readonly IPersistentState<ConnectionState> _stateStorage;
28+
private readonly IPersistentState<HubMessageState> _messagesStorage;
2529

26-
public SignalRUserGrain(ILogger<SignalRUserGrain> logger, IOptions<OrleansSignalROptions> orleansSignalOptions,
27-
IOptions<HubOptions> hubOptions,
30+
public SignalRUserGrain(ILogger<SignalRUserGrain> logger,
31+
IOptions<OrleansSignalROptions> orleansSignalOptions, IOptions<HubOptions> hubOptions,
2832
[PersistentState(nameof(SignalRUserGrain), OrleansSignalROptions.OrleansSignalRStorage)]
29-
IPersistentState<ConnectionState> stateStorage)
33+
IPersistentState<ConnectionState> stateStorage,
34+
[PersistentState(nameof(SignalRUserGrain)+nameof(HubMessageState), OrleansSignalROptions.OrleansSignalRStorage)]
35+
IPersistentState<HubMessageState> messagesStorage)
3036
{
3137
_logger = logger;
38+
_orleansSignalOptions = orleansSignalOptions;
3239
_stateStorage = stateStorage;
40+
_messagesStorage = messagesStorage;
3341

3442
var timeSpan = TimeIntervalHelper.GetClientTimeoutInterval(orleansSignalOptions, hubOptions);
3543
_observerManager = new ObserverManager<ISignalRObserver>(timeSpan * 1.2, _logger);
@@ -42,6 +50,17 @@ public async Task AddConnection(string connectionId, ISignalRObserver observer)
4250
connectionId);
4351
_observerManager.Subscribe(observer, observer);
4452
_stateStorage.State.ConnectionIds.Add(connectionId, observer.GetPrimaryKeyString());
53+
54+
if (_messagesStorage.State.Messages.Count > 0)
55+
{
56+
var currentDateTime = DateTime.UtcNow;
57+
foreach (var message in _messagesStorage.State.Messages.ToArray())
58+
{
59+
_messagesStorage.State.Messages.Remove(message.Key);
60+
if(message.Value >= currentDateTime)
61+
await SendToUser(message.Key);
62+
}
63+
}
4564
}
4665

4766
public async Task RemoveConnection(string connectionId, ISignalRObserver observer)
@@ -56,6 +75,12 @@ public async Task RemoveConnection(string connectionId, ISignalRObserver observe
5675
public async Task SendToUser(HubMessage message)
5776
{
5877
await Task.Yield();
78+
if (_observerManager.Count == 0)
79+
{
80+
_messagesStorage.State.Messages.Add(message, DateTime.UtcNow.Add(_orleansSignalOptions.Value.KeepMessageInterval));
81+
return;
82+
}
83+
5984
_logger.LogInformation("Hub: {PrimaryKeyString}; SendToUser", this.GetPrimaryKeyString());
6085
await _observerManager.Notify(s => s.OnNextAsync(message));
6186
}
@@ -74,5 +99,18 @@ public override async Task OnDeactivateAsync(DeactivationReason reason, Cancella
7499
await _stateStorage.ClearStateAsync();
75100
else
76101
await _stateStorage.WriteStateAsync();
102+
103+
var currentDateTime = DateTime.UtcNow;
104+
foreach (var message in _messagesStorage.State.Messages.ToArray())
105+
{
106+
if (message.Value <= currentDateTime)
107+
_messagesStorage.State.Messages.Remove(message.Key);
108+
}
109+
110+
if(_messagesStorage.State.Messages.Count == 0)
111+
await _stateStorage.ClearStateAsync();
112+
else
113+
await _stateStorage.WriteStateAsync();
114+
77115
}
78116
}

ManagedCode.Orleans.SignalR.Tests/Cluster/Grains/Interfaces/ITestGrain.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ public interface ITestGrain : IGrainWithStringKey
66
Task PushMessage(string message);
77
Task<string> GetMessage(string connectionId);
88
Task<string> GetMessageInvoke(string connectionId);
9+
Task SendToUser(string userName, string message);
910
}

ManagedCode.Orleans.SignalR.Tests/Cluster/Grains/TestGrain.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,9 @@ public async Task<string> GetMessage(string connectionId)
4242

4343
return message;
4444
}
45+
46+
public Task SendToUser(string userName, string message)
47+
{
48+
return _hubContext.Clients.User(userName).SendAsync("SendMessage", message);
49+
}
4550
}

ManagedCode.Orleans.SignalR.Tests/Cluster/TestClientConfigurations.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ public class TestClientConfigurations : IClientBuilderConfigurator
1010
{
1111
public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
1212
{
13-
clientBuilder.AddMemoryStreams(OrleansSignalROptions.DefaultSignalRStreamProvider);
14-
1513
clientBuilder.Services
1614
.AddSignalR()
1715
.AddOrleans();

0 commit comments

Comments
 (0)