Skip to content

Commit 38e07e6

Browse files
feat(Instrumentation/Messenger): implement worker and middleware instrumentation (#173)
* chore(messenger): add trace parent in dispatched messages (AMQP) * chore(messenger): use Messenger events to start/end span for instrumentation * remove strict type, merge start/end span into 1 subscriber, close span on error and on message handled * propagation not related to AMQP + read incoming trace (async context) * rename event subscriber * chore(messenger): use Messenger events to start/end span for instrumentation * chore(messenger): add trace parent in dispatched messages (AMQP) * remove strict type, merge start/end span into 1 subscriber, close span on error and on message handled * propagation not related to AMQP + read incoming trace (async context) * rename event subscriber * fix(messenger): clean up worker subscriber and add functional tests Fix several issues in the WorkerMessageEventSubscriber introduced by PR #173: - Replace SDK Span import with API Span to respect API/SDK separation - Implement InstrumentationTypeInterface for consistency with other subscribers - Add event priorities (10000/-10000) to wrap all other processing - Add messaging semantic convention attributes (operation.type, destination.name) - Include message class name in span name for better trace readability - Remove stale imports and duplicate propagation middleware service definition - Clean up propagation middleware when messenger tracing is disabled - Add PHPStan baseline entries for untyped $carrier interface params - Add functional tests for worker message handled, failed, and attribute mode - Reorganize messenger tests into Messenger/ subdirectory - Disable retry on test transport to isolate worker span assertions * test(messenger): add tests for transport tracing and propagation middleware Cover TraceableMessengerTransport (get/ack/reject spans + TransportException error recording) and AddStampForPropagationMiddleware (stamp skip, no-scope passthrough, active-scope injection). * fix(messenger): use recordException() instead of manual attributes Align WorkerMessageEventSubscriber.endSpanOnError() with every other subscriber in the codebase (HttpKernel, Console, Doctrine, Mailer, TransportTracer) by using $span->recordException() which creates a proper OTel exception event with exception.type, exception.message, and exception.stacktrace per the OTel semantic conventions. Update tests to assert on span events rather than span attributes. * fix(messenger): unwrap HandlerFailedException in error spans When Symfony Messenger handlers fail, the exception is wrapped in a HandlerFailedException (or other WrappedExceptionsInterface implementors like DelayedMessageHandlingException). Without unwrapping, the span records the generic wrapper message instead of the actual root causes. Now each nested exception is recorded as a separate span event via recordException(), matching how Sentry handles these wrapped exceptions. * fix(console): make messenger:consume exclusion configurable Replace the hardcoded $notSupportedCommands array and isNotSupported() method with a default value on the existing exclude_commands config node. messenger:consume is now the default entry in console.tracing.exclude_commands, following the same pattern as Sentry's configurable excluded_commands. Users can override the list or clear it entirely via configuration. * test(messenger): add unit test for TraceStampPropagator Cover all branches of set(), get(), and keys() methods: - Valid envelope carrier for set/get/keys - Invalid carrier throws InvalidArgumentException - Non-traceparent key is ignored (set) or returns null (get) - Missing TraceStamp returns null from get() Closes the 34.78% patch coverage gap reported by Codecov. * fix(messenger): propagate tracestate alongside traceparent The W3C Trace Context spec defines two headers: traceparent and tracestate. Previously only traceparent was propagated, silently dropping vendor-specific trace context (Datadog, AWS X-Ray, etc.). Add optional traceState property to TraceStamp and update TraceStampPropagator to handle both keys in set(), get(), and keys(). When tracestate arrives after traceparent, the existing stamp is replaced with one carrying both values. * fix(messenger): namespace bus.name attribute as symfony.messenger.bus.name bus.name is not a standard OTel semantic convention. Rename to symfony.messenger.bus.name to follow the existing codebase pattern (symfony.console.*, symfony.kernel.*). Only changes the worker subscriber attribute; the pre-existing bus.name in TraceableMessengerStack is left unchanged (out of scope). * fix(messenger): add scope leak safety in worker subscriber If startSpan fires but neither endSpanWithSuccess nor endSpanOnError fires (e.g. worker killed, unhandled error in another subscriber), the OTel context scope leaks into subsequent messages. Now startSpan checks for a lingering scope at the beginning of each message and cleans it up: detaches the scope, marks the orphaned span as ERROR, and ends it. This prevents context pollution across messages. * feat(messenger): add retry awareness, attribute instrumentation, and namespace span attributes - Namespace middleware span attributes (bus.name, event.category, event.current) under symfony.messenger.* prefix for consistency with worker subscriber - Add symfony.messenger.will_retry and symfony.messenger.retry_count attributes to failure spans so operators can distinguish retriable from terminal failures - Support #[Traceable] attribute on message classes for attribute-based instrumentation mode, with custom tracer selection via TracerLocatorPass * refactor(messenger): improve type safety, observability, and test coverage Mark all new types as final for consistency with existing codebase. Register TraceStampPropagator as a shared DI service instead of inline construction. Default instrumentationType to Auto, add safe tracer locator fallback, nullable logger consistency, debug logging on skipped messages and null scopes, and remove dead commented-out code. Add end-to-end propagation test, WorkerMessageEventSubscriber unit tests, and PropagatorFactory test. * fix(http-client): use lowercase header key for Content-Length lookup Symfony's getHeaders() normalizes header names to lowercase, so the Content-Length check was unreachable. Access the first array element since headers are returned as arrays. * fix(messenger): use explicit scope tracking and record errors on middleware spans TraceableMessengerStack now tracks its own scope, span, and parent context instead of probing global context storage — fixing scope leaks where earlier middleware spans were never ended. TraceableMessengerMiddleware catches exceptions and records them with STATUS_ERROR on the active span, aligning with every other instrumentation in the bundle. * fix(messenger): remove erroneous kernel.reset tag from transport factory TraceableMessengerTransportFactory was tagged `kernel.reset` with method `reset`, but the class has no such method. Symfony's ServicesResetter invokes the declared method on every tagged service between worker iterations, so `messenger:consume` threw as soon as a retry (or any post-handler reset) ran: Attempted to call an undefined method named "reset" of class "...\TraceableMessengerTransportFactory". The tag was introduced with the original messenger tracing (6f3c032) and never had a corresponding method — likely speculative, possibly confused with messenger transports that sometimes implement ResetInterface. Removal is preferred over adding an empty reset(): the factory's dependencies (inner TransportFactory, TracerInterface, LoggerInterface) are immutable DI services, createTransport() returns a fresh TraceableMessengerTransport without retaining a reference, and supports() is pure — there is no per-message state to clear. An empty reset() would silence the error but imply a ResetInterface-style contract the class does not fulfill and mislead anyone later adding mutable state. --------- Co-authored-by: Gaël Reyrol <me@gaelreyrol.dev>
1 parent e2701b3 commit 38e07e6

40 files changed

Lines changed: 1886 additions & 82 deletions

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,3 @@ CLAUDE.md
2525
.claude
2626
GEMINI.md
2727
.gemini
28-

composer.json

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
"phpunit/phpunit": "^13.0",
6363
"pyrech/composer-changelogs": "^2.2",
6464
"roave/security-advisories": "dev-master",
65+
"symfony/amqp-messenger": "^7.4",
6566
"symfony/browser-kit": "^7.4",
6667
"symfony/cache": "^7.4",
6768
"symfony/framework-bundle": "^7.4",
@@ -127,9 +128,13 @@
127128
},
128129
"scripts": {
129130
"check-reqs": "@php tools/composer-require-checker/vendor/bin/composer-require-checker check",
130-
"coverage": [
131+
"coverage:html": [
131132
"@putenv XDEBUG_MODE=coverage",
132-
"@phpunit --coverage-html=coverage"
133+
"@phpunit --coverage-html=coverage/html"
134+
],
135+
"coverage:xml": [
136+
"@putenv XDEBUG_MODE=coverage",
137+
"@phpunit --coverage-clover=coverage/coverage.xml"
133138
],
134139
"format": [
135140
"@php-cs-fixer:fix",

phpstan-baseline.neon

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ parameters:
1212
count: 1
1313
path: src/Instrumentation/Doctrine/Middleware/TraceableConnectionV4.php
1414

15+
-
16+
message: '#^Property FriendsOfOpenTelemetry\\OpenTelemetryBundle\\Instrumentation\\Symfony\\Messenger\\AddStampForPropagationMiddleware\:\:\$logger is never read, only written\.$#'
17+
identifier: property.onlyWritten
18+
count: 1
19+
path: src/Instrumentation/Symfony/Messenger/AddStampForPropagationMiddleware.php
20+
1521
-
1622
message: '#^Property FriendsOfOpenTelemetry\\OpenTelemetryBundle\\Instrumentation\\Doctrine\\Middleware\\TraceableStatementV4\:\:\$logger is never read, only written\.$#'
1723
identifier: property.onlyWritten
@@ -90,6 +96,24 @@ parameters:
9096
count: 1
9197
path: tests/Unit/OpenTelemetry/Trace/TracerProvider/NoopTracerProviderFactoryTest.php
9298

99+
-
100+
message: '#^Method FriendsOfOpenTelemetry\\OpenTelemetryBundle\\OpenTelemetry\\Context\\Propagator\\TraceStampPropagator\:\:set\(\) has parameter \$carrier with no type specified\.$#'
101+
identifier: missingType.parameter
102+
count: 1
103+
path: src/OpenTelemetry/Context/Propagator/TraceStampPropagator.php
104+
105+
-
106+
message: '#^Method FriendsOfOpenTelemetry\\OpenTelemetryBundle\\OpenTelemetry\\Context\\Propagator\\TraceStampPropagator\:\:keys\(\) has parameter \$carrier with no type specified\.$#'
107+
identifier: missingType.parameter
108+
count: 1
109+
path: src/OpenTelemetry/Context/Propagator/TraceStampPropagator.php
110+
111+
-
112+
message: '#^Method FriendsOfOpenTelemetry\\OpenTelemetryBundle\\OpenTelemetry\\Context\\Propagator\\TraceStampPropagator\:\:get\(\) has parameter \$carrier with no type specified\.$#'
113+
identifier: missingType.parameter
114+
count: 1
115+
path: src/OpenTelemetry/Context/Propagator/TraceStampPropagator.php
116+
93117
-
94118
message: '#^Call to an undefined static method ReflectionMethod\:\:createFromMethodName\(\)\.$#'
95119
identifier: staticMethod.notFound

src/DependencyInjection/Compiler/SetInstrumentationTypePass.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ public function process(ContainerBuilder $container): void
2727

2828
if ($container->hasParameter('open_telemetry.instrumentation.messenger.type')) {
2929
$messengerInstrumentationType = $container->getParameter('open_telemetry.instrumentation.messenger.type');
30-
if ($container->hasDefinition('open_telemetry.instrumentation.http_kernel.trace.event_subscriber')) {
30+
31+
if ($container->hasDefinition('open_telemetry.instrumentation.messenger.worker')) {
32+
$container->getDefinition('open_telemetry.instrumentation.messenger.worker')
33+
->addMethodCall('setInstrumentationType', [$messengerInstrumentationType]);
3134
}
3235
}
3336
}

src/DependencyInjection/Compiler/TracerLocatorPass.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ public function process(ContainerBuilder $container): void
2323
$traceableHttpKernelEventSubscriber = $container->getDefinition('open_telemetry.instrumentation.http_kernel.trace.event_subscriber');
2424
$this->setTracerLocatorArgument($container, $traceableHttpKernelEventSubscriber, $tracers);
2525
}
26+
if ($container->hasDefinition('open_telemetry.instrumentation.messenger.worker')) {
27+
$messengerWorkerSubscriber = $container->getDefinition('open_telemetry.instrumentation.messenger.worker');
28+
$this->setTracerLocatorArgument($container, $messengerWorkerSubscriber, $tracers);
29+
}
2630
}
2731
}
2832

src/DependencyInjection/Configuration.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ private function addInstrumentationSection(ArrayNodeDefinition $node): void
104104
(new ArrayNodeDefinition('exclude_commands'))
105105
->info('Exclude commands from auto instrumentation')
106106
->scalarPrototype()->cannotBeEmpty()->end()
107+
->defaultValue(['messenger:consume'])
107108
))
108109
->append($this->getMeteringInstrumentationNode())
109110
->end()

src/DependencyInjection/OpenTelemetryExtension.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,9 @@ private function registerMessengerTracingInstrumentationConfiguration(ContainerB
283283
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace.transport');
284284
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace.transport_factory');
285285
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace.middleware');
286+
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace_stamp_propagator');
287+
$container->removeDefinition('open_telemetry.instrumentation.messenger.propagation.middleware');
288+
$container->removeDefinition('open_telemetry.instrumentation.messenger.worker');
286289
}
287290

288291
$this->setTracingInstrumentationParams($container, 'messenger', $config, $isConfigEnabled);

src/Instrumentation/Symfony/Console/TraceableConsoleEventSubscriber.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public static function getSubscribedEvents(): array
5656
];
5757
}
5858

59+
/**
60+
* @return class-string[]
61+
*/
5962
public static function getSubscribedServices(): array
6063
{
6164
return [TracerInterface::class];

src/Instrumentation/Symfony/HttpClient/TraceableResponse.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ private function endSpan(): void
138138
$statusCode = $this->response->getStatusCode();
139139
if (0 !== $statusCode && $this->span->isRecording()) {
140140
$headers = $this->response->getHeaders(false);
141-
if (isset($headers['Content-Length'])) {
142-
$this->span->setAttribute(HttpIncubatingAttributes::HTTP_RESPONSE_BODY_SIZE, $headers['Content-Length']);
141+
if (isset($headers['content-length'][0])) {
142+
$this->span->setAttribute(HttpIncubatingAttributes::HTTP_RESPONSE_BODY_SIZE, $headers['content-length'][0]);
143143
}
144144

145145
$this->span->setAttribute(HttpAttributes::HTTP_RESPONSE_STATUS_CODE, $statusCode);
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger;
4+
5+
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Context\Propagator\TraceStampPropagator;
6+
use OpenTelemetry\Context\Context;
7+
use OpenTelemetry\Context\Propagation\MultiTextMapPropagator;
8+
use Psr\Log\LoggerInterface;
9+
use Symfony\Component\Messenger\Envelope;
10+
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
11+
use Symfony\Component\Messenger\Middleware\StackInterface;
12+
13+
/**
14+
* Injects the current trace context into the Messenger envelope as a TraceStamp
15+
* so that trace propagation is maintained across asynchronous message boundaries.
16+
*/
17+
final readonly class AddStampForPropagationMiddleware implements MiddlewareInterface
18+
{
19+
public function __construct(
20+
private MultiTextMapPropagator $propagator,
21+
private TraceStampPropagator $traceStampPropagator,
22+
private ?LoggerInterface $logger = null,
23+
) {
24+
}
25+
26+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
27+
{
28+
$traceStamp = $envelope->last(TraceStamp::class);
29+
30+
if (null !== $traceStamp) {
31+
return $stack->next()->handle($envelope, $stack);
32+
}
33+
34+
$scope = Context::storage()->scope();
35+
36+
if (null !== $scope) {
37+
// inject() mutates $envelope by reference through the TraceStampPropagator setter,
38+
// because Envelope is immutable and with() returns a new instance.
39+
$this->propagator->inject($envelope, $this->traceStampPropagator, Context::getCurrent());
40+
}
41+
42+
return $stack->next()->handle($envelope, $stack);
43+
}
44+
}

0 commit comments

Comments
 (0)