Skip to content

Commit 1389219

Browse files
authored
Enable PurgeInstanceStateAsync to delete more than 1000 instances (#308)
1 parent ccd1043 commit 1389219

3 files changed

Lines changed: 59 additions & 7 deletions

File tree

src/DurableTask.SqlServer/SqlOrchestrationService.cs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ public override async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFil
729729
FetchInput = false,
730730
FetchOutput = false,
731731
};
732-
732+
733733
if (purgeInstanceFilter.CreatedTimeTo != null)
734734
{
735735
purgeQuery.CreatedTimeTo = purgeInstanceFilter.CreatedTimeTo.Value;
@@ -739,12 +739,23 @@ public override async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFil
739739
{
740740
purgeQuery.StatusFilter = new HashSet<OrchestrationStatus>(purgeInstanceFilter.RuntimeStatus);
741741
}
742-
743-
IReadOnlyCollection<OrchestrationState> results = await this.GetManyOrchestrationsAsync(purgeQuery, CancellationToken.None);
744-
745-
IEnumerable<string> instanceIds = results.Select(r => r.OrchestrationInstance.InstanceId);
746-
int purgedInstanceCount = await this.PurgeOrchestrationHistoryAsync(instanceIds);
747-
return new PurgeResult(purgedInstanceCount);
742+
743+
int totalPurgedCount = 0;
744+
while (true)
745+
{
746+
IReadOnlyCollection<OrchestrationState> results =
747+
await this.GetManyOrchestrationsAsync(purgeQuery, CancellationToken.None);
748+
749+
if (results.Count == 0)
750+
{
751+
break;
752+
}
753+
754+
IEnumerable<string> instanceIds = results.Select(r => r.OrchestrationInstance.InstanceId);
755+
totalPurgedCount += await this.PurgeOrchestrationHistoryAsync(instanceIds);
756+
}
757+
758+
return new PurgeResult(totalPurgedCount);
748759
}
749760

750761
public override async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(OrchestrationQuery query, CancellationToken cancellationToken)

test/DurableTask.SqlServer.Tests/Integration/DataRetentionTests.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,42 @@ public async Task PurgesInstancesByStatus(OrchestrationStateTimeRangeFilterType
125125
await this.testService.PurgeAsync(DateTime.MaxValue, filterType);
126126
}
127127

128+
/// <summary>
129+
/// Verifies that PurgeInstanceStateAsync with a PurgeInstanceFilter purges all matching instances,
130+
/// even when the count exceeds the previous 1000-instance limit.
131+
/// This is a regression test for https://github.com/microsoft/durabletask-mssql/issues/297.
132+
/// </summary>
133+
[Fact]
134+
public async Task PurgesMoreThanOneThousandInstances()
135+
{
136+
const int InstanceCount = 1100;
137+
138+
IReadOnlyList<TestInstance<string>> instances = await this.testService.RunOrchestrations(
139+
count: InstanceCount,
140+
instanceIdGenerator: i => $"BulkPurge_{i:0000}",
141+
inputGenerator: i => $"Input_{i}",
142+
orchestrationName: "QuickComplete",
143+
version: string.Empty,
144+
implementation: (ctx, input) => Task.FromResult(input));
145+
146+
TimeSpan timeout = TimeSpan.FromSeconds(60);
147+
await Task.WhenAll(instances.Select(instance => instance.WaitForCompletion(timeout)));
148+
149+
var filter = new PurgeInstanceFilter(
150+
DateTime.MinValue,
151+
DateTime.UtcNow.AddMinutes(1),
152+
new[] { OrchestrationStatus.Completed });
153+
154+
PurgeResult result = await this.testService.PurgeAsync(filter);
155+
Assert.Equal(InstanceCount, result.DeletedInstanceCount);
156+
157+
foreach (TestInstance<string> instance in instances)
158+
{
159+
OrchestrationState purgedState = await instance.GetStateAsync();
160+
Assert.Null(purgedState);
161+
}
162+
}
163+
128164
/// <summary>
129165
/// Validates that external events sent to a completed orchestration are eventually removed from the database
130166
/// and that a log message is emitted for each discarded event.

test/DurableTask.SqlServer.Tests/Utils/TestService.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ public Task PurgeAsync(DateTime maximumThreshold, OrchestrationStateTimeRangeFil
107107
filterType);
108108
}
109109

110+
public Task<PurgeResult> PurgeAsync(PurgeInstanceFilter filter)
111+
{
112+
return this.OrchestrationServiceMock.Object.PurgeInstanceStateAsync(filter);
113+
}
114+
110115
public async Task DisposeAsync()
111116
{
112117
await this.worker.StopAsync(isForced: true);

0 commit comments

Comments
 (0)