diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index d4df942..4126792 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 5.7.4 +- Fixed + - Prevented `OperationCanceledException` during pod graceful shutdown from being recorded as APM errors. Added `ICancellationAwareTransactionManager` — an optional interface that `ITransactionManager` implementations can implement to react to receive-loop cancellations. `ApmTransactionManager` implements it by setting the current Elastic APM transaction outcome to `Success`, overriding the error state set by the Azure SDK's auto-instrumentation. `ReceiverWrapper` calls `OnReceiveCancelled()` via a runtime cast before logging the shutdown warning. + ## 5.7.3 - Fixed - Corrected the shutdown `OperationCanceledException` guard introduced in 5.7.2. The previous guard checked `oce.CancellationToken.IsCancellationRequested`, which was always `false` because the Azure ServiceBus SDK raises `ProcessErrorAsync` with `CancellationToken.None` during processor shutdown. The guard now matches any `OperationCanceledException`, which is safe since genuine transport errors surface as `ServiceBusException`. diff --git a/src/Ev.ServiceBus.Abstractions/Listeners/ICancellationAwareTransactionManager.cs b/src/Ev.ServiceBus.Abstractions/Listeners/ICancellationAwareTransactionManager.cs new file mode 100644 index 0000000..91605bd --- /dev/null +++ b/src/Ev.ServiceBus.Abstractions/Listeners/ICancellationAwareTransactionManager.cs @@ -0,0 +1,11 @@ +namespace Ev.ServiceBus.Abstractions.Listeners; + +/// +/// Optional extension for implementations that need to react +/// to receive-loop cancellations (e.g. pod graceful shutdown) — for example, to prevent APM +/// from recording the cancellation as an error transaction. +/// +public interface ICancellationAwareTransactionManager +{ + void OnReceiveCancelled(); +} diff --git a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs index e9f3efb..a18aaab 100644 --- a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs +++ b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs @@ -12,7 +12,7 @@ namespace Ev.ServiceBus.Apm; /// /// Default Transaction uses Diagnostics for Elastic APM /// -public class ApmTransactionManager : ITransactionManager +public class ApmTransactionManager : ITransactionManager, ICancellationAwareTransactionManager { public async Task RunWithInTransaction(MessageExecutionContext executionContext, Func transaction) { @@ -71,6 +71,12 @@ private static List GetSpanLinks(string? diagnosticId) return spanLinks; } + public void OnReceiveCancelled() + { + if (IsTraceEnabled()) + Agent.Tracer.CurrentTransaction.Outcome = Outcome.Success; + } + private static bool IsTraceEnabled() => Agent.IsConfigured && Agent.Config.Enabled && Agent.Tracer is not null && Agent.Tracer.CurrentTransaction is not null; } \ No newline at end of file diff --git a/src/Ev.ServiceBus/Management/Wrappers/ReceiverWrapper.cs b/src/Ev.ServiceBus/Management/Wrappers/ReceiverWrapper.cs index abc180c..45fdfd1 100644 --- a/src/Ev.ServiceBus/Management/Wrappers/ReceiverWrapper.cs +++ b/src/Ev.ServiceBus/Management/Wrappers/ReceiverWrapper.cs @@ -134,6 +134,7 @@ protected async Task OnExceptionOccured(ProcessErrorEventArgs exceptionEvent) { if (exceptionEvent.Exception is OperationCanceledException) { + (_transactionManager as ICancellationAwareTransactionManager)?.OnReceiveCancelled(); _messageProcessingLogger.LogWarning( "[Ev.ServiceBus] Receive loop cancelled for {ClientType} '{ResourceId}' during shutdown.", _composedOptions.ClientType, _composedOptions.ResourceId); diff --git a/tests/Ev.ServiceBus.UnitTests/ReceiverWrapperTests.cs b/tests/Ev.ServiceBus.UnitTests/ReceiverWrapperTests.cs index b735f34..4668324 100644 --- a/tests/Ev.ServiceBus.UnitTests/ReceiverWrapperTests.cs +++ b/tests/Ev.ServiceBus.UnitTests/ReceiverWrapperTests.cs @@ -13,7 +13,9 @@ namespace Ev.ServiceBus.UnitTests; public sealed class ReceiverWrapperTests { - private static TestableReceiverWrapper CreateWrapper(ILogger? messageLogger = null) + private static TestableReceiverWrapper CreateWrapper( + ILogger? messageLogger = null, + ITransactionManager? transactionManager = null) { var mockServices = new Mock(); var composedOptions = new ComposedReceiverOptions([new QueueOptions(mockServices.Object, "test-queue")]); @@ -21,7 +23,7 @@ private static TestableReceiverWrapper CreateWrapper(ILogger(); mockProvider.Setup(p => p.GetService(typeof(ITransactionManager))) - .Returns(Mock.Of()); + .Returns(transactionManager ?? Mock.Of()); mockProvider.Setup(p => p.GetService(typeof(ILogger))) .Returns(Mock.Of>()); mockProvider.Setup(p => p.GetService(typeof(ILogger))) @@ -82,6 +84,44 @@ public async Task OnExceptionOccured_WithTaskCanceledException_DoesNotLogError() Times.Never()); } + [Fact] + public async Task OnExceptionOccured_WithOperationCanceledException_CallsOnReceiveCancelled() + { + var mockTransactionManager = new Mock(); + var cancellationAware = mockTransactionManager.As(); + var wrapper = CreateWrapper(transactionManager: mockTransactionManager.Object); + + var args = new ProcessErrorEventArgs( + new OperationCanceledException("shutdown", CancellationToken.None), + ServiceBusErrorSource.Receive, + "test-namespace", + "test-queue", + CancellationToken.None); + + await wrapper.InvokeOnExceptionOccuredAsync(args); + + cancellationAware.Verify(x => x.OnReceiveCancelled(), Times.Once()); + } + + [Fact] + public async Task OnExceptionOccured_WithNonCancelledException_DoesNotCallOnReceiveCancelled() + { + var mockTransactionManager = new Mock(); + var cancellationAware = mockTransactionManager.As(); + var wrapper = CreateWrapper(transactionManager: mockTransactionManager.Object); + + var args = new ProcessErrorEventArgs( + new InvalidOperationException("connection lost"), + ServiceBusErrorSource.Receive, + "test-namespace", + "test-queue", + CancellationToken.None); + + await wrapper.InvokeOnExceptionOccuredAsync(args); + + cancellationAware.Verify(x => x.OnReceiveCancelled(), Times.Never()); + } + [Fact] public async Task OnExceptionOccured_WithNonCancelledToken_LogsError() {