Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"ext-grpc": "*",
"ext-mbstring": "*",
"ext-opentelemetry": "*",
"ext-rdkafka": "*",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The extension should be suggested too.

"ext-pdo": "*",
"ext-pdo_sqlite": "*",
"ext-xdebug": "*",
Expand Down
15 changes: 8 additions & 7 deletions docs/src/instrumentation/traces.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ A DSN starts with a transport and an exporter separated by a `+` character. The

Here is table list of the available transport and exporter for traces:

| Transport | Exporter | Description | Example | Default |
|-----------|-----------|--------------------------------------------------------------|-------------------------------------------|--------------|
| http(s) | otlp | OpenTelemetry exporter using HTTP protocol (over TLS) | http+otlp://localhost:4318/v1/traces | N/A |
| grpc(s) | otlp | OpenTelemetry exporter using gRPC protocol (over TLS) | grpc+otlp://localhost:4317 | N/A |
| http(s) | zipkin | Zipkin exporter using HTTP protocol (over TLS) | http+zipkin://localhost:9411/api/v2/spans | N/A |
| empty | in-memory | In-memory exporter for testing purpose | in-memory://default | N/A |
| stream | console | Console exporter for testing purpose using a stream resource | stream+console://default | php://stdout |
| Transport | Exporter | Description | Example | Default |
|-----------|-----------|---------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------|--------------|
| http(s) | otlp | OpenTelemetry exporter using HTTP protocol (over TLS) | http+otlp://localhost:4318/v1/traces | N/A |
| grpc(s) | otlp | OpenTelemetry exporter using gRPC protocol (over TLS) | grpc+otlp://localhost:4317 | N/A |
| kafka | otlp | OpenTelemetry exporter using the Kafka message broker. Add query parameters for configuring the message broker. | kafka+otlp://open_telemetry_local_alpha_traces?metadata.broker.list=kafka:9092 | N/A |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The open_telemetry_local_alpha_traces is ambiguous compared to the other examples, we should leave to a "standard" host:port example.

| http(s) | zipkin | Zipkin exporter using HTTP protocol (over TLS) | http+zipkin://localhost:9411/api/v2/spans | N/A |
| empty | in-memory | In-memory exporter for testing purpose | in-memory://default | N/A |
| stream | console | Console exporter for testing purpose using a stream resource | stream+console://default | php://stdout |

Note: The `stream+console` DSN is the only DSN than can refer to a stream resource using the `path` block. For example: `stream+console://default/file.log`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,9 @@ private function requestAttributes(Request $request): iterable
/**
* @param array<string> $headers
*
* @return array<string, mixed>
* @return \Generator<string, array<int, string>>
*/
private function headerAttributes(HeaderBag $headerBag, array $headers): iterable
private function headerAttributes(HeaderBag $headerBag, array $headers): \Generator
{
foreach ($headers as $header => $attribute) {
if ($headerBag->has($header)) {
Expand Down
5 changes: 5 additions & 0 deletions src/OpenTelemetry/Exporter/ConsoleExporterEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ public function getExporter(): string
{
return $this->dsn->getExporter();
}

public function getDsn(): ExporterDsn
{
return $this->dsn;
}
}
6 changes: 6 additions & 0 deletions src/OpenTelemetry/Exporter/ExporterDsn.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Zenstruck\Dsn;
use Zenstruck\Uri;
use Zenstruck\Uri\Part\Query;

final class ExporterDsn
{
Expand Down Expand Up @@ -60,6 +61,11 @@ public function getPort(?int $default = null): ?int
return $this->uri->port() ?? $default;
}

public function getQuery(): Query
{
return $this->uri->query();
}

/**
* @return string[]
*/
Expand Down
2 changes: 2 additions & 0 deletions src/OpenTelemetry/Exporter/ExporterEndpointInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ public function getExporter(): string;
public function getTransport(): ?string;

public static function fromDsn(ExporterDsn $dsn): self;

public function getDsn(): ExporterDsn;
}
9 changes: 9 additions & 0 deletions src/OpenTelemetry/Exporter/OtlpExporterEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public function withSignal(string $signal): self

public function __toString()
{
if (TransportEnum::Kafka === $this->transport) {
return \sprintf('kafka://%s?%s', $this->dsn->getHost(), $this->dsn->getQuery()->toString());
}

Comment on lines +45 to +48
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on this change?

$uri = $this->uriFactory->createUri();
$uri = $uri
->withScheme($this->transport->getScheme())
Expand Down Expand Up @@ -78,4 +82,9 @@ public function getExporter(): string
{
return 'otlp';
}

public function getDsn(): ExporterDsn
{
return $this->dsn;
}
}
5 changes: 5 additions & 0 deletions src/OpenTelemetry/Log/LogExporterEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ public function getExporter(): string
{
return $this->exporter->value;
}

public function getDsn(): ExporterDsn
{
return $this->dsn;
}
}
5 changes: 5 additions & 0 deletions src/OpenTelemetry/Metric/MetricExporterEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ public function getExporter(): string
{
return $this->exporter->value;
}

public function getDsn(): ExporterDsn
{
return $this->dsn;
}
}
5 changes: 5 additions & 0 deletions src/OpenTelemetry/Trace/TraceExporterEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,9 @@ public function getTransport(): ?string
{
return $this->transport?->value;
}

public function getDsn(): ExporterDsn
{
return $this->dsn;
}
}
5 changes: 5 additions & 0 deletions src/OpenTelemetry/Trace/ZipkinExporterEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ public function getExporter(): string
{
return 'zipkin';
}

public function getDsn(): ExporterDsn
{
return $this->dsn;
}
}
85 changes: 85 additions & 0 deletions src/OpenTelemetry/Transport/KafkaTransport.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

declare(strict_types=1);

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport;

use OpenTelemetry\SDK\Common\Export\TransportInterface;
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Future\CompletedFuture;
use OpenTelemetry\SDK\Common\Future\ErrorFuture;
use OpenTelemetry\SDK\Common\Future\FutureInterface;
use RdKafka\Conf;
use RdKafka\Producer;
use RdKafka\ProducerTopic;

/**
* @template-implements TransportInterface<"application/x-protobuf">
*/
final readonly class KafkaTransport implements TransportInterface
{
private const FLUSH_TIMEOUT = 10000;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I don't have any problem with having a default value, I think this should be configurable from the dsn and documented.


private Producer $producer;
private ProducerTopic $topicHandle;

public function __construct(
Conf $configuration,
string $topic,
) {
if (!\class_exists(Conf::class)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wont work because PHP resolves RdKafka\Conf before the check, so the friendly exception won't be thrown:

Error: Class "RdKafka\Conf" not found

throw new \RuntimeException('The PHP extension "rdkafka" is required to use the Kafka transport.');
}

$this->producer = new Producer($configuration);
$this->topicHandle = $this->producer->newTopic($topic);
}

public function contentType(): string
{
return 'application/x-protobuf';
}

/**
* @phpstan-return FutureInterface<null>
*/
public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface
{
try {
$this->topicHandle->producev(\RD_KAFKA_PARTITION_UA, 0, $payload);
} catch (\Throwable $exception) {
return new ErrorFuture($exception);
}

return new CompletedFuture(null);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if an error occurs while queuing the payload, the new CompletedFuture(null); silences it.

}

public function shutdown(?CancellationInterface $cancellation = null): bool
{
return $this->flushInternal();
}

public function forceFlush(?CancellationInterface $cancellation = null): bool
{
return $this->flushInternal();
}

private function flushInternal(): bool
{
// librdkafka recommends retrying the flush operation a couple of times when it returns a null result.
$timeout = self::FLUSH_TIMEOUT;
$start = \microtime(true);
do {
$res = $this->producer->flush($timeout);
if (\RD_KAFKA_RESP_ERR_NO_ERROR === $res) {
return true;
}

// reduce timeout
$elapsedMs = (int) \round((\microtime(true) - $start) * 1000);
$timeout = \max(0, self::FLUSH_TIMEOUT - $elapsedMs);
} while ($timeout > 0);

return false;
}
Comment on lines +67 to +84
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This internal flush mechanic needs documentation, I have never used Kafka but I think we should provide incentives about how the transport is working under the hood.

Also, non-success errors seems to be treated equally which could hide permanent errors right?

}
30 changes: 30 additions & 0 deletions src/OpenTelemetry/Transport/KafkaTransportFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport;

use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterEndpointInterface;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterOptionsInterface;
use OpenTelemetry\SDK\Common\Export\TransportInterface;
use RdKafka\Conf;

final readonly class KafkaTransportFactory implements TransportFactoryInterface
{
public function supports(#[\SensitiveParameter] ExporterEndpointInterface $endpoint, ExporterOptionsInterface $options): bool
{
return TransportEnum::Kafka === TransportEnum::tryFrom($endpoint->getTransport());
}

public function createTransport(#[\SensitiveParameter] ExporterEndpointInterface $endpoint, ExporterOptionsInterface $options): TransportInterface
{
$dsn = $endpoint->getDsn();
$queryParameters = $dsn->getQuery()->all();
$conf = new Conf();
foreach ($queryParameters as $k => $v) {
$conf->set(\str_replace('_', '.', $k), (string) $v);
}

return new KafkaTransport($conf, $dsn->getHost());
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should document this transformation.

}
2 changes: 2 additions & 0 deletions src/OpenTelemetry/Transport/TransportEnum.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ enum TransportEnum: string
case Http = 'http';
case Https = 'https';
case Stream = 'stream';
case Kafka = 'kafka';

public function getScheme(): ?string
{
return match ($this) {
self::Http, self::Grpc => 'http',
self::Https, self::Grpcs => 'https',
self::Kafka => 'kafka',
default => null,
};
}
Expand Down
4 changes: 4 additions & 0 deletions src/Resources/config/services_transports.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\AbstractTransportFactory;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\GrpcTransportFactory;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\KafkaTransportFactory;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\OtlpHttpTransportFactory;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\PsrHttpTransportFactory;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\StreamTransportFactory;
Expand Down Expand Up @@ -39,6 +40,9 @@
->parent('open_telemetry.transport_factory.abstract')
->tag('open_telemetry.transport_factory')

->set('open_telemetry.transport_factory.kafka', KafkaTransportFactory::class)
->tag('open_telemetry.transport_factory')

->set('open_telemetry.transport_factory', TransportFactory::class)
->args([
tagged_iterator('open_telemetry.transport_factory'),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

declare(strict_types=1);

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Tests\Unit\OpenTelemetry\Transport;

use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\EmptyExporterOptions;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterDsn;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterEndpointInterface;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterOptionsInterface;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\OtlpExporterOptions;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Log\LogExporterEndpoint;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Metric\MetricExporterEndpoint;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Metric\MetricExporterOptions;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Trace\TraceExporterEndpoint;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\KafkaTransportFactory;
use PHPUnit\Framework\Attributes\CoversClass;
use PHPUnit\Framework\Attributes\DataProvider;
use PHPUnit\Framework\TestCase;

#[CoversClass(KafkaTransportFactory::class)]
final class KafkaTransportFactoryTest extends TestCase
{
#[DataProvider('exporterProvider')]
public function testCreateTransportFromExporter(
ExporterEndpointInterface $endpoint,
ExporterOptionsInterface $options,
bool $shouldSupport,
): void {
$factory = new KafkaTransportFactory();

self::assertSame($shouldSupport, $factory->supports($endpoint, $options));

if ($shouldSupport) {
$transport = $factory->createTransport($endpoint, $options);
self::assertSame('application/x-protobuf', $transport->contentType());
}
}

/**
* @return \Generator<array{
* 0: ExporterEndpointInterface,
* 1: ExporterOptionsInterface,
* 2: bool
* }>
*/
public static function exporterProvider(): \Generator
{
// Kafka for traces
yield [
TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('kafka+otlp://otel-traces?metadata_broker_list=localhost:9092')),
new OtlpExporterOptions(),
true,
];

// Kafka for metrics
yield [
MetricExporterEndpoint::fromDsn(ExporterDsn::fromString('kafka+otlp://otel-metrics?metadata_broker_list=localhost:9092')),
new MetricExporterOptions(),
true,
];

// Kafka for logs
yield [
LogExporterEndpoint::fromDsn(ExporterDsn::fromString('kafka+otlp://otel-logs?metadata_broker_list=localhost:9092')),
new OtlpExporterOptions(),
true,
];

// Not Kafka transports should not be supported
yield [
TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('grpc+otlp://localhost')),
new OtlpExporterOptions(),
false,
];

yield [
TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('http+otlp://localhost')),
new OtlpExporterOptions(),
false,
];

yield [
TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('stream+console://default')),
new EmptyExporterOptions(),
false,
];
}
}
Loading