Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/DurableTask.SqlServer/Scripts/drop-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__.SetGlobalSetting
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__.TerminateInstance
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__.PurgeInstanceStateByID
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__.PurgeInstanceStateByTime
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__.PurgeInstanceStateByFilter

-- Private sprocs
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__._AddOrchestrationEvents
Expand Down
24 changes: 24 additions & 0 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,30 @@ BEGIN
END
GO


CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.PurgeInstanceStateByFilter
@CreatedTimeFrom datetime2 = NULL,
@CreatedTimeTo datetime2 = NULL,
@RuntimeStatusFilter varchar(200) = NULL
AS
BEGIN
DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub()

DECLARE @instanceIDs InstanceIDs

INSERT INTO @instanceIDs
SELECT [InstanceID] FROM Instances
WHERE [TaskHub] = @TaskHub
AND (@CreatedTimeFrom IS NULL OR [CreatedTime] >= @CreatedTimeFrom)
AND (@CreatedTimeTo IS NULL OR [CreatedTime] <= @CreatedTimeTo)
AND (@RuntimeStatusFilter IS NULL OR [RuntimeStatus] IN (SELECT [value] FROM string_split(@RuntimeStatusFilter, ',')))

DECLARE @deletedInstances int
EXECUTE @deletedInstances = __SchemaNamePlaceholder__.PurgeInstanceStateByID @instanceIDs
RETURN @deletedInstances
END
GO

CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.SetGlobalSetting
@Name varchar(300),
@Value sql_variant
Expand Down
1 change: 1 addition & 0 deletions src/DurableTask.SqlServer/Scripts/permissions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__.RaiseEvent TO __SchemaNamePla
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__.TerminateInstance TO __SchemaNamePlaceholder___runtime
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__.PurgeInstanceStateByID TO __SchemaNamePlaceholder___runtime
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__.PurgeInstanceStateByTime TO __SchemaNamePlaceholder___runtime
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__.PurgeInstanceStateByFilter TO __SchemaNamePlaceholder___runtime

-- Internal sprocs
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._AddOrchestrationEvents TO __SchemaNamePlaceholder___runtime
Expand Down
40 changes: 25 additions & 15 deletions src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -722,28 +722,38 @@ public override async Task<PurgeResult> PurgeInstanceStateAsync(string instanceI

public override async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
{
var purgeQuery = new SqlOrchestrationQuery
{
PageSize = 1000,
CreatedTimeFrom = purgeInstanceFilter.CreatedTimeFrom,
FetchInput = false,
FetchOutput = false,
};

using SqlConnection connection = await this.GetAndOpenConnectionAsync();
using SqlCommand command = this.GetSprocCommand(
connection, $"{this.settings.SchemaName}.PurgeInstanceStateByFilter");

SqlParameter instancesDeletedReturnValue = command.Parameters.Add("@InstancesDeleted", SqlDbType.Int);
instancesDeletedReturnValue.Direction = ParameterDirection.ReturnValue;

command.Parameters.Add("@CreatedTimeFrom", SqlDbType.DateTime2).Value = purgeInstanceFilter.CreatedTimeFrom;

if (purgeInstanceFilter.CreatedTimeTo != null)
{
purgeQuery.CreatedTimeTo = purgeInstanceFilter.CreatedTimeTo.Value;
command.Parameters.Add("@CreatedTimeTo", SqlDbType.DateTime2).Value = purgeInstanceFilter.CreatedTimeTo.Value;
}

if (purgeInstanceFilter.RuntimeStatus?.Any() == true)
{
purgeQuery.StatusFilter = new HashSet<OrchestrationStatus>(purgeInstanceFilter.RuntimeStatus);
string filter = string.Join(",", purgeInstanceFilter.RuntimeStatus);
command.Parameters.Add("@RuntimeStatusFilter", SqlDbType.VarChar, size: 200).Value = filter;
}

Stopwatch latencyStopwatch = Stopwatch.StartNew();
await SqlUtils.ExecuteNonQueryAsync(command, this.traceHelper);
int purgedInstanceCount = (int)instancesDeletedReturnValue.Value;

if (purgedInstanceCount > 0)
{
this.traceHelper.PurgedInstances(
this.userId,
purgedInstanceCount,
latencyStopwatch);
}

IReadOnlyCollection<OrchestrationState> results = await this.GetManyOrchestrationsAsync(purgeQuery, CancellationToken.None);

IEnumerable<string> instanceIds = results.Select(r => r.OrchestrationInstance.InstanceId);
int purgedInstanceCount = await this.PurgeOrchestrationHistoryAsync(instanceIds);

return new PurgeResult(purgedInstanceCount);
}

Expand Down
36 changes: 36 additions & 0 deletions test/DurableTask.SqlServer.Tests/Integration/DataRetentionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,42 @@ public async Task PurgesInstancesByStatus(OrchestrationStateTimeRangeFilterType
await this.testService.PurgeAsync(DateTime.MaxValue, filterType);
}

/// <summary>
/// Verifies that PurgeInstanceStateAsync with a PurgeInstanceFilter purges all matching instances,
/// even when the count exceeds the previous 1000-instance limit.
/// This is a regression test for https://github.com/microsoft/durabletask-mssql/issues/297.
/// </summary>
[Fact]
public async Task PurgesMoreThanOneThousandInstances()
{
const int InstanceCount = 1100;

IReadOnlyList<TestInstance<string>> instances = await this.testService.RunOrchestrations(
count: InstanceCount,
instanceIdGenerator: i => $"BulkPurge_{i:0000}",
inputGenerator: i => $"Input_{i}",
orchestrationName: "QuickComplete",
version: string.Empty,
implementation: (ctx, input) => Task.FromResult(input));

TimeSpan timeout = TimeSpan.FromSeconds(60);
await Task.WhenAll(instances.Select(instance => instance.WaitForCompletion(timeout)));

var filter = new PurgeInstanceFilter(
DateTime.MinValue,
DateTime.UtcNow.AddMinutes(1),
new[] { OrchestrationStatus.Completed });

PurgeResult result = await this.testService.PurgeAsync(filter);
Assert.Equal(InstanceCount, result.DeletedInstanceCount);

foreach (TestInstance<string> instance in instances)
{
OrchestrationState purgedState = await instance.GetStateAsync();
Assert.Null(purgedState);
}
}

/// <summary>
/// Validates that external events sent to a completed orchestration are eventually removed from the database
/// and that a log message is emitted for each discarded event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ async Task ValidateDatabaseSchemaAsync(TestDatabase database, string schemaName
$"{schemaName}.TerminateInstance",
$"{schemaName}.PurgeInstanceStateByID",
$"{schemaName}.PurgeInstanceStateByTime",
$"{schemaName}.PurgeInstanceStateByFilter",
$"{schemaName}._AddOrchestrationEvents",
$"{schemaName}._CheckpointOrchestration",
$"{schemaName}._CompleteTasks",
Expand Down
5 changes: 5 additions & 0 deletions test/DurableTask.SqlServer.Tests/Utils/TestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public Task PurgeAsync(DateTime maximumThreshold, OrchestrationStateTimeRangeFil
filterType);
}

public Task<PurgeResult> PurgeAsync(PurgeInstanceFilter filter)
{
return this.OrchestrationServiceMock.Object.PurgeInstanceStateAsync(filter);
}

public async Task DisposeAsync()
{
await this.worker.StopAsync(isForced: true);
Expand Down