Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
de244ff
initial implementation
Mar 19, 2026
8f6bf9d
fixing the compilation errors
Mar 24, 2026
ea5a36f
addressing copilot comments
Mar 24, 2026
3ab0859
Merge branch 'main' into stevosyan/add-poison-message-handling
sophiatev Mar 24, 2026
4c7665d
fixing the error in the logger where I was incorrectly calling Discar…
Mar 24, 2026
3bd1dc9
moved the max dispatch count from IOrchestrationService to dispatch p…
Mar 25, 2026
2e3c7c2
updated the implementations to remove all exception-throwing in the c…
Apr 1, 2026
eecc077
comment updates
Apr 1, 2026
f0fc35d
fixed a typo, added an argument range check for the max dispatch count
Apr 1, 2026
ea7a67f
Apply suggestion from @cgillum
sophiatev Jun 2, 2026
c976817
Apply suggestions from code review
sophiatev Jun 2, 2026
559bb4d
redid the implementation to follow an interface format
Jun 3, 2026
68dbde9
mroe cleanup and PR comments
Jun 3, 2026
b9a875d
updated to wait for the async calls
Jun 3, 2026
8a77b40
Merge branch 'main' into stevosyan/add-poison-message-handling
Jun 3, 2026
c7d5803
fixing a bug related to the results count and something incorrect i h…
Jun 4, 2026
b091049
some more updates, adding a private 0 arg constructor to the rewind e…
Jun 10, 2026
868b856
Potential fix for pull request finding 'Nested 'if' statements can be…
sophiatev Jun 10, 2026
d13c7b3
Potential fix for pull request finding 'Nested 'if' statements can be…
sophiatev Jun 10, 2026
85ae250
updating the iteration logic in the entity dispatcher
Jun 10, 2026
7cbb931
addressing the first round of PR comments
Jun 12, 2026
756d592
missed changing one place from error to warning
Jun 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,17 @@ internal class RequestMessage : EntityMessage
/// </summary>
public string? ClientSpanId { get; set; }

/// <summary>
/// Gets or sets the reason this request message is considered poisoned,
/// or <c>null</c> if the message is not poisoned.
/// </summary>
/// <remarks>
/// The reason is stored on the request itself because processing the history event that
/// carries the request is decoupled from sending the response; keeping it here makes it
/// available later when the failure details of the response are populated.
/// </remarks>
public string? PoisonReason { get; set; }

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the use case for PoisonReason in entity request messages?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used when generating the failure response for a poisoned entity operation, i.e. here.

Since processing the history event that corresponds to the entity request is decoupled from sending the response, we need to store the poison reason in the request message so we can use it later to populate the failure details.


/// <summary>
/// Gets or sets the number of times this request has been dispatched.
/// </summary>
public int DispatchCount { get; set; }

/// <inheritdoc/>
public override string GetShortDescription()
{
Expand Down
10 changes: 10 additions & 0 deletions src/DurableTask.Core/Entities/OrchestrationEntityContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,16 @@ public void CompleteAcquire(OperationResult result, Guid criticalSectionId)
this.lockAcquisitionPending = false;
}

/// <summary>
/// Called when the entity lock acquisition fails. Clears the critical section state
/// (the lock set and the critical section ID) and marks that no lock acquisition is pending.
/// </summary>
public void AbandonAcquire()
{
// Forget the critical section entirely: no locks and no critical section ID remain.
this.criticalSectionLocks = null;
this.criticalSectionId = null;
// The acquisition is no longer in flight.
this.lockAcquisitionPending = false;
}

internal void AdjustOutgoingMessage(string instanceId, RequestMessage requestMessage, DateTime? cappedTime, out string eventName)
{
if (cappedTime.HasValue)
Expand Down
6 changes: 6 additions & 0 deletions src/DurableTask.Core/History/ExecutionRewoundEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ public ExecutionRewoundEvent(int eventId, string? reason)
this.Reason = reason;
}

// Private ctor for JSON deserialization (required by some storage providers and out-of-proc executors)

@sophiatev sophiatev Jun 10, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR, but a bug I found when testing (JSON was not able to deserialize this event because it lacked a 0-arg constructor and the other constructors all had multiple parameters)

// Parameterless constructor (no access modifier, so private); passes -1 to the base constructor.
ExecutionRewoundEvent()
: base(-1)
{
}

/// <summary>
/// Gets the event type
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/DurableTask.Core/History/HistoryEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ protected HistoryEvent(int eventId)
[DataMember]
public virtual EventType EventType { get; private set; }

/// <summary>
/// Gets or sets the number of times this event has been dispatched.
/// </summary>
/// <remarks>
/// Declared with <c>EmitDefaultValue = false</c>, so the data contract serializer omits
/// this member from the serialized payload while it holds the default value (0).
/// </remarks>
[DataMember(EmitDefaultValue = false)]
public int DispatchCount { get; set; }

/// <summary>
/// Implementation for <see cref="IExtensibleDataObject.ExtensionData"/>.
/// </summary>
Expand Down
73 changes: 73 additions & 0 deletions src/DurableTask.Core/IPoisonMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
#nullable enable
namespace DurableTask.Core
{
using System.Threading.Tasks;
using DurableTask.Core.History;

/// <summary>
/// Provides extensibility points for detecting and handling "poison" messages and invalid work items
/// in the task dispatchers.
/// </summary>
public interface IPoisonMessageHandler
{
/// <summary>
/// Determines whether the given <see cref="HistoryEvent"/> is a poison message.
/// </summary>
/// <param name="historyEvent">The history event being dispatched.</param>
/// <param name="reason">On return, why the message is considered poisoned. The parameter is
/// declared nullable; presumably it is <c>null</c> when the method returns <c>false</c>
/// (implementers should confirm and document this).</param>
/// <returns><c>true</c> if the message should be treated as poisoned; otherwise <c>false</c>.</returns>
public bool IsPoisonMessage(HistoryEvent historyEvent, out string? reason);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the backend to decide whether something is a poison message? Shouldn't the dispatcher be making this decision?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't, and it definitely can. I thought it would be the responsibility of the "poison message handler" since it's sort of in the name but this was just a somewhat arbitrary choice on my end. I can return responsibility to the dispatchers

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing that's tricky about reviewing this interface is that we don't yet have any implementations of it. I'm thinking we should probably implement this for Azure Storage before committing to these new public APIs.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually one thing I remembered is that doing it this way meant I didn't have to pass a maxDispatchCount when instantiating the dispatchers, which meant we didn't need yet another overload for the TaskHubWorker to accept this parameter when it creates them. But if we want to keep this decision on the dispatcher side and avoid another parameter, we can maybe expose this via IPoisonMessageHandler.MaxDispatchCount, if that sounds reasonable?


/// <summary>
/// Invoked to handle a poison message that the dispatchers cannot necessarily "fail"
/// themselves, leaving it to the <see cref="IPoisonMessageHandler"/> to decide what to do.
/// </summary>
/// <remarks>
/// If this method returns false, the dispatcher should fall back to the default behavior
/// followed when poison message handling is not enabled.
/// </remarks>
/// <param name="orchestrationInstance">The orchestration instance the event was sent to, or null
/// if this information is not available.</param>
/// <param name="historyEvent">The "poisoned" history event.</param>
/// <param name="reason">The reason the event is "poisoned".</param>
/// <returns>A task whose result is <c>true</c> if the poison message was successfully handled;
/// otherwise <c>false</c>.</returns>
Comment thread
sophiatev marked this conversation as resolved.
public Task<bool> HandlePoisonMessageAsync(OrchestrationInstance? orchestrationInstance, HistoryEvent historyEvent, string reason);

/// <summary>
/// Invoked to handle an orchestration work item that is invalid and cannot be processed at all.
/// </summary>
/// <remarks>
/// If this method returns false, the dispatcher should fall back to the default behavior
/// followed in the case of an invalid work item.
/// </remarks>
/// <param name="workItem">The orchestration work item that could not be processed.</param>
/// <param name="reason">Why the work item is invalid.</param>
/// <returns>A task whose result is <c>true</c> if the invalid work item was successfully handled;
/// otherwise <c>false</c>.</returns>
public Task<bool> HandleInvalidWorkItemAsync(TaskOrchestrationWorkItem workItem, string reason);

/// <summary>
/// Invoked to handle an activity work item that is invalid and cannot be processed at all.
/// </summary>
/// <remarks>
/// If this method returns false, the dispatcher should fall back to the default behavior
/// followed in the case of an invalid work item.
/// </remarks>
/// <param name="workItem">The activity work item that could not be processed.</param>
/// <param name="reason">Why the work item is invalid.</param>
/// <returns>A task whose result is <c>true</c> if the invalid work item was successfully handled;
/// otherwise <c>false</c>.</returns>
public Task<bool> HandleInvalidWorkItemAsync(TaskActivityWorkItem workItem, string reason);
}
}
1 change: 1 addition & 0 deletions src/DurableTask.Core/Logging/EventIds.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,6 @@ static class EventIds
public const int OrchestrationDebugTrace = 73;

public const int OrchestrationCompletedWithWarning = 74;
// Event ID shared by the PoisonMessageDetected structured log event and its EventSource event.
public const int PoisonMessageDetected = 75;
}
}
54 changes: 54 additions & 0 deletions src/DurableTask.Core/Logging/LogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1976,5 +1976,59 @@ void IEventSourceEvent.WriteEventSource() =>
Utils.PackageVersion);
}

/// <summary>
/// Structured log event written when a "poison" message is detected.
/// </summary>
internal class PoisonMessageDetected : StructuredLogEvent, IEventSourceEvent
{
    /// <summary>
    /// Captures the details of the detected poison message.
    /// </summary>
    /// <param name="orchestrationInstance">The target orchestration instance; when null, the instance and execution IDs are logged as empty strings.</param>
    /// <param name="eventType">The name of the type of message that was detected as poisoned.</param>
    /// <param name="taskEventId">The task event ID associated with the message.</param>
    /// <param name="dispatchCount">The dispatch count recorded for the message.</param>
    /// <param name="details">Additional details about the detection.</param>
    public PoisonMessageDetected(OrchestrationInstance orchestrationInstance, string eventType, int taskEventId, int dispatchCount, string details)
    {
        // The instance may be unavailable; fall back to empty identifiers in that case.
        this.InstanceId = orchestrationInstance?.InstanceId ?? string.Empty;
        this.ExecutionId = orchestrationInstance?.ExecutionId ?? string.Empty;

        this.EventType = eventType;
        this.TaskEventId = taskEventId;
        this.DispatchCount = dispatchCount;
        this.Details = details;
    }

    [StructuredLogField]
    public string InstanceId { get; }

    [StructuredLogField]
    public string ExecutionId { get; }

    [StructuredLogField]
    public string EventType { get; }

    [StructuredLogField]
    public int TaskEventId { get; }

    [StructuredLogField]
    public int DispatchCount { get; }

    [StructuredLogField]
    public string Details { get; }

    public override EventId EventId
    {
        get
        {
            return new EventId(EventIds.PoisonMessageDetected, nameof(EventIds.PoisonMessageDetected));
        }
    }

    public override LogLevel Level
    {
        get { return LogLevel.Warning; }
    }

    protected override string CreateLogMessage()
    {
        var eventDescription = GetEventDescription(this.EventType, this.TaskEventId);
        return $"{this.InstanceId}: Poison message detected for {eventDescription} with dispatch count {this.DispatchCount}: {this.Details}";
    }

    void IEventSourceEvent.WriteEventSource()
    {
        // Argument order must match the StructuredEventSource.PoisonMessageDetected signature.
        StructuredEventSource.Log.PoisonMessageDetected(
            this.InstanceId,
            this.ExecutionId,
            this.EventType,
            this.TaskEventId,
            this.DispatchCount,
            this.Details,
            Utils.AppName,
            Utils.PackageVersion);
    }
}

}
}
60 changes: 60 additions & 0 deletions src/DurableTask.Core/Logging/LogHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace DurableTask.Core.Logging
using System.Collections.Generic;
using System.Text;
using DurableTask.Core.Command;
using DurableTask.Core.Common;
using DurableTask.Core.Entities.EventFormat;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.History;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -760,6 +762,64 @@ internal void RenewActivityMessageFailed(TaskActivityWorkItem workItem, Exceptio
this.WriteStructuredLog(new LogEvents.RenewActivityMessageFailed(workItem, exception), exception);
}
}

/// <summary>
/// Writes a poison-message log event for a "poisoned" history event.
/// Nothing is written when structured logging is disabled.
/// </summary>
/// <param name="orchestrationInstance">The orchestration instance the event was sent to, if known.</param>
/// <param name="historyEvent">The "poisoned" event; its type, task event ID and dispatch count are logged.</param>
/// <param name="details">Extra details related to the processing of this poison message.</param>
internal void PoisonMessageDetected(OrchestrationInstance? orchestrationInstance, HistoryEvent historyEvent, string details)
{
    if (!this.IsStructuredLoggingEnabled)
    {
        return;
    }

    var logEvent = new LogEvents.PoisonMessageDetected(
        orchestrationInstance,
        eventType: historyEvent.EventType.ToString(),
        taskEventId: Utils.GetTaskEventId(historyEvent),
        dispatchCount: historyEvent.DispatchCount,
        details: details);

    this.WriteStructuredLog(logEvent);
}

/// <summary>
/// Writes a poison-message log event for a "poisoned" entity request message.
/// Nothing is written when structured logging is disabled.
/// </summary>
/// <param name="orchestrationInstance">The orchestration instance this event was sent to.</param>
/// <param name="requestMessage">The "poisoned" request message; logged as "LockRequest" or
/// "OperationRequest" depending on whether it is a lock request.</param>
/// <param name="details">Extra details related to the processing of this poison message.</param>
internal void PoisonMessageDetected(OrchestrationInstance orchestrationInstance, RequestMessage requestMessage, string details)
{
    if (!this.IsStructuredLoggingEnabled)
    {
        return;
    }

    string requestKind = requestMessage.IsLockRequest ? "LockRequest" : "OperationRequest";

    // Entity request messages are logged with a task event ID of -1.
    var logEvent = new LogEvents.PoisonMessageDetected(
        orchestrationInstance,
        eventType: requestKind,
        taskEventId: -1,
        dispatchCount: requestMessage.DispatchCount,
        details: details);

    this.WriteStructuredLog(logEvent);
}

/// <summary>
/// Writes a poison-message log event for a "poisoned" entity lock release message.
/// Nothing is written when structured logging is disabled.
/// </summary>
/// <param name="orchestrationInstance">The orchestration instance this event was sent to.</param>
/// <param name="releaseMessage">The "poisoned" release message. It is not read by this method;
/// the event is always logged with the type "LockRelease".</param>
/// <param name="dispatchCount">The dispatch count of the release message.</param>
/// <param name="details">Extra details related to the processing of this poison message.</param>
internal void PoisonMessageDetected(OrchestrationInstance orchestrationInstance, ReleaseMessage releaseMessage, int dispatchCount, string details)
{
    if (!this.IsStructuredLoggingEnabled)
    {
        return;
    }

    // Lock release messages are logged with a task event ID of -1.
    var logEvent = new LogEvents.PoisonMessageDetected(
        orchestrationInstance,
        eventType: "LockRelease",
        taskEventId: -1,
        dispatchCount: dispatchCount,
        details: details);

    this.WriteStructuredLog(logEvent);
}
#endregion

internal void OrchestrationDebugTrace(string instanceId, string executionId, string details)
Expand Down
27 changes: 27 additions & 0 deletions src/DurableTask.Core/Logging/StructuredEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,33 @@ internal void DiscardingMessage(
}
}

// Writes the PoisonMessageDetected event (warning level) to this event source.
// The arguments passed to WriteEvent follow the same order as the method parameters.
[Event(EventIds.PoisonMessageDetected, Level = EventLevel.Warning, Version = 1)]
internal void PoisonMessageDetected(
string InstanceId,
string ExecutionId,
string EventType,
int TaskEventId,
int DispatchCount,
string Details,
string AppName,
string ExtensionVersion)
{
// Skip the write entirely when no listener is enabled at warning level.
if (this.IsEnabled(EventLevel.Warning))
{
// TODO: Use WriteEventCore for better performance
this.WriteEvent(
EventIds.PoisonMessageDetected,
// Instance and execution IDs are replaced with empty strings when null.
InstanceId ?? string.Empty,
ExecutionId ?? string.Empty,
EventType,
TaskEventId,
DispatchCount,
Details,
AppName,
ExtensionVersion);
}
}

[Event(EventIds.EntityBatchExecuting, Level = EventLevel.Informational, Version = 1)]
internal void EntityBatchExecuting(
string InstanceId,
Expand Down
Loading
Loading