From 7080fe85a3d5567960ecb7c4e11f42bbb2a0ac59 Mon Sep 17 00:00:00 2001 From: Kaibo Cai Date: Fri, 15 May 2026 16:42:56 -0500 Subject: [PATCH 1/7] reset pending orchestrations when worker restart --- .../OrchestrationSessionTests.cs | 78 +++++++++++++++++++ .../Messaging/ControlQueue.cs | 45 +++++++++++ .../OrchestrationSessionManager.cs | 57 +++++++++++++- 3 files changed, 179 insertions(+), 1 deletion(-) diff --git a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs b/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs index 126c4b9bc..23976d888 100644 --- a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs +++ b/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs @@ -22,6 +22,7 @@ namespace DurableTask.AzureStorage.Tests using System.Threading.Tasks; using DurableTask.AzureStorage.Messaging; using DurableTask.AzureStorage.Monitoring; + using DurableTask.AzureStorage.Storage; using DurableTask.AzureStorage.Tracking; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -223,5 +224,82 @@ public void AbortAllSessions_NoSessions_DoesNotThrow() manager.GetStats(out _, out _, out int count); Assert.AreEqual(0, count, "Should still have no active sessions"); } + + [TestMethod] + public async Task GetNextSessionAsync_DrainedReadyQueueNode_IsIgnored() + { + var settings = new AzureStorageOrchestrationServiceSettings + { + StorageAccountClientProvider = new StorageAccountClientProvider("UseDevelopmentStorage=true"), + }; + var stats = new AzureStorageOrchestrationServiceStats(); + var trackingStore = new Mock(); + + using var manager = new OrchestrationSessionManager( + "testaccount", + settings, + stats, + trackingStore.Object); + + var storageClient = new AzureStorageClient(settings); + var messageManager = new MessageManager(settings, storageClient, settings.TaskHubName); + var controlQueue = new ControlQueue(storageClient, "partition-0", messageManager); + + object pendingBatch = CreatePendingBatch(controlQueue); + object node = AddPendingBatchNode(manager, pendingBatch); + RemovePendingBatchNode(manager, node); + EnqueueReadyForProcessingNode(manager, node); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)); + try + { + await manager.GetNextSessionAsync(entitiesOnly: false, cts.Token); + Assert.Fail("Expected cancellation after the drained node was skipped."); + } + catch (OperationCanceledException) + { + } + } + + static object CreatePendingBatch(ControlQueue controlQueue) + { + Type pendingBatchType = typeof(OrchestrationSessionManager) + .GetNestedType("PendingMessageBatch", BindingFlags.NonPublic); + + return Activator.CreateInstance( + pendingBatchType, + BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic, + binder: null, + args: new object[] { controlQueue, "instance1", "execution1" }, + culture: null); + } + + static object AddPendingBatchNode(OrchestrationSessionManager manager, object pendingBatch) + { + object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches"); + MethodInfo addLast = pendingBatches.GetType().GetMethod("AddLast", new[] { pendingBatch.GetType() }); + return addLast.Invoke(pendingBatches, new[] { pendingBatch }); + } + + static void RemovePendingBatchNode(OrchestrationSessionManager manager, object node) + { + object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches"); + MethodInfo remove = pendingBatches.GetType().GetMethod("Remove", new[] { node.GetType() }); + remove.Invoke(pendingBatches, new[] { node }); + } + + static void EnqueueReadyForProcessingNode(OrchestrationSessionManager manager, object node) + { + object readyQueue = GetPrivateField(manager, "orchestrationsReadyForProcessingQueue"); + MethodInfo enqueue = readyQueue.GetType().GetMethod("Enqueue"); + enqueue.Invoke(readyQueue, new[] { node }); + } + + static object GetPrivateField(object target, string fieldName) + { + FieldInfo field = target.GetType().GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Instance); + Assert.IsNotNull(field); + return field.GetValue(target); + } } } diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index 9f1c0d2ba..6b9a01549 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -23,6 +23,7 @@ namespace DurableTask.AzureStorage.Messaging using DurableTask.AzureStorage.Monitoring; using DurableTask.AzureStorage.Partitioning; using DurableTask.AzureStorage.Storage; + using DurableTask.Core; class ControlQueue : TaskHubQueue, IDisposable { @@ -209,6 +210,50 @@ public override Task AbandonMessageAsync(MessageData message, SessionBase? sessi return base.AbandonMessageAsync(message, session); } + /// + /// Abandons a message with zero visibility timeout so it becomes immediately visible + /// for another partition owner to pick up. This is used during drain to avoid stranding + /// messages that were dequeued but not yet promoted to active sessions. + /// + public async Task AbandonMessageForDrainAsync(MessageData message) + { + this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _); + + QueueMessage queueMessage = message.OriginalQueueMessage; + TaskMessage taskMessage = message.TaskMessage; + OrchestrationInstance instance = taskMessage.OrchestrationInstance; + + this.settings.Logger.AbandoningMessage( + this.storageAccountName, + this.settings.TaskHubName, + taskMessage.Event.EventType.ToString(), + Utils.GetTaskEventId(taskMessage.Event), + queueMessage.MessageId, + instance.InstanceId, + instance.ExecutionId, + this.storageQueue.Name, + message.SequenceNumber, + queueMessage.PopReceipt, + visibilityTimeoutSeconds: 0); + + try + { + await this.storageQueue.UpdateMessageAsync( + queueMessage, + TimeSpan.Zero, + clientRequestId: null); + } + catch (Exception e) + { + this.settings.Logger.PartitionManagerWarning( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + this.Name, + $"Failed to abandon message {queueMessage.MessageId} during drain: {e.Message}"); + } + } + public override Task DeleteMessageAsync(MessageData message, SessionBase? session = null) { this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _); diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index abf7a58b2..3a7c2e616 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -216,11 +216,60 @@ public async Task DrainAsync(string partitionId, CloseReason reason, Cancellatio } finally { - // Remove the partition from memory + // Make dequeued-but-undispatched messages visible before dropping the partition. + await this.AbandonPendingMessagesAsync(partitionId); + this.RemoveQueue(partitionId, reason, caller); } } + /// + /// Abandons all pending (dequeued but not yet dispatched) messages for the specified partition, + /// making them immediately visible in the Azure Storage queue for the new partition owner. + /// This prevents a throughput gap equal to the visibility timeout duration when a partition + /// is released during drain. + /// + async Task AbandonPendingMessagesAsync(string partitionId) + { + var messagesToAbandon = new List<(ControlQueue Queue, MessageData Message)>(); + + lock (this.messageAndSessionLock) + { + var node = this.pendingOrchestrationMessageBatches.First; + while (node != null) + { + LinkedListNode? next = node.Next; + PendingMessageBatch batch = node.Value; + + if (string.Equals(batch.ControlQueue.Name, partitionId, StringComparison.OrdinalIgnoreCase)) + { + foreach (MessageData message in batch.Messages) + { + messagesToAbandon.Add((batch.ControlQueue, message)); + } + + this.pendingOrchestrationMessageBatches.Remove(node); + } + + node = next; + } + } + + if (messagesToAbandon.Count > 0) + { + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId, + $"Abandoning {messagesToAbandon.Count} pending message(s) during drain to make them immediately visible for the new partition owner."); + + await messagesToAbandon.ParallelForEachAsync( + this.settings.MaxStorageOperationConcurrency, + item => item.Queue.AbandonMessageForDrainAsync(item.Message)); + } + } + /// /// This method enumerates all the provided queue messages looking for ExecutionStarted messages. If any are found, it /// queries table storage to ensure that each message has a matching record in the Instances table. If not, this method @@ -592,6 +641,12 @@ async Task ScheduleOrchestrationStatePrefetch( lock (this.messageAndSessionLock) { + // Drain may have removed this batch after it was queued for dispatch. + if (node.List != this.pendingOrchestrationMessageBatches) + { + continue; + } + PendingMessageBatch nextBatch = node.Value; this.pendingOrchestrationMessageBatches.Remove(node); From 9f368785d9aa06b177e3bf6f589f230dd7625827 Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Fri, 15 May 2026 17:02:20 -0500 Subject: [PATCH 2/7] Potential fix for pull request finding 'Generic catch clause' PR feedback 01 Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> --- src/DurableTask.AzureStorage/Messaging/ControlQueue.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index 6b9a01549..c5af46630 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -19,6 +19,7 @@ namespace DurableTask.AzureStorage.Messaging using System.Linq; using System.Threading; using System.Threading.Tasks; + using Azure; using Azure.Storage.Queues.Models; using DurableTask.AzureStorage.Monitoring; using DurableTask.AzureStorage.Partitioning; @@ -243,7 +244,7 @@ await this.storageQueue.UpdateMessageAsync( TimeSpan.Zero, clientRequestId: null); } - catch (Exception e) + catch (RequestFailedException e) { this.settings.Logger.PartitionManagerWarning( this.storageAccountName, From aae96d0d2c1dbdcb2cceb940852ba9c415ef9adc Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Fri, 15 May 2026 17:02:40 -0500 Subject: [PATCH 3/7] Potential fix for pull request finding 'Poor error handling: empty catch block' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> --- Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs b/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs index 23976d888..bbf75a59f 100644 --- a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs +++ b/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs @@ -258,6 +258,7 @@ public async Task GetNextSessionAsync_DrainedReadyQueueNode_IsIgnored() } catch (OperationCanceledException) { + Assert.IsTrue(true, "Operation cancellation was expected."); } } From 7b3dc0256dfd236a292c12bae9b3252fb755ed7c Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Fri, 15 May 2026 17:03:04 -0500 Subject: [PATCH 4/7] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- src/DurableTask.AzureStorage/Messaging/ControlQueue.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index c5af46630..c5beba41c 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -251,7 +251,7 @@ await this.storageQueue.UpdateMessageAsync( this.settings.TaskHubName, this.settings.WorkerId, this.Name, - $"Failed to abandon message {queueMessage.MessageId} during drain: {e.Message}"); + $"Failed to abandon message {queueMessage.MessageId} during drain: {e}"); } } From e69b4b5fb4c82dcbd810a39f5b22730478d6b637 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 15 May 2026 22:12:02 +0000 Subject: [PATCH 5/7] Refactor drained-ready-node test to avoid storage dependency Agent-Logs-Url: https://github.com/Azure/durabletask/sessions/3b99ae3b-6739-4610-8ba6-45c74a048f77 Co-authored-by: kaibocai <89094811+kaibocai@users.noreply.github.com> --- .../OrchestrationSessionTests.cs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs b/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs index bbf75a59f..f3fc0fa63 100644 --- a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs +++ b/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs @@ -18,11 +18,11 @@ namespace DurableTask.AzureStorage.Tests using System.Diagnostics; using System.Linq; using System.Reflection; + using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using DurableTask.AzureStorage.Messaging; using DurableTask.AzureStorage.Monitoring; - using DurableTask.AzureStorage.Storage; using DurableTask.AzureStorage.Tracking; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -228,10 +228,7 @@ public void AbortAllSessions_NoSessions_DoesNotThrow() [TestMethod] public async Task GetNextSessionAsync_DrainedReadyQueueNode_IsIgnored() { - var settings = new AzureStorageOrchestrationServiceSettings - { - StorageAccountClientProvider = new StorageAccountClientProvider("UseDevelopmentStorage=true"), - }; + var settings = new AzureStorageOrchestrationServiceSettings(); var stats = new AzureStorageOrchestrationServiceStats(); var trackingStore = new Mock(); @@ -241,9 +238,7 @@ public async Task GetNextSessionAsync_DrainedReadyQueueNode_IsIgnored() stats, trackingStore.Object); - var storageClient = new AzureStorageClient(settings); - var messageManager = new MessageManager(settings, storageClient, settings.TaskHubName); - var controlQueue = new ControlQueue(storageClient, "partition-0", messageManager); + var controlQueue = CreateControlQueueWithoutStorage(); object pendingBatch = CreatePendingBatch(controlQueue); object node = AddPendingBatchNode(manager, pendingBatch); @@ -275,6 +270,11 @@ static object CreatePendingBatch(ControlQueue controlQueue) culture: null); } + static ControlQueue CreateControlQueueWithoutStorage() + { + return (ControlQueue)RuntimeHelpers.GetUninitializedObject(typeof(ControlQueue)); + } + static object AddPendingBatchNode(OrchestrationSessionManager manager, object pendingBatch) { object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches"); From a6f0dee7f177c3a704f3d29b9593b861620800ae Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 15 May 2026 22:23:06 +0000 Subject: [PATCH 6/7] Make drain abandon best-effort and cancellation-aware Agent-Logs-Url: https://github.com/Azure/durabletask/sessions/eba6ff1a-308c-42eb-aca5-08cf29ab5b84 Co-authored-by: kaibocai <89094811+kaibocai@users.noreply.github.com> --- .../Messaging/ControlQueue.cs | 5 ++-- .../OrchestrationSessionManager.cs | 29 +++++++++++++++++-- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index c5beba41c..980f541cd 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -216,7 +216,7 @@ public override Task AbandonMessageAsync(MessageData message, SessionBase? sessi /// for another partition owner to pick up. This is used during drain to avoid stranding /// messages that were dequeued but not yet promoted to active sessions. /// - public async Task AbandonMessageForDrainAsync(MessageData message) + public async Task AbandonMessageForDrainAsync(MessageData message, CancellationToken cancellationToken = default) { this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _); @@ -242,7 +242,8 @@ public async Task AbandonMessageForDrainAsync(MessageData message) await this.storageQueue.UpdateMessageAsync( queueMessage, TimeSpan.Zero, - clientRequestId: null); + clientRequestId: null, + cancellationToken: cancellationToken); } catch (RequestFailedException e) { diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index 3a7c2e616..6d0a48c2c 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -217,7 +217,28 @@ public async Task DrainAsync(string partitionId, CloseReason reason, Cancellatio finally { // Make dequeued-but-undispatched messages visible before dropping the partition. - await this.AbandonPendingMessagesAsync(partitionId); + try + { + await this.AbandonPendingMessagesAsync(partitionId, cancellationToken); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + this.settings.Logger.PartitionManagerWarning( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId, + "Canceled while abandoning pending messages during drain."); + } + catch (Exception e) + { + this.settings.Logger.PartitionManagerWarning( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId, + $"Failed to abandon pending messages during drain: {e}"); + } this.RemoveQueue(partitionId, reason, caller); } @@ -229,7 +250,7 @@ public async Task DrainAsync(string partitionId, CloseReason reason, Cancellatio /// This prevents a throughput gap equal to the visibility timeout duration when a partition /// is released during drain. /// - async Task AbandonPendingMessagesAsync(string partitionId) + async Task AbandonPendingMessagesAsync(string partitionId, CancellationToken cancellationToken) { var messagesToAbandon = new List<(ControlQueue Queue, MessageData Message)>(); @@ -257,6 +278,8 @@ async Task AbandonPendingMessagesAsync(string partitionId) if (messagesToAbandon.Count > 0) { + cancellationToken.ThrowIfCancellationRequested(); + this.settings.Logger.PartitionManagerInfo( this.storageAccountName, this.settings.TaskHubName, @@ -266,7 +289,7 @@ async Task AbandonPendingMessagesAsync(string partitionId) await messagesToAbandon.ParallelForEachAsync( this.settings.MaxStorageOperationConcurrency, - item => item.Queue.AbandonMessageForDrainAsync(item.Message)); + item => item.Queue.AbandonMessageForDrainAsync(item.Message, cancellationToken)); } } From dd23148c86791824536b9cf0366ea5f10e94ffd6 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 15 May 2026 22:37:58 +0000 Subject: [PATCH 7/7] Fix drain logging robustness and move session tests into active test project Agent-Logs-Url: https://github.com/Azure/durabletask/sessions/9804332d-7955-4611-9934-454c9dcfa570 Co-authored-by: kaibocai <89094811+kaibocai@users.noreply.github.com> --- .../Messaging/ControlQueue.cs | 2 +- .../OrchestrationSessionTests.cs | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) rename {Test => test}/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs (96%) diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index 980f541cd..6a791ea3e 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -245,7 +245,7 @@ await this.storageQueue.UpdateMessageAsync( clientRequestId: null, cancellationToken: cancellationToken); } - catch (RequestFailedException e) + catch (Exception e) { this.settings.Logger.PartitionManagerWarning( this.storageAccountName, diff --git a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs b/test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs similarity index 96% rename from Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs rename to test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs index f3fc0fa63..65014a67c 100644 --- a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs +++ b/test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs @@ -261,13 +261,16 @@ static object CreatePendingBatch(ControlQueue controlQueue) { Type pendingBatchType = typeof(OrchestrationSessionManager) .GetNestedType("PendingMessageBatch", BindingFlags.NonPublic); + Assert.IsNotNull(pendingBatchType); - return Activator.CreateInstance( + object pendingBatch = Activator.CreateInstance( pendingBatchType, BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic, binder: null, args: new object[] { controlQueue, "instance1", "execution1" }, culture: null); + Assert.IsNotNull(pendingBatch); + return pendingBatch; } static ControlQueue CreateControlQueueWithoutStorage() @@ -279,13 +282,17 @@ static object AddPendingBatchNode(OrchestrationSessionManager manager, object pe { object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches"); MethodInfo addLast = pendingBatches.GetType().GetMethod("AddLast", new[] { pendingBatch.GetType() }); - return addLast.Invoke(pendingBatches, new[] { pendingBatch }); + Assert.IsNotNull(addLast); + object node = addLast.Invoke(pendingBatches, new[] { pendingBatch }); + Assert.IsNotNull(node); + return node; } static void RemovePendingBatchNode(OrchestrationSessionManager manager, object node) { object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches"); MethodInfo remove = pendingBatches.GetType().GetMethod("Remove", new[] { node.GetType() }); + Assert.IsNotNull(remove); remove.Invoke(pendingBatches, new[] { node }); } @@ -293,6 +300,7 @@ static void EnqueueReadyForProcessingNode(OrchestrationSessionManager manager, o { object readyQueue = GetPrivateField(manager, "orchestrationsReadyForProcessingQueue"); MethodInfo enqueue = readyQueue.GetType().GetMethod("Enqueue"); + Assert.IsNotNull(enqueue); enqueue.Invoke(readyQueue, new[] { node }); }