diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
index 9f1c0d2ba..6a791ea3e 100644
--- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
+++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
@@ -19,10 +19,12 @@ namespace DurableTask.AzureStorage.Messaging
     using System.Linq;
     using System.Threading;
     using System.Threading.Tasks;
+    using Azure;
     using Azure.Storage.Queues.Models;
     using DurableTask.AzureStorage.Monitoring;
     using DurableTask.AzureStorage.Partitioning;
     using DurableTask.AzureStorage.Storage;
+    using DurableTask.Core;
 
     class ControlQueue : TaskHubQueue, IDisposable
     {
@@ -209,6 +211,60 @@ public override Task AbandonMessageAsync(MessageData message, SessionBase? sessi
             return base.AbandonMessageAsync(message, session);
         }
 
+        /// <summary>
+        /// Abandons a message with zero visibility timeout so it becomes immediately visible
+        /// for another partition owner to pick up. This is used during drain to avoid stranding
+        /// messages that were dequeued but not yet promoted to active sessions.
+        /// </summary>
+        /// <param name="message">The dequeued message to make visible again.</param>
+        /// <param name="cancellationToken">Token used to cancel the storage update.</param>
+        public async Task AbandonMessageForDrainAsync(MessageData message, CancellationToken cancellationToken = default)
+        {
+            this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _);
+
+            QueueMessage queueMessage = message.OriginalQueueMessage;
+            TaskMessage taskMessage = message.TaskMessage;
+            OrchestrationInstance instance = taskMessage.OrchestrationInstance;
+
+            this.settings.Logger.AbandoningMessage(
+                this.storageAccountName,
+                this.settings.TaskHubName,
+                taskMessage.Event.EventType.ToString(),
+                Utils.GetTaskEventId(taskMessage.Event),
+                queueMessage.MessageId,
+                instance.InstanceId,
+                instance.ExecutionId,
+                this.storageQueue.Name,
+                message.SequenceNumber,
+                queueMessage.PopReceipt,
+                visibilityTimeoutSeconds: 0);
+
+            try
+            {
+                await this.storageQueue.UpdateMessageAsync(
+                    queueMessage,
+                    TimeSpan.Zero,
+                    clientRequestId: null,
+                    cancellationToken: cancellationToken);
+            }
+            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+            {
+                // Propagate cancellation so the drain path reports it once instead of logging
+                // every in-flight message as a failed abandon.
+                throw;
+            }
+            catch (Exception e)
+            {
+                // Best effort: log and swallow so one failed update does not fail the drain.
+                this.settings.Logger.PartitionManagerWarning(
+                    this.storageAccountName,
+                    this.settings.TaskHubName,
+                    this.settings.WorkerId,
+                    this.Name,
+                    $"Failed to abandon message {queueMessage.MessageId} during drain: {e}");
+            }
+        }
+
         public override Task DeleteMessageAsync(MessageData message, SessionBase? session = null)
         {
             this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _);
diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
index abf7a58b2..6d0a48c2c 100644
--- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
+++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
@@ -216,11 +216,83 @@ public async Task DrainAsync(string partitionId, CloseReason reason, Cancellatio
             }
             finally
             {
-                // Remove the partition from memory
+                // Make dequeued-but-undispatched messages visible before dropping the partition.
+                try
+                {
+                    await this.AbandonPendingMessagesAsync(partitionId, cancellationToken);
+                }
+                catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+                {
+                    this.settings.Logger.PartitionManagerWarning(
+                        this.storageAccountName,
+                        this.settings.TaskHubName,
+                        this.settings.WorkerId,
+                        partitionId,
+                        "Canceled while abandoning pending messages during drain.");
+                }
+                catch (Exception e)
+                {
+                    this.settings.Logger.PartitionManagerWarning(
+                        this.storageAccountName,
+                        this.settings.TaskHubName,
+                        this.settings.WorkerId,
+                        partitionId,
+                        $"Failed to abandon pending messages during drain: {e}");
+                }
+
                 this.RemoveQueue(partitionId, reason, caller);
             }
         }
 
+        /// <summary>
+        /// Abandons all pending (dequeued but not yet dispatched) messages for the specified partition,
+        /// making them immediately visible in the Azure Storage queue for the new partition owner.
+        /// This prevents a throughput gap equal to the visibility timeout duration when a partition
+        /// is released during drain.
+        /// </summary>
+        async Task AbandonPendingMessagesAsync(string partitionId, CancellationToken cancellationToken)
+        {
+            var messagesToAbandon = new List<(ControlQueue Queue, MessageData Message)>();
+
+            lock (this.messageAndSessionLock)
+            {
+                var node = this.pendingOrchestrationMessageBatches.First;
+                while (node != null)
+                {
+                    LinkedListNode<PendingMessageBatch>? next = node.Next;
+                    PendingMessageBatch batch = node.Value;
+
+                    if (string.Equals(batch.ControlQueue.Name, partitionId, StringComparison.OrdinalIgnoreCase))
+                    {
+                        foreach (MessageData message in batch.Messages)
+                        {
+                            messagesToAbandon.Add((batch.ControlQueue, message));
+                        }
+
+                        this.pendingOrchestrationMessageBatches.Remove(node);
+                    }
+
+                    node = next;
+                }
+            }
+
+            if (messagesToAbandon.Count > 0)
+            {
+                cancellationToken.ThrowIfCancellationRequested();
+
+                this.settings.Logger.PartitionManagerInfo(
+                    this.storageAccountName,
+                    this.settings.TaskHubName,
+                    this.settings.WorkerId,
+                    partitionId,
+                    $"Abandoning {messagesToAbandon.Count} pending message(s) during drain to make them immediately visible for the new partition owner.");
+
+                await messagesToAbandon.ParallelForEachAsync(
+                    this.settings.MaxStorageOperationConcurrency,
+                    item => item.Queue.AbandonMessageForDrainAsync(item.Message, cancellationToken));
+            }
+        }
+
         /// <summary>
         /// This method enumerates all the provided queue messages looking for ExecutionStarted messages. If any are found, it
         /// queries table storage to ensure that each message has a matching record in the Instances table. If not, this method
@@ -592,6 +664,12 @@ async Task ScheduleOrchestrationStatePrefetch(
 
                 lock (this.messageAndSessionLock)
                 {
+                    // Drain may have removed this batch after it was queued for dispatch.
+                    if (node.List != this.pendingOrchestrationMessageBatches)
+                    {
+                        continue;
+                    }
+
                     PendingMessageBatch nextBatch = node.Value;
                     this.pendingOrchestrationMessageBatches.Remove(node);
 
diff --git a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs b/test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
similarity index 70%
rename from Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
rename to test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
index 126c4b9bc..65014a67c 100644
--- a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
+++ b/test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
@@ -18,6 +18,7 @@ namespace DurableTask.AzureStorage.Tests
     using System.Diagnostics;
     using System.Linq;
     using System.Reflection;
+    using System.Runtime.CompilerServices;
     using System.Threading;
     using System.Threading.Tasks;
     using DurableTask.AzureStorage.Messaging;
@@ -223,5 +224,91 @@ public void AbortAllSessions_NoSessions_DoesNotThrow()
             manager.GetStats(out _, out _, out int count);
             Assert.AreEqual(0, count, "Should still have no active sessions");
         }
+
+        [TestMethod]
+        public async Task GetNextSessionAsync_DrainedReadyQueueNode_IsIgnored()
+        {
+            var settings = new AzureStorageOrchestrationServiceSettings();
+            var stats = new AzureStorageOrchestrationServiceStats();
+            var trackingStore = new Mock<ITrackingStore>();
+
+            using var manager = new OrchestrationSessionManager(
+                "testaccount",
+                settings,
+                stats,
+                trackingStore.Object);
+
+            var controlQueue = CreateControlQueueWithoutStorage();
+
+            object pendingBatch = CreatePendingBatch(controlQueue);
+            object node = AddPendingBatchNode(manager, pendingBatch);
+            RemovePendingBatchNode(manager, node);
+            EnqueueReadyForProcessingNode(manager, node);
+
+            using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
+            try
+            {
+                await manager.GetNextSessionAsync(entitiesOnly: false, cts.Token);
+                Assert.Fail("Expected cancellation after the drained node was skipped.");
+            }
+            catch (OperationCanceledException)
+            {
+                Assert.IsTrue(cts.IsCancellationRequested, "Operation cancellation was expected.");
+            }
+        }
+
+        static object CreatePendingBatch(ControlQueue controlQueue)
+        {
+            Type pendingBatchType = typeof(OrchestrationSessionManager)
+                .GetNestedType("PendingMessageBatch", BindingFlags.NonPublic);
+            Assert.IsNotNull(pendingBatchType);
+
+            object pendingBatch = Activator.CreateInstance(
+                pendingBatchType,
+                BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
+                binder: null,
+                args: new object[] { controlQueue, "instance1", "execution1" },
+                culture: null);
+            Assert.IsNotNull(pendingBatch);
+            return pendingBatch;
+        }
+
+        static ControlQueue CreateControlQueueWithoutStorage()
+        {
+            return (ControlQueue)RuntimeHelpers.GetUninitializedObject(typeof(ControlQueue));
+        }
+
+        static object AddPendingBatchNode(OrchestrationSessionManager manager, object pendingBatch)
+        {
+            object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
+            MethodInfo addLast = pendingBatches.GetType().GetMethod("AddLast", new[] { pendingBatch.GetType() });
+            Assert.IsNotNull(addLast);
+            object node = addLast.Invoke(pendingBatches, new[] { pendingBatch });
+            Assert.IsNotNull(node);
+            return node;
+        }
+
+        static void RemovePendingBatchNode(OrchestrationSessionManager manager, object node)
+        {
+            object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
+            MethodInfo remove = pendingBatches.GetType().GetMethod("Remove", new[] { node.GetType() });
+            Assert.IsNotNull(remove);
+            remove.Invoke(pendingBatches, new[] { node });
+        }
+
+        static void EnqueueReadyForProcessingNode(OrchestrationSessionManager manager, object node)
+        {
+            object readyQueue = GetPrivateField(manager, "orchestrationsReadyForProcessingQueue");
+            MethodInfo enqueue = readyQueue.GetType().GetMethod("Enqueue");
+            Assert.IsNotNull(enqueue);
+            enqueue.Invoke(readyQueue, new[] { node });
+        }
+
+        static object GetPrivateField(object target, string fieldName)
+        {
+            FieldInfo field = target.GetType().GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Instance);
+            Assert.IsNotNull(field);
+            return field.GetValue(target);
+        }
     }
 }