Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -71,6 +73,7 @@ public static class ProducerBuilder {
private Map<String, Object> config;
private Serializer<String> keySerializer;
private Serializer<Collection<SpanData>> valueSerializer;
private MemoryMode memoryMode = MemoryMode.IMMUTABLE_DATA;

public static ProducerBuilder newInstance() {
return new ProducerBuilder();
Expand All @@ -97,6 +100,12 @@ public ProducerBuilder setValueSerializer(Serializer<Collection<SpanData>> value
return this;
}

@CanIgnoreReturnValue
public ProducerBuilder setMemoryMode(MemoryMode memoryMode) {
this.memoryMode = Objects.requireNonNull(memoryMode, "memoryMode");
return this;
}

public Producer<String, Collection<SpanData>> build() {
if (isNull(config)) {
throw new IllegalArgumentException("producer configuration cannot be null");
Expand All @@ -111,10 +120,24 @@ public Producer<String, Collection<SpanData>> build() {
throw new IllegalArgumentException(
"Both the key and value serializers should be provided either in the configuration or by using the corresponding setters");
}

// If user hasn't explicitly set valueSerializer, choose based on memoryMode
if (valueSerializer == null && !config.containsKey(VALUE_SERIALIZER_CLASS_CONFIG)) {
valueSerializer = createDefaultSerializer();
}

if (config.containsKey(KEY_SERIALIZER_CLASS_CONFIG)) {
return new KafkaProducer<>(config);
}
return new KafkaProducer<>(config, keySerializer, valueSerializer);
}

private Serializer<Collection<SpanData>> createDefaultSerializer() {
if (memoryMode == MemoryMode.REUSABLE_DATA) {
return new PooledSpanDataSerializer();
} else {
return new SpanDataSerializer();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.kafka;

import io.opentelemetry.exporter.internal.otlp.traces.LowAllocationTraceRequestMarshaler;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Object pool optimized Span data serializer.
 *
 * <p>This serializer reduces memory allocations by reusing {@link
 * LowAllocationTraceRequestMarshaler} instances and {@link ByteArrayOutputStream} buffers. It is
 * thread-safe and supports concurrent serialization calls from Kafka.
 *
 * <p>Usage:
 *
 * <pre>{@code
 * // Enable via MemoryMode configuration
 * KafkaSpanExporter exporter = KafkaSpanExporter.newBuilder()
 *     .setProducer(
 *         KafkaSpanExporterBuilder.ProducerBuilder.newInstance()
 *             .setMemoryMode(MemoryMode.REUSABLE_DATA)
 *             .build())
 *     .build();
 * }</pre>
 */
public final class PooledSpanDataSerializer implements Serializer<Collection<SpanData>> {

  private static final Logger logger = LoggerFactory.getLogger(PooledSpanDataSerializer.class);

  // Maximum pool size to prevent unbounded growth
  private static final int MAX_POOL_SIZE = 32;

  // Object pool: thread-safe lock-free queue of reusable marshalers
  private final Deque<LowAllocationTraceRequestMarshaler> marshalerPool =
      new ConcurrentLinkedDeque<>();

  // O(1) count of pooled instances. ConcurrentLinkedDeque.size() is an O(n) traversal
  // and a size()-then-offer sequence is a check-then-act race that can grow the pool
  // past MAX_POOL_SIZE under contention, so the bound is enforced with this counter.
  private final AtomicInteger pooledCount = new AtomicInteger();

  // Thread-local ByteArrayOutputStream to avoid contention between producer threads.
  // NOTE(review): each buffer retains the capacity of the largest batch its thread has
  // serialized; that memory is only released when the thread ends or close() is called
  // on that thread.
  private final ThreadLocal<ByteArrayOutputStream> outputStreamHolder =
      ThreadLocal.withInitial(() -> new ByteArrayOutputStream(4096));

  /**
   * Serializes a batch of spans to OTLP binary protobuf bytes.
   *
   * @param topic the Kafka topic (unused by this serializer)
   * @param data the spans to serialize; must not be null
   * @return the serialized bytes, or an empty array for an empty batch
   * @throws SerializationException if {@code data} is null or serialization fails
   */
  @Override
  public byte[] serialize(String topic, Collection<SpanData> data) {
    if (Objects.isNull(data)) {
      throw new SerializationException("Cannot serialize null");
    }

    if (data.isEmpty()) {
      return new byte[0];
    }

    // 1. Acquire marshaler from pool; fall back to a fresh instance when empty
    LowAllocationTraceRequestMarshaler marshaler = marshalerPool.poll();
    if (marshaler == null) {
      marshaler = new LowAllocationTraceRequestMarshaler();
    } else {
      // Successfully took a pooled instance: release its slot in the counter
      pooledCount.decrementAndGet();
    }

    // 2. Get thread-local ByteArrayOutputStream and clear any previous content
    ByteArrayOutputStream baos = outputStreamHolder.get();
    baos.reset();

    try {
      // 3. Initialize and serialize (Initialize-Use pattern)
      marshaler.initialize(data);
      marshaler.writeBinaryTo(baos);

      // 4. Return a defensive copy of the buffer contents
      return baos.toByteArray();

    } catch (IOException e) {
      throw new SerializationException("Failed to serialize span data", e);
    } finally {
      // 5. Reset and return marshaler to pool (Reset-Return pattern)
      marshaler.reset();
      returnToPool(marshaler);
    }
  }

  /**
   * Returns a marshaler to the pool, with a size limit to prevent memory leaks.
   *
   * <p>The slot is reserved atomically via {@link AtomicInteger#incrementAndGet()}, so the
   * pool can never exceed {@code MAX_POOL_SIZE} even under concurrent returns.
   *
   * @param marshaler the marshaler to return
   */
  private void returnToPool(LowAllocationTraceRequestMarshaler marshaler) {
    if (pooledCount.incrementAndGet() <= MAX_POOL_SIZE) {
      marshalerPool.offer(marshaler);
    } else {
      // Pool is full: back out the reservation and let GC reclaim the instance.
      // This is a defensive strategy to prevent unbounded growth in exceptional cases.
      pooledCount.decrementAndGet();
      if (logger.isDebugEnabled()) {
        logger.debug("Marshaler pool is full, discarding instance");
      }
    }
  }

  @Override
  public void close() {
    // Drop all pooled marshalers and reset the slot counter
    marshalerPool.clear();
    pooledCount.set(0);
    // Only removes the calling thread's buffer; buffers owned by other producer
    // threads are released when those threads terminate.
    outputStreamHolder.remove();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@

package io.opentelemetry.contrib.kafka;

import static java.util.stream.Collectors.toList;

import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler;
import io.opentelemetry.exporter.internal.otlp.traces.LowAllocationTraceRequestMarshaler;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
Expand All @@ -30,19 +25,20 @@ public byte[] serialize(String topic, Collection<SpanData> data) {
}

ExportTraceServiceRequest convertSpansToRequest(Collection<SpanData> spans) {
List<ResourceSpans> resourceSpansList =
Arrays.stream(ResourceSpansMarshaler.create(spans))
.map(
resourceSpansMarshaler -> {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
resourceSpansMarshaler.writeBinaryTo(baos);
return ResourceSpans.parseFrom(baos.toByteArray());
} catch (IOException e) {
throw new SerializationException(e);
}
})
.collect(toList());
// Use LowAllocationTraceRequestMarshaler for more efficient conversion
// This eliminates unnecessary serialization/deserialization cycles
LowAllocationTraceRequestMarshaler marshaler = new LowAllocationTraceRequestMarshaler();
try {
marshaler.initialize(spans);

return ExportTraceServiceRequest.newBuilder().addAllResourceSpans(resourceSpansList).build();
// Serialize to bytes and parse back to protobuf message
ByteArrayOutputStream baos = new ByteArrayOutputStream();
marshaler.writeBinaryTo(baos);
return ExportTraceServiceRequest.parseFrom(baos.toByteArray());
} catch (IOException e) {
throw new SerializationException(e);
} finally {
marshaler.reset();
}
}
}
Loading