Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
cb84fec
chore(messenger): add trace parent in dispatched messages (AMQP)
jvocampings Apr 16, 2025
19e51da
chore(messenger): use Messenger events to start/end span for instrume…
jvocampings Apr 17, 2025
5f8c756
Merge remote-tracking branch 'origin/messenger-propagation' into mess…
jvocampings Apr 18, 2025
d59e8e2
remove strict type, merge start/end span into 1 subscriber, close spa…
jvocampings Apr 18, 2025
0549614
propagation not related to AMQP + read incoming trace (async context)
jvocampings Apr 18, 2025
5a10f47
rename event subscriber
jvocampings Apr 18, 2025
0f8b87f
chore(messenger): use Messenger events to start/end span for instrume…
jvocampings Apr 17, 2025
643d8e6
chore(messenger): add trace parent in dispatched messages (AMQP)
jvocampings Apr 16, 2025
218c380
remove strict type, merge start/end span into 1 subscriber, close spa…
jvocampings Apr 18, 2025
6d18f41
propagation not related to AMQP + read incoming trace (async context)
jvocampings Apr 18, 2025
f855e95
rename event subscriber
jvocampings Apr 18, 2025
46aee74
fix(messenger): clean up worker subscriber and add functional tests
gaelreyrol Apr 10, 2026
e40938d
test(messenger): add tests for transport tracing and propagation midd…
gaelreyrol Apr 10, 2026
40859dc
Merge branch 'messenger-instrumentation' into jvocampings-messenger-i…
gaelreyrol Apr 12, 2026
901d013
fix(messenger): use recordException() instead of manual attributes
gaelreyrol Apr 12, 2026
406c4a5
fix(messenger): unwrap HandlerFailedException in error spans
gaelreyrol Apr 12, 2026
7bfba23
fix(console): make messenger:consume exclusion configurable
gaelreyrol Apr 12, 2026
1947678
test(messenger): add unit test for TraceStampPropagator
gaelreyrol Apr 12, 2026
4a3da42
fix(messenger): propagate tracestate alongside traceparent
gaelreyrol Apr 12, 2026
7236edc
fix(messenger): namespace bus.name attribute as symfony.messenger.bus…
gaelreyrol Apr 12, 2026
efbb5e9
fix(messenger): add scope leak safety in worker subscriber
gaelreyrol Apr 12, 2026
a1cfe96
feat(messenger): add retry awareness, attribute instrumentation, and …
gaelreyrol Apr 12, 2026
f15d9cf
refactor(messenger): improve type safety, observability, and test cov…
gaelreyrol Apr 13, 2026
6912ab2
fix(http-client): use lowercase header key for Content-Length lookup
gaelreyrol Apr 13, 2026
7c15a55
fix(messenger): use explicit scope tracking and record errors on midd…
gaelreyrol Apr 14, 2026
6ff4cc9
fix(messenger): remove erroneous kernel.reset tag from transport factory
gaelreyrol Apr 17, 2026
66387cb
Merge branch 'main' into jvocampings-messenger-instrumentation
gaelreyrol Apr 18, 2026
da3e084
Merge branch 'main' into jvocampings-messenger-instrumentation
gaelreyrol Apr 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,3 @@ CLAUDE.md
.claude
GEMINI.md
.gemini

9 changes: 7 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
"phpunit/phpunit": "^13.0",
"pyrech/composer-changelogs": "^2.2",
"roave/security-advisories": "dev-master",
"symfony/amqp-messenger": "^7.4",
"symfony/browser-kit": "^7.4",
"symfony/cache": "^7.4",
"symfony/framework-bundle": "^7.4",
Expand Down Expand Up @@ -127,9 +128,13 @@
},
"scripts": {
"check-reqs": "@php tools/composer-require-checker/vendor/bin/composer-require-checker check",
"coverage": [
"coverage:html": [
"@putenv XDEBUG_MODE=coverage",
"@phpunit --coverage-html=coverage"
"@phpunit --coverage-html=coverage/html"
],
"coverage:xml": [
"@putenv XDEBUG_MODE=coverage",
"@phpunit --coverage-clover=coverage/coverage.xml"
],
"format": [
"@php-cs-fixer:fix",
Expand Down
24 changes: 24 additions & 0 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ parameters:
count: 1
path: src/Instrumentation/Doctrine/Middleware/TraceableConnectionV4.php

-
message: '#^Property FriendsOfOpenTelemetry\\OpenTelemetryBundle\\Instrumentation\\Symfony\\Messenger\\AddStampForPropagationMiddleware\:\:\$logger is never read, only written\.$#'
identifier: property.onlyWritten
count: 1
path: src/Instrumentation/Symfony/Messenger/AddStampForPropagationMiddleware.php

-
message: '#^Property FriendsOfOpenTelemetry\\OpenTelemetryBundle\\Instrumentation\\Doctrine\\Middleware\\TraceableStatementV4\:\:\$logger is never read, only written\.$#'
identifier: property.onlyWritten
Expand Down Expand Up @@ -90,6 +96,24 @@ parameters:
count: 1
path: tests/Unit/OpenTelemetry/Trace/TracerProvider/NoopTracerProviderFactoryTest.php

-
message: '#^Method FriendsOfOpenTelemetry\\OpenTelemetryBundle\\OpenTelemetry\\Context\\Propagator\\TraceStampPropagator\:\:set\(\) has parameter \$carrier with no type specified\.$#'
identifier: missingType.parameter
count: 1
path: src/OpenTelemetry/Context/Propagator/TraceStampPropagator.php

-
message: '#^Method FriendsOfOpenTelemetry\\OpenTelemetryBundle\\OpenTelemetry\\Context\\Propagator\\TraceStampPropagator\:\:keys\(\) has parameter \$carrier with no type specified\.$#'
identifier: missingType.parameter
count: 1
path: src/OpenTelemetry/Context/Propagator/TraceStampPropagator.php

-
message: '#^Method FriendsOfOpenTelemetry\\OpenTelemetryBundle\\OpenTelemetry\\Context\\Propagator\\TraceStampPropagator\:\:get\(\) has parameter \$carrier with no type specified\.$#'
identifier: missingType.parameter
count: 1
path: src/OpenTelemetry/Context/Propagator/TraceStampPropagator.php

-
message: '#^Call to an undefined static method ReflectionMethod\:\:createFromMethodName\(\)\.$#'
identifier: staticMethod.notFound
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ public function process(ContainerBuilder $container): void

if ($container->hasParameter('open_telemetry.instrumentation.messenger.type')) {
$messengerInstrumentationType = $container->getParameter('open_telemetry.instrumentation.messenger.type');
if ($container->hasDefinition('open_telemetry.instrumentation.http_kernel.trace.event_subscriber')) {

if ($container->hasDefinition('open_telemetry.instrumentation.messenger.worker')) {
$container->getDefinition('open_telemetry.instrumentation.messenger.worker')
->addMethodCall('setInstrumentationType', [$messengerInstrumentationType]);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/DependencyInjection/Compiler/TracerLocatorPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ public function process(ContainerBuilder $container): void
$traceableHttpKernelEventSubscriber = $container->getDefinition('open_telemetry.instrumentation.http_kernel.trace.event_subscriber');
$this->setTracerLocatorArgument($container, $traceableHttpKernelEventSubscriber, $tracers);
}
if ($container->hasDefinition('open_telemetry.instrumentation.messenger.worker')) {
$messengerWorkerSubscriber = $container->getDefinition('open_telemetry.instrumentation.messenger.worker');
$this->setTracerLocatorArgument($container, $messengerWorkerSubscriber, $tracers);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private function addInstrumentationSection(ArrayNodeDefinition $node): void
(new ArrayNodeDefinition('exclude_commands'))
->info('Exclude commands from auto instrumentation')
->scalarPrototype()->cannotBeEmpty()->end()
->defaultValue(['messenger:consume'])
))
->append($this->getMeteringInstrumentationNode())
->end()
Expand Down
3 changes: 3 additions & 0 deletions src/DependencyInjection/OpenTelemetryExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ private function registerMessengerTracingInstrumentationConfiguration(ContainerB
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace.transport');
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace.transport_factory');
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace.middleware');
$container->removeDefinition('open_telemetry.instrumentation.messenger.trace_stamp_propagator');
$container->removeDefinition('open_telemetry.instrumentation.messenger.propagation.middleware');
$container->removeDefinition('open_telemetry.instrumentation.messenger.worker');
}

$this->setTracingInstrumentationParams($container, 'messenger', $config, $isConfigEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public static function getSubscribedEvents(): array
];
}

/**
* @return class-string[]
*/
public static function getSubscribedServices(): array
{
return [TracerInterface::class];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ private function endSpan(): void
$statusCode = $this->response->getStatusCode();
if (0 !== $statusCode && $this->span->isRecording()) {
$headers = $this->response->getHeaders(false);
if (isset($headers['Content-Length'])) {
$this->span->setAttribute(HttpIncubatingAttributes::HTTP_RESPONSE_BODY_SIZE, $headers['Content-Length']);
if (isset($headers['content-length'][0])) {
$this->span->setAttribute(HttpIncubatingAttributes::HTTP_RESPONSE_BODY_SIZE, $headers['content-length'][0]);
}

$this->span->setAttribute(HttpAttributes::HTTP_RESPONSE_STATUS_CODE, $statusCode);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger;

use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Context\Propagator\TraceStampPropagator;
use OpenTelemetry\Context\Context;
use OpenTelemetry\Context\Propagation\MultiTextMapPropagator;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

/**
* Injects the current trace context into the Messenger envelope as a TraceStamp
* so that trace propagation is maintained across asynchronous message boundaries.
*/
final readonly class AddStampForPropagationMiddleware implements MiddlewareInterface
{
public function __construct(
private MultiTextMapPropagator $propagator,
private TraceStampPropagator $traceStampPropagator,
private ?LoggerInterface $logger = null,
) {
}

public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$traceStamp = $envelope->last(TraceStamp::class);

if (null !== $traceStamp) {
return $stack->next()->handle($envelope, $stack);
}

$scope = Context::storage()->scope();

if (null !== $scope) {
// inject() mutates $envelope by reference through the TraceStampPropagator setter,
// because Envelope is immutable and with() returns a new instance.
$this->propagator->inject($envelope, $this->traceStampPropagator, Context::getCurrent());
}

return $stack->next()->handle($envelope, $stack);
}
}
27 changes: 27 additions & 0 deletions src/Instrumentation/Symfony/Messenger/TraceStamp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger;

use Symfony\Component\Messenger\Stamp\StampInterface;

/**
* @see https://www.w3.org/TR/trace-context/
*/
final readonly class TraceStamp implements StampInterface
{
public function __construct(
private string $traceParent,
private ?string $traceState = null,
) {
}

public function getTraceParent(): string
{
return $this->traceParent;
}

public function getTraceState(): ?string
{
return $this->traceState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope

try {
return $stack->next()->handle($envelope, $stack);
} catch (\Throwable $exception) {
$stack->stop($exception);
throw $exception;
} finally {
$stack->stop();
}
Expand Down
100 changes: 62 additions & 38 deletions src/Instrumentation/Symfony/Messenger/TraceableMessengerStack.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,35 @@

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger;

use OpenTelemetry\API\Trace\SpanInterface;
use OpenTelemetry\API\Trace\SpanKind;
use OpenTelemetry\API\Trace\StatusCode;
use OpenTelemetry\API\Trace\TracerInterface;
use OpenTelemetry\Context\Context;
use OpenTelemetry\SDK\Trace\Span;
use OpenTelemetry\Context\ContextInterface;
use OpenTelemetry\Context\ContextStorageScopeInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

/**
* Wraps a middleware stack to create one span per middleware in the chain.
*
* Follows a "stop previous, start new" pattern (like Symfony's own TraceableStack
* with its Stopwatch): each call to next() closes the span/scope from the prior
* call before opening a new one, so at most one scope is active at any time.
*
* A simpler alternative would be to drop this class entirely and wrap the whole
* $stack->next()->handle() chain in a single span inside TraceableMessengerMiddleware
* (one activate/detach pair in try/finally). That removes per-middleware timing
* granularity but eliminates scope management complexity.
*/
class TraceableMessengerStack implements StackInterface
{
private ?string $currentEvent = null;
private ?ContextStorageScopeInterface $currentScope = null;
private ?SpanInterface $currentSpan = null;
private ?ContextInterface $parentContext = null;

public function __construct(
private TracerInterface $tracer,
Expand All @@ -26,36 +43,31 @@ public function __construct(

public function next(): MiddlewareInterface
{
$scope = Context::storage()->scope();
if (null !== $scope) {
$this->logger?->debug(sprintf('Using scope "%s"', spl_object_id($scope)));
} else {
$this->logger?->debug('No active scope');
// "Stop previous" — close the span/scope from the prior next() call
if (null !== $this->currentScope) {
$this->logger?->debug(sprintf('Detaching scope "%s"', spl_object_id($this->currentScope)));
$this->currentScope->detach();
$this->currentScope = null;

if (null !== $this->currentSpan) {
$this->currentSpan->setStatus(StatusCode::STATUS_OK);
$this->logger?->debug(sprintf('Ending span "%s"', $this->currentSpan->getContext()->getSpanId()));
$this->currentSpan->end();
$this->currentSpan = null;
}
}

/* if (null !== $scope) {
$span = Span::fromContext($scope->context());

if ($span->isRecording()) {
$scope->detach();
// Capture the parent context once (on the first call)
$this->parentContext ??= Context::getCurrent();

$span->setStatus(StatusCode::STATUS_OK);
$this->logger?->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId()));
$span->end();
}
}*/

$spanBuilder = $this->tracer
// "Start new"
$span = $this->tracer
->spanBuilder('messenger.middleware')
->setSpanKind(SpanKind::KIND_INTERNAL)
->setParent($scope?->context())
->setAttribute('event.category', $this->eventCategory)
->setAttribute('bus.name', $this->busName)
;

$parent = Context::getCurrent();

$span = $spanBuilder->setParent($parent)->startSpan();
->setParent($this->parentContext)
->setAttribute('symfony.messenger.event.category', $this->eventCategory)
->setAttribute('symfony.messenger.bus.name', $this->busName)
->startSpan();

$this->logger?->debug(sprintf('Starting span "%s"', $span->getContext()->getSpanId()));

Expand All @@ -66,32 +78,44 @@ public function next(): MiddlewareInterface
}
$this->currentEvent .= sprintf(' on "%s"', $this->busName);

$span->setAttribute('event.current', $this->currentEvent);
$span->setAttribute('symfony.messenger.event.current', $this->currentEvent);

$context = $span->storeInContext($parent);
Context::storage()->attach($context);
$context = $span->storeInContext($this->parentContext);
$this->currentScope = Context::storage()->attach($context);
$this->currentSpan = $span;

return $nextMiddleware;
}

public function stop(): void
public function stop(?\Throwable $throwable = null): void
{
$scope = Context::storage()->scope();
if (null === $scope) {
return;
if (null !== $this->currentScope) {
$this->logger?->debug(sprintf('Detaching scope "%s"', spl_object_id($this->currentScope)));
$this->currentScope->detach();
$this->currentScope = null;
}

$scope->detach();
if (null !== $this->currentSpan) {
if (null !== $throwable) {
$this->currentSpan->recordException($throwable);
$this->currentSpan->setStatus(StatusCode::STATUS_ERROR, $throwable->getMessage());
} else {
$this->currentSpan->setStatus(StatusCode::STATUS_OK);
}
$this->logger?->debug(sprintf('Ending span "%s"', $this->currentSpan->getContext()->getSpanId()));
$this->currentSpan->end();
$this->currentSpan = null;
}

$span = Span::fromContext($scope->context());
$span->setStatus(StatusCode::STATUS_OK);
$this->logger?->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId()));
$span->end();
$this->currentEvent = null;
}

public function __clone()
{
$this->stack = clone $this->stack;
$this->currentScope = null;
$this->currentSpan = null;
$this->parentContext = null;
$this->currentEvent = null;
}
}
Loading
Loading