Skip to content

Commit e6db662

Browse files
committed
Add durable rate limiter state cleanup
1 parent 8d6a1d5 commit e6db662

15 files changed

Lines changed: 997 additions & 9 deletions

ManagedCode.Orleans.RateLimiting.Core/Interfaces/IRateLimiterGrain.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@ public interface IRateLimiterGrain : IGrainWithStringKey
1919
ValueTask ReleaseLease(Guid leaseId);
2020

2121
ValueTask ResetAsync();
22+
ValueTask DeleteStateAsync();
2223
}

ManagedCode.Orleans.RateLimiting.Core/Models/Holders/BaseRateLimiterHolder.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ public ValueTask ResetAsync()
5555
return _grain.ResetAsync();
5656
}
5757

58+
public ValueTask DeleteStateAsync()
59+
{
60+
return _grain.DeleteStateAsync();
61+
}
62+
5863
public async Task<OrleansRateLimitLease> AcquireAndCheckConfigurationAsync(TOption? options)
5964
{
6065
if (options is null && _option is null)

ManagedCode.Orleans.RateLimiting.Core/Models/Holders/ILimiterHolder.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ public interface ILimiterHolder
99
Task<OrleansRateLimitLease> AcquireAndConfigureAsync(int permitCount = 1);
1010
ValueTask<RateLimiterStatistics?> GetStatisticsAsync();
1111
ValueTask ResetAsync();
12+
ValueTask DeleteStateAsync();
1213
}

ManagedCode.Orleans.RateLimiting.Server/Grains/RateLimiterGrain.Persistence.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,22 @@ private void CaptureRuntimeSnapshot(RateLimiterGrainState<TOptions> state)
3030
state.UpdatedAtUtc = DateTimeOffset.UtcNow;
3131
}
3232

33+
private async Task ClearStoredStateAsync()
34+
{
35+
await _stateLock.WaitAsync();
36+
try
37+
{
38+
_stateDirty = false;
39+
_stateDeleted = true;
40+
_state.State = new RateLimiterGrainState<TOptions>();
41+
await _state.ClearStateAsync();
42+
}
43+
finally
44+
{
45+
_stateLock.Release();
46+
}
47+
}
48+
3349
private void DisposeStateFlushTimer()
3450
{
3551
_stateFlushTimer?.Dispose();
@@ -150,6 +166,7 @@ private async Task MutateStateAsync(Action<RateLimiterGrainState<TOptions>> upda
150166
try
151167
{
152168
update(_state.State);
169+
_stateDeleted = false;
153170
_stateDirty = true;
154171

155172
if (flushImmediately)

ManagedCode.Orleans.RateLimiting.Server/Grains/RateLimiterGrain.cs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public abstract partial class RateLimiterGrain<TLimiter, TOptions> : Grain, IDis
2222
private const int SingleSemaphoreSlot = 1;
2323

2424
private readonly SemaphoreSlim _configurationLock = new(SingleSemaphoreSlot, SingleSemaphoreSlot);
25+
private readonly TOptions _defaultOptions;
2526
private readonly ILogger _logger;
2627
private readonly object _limiterLifetimeSync = new();
2728
private readonly ConcurrentDictionary<Guid, RateLimitLease> _rateLimitLeases = new();
@@ -31,6 +32,7 @@ public abstract partial class RateLimiterGrain<TLimiter, TOptions> : Grain, IDis
3132
private int _activeAcquireCount;
3233
private bool _configurationLockDisposed;
3334
private TaskCompletionSource? _noActiveAcquires;
35+
private bool _stateDeleted;
3436
private bool _stateDirty;
3537
private IGrainTimer? _stateFlushTimer;
3638
private bool _stateLockDisposed;
@@ -43,6 +45,7 @@ protected RateLimiterGrain(
4345
IOptions<RateLimiterPersistenceOptions> persistenceOptions)
4446
{
4547
_logger = logger;
48+
_defaultOptions = options;
4649
_options = options;
4750
_state = state;
4851
_stateFlushPeriod = persistenceOptions.Value.StateFlushPeriod;
@@ -145,13 +148,32 @@ public async ValueTask ResetAsync()
145148
}
146149
}
147150

151+
public async ValueTask DeleteStateAsync()
152+
{
153+
await _configurationLock.WaitAsync();
154+
try
155+
{
156+
await WaitForActiveAcquiresAsync();
157+
DisposeRateLimiter();
158+
_options = _defaultOptions;
159+
await ClearStoredStateAsync();
160+
RateLimiter = CreateDefaultRateLimiter();
161+
}
162+
finally
163+
{
164+
_configurationLock.Release();
165+
}
166+
}
167+
148168
public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
149169
{
150170
await _configurationLock.WaitAsync(cancellationToken);
151171
try
152172
{
153173
await WaitForActiveAcquiresAsync(cancellationToken);
154-
await MutateStateAsync(CaptureRuntimeSnapshot, flushImmediately: true);
174+
175+
if (!_stateDeleted || _stateDirty)
176+
await MutateStateAsync(CaptureRuntimeSnapshot, flushImmediately: true);
155177
}
156178
finally
157179
{

ManagedCode.Orleans.RateLimiting.Server/Grains/RateLimiterStorageNames.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@ namespace ManagedCode.Orleans.RateLimiting.Server.Grains;
22

33
internal static class RateLimiterStorageNames
44
{
5-
public const string StateName = ManagedCode.Orleans.RateLimiting.Server.Options.RateLimiterPersistenceDefaults.StateName;
6-
public const string StorageProviderName = ManagedCode.Orleans.RateLimiting.Server.Options.RateLimiterPersistenceDefaults.StorageProviderName;
5+
public const string StateName = ManagedCode.Orleans.RateLimiting.Server.RateLimiterStorageNames.StateName;
6+
public const string StorageProviderName = ManagedCode.Orleans.RateLimiting.Server.RateLimiterStorageNames.StorageProviderName;
77
}

ManagedCode.Orleans.RateLimiting.Server/Options/RateLimiterPersistenceDefaults.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ namespace ManagedCode.Orleans.RateLimiting.Server.Options;
33
public static class RateLimiterPersistenceDefaults
44
{
55
public const int StateFlushPeriodMinutes = 5;
6-
public const string StateName = "rateLimiterState";
7-
public const string StorageProviderName = "ManagedCode.Orleans.RateLimiting";
6+
public const string StateName = ManagedCode.Orleans.RateLimiting.Server.RateLimiterStorageNames.StateName;
7+
public const string StorageProviderName = ManagedCode.Orleans.RateLimiting.Server.RateLimiterStorageNames.StorageProviderName;
88
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace ManagedCode.Orleans.RateLimiting.Server;
2+
3+
public static class RateLimiterStorageNames
4+
{
5+
public const string StateName = "rateLimiterState";
6+
public const string StorageProviderName = "ManagedCode.Orleans.RateLimiting";
7+
}

ManagedCode.Orleans.RateLimiting.Tests/Cluster/TestSiloConfigurations.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Threading.RateLimiting;
22
using ManagedCode.Orleans.RateLimiting.Core.Extensions;
3+
using ManagedCode.Orleans.RateLimiting.Server;
34
using ManagedCode.Orleans.RateLimiting.Server.Extensions;
45
using ManagedCode.Orleans.RateLimiting.Server.Options;
56
using Microsoft.Extensions.DependencyInjection;
@@ -13,7 +14,7 @@ public class TestSiloConfigurations : ISiloConfigurator
1314

1415
public void Configure(ISiloBuilder siloBuilder)
1516
{
16-
siloBuilder.AddMemoryGrainStorage(RateLimiterPersistenceDefaults.StorageProviderName);
17+
siloBuilder.AddMemoryGrainStorage(RateLimiterStorageNames.StorageProviderName);
1718
siloBuilder.AddOrleansRateLimiting();
1819
siloBuilder.Services.Configure<RateLimiterPersistenceOptions>(options =>
1920
{
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
using System.Threading.RateLimiting;
2+
using ManagedCode.Orleans.RateLimiting.Core.Extensions;
3+
using ManagedCode.Orleans.RateLimiting.Core.Interfaces;
4+
using ManagedCode.Orleans.RateLimiting.Core.Models.Holders;
5+
using ManagedCode.Orleans.RateLimiting.Tests.Cluster;
6+
using Orleans;
7+
using Orleans.Runtime;
8+
9+
namespace ManagedCode.Orleans.RateLimiting.Tests;
10+
11+
[ClassDataSource<TestClusterApplication>(Shared = SharedType.PerTestSession)]
12+
public class RateLimiterConfigurationAndSnapshotPersistenceTests
13+
{
14+
private const int ExpectedAvailablePermits = PermitLimit - PermitCount;
15+
private const int ManagementGrainKey = 0;
16+
private const int PermitCount = 2;
17+
private const int PermitLimit = 5;
18+
private const int QueueLimit = 0;
19+
private const int SegmentsPerWindow = 2;
20+
private const int TokensPerPeriod = 1;
21+
private const int WindowMinutes = 10;
22+
23+
private static readonly TimeSpan ActivationCollectionAge = TimeSpan.Zero;
24+
private static readonly TimeSpan FixedWindow = TimeSpan.FromMinutes(WindowMinutes);
25+
26+
private readonly TestClusterApplication _testApp;
27+
28+
public RateLimiterConfigurationAndSnapshotPersistenceTests(TestClusterApplication testApp)
29+
{
30+
_testApp = testApp;
31+
}
32+
33+
[Test]
34+
public async Task FixedWindowConfigurationAndCurrentQuotaValueSurviveForcedActivationCollection()
35+
{
36+
var rateLimiter = _testApp.Cluster.Client.GetFixedWindowRateLimiter($"{nameof(FixedWindowConfigurationAndCurrentQuotaValueSurviveForcedActivationCollection)}-{Guid.NewGuid():N}");
37+
var options = new FixedWindowRateLimiterOptions
38+
{
39+
AutoReplenishment = false,
40+
PermitLimit = PermitLimit,
41+
QueueLimit = QueueLimit,
42+
Window = FixedWindow
43+
};
44+
45+
await rateLimiter.Configure(options);
46+
await AcquireAndDisposeAsync(rateLimiter);
47+
await ForceActivationCollectionAsync();
48+
49+
var configuration = await rateLimiter.GetConfiguration();
50+
configuration.PermitLimit.ShouldBe(PermitLimit);
51+
configuration.QueueLimit.ShouldBe(QueueLimit);
52+
configuration.Window.ShouldBe(FixedWindow);
53+
54+
await AssertCurrentAvailablePermitsAsync(rateLimiter);
55+
}
56+
57+
[Test]
58+
public async Task SlidingWindowConfigurationAndCurrentQuotaValueSurviveForcedActivationCollection()
59+
{
60+
var rateLimiter = _testApp.Cluster.Client.GetSlidingWindowRateLimiter($"{nameof(SlidingWindowConfigurationAndCurrentQuotaValueSurviveForcedActivationCollection)}-{Guid.NewGuid():N}");
61+
var options = new SlidingWindowRateLimiterOptions
62+
{
63+
AutoReplenishment = false,
64+
PermitLimit = PermitLimit,
65+
QueueLimit = QueueLimit,
66+
SegmentsPerWindow = SegmentsPerWindow,
67+
Window = FixedWindow
68+
};
69+
70+
await rateLimiter.Configure(options);
71+
await AcquireAndDisposeAsync(rateLimiter);
72+
await ForceActivationCollectionAsync();
73+
74+
var configuration = await rateLimiter.GetConfiguration();
75+
configuration.PermitLimit.ShouldBe(PermitLimit);
76+
configuration.QueueLimit.ShouldBe(QueueLimit);
77+
configuration.SegmentsPerWindow.ShouldBe(SegmentsPerWindow);
78+
configuration.Window.ShouldBe(FixedWindow);
79+
80+
await AssertCurrentAvailablePermitsAsync(rateLimiter);
81+
}
82+
83+
[Test]
84+
public async Task TokenBucketConfigurationAndCurrentQuotaValueSurviveForcedActivationCollection()
85+
{
86+
var rateLimiter = _testApp.Cluster.Client.GetTokenBucketRateLimiter($"{nameof(TokenBucketConfigurationAndCurrentQuotaValueSurviveForcedActivationCollection)}-{Guid.NewGuid():N}");
87+
var options = new TokenBucketRateLimiterOptions
88+
{
89+
AutoReplenishment = false,
90+
QueueLimit = QueueLimit,
91+
ReplenishmentPeriod = FixedWindow,
92+
TokenLimit = PermitLimit,
93+
TokensPerPeriod = TokensPerPeriod
94+
};
95+
96+
await rateLimiter.Configure(options);
97+
await AcquireAndDisposeAsync(rateLimiter);
98+
await ForceActivationCollectionAsync();
99+
100+
var configuration = await rateLimiter.GetConfiguration();
101+
configuration.QueueLimit.ShouldBe(QueueLimit);
102+
configuration.ReplenishmentPeriod.ShouldBe(FixedWindow);
103+
configuration.TokenLimit.ShouldBe(PermitLimit);
104+
configuration.TokensPerPeriod.ShouldBe(TokensPerPeriod);
105+
106+
await AssertCurrentAvailablePermitsAsync(rateLimiter);
107+
}
108+
109+
[Test]
110+
public async Task ConcurrencyConfigurationCurrentQuotaValueAndActiveLeasesSurviveForcedActivationCollection()
111+
{
112+
var rateLimiter = _testApp.Cluster.Client.GetConcurrencyLimiter($"{nameof(ConcurrencyConfigurationCurrentQuotaValueAndActiveLeasesSurviveForcedActivationCollection)}-{Guid.NewGuid():N}");
113+
var options = new ConcurrencyLimiterOptions
114+
{
115+
PermitLimit = PermitLimit,
116+
QueueLimit = QueueLimit
117+
};
118+
119+
await rateLimiter.Configure(options);
120+
var heldLease = await rateLimiter.AcquireAsync(PermitCount);
121+
heldLease.IsAcquired.ShouldBeTrue();
122+
123+
await ForceActivationCollectionAsync();
124+
125+
var configuration = await rateLimiter.GetConfiguration();
126+
configuration.PermitLimit.ShouldBe(PermitLimit);
127+
configuration.QueueLimit.ShouldBe(QueueLimit);
128+
129+
await AssertCurrentAvailablePermitsAsync(rateLimiter);
130+
131+
await heldLease.DisposeAsync();
132+
}
133+
134+
private static async Task AcquireAndDisposeAsync<TGrain, TOptions>(BaseRateLimiterHolder<TGrain, TOptions> rateLimiter)
135+
where TGrain : IGrainWithStringKey, IRateLimiterGrainWithConfiguration<TOptions>
136+
where TOptions : class
137+
{
138+
await using var acquiredLease = await rateLimiter.AcquireAsync(PermitCount);
139+
acquiredLease.IsAcquired.ShouldBeTrue();
140+
}
141+
142+
private static async Task AssertCurrentAvailablePermitsAsync<TGrain, TOptions>(BaseRateLimiterHolder<TGrain, TOptions> rateLimiter)
143+
where TGrain : IGrainWithStringKey, IRateLimiterGrainWithConfiguration<TOptions>
144+
where TOptions : class
145+
{
146+
var statistics = await rateLimiter.GetStatisticsAsync();
147+
statistics.ShouldNotBeNull();
148+
statistics.CurrentAvailablePermits.ShouldBe(ExpectedAvailablePermits);
149+
}
150+
151+
private async Task ForceActivationCollectionAsync()
152+
{
153+
var managementGrain = _testApp.Cluster.Client.GetGrain<IManagementGrain>(ManagementGrainKey);
154+
await managementGrain.ForceActivationCollection(ActivationCollectionAge);
155+
}
156+
}

0 commit comments

Comments
 (0)