Skip to content

Commit 8467071

Browse files
Copilotxrmx
andauthored
fix: add producer.flush() to kafka tests to prevent race conditions
The producer.send() method in kafka-python is asynchronous - it queues messages to be sent by a background I/O thread. Without calling producer.flush(), messages may not be delivered to the Kafka broker before the consumer's consumer_timeout_ms=500 expires. This creates a race condition where the consumer loop exits with no items consumed, resulting in no spans being created. Added producer.flush() after producer.send() calls in all delayed_send functions to ensure messages are actually delivered to Kafka. Agent-Logs-Url: https://github.com/elastic/apm-agent-python/sessions/e5ad59a6-a44a-4dd2-978e-fe4844b5c198 Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com>
1 parent bb6f53f commit 8467071

1 file changed

Lines changed: 6 additions & 0 deletions

File tree

tests/instrumentation/kafka_tests.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ def delayed_send():
114114
producer.send("test", key=b"foo", value=b"bar")
115115
producer.send("test", key=b"baz", value=b"bazzinga")
116116
elasticapm_client.end_transaction("foo")
117+
producer.flush()
117118

118119
thread = threading.Thread(target=delayed_send)
119120
thread.start()
@@ -140,6 +141,7 @@ def delayed_send():
140141
producer.send("test", key=b"foo", value=b"bar")
141142
producer.send("test", key=b"baz", value=b"bazzinga")
142143
elasticapm_client.end_transaction("foo")
144+
producer.flush()
143145

144146
thread = threading.Thread(target=delayed_send)
145147
thread.start()
@@ -166,6 +168,7 @@ def delayed_send():
166168
producer.send(topic="foo", key=b"foo", value=b"bar")
167169
producer.send("bar", key=b"foo", value=b"bar")
168170
producer.send("test", key=b"foo", value=b"bar")
171+
producer.flush()
169172

170173
thread = threading.Thread(target=delayed_send)
171174
thread.start()
@@ -186,6 +189,7 @@ def delayed_send():
186189
producer.send(topic="foo", key=b"foo", value=b"bar")
187190
producer.send("bar", key=b"foo", value=b"bar")
188191
producer.send("test", key=b"foo", value=b"bar")
192+
producer.flush()
189193

190194
thread = threading.Thread(target=delayed_send)
191195
thread.start()
@@ -205,6 +209,7 @@ def delayed_send():
205209
time.sleep(0.2)
206210
producer.send("test", key=b"foo", value=b"bar")
207211
producer.send("test", key=b"baz", value=b"bazzinga")
212+
producer.flush()
208213

209214
thread = threading.Thread(target=delayed_send)
210215
thread.start()
@@ -249,6 +254,7 @@ def test_kafka_consumer_unsampled_transaction_handles_stop_iteration(
249254
def delayed_send():
250255
time.sleep(0.2)
251256
producer.send("test", key=b"foo", value=b"bar")
257+
producer.flush()
252258

253259
thread = threading.Thread(target=delayed_send)
254260
thread.start()

0 commit comments

Comments
 (0)