diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 4126792..4412da1 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 5.7.5 +- Fixed + - `ApmTransactionManager` now registers the APM error filter **at construction time** (application startup) instead of lazily inside `OnReceiveCancelled()`. The lazy approach lost a race: during pod graceful shutdown the APM agent flushes its internal buffer concurrently with Service Bus processor teardown, so error events could be sent to APM before `ReceiverWrapper.OnExceptionOccured` ran and had a chance to register the filter. Registering at construction time — before any message processing starts — closes this window. + - **Registration reliability fix (5.7.5-preview4):** the constructor registration silently fails when the Elastic APM hosted service starts *after* the Service Bus hosted service (the typical DI registration order). `RegisterShutdownErrorFilter()` is now also called at the start of every `RunWithInTransaction()` call (guarded by `Interlocked.CompareExchange` — O(1) no-op after the first success) and before the `IsTraceEnabled()` guard in `OnReceiveCancelled()`, ensuring the filter is in place before the first message processing completes regardless of hosted-service start order. + - The filter now also suppresses `TaskCanceledException` / `OperationCanceledException` errors whose culprit originates in `AmqpReceiver.ReceiveMessagesAsyncInternal`. These error events are produced by Elastic APM's **auto-instrumented** Azure Service Bus transactions (`"AzureServiceBus RECEIVE from …"`): the Azure SDK ends its underlying `Activity` (and therefore the APM transaction) before calling `ProcessErrorAsync`, so `Agent.Tracer.CurrentTransaction` is already `null` when `OnReceiveCancelled()` runs — the transaction ID is never added to `_cancelledTransactionIds` and the transaction-ID-based filter path cannot suppress them. After switching to WebSockets transport, `TaskCanceledException` from this code path only occurs during pod graceful shutdown. + ## 5.7.4 - Fixed - Prevented `OperationCanceledException` during pod graceful shutdown from being recorded as APM errors. Added `ICancellationAwareTransactionManager` — an optional interface that `ITransactionManager` implementations can implement to react to receive-loop cancellations. `ApmTransactionManager` implements it by setting the current Elastic APM transaction outcome to `Success`, overriding the error state set by the Azure SDK's auto-instrumentation. `ReceiverWrapper` calls `OnReceiveCancelled()` via a runtime cast before logging the shutdown warning. diff --git a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs index a18aaab..866ecc4 100644 --- a/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs +++ b/src/Ev.ServiceBus.Apm/ApmTransactionManager.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; +using System.Threading; using System.Threading.Tasks; using Elastic.Apm; using Elastic.Apm.Api; @@ -14,10 +16,52 @@ namespace Ev.ServiceBus.Apm; /// public class ApmTransactionManager : ITransactionManager, ICancellationAwareTransactionManager { + // Static: Agent.AddFilter is process-wide; all ApmTransactionManager instances (one per consumer) + // must share one filter registration and one cancelled-transaction set. + // Note for tests: static state is shared across test runs within the same process. + // Call ResetForTests() in test setup/teardown to isolate tests from each other. + + // Tracks transaction IDs for which the ASB ProcessErrorAsync callback fired an OperationCanceledException + // (the standard signal that the receive loop is being stopped, most commonly during pod graceful shutdown). + // The error filter below suppresses APM error events for these transactions so that + // shutdown-induced TaskCanceledException entries do not appear in APM. + // + // IDs are removed from this set after the filter matches them (TryRemove) to keep the dictionary lean. + // Cap behaviour: entries beyond CancelledTransactionIdCap are not tracked, so their error events + // fall through to the culprit-based filter path (Case 2 in ShouldSuppressError). This is an accepted + // tradeoff — reaching 1000 entries requires ~20 consecutive graceful processor stop/start cycles on + // the same pod instance, which does not occur in normal Kubernetes rolling-deploy scenarios. + // If the cap were reached by non-AmqpReceiver OperationCanceledException paths (i.e. user-code + // cancellations unrelated to AMQP shutdown), those excess error events would not be suppressed by + // either Case 1 or Case 2 and would reach the APM server. This is acceptable: the conditions + // required to reach the cap via that path are not reachable in practice. + private static readonly ConcurrentDictionary _cancelledTransactionIds = new(); + private const int CancelledTransactionIdCap = 1000; + private static int _filterRegistered; // 0 = not registered, 1 = registered (Interlocked.CompareExchange requires int) + + // Azure.Messaging.ServiceBus internal class name that appears in the APM error culprit when + // the AMQP receive loop is cancelled. After switching to WebSockets transport (PR #202138), + // this culprit only fires during pod graceful shutdown — not from network drops (which surface + // as ServiceBusException, not TaskCanceledException). + private const string AmqpReceiverCulprit = "AmqpReceiver"; + + public ApmTransactionManager() + { + // Best-effort registration at construction time. If the Elastic APM hosted service starts + // AFTER the Service Bus hosted service (the typical registration order), Agent.IsConfigured + // is still false here and registration is deferred to the first RunWithInTransaction call. + RegisterShutdownErrorFilter(); + } + public async Task RunWithInTransaction(MessageExecutionContext executionContext, Func transaction) { if (IsTraceEnabled()) { + // Ensure the filter is registered before any message processing completes. + // This is the reliable registration point: by the time IsTraceEnabled() returns true, + // Agent.IsConfigured is guaranteed true — covering the case where the APM hosted service + // started after the Service Bus hosted service and the constructor registration was skipped. + RegisterShutdownErrorFilter(); Agent.Tracer.CurrentTransaction.Name = executionContext.ExecutionName; Agent.Tracer.CurrentTransaction.SetLabel( nameof(executionContext.ClientType), @@ -73,10 +117,71 @@ private static List GetSpanLinks(string? diagnosticId) public void OnReceiveCancelled() { - if (IsTraceEnabled()) - Agent.Tracer.CurrentTransaction.Outcome = Outcome.Success; + // Attempt registration before checking IsTraceEnabled — the filter must be in place even + // for auto-instrumented "AzureServiceBus RECEIVE" transactions where CurrentTransaction is + // null (Case 2). After this point it is too late for the current error batch, but + // registering here ensures coverage if RunWithInTransaction was never reached. + RegisterShutdownErrorFilter(); + + if (!IsTraceEnabled()) + return; + + var tx = Agent.Tracer.CurrentTransaction; + if (tx is null) return; + tx.Outcome = Outcome.Success; + // Count check and TryAdd are not atomic — under high concurrency the dictionary may reach + // CancelledTransactionIdCap + N entries. The cap is a soft limit; this is intentional. + if (_cancelledTransactionIds.Count < CancelledTransactionIdCap) + _cancelledTransactionIds.TryAdd(tx.Id, 0); + // If Count >= CancelledTransactionIdCap, this ID is not tracked here. + // The culprit-based path (Case 2 in ShouldSuppressError) still suppresses the error. } + private static void RegisterShutdownErrorFilter() + { + if (!Agent.IsConfigured || Interlocked.CompareExchange(ref _filterRegistered, 1, 0) != 0) + return; + + // Returning null from the filter drops the error event before it reaches the APM server. + Agent.AddFilter((IError error) => + ShouldSuppressError(error.TransactionId, error.Exception?.Type, error.Culprit) + ? null + : error); + } + + // Extracted for unit testability. Called by the APM filter lambda. + internal static bool ShouldSuppressError(string? transactionId, string? exceptionType, string? culprit) + { + // Case 1: transaction explicitly tracked via OnReceiveCancelled(). + // TryRemove keeps the dictionary lean — matched IDs are consumed on first use. + // No exceptionType guard is applied here: _cancelledTransactionIds is populated exclusively + // by OnReceiveCancelled(), which ReceiverWrapper only calls for OperationCanceledException. + // Any ID present in this set therefore already originates from a cancellation path. + if (transactionId is not null && _cancelledTransactionIds.TryRemove(transactionId, out _)) + return true; + + // Case 2: Elastic APM auto-instrumented "AzureServiceBus RECEIVE" transactions. + // The Azure SDK ends its Activity (and therefore the APM transaction) before firing + // ProcessErrorAsync, so Agent.Tracer.CurrentTransaction is null when OnReceiveCancelled() + // runs — the transaction ID is never added to _cancelledTransactionIds. + // After switching to WebSockets transport, TaskCanceledException originating in + // AmqpReceiver.ReceiveMessagesAsyncInternal only occurs during pod graceful shutdown. + return (exceptionType is "System.Threading.Tasks.TaskCanceledException" + or "System.OperationCanceledException") && + culprit?.Contains(AmqpReceiverCulprit, StringComparison.Ordinal) == true; + } + + // For test isolation only — resets static state between test runs in the same process. + internal static void ResetForTests() + { + _cancelledTransactionIds.Clear(); + Interlocked.Exchange(ref _filterRegistered, 0); + } + + // For test setup only — seeds a transaction ID as if OnReceiveCancelled() had recorded it. + internal static void AddCancelledTransactionIdForTests(string id) => + _cancelledTransactionIds.TryAdd(id, 0); + private static bool IsTraceEnabled() => Agent.IsConfigured && Agent.Config.Enabled && Agent.Tracer is not null && Agent.Tracer.CurrentTransaction is not null; -} \ No newline at end of file +} diff --git a/src/Ev.ServiceBus.Apm/Ev.ServiceBus.Apm.csproj b/src/Ev.ServiceBus.Apm/Ev.ServiceBus.Apm.csproj index 7bff209..c5bf004 100644 --- a/src/Ev.ServiceBus.Apm/Ev.ServiceBus.Apm.csproj +++ b/src/Ev.ServiceBus.Apm/Ev.ServiceBus.Apm.csproj @@ -18,4 +18,8 @@ + + + + diff --git a/tests/Ev.ServiceBus.UnitTests/ApmTransactionManagerTests.cs b/tests/Ev.ServiceBus.UnitTests/ApmTransactionManagerTests.cs new file mode 100644 index 0000000..b09dfbb --- /dev/null +++ b/tests/Ev.ServiceBus.UnitTests/ApmTransactionManagerTests.cs @@ -0,0 +1,125 @@ +using System; +using Ev.ServiceBus.Apm; +using Xunit; + +namespace Ev.ServiceBus.UnitTests; + +public sealed class ApmTransactionManagerTests : IDisposable +{ + public ApmTransactionManagerTests() + { + ApmTransactionManager.ResetForTests(); + } + + public void Dispose() + { + ApmTransactionManager.ResetForTests(); + } + + [Fact] + public void ShouldSuppressError_WithTrackedTransactionId_ReturnsTrue() + { + ApmTransactionManager.AddCancelledTransactionIdForTests("tx-abc"); + + var result = ApmTransactionManager.ShouldSuppressError( + "tx-abc", + "System.Threading.Tasks.TaskCanceledException", + "some.culprit"); + + Assert.True(result); + } + + [Fact] + public void ShouldSuppressError_WithTrackedTransactionId_RemovesIdAfterMatch() + { + ApmTransactionManager.AddCancelledTransactionIdForTests("tx-abc"); + ApmTransactionManager.ShouldSuppressError("tx-abc", "System.Threading.Tasks.TaskCanceledException", "some.culprit"); + + var secondResult = ApmTransactionManager.ShouldSuppressError( + "tx-abc", + "System.Threading.Tasks.TaskCanceledException", + "some.culprit"); + + Assert.False(secondResult); + } + + [Fact] + public void ShouldSuppressError_WithAmqpReceiverCulpritAndTaskCanceledException_ReturnsTrue() + { + var result = ApmTransactionManager.ShouldSuppressError( + "untracked-tx", + "System.Threading.Tasks.TaskCanceledException", + "Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+d__45"); + + Assert.True(result); + } + + [Fact] + public void ShouldSuppressError_WithAmqpReceiverCulpritAndOperationCanceledException_ReturnsTrue() + { + var result = ApmTransactionManager.ShouldSuppressError( + "untracked-tx", + "System.OperationCanceledException", + "Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+SomeInternalMethod"); + + Assert.True(result); + } + + [Fact] + public void ShouldSuppressError_WithAmqpReceiverCulpritButServiceBusException_ReturnsFalse() + { + var result = ApmTransactionManager.ShouldSuppressError( + "untracked-tx", + "Azure.Messaging.ServiceBus.ServiceBusException", + "Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+SomeInternalMethod"); + + Assert.False(result); + } + + [Fact] + public void ShouldSuppressError_WithNonAmqpCulpritAndTaskCanceledException_ReturnsFalse() + { + var result = ApmTransactionManager.ShouldSuppressError( + null, + "System.Threading.Tasks.TaskCanceledException", + "Some.Other.Namespace.SomeClass+SomeMethod"); + + Assert.False(result); + } + + [Fact] + public void ShouldSuppressError_WithUntrackedIdAndNonCancelledException_ReturnsFalse() + { + var result = ApmTransactionManager.ShouldSuppressError( + "not-tracked", + "System.InvalidOperationException", + "Some.Class+SomeMethod"); + + Assert.False(result); + } + + [Fact] + public void ShouldSuppressError_WithNullTransactionIdAndNullCulprit_ReturnsFalse() + { + var result = ApmTransactionManager.ShouldSuppressError(null, null, null); + + Assert.False(result); + } + + [Fact] + public void ShouldSuppressError_WhenCapExceeded_CulpritPathStillSuppresses() + { + // Simulate the cap-exceeded scenario: OnReceiveCancelled stops tracking IDs once the + // dictionary is full. Errors for those untracked transactions must still be suppressed + // by the culprit-based path (Case 2). + for (var i = 0; i < 1000; i++) + ApmTransactionManager.AddCancelledTransactionIdForTests($"capped-tx-{i}"); + + var result = ApmTransactionManager.ShouldSuppressError( + "untracked-due-to-cap", + "System.Threading.Tasks.TaskCanceledException", + "Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+d__45"); + + Assert.True(result); + } +} diff --git a/tests/Ev.ServiceBus.UnitTests/Ev.ServiceBus.UnitTests.csproj b/tests/Ev.ServiceBus.UnitTests/Ev.ServiceBus.UnitTests.csproj index ed62131..4ef7907 100644 --- a/tests/Ev.ServiceBus.UnitTests/Ev.ServiceBus.UnitTests.csproj +++ b/tests/Ev.ServiceBus.UnitTests/Ev.ServiceBus.UnitTests.csproj @@ -14,6 +14,7 @@ +