6 changes: 6 additions & 0 deletions docs/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 5.7.5
- Fixed
- `ApmTransactionManager` now registers the APM error filter **at construction time** (application startup) instead of lazily inside `OnReceiveCancelled()`. The lazy approach lost a race: during pod graceful shutdown the APM agent flushes its internal buffer concurrently with Service Bus processor teardown, so error events could be sent to APM before `ReceiverWrapper.OnExceptionOccured` ran and had a chance to register the filter. Registering at construction time — before any message processing starts — closes this window.
- **Registration reliability fix (5.7.5-preview4):** the constructor registration silently fails when the Elastic APM hosted service starts *after* the Service Bus hosted service (the typical DI registration order), because `Agent.IsConfigured` is still `false` at that point. `RegisterShutdownErrorFilter()` is now also called at the start of every `RunWithInTransaction()` call that passes the `IsTraceEnabled()` check (guarded by `Interlocked.CompareExchange`, so it is an O(1) no-op after the first success) and before the `IsTraceEnabled()` guard in `OnReceiveCancelled()`, ensuring the filter is in place before the first message processing completes regardless of hosted-service start order.
- The filter now also suppresses `TaskCanceledException` / `OperationCanceledException` errors whose culprit originates in `AmqpReceiver.ReceiveMessagesAsyncInternal`. These error events are produced by Elastic APM's **auto-instrumented** Azure Service Bus transactions (`"AzureServiceBus RECEIVE from …"`): the Azure SDK ends its underlying `Activity` (and therefore the APM transaction) before calling `ProcessErrorAsync`, so `Agent.Tracer.CurrentTransaction` is already `null` when `OnReceiveCancelled()` runs — the transaction ID is never added to `_cancelledTransactionIds` and the transaction-ID-based filter path cannot suppress them. After switching to WebSockets transport, `TaskCanceledException` from this code path only occurs during pod graceful shutdown.
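
The register-once guard described in the entries above can be sketched roughly as follows. This is a minimal illustration, not the library's code: `OnceGuard`, `TryRegister`, `isReady` and `register` are hypothetical names, with `isReady` standing in for `Agent.IsConfigured` and `register` for the `Agent.AddFilter` call.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the guard in RegisterShutdownErrorFilter().
public static class OnceGuard
{
    private static int _registered; // 0 = not registered, 1 = registered

    // Runs `register` at most once, and only once `isReady()` returns true.
    // Returns true only for the single call that performed the registration.
    public static bool TryRegister(Func<bool> isReady, Action register)
    {
        // While not ready, the || short-circuits and the flag stays 0,
        // so a later call can still succeed.
        if (!isReady() || Interlocked.CompareExchange(ref _registered, 1, 0) != 0)
            return false;

        register();
        return true;
    }
}
```

Because the readiness check comes first, an early call (for example from a constructor that runs before the agent is configured) leaves the flag untouched, which is what allows the later call sites to retry.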

## 5.7.4
- Fixed
- Prevented `OperationCanceledException` during pod graceful shutdown from being recorded as APM errors. Added `ICancellationAwareTransactionManager` — an optional interface that `ITransactionManager` implementations can implement to react to receive-loop cancellations. `ApmTransactionManager` implements it by setting the current Elastic APM transaction outcome to `Success`, overriding the error state set by the Azure SDK's auto-instrumentation. `ReceiverWrapper` calls `OnReceiveCancelled()` via a runtime cast before logging the shutdown warning.
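
The optional-interface pattern in this entry might look like the sketch below. All type names here (`IManagerSketch`, `ICancellationAwareSketch`, `PlainManager`, `AwareManager`, `ReceiverSketch`) are hypothetical, simplified stand-ins for `ITransactionManager`, `ICancellationAwareTransactionManager` and `ReceiverWrapper`; the real signatures may differ.

```csharp
// Hypothetical, simplified stand-ins for the interfaces named above.
public interface IManagerSketch { }

public interface ICancellationAwareSketch
{
    void OnReceiveCancelled();
}

// A manager that does not opt in to cancellation notifications.
public sealed class PlainManager : IManagerSketch { }

// A manager that opts in by implementing the optional interface.
public sealed class AwareManager : IManagerSketch, ICancellationAwareSketch
{
    public int Notifications { get; private set; }

    public void OnReceiveCancelled() => Notifications++;
}

public static class ReceiverSketch
{
    // Notifies the manager only if it implements the optional interface.
    // Returns true when the callback was invoked.
    public static bool NotifyCancelled(IManagerSketch manager)
    {
        if (manager is ICancellationAwareSketch aware)
        {
            aware.OnReceiveCancelled();
            return true;
        }

        return false;
    }
}
```

The runtime cast keeps the base interface unchanged, so existing implementations that do not care about cancellation compile and behave as before.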
111 changes: 108 additions & 3 deletions src/Ev.ServiceBus.Apm/ApmTransactionManager.cs
@@ -1,6 +1,8 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Apm;
using Elastic.Apm.Api;
@@ -14,10 +16,52 @@ namespace Ev.ServiceBus.Apm;
/// </summary>
public class ApmTransactionManager : ITransactionManager, ICancellationAwareTransactionManager
{
// Static: Agent.AddFilter is process-wide; all ApmTransactionManager instances (one per consumer)
// must share one filter registration and one cancelled-transaction set.
// Note for tests: static state is shared across test runs within the same process.
// Call ResetForTests() in test setup/teardown to isolate tests from each other.

// Tracks transaction IDs for which the ASB ProcessErrorAsync callback fired an OperationCanceledException
// (the standard signal that the receive loop is being stopped, most commonly during pod graceful shutdown).
// The error filter below suppresses APM error events for these transactions so that
// shutdown-induced TaskCanceledException entries do not appear in APM.
//
// IDs are removed from this set after the filter matches them (TryRemove) to keep the dictionary lean.
// Cap behaviour: entries beyond CancelledTransactionIdCap are not tracked, so their error events
// fall through to the culprit-based filter path (Case 2 in ShouldSuppressError). This is an accepted
// tradeoff — reaching 1000 entries requires ~20 consecutive graceful processor stop/start cycles on
// the same pod instance, which does not occur in normal Kubernetes rolling-deploy scenarios.
// If the cap were reached by non-AmqpReceiver OperationCanceledException paths (i.e. user-code
// cancellations unrelated to AMQP shutdown), those excess error events would not be suppressed by
// either Case 1 or Case 2 and would reach the APM server. This is acceptable: the conditions
// required to reach the cap via that path are not reachable in practice.
private static readonly ConcurrentDictionary<string, byte> _cancelledTransactionIds = new();
private const int CancelledTransactionIdCap = 1000;
private static int _filterRegistered; // 0 = not registered, 1 = registered (int because Interlocked.CompareExchange has no bool overload)

// Azure.Messaging.ServiceBus internal class name that appears in the APM error culprit when
// the AMQP receive loop is cancelled. After switching to WebSockets transport (PR #202138),
// this culprit only fires during pod graceful shutdown — not from network drops (which surface
// as ServiceBusException, not TaskCanceledException).
private const string AmqpReceiverCulprit = "AmqpReceiver";

public ApmTransactionManager()
{
// Best-effort registration at construction time. If the Elastic APM hosted service starts
// AFTER the Service Bus hosted service (the typical registration order), Agent.IsConfigured
// is still false here and registration is deferred to the first RunWithInTransaction call.
RegisterShutdownErrorFilter();
}

public async Task RunWithInTransaction(MessageExecutionContext executionContext, Func<Task> transaction)
{
if (IsTraceEnabled())
{
// Ensure the filter is registered before any message processing completes.
// This is the reliable registration point: by the time IsTraceEnabled() returns true,
// Agent.IsConfigured is guaranteed true — covering the case where the APM hosted service
// started after the Service Bus hosted service and the constructor registration was skipped.
RegisterShutdownErrorFilter();
Agent.Tracer.CurrentTransaction.Name = executionContext.ExecutionName;
Agent.Tracer.CurrentTransaction.SetLabel(
nameof(executionContext.ClientType),
@@ -73,10 +117,71 @@ private static List<SpanLink> GetSpanLinks(string? diagnosticId)

public void OnReceiveCancelled()
{
- if (IsTraceEnabled())
- Agent.Tracer.CurrentTransaction.Outcome = Outcome.Success;
// Attempt registration before checking IsTraceEnabled — the filter must be in place even
// for auto-instrumented "AzureServiceBus RECEIVE" transactions where CurrentTransaction is
// null (Case 2). After this point it is too late for the current error batch, but
// registering here ensures coverage if RunWithInTransaction was never reached.
RegisterShutdownErrorFilter();

if (!IsTraceEnabled())
return;

var tx = Agent.Tracer.CurrentTransaction;
if (tx is null) return;
tx.Outcome = Outcome.Success;
// Count check and TryAdd are not atomic — under high concurrency the dictionary may reach
// CancelledTransactionIdCap + N entries. The cap is a soft limit; this is intentional.
if (_cancelledTransactionIds.Count < CancelledTransactionIdCap)
_cancelledTransactionIds.TryAdd(tx.Id, 0);
// If Count >= CancelledTransactionIdCap, this ID is not tracked here.
// The culprit-based path (Case 2 in ShouldSuppressError) still suppresses the error
// when its culprit is AmqpReceiver; other cancellation errors reach the APM server.
}

private static void RegisterShutdownErrorFilter()
{
if (!Agent.IsConfigured || Interlocked.CompareExchange(ref _filterRegistered, 1, 0) != 0)
return;

// Returning null from the filter drops the error event before it reaches the APM server.
Agent.AddFilter((IError error) =>
ShouldSuppressError(error.TransactionId, error.Exception?.Type, error.Culprit)
? null
: error);
}

// Extracted for unit testability. Called by the APM filter lambda.
internal static bool ShouldSuppressError(string? transactionId, string? exceptionType, string? culprit)
{
// Case 1: transaction explicitly tracked via OnReceiveCancelled().
// TryRemove keeps the dictionary lean — matched IDs are consumed on first use.
// No exceptionType guard is applied here: _cancelledTransactionIds is populated exclusively
// by OnReceiveCancelled(), which ReceiverWrapper only calls for OperationCanceledException.
// Any ID present in this set therefore already originates from a cancellation path.
if (transactionId is not null && _cancelledTransactionIds.TryRemove(transactionId, out _))
return true;

// Case 2: Elastic APM auto-instrumented "AzureServiceBus RECEIVE" transactions.
// The Azure SDK ends its Activity (and therefore the APM transaction) before firing
// ProcessErrorAsync, so Agent.Tracer.CurrentTransaction is null when OnReceiveCancelled()
// runs — the transaction ID is never added to _cancelledTransactionIds.
// After switching to WebSockets transport, TaskCanceledException originating in
// AmqpReceiver.ReceiveMessagesAsyncInternal only occurs during pod graceful shutdown.
return (exceptionType is "System.Threading.Tasks.TaskCanceledException"
or "System.OperationCanceledException") &&
culprit?.Contains(AmqpReceiverCulprit, StringComparison.Ordinal) == true;
}

// For test isolation only — resets static state between test runs in the same process.
internal static void ResetForTests()
{
_cancelledTransactionIds.Clear();
Interlocked.Exchange(ref _filterRegistered, 0);
}

// For test setup only — seeds a transaction ID as if OnReceiveCancelled() had recorded it.
internal static void AddCancelledTransactionIdForTests(string id) =>
_cancelledTransactionIds.TryAdd(id, 0);

private static bool IsTraceEnabled()
=> Agent.IsConfigured && Agent.Config.Enabled && Agent.Tracer is not null && Agent.Tracer.CurrentTransaction is not null;
}
}
4 changes: 4 additions & 0 deletions src/Ev.ServiceBus.Apm/Ev.ServiceBus.Apm.csproj
@@ -18,4 +18,8 @@
<PackageReference Include="Elastic.Apm" Version="1.19.0" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="Ev.ServiceBus.UnitTests" />
</ItemGroup>

</Project>
125 changes: 125 additions & 0 deletions tests/Ev.ServiceBus.UnitTests/ApmTransactionManagerTests.cs
@@ -0,0 +1,125 @@
using System;
using Ev.ServiceBus.Apm;
using Xunit;

namespace Ev.ServiceBus.UnitTests;

public sealed class ApmTransactionManagerTests : IDisposable
{
public ApmTransactionManagerTests()
{
ApmTransactionManager.ResetForTests();
}

public void Dispose()
{
ApmTransactionManager.ResetForTests();
}

[Fact]
public void ShouldSuppressError_WithTrackedTransactionId_ReturnsTrue()
{
ApmTransactionManager.AddCancelledTransactionIdForTests("tx-abc");

var result = ApmTransactionManager.ShouldSuppressError(
"tx-abc",
"System.Threading.Tasks.TaskCanceledException",
"some.culprit");

Assert.True(result);
}

[Fact]
public void ShouldSuppressError_WithTrackedTransactionId_RemovesIdAfterMatch()
{
ApmTransactionManager.AddCancelledTransactionIdForTests("tx-abc");
ApmTransactionManager.ShouldSuppressError("tx-abc", "System.Threading.Tasks.TaskCanceledException", "some.culprit");

var secondResult = ApmTransactionManager.ShouldSuppressError(
"tx-abc",
"System.Threading.Tasks.TaskCanceledException",
"some.culprit");

Assert.False(secondResult);
}

[Fact]
public void ShouldSuppressError_WithAmqpReceiverCulpritAndTaskCanceledException_ReturnsTrue()
{
var result = ApmTransactionManager.ShouldSuppressError(
"untracked-tx",
"System.Threading.Tasks.TaskCanceledException",
"Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+<ReceiveMessagesAsyncInternal>d__45");

Assert.True(result);
}

[Fact]
public void ShouldSuppressError_WithAmqpReceiverCulpritAndOperationCanceledException_ReturnsTrue()
{
var result = ApmTransactionManager.ShouldSuppressError(
"untracked-tx",
"System.OperationCanceledException",
"Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+SomeInternalMethod");

Assert.True(result);
}

[Fact]
public void ShouldSuppressError_WithAmqpReceiverCulpritButServiceBusException_ReturnsFalse()
{
var result = ApmTransactionManager.ShouldSuppressError(
"untracked-tx",
"Azure.Messaging.ServiceBus.ServiceBusException",
"Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+SomeInternalMethod");

Assert.False(result);
}

[Fact]
public void ShouldSuppressError_WithNonAmqpCulpritAndTaskCanceledException_ReturnsFalse()
{
var result = ApmTransactionManager.ShouldSuppressError(
null,
"System.Threading.Tasks.TaskCanceledException",
"Some.Other.Namespace.SomeClass+SomeMethod");

Assert.False(result);
}

[Fact]
public void ShouldSuppressError_WithUntrackedIdAndNonCancelledException_ReturnsFalse()
{
var result = ApmTransactionManager.ShouldSuppressError(
"not-tracked",
"System.InvalidOperationException",
"Some.Class+SomeMethod");

Assert.False(result);
}

[Fact]
public void ShouldSuppressError_WithNullTransactionIdAndNullCulprit_ReturnsFalse()
{
var result = ApmTransactionManager.ShouldSuppressError(null, null, null);

Assert.False(result);
}

[Fact]
public void ShouldSuppressError_WhenCapExceeded_CulpritPathStillSuppresses()
{
// Simulate the cap-exceeded scenario: OnReceiveCancelled stops tracking IDs once the
// dictionary is full. Errors for those untracked transactions must still be suppressed
// by the culprit-based path (Case 2).
for (var i = 0; i < 1000; i++)
ApmTransactionManager.AddCancelledTransactionIdForTests($"capped-tx-{i}");

var result = ApmTransactionManager.ShouldSuppressError(
"untracked-due-to-cap",
"System.Threading.Tasks.TaskCanceledException",
"Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+<ReceiveMessagesAsyncInternal>d__45");

Assert.True(result);
}
}
@@ -14,6 +14,7 @@
<ItemGroup>
<ProjectReference Include="..\..\src\Ev.ServiceBus.Abstractions\Ev.ServiceBus.Abstractions.csproj" />
<ProjectReference Include="..\..\src\Ev.ServiceBus\Ev.ServiceBus.csproj" />
<ProjectReference Include="..\..\src\Ev.ServiceBus.Apm\Ev.ServiceBus.Apm.csproj" />
<ProjectReference Include="..\Ev.ServiceBus.TestHelpers\Ev.ServiceBus.TestHelpers.csproj" />
</ItemGroup>
</Project>