diff --git a/.gitignore b/.gitignore index f77a7266..b7ad5a99 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,3 @@ CLAUDE.md .claude GEMINI.md .gemini - diff --git a/composer.json b/composer.json index 8d979aee..50bd870e 100644 --- a/composer.json +++ b/composer.json @@ -62,6 +62,7 @@ "phpunit/phpunit": "^13.0", "pyrech/composer-changelogs": "^2.2", "roave/security-advisories": "dev-master", + "symfony/amqp-messenger": "^7.4", "symfony/browser-kit": "^7.4", "symfony/cache": "^7.4", "symfony/framework-bundle": "^7.4", @@ -127,9 +128,13 @@ }, "scripts": { "check-reqs": "@php tools/composer-require-checker/vendor/bin/composer-require-checker check", - "coverage": [ + "coverage:html": [ "@putenv XDEBUG_MODE=coverage", - "@phpunit --coverage-html=coverage" + "@phpunit --coverage-html=coverage/html" + ], + "coverage:xml": [ + "@putenv XDEBUG_MODE=coverage", + "@phpunit --coverage-clover=coverage/coverage.xml" ], "format": [ "@php-cs-fixer:fix", diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index f17054a8..c7fe50d5 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -12,6 +12,12 @@ parameters: count: 1 path: src/Instrumentation/Doctrine/Middleware/TraceableConnectionV4.php + - + message: '#^Property FriendsOfOpenTelemetry\\OpenTelemetryBundle\\Instrumentation\\Symfony\\Messenger\\AddStampForPropagationMiddleware\:\:\$logger is never read, only written\.$#' + identifier: property.onlyWritten + count: 1 + path: src/Instrumentation/Symfony/Messenger/AddStampForPropagationMiddleware.php + - message: '#^Property FriendsOfOpenTelemetry\\OpenTelemetryBundle\\Instrumentation\\Doctrine\\Middleware\\TraceableStatementV4\:\:\$logger is never read, only written\.$#' identifier: property.onlyWritten @@ -90,6 +96,24 @@ parameters: count: 1 path: tests/Unit/OpenTelemetry/Trace/TracerProvider/NoopTracerProviderFactoryTest.php + - + message: '#^Method FriendsOfOpenTelemetry\\OpenTelemetryBundle\\OpenTelemetry\\Context\\Propagator\\TraceStampPropagator\:\:set\(\) has parameter \$carrier with no type specified\.$#' + identifier: missingType.parameter + count: 1 + path: src/OpenTelemetry/Context/Propagator/TraceStampPropagator.php + + - + message: '#^Method FriendsOfOpenTelemetry\\OpenTelemetryBundle\\OpenTelemetry\\Context\\Propagator\\TraceStampPropagator\:\:keys\(\) has parameter \$carrier with no type specified\.$#' + identifier: missingType.parameter + count: 1 + path: src/OpenTelemetry/Context/Propagator/TraceStampPropagator.php + + - + message: '#^Method FriendsOfOpenTelemetry\\OpenTelemetryBundle\\OpenTelemetry\\Context\\Propagator\\TraceStampPropagator\:\:get\(\) has parameter \$carrier with no type specified\.$#' + identifier: missingType.parameter + count: 1 + path: src/OpenTelemetry/Context/Propagator/TraceStampPropagator.php + - message: '#^Call to an undefined static method ReflectionMethod\:\:createFromMethodName\(\)\.$#' identifier: staticMethod.notFound diff --git a/src/DependencyInjection/Compiler/SetInstrumentationTypePass.php b/src/DependencyInjection/Compiler/SetInstrumentationTypePass.php index 103dd371..7da70cc1 100644 --- a/src/DependencyInjection/Compiler/SetInstrumentationTypePass.php +++ b/src/DependencyInjection/Compiler/SetInstrumentationTypePass.php @@ -27,7 +27,10 @@ public function process(ContainerBuilder $container): void if ($container->hasParameter('open_telemetry.instrumentation.messenger.type')) { $messengerInstrumentationType = $container->getParameter('open_telemetry.instrumentation.messenger.type'); - if ($container->hasDefinition('open_telemetry.instrumentation.http_kernel.trace.event_subscriber')) { + + if ($container->hasDefinition('open_telemetry.instrumentation.messenger.worker')) { + $container->getDefinition('open_telemetry.instrumentation.messenger.worker') + ->addMethodCall('setInstrumentationType', [$messengerInstrumentationType]); } } } diff --git a/src/DependencyInjection/Compiler/TracerLocatorPass.php b/src/DependencyInjection/Compiler/TracerLocatorPass.php index 2d4306f1..4dbe6ffb 100644 --- a/src/DependencyInjection/Compiler/TracerLocatorPass.php +++ b/src/DependencyInjection/Compiler/TracerLocatorPass.php @@ -23,6 +23,10 @@ public function process(ContainerBuilder $container): void $traceableHttpKernelEventSubscriber = $container->getDefinition('open_telemetry.instrumentation.http_kernel.trace.event_subscriber'); $this->setTracerLocatorArgument($container, $traceableHttpKernelEventSubscriber, $tracers); } + if ($container->hasDefinition('open_telemetry.instrumentation.messenger.worker')) { + $messengerWorkerSubscriber = $container->getDefinition('open_telemetry.instrumentation.messenger.worker'); + $this->setTracerLocatorArgument($container, $messengerWorkerSubscriber, $tracers); + } } } diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php index 7f9945cd..2dc970cf 100644 --- a/src/DependencyInjection/Configuration.php +++ b/src/DependencyInjection/Configuration.php @@ -104,6 +104,7 @@ private function addInstrumentationSection(ArrayNodeDefinition $node): void (new ArrayNodeDefinition('exclude_commands')) ->info('Exclude commands from auto instrumentation') ->scalarPrototype()->cannotBeEmpty()->end() + ->defaultValue(['messenger:consume']) )) ->append($this->getMeteringInstrumentationNode()) ->end() diff --git a/src/DependencyInjection/OpenTelemetryExtension.php b/src/DependencyInjection/OpenTelemetryExtension.php index cb1068c0..399f52ba 100644 --- a/src/DependencyInjection/OpenTelemetryExtension.php +++ b/src/DependencyInjection/OpenTelemetryExtension.php @@ -283,6 +283,9 @@ private function registerMessengerTracingInstrumentationConfiguration(ContainerB $container->removeDefinition('open_telemetry.instrumentation.messenger.trace.transport'); $container->removeDefinition('open_telemetry.instrumentation.messenger.trace.transport_factory'); $container->removeDefinition('open_telemetry.instrumentation.messenger.trace.middleware'); + $container->removeDefinition('open_telemetry.instrumentation.messenger.trace_stamp_propagator'); + $container->removeDefinition('open_telemetry.instrumentation.messenger.propagation.middleware'); + $container->removeDefinition('open_telemetry.instrumentation.messenger.worker'); } $this->setTracingInstrumentationParams($container, 'messenger', $config, $isConfigEnabled); diff --git a/src/Instrumentation/Symfony/Console/TraceableConsoleEventSubscriber.php b/src/Instrumentation/Symfony/Console/TraceableConsoleEventSubscriber.php index 4fd6b70d..f95e15cb 100644 --- a/src/Instrumentation/Symfony/Console/TraceableConsoleEventSubscriber.php +++ b/src/Instrumentation/Symfony/Console/TraceableConsoleEventSubscriber.php @@ -56,6 +56,9 @@ public static function getSubscribedEvents(): array ]; } + /** + * @return class-string[] + */ public static function getSubscribedServices(): array { return [TracerInterface::class]; diff --git a/src/Instrumentation/Symfony/HttpClient/TraceableResponse.php b/src/Instrumentation/Symfony/HttpClient/TraceableResponse.php index 5ba38c17..fa333d91 100644 --- a/src/Instrumentation/Symfony/HttpClient/TraceableResponse.php +++ b/src/Instrumentation/Symfony/HttpClient/TraceableResponse.php @@ -138,8 +138,8 @@ private function endSpan(): void $statusCode = $this->response->getStatusCode(); if (0 !== $statusCode && $this->span->isRecording()) { $headers = $this->response->getHeaders(false); - if (isset($headers['Content-Length'])) { - $this->span->setAttribute(HttpIncubatingAttributes::HTTP_RESPONSE_BODY_SIZE, $headers['Content-Length']); + if (isset($headers['content-length'][0])) { + $this->span->setAttribute(HttpIncubatingAttributes::HTTP_RESPONSE_BODY_SIZE, $headers['content-length'][0]); } $this->span->setAttribute(HttpAttributes::HTTP_RESPONSE_STATUS_CODE, $statusCode); diff --git a/src/Instrumentation/Symfony/Messenger/AddStampForPropagationMiddleware.php b/src/Instrumentation/Symfony/Messenger/AddStampForPropagationMiddleware.php new file mode 100644 index 00000000..2ef52905 --- /dev/null +++ b/src/Instrumentation/Symfony/Messenger/AddStampForPropagationMiddleware.php @@ -0,0 +1,44 @@ +last(TraceStamp::class); + + if (null !== $traceStamp) { + return $stack->next()->handle($envelope, $stack); + } + + $scope = Context::storage()->scope(); + + if (null !== $scope) { + // inject() mutates $envelope by reference through the TraceStampPropagator setter, + // because Envelope is immutable and with() returns a new instance. + $this->propagator->inject($envelope, $this->traceStampPropagator, Context::getCurrent()); + } + + return $stack->next()->handle($envelope, $stack); + } +} diff --git a/src/Instrumentation/Symfony/Messenger/TraceStamp.php b/src/Instrumentation/Symfony/Messenger/TraceStamp.php new file mode 100644 index 00000000..b69e02aa --- /dev/null +++ b/src/Instrumentation/Symfony/Messenger/TraceStamp.php @@ -0,0 +1,27 @@ +traceParent; + } + + public function getTraceState(): ?string + { + return $this->traceState; + } +} diff --git a/src/Instrumentation/Symfony/Messenger/TraceableMessengerMiddleware.php b/src/Instrumentation/Symfony/Messenger/TraceableMessengerMiddleware.php index c7beb570..4a5c3015 100644 --- a/src/Instrumentation/Symfony/Messenger/TraceableMessengerMiddleware.php +++ b/src/Instrumentation/Symfony/Messenger/TraceableMessengerMiddleware.php @@ -30,6 +30,9 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope try { return $stack->next()->handle($envelope, $stack); + } catch (\Throwable $exception) { + $stack->stop($exception); + throw $exception; } finally { $stack->stop(); } diff --git a/src/Instrumentation/Symfony/Messenger/TraceableMessengerStack.php b/src/Instrumentation/Symfony/Messenger/TraceableMessengerStack.php index fa751dc9..0a87b7af 100644 --- a/src/Instrumentation/Symfony/Messenger/TraceableMessengerStack.php +++ b/src/Instrumentation/Symfony/Messenger/TraceableMessengerStack.php @@ -2,18 +2,35 @@ namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger; +use OpenTelemetry\API\Trace\SpanInterface; use OpenTelemetry\API\Trace\SpanKind; use OpenTelemetry\API\Trace\StatusCode; use OpenTelemetry\API\Trace\TracerInterface; use OpenTelemetry\Context\Context; -use OpenTelemetry\SDK\Trace\Span; +use OpenTelemetry\Context\ContextInterface; +use OpenTelemetry\Context\ContextStorageScopeInterface; use Psr\Log\LoggerInterface; use Symfony\Component\Messenger\Middleware\MiddlewareInterface; use Symfony\Component\Messenger\Middleware\StackInterface; +/** + * Wraps a middleware stack to create one span per middleware in the chain. + * + * Follows a "stop previous, start new" pattern (like Symfony's own TraceableStack + * with its Stopwatch): each call to next() closes the span/scope from the prior + * call before opening a new one, so at most one scope is active at any time. + * + * A simpler alternative would be to drop this class entirely and wrap the whole + * $stack->next()->handle() chain in a single span inside TraceableMessengerMiddleware + * (one activate/detach pair in try/finally). That removes per-middleware timing + * granularity but eliminates scope management complexity. + */ class TraceableMessengerStack implements StackInterface { private ?string $currentEvent = null; + private ?ContextStorageScopeInterface $currentScope = null; + private ?SpanInterface $currentSpan = null; + private ?ContextInterface $parentContext = null; public function __construct( private TracerInterface $tracer, @@ -26,36 +43,31 @@ public function __construct( public function next(): MiddlewareInterface { - $scope = Context::storage()->scope(); - if (null !== $scope) { - $this->logger?->debug(sprintf('Using scope "%s"', spl_object_id($scope))); - } else { - $this->logger?->debug('No active scope'); + // "Stop previous" — close the span/scope from the prior next() call + if (null !== $this->currentScope) { + $this->logger?->debug(sprintf('Detaching scope "%s"', spl_object_id($this->currentScope))); + $this->currentScope->detach(); + $this->currentScope = null; + + if (null !== $this->currentSpan) { + $this->currentSpan->setStatus(StatusCode::STATUS_OK); + $this->logger?->debug(sprintf('Ending span "%s"', $this->currentSpan->getContext()->getSpanId())); + $this->currentSpan->end(); + $this->currentSpan = null; + } } - /* if (null !== $scope) { - $span = Span::fromContext($scope->context()); - - if ($span->isRecording()) { - $scope->detach(); + // Capture the parent context once (on the first call) + $this->parentContext ??= Context::getCurrent(); - $span->setStatus(StatusCode::STATUS_OK); - $this->logger?->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId())); - $span->end(); - } - }*/ - - $spanBuilder = $this->tracer + // "Start new" + $span = $this->tracer ->spanBuilder('messenger.middleware') ->setSpanKind(SpanKind::KIND_INTERNAL) - ->setParent($scope?->context()) - ->setAttribute('event.category', $this->eventCategory) - ->setAttribute('bus.name', $this->busName) - ; - - $parent = Context::getCurrent(); - - $span = $spanBuilder->setParent($parent)->startSpan(); + ->setParent($this->parentContext) + ->setAttribute('symfony.messenger.event.category', $this->eventCategory) + ->setAttribute('symfony.messenger.bus.name', $this->busName) + ->startSpan(); $this->logger?->debug(sprintf('Starting span "%s"', $span->getContext()->getSpanId())); @@ -66,32 +78,44 @@ public function next(): MiddlewareInterface } $this->currentEvent .= sprintf(' on "%s"', $this->busName); - $span->setAttribute('event.current', $this->currentEvent); + $span->setAttribute('symfony.messenger.event.current', $this->currentEvent); - $context = $span->storeInContext($parent); - Context::storage()->attach($context); + $context = $span->storeInContext($this->parentContext); + $this->currentScope = Context::storage()->attach($context); + $this->currentSpan = $span; return $nextMiddleware; } - public function stop(): void + public function stop(?\Throwable $throwable = null): void { - $scope = Context::storage()->scope(); - if (null === $scope) { - return; + if (null !== $this->currentScope) { + $this->logger?->debug(sprintf('Detaching scope "%s"', spl_object_id($this->currentScope))); + $this->currentScope->detach(); + $this->currentScope = null; } - $scope->detach(); + if (null !== $this->currentSpan) { + if (null !== $throwable) { + $this->currentSpan->recordException($throwable); + $this->currentSpan->setStatus(StatusCode::STATUS_ERROR, $throwable->getMessage()); + } else { + $this->currentSpan->setStatus(StatusCode::STATUS_OK); + } + $this->logger?->debug(sprintf('Ending span "%s"', $this->currentSpan->getContext()->getSpanId())); + $this->currentSpan->end(); + $this->currentSpan = null; + } - $span = Span::fromContext($scope->context()); - $span->setStatus(StatusCode::STATUS_OK); - $this->logger?->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId())); - $span->end(); $this->currentEvent = null; } public function __clone() { $this->stack = clone $this->stack; + $this->currentScope = null; + $this->currentSpan = null; + $this->parentContext = null; + $this->currentEvent = null; } } diff --git a/src/Instrumentation/Symfony/Messenger/WorkerMessageEventSubscriber.php b/src/Instrumentation/Symfony/Messenger/WorkerMessageEventSubscriber.php new file mode 100644 index 00000000..63d1f4e1 --- /dev/null +++ b/src/Instrumentation/Symfony/Messenger/WorkerMessageEventSubscriber.php @@ -0,0 +1,212 @@ + */ + private readonly ServiceLocator $tracerLocator, + private readonly TraceStampPropagator $traceStampPropagator, + private readonly ?LoggerInterface $logger = null, + ) { + } + + public function setInstrumentationType(InstrumentationTypeEnum $type): void + { + $this->instrumentationType = $type; + } + + public static function getSubscribedEvents(): array + { + return [ + WorkerMessageReceivedEvent::class => [ + ['startSpan', 10000], + ], + WorkerMessageFailedEvent::class => [ + ['endSpanOnError', -10000], + ], + WorkerMessageHandledEvent::class => [ + ['endSpanWithSuccess', -10000], + ], + ]; + } + + /** + * @return class-string[] + */ + public static function getSubscribedServices(): array + { + return [TracerInterface::class]; + } + + public function startSpan(WorkerMessageReceivedEvent $event): void + { + $message = $event->getEnvelope()->getMessage(); + $traceable = $this->parseAttribute($message); + + if (!$this->isAutoTraceable() && !$this->isAttributeTraceable($traceable)) { + $this->logger?->debug(sprintf('Message "%s" is not traceable, skipping span creation', get_class($message))); + + return; + } + + // Clean up any lingering scope from a previous message that was not + // properly ended (e.g. an exception in another high-priority subscriber + // prevented the handled/failed event from firing). + $previousScope = Context::storage()->scope(); + if (null !== $previousScope) { + $previousScope->detach(); + $orphanedSpan = Span::fromContext($previousScope->context()); + $orphanedSpan->setStatus(StatusCode::STATUS_ERROR, 'Span was not properly ended'); + $orphanedSpan->end(); + $this->logger?->warning(sprintf('Cleaned up orphaned span "%s"', $orphanedSpan->getContext()->getSpanId())); + } + + // ensure propagation from incoming trace + $context = $this->propagator->extract($event->getEnvelope(), $this->traceStampPropagator); + + $messageClass = get_class($message); + + $span = $this->getTracer($traceable) + ->spanBuilder(sprintf('%s %s', $event->getReceiverName(), $messageClass)) + ->setParent($context) + ->setSpanKind(SpanKind::KIND_CONSUMER) + ->setAttribute('messaging.operation.type', 'process') + ->setAttribute('messaging.destination.name', $event->getReceiverName()) + ->startSpan(); + + $busNameStamp = $event->getEnvelope()->last(BusNameStamp::class); + + if (null !== $busNameStamp) { + $span->setAttribute('symfony.messenger.bus.name', $busNameStamp->getBusName()); + } + + $this->logger?->debug(sprintf('Starting span "%s"', $span->getContext()->getSpanId())); + + Context::storage() + ->attach( + $span->storeInContext($context) + ) + ; + } + + public function endSpanWithSuccess(WorkerMessageHandledEvent $event): void + { + $scope = Context::storage()->scope(); + + if (null === $scope) { + $this->logger?->debug('No active scope'); + + return; + } + + $scope->detach(); + + $span = Span::fromContext($scope->context()); + $span->setStatus(StatusCode::STATUS_OK); + $this->logger?->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId())); + $span->end(); + } + + public function endSpanOnError(WorkerMessageFailedEvent $event): void + { + $scope = Context::storage()->scope(); + + if (null === $scope) { + $this->logger?->debug('No active scope'); + + return; + } + + $scope->detach(); + + $span = Span::fromContext($scope->context()); + + $span->setAttribute('symfony.messenger.will_retry', $event->willRetry()); + + $redeliveryStamp = $event->getEnvelope()->last(RedeliveryStamp::class); + if (null !== $redeliveryStamp) { + $span->setAttribute('symfony.messenger.retry_count', $redeliveryStamp->getRetryCount()); + } + + $exception = $event->getThrowable(); + + if ($exception instanceof WrappedExceptionsInterface) { + foreach ($exception->getWrappedExceptions() as $nestedException) { + $span->recordException($nestedException); + } + } else { + $span->recordException($exception); + } + + $span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage()); + + $this->logger?->debug(sprintf('Ending span "%s"', $span->getContext()->getSpanId())); + $span->end(); + } + + private function parseAttribute(object $message): ?Traceable + { + $reflection = new \ReflectionClass($message); + $attribute = $reflection->getAttributes(Traceable::class)[0] ?? null; + + return $attribute?->newInstance(); + } + + private function getTracer(?Traceable $traceable): TracerInterface + { + if (null !== $traceable?->tracer) { + if (!$this->tracerLocator->has($traceable->tracer)) { + $this->logger?->warning(sprintf('Tracer "%s" not found in service locator, using default tracer', $traceable->tracer)); + + return $this->tracer; + } + + return $this->tracerLocator->get($traceable->tracer); + } + + return $this->tracer; + } + + private function isAutoTraceable(): bool + { + return InstrumentationTypeEnum::Auto === $this->instrumentationType; + } + + private function isAttributeTraceable(?Traceable $traceable): bool + { + return InstrumentationTypeEnum::Attribute === $this->instrumentationType + && null !== $traceable; + } +} diff --git a/src/OpenTelemetry/Context/Propagator/PropagatorFactory.php b/src/OpenTelemetry/Context/Propagator/PropagatorFactory.php new file mode 100644 index 00000000..49fabbd0 --- /dev/null +++ b/src/OpenTelemetry/Context/Propagator/PropagatorFactory.php @@ -0,0 +1,21 @@ +with(new TraceStamp($value)); + $this->logger?->debug("Trace stamp added to envelope for propagation with traceparent: $value"); + + return; + } + + if (TraceContextPropagator::TRACESTATE === $key) { + $existing = $carrier->last(TraceStamp::class); + + if (null === $existing) { + return; + } + + $carrier = $carrier->with(new TraceStamp($existing->getTraceParent(), $value)); + $this->logger?->debug("Trace stamp updated with tracestate: $value"); + } + } + + public function keys($carrier): array + { + if (!$carrier instanceof Envelope) { + throw new \InvalidArgumentException(sprintf('The carrier for trace stamp propagation must be instance of %s', Envelope::class)); + } + + return [TraceContextPropagator::TRACEPARENT, TraceContextPropagator::TRACESTATE]; + } + + public function get($carrier, string $key): ?string + { + if (!$carrier instanceof Envelope) { + throw new \InvalidArgumentException(sprintf('The carrier for trace stamp propagation must be instance of %s', Envelope::class)); + } + + $traceStamp = $carrier->last(TraceStamp::class); + + if (null === $traceStamp) { + return null; + } + + if (TraceContextPropagator::TRACEPARENT === $key) { + $traceParent = $traceStamp->getTraceParent(); + $this->logger?->debug("Get trace parent from TraceStamp with value: $traceParent"); + + return $traceParent; + } + + if (TraceContextPropagator::TRACESTATE === $key) { + return $traceStamp->getTraceState(); + } + + return null; + } +} diff --git a/src/Resources/config/services.php b/src/Resources/config/services.php index f09736e4..c4a7f21c 100644 --- a/src/Resources/config/services.php +++ b/src/Resources/config/services.php @@ -1,6 +1,7 @@ set('open_telemetry.propagator_text_map.noop', NoopTextMapPropagator::class) ->set('open_telemetry.propagator_text_map.multi', MultiTextMapPropagator::class) + ->factory([PropagatorFactory::class, 'createDefault']) ->set('open_telemetry.propagation_getter.headers', HeadersPropagationGetter::class) ->set('open_telemetry.propagation_getter.sanitize_combined_headers', SanitizeCombinedHeadersPropagationGetter::class) diff --git a/src/Resources/config/services_tracing_instrumentation.php b/src/Resources/config/services_tracing_instrumentation.php index 94d9a7e0..848f7e41 100644 --- a/src/Resources/config/services_tracing_instrumentation.php +++ b/src/Resources/config/services_tracing_instrumentation.php @@ -9,10 +9,13 @@ use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\HttpKernel\TraceableHttpKernelEventSubscriber; use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Mailer\TraceableMailer; use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Mailer\TraceableMailerTransport; +use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger\AddStampForPropagationMiddleware; use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger\TraceableMessengerMiddleware; use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger\TraceableMessengerTransport; use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger\TraceableMessengerTransportFactory; +use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Symfony\Messenger\WorkerMessageEventSubscriber; use FriendsOfOpenTelemetry\OpenTelemetryBundle\Instrumentation\Twig\TraceableTwigExtension; +use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Context\Propagator\TraceStampPropagator; use Symfony\Component\DependencyInjection\Loader\Configurator\ContainerConfigurator; use function Symfony\Component\DependencyInjection\Loader\Configurator\service; @@ -94,7 +97,6 @@ ->arg('$tracer', service('open_telemetry.traces.default_tracer')) ->arg('$transportFactory', service('messenger.transport_factory')) ->tag('messenger.transport_factory') - ->tag('kernel.reset', ['method' => 'reset']) ->tag('monolog.logger', ['channel' => 'open_telemetry']) ->alias('messenger.transport.open_telemetry_tracer.factory', 'open_telemetry.instrumentation.messenger.trace.transport_factory') @@ -104,6 +106,23 @@ ->tag('monolog.logger', ['channel' => 'open_telemetry']) ->alias('messenger.middleware.open_telemetry_tracer', 'open_telemetry.instrumentation.messenger.trace.middleware') + ->set('open_telemetry.instrumentation.messenger.trace_stamp_propagator', TraceStampPropagator::class) + ->tag('monolog.logger', ['channel' => 'open_telemetry']) + + ->set('open_telemetry.instrumentation.messenger.propagation.middleware', AddStampForPropagationMiddleware::class) + ->arg('$propagator', service('open_telemetry.propagator_text_map.multi')) + ->arg('$traceStampPropagator', service('open_telemetry.instrumentation.messenger.trace_stamp_propagator')) + ->tag('messenger.middleware') + ->tag('monolog.logger', ['channel' => 'open_telemetry']) + + ->set('open_telemetry.instrumentation.messenger.worker', WorkerMessageEventSubscriber::class) + ->arg('$propagator', service('open_telemetry.propagator_text_map.multi')) + ->arg('$tracer', service('open_telemetry.traces.default_tracer')) + ->arg('$traceStampPropagator', service('open_telemetry.instrumentation.messenger.trace_stamp_propagator')) + ->tag('kernel.event_subscriber') + ->tag('container.service_subscriber') + ->tag('monolog.logger', ['channel' => 'open_telemetry']) + // Twig ->set('open_telemetry.instrumentation.twig.trace.extension', TraceableTwigExtension::class) ->arg('$tracer', service('open_telemetry.traces.default_tracer')) diff --git a/tests/Functional/Application/config/packages/framework.yaml b/tests/Functional/Application/config/packages/framework.yaml index df6c7723..e5d6d7ec 100644 --- a/tests/Functional/Application/config/packages/framework.yaml +++ b/tests/Functional/Application/config/packages/framework.yaml @@ -11,7 +11,10 @@ framework: system: cache.adapter.array messenger: transports: - main: 'trace(in-memory://default)' + main: + dsn: 'trace(in-memory://default)' + retry_strategy: + max_retries: 0 routing: 'App\Message\*': main buses: diff --git a/tests/Functional/Application/config/packages/open_telemetry.yaml b/tests/Functional/Application/config/packages/open_telemetry.yaml index 407376c6..38bcd6b1 100644 --- a/tests/Functional/Application/config/packages/open_telemetry.yaml +++ b/tests/Functional/Application/config/packages/open_telemetry.yaml @@ -30,6 +30,7 @@ open_telemetry: tracing: enabled: true messenger: + type: attribute tracing: enabled: true twig: @@ -111,6 +112,10 @@ when@auto: enabled: true exclude_paths: - ^/auto-exclude$ + messenger: + type: auto + tracing: + enabled: true when@empty_excludes: open_telemetry: instrumentation: diff --git a/tests/Functional/Application/src/Message/FallbackTracerMessage.php b/tests/Functional/Application/src/Message/FallbackTracerMessage.php new file mode 100644 index 00000000..23121467 --- /dev/null +++ b/tests/Functional/Application/src/Message/FallbackTracerMessage.php @@ -0,0 +1,14 @@ +name; + } +} diff --git a/tests/Functional/Application/src/MessageHandler/TraceableMessageHandler.php b/tests/Functional/Application/src/MessageHandler/TraceableMessageHandler.php new file mode 100644 index 00000000..f5bd93ed --- /dev/null +++ b/tests/Functional/Application/src/MessageHandler/TraceableMessageHandler.php @@ -0,0 +1,15 @@ +name; + } +} diff --git a/tests/Functional/Instrumentation/HttpClientTracingTest.php b/tests/Functional/Instrumentation/HttpClientTracingTest.php index a7e1d9cd..30b91499 100644 --- a/tests/Functional/Instrumentation/HttpClientTracingTest.php +++ b/tests/Functional/Instrumentation/HttpClientTracingTest.php @@ -29,7 +29,7 @@ public function testOk(): void 'url' => 'http://localhost/ok', 'http_code' => 200, 'http_method' => 'GET', - 'response_headers' => ['Content-Type' => 'application/json'], + 'response_headers' => ['Content-Type' => 'application/json', 'Content-Length' => '16'], ])), self::getContainer()->get('open_telemetry.traces.tracers.main'), new Psr17Factory(), @@ -39,7 +39,7 @@ public function testOk(): void self::assertSame(Response::HTTP_OK, $response->getStatusCode()); self::assertSame('{"status": "ok"}', $response->getContent()); - self::assertSame(['content-type' => ['application/json']], $response->getHeaders()); + self::assertSame(['content-type' => ['application/json'], 'content-length' => ['16']], $response->getHeaders()); self::assertSpansCount(1); @@ -53,6 +53,7 @@ public function testOk(): void 'url.query' => '', 'url.fragment' => '', 'http.request.method' => 'GET', + 'http.response.body.size' => '16', 'http.response.status_code' => 200, ]); self::assertSpanEventsCount($mainSpan, 0); diff --git a/tests/Functional/Instrumentation/MailerTracingTest.php b/tests/Functional/Instrumentation/MailerTracingTest.php index f7b660ed..0b18aaf3 100644 --- a/tests/Functional/Instrumentation/MailerTracingTest.php +++ b/tests/Functional/Instrumentation/MailerTracingTest.php @@ -32,25 +32,45 @@ public function testSend(): void $mailer->send($email); self::assertEmailCount(1); - self::assertSpansCount(3); + self::assertSpansCount(5); + + $sendMiddlewareSpan = self::getSpans()[0]; + self::assertSpanName($sendMiddlewareSpan, 'messenger.middleware'); + self::assertSpanStatus($sendMiddlewareSpan, StatusData::ok()); + self::assertSpanAttributes($sendMiddlewareSpan, [ + 'symfony.messenger.event.category' => 'messenger.middleware', + 'symfony.messenger.bus.name' => 'default', + 'symfony.messenger.event.current' => '"Symfony\Component\Messenger\Middleware\SendMessageMiddleware" on "default"', + ]); + self::assertSpanEventsCount($sendMiddlewareSpan, 0); - $transportSpan = self::getSpans()[0]; + $transportSpan = self::getSpans()[1]; self::assertSpanName($transportSpan, 'mailer.transport.send'); self::assertSpanStatus($transportSpan, StatusData::unset()); self::assertSpanAttributes($transportSpan, []); self::assertSpanEventsCount($transportSpan, 0); - $middlewareSpan = self::getSpans()[1]; - self::assertSpanName($middlewareSpan, 'messenger.middleware'); - self::assertSpanStatus($middlewareSpan, StatusData::ok()); - self::assertSpanAttributes($middlewareSpan, [ - 'event.category' => 'messenger.middleware', - 'bus.name' => 'default', - 'event.current' => '"Symfony\Component\Messenger\Middleware\StackMiddleware" on "default"', + $handleMiddlewareSpan = self::getSpans()[2]; + self::assertSpanName($handleMiddlewareSpan, 'messenger.middleware'); + self::assertSpanStatus($handleMiddlewareSpan, StatusData::ok()); + self::assertSpanAttributes($handleMiddlewareSpan, [ + 'symfony.messenger.event.category' => 'messenger.middleware', + 'symfony.messenger.bus.name' => 'default', + 'symfony.messenger.event.current' => '"Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" on "default"', + ]); + self::assertSpanEventsCount($handleMiddlewareSpan, 0); + + $tailMiddlewareSpan = self::getSpans()[3]; + self::assertSpanName($tailMiddlewareSpan, 'messenger.middleware'); + self::assertSpanStatus($tailMiddlewareSpan, StatusData::ok()); + self::assertSpanAttributes($tailMiddlewareSpan, [ + 'symfony.messenger.event.category' => 'messenger.middleware', + 'symfony.messenger.bus.name' => 'default', + 'symfony.messenger.event.current' => '"Symfony\Component\Messenger\Middleware\StackMiddleware" on "default"', ]); - self::assertSpanEventsCount($middlewareSpan, 0); + self::assertSpanEventsCount($tailMiddlewareSpan, 0); - $mainSpan = self::getSpans()[2]; + $mainSpan = self::getSpans()[4]; self::assertSpanName($mainSpan, 'mailer.send'); self::assertSpanStatus($mainSpan, StatusData::unset()); self::assertSpanAttributes($mainSpan, []); @@ -80,25 +100,35 @@ public function testSendException(): void } self::assertEmailCount(1); - self::assertSpansCount(3); + self::assertSpansCount(4); + + $sendMiddlewareSpan = self::getSpans()[0]; + self::assertSpanName($sendMiddlewareSpan, 'messenger.middleware'); + self::assertSpanStatus($sendMiddlewareSpan, StatusData::ok()); + self::assertSpanAttributes($sendMiddlewareSpan, [ + 'symfony.messenger.event.category' => 'messenger.middleware', + 'symfony.messenger.bus.name' => 'default', + 'symfony.messenger.event.current' => '"Symfony\Component\Messenger\Middleware\SendMessageMiddleware" on "default"', + ]); + self::assertSpanEventsCount($sendMiddlewareSpan, 0); - $transportSpan = self::getSpans()[0]; + $transportSpan = self::getSpans()[1]; self::assertSpanName($transportSpan, 'mailer.transport.send'); self::assertSpanStatus($transportSpan, new StatusData(StatusCode::STATUS_ERROR, 'Connection could not be established with host "localhost:25": stream_socket_client(): Unable to connect to localhost:25 (Connection refused)')); self::assertSpanAttributes($transportSpan, []); self::assertSpanEventsCount($transportSpan, 1); - $middlewareSpan = self::getSpans()[1]; - self::assertSpanName($middlewareSpan, 'messenger.middleware'); - self::assertSpanStatus($middlewareSpan, StatusData::ok()); - self::assertSpanAttributes($middlewareSpan, [ - 'event.category' => 'messenger.middleware', - 'bus.name' => 'default', - 'event.current' => '"Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" on "default"', + $handleMiddlewareSpan = self::getSpans()[2]; + self::assertSpanName($handleMiddlewareSpan, 'messenger.middleware'); + self::assertSpanStatus($handleMiddlewareSpan, new StatusData(StatusCode::STATUS_ERROR, 'Handling "Symfony\Component\Mailer\Messenger\SendEmailMessage" failed: Connection could not be established with host "localhost:25": stream_socket_client(): Unable to connect to localhost:25 (Connection refused)')); + self::assertSpanAttributes($handleMiddlewareSpan, [ + 'symfony.messenger.event.category' => 'messenger.middleware', + 'symfony.messenger.bus.name' => 'default', + 'symfony.messenger.event.current' => '"Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" on "default"', ]); - self::assertSpanEventsCount($middlewareSpan, 0); + self::assertSpanEventsCount($handleMiddlewareSpan, 1); - $mainSpan = self::getSpans()[2]; + $mainSpan = self::getSpans()[3]; self::assertSpanName($mainSpan, 'mailer.send'); self::assertSpanStatus($mainSpan, new StatusData(StatusCode::STATUS_ERROR, 'Connection could not be established with host "localhost:25": stream_socket_client(): Unable to connect to localhost:25 (Connection refused)')); self::assertSpanAttributes($mainSpan, []); diff --git a/tests/Functional/Instrumentation/MessengerTracingTest.php b/tests/Functional/Instrumentation/Messenger/MessengerDispatchTracingTest.php similarity index 76% rename from tests/Functional/Instrumentation/MessengerTracingTest.php rename to tests/Functional/Instrumentation/Messenger/MessengerDispatchTracingTest.php index 0a502939..ffde1a4f 100644 --- a/tests/Functional/Instrumentation/MessengerTracingTest.php +++ b/tests/Functional/Instrumentation/Messenger/MessengerDispatchTracingTest.php @@ -1,6 +1,6 @@ 'messenger.middleware', - 'bus.name' => 'default', - 'event.current' => '"Symfony\Component\Messenger\Middleware\SendMessageMiddleware" on "default"', + 'symfony.messenger.event.category' => 'messenger.middleware', + 'symfony.messenger.bus.name' => 'default', + 'symfony.messenger.event.current' => '"Symfony\Component\Messenger\Middleware\SendMessageMiddleware" on "default"', ]); self::assertSpanEventsCount($middlewareSpan, 0); } - public function testException(): void + public function testDispatchOfAsyncExceptionMessage(): void { $this->bus->dispatch(new ExceptionMessage('test')); @@ -64,9 +64,9 @@ public function testException(): void self::assertSpanName($middlewareSpan, 'messenger.middleware'); self::assertSpanStatus($middlewareSpan, StatusData::ok()); self::assertSpanAttributes($middlewareSpan, [ - 'event.category' => 'messenger.middleware', - 'bus.name' => 'default', - 'event.current' => '"Symfony\Component\Messenger\Middleware\SendMessageMiddleware" on "default"', + 'symfony.messenger.event.category' => 'messenger.middleware', + 'symfony.messenger.bus.name' => 'default', + 'symfony.messenger.event.current' => '"Symfony\Component\Messenger\Middleware\SendMessageMiddleware" on "default"', ]); self::assertSpanEventsCount($middlewareSpan, 0); } diff --git a/tests/Functional/Instrumentation/Messenger/MessengerPropagationTracingTest.php b/tests/Functional/Instrumentation/Messenger/MessengerPropagationTracingTest.php new file mode 100644 index 00000000..05ff6397 --- /dev/null +++ b/tests/Functional/Instrumentation/Messenger/MessengerPropagationTracingTest.php @@ -0,0 +1,104 @@ +eventDispatcher = self::getContainer()->get('event_dispatcher'); + } + + public function testWorkerSpanIsChildOfProducerTrace(): void + { + /** @var TracerInterface $tracer */ + $tracer = self::getContainer()->get('open_telemetry.traces.tracers.main'); + + // Simulate a producer context (e.g. an HTTP request or CLI command) + $producerSpan = $tracer->spanBuilder('producer-request')->startSpan(); + $producerScope = $producerSpan->activate(); + + // Inject current trace context into an envelope via the propagator + $envelope = new Envelope(new DummyMessage('propagation-test'), [new BusNameStamp('messenger.bus.default')]); + + /** @var MultiTextMapPropagator $propagator */ + $propagator = self::getContainer()->get('open_telemetry.propagator_text_map.multi'); + + /** @var TraceStampPropagator $traceStampPropagator */ + $traceStampPropagator = self::getContainer()->get('open_telemetry.instrumentation.messenger.trace_stamp_propagator'); + $propagator->inject($envelope, $traceStampPropagator, Context::getCurrent()); + + // End the producer context before worker processing + $producerScope->detach(); + $producerSpan->end(); + + // Assert the TraceStamp was injected + $traceStamp = $envelope->last(TraceStamp::class); + self::assertNotNull($traceStamp, 'TraceStamp should be present on the envelope after injection'); + self::assertNotEmpty($traceStamp->getTraceParent()); + + // Simulate worker processing with the stamped envelope + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope, 'main')); + $this->eventDispatcher->dispatch(new WorkerMessageHandledEvent($envelope, 'main')); + + // Find the producer and consumer spans + $spans = self::getSpans(); + self::assertCount(2, $spans); + + $producerSpanData = null; + $consumerSpanData = null; + foreach ($spans as $span) { + if ('producer-request' === $span->getName()) { + $producerSpanData = $span; + } + if (SpanKind::KIND_CONSUMER === $span->getKind()) { + $consumerSpanData = $span; + } + } + + self::assertNotNull($producerSpanData, 'Producer span should exist'); + self::assertNotNull($consumerSpanData, 'Consumer span should exist'); + + // The consumer span must belong to the same trace + self::assertSame( + $producerSpanData->getContext()->getTraceId(), + $consumerSpanData->getContext()->getTraceId(), + 'Worker span should belong to the same trace as the producer span' + ); + + // The consumer span must be a direct child of the producer span + self::assertSame( + $producerSpanData->getContext()->getSpanId(), + $consumerSpanData->getParentSpanId(), + 'Worker span should be a child of the producer span' + ); + + self::assertSpanStatus($consumerSpanData, StatusData::ok()); + } +} diff --git a/tests/Functional/Instrumentation/Messenger/MessengerTransportTracingTest.php b/tests/Functional/Instrumentation/Messenger/MessengerTransportTracingTest.php new file mode 100644 index 00000000..93040a13 --- /dev/null +++ b/tests/Functional/Instrumentation/Messenger/MessengerTransportTracingTest.php @@ -0,0 +1,69 @@ +transport = self::getContainer()->get('messenger.transport.main'); + } + + public function testGetCreatesSpan(): void + { + $this->transport->send(new Envelope(new DummyMessage('test'))); + iterator_to_array($this->transport->get()); + + self::assertSpansCount(2); + + $sendSpan = self::getSpans()[0]; + self::assertSpanName($sendSpan, 'messenger.transport.send'); + self::assertSpanStatus($sendSpan, StatusData::unset()); + + $getSpan = self::getSpans()[1]; + self::assertSpanName($getSpan, 'messenger.transport.get'); + self::assertSpanStatus($getSpan, StatusData::unset()); + } + + public function testAckCreatesSpan(): void + { + $this->transport->send(new Envelope(new DummyMessage('test'))); + $envelopes = iterator_to_array($this->transport->get()); + + $this->transport->ack($envelopes[0]); + + self::assertSpansCount(3); + + self::assertSpanName(self::getSpans()[2], 'messenger.transport.ack'); + self::assertSpanStatus(self::getSpans()[2], StatusData::unset()); + } + + public function testRejectCreatesSpan(): void + { + $this->transport->send(new Envelope(new DummyMessage('test'))); + $envelopes = iterator_to_array($this->transport->get()); + + $this->transport->reject($envelopes[0]); + + self::assertSpansCount(3); + + self::assertSpanName(self::getSpans()[2], 'messenger.transport.reject'); + self::assertSpanStatus(self::getSpans()[2], StatusData::unset()); + } +} diff --git a/tests/Functional/Instrumentation/Messenger/MessengerWorkerAttributeTracingTest.php b/tests/Functional/Instrumentation/Messenger/MessengerWorkerAttributeTracingTest.php new file mode 100644 index 00000000..cedbe458 --- /dev/null +++ b/tests/Functional/Instrumentation/Messenger/MessengerWorkerAttributeTracingTest.php @@ -0,0 +1,82 @@ +eventDispatcher = self::getContainer()->get('event_dispatcher'); + } + + public function testAttributeModeIgnoresNonTraceableMessage(): void + { + $envelope = new Envelope(new DummyMessage('test')); + + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope, 'main')); + $this->eventDispatcher->dispatch(new WorkerMessageHandledEvent($envelope, 'main')); + + self::assertSpansCount(0); + } + + public function testAttributeModeCreatesSpanForTraceableMessage(): void + { + $envelope = new Envelope(new TraceableMessage('test')); + + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope, 'main')); + $this->eventDispatcher->dispatch(new WorkerMessageHandledEvent($envelope, 'main')); + + self::assertSpansCount(1); + + $span = self::getSpans()[0]; + self::assertSpanName($span, 'main App\Message\TraceableMessage'); + self::assertSpanStatus($span, StatusData::ok()); + self::assertSame(SpanKind::KIND_CONSUMER, $span->getKind()); + self::assertSpanAttributesSubSet($span, [ + 'messaging.operation.type' => 'process', + 'messaging.destination.name' => 'main', + ]); + } + + public function testAttributeModeWithCustomTracer(): void + { + $envelope = new Envelope(new FallbackTracerMessage('test')); + + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope, 'main')); + $this->eventDispatcher->dispatch(new WorkerMessageHandledEvent($envelope, 'main')); + + self::assertSpansCount(0); + + self::assertSpansCount(1, 'open_telemetry.traces.exporters.fallback'); + + $span = self::getSpans('open_telemetry.traces.exporters.fallback')[0]; + self::assertSpanName($span, 'main App\Message\FallbackTracerMessage'); + self::assertSpanStatus($span, StatusData::ok()); + self::assertSame(SpanKind::KIND_CONSUMER, $span->getKind()); + self::assertSpanAttributesSubSet($span, [ + 'messaging.operation.type' => 'process', + 'messaging.destination.name' => 'main', + ]); + } +} diff --git a/tests/Functional/Instrumentation/Messenger/MessengerWorkerTracingTest.php b/tests/Functional/Instrumentation/Messenger/MessengerWorkerTracingTest.php new file mode 100644 index 00000000..76305412 --- /dev/null +++ b/tests/Functional/Instrumentation/Messenger/MessengerWorkerTracingTest.php @@ -0,0 +1,223 @@ +eventDispatcher = self::getContainer()->get('event_dispatcher'); + } + + public function testWorkerMessageHandled(): void + { + $envelope = new Envelope(new DummyMessage('test'), [new BusNameStamp('messenger.bus.default')]); + + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope, 'main')); + $this->eventDispatcher->dispatch(new WorkerMessageHandledEvent($envelope, 'main')); + + self::assertSpansCount(1); + + $span = self::getSpans()[0]; + self::assertSpanName($span, 'main App\Message\DummyMessage'); + self::assertSpanStatus($span, StatusData::ok()); + self::assertSame(SpanKind::KIND_CONSUMER, $span->getKind()); + self::assertSpanAttributesSubSet($span, [ + 'messaging.operation.type' => 'process', + 'messaging.destination.name' => 'main', + 'symfony.messenger.bus.name' => 'messenger.bus.default', + ]); + self::assertSpanEventsCount($span, 0); + } + + public function testWorkerMessageHandledWithoutBusNameStamp(): void + { + $envelope = new Envelope(new DummyMessage('test')); + + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope, 'main')); + $this->eventDispatcher->dispatch(new WorkerMessageHandledEvent($envelope, 'main')); + + self::assertSpansCount(1); + + $span = self::getSpans()[0]; + self::assertSpanName($span, 'main App\Message\DummyMessage'); + self::assertSpanStatus($span, StatusData::ok()); + self::assertSpanAttributes($span, [ + 'messaging.operation.type' => 'process', + 'messaging.destination.name' => 'main', + ]); + } + + public function testWorkerMessageFailed(): void + { + $envelope = new Envelope(new DummyMessage('test'), [new BusNameStamp('messenger.bus.default')]); + $exception = new \RuntimeException('Something went wrong'); + + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope, 'main')); + $this->eventDispatcher->dispatch(new WorkerMessageFailedEvent($envelope, 'main', $exception)); + + self::assertSpansCount(1); + + $span = self::getSpans()[0]; + self::assertSpanName($span, 'main App\Message\DummyMessage'); + self::assertSpanStatus($span, new StatusData(StatusCode::STATUS_ERROR, 'Something went wrong')); + self::assertSame(SpanKind::KIND_CONSUMER, $span->getKind()); + self::assertSpanAttributesSubSet($span, [ + 'messaging.operation.type' => 'process', + 'messaging.destination.name' => 'main', + 'symfony.messenger.bus.name' => 'messenger.bus.default', + 'symfony.messenger.will_retry' => false, + ]); + self::assertSpanEventsCount($span, 1); + + $exceptionEvent = $span->getEvents()[0]; + self::assertSpanEventName($exceptionEvent, 'exception'); + self::assertSpanEventAttributesSubSet($exceptionEvent, [ + 'exception.type' => 'RuntimeException', + 'exception.message' => 'Something went wrong', + ]); + } + + public function testWorkerMessageFailedWithPreviousException(): void + { + $envelope = new Envelope(new DummyMessage('test')); + $previous = new \LogicException('Root cause'); + $exception = new \RuntimeException('Something went wrong', 0, $previous); + + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope, 'main')); + $this->eventDispatcher->dispatch(new WorkerMessageFailedEvent($envelope, 'main', $exception)); + + self::assertSpansCount(1); + + $span = self::getSpans()[0]; + self::assertSpanStatus($span, new StatusData(StatusCode::STATUS_ERROR, 'Something went wrong')); + self::assertSpanEventsCount($span, 1); + + $exceptionEvent = $span->getEvents()[0]; + self::assertSpanEventName($exceptionEvent, 'exception'); + self::assertSpanEventAttributesSubSet($exceptionEvent, [ + 'exception.type' => 'RuntimeException', + 'exception.message' => 'Something went wrong', + ]); + } + + public function testWorkerMessageFailedWithHandlerFailedException(): void + { + $envelope = new Envelope(new DummyMessage('test')); + $nested1 = new \RuntimeException('Handler A failed'); + $nested2 = new \LogicException('Handler B failed'); + $exception = new HandlerFailedException($envelope, [$nested1, $nested2]); + + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope, 'main')); + $this->eventDispatcher->dispatch(new WorkerMessageFailedEvent($envelope, 'main', $exception)); + + self::assertSpansCount(1); + + $span = self::getSpans()[0]; + self::assertSpanStatus($span, new StatusData(StatusCode::STATUS_ERROR, $exception->getMessage())); + self::assertSpanEventsCount($span, 2); + + $event1 = $span->getEvents()[0]; + self::assertSpanEventName($event1, 'exception'); + self::assertSpanEventAttributesSubSet($event1, [ + 'exception.type' => 'RuntimeException', + 'exception.message' => 'Handler A failed', + ]); + + $event2 = $span->getEvents()[1]; + self::assertSpanEventName($event2, 'exception'); + self::assertSpanEventAttributesSubSet($event2, [ + 'exception.type' => 'LogicException', + 'exception.message' => 'Handler B failed', + ]); + } + + public function testWorkerMessageFailedWillRetry(): void + { + $envelope = new Envelope(new DummyMessage('test'), [new BusNameStamp('messenger.bus.default')]); + $exception = new \RuntimeException('Transient failure'); + + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope, 'main')); + + $failedEvent = new WorkerMessageFailedEvent($envelope, 'main', $exception); + $failedEvent->setForRetry(); + $this->eventDispatcher->dispatch($failedEvent); + + self::assertSpansCount(1); + + $span = self::getSpans()[0]; + self::assertSpanStatus($span, new StatusData(StatusCode::STATUS_ERROR, 'Transient failure')); + self::assertSpanAttributesSubSet($span, [ + 'symfony.messenger.will_retry' => true, + ]); + } + + public function testWorkerMessageFailedWithRedeliveryStamp(): void + { + $envelope = new Envelope(new DummyMessage('test'), [ + new BusNameStamp('messenger.bus.default'), + new RedeliveryStamp(3), + ]); + $exception = new \RuntimeException('Retry exhausted'); + + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope, 'main')); + $this->eventDispatcher->dispatch(new WorkerMessageFailedEvent($envelope, 'main', $exception)); + + self::assertSpansCount(1); + + $span = self::getSpans()[0]; + self::assertSpanStatus($span, new StatusData(StatusCode::STATUS_ERROR, 'Retry exhausted')); + self::assertSpanAttributesSubSet($span, [ + 'symfony.messenger.will_retry' => false, + 'symfony.messenger.retry_count' => 3, + ]); + } + + public function testLingeringScopeIsCleanedOnNextMessage(): void + { + $envelope1 = new Envelope(new DummyMessage('first')); + $envelope2 = new Envelope(new DummyMessage('second')); + + // First message starts a span but is never handled/failed + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope1, 'main')); + + // Second message arrives — should clean up the orphaned span + $this->eventDispatcher->dispatch(new WorkerMessageReceivedEvent($envelope2, 'main')); + $this->eventDispatcher->dispatch(new WorkerMessageHandledEvent($envelope2, 'main')); + + self::assertSpansCount(2); + + $orphanedSpan = self::getSpans()[0]; + self::assertSpanName($orphanedSpan, 'main App\Message\DummyMessage'); + self::assertSpanStatus($orphanedSpan, new StatusData(StatusCode::STATUS_ERROR, 'Span was not properly ended')); + + $normalSpan = self::getSpans()[1]; + self::assertSpanName($normalSpan, 'main App\Message\DummyMessage'); + self::assertSpanStatus($normalSpan, StatusData::ok()); + } +} diff --git a/tests/Unit/DependencyInjection/Compiler/TracerLocatorPassTest.php b/tests/Unit/DependencyInjection/Compiler/TracerLocatorPassTest.php index 06639da4..e312dcf0 100644 --- a/tests/Unit/DependencyInjection/Compiler/TracerLocatorPassTest.php +++ b/tests/Unit/DependencyInjection/Compiler/TracerLocatorPassTest.php @@ -17,6 +17,7 @@ protected function registerCompilerPass(ContainerBuilder $container): void $container->setDefinition('open_telemetry.instrumentation.console.trace.event_subscriber', new Definition()); $container->setDefinition('open_telemetry.instrumentation.http_kernel.trace.event_subscriber', new Definition()); + $container->setDefinition('open_telemetry.instrumentation.messenger.worker', new Definition()); } public function testNoTracerLocator(): void @@ -28,6 +29,9 @@ public function testNoTracerLocator(): void $httpKernel = $this->container->getDefinition('open_telemetry.instrumentation.http_kernel.trace.event_subscriber'); self::assertEquals([], $httpKernel->getArguments()); + + $messenger = $this->container->getDefinition('open_telemetry.instrumentation.messenger.worker'); + self::assertEquals([], $messenger->getArguments()); } public function testTracerLocator(): void @@ -42,5 +46,8 @@ public function testTracerLocator(): void $httpKernel = $this->container->getDefinition('open_telemetry.instrumentation.http_kernel.trace.event_subscriber'); self::assertArrayHasKey('$tracerLocator', $httpKernel->getArguments()); + + $messenger = $this->container->getDefinition('open_telemetry.instrumentation.messenger.worker'); + self::assertArrayHasKey('$tracerLocator', $messenger->getArguments()); } } diff --git a/tests/Unit/DependencyInjection/ConfigurationFormatTest.php b/tests/Unit/DependencyInjection/ConfigurationFormatTest.php index 6330ca33..45b139b4 100644 --- a/tests/Unit/DependencyInjection/ConfigurationFormatTest.php +++ b/tests/Unit/DependencyInjection/ConfigurationFormatTest.php @@ -44,7 +44,7 @@ public function testDefaultCompatibility(): void 'type' => 'auto', 'tracing' => [ 'enabled' => false, - 'exclude_commands' => [], + 'exclude_commands' => ['messenger:consume'], ], 'metering' => [ 'enabled' => false, diff --git a/tests/Unit/DependencyInjection/ConfigurationTest.php b/tests/Unit/DependencyInjection/ConfigurationTest.php index e3332a6d..1234a3cb 100644 --- a/tests/Unit/DependencyInjection/ConfigurationTest.php +++ b/tests/Unit/DependencyInjection/ConfigurationTest.php @@ -58,7 +58,7 @@ public function testEmptyConfiguration(): void 'type' => 'auto', 'tracing' => [ 'enabled' => false, - 'exclude_commands' => [], + 'exclude_commands' => ['messenger:consume'], ], 'metering' => [ 'enabled' => false, @@ -177,7 +177,10 @@ public function testReferenceConfiguration(): void tracer: ~ # Exclude commands from auto instrumentation - exclude_commands: [] + exclude_commands: + + # Default: + - messenger:consume metering: enabled: false diff --git a/tests/Unit/Instrumentation/Symfony/Messenger/AddStampForPropagationMiddlewareTest.php b/tests/Unit/Instrumentation/Symfony/Messenger/AddStampForPropagationMiddlewareTest.php new file mode 100644 index 00000000..5cfd1fb4 --- /dev/null +++ b/tests/Unit/Instrumentation/Symfony/Messenger/AddStampForPropagationMiddlewareTest.php @@ -0,0 +1,116 @@ +scope())) { + $scope->detach(); + } + + $this->propagator = new MultiTextMapPropagator([TraceContextPropagator::getInstance()]); + $this->traceStampPropagator = new TraceStampPropagator(); + } + + public function testSkipsInjectionWhenTraceStampAlreadyPresent(): void + { + $middleware = new AddStampForPropagationMiddleware($this->propagator, $this->traceStampPropagator); + + $originalStamp = new TraceStamp('00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01'); + $envelope = new Envelope(new \stdClass(), [$originalStamp]); + + $capturedEnvelope = null; + $stack = $this->createStackMock($capturedEnvelope); + + $middleware->handle($envelope, $stack); + + self::assertNotNull($capturedEnvelope); + self::assertCount(1, $capturedEnvelope->all(TraceStamp::class)); + self::assertSame($originalStamp, $capturedEnvelope->last(TraceStamp::class)); + } + + public function testDoesNotInjectWhenNoActiveScope(): void + { + $middleware = new AddStampForPropagationMiddleware($this->propagator, $this->traceStampPropagator); + + $envelope = new Envelope(new \stdClass()); + + $capturedEnvelope = null; + $stack = $this->createStackMock($capturedEnvelope); + + $middleware->handle($envelope, $stack); + + self::assertNotNull($capturedEnvelope); + self::assertNull($capturedEnvelope->last(TraceStamp::class)); + } + + public function testInjectsTraceStampWhenScopeIsActive(): void + { + $tracerProvider = new TracerProvider(new SimpleSpanProcessor(new InMemoryExporter())); + $tracer = $tracerProvider->getTracer('test'); + $span = $tracer->spanBuilder('test')->startSpan(); + $scope = $span->activate(); + + try { + $middleware = new AddStampForPropagationMiddleware($this->propagator, $this->traceStampPropagator); + + $envelope = new Envelope(new \stdClass()); + + $capturedEnvelope = null; + $stack = $this->createStackMock($capturedEnvelope); + + $middleware->handle($envelope, $stack); + + self::assertNotNull($capturedEnvelope); + + $traceStamp = $capturedEnvelope->last(TraceStamp::class); + self::assertNotNull($traceStamp); + self::assertMatchesRegularExpression( + '/^00-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$/', + $traceStamp->getTraceParent(), + ); + } finally { + $scope->detach(); + $span->end(); + } + } + + private function createStackMock(?Envelope &$capturedEnvelope): StackInterface + { + $nextMiddleware = $this->createMock(MiddlewareInterface::class); + $nextMiddleware->expects($this->once()) + ->method('handle') + ->willReturnCallback(function (Envelope $envelope, StackInterface $stack) use (&$capturedEnvelope) { + $capturedEnvelope = $envelope; + + return $envelope; + }); + + $stack = $this->createMock(StackInterface::class); + $stack->method('next')->willReturn($nextMiddleware); + + return $stack; + } +} diff --git a/tests/Unit/Instrumentation/Symfony/Messenger/TraceableMessengerMiddlewareTest.php b/tests/Unit/Instrumentation/Symfony/Messenger/TraceableMessengerMiddlewareTest.php new file mode 100644 index 00000000..c95d8933 --- /dev/null +++ b/tests/Unit/Instrumentation/Symfony/Messenger/TraceableMessengerMiddlewareTest.php @@ -0,0 +1,175 @@ +scope())) { + $scope->detach(); + } + + $this->exporter = new InMemoryExporter(); + $tracerProvider = new TracerProvider(new SimpleSpanProcessor($this->exporter)); + + $this->middleware = new TraceableMessengerMiddleware( + $tracerProvider->getTracer('test'), + ); + } + + protected function tearDown(): void + { + while (null !== ($scope = Context::storage()->scope())) { + $scope->detach(); + } + } + + public function testHandleCreatesSpanWithCorrectAttributes(): void + { + $envelope = new Envelope(new \stdClass()); + $stack = $this->createPassthroughStack(); + + $this->middleware->handle($envelope, $stack); + + $spans = $this->exporter->getSpans(); + self::assertCount(1, $spans); + self::assertSame('messenger.middleware', $spans[0]->getName()); + self::assertSame(SpanKind::KIND_INTERNAL, $spans[0]->getKind()); + self::assertSame('messenger.middleware', $spans[0]->getAttributes()->get('symfony.messenger.event.category')); + self::assertSame('default', $spans[0]->getAttributes()->get('symfony.messenger.bus.name')); + self::assertNotNull($spans[0]->getAttributes()->get('symfony.messenger.event.current')); + } + + public function testHandleSetsStatusOkOnSuccess(): void + { + $envelope = new Envelope(new \stdClass()); + $stack = $this->createPassthroughStack(); + + $this->middleware->handle($envelope, $stack); + + $spans = $this->exporter->getSpans(); + self::assertCount(1, $spans); + self::assertSame(StatusCode::STATUS_OK, $spans[0]->getStatus()->getCode()); + self::assertCount(0, $spans[0]->getEvents()); + } + + public function testHandleSetsStatusErrorOnException(): void + { + $envelope = new Envelope(new \stdClass()); + $exception = new \RuntimeException('Something went wrong'); + $stack = $this->createThrowingStack($exception); + + try { + $this->middleware->handle($envelope, $stack); + self::fail('Expected RuntimeException was not thrown'); + } catch (\RuntimeException $caught) { + self::assertSame($exception, $caught); + } + + $spans = $this->exporter->getSpans(); + self::assertCount(1, $spans); + self::assertSame(StatusCode::STATUS_ERROR, $spans[0]->getStatus()->getCode()); + self::assertSame('Something went wrong', $spans[0]->getStatus()->getDescription()); + self::assertCount(1, $spans[0]->getEvents()); + self::assertSame('exception', $spans[0]->getEvents()[0]->getName()); + } + + public function testHandleSetsStatusErrorOnError(): void + { + $envelope = new Envelope(new \stdClass()); + $error = new \Error('Type error'); + $stack = $this->createThrowingStack($error); + + try { + $this->middleware->handle($envelope, $stack); + self::fail('Expected Error was not thrown'); + } catch (\Error $caught) { + self::assertSame($error, $caught); + } + + $spans = $this->exporter->getSpans(); + self::assertCount(1, $spans); + self::assertSame(StatusCode::STATUS_ERROR, $spans[0]->getStatus()->getCode()); + self::assertSame('Type error', $spans[0]->getStatus()->getDescription()); + self::assertCount(1, $spans[0]->getEvents()); + self::assertSame('exception', $spans[0]->getEvents()[0]->getName()); + } + + public function testStopIsIdempotent(): void + { + $envelope = new Envelope(new \stdClass()); + $stack = $this->createPassthroughStack(); + + $this->middleware->handle($envelope, $stack); + + $spans = $this->exporter->getSpans(); + self::assertCount(1, $spans); + } + + public function testCustomBusNameAndEventCategory(): void + { + $exporter = new InMemoryExporter(); + $tracerProvider = new TracerProvider(new SimpleSpanProcessor($exporter)); + + $middleware = new TraceableMessengerMiddleware( + $tracerProvider->getTracer('test'), + busName: 'command.bus', + eventCategory: 'custom.category', + ); + + $envelope = new Envelope(new \stdClass()); + $stack = $this->createPassthroughStack(); + + $middleware->handle($envelope, $stack); + + $spans = $exporter->getSpans(); + self::assertCount(1, $spans); + self::assertSame('custom.category', $spans[0]->getAttributes()->get('symfony.messenger.event.category')); + self::assertSame('command.bus', $spans[0]->getAttributes()->get('symfony.messenger.bus.name')); + self::assertStringContainsString('on "command.bus"', $spans[0]->getAttributes()->get('symfony.messenger.event.current')); + } + + private function createPassthroughStack(): StackInterface + { + $nextMiddleware = $this->createMock(MiddlewareInterface::class); + $nextMiddleware->method('handle') + ->willReturnCallback(fn (Envelope $envelope) => $envelope); + + $stack = $this->createMock(StackInterface::class); + $stack->method('next')->willReturn($nextMiddleware); + + return $stack; + } + + private function createThrowingStack(\Throwable $throwable): StackInterface + { + $nextMiddleware = $this->createMock(MiddlewareInterface::class); + $nextMiddleware->method('handle') + ->willThrowException($throwable); + + $stack = $this->createMock(StackInterface::class); + $stack->method('next')->willReturn($nextMiddleware); + + return $stack; + } +} diff --git a/tests/Unit/Instrumentation/Symfony/Messenger/TraceableMessengerTransportTest.php b/tests/Unit/Instrumentation/Symfony/Messenger/TraceableMessengerTransportTest.php new file mode 100644 index 00000000..68494cd3 --- /dev/null +++ b/tests/Unit/Instrumentation/Symfony/Messenger/TraceableMessengerTransportTest.php @@ -0,0 +1,109 @@ +exporter = new InMemoryExporter(); + $tracerProvider = new TracerProvider(new SimpleSpanProcessor($this->exporter)); + + $innerTransport = $this->createMock(TransportInterface::class); + $innerTransport->method('get')->willThrowException(new TransportException('Connection failed')); + $innerTransport->method('send')->willThrowException(new TransportException('Send failed')); + $innerTransport->method('ack')->willThrowException(new TransportException('Ack failed')); + $innerTransport->method('reject')->willThrowException(new TransportException('Reject failed')); + + $this->transport = new TraceableMessengerTransport( + $innerTransport, + $tracerProvider->getTracer('test'), + ); + } + + public function testGetRethrowsTransportExceptionAndRecordsError(): void + { + try { + iterator_to_array($this->transport->get()); + self::fail('Expected TransportException was not thrown'); + } catch (TransportException $e) { + self::assertSame('Connection failed', $e->getMessage()); + } + + $spans = $this->exporter->getSpans(); + self::assertCount(1, $spans); + self::assertSame('messenger.transport.get', $spans[0]->getName()); + self::assertSame(StatusCode::STATUS_ERROR, $spans[0]->getStatus()->getCode()); + self::assertSame('Connection failed', $spans[0]->getStatus()->getDescription()); + self::assertCount(1, $spans[0]->getEvents()); + self::assertSame('exception', $spans[0]->getEvents()[0]->getName()); + } + + public function testSendRethrowsTransportExceptionAndRecordsError(): void + { + try { + $this->transport->send(new Envelope(new \stdClass())); + self::fail('Expected TransportException was not thrown'); + } catch (TransportException $e) { + self::assertSame('Send failed', $e->getMessage()); + } + + $spans = $this->exporter->getSpans(); + self::assertCount(1, $spans); + self::assertSame('messenger.transport.send', $spans[0]->getName()); + self::assertSame(StatusCode::STATUS_ERROR, $spans[0]->getStatus()->getCode()); + self::assertSame('Send failed', $spans[0]->getStatus()->getDescription()); + self::assertCount(1, $spans[0]->getEvents()); + } + + public function testAckRethrowsTransportExceptionAndRecordsError(): void + { + try { + $this->transport->ack(new Envelope(new \stdClass())); + self::fail('Expected TransportException was not thrown'); + } catch (TransportException $e) { + self::assertSame('Ack failed', $e->getMessage()); + } + + $spans = $this->exporter->getSpans(); + self::assertCount(1, $spans); + self::assertSame('messenger.transport.ack', $spans[0]->getName()); + self::assertSame(StatusCode::STATUS_ERROR, $spans[0]->getStatus()->getCode()); + self::assertSame('Ack failed', $spans[0]->getStatus()->getDescription()); + self::assertCount(1, $spans[0]->getEvents()); + } + + public function testRejectRethrowsTransportExceptionAndRecordsError(): void + { + try { + $this->transport->reject(new Envelope(new \stdClass())); + self::fail('Expected TransportException was not thrown'); + } catch (TransportException $e) { + self::assertSame('Reject failed', $e->getMessage()); + } + + $spans = $this->exporter->getSpans(); + self::assertCount(1, $spans); + self::assertSame('messenger.transport.reject', $spans[0]->getName()); + self::assertSame(StatusCode::STATUS_ERROR, $spans[0]->getStatus()->getCode()); + self::assertSame('Reject failed', $spans[0]->getStatus()->getDescription()); + self::assertCount(1, $spans[0]->getEvents()); + } +} diff --git a/tests/Unit/Instrumentation/Symfony/Messenger/WorkerMessageEventSubscriberTest.php b/tests/Unit/Instrumentation/Symfony/Messenger/WorkerMessageEventSubscriberTest.php new file mode 100644 index 00000000..17c45601 --- /dev/null +++ b/tests/Unit/Instrumentation/Symfony/Messenger/WorkerMessageEventSubscriberTest.php @@ -0,0 +1,180 @@ +scope())) { + $scope->detach(); + } + + $this->exporter = new InMemoryExporter(); + $tracerProvider = new TracerProvider(new SimpleSpanProcessor($this->exporter)); + $this->tracer = $tracerProvider->getTracer('test'); + $this->logger = $this->createMock(LoggerInterface::class); + + $this->subscriber = new WorkerMessageEventSubscriber( + new MultiTextMapPropagator([]), + $this->tracer, + new ServiceLocator([]), + new TraceStampPropagator(), + $this->logger, + ); + } + + protected function tearDown(): void + { + while (null !== ($scope = Context::storage()->scope())) { + $scope->detach(); + } + } + + public function testGetSubscribedEventsReturnsCorrectMap(): void + { + $events = WorkerMessageEventSubscriber::getSubscribedEvents(); + + self::assertArrayHasKey(WorkerMessageReceivedEvent::class, $events); + self::assertArrayHasKey(WorkerMessageFailedEvent::class, $events); + self::assertArrayHasKey(WorkerMessageHandledEvent::class, $events); + + self::assertSame([['startSpan', 10000]], $events[WorkerMessageReceivedEvent::class]); + self::assertSame([['endSpanOnError', -10000]], $events[WorkerMessageFailedEvent::class]); + self::assertSame([['endSpanWithSuccess', -10000]], $events[WorkerMessageHandledEvent::class]); + } + + public function testGetSubscribedServicesReturnsTracerInterface(): void + { + $services = WorkerMessageEventSubscriber::getSubscribedServices(); + + self::assertSame([TracerInterface::class], $services); + } + + public function testStartSpanCreatesConsumerSpan(): void + { + $envelope = new Envelope(new \stdClass()); + $event = new WorkerMessageReceivedEvent($envelope, 'main'); + + $this->subscriber->startSpan($event); + + // Verify scope was attached + $scope = Context::storage()->scope(); + self::assertNotNull($scope); + + // End the span via the handled event + $this->subscriber->endSpanWithSuccess(new WorkerMessageHandledEvent($envelope, 'main')); + + $spans = $this->exporter->getSpans(); + self::assertCount(1, $spans); + self::assertSame('main stdClass', $spans[0]->getName()); + self::assertSame(SpanKind::KIND_CONSUMER, $spans[0]->getKind()); + self::assertSame(StatusCode::STATUS_OK, $spans[0]->getStatus()->getCode()); + } + + public function testStartSpanSkipsWhenNotTraceable(): void + { + $this->subscriber->setInstrumentationType(InstrumentationTypeEnum::Attribute); + + $envelope = new Envelope(new \stdClass()); + $event = new WorkerMessageReceivedEvent($envelope, 'main'); + + $this->subscriber->startSpan($event); + + self::assertNull(Context::storage()->scope()); + self::assertCount(0, $this->exporter->getSpans()); + } + + public function testEndSpanWithSuccessLogsWhenNoScope(): void + { + $this->logger->expects($this->once()) + ->method('debug') + ->with('No active scope'); + + $envelope = new Envelope(new \stdClass()); + $this->subscriber->endSpanWithSuccess(new WorkerMessageHandledEvent($envelope, 'main')); + + self::assertCount(0, $this->exporter->getSpans()); + } + + public function testEndSpanOnErrorLogsWhenNoScope(): void + { + $this->logger->expects($this->once()) + ->method('debug') + ->with('No active scope'); + + $envelope = new Envelope(new \stdClass()); + $exception = new \RuntimeException('fail'); + $this->subscriber->endSpanOnError(new WorkerMessageFailedEvent($envelope, 'main', $exception)); + + self::assertCount(0, $this->exporter->getSpans()); + } + + public function testGetTracerFallsBackWhenTracerNotInLocator(): void + { + $fallbackTracer = (new TracerProvider(new SimpleSpanProcessor($this->exporter)))->getTracer('fallback'); + + $subscriber = new WorkerMessageEventSubscriber( + new MultiTextMapPropagator([]), + $fallbackTracer, + new ServiceLocator([]), + new TraceStampPropagator(), + $this->logger, + ); + $subscriber->setInstrumentationType(InstrumentationTypeEnum::Attribute); + + // FallbackTracerMessage has #[Traceable(tracer: 'open_telemetry.traces.tracers.fallback')] + // but the locator is empty, so it should fall back to the default tracer with a warning + $message = new FallbackTracerMessage('test'); + + $this->logger->expects($this->once()) + ->method('warning') + ->with(self::stringContains('not found in service locator')); + + $envelope = new Envelope($message); + $subscriber->startSpan(new WorkerMessageReceivedEvent($envelope, 'main')); + $subscriber->endSpanWithSuccess(new WorkerMessageHandledEvent($envelope, 'main')); + + $spans = $this->exporter->getSpans(); + self::assertCount(1, $spans); + } + + public function testDefaultInstrumentationTypeIsAuto(): void + { + // Without calling setInstrumentationType, the subscriber should use Auto mode + $envelope = new Envelope(new \stdClass()); + + $this->subscriber->startSpan(new WorkerMessageReceivedEvent($envelope, 'main')); + $this->subscriber->endSpanWithSuccess(new WorkerMessageHandledEvent($envelope, 'main')); + + self::assertCount(1, $this->exporter->getSpans()); + } +} diff --git a/tests/Unit/OpenTelemetry/Context/Propagator/PropagatorFactoryTest.php b/tests/Unit/OpenTelemetry/Context/Propagator/PropagatorFactoryTest.php new file mode 100644 index 00000000..b9e30d29 --- /dev/null +++ b/tests/Unit/OpenTelemetry/Context/Propagator/PropagatorFactoryTest.php @@ -0,0 +1,22 @@ +fields(); + + self::assertContains('traceparent', $fields); + self::assertContains('tracestate', $fields); + self::assertContains('baggage', $fields); + } +} diff --git a/tests/Unit/OpenTelemetry/Context/Propagator/TraceStampPropagatorTest.php b/tests/Unit/OpenTelemetry/Context/Propagator/TraceStampPropagatorTest.php new file mode 100644 index 00000000..ab6c8cd6 --- /dev/null +++ b/tests/Unit/OpenTelemetry/Context/Propagator/TraceStampPropagatorTest.php @@ -0,0 +1,141 @@ +propagator = new TraceStampPropagator(); + } + + public function testSetAddsTraceStampToEnvelope(): void + { + $carrier = new Envelope(new \stdClass()); + $traceParent = '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01'; + + $this->propagator->set($carrier, TraceContextPropagator::TRACEPARENT, $traceParent); + + $stamp = $carrier->last(TraceStamp::class); + self::assertNotNull($stamp); + self::assertSame($traceParent, $stamp->getTraceParent()); + } + + public function testSetTracestateUpdatesExistingStamp(): void + { + $carrier = new Envelope(new \stdClass()); + $traceParent = '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01'; + + $this->propagator->set($carrier, TraceContextPropagator::TRACEPARENT, $traceParent); + $this->propagator->set($carrier, TraceContextPropagator::TRACESTATE, 'vendor=value'); + + $stamp = $carrier->last(TraceStamp::class); + self::assertNotNull($stamp); + self::assertSame($traceParent, $stamp->getTraceParent()); + self::assertSame('vendor=value', $stamp->getTraceState()); + } + + public function testSetTracestateIgnoredWithoutExistingStamp(): void + { + $carrier = new Envelope(new \stdClass()); + + $this->propagator->set($carrier, TraceContextPropagator::TRACESTATE, 'vendor=value'); + + self::assertNull($carrier->last(TraceStamp::class)); + } + + public function testSetIgnoresUnknownKey(): void + { + $carrier = new Envelope(new \stdClass()); + + $this->propagator->set($carrier, 'baggage', 'key=value'); + + self::assertNull($carrier->last(TraceStamp::class)); + } + + public function testSetThrowsOnInvalidCarrier(): void + { + $carrier = 'not-an-envelope'; + + $this->expectException(\InvalidArgumentException::class); + $this->propagator->set($carrier, TraceContextPropagator::TRACEPARENT, 'value'); + } + + public function testGetReturnsTraceParentFromStamp(): void + { + $traceParent = '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01'; + $carrier = new Envelope(new \stdClass(), [new TraceStamp($traceParent)]); + + $result = $this->propagator->get($carrier, TraceContextPropagator::TRACEPARENT); + + self::assertSame($traceParent, $result); + } + + public function testGetReturnsNullWhenNoTraceStamp(): void + { + $carrier = new Envelope(new \stdClass()); + + $result = $this->propagator->get($carrier, TraceContextPropagator::TRACEPARENT); + + self::assertNull($result); + } + + public function testGetReturnsTraceStateFromStamp(): void + { + $carrier = new Envelope(new \stdClass(), [new TraceStamp('00-abc-def-01', 'vendor=value')]); + + $result = $this->propagator->get($carrier, TraceContextPropagator::TRACESTATE); + + self::assertSame('vendor=value', $result); + } + + public function testGetReturnsNullTraceStateWhenNotSet(): void + { + $carrier = new Envelope(new \stdClass(), [new TraceStamp('00-abc-def-01')]); + + $result = $this->propagator->get($carrier, TraceContextPropagator::TRACESTATE); + + self::assertNull($result); + } + + public function testGetReturnsNullForUnknownKey(): void + { + $traceParent = '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01'; + $carrier = new Envelope(new \stdClass(), [new TraceStamp($traceParent)]); + + $result = $this->propagator->get($carrier, 'baggage'); + + self::assertNull($result); + } + + public function testGetThrowsOnInvalidCarrier(): void + { + $this->expectException(\InvalidArgumentException::class); + $this->propagator->get('not-an-envelope', TraceContextPropagator::TRACEPARENT); + } + + public function testKeysReturnsBothHeaders(): void + { + $carrier = new Envelope(new \stdClass()); + + $keys = $this->propagator->keys($carrier); + + self::assertSame([TraceContextPropagator::TRACEPARENT, TraceContextPropagator::TRACESTATE], $keys); + } + + public function testKeysThrowsOnInvalidCarrier(): void + { + $this->expectException(\InvalidArgumentException::class); + $this->propagator->keys('not-an-envelope'); + } +}