From 8cc09ec654bdf39f57274edadd9424033aeda219 Mon Sep 17 00:00:00 2001 From: Robert Karp Date: Mon, 15 Jun 2026 10:11:23 +0200 Subject: [PATCH 1/6] fix: suppress APM error events for receive-loop cancellations during shutdown (5.7.5) Setting Outcome=Success (5.7.4) was insufficient: Elastic APM captures error events at the DiagnosticSource level before ReceiverWrapper runs, so the error document was already queued regardless of the outcome override. Registers a one-time Agent.AddFilter(IError) that drops error events whose TransactionId matches a cancelled-receive transaction, preventing them from reaching the APM server. Co-Authored-By: Claude Sonnet 4.6 --- docs/CHANGELOG.md | 4 ++ .../ApmTransactionManager.cs | 52 ++++++++++++++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 4126792..b9d79c6 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 5.7.5 +- Fixed + - `ApmTransactionManager` now registers the APM error filter **at construction time** (application startup) instead of lazily inside `OnReceiveCancelled()`. The lazy approach lost a race: during pod graceful shutdown the APM agent flushes its internal buffer concurrently with Service Bus processor teardown, so error events could be sent to APM before `ReceiverWrapper.OnExceptionOccured` ran and had a chance to register the filter. Registering at construction time — before any message processing starts — closes this window. A fallback call in `OnReceiveCancelled()` handles the edge case where the APM agent was not yet configured at construction. + ## 5.7.4 - Fixed - Prevented `OperationCanceledException` during pod graceful shutdown from being recorded as APM errors. Added `ICancellationAwareTransactionManager` — an optional interface that `ITransactionManager` implementations can implement to react to receive-loop cancellations. `ApmTransactionManager` implements it by setting the current Elastic APM transaction outcome to `Success`, overriding the error state set by the Azure SDK's auto-instrumentation. `ReceiverWrapper` calls `OnReceiveCancelled()` via a runtime cast before logging the shutdown warning. diff --git a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs index a18aaab..5045fbc 100644 --- a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs +++ b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; +using System.Threading; using System.Threading.Tasks; using Elastic.Apm; using Elastic.Apm.Api; @@ -14,6 +16,31 @@ namespace Ev.ServiceBus.Apm; /// public class ApmTransactionManager : ITransactionManager, ICancellationAwareTransactionManager { + // Static: Agent.AddFilter is process-wide; all ApmTransactionManager instances (one per consumer) + // must share one filter registration and one cancelled-transaction set. + + // Tracks transaction IDs for which the ASB ProcessErrorAsync callback fired an OperationCanceledException + // (the standard signal that the receive loop is being stopped, most commonly during pod graceful shutdown). + // The error filter below suppresses APM error events for these transactions so that + // shutdown-induced TaskCanceledException entries do not appear in APM. + // + // Cap behaviour: entries beyond CancelledTransactionIdCap are not tracked, so their error events + // pass through the filter. This is an accepted tradeoff — reaching 1000 entries requires ~20 + // consecutive graceful processor stop/start cycles on the same pod instance, which does not occur + // in normal Kubernetes rolling-deploy scenarios where the pod is replaced after each shutdown. + private static readonly ConcurrentDictionary _cancelledTransactionIds = new(); + private const int CancelledTransactionIdCap = 1000; + private static int _filterRegistered; // 0 = not registered, 1 = registered + + public ApmTransactionManager() + { + // Register the shutdown-cancellation error filter at construction time (application startup), + // not lazily on first OnReceiveCancelled(). During pod graceful shutdown the APM agent flushes + // its buffer concurrently with Service Bus processor teardown — registering the filter after the + // first OperationCanceledException fires loses that race and lets error events escape to APM. + RegisterShutdownErrorFilter(); + } + public async Task RunWithInTransaction(MessageExecutionContext executionContext, Func transaction) { if (IsTraceEnabled()) @@ -73,8 +100,29 @@ private static List GetSpanLinks(string? diagnosticId) public void OnReceiveCancelled() { - if (IsTraceEnabled()) - Agent.Tracer.CurrentTransaction.Outcome = Outcome.Success; + if (!IsTraceEnabled()) + return; + + var tx = Agent.Tracer.CurrentTransaction; + if (tx is null) return; + tx.Outcome = Outcome.Success; + if (_cancelledTransactionIds.Count < CancelledTransactionIdCap) + _cancelledTransactionIds.TryAdd(tx.Id, 0); + + // Fallback: if the agent was not yet configured when the constructor ran, register now. + RegisterShutdownErrorFilter(); + } + + private static void RegisterShutdownErrorFilter() + { + if (!Agent.IsConfigured || Interlocked.CompareExchange(ref _filterRegistered, 1, 0) != 0) + return; + + // Returning null from the filter drops the error event before it reaches the APM server. + Agent.AddFilter((IError error) => + error.TransactionId is not null && _cancelledTransactionIds.ContainsKey(error.TransactionId) + ? null + : error); } private static bool IsTraceEnabled() From fba02a482726546a9144185aa57c0ca347c5005f Mon Sep 17 00:00:00 2001 From: Robert Karp Date: Wed, 17 Jun 2026 15:38:54 +0200 Subject: [PATCH 2/6] fix: suppress auto-instrumented AmqpReceiver TaskCanceledException APM errors during shutdown (5.7.5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: Elastic APM creates "AzureServiceBus RECEIVE" transactions via DiagnosticSource auto-instrumentation. The Azure SDK ends its Activity before firing ProcessErrorAsync, so Agent.Tracer.CurrentTransaction is null in OnReceiveCancelled() — the TX ID is never tracked and the filter passes the error through. Fix: add culprit-based suppression for TaskCanceledException on the AmqpReceiver path (post-WebSockets fix, only fires on shutdown). Co-Authored-By: Claude Sonnet 4.6 --- docs/CHANGELOG.md | 1 + .../ApmTransactionManager.cs | 22 ++++++++++++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b9d79c6..41cce07 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## 5.7.5 - Fixed - `ApmTransactionManager` now registers the APM error filter **at construction time** (application startup) instead of lazily inside `OnReceiveCancelled()`. The lazy approach lost a race: during pod graceful shutdown the APM agent flushes its internal buffer concurrently with Service Bus processor teardown, so error events could be sent to APM before `ReceiverWrapper.OnExceptionOccured` ran and had a chance to register the filter. Registering at construction time — before any message processing starts — closes this window. A fallback call in `OnReceiveCancelled()` handles the edge case where the APM agent was not yet configured at construction. + - The filter now also suppresses `TaskCanceledException` / `OperationCanceledException` errors whose culprit originates in `AmqpReceiver.ReceiveMessagesAsyncInternal`. These error events are produced by Elastic APM's **auto-instrumented** Azure Service Bus transactions (`"AzureServiceBus RECEIVE from …"`): the Azure SDK ends its underlying `Activity` (and therefore the APM transaction) before calling `ProcessErrorAsync`, so `Agent.Tracer.CurrentTransaction` is already `null` when `OnReceiveCancelled()` runs — the transaction ID is never added to `_cancelledTransactionIds` and the transaction-ID-based filter path cannot suppress them. After switching to WebSockets transport, `TaskCanceledException` from this code path only occurs during pod graceful shutdown. ## 5.7.4 - Fixed diff --git a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs index 5045fbc..6522cb4 100644 --- a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs +++ b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs @@ -120,9 +120,25 @@ private static void RegisterShutdownErrorFilter() // Returning null from the filter drops the error event before it reaches the APM server. Agent.AddFilter((IError error) => - error.TransactionId is not null && _cancelledTransactionIds.ContainsKey(error.TransactionId) - ? null - : error); + { + // Case 1: transaction explicitly tracked via OnReceiveCancelled(). + if (error.TransactionId is not null && _cancelledTransactionIds.ContainsKey(error.TransactionId)) + return null; + + // Case 2: Elastic APM auto-instrumented "AzureServiceBus RECEIVE" transactions. + // The Azure SDK ends its Activity (and therefore the APM transaction) before firing + // ProcessErrorAsync, so Agent.Tracer.CurrentTransaction is null by the time + // OnReceiveCancelled() runs — the transaction ID is never added to + // _cancelledTransactionIds. Identify these by culprit pattern instead. + // After switching to WebSockets transport, TaskCanceledException originating in + // AmqpReceiver.ReceiveMessagesAsyncInternal only occurs during pod graceful shutdown. + if (error.Exception?.Type is "System.Threading.Tasks.TaskCanceledException" + or "System.OperationCanceledException" && + error.Culprit?.Contains("AmqpReceiver", StringComparison.Ordinal) == true) + return null; + + return error; + }); } private static bool IsTraceEnabled() From e4d13a4837fd9947ddc67114b3b7d160af78d93d Mon Sep 17 00:00:00 2001 From: Robert Karp Date: Wed, 17 Jun 2026 16:18:42 +0200 Subject: [PATCH 3/6] =?UTF-8?q?fix:=20address=20review=20feedback=20?= =?UTF-8?q?=E2=80=94=20TryRemove,=20AmqpReceiverCulprit=20const,=20cap=20c?= =?UTF-8?q?omment,=20filter=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace ContainsKey with TryRemove in the filter so matched transaction IDs are consumed on first use, keeping the dictionary lean and avoiding unnecessary cap pressure - Extract 'AmqpReceiver' magic string to private const AmqpReceiverCulprit with comment documenting the Azure SDK source and why the culprit path is safe post-WebSockets fix - Extract ShouldSuppressError as an internal static method for unit testability - Add inline comment at the CancelledTransactionIdCap guard explaining the culprit fallback - Add InternalsVisibleTo("Ev.ServiceBus.UnitTests") to Ev.ServiceBus.Apm - Add ApmTransactionManagerTests: 8 tests covering both filter paths, the TryRemove consume-once behaviour, and non-suppressed exception types Co-Authored-By: Claude Sonnet 4.6 --- .../ApmTransactionManager.cs | 73 ++++++++---- .../Ev.ServiceBus.Apm.csproj | 4 + .../ApmTransactionManagerTests.cs | 108 ++++++++++++++++++ .../Ev.ServiceBus.UnitTests.csproj | 1 + 4 files changed, 162 insertions(+), 24 deletions(-) create mode 100644 tests/Ev.ServiceBus.UnitTests/ApmTransactionManagerTests.cs diff --git a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs index 6522cb4..8a08c40 100644 --- a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs +++ b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs @@ -18,19 +18,28 @@ public class ApmTransactionManager : ITransactionManager, ICancellationAwareTran { // Static: Agent.AddFilter is process-wide; all ApmTransactionManager instances (one per consumer) // must share one filter registration and one cancelled-transaction set. + // Note for tests: static state is shared across test runs within the same process. + // Call ResetForTests() in test setup/teardown to isolate tests from each other. // Tracks transaction IDs for which the ASB ProcessErrorAsync callback fired an OperationCanceledException // (the standard signal that the receive loop is being stopped, most commonly during pod graceful shutdown). // The error filter below suppresses APM error events for these transactions so that // shutdown-induced TaskCanceledException entries do not appear in APM. // + // IDs are removed from this set after the filter matches them (TryRemove) to keep the dictionary lean. // Cap behaviour: entries beyond CancelledTransactionIdCap are not tracked, so their error events - // pass through the filter. This is an accepted tradeoff — reaching 1000 entries requires ~20 - // consecutive graceful processor stop/start cycles on the same pod instance, which does not occur - // in normal Kubernetes rolling-deploy scenarios where the pod is replaced after each shutdown. + // fall through to the culprit-based filter path (Case 2 in ShouldSuppressError). This is an accepted + // tradeoff — reaching 1000 entries requires ~20 consecutive graceful processor stop/start cycles on + // the same pod instance, which does not occur in normal Kubernetes rolling-deploy scenarios. private static readonly ConcurrentDictionary _cancelledTransactionIds = new(); private const int CancelledTransactionIdCap = 1000; - private static int _filterRegistered; // 0 = not registered, 1 = registered + private static int _filterRegistered; // 0 = not registered, 1 = registered (Interlocked.CompareExchange requires int) + + // Azure.Messaging.ServiceBus internal class name that appears in the APM error culprit when + // the AMQP receive loop is cancelled. After switching to WebSockets transport (PR #202138), + // this culprit only fires during pod graceful shutdown — not from network drops (which surface + // as ServiceBusException, not TaskCanceledException). + private const string AmqpReceiverCulprit = "AmqpReceiver"; public ApmTransactionManager() { @@ -108,6 +117,8 @@ public void OnReceiveCancelled() tx.Outcome = Outcome.Success; if (_cancelledTransactionIds.Count < CancelledTransactionIdCap) _cancelledTransactionIds.TryAdd(tx.Id, 0); + // If Count >= CancelledTransactionIdCap, this ID is not tracked here. + // The culprit-based path (Case 2 in ShouldSuppressError) still suppresses the error. // Fallback: if the agent was not yet configured when the constructor ran, register now. RegisterShutdownErrorFilter(); @@ -120,27 +131,41 @@ private static void RegisterShutdownErrorFilter() // Returning null from the filter drops the error event before it reaches the APM server. Agent.AddFilter((IError error) => - { - // Case 1: transaction explicitly tracked via OnReceiveCancelled(). - if (error.TransactionId is not null && _cancelledTransactionIds.ContainsKey(error.TransactionId)) - return null; - - // Case 2: Elastic APM auto-instrumented "AzureServiceBus RECEIVE" transactions. - // The Azure SDK ends its Activity (and therefore the APM transaction) before firing - // ProcessErrorAsync, so Agent.Tracer.CurrentTransaction is null by the time - // OnReceiveCancelled() runs — the transaction ID is never added to - // _cancelledTransactionIds. Identify these by culprit pattern instead. - // After switching to WebSockets transport, TaskCanceledException originating in - // AmqpReceiver.ReceiveMessagesAsyncInternal only occurs during pod graceful shutdown. - if (error.Exception?.Type is "System.Threading.Tasks.TaskCanceledException" - or "System.OperationCanceledException" && - error.Culprit?.Contains("AmqpReceiver", StringComparison.Ordinal) == true) - return null; - - return error; - }); + ShouldSuppressError(error.TransactionId, error.Exception?.Type, error.Culprit) + ? null + : error); + } + + // Extracted for unit testability. Called by the APM filter lambda. + internal static bool ShouldSuppressError(string? transactionId, string? exceptionType, string? culprit) + { + // Case 1: transaction explicitly tracked via OnReceiveCancelled(). + // TryRemove keeps the dictionary lean — matched IDs are consumed on first use. + if (transactionId is not null && _cancelledTransactionIds.TryRemove(transactionId, out _)) + return true; + + // Case 2: Elastic APM auto-instrumented "AzureServiceBus RECEIVE" transactions. + // The Azure SDK ends its Activity (and therefore the APM transaction) before firing + // ProcessErrorAsync, so Agent.Tracer.CurrentTransaction is null when OnReceiveCancelled() + // runs — the transaction ID is never added to _cancelledTransactionIds. + // After switching to WebSockets transport, TaskCanceledException originating in + // AmqpReceiver.ReceiveMessagesAsyncInternal only occurs during pod graceful shutdown. + return exceptionType is "System.Threading.Tasks.TaskCanceledException" + or "System.OperationCanceledException" && + culprit?.Contains(AmqpReceiverCulprit, StringComparison.Ordinal) == true; } + // For test isolation only — resets static state between test runs in the same process. + internal static void ResetForTests() + { + _cancelledTransactionIds.Clear(); + Interlocked.Exchange(ref _filterRegistered, 0); + } + + // For test setup only — seeds a transaction ID as if OnReceiveCancelled() had recorded it. + internal static void AddCancelledTransactionIdForTests(string id) => + _cancelledTransactionIds.TryAdd(id, 0); + private static bool IsTraceEnabled() => Agent.IsConfigured && Agent.Config.Enabled && Agent.Tracer is not null && Agent.Tracer.CurrentTransaction is not null; -} \ No newline at end of file +} diff --git a/src/Ev.ServiceBus.Apm/Ev.ServiceBus.Apm.csproj b/src/Ev.ServiceBus.Apm/Ev.ServiceBus.Apm.csproj index 7bff209..c5bf004 100644 --- a/src/Ev.ServiceBus.Apm/Ev.ServiceBus.Apm.csproj +++ b/src/Ev.ServiceBus.Apm/Ev.ServiceBus.Apm.csproj @@ -18,4 +18,8 @@ + + + + diff --git a/tests/Ev.ServiceBus.UnitTests/ApmTransactionManagerTests.cs b/tests/Ev.ServiceBus.UnitTests/ApmTransactionManagerTests.cs new file mode 100644 index 0000000..49b7fd9 --- /dev/null +++ b/tests/Ev.ServiceBus.UnitTests/ApmTransactionManagerTests.cs @@ -0,0 +1,108 @@ +using System; +using Ev.ServiceBus.Apm; +using Xunit; + +namespace Ev.ServiceBus.UnitTests; + +public sealed class ApmTransactionManagerTests : IDisposable +{ + public ApmTransactionManagerTests() + { + ApmTransactionManager.ResetForTests(); + } + + public void Dispose() + { + ApmTransactionManager.ResetForTests(); + } + + [Fact] + public void ShouldSuppressError_WithTrackedTransactionId_ReturnsTrue() + { + ApmTransactionManager.AddCancelledTransactionIdForTests("tx-abc"); + + var result = ApmTransactionManager.ShouldSuppressError( + "tx-abc", + "System.Threading.Tasks.TaskCanceledException", + "some.culprit"); + + Assert.True(result); + } + + [Fact] + public void ShouldSuppressError_WithTrackedTransactionId_RemovesIdAfterMatch() + { + ApmTransactionManager.AddCancelledTransactionIdForTests("tx-abc"); + ApmTransactionManager.ShouldSuppressError("tx-abc", "System.Threading.Tasks.TaskCanceledException", "some.culprit"); + + var secondResult = ApmTransactionManager.ShouldSuppressError( + "tx-abc", + "System.Threading.Tasks.TaskCanceledException", + "some.culprit"); + + Assert.False(secondResult); + } + + [Fact] + public void ShouldSuppressError_WithAmqpReceiverCulpritAndTaskCanceledException_ReturnsTrue() + { + var result = ApmTransactionManager.ShouldSuppressError( + "untracked-tx", + "System.Threading.Tasks.TaskCanceledException", + "Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+d__45"); + + Assert.True(result); + } + + [Fact] + public void ShouldSuppressError_WithAmqpReceiverCulpritAndOperationCanceledException_ReturnsTrue() + { + var result = ApmTransactionManager.ShouldSuppressError( + "untracked-tx", + "System.OperationCanceledException", + "Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+SomeInternalMethod"); + + Assert.True(result); + } + + [Fact] + public void ShouldSuppressError_WithAmqpReceiverCulpritButServiceBusException_ReturnsFalse() + { + var result = ApmTransactionManager.ShouldSuppressError( + "untracked-tx", + "Azure.Messaging.ServiceBus.ServiceBusException", + "Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+SomeInternalMethod"); + + Assert.False(result); + } + + [Fact] + public void ShouldSuppressError_WithNonAmqpCulpritAndTaskCanceledException_ReturnsFalse() + { + var result = ApmTransactionManager.ShouldSuppressError( + null, + "System.Threading.Tasks.TaskCanceledException", + "Some.Other.Namespace.SomeClass+SomeMethod"); + + Assert.False(result); + } + + [Fact] + public void ShouldSuppressError_WithUntrackedIdAndNonCancelledException_ReturnsFalse() + { + var result = ApmTransactionManager.ShouldSuppressError( + "not-tracked", + "System.InvalidOperationException", + "Some.Class+SomeMethod"); + + Assert.False(result); + } + + [Fact] + public void ShouldSuppressError_WithNullTransactionIdAndNullCulprit_ReturnsFalse() + { + var result = ApmTransactionManager.ShouldSuppressError(null, null, null); + + Assert.False(result); + } +} diff --git a/tests/Ev.ServiceBus.UnitTests/Ev.ServiceBus.UnitTests.csproj b/tests/Ev.ServiceBus.UnitTests/Ev.ServiceBus.UnitTests.csproj index ed62131..4ef7907 100644 --- a/tests/Ev.ServiceBus.UnitTests/Ev.ServiceBus.UnitTests.csproj +++ b/tests/Ev.ServiceBus.UnitTests/Ev.ServiceBus.UnitTests.csproj @@ -14,6 +14,7 @@ + From 6d1e6625fc204b2d5ea37cd1ca1bf5ce030f171c Mon Sep 17 00:00:00 2001 From: Robert Karp Date: Wed, 17 Jun 2026 16:33:35 +0200 Subject: [PATCH 4/6] =?UTF-8?q?fix:=20address=20Michal's=20follow-up=20?= =?UTF-8?q?=E2=80=94=20add=20parens,=20TOCTOU=20comment,=20cap-exceeded=20?= =?UTF-8?q?test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add explicit parentheses around the is-pattern in ShouldSuppressError to document operator precedence intent: (exceptionType is "TCE" or "OCE") && culprit.Contains(...) - Document the non-atomic Count+TryAdd in OnReceiveCancelled as an intentional soft cap - Add ShouldSuppressError_WhenCapExceeded_CulpritPathStillSuppresses test: seeds 1000 entries to simulate cap exhaustion, then verifies the culprit path still suppresses the error for an untracked transaction ID Co-Authored-By: Claude Sonnet 4.6 --- src/Ev.ServiceBus.Apm/ApmTransactionManager.cs | 6 ++++-- .../ApmTransactionManagerTests.cs | 17 +++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs index 8a08c40..8328272 100644 --- a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs +++ b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs @@ -115,6 +115,8 @@ public void OnReceiveCancelled() var tx = Agent.Tracer.CurrentTransaction; if (tx is null) return; tx.Outcome = Outcome.Success; + // Count check and TryAdd are not atomic — under high concurrency the dictionary may reach + // CancelledTransactionIdCap + N entries. The cap is a soft limit; this is intentional. if (_cancelledTransactionIds.Count < CancelledTransactionIdCap) _cancelledTransactionIds.TryAdd(tx.Id, 0); // If Count >= CancelledTransactionIdCap, this ID is not tracked here. @@ -150,8 +152,8 @@ internal static bool ShouldSuppressError(string? transactionId, string? exceptio // runs — the transaction ID is never added to _cancelledTransactionIds. // After switching to WebSockets transport, TaskCanceledException originating in // AmqpReceiver.ReceiveMessagesAsyncInternal only occurs during pod graceful shutdown. - return exceptionType is "System.Threading.Tasks.TaskCanceledException" - or "System.OperationCanceledException" && + return (exceptionType is "System.Threading.Tasks.TaskCanceledException" + or "System.OperationCanceledException") && culprit?.Contains(AmqpReceiverCulprit, StringComparison.Ordinal) == true; } diff --git a/tests/Ev.ServiceBus.UnitTests/ApmTransactionManagerTests.cs b/tests/Ev.ServiceBus.UnitTests/ApmTransactionManagerTests.cs index 49b7fd9..b09dfbb 100644 --- a/tests/Ev.ServiceBus.UnitTests/ApmTransactionManagerTests.cs +++ b/tests/Ev.ServiceBus.UnitTests/ApmTransactionManagerTests.cs @@ -105,4 +105,21 @@ public void ShouldSuppressError_WithNullTransactionIdAndNullCulprit_ReturnsFalse Assert.False(result); } + + [Fact] + public void ShouldSuppressError_WhenCapExceeded_CulpritPathStillSuppresses() + { + // Simulate the cap-exceeded scenario: OnReceiveCancelled stops tracking IDs once the + // dictionary is full. Errors for those untracked transactions must still be suppressed + // by the culprit-based path (Case 2). + for (var i = 0; i < 1000; i++) + ApmTransactionManager.AddCancelledTransactionIdForTests($"capped-tx-{i}"); + + var result = ApmTransactionManager.ShouldSuppressError( + "untracked-due-to-cap", + "System.Threading.Tasks.TaskCanceledException", + "Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+d__45"); + + Assert.True(result); + } } From 0dc966847fb660427a202de25de650e1249b1e1a Mon Sep 17 00:00:00 2001 From: Robert Karp Date: Thu, 18 Jun 2026 13:44:07 +0200 Subject: [PATCH 5/6] fix: ensure APM error filter is registered before first message processing (5.7.5) The Service Bus hosted service starts before the Elastic APM hosted service (DI registration order in consuming apps), so Agent.IsConfigured is false when ApmTransactionManager is constructed and the constructor's filter registration is silently skipped. RegisterShutdownErrorFilter() is now also called: - At the start of RunWithInTransaction() when IsTraceEnabled() is true (agent is definitively up; O(1) no-op on all subsequent calls via Interlocked.CompareExchange guard) - Before the IsTraceEnabled() guard in OnReceiveCancelled() This guarantees the filter is in place before the first message processing completes and before any graceful-shutdown RECEIVE activity errors can fire. Co-Authored-By: Claude Sonnet 4.6 --- docs/CHANGELOG.md | 3 ++- .../ApmTransactionManager.cs | 21 ++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 41cce07..4412da1 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -6,7 +6,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## 5.7.5 - Fixed - - `ApmTransactionManager` now registers the APM error filter **at construction time** (application startup) instead of lazily inside `OnReceiveCancelled()`. The lazy approach lost a race: during pod graceful shutdown the APM agent flushes its internal buffer concurrently with Service Bus processor teardown, so error events could be sent to APM before `ReceiverWrapper.OnExceptionOccured` ran and had a chance to register the filter. Registering at construction time — before any message processing starts — closes this window. A fallback call in `OnReceiveCancelled()` handles the edge case where the APM agent was not yet configured at construction. + - `ApmTransactionManager` now registers the APM error filter **at construction time** (application startup) instead of lazily inside `OnReceiveCancelled()`. The lazy approach lost a race: during pod graceful shutdown the APM agent flushes its internal buffer concurrently with Service Bus processor teardown, so error events could be sent to APM before `ReceiverWrapper.OnExceptionOccured` ran and had a chance to register the filter. Registering at construction time — before any message processing starts — closes this window. + - **Registration reliability fix (5.7.5-preview4):** the constructor registration silently fails when the Elastic APM hosted service starts *after* the Service Bus hosted service (the typical DI registration order). `RegisterShutdownErrorFilter()` is now also called at the start of every `RunWithInTransaction()` call (guarded by `Interlocked.CompareExchange` — O(1) no-op after the first success) and before the `IsTraceEnabled()` guard in `OnReceiveCancelled()`, ensuring the filter is in place before the first message processing completes regardless of hosted-service start order. - The filter now also suppresses `TaskCanceledException` / `OperationCanceledException` errors whose culprit originates in `AmqpReceiver.ReceiveMessagesAsyncInternal`. These error events are produced by Elastic APM's **auto-instrumented** Azure Service Bus transactions (`"AzureServiceBus RECEIVE from …"`): the Azure SDK ends its underlying `Activity` (and therefore the APM transaction) before calling `ProcessErrorAsync`, so `Agent.Tracer.CurrentTransaction` is already `null` when `OnReceiveCancelled()` runs — the transaction ID is never added to `_cancelledTransactionIds` and the transaction-ID-based filter path cannot suppress them. After switching to WebSockets transport, `TaskCanceledException` from this code path only occurs during pod graceful shutdown. ## 5.7.4 diff --git a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs index 8328272..edbe6a4 100644 --- a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs +++ b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs @@ -43,10 +43,9 @@ public class ApmTransactionManager : ITransactionManager, ICancellationAwareTran public ApmTransactionManager() { - // Register the shutdown-cancellation error filter at construction time (application startup), - // not lazily on first OnReceiveCancelled(). During pod graceful shutdown the APM agent flushes - // its buffer concurrently with Service Bus processor teardown — registering the filter after the - // first OperationCanceledException fires loses that race and lets error events escape to APM. + // Best-effort registration at construction time. If the Elastic APM hosted service starts + // AFTER the Service Bus hosted service (the typical registration order), Agent.IsConfigured + // is still false here and registration is deferred to the first RunWithInTransaction call. RegisterShutdownErrorFilter(); } @@ -54,6 +53,11 @@ public async Task RunWithInTransaction(MessageExecutionContext executionContext, { if (IsTraceEnabled()) { + // Ensure the filter is registered before any message processing completes. + // This is the reliable registration point: by the time IsTraceEnabled() returns true, + // Agent.IsConfigured is guaranteed true — covering the case where the APM hosted service + // started after the Service Bus hosted service and the constructor registration was skipped. + RegisterShutdownErrorFilter(); Agent.Tracer.CurrentTransaction.Name = executionContext.ExecutionName; Agent.Tracer.CurrentTransaction.SetLabel( nameof(executionContext.ClientType), @@ -109,6 +113,12 @@ private static List GetSpanLinks(string? diagnosticId) public void OnReceiveCancelled() { + // Attempt registration before checking IsTraceEnabled — the filter must be in place even + // for auto-instrumented "AzureServiceBus RECEIVE" transactions where CurrentTransaction is + // null (Case 2). After this point it is too late for the current error batch, but + // registering here ensures coverage if RunWithInTransaction was never reached. + RegisterShutdownErrorFilter(); + if (!IsTraceEnabled()) return; @@ -121,9 +131,6 @@ public void OnReceiveCancelled() _cancelledTransactionIds.TryAdd(tx.Id, 0); // If Count >= CancelledTransactionIdCap, this ID is not tracked here. // The culprit-based path (Case 2 in ShouldSuppressError) still suppresses the error. - - // Fallback: if the agent was not yet configured when the constructor ran, register now. - RegisterShutdownErrorFilter(); } private static void RegisterShutdownErrorFilter() From eff43eab3ef64a0694dfa7b9e672e442cc28ab54 Mon Sep 17 00:00:00 2001 From: Robert Karp Date: Thu, 18 Jun 2026 16:07:06 +0200 Subject: [PATCH 6/6] docs: clarify Case 1 type-safety and cap behaviour in ApmTransactionManager Address review comments from PR #116: - Case 1 has no exceptionType guard because _cancelledTransactionIds is populated exclusively via OnReceiveCancelled(), which ReceiverWrapper only calls for OperationCanceledException. - Cap overflow for non-AmqpReceiver paths won't be suppressed; this is documented as acceptable given the conditions required to reach it. Co-Authored-By: Claude Sonnet 4.6 --- src/Ev.ServiceBus.Apm/ApmTransactionManager.cs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs index edbe6a4..866ecc4 100644 --- a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs +++ b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs @@ -31,6 +31,10 @@ public class ApmTransactionManager : ITransactionManager, ICancellationAwareTran // fall through to the culprit-based filter path (Case 2 in ShouldSuppressError). This is an accepted // tradeoff — reaching 1000 entries requires ~20 consecutive graceful processor stop/start cycles on // the same pod instance, which does not occur in normal Kubernetes rolling-deploy scenarios. + // If the cap were reached by non-AmqpReceiver OperationCanceledException paths (i.e. user-code + // cancellations unrelated to AMQP shutdown), those excess error events would not be suppressed by + // either Case 1 or Case 2 and would reach the APM server. This is acceptable: the conditions + // required to reach the cap via that path are not reachable in practice. private static readonly ConcurrentDictionary _cancelledTransactionIds = new(); private const int CancelledTransactionIdCap = 1000; private static int _filterRegistered; // 0 = not registered, 1 = registered (Interlocked.CompareExchange requires int) @@ -150,6 +154,9 @@ internal static bool ShouldSuppressError(string? transactionId, string? exceptio { // Case 1: transaction explicitly tracked via OnReceiveCancelled(). // TryRemove keeps the dictionary lean — matched IDs are consumed on first use. + // No exceptionType guard is applied here: _cancelledTransactionIds is populated exclusively + // by OnReceiveCancelled(), which ReceiverWrapper only calls for OperationCanceledException. + // Any ID present in this set therefore already originates from a cancellation path. if (transactionId is not null && _cancelledTransactionIds.TryRemove(transactionId, out _)) return true;