Skip to content

feat(OpenTelemetry/Transport): implement Kafka transport#196

Draft
GeoDpto wants to merge 1 commit intoFriendsOfOpenTelemetry:mainfrom
paycoreio:feature/added-support-for-kafka-transport
Draft

feat(OpenTelemetry/Transport): implement Kafka transport#196
GeoDpto wants to merge 1 commit intoFriendsOfOpenTelemetry:mainfrom
paycoreio:feature/added-support-for-kafka-transport

Conversation

@GeoDpto
Copy link
Copy Markdown

@GeoDpto GeoDpto commented Sep 12, 2025

Summary
Our project encountered performance bottlenecks due to synchronous communication between services and the OpenTelemetry Collector.

Problem
The synchronous transport introduced delays and reduced throughput when exporting telemetry data.

Solution
OpenTelemetry provides different transport protocols for exporting data. In production, it's recommended to integrate the OpenTelemetry Collector, which can consume Kafka topics using the Protobuf serialization, so we chose this option. This enables asynchronous communication between services.
This merge request implements the Kafka transport and factory providing possibility to process data asynchronously.

Changes

  • Implemented the Kafka transport
  • Added a factory for the Kafka transport
  • Added a new required method to the Endpoint contract

@GeoDpto GeoDpto requested a review from a team as a code owner September 12, 2025 10:02
@GeoDpto GeoDpto force-pushed the feature/added-support-for-kafka-transport branch from ed4ff11 to 18ea244 Compare September 12, 2025 10:13
Copy link
Copy Markdown
Contributor

@gaelreyrol gaelreyrol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @GeoDpto, thank you very much for this pull request!

A few comments outside of your changes:

  1. TransportFactoryTest was not updated with Kafka test case.
  2. Remove all the declare(strict_types=1) usage.

Comment on lines +67 to +84
private function flushInternal(): bool
{
// librdkafka recommends retrying the flush operation a couple of times when it returns a null result.
$timeout = self::FLUSH_TIMEOUT;
$start = \microtime(true);
do {
$res = $this->producer->flush($timeout);
if (\RD_KAFKA_RESP_ERR_NO_ERROR === $res) {
return true;
}

// reduce timeout
$elapsedMs = (int) \round((\microtime(true) - $start) * 1000);
$timeout = \max(0, self::FLUSH_TIMEOUT - $elapsedMs);
} while ($timeout > 0);

return false;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This internal flush mechanic needs documentation, I have never used Kafka but I think we should provide incentives about how the transport is working under the hood.

Also, non-success errors seems to be treated equally which could hide permanent errors right?

}

return new KafkaTransport($conf, $dsn->getHost());
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should document this transformation.

*/
final readonly class KafkaTransport implements TransportInterface
{
private const FLUSH_TIMEOUT = 10000;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I don't have any problem with having a default value, I think this should be configurable from the dsn and documented.

|-----------|-----------|---------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------|--------------|
| http(s) | otlp | OpenTelemetry exporter using HTTP protocol (over TLS) | http+otlp://localhost:4318/v1/traces | N/A |
| grpc(s) | otlp | OpenTelemetry exporter using gRPC protocol (over TLS) | grpc+otlp://localhost:4317 | N/A |
| kafka | otlp | OpenTelemetry exporter using the Kafka message broker. Add query parameters for configuring the message broker. | kafka+otlp://open_telemetry_local_alpha_traces?metadata.broker.list=kafka:9092 | N/A |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The open_telemetry_local_alpha_traces is ambiguous compared to the other examples, we should leave to a "standard" host:port example.

Comment on lines +17 to +18
if (!\class_exists(Conf::class)) {
self::markTestSkipped('rdkafka extension not available in the test environment.');
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not skip it 🙂

Comment on lines +45 to +48
if (TransportEnum::Kafka === $this->transport) {
return \sprintf('kafka://%s?%s', $this->dsn->getHost(), $this->dsn->getQuery()->toString());
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on this change?

Conf $configuration,
string $topic,
) {
if (!\class_exists(Conf::class)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wont work because PHP resolves RdKafka\Conf before the check, so the friendly exception won't be thrown:

Error: Class "RdKafka\Conf" not found

Comment thread composer.json
"ext-grpc": "*",
"ext-mbstring": "*",
"ext-opentelemetry": "*",
"ext-rdkafka": "*",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The extension should be suggested too.

return new ErrorFuture($exception);
}

return new CompletedFuture(null);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if an error occurs while queuing the payload, the new CompletedFuture(null); silences it.

@gaelreyrol gaelreyrol changed the title Added the Kafka transport implementation feat(OpenTelemetry/Transport): implement Kafka transport Apr 10, 2026
@gaelreyrol gaelreyrol added this to the Beta milestone Apr 10, 2026
@gaelreyrol gaelreyrol marked this pull request as draft April 18, 2026 07:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants