Skip to content

Commit 1c4ad2b

Browse files
author
Robert Karp
committed
fix: suppress OperationCanceledException APM errors during pod shutdown (5.7.4)
Added ICancellationAwareTransactionManager to Ev.ServiceBus.Abstractions — an optional interface for ITransactionManager implementations that need to react to receive-loop cancellations. ApmTransactionManager implements it by setting the current Elastic APM transaction outcome to Success, overriding the error state set by the Azure SDK auto-instrumentation. ReceiverWrapper calls OnReceiveCancelled() via a runtime cast inside the existing cancellation guard. The 5.7.3 guard already prevented LogError for shutdown cancellations; this fix closes the remaining gap where APM errors were still recorded through the Elastic APM agent's Azure Service Bus DiagnosticSource instrumentation.
1 parent 596055e commit 1c4ad2b

5 files changed

Lines changed: 65 additions & 3 deletions

File tree

docs/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
## 5.7.4
8+
- Fixed
9+
- Prevented `OperationCanceledException` during pod graceful shutdown from being recorded as APM errors. Added `ICancellationAwareTransactionManager` — an optional interface that `ITransactionManager` implementations can implement to react to receive-loop cancellations. `ApmTransactionManager` implements it by setting the current Elastic APM transaction outcome to `Success`, overriding the error state set by the Azure SDK's auto-instrumentation. `ReceiverWrapper` calls `OnReceiveCancelled()` via a runtime cast before logging the shutdown warning.
10+
711
## 5.7.3
812
- Fixed
913
- Corrected the shutdown `OperationCanceledException` guard introduced in 5.7.2. The previous guard checked `oce.CancellationToken.IsCancellationRequested`, which was always `false` because the Azure ServiceBus SDK raises `ProcessErrorAsync` with `CancellationToken.None` during processor shutdown. The guard now matches any `OperationCanceledException`, which is safe since genuine transport errors surface as `ServiceBusException`.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
namespace Ev.ServiceBus.Abstractions.Listeners;
2+
3+
/// <summary>
4+
/// Optional extension for <see cref="ITransactionManager"/> implementations that need to react
5+
/// to receive-loop cancellations (e.g. pod graceful shutdown) — for example, to prevent APM
6+
/// from recording the cancellation as an error transaction.
7+
/// </summary>
8+
public interface ICancellationAwareTransactionManager
9+
{
10+
void OnReceiveCancelled();
11+
}

src/Ev.ServiceBus.Apm/ApmTransactionManager.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace Ev.ServiceBus.Apm;
1212
/// <summary>
1313
/// Default Transaction uses Diagnostics for Elastic APM
1414
/// </summary>
15-
public class ApmTransactionManager : ITransactionManager
15+
public class ApmTransactionManager : ITransactionManager, ICancellationAwareTransactionManager
1616
{
1717
public async Task RunWithInTransaction(MessageExecutionContext executionContext, Func<Task> transaction)
1818
{
@@ -71,6 +71,12 @@ private static List<SpanLink> GetSpanLinks(string? diagnosticId)
7171
return spanLinks;
7272
}
7373

74+
public void OnReceiveCancelled()
75+
{
76+
if (IsTraceEnabled())
77+
Agent.Tracer.CurrentTransaction.Outcome = Outcome.Success;
78+
}
79+
7480
private static bool IsTraceEnabled()
7581
=> Agent.IsConfigured && Agent.Config.Enabled && Agent.Tracer is not null && Agent.Tracer.CurrentTransaction is not null;
7682
}

src/Ev.ServiceBus/Management/Wrappers/ReceiverWrapper.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ protected async Task OnExceptionOccured(ProcessErrorEventArgs exceptionEvent)
134134
{
135135
if (exceptionEvent.Exception is OperationCanceledException)
136136
{
137+
(_transactionManager as ICancellationAwareTransactionManager)?.OnReceiveCancelled();
137138
_messageProcessingLogger.LogWarning(
138139
"[Ev.ServiceBus] Receive loop cancelled for {ClientType} '{ResourceId}' during shutdown.",
139140
_composedOptions.ClientType, _composedOptions.ResourceId);

tests/Ev.ServiceBus.UnitTests/ReceiverWrapperTests.cs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,17 @@ namespace Ev.ServiceBus.UnitTests;
1313

1414
public sealed class ReceiverWrapperTests
1515
{
16-
private static TestableReceiverWrapper CreateWrapper(ILogger<LoggingExtensions.MessageProcessing>? messageLogger = null)
16+
private static TestableReceiverWrapper CreateWrapper(
17+
ILogger<LoggingExtensions.MessageProcessing>? messageLogger = null,
18+
ITransactionManager? transactionManager = null)
1719
{
1820
var mockServices = new Mock<IServiceCollection>();
1921
var composedOptions = new ComposedReceiverOptions([new QueueOptions(mockServices.Object, "test-queue")]);
2022
var parentOptions = new ServiceBusOptions();
2123

2224
var mockProvider = new Mock<IServiceProvider>();
2325
mockProvider.Setup(p => p.GetService(typeof(ITransactionManager)))
24-
.Returns(Mock.Of<ITransactionManager>());
26+
.Returns(transactionManager ?? Mock.Of<ITransactionManager>());
2527
mockProvider.Setup(p => p.GetService(typeof(ILogger<LoggingExtensions.ServiceBusClientManagement>)))
2628
.Returns(Mock.Of<ILogger<LoggingExtensions.ServiceBusClientManagement>>());
2729
mockProvider.Setup(p => p.GetService(typeof(ILogger<LoggingExtensions.MessageProcessing>)))
@@ -82,6 +84,44 @@ public async Task OnExceptionOccured_WithTaskCanceledException_DoesNotLogError()
8284
Times.Never());
8385
}
8486

87+
[Fact]
88+
public async Task OnExceptionOccured_WithOperationCanceledException_CallsOnReceiveCancelled()
89+
{
90+
var mockTransactionManager = new Mock<ITransactionManager>();
91+
var cancellationAware = mockTransactionManager.As<ICancellationAwareTransactionManager>();
92+
var wrapper = CreateWrapper(transactionManager: mockTransactionManager.Object);
93+
94+
var args = new ProcessErrorEventArgs(
95+
new OperationCanceledException("shutdown", CancellationToken.None),
96+
ServiceBusErrorSource.Receive,
97+
"test-namespace",
98+
"test-queue",
99+
CancellationToken.None);
100+
101+
await wrapper.InvokeOnExceptionOccuredAsync(args);
102+
103+
cancellationAware.Verify(x => x.OnReceiveCancelled(), Times.Once());
104+
}
105+
106+
[Fact]
107+
public async Task OnExceptionOccured_WithNonCancelledException_DoesNotCallOnReceiveCancelled()
108+
{
109+
var mockTransactionManager = new Mock<ITransactionManager>();
110+
var cancellationAware = mockTransactionManager.As<ICancellationAwareTransactionManager>();
111+
var wrapper = CreateWrapper(transactionManager: mockTransactionManager.Object);
112+
113+
var args = new ProcessErrorEventArgs(
114+
new InvalidOperationException("connection lost"),
115+
ServiceBusErrorSource.Receive,
116+
"test-namespace",
117+
"test-queue",
118+
CancellationToken.None);
119+
120+
await wrapper.InvokeOnExceptionOccuredAsync(args);
121+
122+
cancellationAware.Verify(x => x.OnReceiveCancelled(), Times.Never());
123+
}
124+
85125
[Fact]
86126
public async Task OnExceptionOccured_WithNonCancelledToken_LogsError()
87127
{

0 commit comments

Comments
 (0)