Skip to content

Commit d2bad11

Browse files
Copilotxrmx
andauthored
fix: flush kafka producer before ending sender transaction
Agent-Logs-Url: https://github.com/elastic/apm-agent-python/sessions/610642ed-3a8d-4371-bccb-485eaee1d69d Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com>
1 parent 059906f commit d2bad11

1 file changed

Lines changed: 19 additions & 13 deletions

File tree

tests/instrumentation/kafka_tests.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,9 @@ def test_kafka_consume(instrument, elasticapm_client, producer, consumer, topics
111111
def delayed_send():
112112
time.sleep(0.2)
113113
elasticapm_client.begin_transaction("foo")
114-
producer.send("test", key=b"foo", value=b"bar").get(timeout=5)
115-
producer.send("test", key=b"baz", value=b"bazzinga").get(timeout=5)
114+
producer.send("test", key=b"foo", value=b"bar")
115+
producer.send("test", key=b"baz", value=b"bazzinga")
116+
producer.flush()
116117
elasticapm_client.end_transaction("foo")
117118

118119
thread = threading.Thread(target=delayed_send)
@@ -137,8 +138,9 @@ def test_kafka_consume_ongoing_transaction(instrument, elasticapm_client, produc
137138
def delayed_send():
138139
time.sleep(0.2)
139140
elasticapm_client.begin_transaction("foo")
140-
producer.send("test", key=b"foo", value=b"bar").get(timeout=5)
141-
producer.send("test", key=b"baz", value=b"bazzinga").get(timeout=5)
141+
producer.send("test", key=b"foo", value=b"bar")
142+
producer.send("test", key=b"baz", value=b"bazzinga")
143+
producer.flush()
142144
elasticapm_client.end_transaction("foo")
143145

144146
thread = threading.Thread(target=delayed_send)
@@ -163,9 +165,10 @@ def test_kafka_consumer_ignore_topic(instrument, elasticapm_client, producer, co
163165

164166
def delayed_send():
165167
time.sleep(0.2)
166-
producer.send(topic="foo", key=b"foo", value=b"bar").get(timeout=5)
167-
producer.send("bar", key=b"foo", value=b"bar").get(timeout=5)
168-
producer.send("test", key=b"foo", value=b"bar").get(timeout=5)
168+
producer.send(topic="foo", key=b"foo", value=b"bar")
169+
producer.send("bar", key=b"foo", value=b"bar")
170+
producer.send("test", key=b"foo", value=b"bar")
171+
producer.flush()
169172

170173
thread = threading.Thread(target=delayed_send)
171174
thread.start()
@@ -183,9 +186,10 @@ def test_kafka_consumer_ignore_topic_ongoing_transaction(instrument, elasticapm_
183186

184187
def delayed_send():
185188
time.sleep(0.2)
186-
producer.send(topic="foo", key=b"foo", value=b"bar").get(timeout=5)
187-
producer.send("bar", key=b"foo", value=b"bar").get(timeout=5)
188-
producer.send("test", key=b"foo", value=b"bar").get(timeout=5)
189+
producer.send(topic="foo", key=b"foo", value=b"bar")
190+
producer.send("bar", key=b"foo", value=b"bar")
191+
producer.send("test", key=b"foo", value=b"bar")
192+
producer.flush()
189193

190194
thread = threading.Thread(target=delayed_send)
191195
thread.start()
@@ -203,8 +207,9 @@ def delayed_send():
203207
def test_kafka_poll_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics):
204208
def delayed_send():
205209
time.sleep(0.2)
206-
producer.send("test", key=b"foo", value=b"bar").get(timeout=5)
207-
producer.send("test", key=b"baz", value=b"bazzinga").get(timeout=5)
210+
producer.send("test", key=b"foo", value=b"bar")
211+
producer.send("test", key=b"baz", value=b"bazzinga")
212+
producer.flush()
208213

209214
thread = threading.Thread(target=delayed_send)
210215
thread.start()
@@ -248,7 +253,8 @@ def test_kafka_consumer_unsampled_transaction_handles_stop_iteration(
248253
):
249254
def delayed_send():
250255
time.sleep(0.2)
251-
producer.send("test", key=b"foo", value=b"bar").get(timeout=5)
256+
producer.send("test", key=b"foo", value=b"bar")
257+
producer.flush()
252258

253259
thread = threading.Thread(target=delayed_send)
254260
thread.start()

0 commit comments

Comments
 (0)