Skip to content

Commit 1107cd1

Browse files
authored
Enhance purge with parallel batch deletes and partial purge timeout (#1321)
1 parent ef6b2c9 commit 1107cd1

12 files changed

Lines changed: 1135 additions & 34 deletions

File tree

src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// ----------------------------------------------------------------------------------
1+
// ----------------------------------------------------------------------------------
22
// Copyright Microsoft Corporation
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -1886,9 +1886,9 @@ public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string i
18861886
/// <summary>
18871887
/// Gets the state of all orchestration instances that match the specified parameters.
18881888
/// </summary>
1889-
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Fetch status grater than this value.</param>
1889+
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Fetch status greater than this value.</param>
18901890
/// <param name="createdTimeTo">CreatedTime of orchestrations. Fetch status less than this value.</param>
1891-
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several status.</param>
1891+
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several statuses.</param>
18921892
/// <param name="cancellationToken">Cancellation Token</param>
18931893
/// <returns>List of <see cref="OrchestrationState"/></returns>
18941894
public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus, CancellationToken cancellationToken = default(CancellationToken))
@@ -1900,9 +1900,9 @@ public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string i
19001900
/// <summary>
19011901
/// Gets the state of all orchestration instances that match the specified parameters.
19021902
/// </summary>
1903-
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Fetch status grater than this value.</param>
1903+
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Fetch status greater than this value.</param>
19041904
/// <param name="createdTimeTo">CreatedTime of orchestrations. Fetch status less than this value.</param>
1905-
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several status.</param>
1905+
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several statuses.</param>
19061906
/// <param name="top">The number of records to fetch per request.</param>
19071907
/// <param name="continuationToken">ContinuationToken of the pager.</param>
19081908
/// <param name="cancellationToken">Cancellation Token</param>
@@ -2021,9 +2021,9 @@ public Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(string instanceId)
20212021
/// <summary>
20222022
/// Purge history for orchestrations that match the specified parameters.
20232023
/// </summary>
2024-
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Purges history grater than this value.</param>
2024+
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Purges history greater than this value.</param>
20252025
/// <param name="createdTimeTo">CreatedTime of orchestrations. Purges history less than this value.</param>
2026-
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several status.</param>
2026+
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several statuses.</param>
20272027
/// <returns>Class containing number of storage requests sent, along with instances and rows deleted/purged</returns>
20282028
public Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus)
20292029
{
@@ -2040,10 +2040,28 @@ async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync
20402040
/// <inheritdoc />
async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
{
    PurgeHistoryResult purgeOutcome;

    if (!purgeInstanceFilter.Timeout.HasValue)
    {
        // No timeout requested: keep using the original call without a CancellationToken
        // to preserve the backward-compatible behavior where IsComplete is null.
        purgeOutcome = await this.trackingStore.PurgeInstanceHistoryAsync(
            purgeInstanceFilter.CreatedTimeFrom,
            purgeInstanceFilter.CreatedTimeTo,
            purgeInstanceFilter.RuntimeStatus);
    }
    else
    {
        // Express the timeout as a CancellationToken so the tracking store has a single
        // cancellation mechanism to observe. The source is disposed as soon as the purge returns.
        using (var timeoutSource = new CancellationTokenSource(purgeInstanceFilter.Timeout.Value))
        {
            purgeOutcome = await this.trackingStore.PurgeInstanceHistoryAsync(
                purgeInstanceFilter.CreatedTimeFrom,
                purgeInstanceFilter.CreatedTimeTo,
                purgeInstanceFilter.RuntimeStatus,
                timeoutSource.Token);
        }
    }

    return purgeOutcome.ToCorePurgeHistoryResult();
}
20492067
#nullable enable

src/DurableTask.AzureStorage/MessageManager.cs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,8 +318,9 @@ public string GetNewLargeMessageBlobName(MessageData message)
318318

319319
public async Task<int> DeleteLargeMessageBlobs(string sanitizedInstanceId, CancellationToken cancellationToken = default)
320320
{
321-
int storageOperationCount = 1;
322-
if (await this.blobContainer.ExistsAsync(cancellationToken))
321+
int storageOperationCount = 0;
322+
323+
try
323324
{
324325
await foreach (Page<Blob> page in this.blobContainer.ListBlobsAsync(sanitizedInstanceId, cancellationToken).AsPages())
325326
{
@@ -329,6 +330,22 @@ public async Task<int> DeleteLargeMessageBlobs(string sanitizedInstanceId, Cance
329330

330331
storageOperationCount += page.Values.Count;
331332
}
333+
334+
// Count the list operation even if no blobs found (the initial list request still happened)
335+
if (storageOperationCount == 0)
336+
{
337+
storageOperationCount = 1;
338+
}
339+
}
340+
catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404)
341+
{
342+
// Container does not exist; nothing to delete.
343+
storageOperationCount = 1;
344+
}
345+
catch (Azure.RequestFailedException ex) when (ex.Status == 404)
346+
{
347+
// Container does not exist; nothing to delete.
348+
storageOperationCount = 1;
332349
}
333350

334351
return storageOperationCount;

src/DurableTask.AzureStorage/PurgeHistoryResult.cs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,19 @@ public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDel
3333
this.RowsDeleted = rowsDeleted;
3434
}
3535

36+
/// <summary>
/// Initializes purge history statistics together with a completion status.
/// </summary>
/// <param name="storageRequests">Number of requests sent to storage.</param>
/// <param name="instancesDeleted">Number of instances deleted.</param>
/// <param name="rowsDeleted">Number of rows deleted.</param>
/// <param name="isComplete">
/// Whether the purge operation processed all matching instances; stored as-is in <see cref="IsComplete"/>.
/// </param>
public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDeleted, bool? isComplete)
    : this(storageRequests, instancesDeleted, rowsDeleted)
    => this.IsComplete = isComplete;
48+
3649
/// <summary>
3750
/// Number of requests sent to Storage during this execution of purge history
3851
/// </summary>
@@ -48,12 +61,20 @@ public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDel
4861
/// </summary>
4962
public int RowsDeleted { get; }
5063

64+
/// <summary>
65+
/// Gets a value indicating whether the purge operation is complete.
66+
/// <c>true</c> if all matching instances were purged;
67+
/// <c>false</c> if more instances remain and purge should be called again;
68+
/// <c>null</c> if completion status is unknown.
69+
/// </summary>
70+
public bool? IsComplete { get; }
71+
5172
/// <summary>
/// Converts this AzureStorage.PurgeHistoryResult to the Core.PurgeResult type,
/// passing along the number of deleted instances and the completion status.
/// </summary>
/// <returns>A new <see cref="PurgeResult"/> built from <see cref="InstancesDeleted"/> and <see cref="IsComplete"/>.</returns>
public PurgeResult ToCorePurgeHistoryResult() => new PurgeResult(this.InstancesDeleted, this.IsComplete);
5879
}
5980
}

src/DurableTask.AzureStorage/Storage/Table.cs

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ namespace DurableTask.AzureStorage.Storage
2626

2727
class Table
2828
{
29+
const int MaxParallelBatchDeletes = 10;
30+
2931
readonly AzureStorageClient azureStorageClient;
3032
readonly AzureStorageOrchestrationServiceStats stats;
3133
readonly TableServiceClient tableServiceClient;
@@ -117,6 +119,139 @@ public async Task<TableTransactionResults> DeleteBatchAsync<T>(IEnumerable<T> en
117119
return await this.ExecuteBatchAsync(entityBatch, item => new TableTransactionAction(TableTransactionActionType.Delete, item), cancellationToken: cancellationToken);
118120
}
119121

122+
/// <summary>
/// Deletes the given entities by splitting them into transactions of at most 100 delete
/// actions and submitting those transactions concurrently for improved throughput.
/// Each transaction is atomic on its own. At most <see cref="MaxParallelBatchDeletes"/>
/// transactions are in flight at any time, to avoid overwhelming storage and starving
/// other operations. A transaction that fails with a 404 (an entity was already deleted)
/// is retried as individual deletes that skip the missing entities.
/// </summary>
/// <param name="entityBatch">The entities to delete. An empty list yields an empty result without any storage call.</param>
/// <param name="cancellationToken">Token observed while waiting for a free slot and passed to every transaction.</param>
/// <returns>
/// The combined responses and request count of all transactions, with the elapsed time
/// measured across the whole parallel run.
/// </returns>
public async Task<TableTransactionResults> DeleteBatchParallelAsync<T>(
    IReadOnlyList<T> entityBatch,
    CancellationToken cancellationToken = default) where T : ITableEntity
{
    if (entityBatch.Count == 0)
    {
        return new TableTransactionResults(Array.Empty<Response>(), TimeSpan.Zero, 0);
    }

    const int batchSize = 100;

    // Slice the input into consecutive runs of up to batchSize delete actions.
    var transactions = new List<List<TableTransactionAction>>((entityBatch.Count + batchSize - 1) / batchSize);
    for (int offset = 0; offset < entityBatch.Count; offset += batchSize)
    {
        int length = Math.Min(batchSize, entityBatch.Count - offset);
        var transaction = new List<TableTransactionAction>(length);
        for (int i = 0; i < length; i++)
        {
            transaction.Add(new TableTransactionAction(TableTransactionActionType.Delete, entityBatch[offset + i]));
        }

        transactions.Add(transaction);
    }

    using var throttle = new SemaphoreSlim(MaxParallelBatchDeletes);

    // Runs one transaction once a slot is free; the slot is always handed back.
    async Task<TableTransactionResults> RunThrottledAsync(List<TableTransactionAction> transaction)
    {
        await throttle.WaitAsync(cancellationToken);
        try
        {
            return await this.ExecuteBatchWithFallbackAsync(transaction, cancellationToken);
        }
        finally
        {
            throttle.Release();
        }
    }

    var timer = Stopwatch.StartNew();
    TableTransactionResults[] perTransaction = await Task.WhenAll(
        transactions.Select(transaction => RunThrottledAsync(transaction)));
    timer.Stop();

    var aggregate = new TableTransactionResultsBuilder();
    foreach (TableTransactionResults transactionResult in perTransaction)
    {
        aggregate.Add(transactionResult);
    }

    // Report the wall-clock time of the parallel run rather than the builder's own elapsed value.
    TableTransactionResults combined = aggregate.ToResults();
    return new TableTransactionResults(combined.Responses, timer.Elapsed, combined.RequestCount);
}
186+
187+
/// <summary>
/// Executes the batch as a single transaction. If the transaction fails with HTTP 404
/// (one or more entities were already deleted), falls back to deleting the entities
/// one by one, skipping those that are already gone.
/// </summary>
/// <param name="batch">The delete actions that make up one transaction.</param>
/// <param name="cancellationToken">Token passed to the transaction and to the fallback deletes.</param>
/// <returns>
/// The transaction's results, or the fallback's results with the request count increased
/// by one to account for the failed transaction attempt.
/// </returns>
async Task<TableTransactionResults> ExecuteBatchWithFallbackAsync(
    List<TableTransactionAction> batch,
    CancellationToken cancellationToken)
{
    try
    {
        return await this.ExecuteBatchAsync(batch, cancellationToken);
    }
    catch (Exception ex) when (
        (ex is DurableTaskStorageException storageEx && storageEx.HttpStatusCode == 404) ||
        (ex is RequestFailedException requestEx && requestEx.Status == 404))
    {
        // The 404 may surface as either exception type; both get the same recovery:
        // delete individually (skipping 404s) and count the failed batch attempt as
        // one additional storage request.
        TableTransactionResults individual = await this.DeleteEntitiesIndividuallyAsync(batch, cancellationToken);
        return new TableTransactionResults(
            individual.Responses,
            individual.Elapsed,
            individual.RequestCount + 1);
    }
}
219+
220+
/// <summary>
/// Deletes the entities of the batch one request at a time, in order.
/// An entity whose delete fails with HTTP 404 is treated as already deleted and skipped.
/// </summary>
/// <param name="batch">The actions whose entities should be deleted.</param>
/// <param name="cancellationToken">Token passed to every delete request.</param>
/// <returns>
/// The responses of the deletes that succeeded, the time spent in this method, and the
/// number of delete requests attempted (skipped 404s included).
/// </returns>
async Task<TableTransactionResults> DeleteEntitiesIndividuallyAsync(
    List<TableTransactionAction> batch,
    CancellationToken cancellationToken)
{
    var succeeded = new List<Response>();
    var timer = Stopwatch.StartNew();
    int attempts = 0;

    foreach (TableTransactionAction action in batch)
    {
        ITableEntity entity = action.Entity;

        // Every attempt counts as a request, whether or not the entity still exists.
        attempts++;

        try
        {
            Response deleteResponse = await this.tableClient
                .DeleteEntityAsync(entity.PartitionKey, entity.RowKey, entity.ETag, cancellationToken)
                .DecorateFailure();

            succeeded.Add(deleteResponse);
            this.stats.TableEntitiesWritten.Increment();
        }
        catch (Exception ex) when (
            (ex is DurableTaskStorageException storageEx && storageEx.HttpStatusCode == 404) ||
            (ex is RequestFailedException requestEx && requestEx.Status == 404))
        {
            // Entity already deleted; skip.
        }
    }

    timer.Stop();
    return new TableTransactionResults(succeeded, timer.Elapsed, attempts);
}
254+
120255
public async Task<TableTransactionResults> InsertOrMergeBatchAsync<T>(IEnumerable<T> entityBatch, CancellationToken cancellationToken = default) where T : ITableEntity
121256
{
122257
TableTransactionResults results = await this.ExecuteBatchAsync(entityBatch, item => new TableTransactionAction(TableTransactionActionType.UpsertMerge, item), cancellationToken: cancellationToken);

0 commit comments

Comments
 (0)