Skip to content

Commit de4367d

Browse files
ecofrankieRobert Karpclaude
authored
feat(#442484): suppress TaskCanceledException APM noise during pod graceful shutdown (#113)
Add guard clause to ReceiverWrapper.OnExceptionOccured: when the exception is an OperationCanceledException with IsCancellationRequested=true, log at Debug and return early instead of logging at Error. This eliminates ~250 spurious APM error entries per day caused by all concurrent receivers being cancelled simultaneously during pod graceful shutdown via CloseAsync. Real transport errors (non-cancelled token) continue to be logged at Error, preserving the original behaviour for genuine failures. Also initialise _onExceptionReceivedHandler to a no-op default (non-nullable) so OnExceptionOccured is safe to call without RegisterMessageHandler, which enables direct unit testing via a TestableReceiverWrapper subclass. Co-authored-by: Robert Karp <rkarp@ecovadisazure.onmicrosoft.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 865b539 commit de4367d

3 files changed

Lines changed: 111 additions & 2 deletions

File tree

docs/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
## 5.7.2
8+
- Fixed
9+
- Suppressed spurious `OperationCanceledException` / `TaskCanceledException` APM error entries during pod graceful shutdown. When the Service Bus receive loop is cancelled with a requested cancellation token, the exception is now logged at `Warning` level instead of `Error`.
10+
711
## 5.7.1
812
- Changed
913
- Updated IMessageMetadataAccessor interface to allow to be decorated

src/Ev.ServiceBus/Management/Wrappers/ReceiverWrapper.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class ReceiverWrapper
2222
private readonly ComposedReceiverOptions _composedOptions;
2323
private readonly ITransactionManager _transactionManager;
2424

25-
private Func<ProcessErrorEventArgs, Task>? _onExceptionReceivedHandler;
25+
private Func<ProcessErrorEventArgs, Task> _onExceptionReceivedHandler = _ => Task.CompletedTask;
2626

2727
public ReceiverWrapper(ServiceBusClient? client,
2828
ComposedReceiverOptions options,
@@ -132,6 +132,14 @@ private void TrySetReceptionRegistrationOnContext(MessageContext context, IServi
132132
/// <returns></returns>
133133
protected async Task OnExceptionOccured(ProcessErrorEventArgs exceptionEvent)
134134
{
135+
if (exceptionEvent.Exception is OperationCanceledException oce && oce.CancellationToken.IsCancellationRequested)
136+
{
137+
_messageProcessingLogger.LogWarning(
138+
"[Ev.ServiceBus] Receive loop cancelled for {ClientType} '{ResourceId}' during shutdown.",
139+
_composedOptions.ClientType, _composedOptions.ResourceId);
140+
return;
141+
}
142+
135143
var processException = exceptionEvent.Exception as FailedToProcessMessageException;
136144
using (_messageProcessingLogger.ProcessingInProgress(
137145
clientType: processException?.ClientType ?? _composedOptions.ClientType.ToString(),
@@ -149,7 +157,7 @@ protected async Task OnExceptionOccured(ProcessErrorEventArgs exceptionEvent)
149157
exceptionEvent.EntityPath,
150158
processExceptionInnerException);
151159

152-
await _onExceptionReceivedHandler!(exceptionEvent);
160+
await _onExceptionReceivedHandler(exceptionEvent);
153161
}
154162
}
155163

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Azure.Messaging.ServiceBus;
5+
using Ev.ServiceBus.Abstractions;
6+
using Ev.ServiceBus.Abstractions.Listeners;
7+
using Microsoft.Extensions.DependencyInjection;
8+
using Microsoft.Extensions.Logging;
9+
using Moq;
10+
using Xunit;
11+
12+
namespace Ev.ServiceBus.UnitTests;
13+
14+
public sealed class ReceiverWrapperTests
15+
{
16+
private static TestableReceiverWrapper CreateWrapper(ILogger<LoggingExtensions.MessageProcessing>? messageLogger = null)
17+
{
18+
var mockServices = new Mock<IServiceCollection>();
19+
var composedOptions = new ComposedReceiverOptions([new QueueOptions(mockServices.Object, "test-queue")]);
20+
var parentOptions = new ServiceBusOptions();
21+
22+
var mockProvider = new Mock<IServiceProvider>();
23+
mockProvider.Setup(p => p.GetService(typeof(ITransactionManager)))
24+
.Returns(Mock.Of<ITransactionManager>());
25+
mockProvider.Setup(p => p.GetService(typeof(ILogger<LoggingExtensions.ServiceBusClientManagement>)))
26+
.Returns(Mock.Of<ILogger<LoggingExtensions.ServiceBusClientManagement>>());
27+
mockProvider.Setup(p => p.GetService(typeof(ILogger<LoggingExtensions.MessageProcessing>)))
28+
.Returns(messageLogger ?? Mock.Of<ILogger<LoggingExtensions.MessageProcessing>>());
29+
30+
return new TestableReceiverWrapper(composedOptions, parentOptions, mockProvider.Object);
31+
}
32+
33+
[Fact]
34+
public async Task OnExceptionOccured_WithCancelledToken_DoesNotLogError()
35+
{
36+
var mockLogger = new Mock<ILogger<LoggingExtensions.MessageProcessing>>();
37+
var wrapper = CreateWrapper(mockLogger.Object);
38+
39+
using var cts = new CancellationTokenSource();
40+
cts.Cancel();
41+
42+
var args = new ProcessErrorEventArgs(
43+
new OperationCanceledException("shutdown", cts.Token),
44+
ServiceBusErrorSource.Receive,
45+
"test-namespace",
46+
"test-queue",
47+
cts.Token);
48+
49+
await wrapper.InvokeOnExceptionOccuredAsync(args);
50+
51+
mockLogger.Verify(
52+
x => x.Log(
53+
LogLevel.Error,
54+
It.IsAny<EventId>(),
55+
It.Is<It.IsAnyType>((v, t) => true),
56+
It.IsAny<Exception?>(),
57+
It.Is<Func<It.IsAnyType, Exception?, string>>((v, t) => true)),
58+
Times.Never());
59+
}
60+
61+
[Fact]
62+
public async Task OnExceptionOccured_WithNonCancelledToken_LogsError()
63+
{
64+
var mockLogger = new Mock<ILogger<LoggingExtensions.MessageProcessing>>();
65+
mockLogger.Setup(x => x.IsEnabled(LogLevel.Error)).Returns(true);
66+
var wrapper = CreateWrapper(mockLogger.Object);
67+
68+
var args = new ProcessErrorEventArgs(
69+
new InvalidOperationException("connection lost"),
70+
ServiceBusErrorSource.Receive,
71+
"test-namespace",
72+
"test-queue",
73+
CancellationToken.None);
74+
75+
await wrapper.InvokeOnExceptionOccuredAsync(args);
76+
77+
mockLogger.Verify(
78+
x => x.Log(
79+
LogLevel.Error,
80+
It.IsAny<EventId>(),
81+
It.Is<It.IsAnyType>((v, t) => true),
82+
It.IsAny<Exception?>(),
83+
It.Is<Func<It.IsAnyType, Exception?, string>>((v, t) => true)),
84+
Times.Once());
85+
}
86+
87+
private sealed class TestableReceiverWrapper : ReceiverWrapper
88+
{
89+
public TestableReceiverWrapper(
90+
ComposedReceiverOptions options,
91+
ServiceBusOptions parentOptions,
92+
IServiceProvider provider)
93+
: base(null, options, parentOptions, provider) { }
94+
95+
public Task InvokeOnExceptionOccuredAsync(ProcessErrorEventArgs args) => OnExceptionOccured(args);
96+
}
97+
}

0 commit comments

Comments
 (0)