diff --git a/tests/instrumentation/kafka_tests.py b/tests/instrumentation/kafka_tests.py index 54be2ee8e..8339ceefb 100644 --- a/tests/instrumentation/kafka_tests.py +++ b/tests/instrumentation/kafka_tests.py @@ -113,6 +113,7 @@ def delayed_send(): elasticapm_client.begin_transaction("foo") producer.send("test", key=b"foo", value=b"bar") producer.send("test", key=b"baz", value=b"bazzinga") + producer.flush() elasticapm_client.end_transaction("foo") thread = threading.Thread(target=delayed_send) @@ -139,6 +140,7 @@ def delayed_send(): elasticapm_client.begin_transaction("foo") producer.send("test", key=b"foo", value=b"bar") producer.send("test", key=b"baz", value=b"bazzinga") + producer.flush() elasticapm_client.end_transaction("foo") thread = threading.Thread(target=delayed_send) @@ -166,6 +168,7 @@ def delayed_send(): producer.send(topic="foo", key=b"foo", value=b"bar") producer.send("bar", key=b"foo", value=b"bar") producer.send("test", key=b"foo", value=b"bar") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -186,6 +189,7 @@ def delayed_send(): producer.send(topic="foo", key=b"foo", value=b"bar") producer.send("bar", key=b"foo", value=b"bar") producer.send("test", key=b"foo", value=b"bar") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -205,6 +209,7 @@ def delayed_send(): time.sleep(0.2) producer.send("test", key=b"foo", value=b"bar") producer.send("test", key=b"baz", value=b"bazzinga") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -249,6 +254,7 @@ def test_kafka_consumer_unsampled_transaction_handles_stop_iteration( def delayed_send(): time.sleep(0.2) producer.send("test", key=b"foo", value=b"bar") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start()