44using TurboHTTP . Diagnostics ;
55using TurboHTTP . Streams . Stages . Features ;
66using TurboHTTP . Tests . Shared ;
7+ using Activity = System . Diagnostics . Activity ;
78using ActivityListener = System . Diagnostics . ActivityListener ;
89using ActivitySamplingResult = System . Diagnostics . ActivitySamplingResult ;
910using ActivitySource = System . Diagnostics . ActivitySource ;
@@ -17,10 +18,20 @@ public sealed class TracingActivityLeakSpec : StreamTestBase
1718 public async Task TracingBidiStage_should_stop_activity_when_stage_tears_down_without_response ( )
1819 {
1920 var stoppedTcs = new TaskCompletionSource ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
21+ Activity ? capturedActivity = null ;
2022
2123 using var listener = new ActivityListener ( ) ;
2224 listener . ShouldListenTo = source => source . Name == TurboHttpInstrumentation . SourceName ;
2325 listener . Sample = ( ref _ ) => ActivitySamplingResult. AllData ;
26+ // Wire ActivityStopped before AddActivityListener so the callback is always
27+ // registered before the Akka dispatch thread can call PostStop.
28+ listener . ActivityStopped = stopped =>
29+ {
30+ if ( capturedActivity is { } a && ReferenceEquals ( stopped , a ) )
31+ {
32+ stoppedTcs . TrySetResult ( ) ;
33+ }
34+ } ;
2435 ActivitySource . AddActivityListener ( listener ) ;
2536
2637 var stage = new TracingBidiStage ( ) ;
@@ -55,13 +66,7 @@ public async Task TracingBidiStage_should_stop_activity_when_stage_tears_down_wi
5566 Assert . True ( forwarded . Options . TryGetValue ( TurboHttpInstrumentation . RequestActivityKey , out var activity ) ) ;
5667 Assert . NotNull ( activity ) ;
5768
58- listener . ActivityStopped = stopped =>
59- {
60- if ( ReferenceEquals ( stopped , activity ) )
61- {
62- stoppedTcs . TrySetResult ( ) ;
63- }
64- } ;
69+ capturedActivity = activity ;
6570
6671 // Tear down the stage — complete both inlets and cancel downstream
6772 reqInSub . SendComplete ( ) ;
0 commit comments