Skip to content

Commit a1cfe96

Browse files
committed
feat(messenger): add retry awareness, attribute instrumentation, and namespace span attributes
- Namespace middleware span attributes (bus.name, event.category, event.current) under symfony.messenger.* prefix for consistency with worker subscriber - Add symfony.messenger.will_retry and symfony.messenger.retry_count attributes to failure spans so operators can distinguish retriable from terminal failures - Support #[Traceable] attribute on message classes for attribute-based instrumentation mode, with custom tracer selection via TracerLocatorPass
1 parent efbb5e9 commit a1cfe96

13 files changed

Lines changed: 230 additions & 30 deletions

File tree

src/DependencyInjection/Compiler/TracerLocatorPass.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ public function process(ContainerBuilder $container): void
2323
$traceableHttpKernelEventSubscriber = $container->getDefinition('open_telemetry.instrumentation.http_kernel.trace.event_subscriber');
2424
$this->setTracerLocatorArgument($container, $traceableHttpKernelEventSubscriber, $tracers);
2525
}
26+
if ($container->hasDefinition('open_telemetry.instrumentation.messenger.worker')) {
27+
$messengerWorkerSubscriber = $container->getDefinition('open_telemetry.instrumentation.messenger.worker');
28+
$this->setTracerLocatorArgument($container, $messengerWorkerSubscriber, $tracers);
29+
}
2630
}
2731
}
2832

src/Instrumentation/Symfony/Messenger/TraceableMessengerStack.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ public function next(): MiddlewareInterface
4949
->spanBuilder('messenger.middleware')
5050
->setSpanKind(SpanKind::KIND_INTERNAL)
5151
->setParent($scope?->context())
52-
->setAttribute('event.category', $this->eventCategory)
53-
->setAttribute('bus.name', $this->busName)
52+
->setAttribute('symfony.messenger.event.category', $this->eventCategory)
53+
->setAttribute('symfony.messenger.bus.name', $this->busName)
5454
;
5555

5656
$parent = Context::getCurrent();
@@ -66,7 +66,7 @@ public function next(): MiddlewareInterface
6666
}
6767
$this->currentEvent .= sprintf(' on "%s"', $this->busName);
6868

69-
$span->setAttribute('event.current', $this->currentEvent);
69+
$span->setAttribute('symfony.messenger.event.current', $this->currentEvent);
7070

7171
$context = $span->storeInContext($parent);
7272
Context::storage()->attach($context);

src/Instrumentation/Symfony/Messenger/WorkerMessageEventSubscriber.php

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,36 @@
22

33
namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger;
44

5+
use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Attribute\Traceable;
56
use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\InstrumentationTypeEnum;
67
use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\InstrumentationTypeInterface;
78
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Context\Propagator\TraceStampPropagator;
89
use OpenTelemetry\API\Trace\Span;
910
use OpenTelemetry\API\Trace\SpanKind;
1011
use OpenTelemetry\API\Trace\StatusCode;
11-
use Symfony\Component\Messenger\Exception\WrappedExceptionsInterface;
1212
use OpenTelemetry\API\Trace\TracerInterface;
1313
use OpenTelemetry\Context\Context;
1414
use OpenTelemetry\Context\Propagation\MultiTextMapPropagator;
1515
use Psr\Log\LoggerInterface;
16+
use Symfony\Component\DependencyInjection\ServiceLocator;
1617
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
1718
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
1819
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
1920
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
21+
use Symfony\Component\Messenger\Exception\WrappedExceptionsInterface;
2022
use Symfony\Component\Messenger\Stamp\BusNameStamp;
23+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
24+
use Symfony\Contracts\Service\ServiceSubscriberInterface;
2125

22-
class WorkerMessageEventSubscriber implements EventSubscriberInterface, InstrumentationTypeInterface
26+
class WorkerMessageEventSubscriber implements EventSubscriberInterface, ServiceSubscriberInterface, InstrumentationTypeInterface
2327
{
2428
private ?InstrumentationTypeEnum $instrumentationType = null;
2529

2630
public function __construct(
2731
private readonly MultiTextMapPropagator $propagator,
2832
private readonly TracerInterface $tracer,
33+
/** @var ServiceLocator<TracerInterface> */
34+
private readonly ServiceLocator $tracerLocator,
2935
private readonly LoggerInterface $logger,
3036
) {
3137
}
@@ -50,9 +56,16 @@ public static function getSubscribedEvents(): array
5056
];
5157
}
5258

59+
public static function getSubscribedServices(): array
60+
{
61+
return [];
62+
}
63+
5364
public function startSpan(WorkerMessageReceivedEvent $event): void
5465
{
55-
if (InstrumentationTypeEnum::Auto !== $this->instrumentationType) {
66+
$message = $event->getEnvelope()->getMessage();
67+
68+
if (false === $this->isAutoTraceable() && false === $this->isAttributeTraceable($message)) {
5669
return;
5770
}
5871

@@ -70,9 +83,9 @@ public function startSpan(WorkerMessageReceivedEvent $event): void
7083
// ensure propagation from incoming trace
7184
$context = $this->propagator->extract($event->getEnvelope(), new TraceStampPropagator($this->logger));
7285

73-
$messageClass = get_class($event->getEnvelope()->getMessage());
86+
$messageClass = get_class($message);
7487

75-
$span = $this->tracer
88+
$span = $this->getTracer($message)
7689
->spanBuilder(sprintf('%s %s', $event->getReceiverName(), $messageClass))
7790
->setParent($context)
7891
->setSpanKind(SpanKind::KIND_CONSUMER)
@@ -97,10 +110,6 @@ public function startSpan(WorkerMessageReceivedEvent $event): void
97110

98111
public function endSpanWithSuccess(WorkerMessageHandledEvent $event): void
99112
{
100-
if (InstrumentationTypeEnum::Auto !== $this->instrumentationType) {
101-
return;
102-
}
103-
104113
$scope = Context::storage()->scope();
105114

106115
if (null === $scope) {
@@ -117,10 +126,6 @@ public function endSpanWithSuccess(WorkerMessageHandledEvent $event): void
117126

118127
public function endSpanOnError(WorkerMessageFailedEvent $event): void
119128
{
120-
if (InstrumentationTypeEnum::Auto !== $this->instrumentationType) {
121-
return;
122-
}
123-
124129
$scope = Context::storage()->scope();
125130

126131
if (null === $scope) {
@@ -130,6 +135,14 @@ public function endSpanOnError(WorkerMessageFailedEvent $event): void
130135
$scope->detach();
131136

132137
$span = Span::fromContext($scope->context());
138+
139+
$span->setAttribute('symfony.messenger.will_retry', $event->willRetry());
140+
141+
$redeliveryStamp = $event->getEnvelope()->last(RedeliveryStamp::class);
142+
if (null !== $redeliveryStamp) {
143+
$span->setAttribute('symfony.messenger.retry_count', $redeliveryStamp->getRetryCount());
144+
}
145+
133146
$exception = $event->getThrowable();
134147

135148
if ($exception instanceof WrappedExceptionsInterface) {
@@ -145,4 +158,34 @@ public function endSpanOnError(WorkerMessageFailedEvent $event): void
145158
$this->logger->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId()));
146159
$span->end();
147160
}
161+
162+
private function parseAttribute(object $message): ?Traceable
163+
{
164+
$reflection = new \ReflectionClass($message);
165+
$attribute = $reflection->getAttributes(Traceable::class)[0] ?? null;
166+
167+
return $attribute?->newInstance();
168+
}
169+
170+
private function getTracer(object $message): TracerInterface
171+
{
172+
$traceable = $this->parseAttribute($message);
173+
174+
if (null !== $traceable?->tracer) {
175+
return $this->tracerLocator->get($traceable->tracer);
176+
}
177+
178+
return $this->tracer;
179+
}
180+
181+
private function isAutoTraceable(): bool
182+
{
183+
return InstrumentationTypeEnum::Auto === $this->instrumentationType;
184+
}
185+
186+
private function isAttributeTraceable(object $message): bool
187+
{
188+
return InstrumentationTypeEnum::Attribute === $this->instrumentationType
189+
&& $this->parseAttribute($message) instanceof Traceable;
190+
}
148191
}

src/Resources/config/services_tracing_instrumentation.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,9 @@
114114
->set('open_telemetry.instrumentation.messenger.worker', WorkerMessageEventSubscriber::class)
115115
->arg('$propagator', service('open_telemetry.propagator_text_map.multi'))
116116
->arg('$tracer', service('open_telemetry.traces.default_tracer'))
117-
->tag('monolog.logger', ['channel' => 'open_telemetry'])
118117
->tag('kernel.event_subscriber')
118+
->tag('container.service_subscriber')
119+
->tag('monolog.logger', ['channel' => 'open_telemetry'])
119120

120121
// Twig
121122
->set('open_telemetry.instrumentation.twig.trace.extension', TraceableTwigExtension::class)
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
namespace App\Message;
4+
5+
use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Attribute\Traceable;
6+
7+
#[Traceable(tracer: 'open_telemetry.traces.tracers.fallback')]
8+
final readonly class FallbackTracerMessage
9+
{
10+
public function __construct(
11+
public string $name,
12+
) {
13+
}
14+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
namespace App\Message;
4+
5+
use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Attribute\Traceable;
6+
7+
#[Traceable]
8+
final readonly class TraceableMessage
9+
{
10+
public function __construct(
11+
public string $name,
12+
) {
13+
}
14+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
namespace App\MessageHandler;
4+
5+
use App\Message\FallbackTracerMessage;
6+
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
7+
8+
#[AsMessageHandler]
9+
class FallbackTracerMessageHandler
10+
{
11+
public function __invoke(FallbackTracerMessage $message): string
12+
{
13+
return $message->name;
14+
}
15+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
namespace App\MessageHandler;
4+
5+
use App\Message\TraceableMessage;
6+
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
7+
8+
#[AsMessageHandler]
9+
class TraceableMessageHandler
10+
{
11+
public function __invoke(TraceableMessage $message): string
12+
{
13+
return $message->name;
14+
}
15+
}

tests/Functional/Instrumentation/MailerTracingTest.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ public function testSend(): void
4444
self::assertSpanName($middlewareSpan, 'messenger.middleware');
4545
self::assertSpanStatus($middlewareSpan, StatusData::ok());
4646
self::assertSpanAttributes($middlewareSpan, [
47-
'event.category' => 'messenger.middleware',
48-
'bus.name' => 'default',
49-
'event.current' => '"Symfony\Component\Messenger\Middleware\StackMiddleware" on "default"',
47+
'symfony.messenger.event.category' => 'messenger.middleware',
48+
'symfony.messenger.bus.name' => 'default',
49+
'symfony.messenger.event.current' => '"Symfony\Component\Messenger\Middleware\StackMiddleware" on "default"',
5050
]);
5151
self::assertSpanEventsCount($middlewareSpan, 0);
5252

@@ -92,9 +92,9 @@ public function testSendException(): void
9292
self::assertSpanName($middlewareSpan, 'messenger.middleware');
9393
self::assertSpanStatus($middlewareSpan, StatusData::ok());
9494
self::assertSpanAttributes($middlewareSpan, [
95-
'event.category' => 'messenger.middleware',
96-
'bus.name' => 'default',
97-
'event.current' => '"Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" on "default"',
95+
'symfony.messenger.event.category' => 'messenger.middleware',
96+
'symfony.messenger.bus.name' => 'default',
97+
'symfony.messenger.event.current' => '"Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" on "default"',
9898
]);
9999
self::assertSpanEventsCount($middlewareSpan, 0);
100100

tests/Functional/Instrumentation/Messenger/MessengerDispatchTracingTest.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ public function testDispatch(): void
4141
self::assertSpanName($middlewareSpan, 'messenger.middleware');
4242
self::assertSpanStatus($middlewareSpan, StatusData::ok());
4343
self::assertSpanAttributes($middlewareSpan, [
44-
'event.category' => 'messenger.middleware',
45-
'bus.name' => 'default',
46-
'event.current' => '"Symfony\Component\Messenger\Middleware\SendMessageMiddleware" on "default"',
44+
'symfony.messenger.event.category' => 'messenger.middleware',
45+
'symfony.messenger.bus.name' => 'default',
46+
'symfony.messenger.event.current' => '"Symfony\Component\Messenger\Middleware\SendMessageMiddleware" on "default"',
4747
]);
4848
self::assertSpanEventsCount($middlewareSpan, 0);
4949
}
@@ -64,9 +64,9 @@ public function testException(): void
6464
self::assertSpanName($middlewareSpan, 'messenger.middleware');
6565
self::assertSpanStatus($middlewareSpan, StatusData::ok());
6666
self::assertSpanAttributes($middlewareSpan, [
67-
'event.category' => 'messenger.middleware',
68-
'bus.name' => 'default',
69-
'event.current' => '"Symfony\Component\Messenger\Middleware\SendMessageMiddleware" on "default"',
67+
'symfony.messenger.event.category' => 'messenger.middleware',
68+
'symfony.messenger.bus.name' => 'default',
69+
'symfony.messenger.event.current' => '"Symfony\Component\Messenger\Middleware\SendMessageMiddleware" on "default"',
7070
]);
7171
self::assertSpanEventsCount($middlewareSpan, 0);
7272
}

0 commit comments

Comments
 (0)