diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporterBuilder.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporterBuilder.java index d7f1d3c17..3b6a7c566 100644 --- a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporterBuilder.java +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporterBuilder.java @@ -11,9 +11,11 @@ import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.Collection; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.kafka.clients.producer.KafkaProducer; @@ -71,6 +73,7 @@ public static class ProducerBuilder { private Map config; private Serializer keySerializer; private Serializer> valueSerializer; + private MemoryMode memoryMode = MemoryMode.IMMUTABLE_DATA; public static ProducerBuilder newInstance() { return new ProducerBuilder(); @@ -97,6 +100,12 @@ public ProducerBuilder setValueSerializer(Serializer> value return this; } + @CanIgnoreReturnValue + public ProducerBuilder setMemoryMode(MemoryMode memoryMode) { + this.memoryMode = Objects.requireNonNull(memoryMode, "memoryMode"); + return this; + } + public Producer> build() { if (isNull(config)) { throw new IllegalArgumentException("producer configuration cannot be null"); @@ -111,10 +120,24 @@ public Producer> build() { throw new IllegalArgumentException( "Both the key and value serializers should be provided either in the configuration or by using the corresponding setters"); } + + // If user hasn't explicitly set valueSerializer, choose based on memoryMode + if (valueSerializer == null && !config.containsKey(VALUE_SERIALIZER_CLASS_CONFIG)) { + valueSerializer 
= createDefaultSerializer(); + } + if (config.containsKey(KEY_SERIALIZER_CLASS_CONFIG)) { return new KafkaProducer<>(config); } return new KafkaProducer<>(config, keySerializer, valueSerializer); } + + private Serializer> createDefaultSerializer() { + if (memoryMode == MemoryMode.REUSABLE_DATA) { + return new PooledSpanDataSerializer(); + } else { + return new SpanDataSerializer(); + } + } } } diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/PooledSpanDataSerializer.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/PooledSpanDataSerializer.java new file mode 100644 index 000000000..bf2e378e1 --- /dev/null +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/PooledSpanDataSerializer.java @@ -0,0 +1,116 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka; + +import io.opentelemetry.exporter.internal.otlp.traces.LowAllocationTraceRequestMarshaler; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.Deque; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedDeque; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Object pool optimized Span data serializer. + * + *

<p>This serializer reduces memory allocations by reusing {@link + * LowAllocationTraceRequestMarshaler} instances and {@link ByteArrayOutputStream} buffers. It is + * thread-safe and supports concurrent serialization calls from Kafka. + * + *

<p>Usage: + * + *

<pre>{@code
+ * // Enable via MemoryMode configuration
+ * KafkaSpanExporter exporter = KafkaSpanExporter.newBuilder()
+ *     .setProducer(
+ *         KafkaSpanExporterBuilder.ProducerBuilder.newInstance()
+ *             .setMemoryMode(MemoryMode.REUSABLE_DATA)
+ *             .build())
+ *     .build();
+ * }</pre>
+ */
+public final class PooledSpanDataSerializer implements Serializer<Collection<SpanData>> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PooledSpanDataSerializer.class);
+
+  // Object pool: thread-safe lock-free queue
+  private final Deque<LowAllocationTraceRequestMarshaler> marshalerPool =
+      new ConcurrentLinkedDeque<>();
+
+  // Thread-local ByteArrayOutputStream to avoid contention
+  private final ThreadLocal<ByteArrayOutputStream> outputStreamHolder =
+      ThreadLocal.withInitial(() -> new ByteArrayOutputStream(4096));
+
+  // Maximum pool size to prevent unbounded growth
+  private static final int MAX_POOL_SIZE = 32;
+
+  @Override
+  public byte[] serialize(String topic, Collection<SpanData> data) {
+    if (Objects.isNull(data)) {
+      throw new SerializationException("Cannot serialize null");
+    }
+
+    if (data.isEmpty()) {
+      return new byte[0];
+    }
+
+    // 1. Acquire marshaler from pool
+    LowAllocationTraceRequestMarshaler marshaler = marshalerPool.poll();
+    if (marshaler == null) {
+      // Pool is empty, create new instance
+      marshaler = new LowAllocationTraceRequestMarshaler();
+    }
+
+    // 2. Get thread-local ByteArrayOutputStream
+    ByteArrayOutputStream baos = outputStreamHolder.get();
+    baos.reset();
+
+    try {
+      // 3. Initialize and serialize (Initialize-Use pattern)
+      marshaler.initialize(data);
+      marshaler.writeBinaryTo(baos);
+
+      // 4. Return result
+      return baos.toByteArray();
+
+    } catch (IOException e) {
+      throw new SerializationException("Failed to serialize span data", e);
+    } finally {
+      // 5. Reset and return marshaler to pool (Reset-Return pattern)
+      marshaler.reset();
+      returnToPool(marshaler);
+    }
+  }
+
+  /**
+   * Returns marshaler to pool, with size limit to prevent memory leaks.
+ * + * @param marshaler the marshaler to return + */ + private void returnToPool(LowAllocationTraceRequestMarshaler marshaler) { + if (marshalerPool.size() < MAX_POOL_SIZE) { + marshalerPool.offer(marshaler); + } else { + // Pool is full, discard the instance and let GC reclaim it + // This is a defensive strategy to prevent unbounded growth in exceptional cases + if (logger.isDebugEnabled()) { + logger.debug("Marshaler pool is full, discarding instance"); + } + } + } + + @Override + public void close() { + // Clean up resources + marshalerPool.clear(); + outputStreamHolder.remove(); + } +} diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataSerializer.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataSerializer.java index 8f31eb412..a643adadb 100644 --- a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataSerializer.java +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataSerializer.java @@ -5,17 +5,12 @@ package io.opentelemetry.contrib.kafka; -import static java.util.stream.Collectors.toList; - -import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler; +import io.opentelemetry.exporter.internal.otlp.traces.LowAllocationTraceRequestMarshaler; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; -import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.sdk.trace.data.SpanData; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; -import java.util.List; import java.util.Objects; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; @@ -30,19 +25,20 @@ public byte[] serialize(String topic, Collection data) { } ExportTraceServiceRequest convertSpansToRequest(Collection spans) { - List resourceSpansList = - Arrays.stream(ResourceSpansMarshaler.create(spans)) - .map( - 
resourceSpansMarshaler -> { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - resourceSpansMarshaler.writeBinaryTo(baos); - return ResourceSpans.parseFrom(baos.toByteArray()); - } catch (IOException e) { - throw new SerializationException(e); - } - }) - .collect(toList()); + // Use LowAllocationTraceRequestMarshaler for more efficient conversion + // This eliminates unnecessary serialization/deserialization cycles + LowAllocationTraceRequestMarshaler marshaler = new LowAllocationTraceRequestMarshaler(); + try { + marshaler.initialize(spans); - return ExportTraceServiceRequest.newBuilder().addAllResourceSpans(resourceSpansList).build(); + // Serialize to bytes and parse back to protobuf message + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + marshaler.writeBinaryTo(baos); + return ExportTraceServiceRequest.parseFrom(baos.toByteArray()); + } catch (IOException e) { + throw new SerializationException(e); + } finally { + marshaler.reset(); + } } } diff --git a/kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/PooledSpanDataSerializerTest.java b/kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/PooledSpanDataSerializerTest.java new file mode 100644 index 000000000..f2b7b66a9 --- /dev/null +++ b/kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/PooledSpanDataSerializerTest.java @@ -0,0 +1,176 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka; + +import static io.opentelemetry.contrib.kafka.TestUtil.makeBasicSpan; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.google.common.collect.ImmutableList; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import 
java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.errors.SerializationException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class PooledSpanDataSerializerTest { + private PooledSpanDataSerializer testSubject; + + @AfterEach + void tearDown() { + if (testSubject != null) { + testSubject.close(); + } + } + + @Test + void serialize() { + testSubject = new PooledSpanDataSerializer(); + SpanData span1 = makeBasicSpan("span-1"); + SpanData span2 = makeBasicSpan("span-2"); + ImmutableList spans = ImmutableList.of(span1, span2); + + byte[] actual = testSubject.serialize("test-topic", spans); + + assertThat(actual).isNotNull(); + assertThat(actual).isNotEmpty(); + } + + @Test + void serializeEmptyData() { + testSubject = new PooledSpanDataSerializer(); + byte[] actual = testSubject.serialize("test-topic", Collections.emptySet()); + + assertThat(actual).isEmpty(); + } + + @Test + void serializeNullDataThrowsException() { + testSubject = new PooledSpanDataSerializer(); + assertThatThrownBy(() -> testSubject.serialize("test-topic", null)) + .isInstanceOf(SerializationException.class) + .hasMessage("Cannot serialize null"); + } + + @Test + void serializeMultipleTimesReusesMarshaler() { + testSubject = new PooledSpanDataSerializer(); + SpanData span1 = makeBasicSpan("span-1"); + SpanData span2 = makeBasicSpan("span-2"); + ImmutableList spans = ImmutableList.of(span1, span2); + + // Serialize multiple times to verify object pooling works + byte[] result1 = testSubject.serialize("test-topic", spans); + byte[] result2 = testSubject.serialize("test-topic", spans); + byte[] result3 = testSubject.serialize("test-topic", spans); + + // All results should be valid and identical (same input) + 
assertThat(result1).isNotEmpty(); + assertThat(result2).isEqualTo(result1); + assertThat(result3).isEqualTo(result1); + } + + @Test + void outputMatchesTraditionalSerializer() throws Exception { + testSubject = new PooledSpanDataSerializer(); + SpanDataSerializer traditionalSerializer = new SpanDataSerializer(); + + SpanData span1 = makeBasicSpan("span-1"); + SpanData span2 = makeBasicSpan("span-2"); + ImmutableList spans = ImmutableList.of(span1, span2); + + byte[] pooledResult = testSubject.serialize("test-topic", spans); + byte[] traditionalResult = traditionalSerializer.serialize("test-topic", spans); + + // Both should be deserializable to the same protobuf message + ExportTraceServiceRequest pooledRequest = ExportTraceServiceRequest.parseFrom(pooledResult); + ExportTraceServiceRequest traditionalRequest = + ExportTraceServiceRequest.parseFrom(traditionalResult); + + // Verify structure + assertThat(pooledRequest.getResourceSpansList()) + .hasSize(traditionalRequest.getResourceSpansList().size()); + assertThat(pooledRequest.getResourceSpans(0).getScopeSpans(0).getSpans(0).getName()) + .isEqualTo("span-1"); + assertThat(pooledRequest.getResourceSpans(0).getScopeSpans(0).getSpans(1).getName()) + .isEqualTo("span-2"); + } + + @Test + void concurrentSerializationIsThreadSafe() throws Exception { + testSubject = new PooledSpanDataSerializer(); + int numThreads = 10; + int iterationsPerThread = 100; + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numThreads); + List errors = Collections.synchronizedList(new ArrayList<>()); + + for (int i = 0; i < numThreads; i++) { + int threadId = i; + var unused = + executor.submit( + () -> { + try { + startLatch.await(); // Wait for all threads to be ready + for (int j = 0; j < iterationsPerThread; j++) { + SpanData span = makeBasicSpan("thread-" + threadId + "-span-" + j); + ImmutableList spans = 
ImmutableList.of(span); + byte[] result = testSubject.serialize("test-topic", spans); + + // Verify result is valid + assertThat(result).isNotEmpty(); + ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(result); + assertThat(request.getResourceSpans(0).getScopeSpans(0).getSpans(0).getName()) + .isEqualTo("thread-" + threadId + "-span-" + j); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + errors.add(e); + } catch (Throwable e) { + errors.add(e); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); // Start all threads + assertThat(doneLatch.await(30, TimeUnit.SECONDS)).isTrue(); + executor.shutdown(); + + // Verify no errors occurred + if (!errors.isEmpty()) { + throw new AssertionError("Concurrent serialization failed with errors: " + errors); + } + } + + @Test + void closeReleasesResources() { + testSubject = new PooledSpanDataSerializer(); + SpanData span = makeBasicSpan("span-1"); + ImmutableList spans = ImmutableList.of(span); + + // Use the serializer + testSubject.serialize("test-topic", spans); + + // Close should not throw + testSubject.close(); + + // Can still use after close (just won't have pooled resources) + byte[] result = testSubject.serialize("test-topic", spans); + assertThat(result).isNotEmpty(); + } +}