-
Notifications
You must be signed in to change notification settings - Fork 10
feat(OpenTelemetry/Transport): implement Kafka transport #196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,13 +42,14 @@ A DSN starts with a transport and an exporter separated by a `+` character. The | |
|
|
||
| Here is table list of the available transport and exporter for traces: | ||
|
|
||
| | Transport | Exporter | Description | Example | Default | | ||
| |-----------|-----------|--------------------------------------------------------------|-------------------------------------------|--------------| | ||
| | http(s) | otlp | OpenTelemetry exporter using HTTP protocol (over TLS) | http+otlp://localhost:4318/v1/traces | N/A | | ||
| | grpc(s) | otlp | OpenTelemetry exporter using gRPC protocol (over TLS) | grpc+otlp://localhost:4317 | N/A | | ||
| | http(s) | zipkin | Zipkin exporter using HTTP protocol (over TLS) | http+zipkin://localhost:9411/api/v2/spans | N/A | | ||
| | empty | in-memory | In-memory exporter for testing purpose | in-memory://default | N/A | | ||
| | stream | console | Console exporter for testing purpose using a stream resource | stream+console://default | php://stdout | | ||
| | Transport | Exporter | Description | Example | Default | | ||
| |-----------|-----------|---------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------|--------------| | ||
| | http(s) | otlp | OpenTelemetry exporter using HTTP protocol (over TLS) | http+otlp://localhost:4318/v1/traces | N/A | | ||
| | grpc(s) | otlp | OpenTelemetry exporter using gRPC protocol (over TLS) | grpc+otlp://localhost:4317 | N/A | | ||
| | kafka | otlp | OpenTelemetry exporter using the Kafka message broker. Add query parameters for configuring the message broker. | kafka+otlp://open_telemetry_local_alpha_traces?metadata.broker.list=kafka:9092 | N/A | | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| | http(s) | zipkin | Zipkin exporter using HTTP protocol (over TLS) | http+zipkin://localhost:9411/api/v2/spans | N/A | | ||
| | empty | in-memory | In-memory exporter for testing purpose | in-memory://default | N/A | | ||
| | stream | console | Console exporter for testing purpose using a stream resource | stream+console://default | php://stdout | | ||
|
|
||
| Note: The `stream+console` DSN is the only DSN than can refer to a stream resource using the `path` block. For example: `stream+console://default/file.log`. | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,10 @@ public function withSignal(string $signal): self | |
|
|
||
| public function __toString() | ||
| { | ||
| if (TransportEnum::Kafka === $this->transport) { | ||
| return \sprintf('kafka://%s?%s', $this->dsn->getHost(), $this->dsn->getQuery()->toString()); | ||
| } | ||
|
|
||
|
Comment on lines
+45
to
+48
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you elaborate on this change? |
||
| $uri = $this->uriFactory->createUri(); | ||
| $uri = $uri | ||
| ->withScheme($this->transport->getScheme()) | ||
|
|
@@ -78,4 +82,9 @@ public function getExporter(): string | |
| { | ||
| return 'otlp'; | ||
| } | ||
|
|
||
| public function getDsn(): ExporterDsn | ||
| { | ||
| return $this->dsn; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| <?php | ||
|
|
||
| declare(strict_types=1); | ||
|
|
||
| namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport; | ||
|
|
||
| use OpenTelemetry\SDK\Common\Export\TransportInterface; | ||
| use OpenTelemetry\SDK\Common\Future\CancellationInterface; | ||
| use OpenTelemetry\SDK\Common\Future\CompletedFuture; | ||
| use OpenTelemetry\SDK\Common\Future\ErrorFuture; | ||
| use OpenTelemetry\SDK\Common\Future\FutureInterface; | ||
| use RdKafka\Conf; | ||
| use RdKafka\Producer; | ||
| use RdKafka\ProducerTopic; | ||
|
|
||
| /** | ||
| * @template-implements TransportInterface<"application/x-protobuf"> | ||
| */ | ||
| final readonly class KafkaTransport implements TransportInterface | ||
| { | ||
| private const FLUSH_TIMEOUT = 10000; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While I don't have any problem with having a default value, I think this should be configurable from the dsn and documented. |
||
|
|
||
| private Producer $producer; | ||
| private ProducerTopic $topicHandle; | ||
|
|
||
| public function __construct( | ||
| Conf $configuration, | ||
| string $topic, | ||
| ) { | ||
| if (!\class_exists(Conf::class)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This wont work because PHP resolves RdKafka\Conf before the check, so the friendly exception won't be thrown: |
||
| throw new \RuntimeException('The PHP extension "rdkafka" is required to use the Kafka transport.'); | ||
| } | ||
|
|
||
| $this->producer = new Producer($configuration); | ||
| $this->topicHandle = $this->producer->newTopic($topic); | ||
| } | ||
|
|
||
| public function contentType(): string | ||
| { | ||
| return 'application/x-protobuf'; | ||
| } | ||
|
|
||
| /** | ||
| * @phpstan-return FutureInterface<null> | ||
| */ | ||
| public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface | ||
| { | ||
| try { | ||
| $this->topicHandle->producev(\RD_KAFKA_PARTITION_UA, 0, $payload); | ||
| } catch (\Throwable $exception) { | ||
| return new ErrorFuture($exception); | ||
| } | ||
|
|
||
| return new CompletedFuture(null); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if an error occurs while queuing the payload, the |
||
| } | ||
|
|
||
| public function shutdown(?CancellationInterface $cancellation = null): bool | ||
| { | ||
| return $this->flushInternal(); | ||
| } | ||
|
|
||
| public function forceFlush(?CancellationInterface $cancellation = null): bool | ||
| { | ||
| return $this->flushInternal(); | ||
| } | ||
|
|
||
| private function flushInternal(): bool | ||
| { | ||
| // librdkafka recommends retrying the flush operation a couple of times when it returns a null result. | ||
| $timeout = self::FLUSH_TIMEOUT; | ||
| $start = \microtime(true); | ||
| do { | ||
| $res = $this->producer->flush($timeout); | ||
| if (\RD_KAFKA_RESP_ERR_NO_ERROR === $res) { | ||
| return true; | ||
| } | ||
|
|
||
| // reduce timeout | ||
| $elapsedMs = (int) \round((\microtime(true) - $start) * 1000); | ||
| $timeout = \max(0, self::FLUSH_TIMEOUT - $elapsedMs); | ||
| } while ($timeout > 0); | ||
|
|
||
| return false; | ||
| } | ||
|
Comment on lines
+67
to
+84
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This internal flush mechanic needs documentation, I have never used Kafka but I think we should provide incentives about how the transport is working under the hood. Also, non-success errors seems to be treated equally which could hide permanent errors right? |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| <?php | ||
|
|
||
| declare(strict_types=1); | ||
|
|
||
| namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport; | ||
|
|
||
| use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterEndpointInterface; | ||
| use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterOptionsInterface; | ||
| use OpenTelemetry\SDK\Common\Export\TransportInterface; | ||
| use RdKafka\Conf; | ||
|
|
||
| final readonly class KafkaTransportFactory implements TransportFactoryInterface | ||
| { | ||
| public function supports(#[\SensitiveParameter] ExporterEndpointInterface $endpoint, ExporterOptionsInterface $options): bool | ||
| { | ||
| return TransportEnum::Kafka === TransportEnum::tryFrom($endpoint->getTransport()); | ||
| } | ||
|
|
||
| public function createTransport(#[\SensitiveParameter] ExporterEndpointInterface $endpoint, ExporterOptionsInterface $options): TransportInterface | ||
| { | ||
| $dsn = $endpoint->getDsn(); | ||
| $queryParameters = $dsn->getQuery()->all(); | ||
| $conf = new Conf(); | ||
| foreach ($queryParameters as $k => $v) { | ||
| $conf->set(\str_replace('_', '.', $k), (string) $v); | ||
| } | ||
|
|
||
| return new KafkaTransport($conf, $dsn->getHost()); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should document this transformation. |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| <?php | ||
|
|
||
| declare(strict_types=1); | ||
|
|
||
| namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Tests\Unit\OpenTelemetry\Transport; | ||
|
|
||
| use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\EmptyExporterOptions; | ||
| use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterDsn; | ||
| use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterEndpointInterface; | ||
| use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterOptionsInterface; | ||
| use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\OtlpExporterOptions; | ||
| use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Log\LogExporterEndpoint; | ||
| use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Metric\MetricExporterEndpoint; | ||
| use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Metric\MetricExporterOptions; | ||
| use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Trace\TraceExporterEndpoint; | ||
| use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\KafkaTransportFactory; | ||
| use PHPUnit\Framework\Attributes\CoversClass; | ||
| use PHPUnit\Framework\Attributes\DataProvider; | ||
| use PHPUnit\Framework\TestCase; | ||
|
|
||
| #[CoversClass(KafkaTransportFactory::class)] | ||
| final class KafkaTransportFactoryTest extends TestCase | ||
| { | ||
| #[DataProvider('exporterProvider')] | ||
| public function testCreateTransportFromExporter( | ||
| ExporterEndpointInterface $endpoint, | ||
| ExporterOptionsInterface $options, | ||
| bool $shouldSupport, | ||
| ): void { | ||
| $factory = new KafkaTransportFactory(); | ||
|
|
||
| self::assertSame($shouldSupport, $factory->supports($endpoint, $options)); | ||
|
|
||
| if ($shouldSupport) { | ||
| $transport = $factory->createTransport($endpoint, $options); | ||
| self::assertSame('application/x-protobuf', $transport->contentType()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * @return \Generator<array{ | ||
| * 0: ExporterEndpointInterface, | ||
| * 1: ExporterOptionsInterface, | ||
| * 2: bool | ||
| * }> | ||
| */ | ||
| public static function exporterProvider(): \Generator | ||
| { | ||
| // Kafka for traces | ||
| yield [ | ||
| TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('kafka+otlp://otel-traces?metadata_broker_list=localhost:9092')), | ||
| new OtlpExporterOptions(), | ||
| true, | ||
| ]; | ||
|
|
||
| // Kafka for metrics | ||
| yield [ | ||
| MetricExporterEndpoint::fromDsn(ExporterDsn::fromString('kafka+otlp://otel-metrics?metadata_broker_list=localhost:9092')), | ||
| new MetricExporterOptions(), | ||
| true, | ||
| ]; | ||
|
|
||
| // Kafka for logs | ||
| yield [ | ||
| LogExporterEndpoint::fromDsn(ExporterDsn::fromString('kafka+otlp://otel-logs?metadata_broker_list=localhost:9092')), | ||
| new OtlpExporterOptions(), | ||
| true, | ||
| ]; | ||
|
|
||
| // Not Kafka transports should not be supported | ||
| yield [ | ||
| TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('grpc+otlp://localhost')), | ||
| new OtlpExporterOptions(), | ||
| false, | ||
| ]; | ||
|
|
||
| yield [ | ||
| TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('http+otlp://localhost')), | ||
| new OtlpExporterOptions(), | ||
| false, | ||
| ]; | ||
|
|
||
| yield [ | ||
| TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('stream+console://default')), | ||
| new EmptyExporterOptions(), | ||
| false, | ||
| ]; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The extension should be suggested too.