Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 5.7.4
- Fixed
- Prevented `OperationCanceledException` during pod graceful shutdown from being recorded as APM errors. Added `ICancellationAwareTransactionManager` — an optional interface that `ITransactionManager` implementations can implement to react to receive-loop cancellations. `ApmTransactionManager` implements it by setting the current Elastic APM transaction outcome to `Success`, overriding the error state set by the Azure SDK's auto-instrumentation. `ReceiverWrapper` calls `OnReceiveCancelled()` via a runtime cast before logging the shutdown warning.

## 5.7.3
- Fixed
- Corrected the shutdown `OperationCanceledException` guard introduced in 5.7.2. The previous guard checked `oce.CancellationToken.IsCancellationRequested`, which was always `false` because the Azure ServiceBus SDK raises `ProcessErrorAsync` with `CancellationToken.None` during processor shutdown. The guard now matches any `OperationCanceledException`, which is safe since genuine transport errors surface as `ServiceBusException`.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Ev.ServiceBus.Abstractions.Listeners;

/// <summary>
/// Optional extension for <see cref="ITransactionManager"/> implementations that need to react
/// to receive-loop cancellations (e.g. pod graceful shutdown) — for example, to prevent APM
/// from recording the cancellation as an error transaction.
/// </summary>
public interface ICancellationAwareTransactionManager
{
void OnReceiveCancelled();
}
8 changes: 7 additions & 1 deletion src/Ev.ServiceBus.Apm/ApmTransactionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Ev.ServiceBus.Apm;
/// <summary>
/// Default Transaction uses Diagnostics for Elastic APM
/// </summary>
public class ApmTransactionManager : ITransactionManager
public class ApmTransactionManager : ITransactionManager, ICancellationAwareTransactionManager
{
public async Task RunWithInTransaction(MessageExecutionContext executionContext, Func<Task> transaction)
{
Expand Down Expand Up @@ -71,6 +71,12 @@ private static List<SpanLink> GetSpanLinks(string? diagnosticId)
return spanLinks;
}

public void OnReceiveCancelled()
{
if (IsTraceEnabled())
Agent.Tracer.CurrentTransaction.Outcome = Outcome.Success;
}

private static bool IsTraceEnabled()
=> Agent.IsConfigured && Agent.Config.Enabled && Agent.Tracer is not null && Agent.Tracer.CurrentTransaction is not null;
}
1 change: 1 addition & 0 deletions src/Ev.ServiceBus/Management/Wrappers/ReceiverWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ protected async Task OnExceptionOccured(ProcessErrorEventArgs exceptionEvent)
{
if (exceptionEvent.Exception is OperationCanceledException)
{
(_transactionManager as ICancellationAwareTransactionManager)?.OnReceiveCancelled();
_messageProcessingLogger.LogWarning(
"[Ev.ServiceBus] Receive loop cancelled for {ClientType} '{ResourceId}' during shutdown.",
_composedOptions.ClientType, _composedOptions.ResourceId);
Expand Down
44 changes: 42 additions & 2 deletions tests/Ev.ServiceBus.UnitTests/ReceiverWrapperTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ namespace Ev.ServiceBus.UnitTests;

public sealed class ReceiverWrapperTests
{
private static TestableReceiverWrapper CreateWrapper(ILogger<LoggingExtensions.MessageProcessing>? messageLogger = null)
private static TestableReceiverWrapper CreateWrapper(
ILogger<LoggingExtensions.MessageProcessing>? messageLogger = null,
ITransactionManager? transactionManager = null)
{
var mockServices = new Mock<IServiceCollection>();
var composedOptions = new ComposedReceiverOptions([new QueueOptions(mockServices.Object, "test-queue")]);
var parentOptions = new ServiceBusOptions();

var mockProvider = new Mock<IServiceProvider>();
mockProvider.Setup(p => p.GetService(typeof(ITransactionManager)))
.Returns(Mock.Of<ITransactionManager>());
.Returns(transactionManager ?? Mock.Of<ITransactionManager>());
mockProvider.Setup(p => p.GetService(typeof(ILogger<LoggingExtensions.ServiceBusClientManagement>)))
.Returns(Mock.Of<ILogger<LoggingExtensions.ServiceBusClientManagement>>());
mockProvider.Setup(p => p.GetService(typeof(ILogger<LoggingExtensions.MessageProcessing>)))
Expand Down Expand Up @@ -82,6 +84,44 @@ public async Task OnExceptionOccured_WithTaskCanceledException_DoesNotLogError()
Times.Never());
}

[Fact]
public async Task OnExceptionOccured_WithOperationCanceledException_CallsOnReceiveCancelled()
{
var mockTransactionManager = new Mock<ITransactionManager>();
var cancellationAware = mockTransactionManager.As<ICancellationAwareTransactionManager>();
var wrapper = CreateWrapper(transactionManager: mockTransactionManager.Object);

var args = new ProcessErrorEventArgs(
new OperationCanceledException("shutdown", CancellationToken.None),
ServiceBusErrorSource.Receive,
"test-namespace",
"test-queue",
CancellationToken.None);

await wrapper.InvokeOnExceptionOccuredAsync(args);

cancellationAware.Verify(x => x.OnReceiveCancelled(), Times.Once());
}

[Fact]
public async Task OnExceptionOccured_WithNonCancelledException_DoesNotCallOnReceiveCancelled()
{
var mockTransactionManager = new Mock<ITransactionManager>();
var cancellationAware = mockTransactionManager.As<ICancellationAwareTransactionManager>();
var wrapper = CreateWrapper(transactionManager: mockTransactionManager.Object);

var args = new ProcessErrorEventArgs(
new InvalidOperationException("connection lost"),
ServiceBusErrorSource.Receive,
"test-namespace",
"test-queue",
CancellationToken.None);

await wrapper.InvokeOnExceptionOccuredAsync(args);

cancellationAware.Verify(x => x.OnReceiveCancelled(), Times.Never());
}

[Fact]
public async Task OnExceptionOccured_WithNonCancelledToken_LogsError()
{
Expand Down
Loading