Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ namespace DurableTask.AzureStorage.Messaging
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Queues.Models;
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Partitioning;
using DurableTask.AzureStorage.Storage;
using DurableTask.Core;

class ControlQueue : TaskHubQueue, IDisposable
{
Expand Down Expand Up @@ -209,6 +211,51 @@ public override Task AbandonMessageAsync(MessageData message, SessionBase? sessi
return base.AbandonMessageAsync(message, session);
}

/// <summary>
/// Abandons a message with zero visibility timeout so it becomes immediately visible
/// for another partition owner to pick up. This is used during drain to avoid stranding
/// messages that were dequeued but not yet promoted to active sessions.
/// </summary>
public async Task AbandonMessageForDrainAsync(MessageData message, CancellationToken cancellationToken = default)
{
this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _);

QueueMessage queueMessage = message.OriginalQueueMessage;
TaskMessage taskMessage = message.TaskMessage;
OrchestrationInstance instance = taskMessage.OrchestrationInstance;

this.settings.Logger.AbandoningMessage(
this.storageAccountName,
this.settings.TaskHubName,
taskMessage.Event.EventType.ToString(),
Utils.GetTaskEventId(taskMessage.Event),
queueMessage.MessageId,
instance.InstanceId,
instance.ExecutionId,
this.storageQueue.Name,
message.SequenceNumber,
queueMessage.PopReceipt,
visibilityTimeoutSeconds: 0);

try
{
await this.storageQueue.UpdateMessageAsync(
queueMessage,
TimeSpan.Zero,
clientRequestId: null,
cancellationToken: cancellationToken);
}
catch (Exception e)
{
this.settings.Logger.PartitionManagerWarning(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
this.Name,
$"Failed to abandon message {queueMessage.MessageId} during drain: {e}");
}
Comment on lines +248 to +256
}

public override Task DeleteMessageAsync(MessageData message, SessionBase? session = null)
{
this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _);
Expand Down
80 changes: 79 additions & 1 deletion src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,83 @@ public async Task DrainAsync(string partitionId, CloseReason reason, Cancellatio
}
finally
{
// Remove the partition from memory
// Make dequeued-but-undispatched messages visible before dropping the partition.
try
{
await this.AbandonPendingMessagesAsync(partitionId, cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
this.settings.Logger.PartitionManagerWarning(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
partitionId,
"Canceled while abandoning pending messages during drain.");
}
catch (Exception e)
{
this.settings.Logger.PartitionManagerWarning(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
partitionId,
$"Failed to abandon pending messages during drain: {e}");
}
Comment on lines +233 to +241

this.RemoveQueue(partitionId, reason, caller);
Comment thread
kaibocai marked this conversation as resolved.
}
}

/// <summary>
/// Abandons all pending (dequeued but not yet dispatched) messages for the specified partition,
/// making them immediately visible in the Azure Storage queue for the new partition owner.
/// This prevents a throughput gap equal to the visibility timeout duration when a partition
/// is released during drain.
/// </summary>
async Task AbandonPendingMessagesAsync(string partitionId, CancellationToken cancellationToken)
{
var messagesToAbandon = new List<(ControlQueue Queue, MessageData Message)>();

lock (this.messageAndSessionLock)
{
var node = this.pendingOrchestrationMessageBatches.First;
while (node != null)
{
LinkedListNode<PendingMessageBatch>? next = node.Next;
PendingMessageBatch batch = node.Value;

if (string.Equals(batch.ControlQueue.Name, partitionId, StringComparison.OrdinalIgnoreCase))
{
foreach (MessageData message in batch.Messages)
{
messagesToAbandon.Add((batch.ControlQueue, message));
}

this.pendingOrchestrationMessageBatches.Remove(node);
}

node = next;
}
}

if (messagesToAbandon.Count > 0)
{
cancellationToken.ThrowIfCancellationRequested();

this.settings.Logger.PartitionManagerInfo(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
partitionId,
$"Abandoning {messagesToAbandon.Count} pending message(s) during drain to make them immediately visible for the new partition owner.");

await messagesToAbandon.ParallelForEachAsync(
this.settings.MaxStorageOperationConcurrency,
item => item.Queue.AbandonMessageForDrainAsync(item.Message, cancellationToken));
}
}

/// <summary>
/// This method enumerates all the provided queue messages looking for ExecutionStarted messages. If any are found, it
/// queries table storage to ensure that each message has a matching record in the Instances table. If not, this method
Expand Down Expand Up @@ -592,6 +664,12 @@ async Task ScheduleOrchestrationStatePrefetch(

lock (this.messageAndSessionLock)
{
// Drain may have removed this batch after it was queued for dispatch.
if (node.List != this.pendingOrchestrationMessageBatches)
{
continue;
}

PendingMessageBatch nextBatch = node.Value;
this.pendingOrchestrationMessageBatches.Remove(node);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace DurableTask.AzureStorage.Tests
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.AzureStorage.Messaging;
Expand Down Expand Up @@ -223,5 +224,91 @@ public void AbortAllSessions_NoSessions_DoesNotThrow()
manager.GetStats(out _, out _, out int count);
Assert.AreEqual(0, count, "Should still have no active sessions");
}

[TestMethod]
public async Task GetNextSessionAsync_DrainedReadyQueueNode_IsIgnored()
{
var settings = new AzureStorageOrchestrationServiceSettings();
var stats = new AzureStorageOrchestrationServiceStats();
var trackingStore = new Mock<ITrackingStore>();

using var manager = new OrchestrationSessionManager(
"testaccount",
settings,
stats,
trackingStore.Object);

var controlQueue = CreateControlQueueWithoutStorage();

object pendingBatch = CreatePendingBatch(controlQueue);
object node = AddPendingBatchNode(manager, pendingBatch);
RemovePendingBatchNode(manager, node);
EnqueueReadyForProcessingNode(manager, node);

using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
try
{
await manager.GetNextSessionAsync(entitiesOnly: false, cts.Token);
Assert.Fail("Expected cancellation after the drained node was skipped.");
}
catch (OperationCanceledException)
{
Assert.IsTrue(true, "Operation cancellation was expected.");
}
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
}

static object CreatePendingBatch(ControlQueue controlQueue)
{
Type pendingBatchType = typeof(OrchestrationSessionManager)
.GetNestedType("PendingMessageBatch", BindingFlags.NonPublic);
Assert.IsNotNull(pendingBatchType);

object pendingBatch = Activator.CreateInstance(
pendingBatchType,
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
binder: null,
args: new object[] { controlQueue, "instance1", "execution1" },
culture: null);
Assert.IsNotNull(pendingBatch);
return pendingBatch;
}

static ControlQueue CreateControlQueueWithoutStorage()
{
return (ControlQueue)RuntimeHelpers.GetUninitializedObject(typeof(ControlQueue));
}

static object AddPendingBatchNode(OrchestrationSessionManager manager, object pendingBatch)
{
object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
MethodInfo addLast = pendingBatches.GetType().GetMethod("AddLast", new[] { pendingBatch.GetType() });
Assert.IsNotNull(addLast);
object node = addLast.Invoke(pendingBatches, new[] { pendingBatch });
Assert.IsNotNull(node);
return node;
}

static void RemovePendingBatchNode(OrchestrationSessionManager manager, object node)
{
object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
MethodInfo remove = pendingBatches.GetType().GetMethod("Remove", new[] { node.GetType() });
Assert.IsNotNull(remove);
remove.Invoke(pendingBatches, new[] { node });
}

static void EnqueueReadyForProcessingNode(OrchestrationSessionManager manager, object node)
{
object readyQueue = GetPrivateField(manager, "orchestrationsReadyForProcessingQueue");
MethodInfo enqueue = readyQueue.GetType().GetMethod("Enqueue");
Assert.IsNotNull(enqueue);
enqueue.Invoke(readyQueue, new[] { node });
Comment on lines +260 to +304

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied in dd23148. I added explicit Assert.IsNotNull(...) checks for reflection targets used by the helpers (PendingMessageBatch nested type and the AddLast/Remove/Enqueue methods), plus null-checks on constructed/invoked reflection results.

}

static object GetPrivateField(object target, string fieldName)
{
FieldInfo field = target.GetType().GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Instance);
Assert.IsNotNull(field);
return field.GetValue(target);
}
}
}
Loading