Skip to content

Commit d305ba3

Browse files
sophiatevSophia Tevosyan
andauthored
Fixing Azure Storage History/Instance Table Inconsistency (#1253)
* first commit * fixing build error * updating to a better integration test * fixed spacing * ignoring the old allowreplay test * fixing a bug where i wasnt checking that the execution id is the same * addressing PR comments --------- Co-authored-by: Sophia Tevosyan <stevosyan@microsoft.com>
1 parent 72d06ab commit d305ba3

6 files changed

Lines changed: 194 additions & 4 deletions

File tree

src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -793,7 +793,12 @@ async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(boo
793793
TraceContext = currentRequestTraceContext,
794794
};
795795

796-
string warningMessage = await this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances);
796+
string warningMessage = await this.IsExecutableInstanceAsync(
797+
session.RuntimeState,
798+
orchestrationWorkItem.NewMessages,
799+
settings.AllowReplayingTerminalInstances,
800+
session.TrackingStoreContext,
801+
cancellationToken);
797802
if (!string.IsNullOrEmpty(warningMessage))
798803
{
799804
// If all messages belong to the same execution ID, then all of them need to be discarded.
@@ -1050,7 +1055,12 @@ internal static void TraceMessageReceived(AzureStorageOrchestrationServiceSettin
10501055
data.Episode.GetValueOrDefault(-1));
10511056
}
10521057

1053-
async Task<string> IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMessage> newMessages, bool allowReplayingTerminalInstances)
1058+
async Task<string> IsExecutableInstanceAsync(
1059+
OrchestrationRuntimeState runtimeState,
1060+
IList<TaskMessage> newMessages,
1061+
bool allowReplayingTerminalInstances,
1062+
object trackingStoreContext,
1063+
CancellationToken cancellationToken)
10541064
{
10551065
if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent))
10561066
{
@@ -1085,12 +1095,25 @@ await this.trackingStore.UpdateStatusForTerminationAsync(
10851095
}
10861096

10871097
if (runtimeState.ExecutionStartedEvent != null &&
1088-
!allowReplayingTerminalInstances &&
10891098
runtimeState.OrchestrationStatus != OrchestrationStatus.Running &&
10901099
runtimeState.OrchestrationStatus != OrchestrationStatus.Pending &&
10911100
runtimeState.OrchestrationStatus != OrchestrationStatus.Suspended)
10921101
{
1093-
return $"Instance is {runtimeState.OrchestrationStatus}";
1102+
InstanceStatus instanceStatus = await this.trackingStore.FetchInstanceStatusAsync(runtimeState.OrchestrationInstance.InstanceId);
1103+
if (instanceStatus.State.OrchestrationInstance.ExecutionId == runtimeState.OrchestrationInstance.ExecutionId
1104+
&& instanceStatus.State.OrchestrationStatus != runtimeState.OrchestrationStatus)
1105+
{
1106+
await this.trackingStore.UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(
1107+
runtimeState.OrchestrationInstance.InstanceId,
1108+
runtimeState.OrchestrationInstance.ExecutionId,
1109+
runtimeState,
1110+
trackingStoreContext,
1111+
cancellationToken);
1112+
}
1113+
if (!allowReplayingTerminalInstances)
1114+
{
1115+
return $"Instance is {runtimeState.OrchestrationStatus}";
1116+
}
10941117
}
10951118

10961119
return null;

src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,6 +1048,57 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
10481048
return eTagValue;
10491049
}
10501050

1051+
public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(
1052+
string instanceId,
1053+
string executionId,
1054+
OrchestrationRuntimeState runtimeState,
1055+
object trackingStoreContext,
1056+
CancellationToken cancellationToken = default)
1057+
{
1058+
if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed &&
1059+
runtimeState.OrchestrationStatus != OrchestrationStatus.Canceled &&
1060+
runtimeState.OrchestrationStatus != OrchestrationStatus.Failed &&
1061+
runtimeState.OrchestrationStatus != OrchestrationStatus.Terminated)
1062+
{
1063+
return;
1064+
}
1065+
1066+
TrackingStoreContext context = (TrackingStoreContext)trackingStoreContext;
1067+
if (context.Blobs.Count > 0)
1068+
{
1069+
var tasks = new List<Task>(context.Blobs.Count);
1070+
foreach (string blobName in context.Blobs)
1071+
{
1072+
tasks.Add(this.messageManager.DeleteBlobAsync(blobName));
1073+
}
1074+
await Task.WhenAll(tasks);
1075+
}
1076+
1077+
string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId);
1078+
var instanceEntity = new TableEntity(sanitizedInstanceId, string.Empty)
1079+
{
1080+
// TODO: Translating null to "null" is a temporary workaround. We should prioritize
1081+
// https://github.com/Azure/durabletask/issues/477 so that this is no longer necessary.
1082+
["CustomStatus"] = runtimeState.Status ?? "null",
1083+
["ExecutionId"] = executionId,
1084+
["LastUpdatedTime"] = runtimeState.Events.Last().Timestamp,
1085+
["RuntimeStatus"] = runtimeState.OrchestrationStatus.ToString(),
1086+
["CompletedTime"] = runtimeState.CompletedTime
1087+
};
1088+
1089+
Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew();
1090+
await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity);
1091+
1092+
this.settings.Logger.InstanceStatusUpdate(
1093+
this.storageAccountName,
1094+
this.taskHubName,
1095+
instanceId,
1096+
executionId,
1097+
runtimeState.OrchestrationStatus,
1098+
Utils.GetEpisodeNumber(runtimeState),
1099+
orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds);
1100+
}
1101+
10511102
static int GetEstimatedByteCount(TableEntity entity)
10521103
{
10531104
// Assume at least 1 KB of data per entity to account for static-length properties

src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,19 @@ interface ITrackingStore
103103
/// <returns>Returns the instance status or <c>null</c> if none was found.</returns>
104104
Task<InstanceStatus> FetchInstanceStatusAsync(string instanceId, CancellationToken cancellationToken = default);
105105

106+
/// <summary>
107+
/// Updates the instance status of the specified orchestration instance to match that of <paramref name="runtimeState"/> for a completed orchestration.
108+
/// Also deletes any orphaned blobs of <paramref name="trackingStoreContext"/>.
109+
/// This method is meant to be called in the case that there is an inconsistency between the instance and history table due to a failure during a call to
110+
/// <see cref="UpdateStateAsync"/> for a completing orchestration. If the orchestration is not in a terminal state, the method will immediately return and do nothing.
111+
/// </summary>
112+
/// <param name="instanceId">The ID of the orchestration.</param>
113+
/// <param name="executionId">The execution ID of the orchestration.</param>
114+
/// <param name="runtimeState">The runtime state of the orchestration.</param>
115+
/// <param name="trackingStoreContext">Additional context for the execution that is maintained by the tracking store.</param>
116+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
117+
Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default);
118+
106119
/// <summary>
107120
/// Get The Orchestration State for querying all orchestration instances
108121
/// </summary>

src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,5 +191,31 @@ public override async Task UpdateStatusForTerminationAsync(
191191
instanceEntity.Single().State.Output = output;
192192
await this.instanceStore.WriteEntitiesAsync(instanceEntity);
193193
}
194+
195+
public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(
196+
string instanceId,
197+
string executionId,
198+
OrchestrationRuntimeState runtimeState,
199+
object trackingStoreContext,
200+
CancellationToken cancellationToken = default)
201+
{
202+
if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed &&
203+
runtimeState.OrchestrationStatus != OrchestrationStatus.Canceled &&
204+
runtimeState.OrchestrationStatus != OrchestrationStatus.Failed &&
205+
runtimeState.OrchestrationStatus != OrchestrationStatus.Terminated)
206+
{
207+
return;
208+
}
209+
210+
// No blobs to delete for this tracking store implementation
211+
await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[]
212+
{
213+
new OrchestrationStateInstanceEntity()
214+
{
215+
State = Core.Common.Utils.BuildOrchestrationState(runtimeState),
216+
SequenceNumber = runtimeState.Events.Count
217+
}
218+
});
219+
}
194220
}
195221
}

src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,5 +108,8 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo
108108

109109
/// <inheritdoc />
110110
public abstract Task<ETag?> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object executionData, CancellationToken cancellationToken = default);
111+
112+
/// <inheritdoc />
113+
public abstract Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default);
111114
}
112115
}

test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2401,6 +2401,7 @@ await Task.WhenAll(
24012401
[DataRow(false, true, false)]
24022402
[DataRow(false, false, true)]
24032403
[DataRow(false, false, false)]
2404+
[Ignore("Skipping since this functionality has since changed, see TestWorkerFailingDuringCompleteWorkItemCall")]
24042405
public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSessions, bool sendTerminateEvent, bool allowReplayingTerminalInstances)
24052406
{
24062407
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(
@@ -2475,6 +2476,79 @@ public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSession
24752476
}
24762477
}
24772478

2479+
/// <summary>
2480+
/// Confirm that if a worker fails after committing the new history but before updating the instance state in a call to
2481+
/// <see cref="AzureStorageOrchestrationService.CompleteTaskOrchestrationWorkItemAsync"/> for an orchestration that has
2482+
/// reached a terminal state, then storage is brought to consistent state by the call to
2483+
/// <see cref="AzureStorageOrchestrationService.LockNextTaskOrchestrationWorkItemAsync"/>.
2484+
/// Since we cannot simulate a worker failure at this precise point, instead what is done by this test is that we
2485+
/// let an orchestration run to completion, and then manually change the instance table state back to "Running".
2486+
/// We then send an event to the orchestration, which triggers a call to lock the next task work item, at which point
2487+
/// the inconsistent state in storage for the terminal instance is recognized, the instance state is updated, and the work item discarded.
2488+
/// Note that this test does not confirm that orphaned blobs are deleted by the call to lock the next orchestration work item
2489+
/// in the case of a terminal orchestration with inconsistent state in storage. This is because there is no easy way to mock/inject
2490+
/// the tracking store context object that is part of the orchestration session state which keeps track of the blobs.
2491+
/// </summary>
2492+
/// <returns></returns>
2493+
[DataTestMethod]
2494+
[DataRow(true, true)]
2495+
[DataRow(false, true)]
2496+
[DataRow(true, false)]
2497+
[DataRow(false, false)]
2498+
public async Task TestWorkerFailingDuringCompleteWorkItemCall(bool enableExtendedSessions, bool terminate)
2499+
{
2500+
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions))
2501+
{
2502+
await host.StartAsync();
2503+
2504+
// Run simple orchestrator to completion, this will help us obtain a valid terminal history for the orchestrator
2505+
var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), "hello!");
2506+
var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10));
2507+
Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
2508+
2509+
// Simulate having an "out of date" Instance table, by setting it's runtime status to "Running".
2510+
// This simulates the scenario where the History table was updated, but not the Instance table.
2511+
var instanceId = client.InstanceId;
2512+
AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings(
2513+
enableExtendedSessions);
2514+
AzureStorageClient azureStorageClient = new AzureStorageClient(settings);
2515+
2516+
Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName);
2517+
TableEntity entity = new TableEntity(instanceId, "")
2518+
{
2519+
["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G")
2520+
};
2521+
await instanceTable.MergeEntityAsync(entity, Azure.ETag.All);
2522+
2523+
// Assert that the status in the Instance table reads "Running"
2524+
IList<OrchestrationState> state = await client.GetStateAsync(instanceId);
2525+
OrchestrationStatus forcedStatus = state.First().OrchestrationStatus;
2526+
Assert.AreEqual(OrchestrationStatus.Running, forcedStatus);
2527+
2528+
// The type of event sent should not matter - the event itself should be discarded, and the instance table updated
2529+
// to reflect the status in the history table.
2530+
if (terminate)
2531+
{
2532+
await client.TerminateAsync("testing");
2533+
}
2534+
else
2535+
{
2536+
await client.RaiseEventAsync("Foo", "Bar");
2537+
}
2538+
await Task.Delay(TimeSpan.FromSeconds(30));
2539+
2540+
// A replay should have occurred, forcing the instance table to be updated with a terminal status
2541+
state = await client.GetStateAsync(instanceId);
2542+
Assert.AreEqual(1, state.Count);
2543+
2544+
status = state.First();
2545+
OrchestrationStatus expectedStatus = OrchestrationStatus.Completed;
2546+
Assert.AreEqual(expectedStatus, status.OrchestrationStatus);
2547+
2548+
await host.StopAsync();
2549+
}
2550+
}
2551+
24782552
[TestMethod]
24792553
[DataRow(VersioningSettings.VersionMatchStrategy.Strict)]
24802554
[DataRow(VersioningSettings.VersionMatchStrategy.CurrentOrOlder)]

0 commit comments

Comments
 (0)