Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tests/instrumentation/kafka_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def delayed_send():
producer.send("test", key=b"foo", value=b"bar")
producer.send("test", key=b"baz", value=b"bazzinga")
elasticapm_client.end_transaction("foo")
producer.flush()

thread = threading.Thread(target=delayed_send)
thread.start()
Expand All @@ -140,6 +141,7 @@ def delayed_send():
producer.send("test", key=b"foo", value=b"bar")
producer.send("test", key=b"baz", value=b"bazzinga")
elasticapm_client.end_transaction("foo")
producer.flush()

thread = threading.Thread(target=delayed_send)
thread.start()
Expand All @@ -166,6 +168,7 @@ def delayed_send():
producer.send(topic="foo", key=b"foo", value=b"bar")
producer.send("bar", key=b"foo", value=b"bar")
producer.send("test", key=b"foo", value=b"bar")
producer.flush()

thread = threading.Thread(target=delayed_send)
thread.start()
Expand All @@ -186,6 +189,7 @@ def delayed_send():
producer.send(topic="foo", key=b"foo", value=b"bar")
producer.send("bar", key=b"foo", value=b"bar")
producer.send("test", key=b"foo", value=b"bar")
producer.flush()

thread = threading.Thread(target=delayed_send)
thread.start()
Expand All @@ -205,6 +209,7 @@ def delayed_send():
time.sleep(0.2)
producer.send("test", key=b"foo", value=b"bar")
producer.send("test", key=b"baz", value=b"bazzinga")
producer.flush()

thread = threading.Thread(target=delayed_send)
thread.start()
Expand Down Expand Up @@ -249,6 +254,7 @@ def test_kafka_consumer_unsampled_transaction_handles_stop_iteration(
def delayed_send():
time.sleep(0.2)
producer.send("test", key=b"foo", value=b"bar")
producer.flush()

thread = threading.Thread(target=delayed_send)
thread.start()
Expand Down
Loading