Skip to content

Commit f15d9cf

Browse files
committed
refactor(messenger): improve type safety, observability, and test coverage
Mark all new types as final for consistency with existing codebase. Register TraceStampPropagator as a shared DI service instead of inline construction. Default instrumentationType to Auto, add safe tracer locator fallback, nullable logger consistency, debug logging on skipped messages and null scopes, and remove dead commented-out code. Add end-to-end propagation test, WorkerMessageEventSubscriber unit tests, and PropagatorFactory test.
1 parent a1cfe96 commit f15d9cf

13 files changed

Lines changed: 378 additions & 41 deletions

File tree

phpstan-baseline.neon

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ parameters:
1212
count: 1
1313
path: src/Instrumentation/Doctrine/Middleware/TraceableConnectionV4.php
1414

15+
-
16+
message: '#^Property FriendsOfOpenTelemetry\\OpenTelemetryBundle\\Instrumentation\\Symfony\\Messenger\\AddStampForPropagationMiddleware\:\:\$logger is never read, only written\.$#'
17+
identifier: property.onlyWritten
18+
count: 1
19+
path: src/Instrumentation/Symfony/Messenger/AddStampForPropagationMiddleware.php
20+
1521
-
1622
message: '#^Property FriendsOfOpenTelemetry\\OpenTelemetryBundle\\Instrumentation\\Doctrine\\Middleware\\TraceableStatementV4\:\:\$logger is never read, only written\.$#'
1723
identifier: property.onlyWritten

src/DependencyInjection/OpenTelemetryExtension.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ private function registerMessengerTracingInstrumentationConfiguration(ContainerB
283283
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace.transport');
284284
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace.transport_factory');
285285
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace.middleware');
286+
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace_stamp_propagator');
286287
$container->removeDefinition('open_telemetry.instrumentation.messenger.propagation.middleware');
287288
$container->removeDefinition('open_telemetry.instrumentation.messenger.worker');
288289
}

src/Instrumentation/Symfony/Messenger/AddStampForPropagationMiddleware.php

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
use Symfony\Component\Messenger\Middleware\StackInterface;
1212

1313
/**
14-
* Be aware the app consuming the message must be able to denormalize the stamp.
14+
* Injects the current trace context into the Messenger envelope as a TraceStamp
15+
* so that trace propagation is maintained across asynchronous message boundaries.
1516
*/
16-
readonly class AddStampForPropagationMiddleware implements MiddlewareInterface
17+
final readonly class AddStampForPropagationMiddleware implements MiddlewareInterface
1718
{
1819
public function __construct(
1920
private MultiTextMapPropagator $propagator,
21+
private TraceStampPropagator $traceStampPropagator,
2022
private ?LoggerInterface $logger = null,
2123
) {
2224
}
@@ -32,7 +34,9 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
3234
$scope = Context::storage()->scope();
3335

3436
if (null !== $scope) {
35-
$this->propagator->inject($envelope, new TraceStampPropagator($this->logger), Context::getCurrent());
37+
// inject() mutates $envelope by reference through the TraceStampPropagator setter,
38+
// because Envelope is immutable and with() returns a new instance.
39+
$this->propagator->inject($envelope, $this->traceStampPropagator, Context::getCurrent());
3640
}
3741

3842
return $stack->next()->handle($envelope, $stack);

src/Instrumentation/Symfony/Messenger/TraceStamp.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,9 @@
55
use Symfony\Component\Messenger\Stamp\StampInterface;
66

77
/**
8-
* @doc: https://www.w3.org/TR/trace-context/
9-
*
10-
* You can see how the trace parent generated here: https://github.com/open-telemetry/opentelemetry-php/blob/main/src/API/Trace/Propagation/TraceContextPropagator.php
8+
* @see https://www.w3.org/TR/trace-context/
119
*/
12-
readonly class TraceStamp implements StampInterface
10+
final readonly class TraceStamp implements StampInterface
1311
{
1412
public function __construct(
1513
private string $traceParent,

src/Instrumentation/Symfony/Messenger/TraceableMessengerStack.php

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,6 @@ public function next(): MiddlewareInterface
3333
$this->logger?->debug('No active scope');
3434
}
3535

36-
/* if (null !== $scope) {
37-
$span = Span::fromContext($scope->context());
38-
39-
if ($span->isRecording()) {
40-
$scope->detach();
41-
42-
$span->setStatus(StatusCode::STATUS_OK);
43-
$this->logger?->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId()));
44-
$span->end();
45-
}
46-
}*/
47-
4836
$spanBuilder = $this->tracer
4937
->spanBuilder('messenger.middleware')
5038
->setSpanKind(SpanKind::KIND_INTERNAL)

src/Instrumentation/Symfony/Messenger/WorkerMessageEventSubscriber.php

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,22 @@
2323
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2424
use Symfony\Contracts\Service\ServiceSubscriberInterface;
2525

26-
class WorkerMessageEventSubscriber implements EventSubscriberInterface, ServiceSubscriberInterface, InstrumentationTypeInterface
26+
/**
27+
* Creates consumer-side spans for messages processed by the Symfony Messenger worker,
28+
* with support for both auto and attribute-based instrumentation modes,
29+
* trace context propagation from the producer, and retry/failure metadata.
30+
*/
31+
final class WorkerMessageEventSubscriber implements EventSubscriberInterface, ServiceSubscriberInterface, InstrumentationTypeInterface
2732
{
28-
private ?InstrumentationTypeEnum $instrumentationType = null;
33+
private InstrumentationTypeEnum $instrumentationType = InstrumentationTypeEnum::Auto;
2934

3035
public function __construct(
3136
private readonly MultiTextMapPropagator $propagator,
3237
private readonly TracerInterface $tracer,
3338
/** @var ServiceLocator<TracerInterface> */
3439
private readonly ServiceLocator $tracerLocator,
35-
private readonly LoggerInterface $logger,
40+
private readonly TraceStampPropagator $traceStampPropagator,
41+
private readonly ?LoggerInterface $logger = null,
3642
) {
3743
}
3844

@@ -56,36 +62,43 @@ public static function getSubscribedEvents(): array
5662
];
5763
}
5864

65+
/**
66+
* @return class-string[]
67+
*/
5968
public static function getSubscribedServices(): array
6069
{
61-
return [];
70+
return [TracerInterface::class];
6271
}
6372

6473
public function startSpan(WorkerMessageReceivedEvent $event): void
6574
{
6675
$message = $event->getEnvelope()->getMessage();
76+
$traceable = $this->parseAttribute($message);
77+
78+
if (!$this->isAutoTraceable() && !$this->isAttributeTraceable($traceable)) {
79+
$this->logger?->debug(sprintf('Message "%s" is not traceable, skipping span creation', get_class($message)));
6780

68-
if (false === $this->isAutoTraceable() && false === $this->isAttributeTraceable($message)) {
6981
return;
7082
}
7183

7284
// Clean up any lingering scope from a previous message that was not
73-
// properly ended (e.g. worker killed, unhandled error in another subscriber).
85+
// properly ended (e.g. an exception in another high-priority subscriber
86+
// prevented the handled/failed event from firing).
7487
$previousScope = Context::storage()->scope();
7588
if (null !== $previousScope) {
7689
$previousScope->detach();
7790
$orphanedSpan = Span::fromContext($previousScope->context());
7891
$orphanedSpan->setStatus(StatusCode::STATUS_ERROR, 'Span was not properly ended');
7992
$orphanedSpan->end();
80-
$this->logger->warning(sprintf('Cleaned up orphaned span "%s"', $orphanedSpan->getContext()->getSpanId()));
93+
$this->logger?->warning(sprintf('Cleaned up orphaned span "%s"', $orphanedSpan->getContext()->getSpanId()));
8194
}
8295

8396
// ensure propagation from incoming trace
84-
$context = $this->propagator->extract($event->getEnvelope(), new TraceStampPropagator($this->logger));
97+
$context = $this->propagator->extract($event->getEnvelope(), $this->traceStampPropagator);
8598

8699
$messageClass = get_class($message);
87100

88-
$span = $this->getTracer($message)
101+
$span = $this->getTracer($traceable)
89102
->spanBuilder(sprintf('%s %s', $event->getReceiverName(), $messageClass))
90103
->setParent($context)
91104
->setSpanKind(SpanKind::KIND_CONSUMER)
@@ -99,7 +112,7 @@ public function startSpan(WorkerMessageReceivedEvent $event): void
99112
$span->setAttribute('symfony.messenger.bus.name', $busNameStamp->getBusName());
100113
}
101114

102-
$this->logger->debug(sprintf('Starting span "%s"', $span->getContext()->getSpanId()));
115+
$this->logger?->debug(sprintf('Starting span "%s"', $span->getContext()->getSpanId()));
103116

104117
Context::storage()
105118
->attach(
@@ -113,14 +126,16 @@ public function endSpanWithSuccess(WorkerMessageHandledEvent $event): void
113126
$scope = Context::storage()->scope();
114127

115128
if (null === $scope) {
129+
$this->logger?->debug('No active scope');
130+
116131
return;
117132
}
118133

119134
$scope->detach();
120135

121136
$span = Span::fromContext($scope->context());
122137
$span->setStatus(StatusCode::STATUS_OK);
123-
$this->logger->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId()));
138+
$this->logger?->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId()));
124139
$span->end();
125140
}
126141

@@ -129,6 +144,8 @@ public function endSpanOnError(WorkerMessageFailedEvent $event): void
129144
$scope = Context::storage()->scope();
130145

131146
if (null === $scope) {
147+
$this->logger?->debug('No active scope');
148+
132149
return;
133150
}
134151

@@ -155,7 +172,7 @@ public function endSpanOnError(WorkerMessageFailedEvent $event): void
155172

156173
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
157174

158-
$this->logger->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId()));
175+
$this->logger?->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId()));
159176
$span->end();
160177
}
161178

@@ -167,11 +184,15 @@ private function parseAttribute(object $message): ?Traceable
167184
return $attribute?->newInstance();
168185
}
169186

170-
private function getTracer(object $message): TracerInterface
187+
private function getTracer(?Traceable $traceable): TracerInterface
171188
{
172-
$traceable = $this->parseAttribute($message);
173-
174189
if (null !== $traceable?->tracer) {
190+
if (!$this->tracerLocator->has($traceable->tracer)) {
191+
$this->logger?->warning(sprintf('Tracer "%s" not found in service locator, using default tracer', $traceable->tracer));
192+
193+
return $this->tracer;
194+
}
195+
175196
return $this->tracerLocator->get($traceable->tracer);
176197
}
177198

@@ -183,9 +204,9 @@ private function isAutoTraceable(): bool
183204
return InstrumentationTypeEnum::Auto === $this->instrumentationType;
184205
}
185206

186-
private function isAttributeTraceable(object $message): bool
207+
private function isAttributeTraceable(?Traceable $traceable): bool
187208
{
188209
return InstrumentationTypeEnum::Attribute === $this->instrumentationType
189-
&& $this->parseAttribute($message) instanceof Traceable;
210+
&& null !== $traceable;
190211
}
191212
}

src/OpenTelemetry/Context/Propagator/PropagatorFactory.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use OpenTelemetry\API\Trace\Propagation\TraceContextPropagator;
77
use OpenTelemetry\Context\Propagation\MultiTextMapPropagator;
88

9-
class PropagatorFactory
9+
final class PropagatorFactory
1010
{
1111
/**
1212
* Default propagators from https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration.

src/OpenTelemetry/Context/Propagator/TraceStampPropagator.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@
99
use Psr\Log\LoggerInterface;
1010
use Symfony\Component\Messenger\Envelope;
1111

12-
readonly class TraceStampPropagator implements PropagationSetterInterface, PropagationGetterInterface
12+
/**
13+
* Bridges OpenTelemetry's propagation interfaces with Symfony Messenger envelopes,
14+
* reading and writing trace context via TraceStamp instances attached to the envelope.
15+
*/
16+
final readonly class TraceStampPropagator implements PropagationSetterInterface, PropagationGetterInterface
1317
{
1418
public function __construct(
1519
private ?LoggerInterface $logger = null,

src/Resources/config/services_tracing_instrumentation.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger\TraceableMessengerTransportFactory;
1616
use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger\WorkerMessageEventSubscriber;
1717
use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Twig\TraceableTwigExtension;
18+
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Context\Propagator\TraceStampPropagator;
1819
use Symfony\Component\DependencyInjection\Loader\Configurator\ContainerConfigurator;
1920

2021
use function Symfony\Component\DependencyInjection\Loader\Configurator\service;
@@ -106,14 +107,19 @@
106107
->tag('monolog.logger', ['channel' => 'open_telemetry'])
107108
->alias('messenger.middleware.open_telemetry_tracer', 'open_telemetry.instrumentation.messenger.trace.middleware')
108109

110+
->set('open_telemetry.instrumentation.messenger.trace_stamp_propagator', TraceStampPropagator::class)
111+
->tag('monolog.logger', ['channel' => 'open_telemetry'])
112+
109113
->set('open_telemetry.instrumentation.messenger.propagation.middleware', AddStampForPropagationMiddleware::class)
110114
->arg('$propagator', service('open_telemetry.propagator_text_map.multi'))
115+
->arg('$traceStampPropagator', service('open_telemetry.instrumentation.messenger.trace_stamp_propagator'))
111116
->tag('messenger.middleware')
112117
->tag('monolog.logger', ['channel' => 'open_telemetry'])
113118

114119
->set('open_telemetry.instrumentation.messenger.worker', WorkerMessageEventSubscriber::class)
115120
->arg('$propagator', service('open_telemetry.propagator_text_map.multi'))
116121
->arg('$tracer', service('open_telemetry.traces.default_tracer'))
122+
->arg('$traceStampPropagator', service('open_telemetry.instrumentation.messenger.trace_stamp_propagator'))
117123
->tag('kernel.event_subscriber')
118124
->tag('container.service_subscriber')
119125
->tag('monolog.logger', ['channel' => 'open_telemetry'])
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
<?php
2+
3+
namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Tests\Functional\Instrumentation\Messenger;
4+
5+
use App\Kernel;
6+
use App\Message\DummyMessage;
7+
use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger\TraceStamp;
8+
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Context\Propagator\TraceStampPropagator;
9+
use FriendsOfOpenTelemetry\OpenTelemetryBundle\Tests\Functional\TracingTestCaseTrait;
10+
use OpenTelemetry\API\Trace\SpanKind;
11+
use OpenTelemetry\API\Trace\TracerInterface;
12+
use OpenTelemetry\Context\Context;
13+
use OpenTelemetry\Context\Propagation\MultiTextMapPropagator;
14+
use OpenTelemetry\SDK\Trace\StatusData;
15+
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
18+
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
19+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
20+
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
21+
use Zalas\PHPUnit\Globals\Attribute\Env;
22+
23+
#[Env('KERNEL_CLASS', Kernel::class)]
24+
#[Env('APP_ENV', 'auto')]
25+
class MessengerPropagationTracingTest extends KernelTestCase
26+
{
27+
use TracingTestCaseTrait;
28+
29+
private EventDispatcherInterface $eventDispatcher;
30+
31+
protected function setUp(): void
32+
{
33+
self::bootKernel();
34+
35+
$this->eventDispatcher = self::getContainer()->get('event_dispatcher');
36+
}
37+
38+
public function testWorkerSpanIsChildOfProducerTrace(): void
39+
{
40+
/** @var TracerInterface $tracer */
41+
$tracer = self::getContainer()->get('open_telemetry.traces.tracers.main');
42+
43+
// Simulate a producer context (e.g. an HTTP request or CLI command)
44+
$producerSpan = $tracer->spanBuilder('producer-request')->startSpan();
45+
$producerScope = $producerSpan->activate();
46+
47+
// Inject current trace context into an envelope via the propagator
48+
$envelope = new Envelope(new DummyMessage('propagation-test'), [new BusNameStamp('messenger.bus.default')]);
49+
50+
/** @var MultiTextMapPropagator $propagator */
51+
$propagator = self::getContainer()->get('open_telemetry.propagator_text_map.multi');
52+
53+
/** @var TraceStampPropagator $traceStampPropagator */
54+
$traceStampPropagator = self::getContainer()->get('open_telemetry.instrumentation.messenger.trace_stamp_propagator');
55+
$propagator->inject($envelope, $traceStampPropagator, Context::getCurrent());
56+
57+
// End the producer context before worker processing
58+
$producerScope->detach();
59+
$producerSpan->end();
60+
61+
// Assert the TraceStamp was injected
62+
$traceStamp = $envelope->last(TraceStamp::class);
63+
self::assertNotNull($traceStamp, 'TraceStamp should be present on the envelope after injection');
64+
self::assertNotEmpty($traceStamp->getTraceParent());
65+
66+
// Simulate worker processing with the stamped envelope
67+
$this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope, 'main'));
68+
$this->eventDispatcher->dispatch(new WorkerMessageHandledEvent($envelope, 'main'));
69+
70+
// Find the producer and consumer spans
71+
$spans = self::getSpans();
72+
self::assertCount(2, $spans);
73+
74+
$producerSpanData = null;
75+
$consumerSpanData = null;
76+
foreach ($spans as $span) {
77+
if ('producer-request' === $span->getName()) {
78+
$producerSpanData = $span;
79+
}
80+
if (SpanKind::KIND_CONSUMER === $span->getKind()) {
81+
$consumerSpanData = $span;
82+
}
83+
}
84+
85+
self::assertNotNull($producerSpanData, 'Producer span should exist');
86+
self::assertNotNull($consumerSpanData, 'Consumer span should exist');
87+
88+
// The consumer span must belong to the same trace
89+
self::assertSame(
90+
$producerSpanData->getContext()->getTraceId(),
91+
$consumerSpanData->getContext()->getTraceId(),
92+
'Worker span should belong to the same trace as the producer span'
93+
);
94+
95+
// The consumer span must be a direct child of the producer span
96+
self::assertSame(
97+
$producerSpanData->getContext()->getSpanId(),
98+
$consumerSpanData->getParentSpanId(),
99+
'Worker span should be a child of the producer span'
100+
);
101+
102+
self::assertSpanStatus($consumerSpanData, StatusData::ok());
103+
}
104+
}

0 commit comments

Comments
 (0)