Kafka client instrumentation causes InvalidProducerEpochException with EOS in Kafka Streams #17500

@cbrown-cedar

Description

Describe the bug

The Kafka client instrumentation causes InvalidProducerEpochException crash loops when used with Kafka Streams applications running exactly-once semantics (EOS v2).

KafkaProducerInstrumentation wraps every send() callback with ProducerCallback, which calls producerInstrumenter().end() on the Kafka network I/O thread before invoking the original Kafka Streams callback. This overhead on the network thread delays the completion signal that Kafka's transactional state machine depends on, allowing InitProducerId to bump the producer epoch before all in-flight batches have been acknowledged. This causes the broker to reject subsequent sends from the old epoch.

Steps to reproduce

  1. Run a Kafka Streams application with processing.guarantee=exactly_once_v2
  2. Attach the OTel Java agent with default Kafka instrumentation enabled (OTEL_INSTRUMENTATION_KAFKA_ENABLED=true)
  3. Allow the application to complete state restoration and begin processing
  4. Observe InvalidProducerEpochException errors occurring within the first few seconds of processing, crashing the application
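For reference, a minimal sketch of the Streams configuration that step 1 implies. It uses plain string keys so it compiles without the Kafka dependency; the application ID and broker address are placeholders, not values from the affected deployment, and a real application would pass these properties to its KafkaStreams instance.

```java
import java.util.Properties;

final class EosStreamsConfigSketch {

    // Builds the minimal properties for a Kafka Streams app running EOS v2.
    // applicationId and bootstrapServers are placeholders chosen by the caller.
    static Properties eosV2Config(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // The setting from step 1 of the reproduction.
        props.setProperty("processing.guarantee", "exactly_once_v2");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values for illustration only.
        Properties props = eosV2Config("eos-repro-app", "localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The agent itself is attached in the usual way with the JVM's -javaagent option; no Kafka-specific agent settings are needed because the instrumentation is enabled by default.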

Expected behavior

The OTel agent's Kafka instrumentation is compatible with Kafka Streams EOS v2. Attaching the agent does not affect transactional producer correctness.

Actual behavior

The application crash-loops with:

org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:306)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:286)
    at io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.ProducerCallback.onCompletion(ProducerCallback.java:36)
    at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1567)

It seems the root cause is in ProducerCallback.onCompletion(): it calls producerInstrumenter().end() on the Kafka network I/O thread before invoking the original callback. For ordinary producers this ordering is fine, but for Kafka Streams' internally-managed transactional producers it delays the RecordCollectorImpl completion signal, which can trigger Kafka Streams' error recovery path (initTransactions()) while old batches are still in flight.
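To make the ordering concrete, here is a simplified model of the wrapper as described above. It is not the agent's actual source: the Callback interface is a stand-in for Kafka's producer callback, and endSpan stands in for the producerInstrumenter().end() call.

```java
import java.util.ArrayList;
import java.util.List;

final class CallbackOrderSketch {

    /** Simplified stand-in for Kafka's producer Callback interface. */
    interface Callback {
        void onCompletion(String metadata, Exception exception);
    }

    /** Models the ordering described above: end the span, then delegate. */
    static Callback wrapSpanFirst(Callback original, Runnable endSpan) {
        return (metadata, exception) -> {
            endSpan.run(); // instrumentation work runs first
            if (original != null) {
                original.onCompletion(metadata, exception); // original callback runs second
            }
        };
    }

    /** Records the order in which the two steps execute. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Callback streamsCallback = (metadata, exception) -> events.add("original callback");
        Callback wrapped = wrapSpanFirst(streamsCallback, () -> events.add("span end"));
        wrapped.onCompletion("topic-0@42", null); // hypothetical metadata string
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [span end, original callback]
    }
}
```

Any time spent in endSpan is time the original callback (here, the one registered by RecordCollectorImpl) has to wait on the network thread.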

Note: I believe this issue is specific to Kafka Streams EOS because Kafka Streams owns the transactional producer lifecycle and calls initTransactions() automatically as part of task management. A user-managed transactional producer would not exhibit this behavior.

Javaagent or library instrumentation version

v2.10.0

Environment

JDK: Amazon Corretto 21 (AL2023)
OS: Linux (AWS ECS Fargate, ARM64)
Kafka Streams: 3.8.0
Kafka Broker: MSK 3.8.x
Processing guarantee: exactly_once_v2

Additional context

The errors occur immediately after state restoration completes, not due to timeouts or network issues. A single instance is sufficient to reproduce. It is not a multi-instance rebalancing issue.

The kafka-streams-0.11 instrumentation already suppresses consumer-side tracing inside the stream loop via StreamThreadInstrumentation and KafkaClientsConsumerProcessTracing. The producer side has no equivalent protection.

A targeted fix would reuse the existing KafkaClientsConsumerProcessTracing ThreadLocal (already set to false inside StreamThread.runLoop()) to detect the Streams context at send() time, carry that flag into ProducerCallback, and reverse the operation order in onCompletion() when in a Streams context: call the original callback first, then end the span. This preserves all instrumentation (spans, header propagation, error recording) while eliminating the overhead that precedes the critical Kafka callback on the network thread.
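A rough sketch of that idea, under stated assumptions: CONSUMER_PROCESS_TRACING is a hypothetical stand-in for the KafkaClientsConsumerProcessTracing flag, Callback is a simplified stand-in for Kafka's producer callback, and the try/finally is my own choice so the span still ends if the original callback throws. The flag has to be read at send() time because the callback later runs on a different thread, where the ThreadLocal no longer reflects the Streams context.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class StreamsAwareCallbackSketch {

    /** Simplified stand-in for Kafka's producer Callback interface. */
    interface Callback {
        void onCompletion(String metadata, Exception exception);
    }

    // Hypothetical stand-in for the existing flag: true by default,
    // set to false while running inside the stream loop.
    static final ThreadLocal<Boolean> CONSUMER_PROCESS_TRACING =
            ThreadLocal.withInitial(() -> true);

    /** Called on the sending thread; captures the Streams context for later. */
    static Callback wrap(Callback original, Runnable endSpan) {
        boolean inStreamsContext = !CONSUMER_PROCESS_TRACING.get();
        return (metadata, exception) -> {
            if (inStreamsContext) {
                try {
                    original.onCompletion(metadata, exception); // critical callback first
                } finally {
                    endSpan.run(); // span still ends, just afterwards
                }
            } else {
                endSpan.run(); // existing order for ordinary producers
                original.onCompletion(metadata, exception);
            }
        };
    }

    /** Simulates send() on one thread and completion on a separate I/O thread. */
    static List<String> run(boolean insideStreamLoop) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CONSUMER_PROCESS_TRACING.set(!insideStreamLoop);
        final Callback wrapped = wrap(
                (metadata, exception) -> events.add("original callback"),
                () -> events.add("span end"));
        CONSUMER_PROCESS_TRACING.remove();

        Thread io = new Thread(() -> wrapped.onCompletion("topic-0@42", null), "network-io");
        io.start();
        try {
            io.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [original callback, span end]
        System.out.println(run(false)); // prints [span end, original callback]
    }
}
```

Ordinary producers keep today's behavior; only sends issued from inside the stream loop get the reversed order.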

Labels: bug, needs triage
