Skip to content

Commit 3cb8d43

Browse files
sophiatevSophia Tevosyandavidmrdavid
authored
Fixing the Exception Case for AzureStorageOrchestrationService.CompleteTaskOrchestrationWorkItemAsync (#1235)
* first commit * finally added an integration test * updating comments * more comment updates * more comment updates * Update test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs Co-authored-by: David Justo <david.justo.1996@gmail.com> * Update test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs Co-authored-by: David Justo <david.justo.1996@gmail.com> * adding a comment to the test method and updating an Assert.Equal --------- Co-authored-by: Sophia Tevosyan <stevosyan@microsoft.com> Co-authored-by: David Justo <david.justo.1996@gmail.com>
1 parent 931cbe0 commit 3cb8d43

3 files changed

Lines changed: 165 additions & 5 deletions

File tree

src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,11 +1215,16 @@ await this.CommitOutboundQueueMessages(
12151215
this.orchestrationSessionManager.AddMessageToPendingOrchestration(session.ControlQueue, messages, session.TraceActivityId, CancellationToken.None);
12161216
}
12171217
}
1218-
catch (RequestFailedException rfe) when (rfe.Status == (int)HttpStatusCode.PreconditionFailed)
1218+
// Handle the case where the 'ETag' has changed, which implies another worker has taken over this work item while
1219+
// we were trying to process it. We detect this in 2 cases:
1220+
// Common case: the resulting code is 'PreconditionFailed', which means our ETag no longer matches the one stored.
1221+
// Edge case: the resulting code is 'Conflict'. This can occur if this was the first orchestration work item.
1222+
// The 'Conflict' represents that we attempted to insert a new orchestration history when one already exists.
1223+
catch (DurableTaskStorageException dtse) when (dtse.HttpStatusCode == (int)HttpStatusCode.Conflict || dtse.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
12191224
{
12201225
// Precondition failure is expected to be handled internally and logged as a warning.
12211226
// The orchestration dispatcher will handle this exception by abandoning the work item
1222-
throw new SessionAbortedException("Aborting execution due to failed precondition.", rfe);
1227+
throw new SessionAbortedException("Aborting execution due to conflicting completion of the work item by another worker.", dtse);
12231228
}
12241229
catch (Exception e)
12251230
{

src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1203,7 +1203,11 @@ static string GetBlobName(TableEntity entity, string property)
12031203
}
12041204
catch (DurableTaskStorageException ex)
12051205
{
1206-
if (ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
1206+
// Handle the case where the the history has already been updated by another caller.
1207+
// Common case: the resulting code is 'PreconditionFailed', which means "eTagValue" no longer matches the one stored, and TableTransactionActionType is "Update".
1208+
// Edge case: the resulting code is 'Conflict'. This is the case when eTagValue is null, and the TableTransactionActionType is "Add",
1209+
// in which case the exception indicates that the table entity we are trying to "add" already exists.
1210+
if (ex.HttpStatusCode == (int)HttpStatusCode.Conflict || ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
12071211
{
12081212
this.settings.Logger.SplitBrainDetected(
12091213
this.storageAccountName,
@@ -1214,7 +1218,7 @@ static string GetBlobName(TableEntity entity, string property)
12141218
numberOfTotalEvents,
12151219
historyEventNamesBuffer.ToString(0, historyEventNamesBuffer.Length - 1), // remove trailing comma
12161220
stopwatch.ElapsedMilliseconds,
1217-
eTagValue?.ToString());
1221+
eTagValue is null ? string.Empty : eTagValue.ToString());
12181222
}
12191223

12201224
throw;

test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs

Lines changed: 152 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ namespace DurableTask.AzureStorage.Tests
1717
using System.Collections.Generic;
1818
using System.Diagnostics;
1919
using System.Linq;
20+
using System.Net;
2021
using System.Threading;
2122
using System.Threading.Tasks;
2223
using Azure.Data.Tables;
@@ -29,6 +30,7 @@ namespace DurableTask.AzureStorage.Tests
2930
using DurableTask.AzureStorage.Storage;
3031
using DurableTask.AzureStorage.Tracking;
3132
using DurableTask.Core;
33+
using DurableTask.Core.Exceptions;
3234
using DurableTask.Core.History;
3335
using Microsoft.VisualStudio.TestTools.UnitTesting;
3436

@@ -88,6 +90,8 @@ async Task<AzureStorageOrchestrationService> EnsureTaskHubAsync(
8890
bool testDeletion,
8991
bool deleteBeforeCreate = true,
9092
string workerId = "test",
93+
int partitionCount = 4,
94+
TimeSpan? controlQueueVisibilityTimeout = null,
9195
PartitionManagerType partitionManagerType = PartitionManagerType.V2Safe)
9296
{
9397
string storageConnectionString = TestHelpers.GetTestStorageAccountConnectionString();
@@ -99,7 +103,12 @@ async Task<AzureStorageOrchestrationService> EnsureTaskHubAsync(
99103
StorageAccountClientProvider = new StorageAccountClientProvider(storageConnectionString),
100104
TaskHubName = taskHubName,
101105
WorkerId = workerId,
106+
PartitionCount = partitionCount,
102107
};
108+
if (controlQueueVisibilityTimeout != null)
109+
{
110+
settings.ControlQueueVisibilityTimeout = controlQueueVisibilityTimeout.Value;
111+
}
103112
this.SetPartitionManagerType(settings, partitionManagerType);
104113

105114

@@ -119,7 +128,7 @@ async Task<AzureStorageOrchestrationService> EnsureTaskHubAsync(
119128
// Control queues
120129
Assert.IsNotNull(service.AllControlQueues, "Control queue collection was not initialized.");
121130
ControlQueue[] controlQueues = service.AllControlQueues.ToArray();
122-
Assert.AreEqual(4, controlQueues.Length, "Expected to see the default four control queues created.");
131+
Assert.AreEqual(partitionCount, controlQueues.Length, $"Expected to see the default {partitionCount} control queues created.");
123132
foreach (ControlQueue queue in controlQueues)
124133
{
125134
Assert.IsTrue(await queue.InnerQueue.ExistsAsync(), $"Queue {queue.Name} was not created.");
@@ -483,6 +492,148 @@ await TestHelpers.WaitFor(
483492
Assert.IsTrue(queueMessages.All(msg => msg.DequeueCount == 1));
484493
}
485494

495+
/// <summary>
496+
/// Confirm that if two workers try to complete the same work item, a SessionAbortedException is thrown which wraps the
497+
/// inner DurableTaskStorageException, which has the correct status code.
498+
/// We check two cases:
499+
/// 1. If this is the first work item for the orchestration , the DurableTaskStorageException that is wrapped has status "Conflict"
500+
/// which is due to trying to insert an orchestration history when one already exists.
501+
/// 2. If this is not the first work item, the DurableTaskStorageException that is wrapped has status "PreconditionFailed"
502+
/// which is due to trying to update the existing orchestration history with a stale etag.
503+
/// </summary>
504+
/// <returns></returns>
505+
[TestMethod]
506+
public async Task MultipleWorkersAttemptingToCompleteSameWorkItem()
507+
{
508+
var orchestrationInstance = new OrchestrationInstance
509+
{
510+
InstanceId = "instance_id",
511+
ExecutionId = "execution_id",
512+
};
513+
514+
ExecutionStartedEvent startedEvent = new(-1, string.Empty)
515+
{
516+
Name = "orchestration",
517+
Version = string.Empty,
518+
OrchestrationInstance = orchestrationInstance,
519+
ScheduledStartTime = DateTime.UtcNow,
520+
};
521+
522+
// Create worker 1, wait for it to acquire the lease.
523+
// Make sure to set a small control queue visibility timeout so that worker 2 can reacquire the work item quickly once worker 1 loses the lease.
524+
var service1 = await this.EnsureTaskHubAsync(
525+
nameof(MultipleWorkersAttemptingToCompleteSameWorkItem),
526+
testDeletion: false,
527+
deleteBeforeCreate: true,
528+
partitionCount: 1,
529+
workerId: "1",
530+
controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1)
531+
);
532+
await service1.StartAsync();
533+
await TestHelpers.WaitFor(
534+
condition: () => service1.OwnedControlQueues.Any(),
535+
timeout: TimeSpan.FromSeconds(30));
536+
ControlQueue controlQueue = service1.OwnedControlQueues.Single();
537+
538+
// Create the orchestration and get the first work item and start "working" on it
539+
await service1.CreateTaskOrchestrationAsync(
540+
new TaskMessage()
541+
{
542+
OrchestrationInstance = orchestrationInstance,
543+
Event = startedEvent
544+
});
545+
var workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync(
546+
TimeSpan.FromMinutes(5),
547+
CancellationToken.None);
548+
var runtimeState = workItem1.OrchestrationRuntimeState;
549+
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
550+
runtimeState.AddEvent(startedEvent);
551+
runtimeState.AddEvent(new TaskScheduledEvent(0, "task"));
552+
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
553+
554+
// Now lose the lease
555+
BlobPartitionLease lease = await service1.ListBlobLeasesAsync().SingleAsync();
556+
await service1.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost);
557+
await TestHelpers.WaitFor(
558+
condition: () => !service1.OwnedControlQueues.Any(),
559+
timeout: TimeSpan.FromSeconds(30));
560+
561+
// Create worker 2, wait for it to now acquire the lease
562+
var service2 = await this.EnsureTaskHubAsync(
563+
nameof(MultipleWorkersAttemptingToCompleteSameWorkItem),
564+
testDeletion: false,
565+
deleteBeforeCreate: false,
566+
workerId: "2",
567+
partitionCount: 1,
568+
controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1)
569+
);
570+
await service2.StartAsync();
571+
await service2.OnOwnershipLeaseAquiredAsync(lease);
572+
await TestHelpers.WaitFor(
573+
condition: () => service2.OwnedControlQueues.Any(),
574+
timeout: TimeSpan.FromSeconds(60));
575+
576+
// Have worker 2 dequeue the same work item and start "working" on it
577+
var workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync(
578+
TimeSpan.FromMinutes(5),
579+
CancellationToken.None);
580+
workItem2.OrchestrationRuntimeState = runtimeState;
581+
582+
// Worker 2 completes the work item
583+
await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
584+
// Now worker 1 will attempt to complete the same work item. Since this is the first attempt to complete a work item and add a history for the orchestration (by worker 1),
585+
// there is no etag stored for the OrchestrationSession, and so the a "conflict" exception will be thrown as worker 2 already created a history for the orchestration.
586+
SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
587+
await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
588+
);
589+
Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
590+
DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException;
591+
Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode);
592+
await service1.ReleaseTaskOrchestrationWorkItemAsync(workItem1);
593+
await service2.ReleaseTaskOrchestrationWorkItemAsync(workItem2);
594+
595+
// Now simulate a task completing for the orchestration
596+
var taskCompletedEvent = new TaskCompletedEvent(-1, 0, string.Empty);
597+
await service2.SendTaskOrchestrationMessageAsync(new TaskMessage { Event = taskCompletedEvent, OrchestrationInstance = orchestrationInstance });
598+
// Worker 2 gets the next work item related to this task completion and starts "working" on it
599+
workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync(
600+
TimeSpan.FromMinutes(5),
601+
CancellationToken.None);
602+
runtimeState = workItem2.OrchestrationRuntimeState;
603+
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
604+
runtimeState.AddEvent(taskCompletedEvent);
605+
runtimeState.AddEvent(new ExecutionCompletedEvent(1, string.Empty, OrchestrationStatus.Completed));
606+
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
607+
608+
// Now force worker 2 to lose the lease and have worker 1 acquire it
609+
lease = await service2.ListBlobLeasesAsync().SingleAsync();
610+
await service2.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost);
611+
await TestHelpers.WaitFor(
612+
condition: () => !service2.OwnedControlQueues.Any(),
613+
timeout: TimeSpan.FromSeconds(30));
614+
await service1.OnOwnershipLeaseAquiredAsync(lease);
615+
await TestHelpers.WaitFor(
616+
condition: () => service1.OwnedControlQueues.Any(),
617+
timeout: TimeSpan.FromSeconds(60));
618+
619+
// Worker 1 also acquires the work item and starts "working" on it
620+
workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync(
621+
TimeSpan.FromMinutes(5),
622+
CancellationToken.None);
623+
workItem1.OrchestrationRuntimeState = runtimeState;
624+
625+
// Worker 1 completes the work item
626+
await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
627+
// Now worker 2 attempts to complete the same work item. Since this is not the first work item for the orchestration, now an etag exists for the OrchestrationSession, and the exception
628+
// that is thrown will be "precondition failed" as the Etag is stale after worker 1 completed the work item.
629+
exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
630+
await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
631+
);
632+
Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
633+
dtse = (DurableTaskStorageException)exception.InnerException;
634+
Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode);
635+
}
636+
486637
[TestMethod]
487638
public async Task MonitorIdleTaskHubDisconnected()
488639
{

0 commit comments

Comments
 (0)