diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
index 9f1c0d2ba..6a791ea3e 100644
--- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
+++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
@@ -19,10 +19,12 @@ namespace DurableTask.AzureStorage.Messaging
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+ using Azure;
using Azure.Storage.Queues.Models;
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Partitioning;
using DurableTask.AzureStorage.Storage;
+ using DurableTask.Core;
class ControlQueue : TaskHubQueue, IDisposable
{
@@ -209,6 +211,51 @@ public override Task AbandonMessageAsync(MessageData message, SessionBase? sessi
return base.AbandonMessageAsync(message, session);
}
+ ///
+ /// Abandons a message with zero visibility timeout so it becomes immediately visible
+ /// for another partition owner to pick up. This is used during drain to avoid stranding
+ /// messages that were dequeued but not yet promoted to active sessions.
+ ///
+ public async Task AbandonMessageForDrainAsync(MessageData message, CancellationToken cancellationToken = default)
+ {
+ this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _);
+
+ QueueMessage queueMessage = message.OriginalQueueMessage;
+ TaskMessage taskMessage = message.TaskMessage;
+ OrchestrationInstance instance = taskMessage.OrchestrationInstance;
+
+ this.settings.Logger.AbandoningMessage(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ taskMessage.Event.EventType.ToString(),
+ Utils.GetTaskEventId(taskMessage.Event),
+ queueMessage.MessageId,
+ instance.InstanceId,
+ instance.ExecutionId,
+ this.storageQueue.Name,
+ message.SequenceNumber,
+ queueMessage.PopReceipt,
+ visibilityTimeoutSeconds: 0);
+
+ try
+ {
+ await this.storageQueue.UpdateMessageAsync(
+ queueMessage,
+ TimeSpan.Zero,
+ clientRequestId: null,
+ cancellationToken: cancellationToken);
+ }
+ catch (Exception e)
+ {
+ this.settings.Logger.PartitionManagerWarning(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ this.settings.WorkerId,
+ this.Name,
+ $"Failed to abandon message {queueMessage.MessageId} during drain: {e}");
+ }
+ }
+
public override Task DeleteMessageAsync(MessageData message, SessionBase? session = null)
{
this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _);
diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
index abf7a58b2..6d0a48c2c 100644
--- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
+++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
@@ -216,11 +216,83 @@ public async Task DrainAsync(string partitionId, CloseReason reason, Cancellatio
}
finally
{
- // Remove the partition from memory
+ // Make dequeued-but-undispatched messages visible before dropping the partition.
+ try
+ {
+ await this.AbandonPendingMessagesAsync(partitionId, cancellationToken);
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ this.settings.Logger.PartitionManagerWarning(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ this.settings.WorkerId,
+ partitionId,
+ "Canceled while abandoning pending messages during drain.");
+ }
+ catch (Exception e)
+ {
+ this.settings.Logger.PartitionManagerWarning(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ this.settings.WorkerId,
+ partitionId,
+ $"Failed to abandon pending messages during drain: {e}");
+ }
+
this.RemoveQueue(partitionId, reason, caller);
}
}
+ ///
+ /// Abandons all pending (dequeued but not yet dispatched) messages for the specified partition,
+ /// making them immediately visible in the Azure Storage queue for the new partition owner.
+ /// This prevents a throughput gap equal to the visibility timeout duration when a partition
+ /// is released during drain.
+ ///
+ async Task AbandonPendingMessagesAsync(string partitionId, CancellationToken cancellationToken)
+ {
+ var messagesToAbandon = new List<(ControlQueue Queue, MessageData Message)>();
+
+ lock (this.messageAndSessionLock)
+ {
+ var node = this.pendingOrchestrationMessageBatches.First;
+ while (node != null)
+ {
+ LinkedListNode? next = node.Next;
+ PendingMessageBatch batch = node.Value;
+
+ if (string.Equals(batch.ControlQueue.Name, partitionId, StringComparison.OrdinalIgnoreCase))
+ {
+ foreach (MessageData message in batch.Messages)
+ {
+ messagesToAbandon.Add((batch.ControlQueue, message));
+ }
+
+ this.pendingOrchestrationMessageBatches.Remove(node);
+ }
+
+ node = next;
+ }
+ }
+
+ if (messagesToAbandon.Count > 0)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+
+ this.settings.Logger.PartitionManagerInfo(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ this.settings.WorkerId,
+ partitionId,
+ $"Abandoning {messagesToAbandon.Count} pending message(s) during drain to make them immediately visible for the new partition owner.");
+
+ await messagesToAbandon.ParallelForEachAsync(
+ this.settings.MaxStorageOperationConcurrency,
+ item => item.Queue.AbandonMessageForDrainAsync(item.Message, cancellationToken));
+ }
+ }
+
///
/// This method enumerates all the provided queue messages looking for ExecutionStarted messages. If any are found, it
/// queries table storage to ensure that each message has a matching record in the Instances table. If not, this method
@@ -592,6 +664,12 @@ async Task ScheduleOrchestrationStatePrefetch(
lock (this.messageAndSessionLock)
{
+ // Drain may have removed this batch after it was queued for dispatch.
+ if (node.List != this.pendingOrchestrationMessageBatches)
+ {
+ continue;
+ }
+
PendingMessageBatch nextBatch = node.Value;
this.pendingOrchestrationMessageBatches.Remove(node);
diff --git a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs b/test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
similarity index 70%
rename from Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
rename to test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
index 126c4b9bc..65014a67c 100644
--- a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
+++ b/test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
@@ -18,6 +18,7 @@ namespace DurableTask.AzureStorage.Tests
using System.Diagnostics;
using System.Linq;
using System.Reflection;
+ using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.AzureStorage.Messaging;
@@ -223,5 +224,91 @@ public void AbortAllSessions_NoSessions_DoesNotThrow()
manager.GetStats(out _, out _, out int count);
Assert.AreEqual(0, count, "Should still have no active sessions");
}
+
+ [TestMethod]
+ public async Task GetNextSessionAsync_DrainedReadyQueueNode_IsIgnored()
+ {
+ var settings = new AzureStorageOrchestrationServiceSettings();
+ var stats = new AzureStorageOrchestrationServiceStats();
+ var trackingStore = new Mock();
+
+ using var manager = new OrchestrationSessionManager(
+ "testaccount",
+ settings,
+ stats,
+ trackingStore.Object);
+
+ var controlQueue = CreateControlQueueWithoutStorage();
+
+ object pendingBatch = CreatePendingBatch(controlQueue);
+ object node = AddPendingBatchNode(manager, pendingBatch);
+ RemovePendingBatchNode(manager, node);
+ EnqueueReadyForProcessingNode(manager, node);
+
+ using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
+ try
+ {
+ await manager.GetNextSessionAsync(entitiesOnly: false, cts.Token);
+ Assert.Fail("Expected cancellation after the drained node was skipped.");
+ }
+ catch (OperationCanceledException)
+ {
+ Assert.IsTrue(true, "Operation cancellation was expected.");
+ }
+ }
+
+ static object CreatePendingBatch(ControlQueue controlQueue)
+ {
+ Type pendingBatchType = typeof(OrchestrationSessionManager)
+ .GetNestedType("PendingMessageBatch", BindingFlags.NonPublic);
+ Assert.IsNotNull(pendingBatchType);
+
+ object pendingBatch = Activator.CreateInstance(
+ pendingBatchType,
+ BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
+ binder: null,
+ args: new object[] { controlQueue, "instance1", "execution1" },
+ culture: null);
+ Assert.IsNotNull(pendingBatch);
+ return pendingBatch;
+ }
+
+ static ControlQueue CreateControlQueueWithoutStorage()
+ {
+ return (ControlQueue)RuntimeHelpers.GetUninitializedObject(typeof(ControlQueue));
+ }
+
+ static object AddPendingBatchNode(OrchestrationSessionManager manager, object pendingBatch)
+ {
+ object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
+ MethodInfo addLast = pendingBatches.GetType().GetMethod("AddLast", new[] { pendingBatch.GetType() });
+ Assert.IsNotNull(addLast);
+ object node = addLast.Invoke(pendingBatches, new[] { pendingBatch });
+ Assert.IsNotNull(node);
+ return node;
+ }
+
+ static void RemovePendingBatchNode(OrchestrationSessionManager manager, object node)
+ {
+ object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
+ MethodInfo remove = pendingBatches.GetType().GetMethod("Remove", new[] { node.GetType() });
+ Assert.IsNotNull(remove);
+ remove.Invoke(pendingBatches, new[] { node });
+ }
+
+ static void EnqueueReadyForProcessingNode(OrchestrationSessionManager manager, object node)
+ {
+ object readyQueue = GetPrivateField(manager, "orchestrationsReadyForProcessingQueue");
+ MethodInfo enqueue = readyQueue.GetType().GetMethod("Enqueue");
+ Assert.IsNotNull(enqueue);
+ enqueue.Invoke(readyQueue, new[] { node });
+ }
+
+ static object GetPrivateField(object target, string fieldName)
+ {
+ FieldInfo field = target.GetType().GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Instance);
+ Assert.IsNotNull(field);
+ return field.GetValue(target);
+ }
}
}