Skip to content

Commit 059906f

Browse files
Copilotxrmx
andauthored
fix: wait for kafka send futures in flaky consumer tests
Agent-Logs-Url: https://github.com/elastic/apm-agent-python/sessions/c8da0228-9a66-4391-a204-fe0b99b1b85b Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com>
1 parent 8467071 commit 059906f

1 file changed

Lines changed: 13 additions & 19 deletions

File tree

tests/instrumentation/kafka_tests.py

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,9 @@ def test_kafka_consume(instrument, elasticapm_client, producer, consumer, topics
111111
def delayed_send():
112112
time.sleep(0.2)
113113
elasticapm_client.begin_transaction("foo")
114-
producer.send("test", key=b"foo", value=b"bar")
115-
producer.send("test", key=b"baz", value=b"bazzinga")
114+
producer.send("test", key=b"foo", value=b"bar").get(timeout=5)
115+
producer.send("test", key=b"baz", value=b"bazzinga").get(timeout=5)
116116
elasticapm_client.end_transaction("foo")
117-
producer.flush()
118117

119118
thread = threading.Thread(target=delayed_send)
120119
thread.start()
@@ -138,10 +137,9 @@ def test_kafka_consume_ongoing_transaction(instrument, elasticapm_client, produc
138137
def delayed_send():
139138
time.sleep(0.2)
140139
elasticapm_client.begin_transaction("foo")
141-
producer.send("test", key=b"foo", value=b"bar")
142-
producer.send("test", key=b"baz", value=b"bazzinga")
140+
producer.send("test", key=b"foo", value=b"bar").get(timeout=5)
141+
producer.send("test", key=b"baz", value=b"bazzinga").get(timeout=5)
143142
elasticapm_client.end_transaction("foo")
144-
producer.flush()
145143

146144
thread = threading.Thread(target=delayed_send)
147145
thread.start()
@@ -165,10 +163,9 @@ def test_kafka_consumer_ignore_topic(instrument, elasticapm_client, producer, co
165163

166164
def delayed_send():
167165
time.sleep(0.2)
168-
producer.send(topic="foo", key=b"foo", value=b"bar")
169-
producer.send("bar", key=b"foo", value=b"bar")
170-
producer.send("test", key=b"foo", value=b"bar")
171-
producer.flush()
166+
producer.send(topic="foo", key=b"foo", value=b"bar").get(timeout=5)
167+
producer.send("bar", key=b"foo", value=b"bar").get(timeout=5)
168+
producer.send("test", key=b"foo", value=b"bar").get(timeout=5)
172169

173170
thread = threading.Thread(target=delayed_send)
174171
thread.start()
@@ -186,10 +183,9 @@ def test_kafka_consumer_ignore_topic_ongoing_transaction(instrument, elasticapm_
186183

187184
def delayed_send():
188185
time.sleep(0.2)
189-
producer.send(topic="foo", key=b"foo", value=b"bar")
190-
producer.send("bar", key=b"foo", value=b"bar")
191-
producer.send("test", key=b"foo", value=b"bar")
192-
producer.flush()
186+
producer.send(topic="foo", key=b"foo", value=b"bar").get(timeout=5)
187+
producer.send("bar", key=b"foo", value=b"bar").get(timeout=5)
188+
producer.send("test", key=b"foo", value=b"bar").get(timeout=5)
193189

194190
thread = threading.Thread(target=delayed_send)
195191
thread.start()
@@ -207,9 +203,8 @@ def delayed_send():
207203
def test_kafka_poll_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics):
208204
def delayed_send():
209205
time.sleep(0.2)
210-
producer.send("test", key=b"foo", value=b"bar")
211-
producer.send("test", key=b"baz", value=b"bazzinga")
212-
producer.flush()
206+
producer.send("test", key=b"foo", value=b"bar").get(timeout=5)
207+
producer.send("test", key=b"baz", value=b"bazzinga").get(timeout=5)
213208

214209
thread = threading.Thread(target=delayed_send)
215210
thread.start()
@@ -253,8 +248,7 @@ def test_kafka_consumer_unsampled_transaction_handles_stop_iteration(
253248
):
254249
def delayed_send():
255250
time.sleep(0.2)
256-
producer.send("test", key=b"foo", value=b"bar")
257-
producer.flush()
251+
producer.send("test", key=b"foo", value=b"bar").get(timeout=5)
258252

259253
thread = threading.Thread(target=delayed_send)
260254
thread.start()

0 commit comments

Comments
 (0)