Fix flaky kafka tests by flushing producer before ending sender transactions#2619
The producer.send() method in kafka-python is asynchronous: it queues messages to be sent by a background I/O thread. Without calling producer.flush(), messages may not be delivered to the Kafka broker before the consumer's consumer_timeout_ms=500 expires. This creates a race condition where the consumer loop exits with no items consumed, resulting in no spans being created.

Added producer.flush() after producer.send() calls in all delayed_send functions to ensure messages are actually delivered to Kafka.

Agent-Logs-Url: https://github.com/elastic/apm-agent-python/sessions/e5ad59a6-a44a-4dd2-978e-fe4844b5c198
Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com>
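The race described above can be illustrated without a broker. The sketch below is a hypothetical stand-in, not kafka-python's implementation: QueueingProducer, delivery_delay and the delivered list are names invented for illustration. It only models the general shape of the problem, where send() hands the message to a background thread and flush() blocks until the queue has drained.

```python
import queue
import threading
import time


class QueueingProducer:
    """Hypothetical stand-in for an asynchronous producer (illustration only)."""

    def __init__(self, delivery_delay=0.05):
        self._pending = queue.Queue()
        self.delivered = []  # what the simulated "broker" has received so far
        self._delay = delivery_delay
        threading.Thread(target=self._io_loop, daemon=True).start()

    def _io_loop(self):
        # Background I/O thread: delivers queued messages one at a time.
        while True:
            message = self._pending.get()
            time.sleep(self._delay)  # simulated network latency
            self.delivered.append(message)
            self._pending.task_done()

    def send(self, message):
        # Returns immediately; delivery happens later on the I/O thread.
        self._pending.put(message)

    def flush(self):
        # Blocks until every queued message has been delivered.
        self._pending.join()


producer = QueueingProducer()
producer.send(b"bar")
# A consumer checking at this point would most likely see nothing yet.
delivered_right_after_send = list(producer.delivered)
producer.flush()
delivered_after_flush = list(producer.delivered)
```

In this model, a consumer with a short timeout that polls between send() and the end of the simulated delay finds nothing, which mirrors the empty consumer loop in the flaky tests. After flush() returns, the message is guaranteed to be in delivered.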
@copilot please take a look at the failure and update the PR with a fix
Agent-Logs-Url: https://github.com/elastic/apm-agent-python/sessions/c8da0228-9a66-4391-a204-fe0b99b1b85b
Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com>
I investigated the matrix failure and updated the PR in |
@copilot what about calling flush before closing the transaction instead? Could that have the same effect without waiting on every send? Am I misunderstanding?
@copilot please investigate calling flush before closing the transaction instead of waiting for every send
Agent-Logs-Url: https://github.com/elastic/apm-agent-python/sessions/610642ed-3a8d-4371-bccb-485eaee1d69d
Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com>
Yes — in the tests that open and close a producer-side transaction inside
I investigated that approach and updated the PR in |
producer.send() in kafka-python is async: messages are queued to a background I/O thread. The consumer has a 500ms timeout. Without flushing the producer, messages may not reach the broker before the consumer gives up, resulting in zero items consumed and zero spans.
A matrix run showed that flushing after ending the sender-side transaction could change timing enough to break test_kafka_consume under python-3.9 / kafka-python-newest.
The delayed_send functions across the kafka tests now use a single producer.flush(), and that producer.flush() runs before elasticapm_client.end_transaction("foo").
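The ordering described above could look roughly like the sketch below. It is not the code from the PR: the function signature, the begin_transaction call, the topic name and the message payloads are assumptions made for illustration, and CallRecorder is a hypothetical test double that stands in for both the APM client and the producer so the call order can be inspected without a broker.

```python
class CallRecorder:
    """Hypothetical test double that logs method calls in order."""

    def __init__(self, log, prefix):
        self._log = log
        self._prefix = prefix

    def __getattr__(self, name):
        # Any method looked up on the double is recorded as "<prefix>.<name>".
        def record(*args, **kwargs):
            self._log.append(f"{self._prefix}.{name}")
        return record


def delayed_send(elasticapm_client, producer, topic):
    # Assumed shape of a sender-side helper; payloads are placeholders.
    elasticapm_client.begin_transaction("foo")
    producer.send(topic, key=b"foo", value=b"bar")
    producer.send(topic, key=b"baz", value=b"bazzinga")
    # One flush covers both sends and runs while the transaction is still open.
    producer.flush()
    elasticapm_client.end_transaction("foo")


calls = []
delayed_send(CallRecorder(calls, "client"), CallRecorder(calls, "producer"), "test")
```

A single flush after the last send waits for all queued messages at once, so it avoids blocking after every send while still ensuring delivery has completed before the transaction ends.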