Commit 49ad9bc

Robert Karp and claude committed
fix: suppress APM error events for receive-loop cancellations during shutdown (5.7.5)
Setting Outcome=Success (5.7.4) was insufficient: Elastic APM captures error events at the DiagnosticSource level before ReceiverWrapper runs, so the error document was already queued regardless of the outcome override. Registers a one-time Agent.AddFilter(IError) that drops error events whose TransactionId matches a cancelled-receive transaction, preventing them from reaching the APM server.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent f0e645f commit 49ad9bc

2 files changed

Lines changed: 71 additions & 2 deletions

docs/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## 5.7.5
+- Fixed
+  - `ApmTransactionManager` now registers the APM error filter **at construction time** (application startup) instead of lazily inside `OnReceiveCancelled()`. The lazy approach lost a race: during pod graceful shutdown the APM agent flushes its internal buffer concurrently with Service Bus processor teardown, so error events could be sent to APM before `ReceiverWrapper.OnExceptionOccured` ran and had a chance to register the filter. Registering at construction time, before any message processing starts, closes this window. A fallback call in `OnReceiveCancelled()` handles the edge case where the APM agent was not yet configured at construction.
+  - The filter now also suppresses `TaskCanceledException` / `OperationCanceledException` errors whose culprit originates in `AmqpReceiver.ReceiveMessagesAsyncInternal`. These error events are produced by Elastic APM's **auto-instrumented** Azure Service Bus transactions (`"AzureServiceBus RECEIVE from …"`): the Azure SDK ends its underlying `Activity` (and therefore the APM transaction) before calling `ProcessErrorAsync`, so `Agent.Tracer.CurrentTransaction` is already `null` when `OnReceiveCancelled()` runs. The transaction ID is therefore never added to `_cancelledTransactionIds`, and the transaction-ID-based filter path cannot suppress them. After switching to WebSockets transport, `TaskCanceledException` from this code path only occurs during pod graceful shutdown.
+
 ## 5.7.4
 - Fixed
   - Prevented `OperationCanceledException` during pod graceful shutdown from being recorded as APM errors. Added `ICancellationAwareTransactionManager`, an optional interface that `ITransactionManager` implementations can implement to react to receive-loop cancellations. `ApmTransactionManager` implements it by setting the current Elastic APM transaction outcome to `Success`, overriding the error state set by the Azure SDK's auto-instrumentation. `ReceiverWrapper` calls `OnReceiveCancelled()` via a runtime cast before logging the shutdown warning.
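
The "register once at startup, with a later fallback" behaviour described in the 5.7.5 entry can be illustrated in isolation. The following is a minimal sketch, not the project's code: `OneTimeRegistration`, `TryRegister`, and the `isReady` parameter are hypothetical names, and `isReady` merely stands in for a check such as `Agent.IsConfigured`. It uses only `Interlocked.CompareExchange` from the base class library.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for a process-wide, once-only registration
// (for example, adding a single error filter to an APM agent).
public static class OneTimeRegistration
{
    private static int _registered; // 0 = not registered, 1 = registered

    // Counts how many times the registration callback actually ran.
    public static int RegistrationCount;

    // Returns true only for the single call that performed the registration.
    // When isReady is false the flag is left untouched, so a later
    // fallback call can still register.
    public static bool TryRegister(bool isReady, Action register)
    {
        if (!isReady)
            return false;

        // Atomically flip 0 -> 1; any other caller sees 1 and backs off.
        if (Interlocked.CompareExchange(ref _registered, 1, 0) != 0)
            return false;

        register();
        Interlocked.Increment(ref RegistrationCount);
        return true;
    }
}
```

Checking readiness before touching the flag is what makes the fallback work in this sketch: a call made while the agent is unconfigured does not consume the one-time slot.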

src/Ev.ServiceBus.Apm/ApmTransactionManager.cs

Lines changed: 66 additions & 2 deletions
@@ -1,6 +1,8 @@
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 using Elastic.Apm;
 using Elastic.Apm.Api;
@@ -14,6 +16,31 @@ namespace Ev.ServiceBus.Apm;
 /// </summary>
 public class ApmTransactionManager : ITransactionManager, ICancellationAwareTransactionManager
 {
+    // Static: Agent.AddFilter is process-wide; all ApmTransactionManager instances (one per consumer)
+    // must share one filter registration and one cancelled-transaction set.
+
+    // Tracks transaction IDs for which the ASB ProcessErrorAsync callback fired an OperationCanceledException
+    // (the standard signal that the receive loop is being stopped, most commonly during pod graceful shutdown).
+    // The error filter below suppresses APM error events for these transactions so that
+    // shutdown-induced TaskCanceledException entries do not appear in APM.
+    //
+    // Cap behaviour: entries beyond CancelledTransactionIdCap are not tracked, so their error events
+    // pass through the filter. This is an accepted tradeoff: reaching 1000 entries requires ~20
+    // consecutive graceful processor stop/start cycles on the same pod instance, which does not occur
+    // in normal Kubernetes rolling-deploy scenarios where the pod is replaced after each shutdown.
+    private static readonly ConcurrentDictionary<string, byte> _cancelledTransactionIds = new();
+    private const int CancelledTransactionIdCap = 1000;
+    private static int _filterRegistered; // 0 = not registered, 1 = registered
+
+    public ApmTransactionManager()
+    {
+        // Register the shutdown-cancellation error filter at construction time (application startup),
+        // not lazily on first OnReceiveCancelled(). During pod graceful shutdown the APM agent flushes
+        // its buffer concurrently with Service Bus processor teardown; registering the filter after the
+        // first OperationCanceledException fires loses that race and lets error events escape to APM.
+        RegisterShutdownErrorFilter();
+    }
+
     public async Task RunWithInTransaction(MessageExecutionContext executionContext, Func<Task> transaction)
     {
         if (IsTraceEnabled())
@@ -73,8 +100,45 @@ private static List<SpanLink> GetSpanLinks(string? diagnosticId)
 
     public void OnReceiveCancelled()
     {
-        if (IsTraceEnabled())
-            Agent.Tracer.CurrentTransaction.Outcome = Outcome.Success;
+        if (!IsTraceEnabled())
+            return;
+
+        var tx = Agent.Tracer.CurrentTransaction;
+        if (tx is null) return;
+        tx.Outcome = Outcome.Success;
+        if (_cancelledTransactionIds.Count < CancelledTransactionIdCap)
+            _cancelledTransactionIds.TryAdd(tx.Id, 0);
+
+        // Fallback: if the agent was not yet configured when the constructor ran, register now.
+        RegisterShutdownErrorFilter();
+    }
+
+    private static void RegisterShutdownErrorFilter()
+    {
+        if (!Agent.IsConfigured || Interlocked.CompareExchange(ref _filterRegistered, 1, 0) != 0)
+            return;
+
+        // Returning null from the filter drops the error event before it reaches the APM server.
+        Agent.AddFilter((IError error) =>
+        {
+            // Case 1: transaction explicitly tracked via OnReceiveCancelled().
+            if (error.TransactionId is not null && _cancelledTransactionIds.ContainsKey(error.TransactionId))
+                return null;
+
+            // Case 2: Elastic APM auto-instrumented "AzureServiceBus RECEIVE" transactions.
+            // The Azure SDK ends its Activity (and therefore the APM transaction) before firing
+            // ProcessErrorAsync, so Agent.Tracer.CurrentTransaction is null by the time
+            // OnReceiveCancelled() runs, and the transaction ID is never added to
+            // _cancelledTransactionIds. Identify these by culprit pattern instead.
+            // After switching to WebSockets transport, TaskCanceledException originating in
+            // AmqpReceiver.ReceiveMessagesAsyncInternal only occurs during pod graceful shutdown.
+            if (error.Exception?.Type is "System.Threading.Tasks.TaskCanceledException"
+                    or "System.OperationCanceledException" &&
+                error.Culprit?.Contains("AmqpReceiver", StringComparison.Ordinal) == true)
+                return null;
+
+            return error;
+        });
     }
 
     private static bool IsTraceEnabled()
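
The two suppression paths in the filter above can be exercised without an APM agent. The sketch below is illustrative only and rests on assumptions: `ErrorEvent` is a hypothetical, simplified model of an error event (it is not the Elastic APM `IError` type), and `ShutdownErrorFilter`, `MarkCancelled`, and `Apply` are invented names. It mirrors the diff's convention that returning `null` drops the event and returning the event lets it through.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;

// Hypothetical, simplified model of an APM error event (not Elastic.Apm's IError).
public sealed class ErrorEvent
{
    public ErrorEvent(string? transactionId, string? exceptionType, string? culprit)
    {
        TransactionId = transactionId;
        ExceptionType = exceptionType;
        Culprit = culprit;
    }

    public string? TransactionId { get; }
    public string? ExceptionType { get; }
    public string? Culprit { get; }
}

public static class ShutdownErrorFilter
{
    // Set of transaction IDs known to belong to cancelled receive operations.
    private static readonly ConcurrentDictionary<string, byte> CancelledIds = new();

    public static void MarkCancelled(string transactionId) =>
        CancelledIds.TryAdd(transactionId, 0);

    // Returns null to drop the event, or the same event to let it pass.
    public static ErrorEvent? Apply(ErrorEvent error)
    {
        // Case 1: the transaction was explicitly marked as cancelled.
        if (error.TransactionId is not null && CancelledIds.ContainsKey(error.TransactionId))
            return null;

        // Case 2: a cancellation exception whose culprit points at the AMQP receiver.
        bool isCancellation = error.ExceptionType
            is "System.Threading.Tasks.TaskCanceledException"
            or "System.OperationCanceledException";
        if (isCancellation &&
            error.Culprit?.Contains("AmqpReceiver", StringComparison.Ordinal) == true)
            return null;

        return error;
    }
}
```

Splitting the exception-type check into a named local (`isCancellation`) is a readability choice for this sketch; it evaluates the same condition as the single combined expression in the diff.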
