Skip to content

Commit 1b79b46

Browse files
Copilotxrmx
andauthored
Fix flaky kafka tests by flushing producer before ending sender transactions (#2619)
* Initial plan * fix: add producer.flush() to kafka tests to prevent race conditions The producer.send() method in kafka-python is asynchronous - it queues messages to be sent by a background I/O thread. Without calling producer.flush(), messages may not be delivered to the Kafka broker before the consumer's consumer_timeout_ms=500 expires. This creates a race condition where the consumer loop exits with no items consumed, resulting in no spans being created. Added producer.flush() after producer.send() calls in all delayed_send functions to ensure messages are actually delivered to Kafka. Agent-Logs-Url: https://github.com/elastic/apm-agent-python/sessions/e5ad59a6-a44a-4dd2-978e-fe4844b5c198 Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com> * fix: wait for kafka send futures in flaky consumer tests Agent-Logs-Url: https://github.com/elastic/apm-agent-python/sessions/c8da0228-9a66-4391-a204-fe0b99b1b85b Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com> * fix: flush kafka producer before ending sender transaction Agent-Logs-Url: https://github.com/elastic/apm-agent-python/sessions/610642ed-3a8d-4371-bccb-485eaee1d69d Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com> Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
1 parent 6a22dfa commit 1b79b46

1 file changed

Lines changed: 6 additions & 0 deletions

File tree

tests/instrumentation/kafka_tests.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ def delayed_send():
113113
elasticapm_client.begin_transaction("foo")
114114
producer.send("test", key=b"foo", value=b"bar")
115115
producer.send("test", key=b"baz", value=b"bazzinga")
116+
producer.flush()
116117
elasticapm_client.end_transaction("foo")
117118

118119
thread = threading.Thread(target=delayed_send)
@@ -139,6 +140,7 @@ def delayed_send():
139140
elasticapm_client.begin_transaction("foo")
140141
producer.send("test", key=b"foo", value=b"bar")
141142
producer.send("test", key=b"baz", value=b"bazzinga")
143+
producer.flush()
142144
elasticapm_client.end_transaction("foo")
143145

144146
thread = threading.Thread(target=delayed_send)
@@ -166,6 +168,7 @@ def delayed_send():
166168
producer.send(topic="foo", key=b"foo", value=b"bar")
167169
producer.send("bar", key=b"foo", value=b"bar")
168170
producer.send("test", key=b"foo", value=b"bar")
171+
producer.flush()
169172

170173
thread = threading.Thread(target=delayed_send)
171174
thread.start()
@@ -186,6 +189,7 @@ def delayed_send():
186189
producer.send(topic="foo", key=b"foo", value=b"bar")
187190
producer.send("bar", key=b"foo", value=b"bar")
188191
producer.send("test", key=b"foo", value=b"bar")
192+
producer.flush()
189193

190194
thread = threading.Thread(target=delayed_send)
191195
thread.start()
@@ -205,6 +209,7 @@ def delayed_send():
205209
time.sleep(0.2)
206210
producer.send("test", key=b"foo", value=b"bar")
207211
producer.send("test", key=b"baz", value=b"bazzinga")
212+
producer.flush()
208213

209214
thread = threading.Thread(target=delayed_send)
210215
thread.start()
@@ -249,6 +254,7 @@ def test_kafka_consumer_unsampled_transaction_handles_stop_iteration(
249254
def delayed_send():
250255
time.sleep(0.2)
251256
producer.send("test", key=b"foo", value=b"bar")
257+
producer.flush()
252258

253259
thread = threading.Thread(target=delayed_send)
254260
thread.start()

0 commit comments

Comments
 (0)