11import json
2+ import subprocess
23import time
4+ import uuid
35
46import pytest
57
68STORE = 'statestore'
79PUBSUB = 'pubsub'
810TOPIC = 'TOPIC_A'
11+ REDIS_CONTAINER = 'dapr_redis'
12+
13+
14+ def _flush_redis () -> None :
15+ """Flush the Dapr Redis instance to prevent state leaking between runs.
16+
17+ Both the state store and the pubsub component point at the same
18+ ``dapr_redis`` container (see ``tests/integration/components/``), so a
19+ previous run's ``received-*`` keys could otherwise satisfy this test's
20+ assertions even if no new message was delivered.
21+ """
22+ subprocess .run (
23+ args = ('docker' , 'exec' , REDIS_CONTAINER , 'redis-cli' , 'FLUSHDB' ),
24+ check = True ,
25+ capture_output = True ,
26+ )
927
1028
1129@pytest .fixture (scope = 'module' )
1230def client (dapr_env , apps_dir ):
31+ _flush_redis ()
1332 return dapr_env .start_sidecar (
1433 app_id = 'test-subscriber' ,
1534 grpc_port = 50001 ,
@@ -20,19 +39,20 @@ def client(dapr_env, apps_dir):
2039
2140
2241def test_published_messages_are_received_by_subscriber (client ):
42+ run_id = uuid .uuid4 ().hex
2343 for n in range (1 , 4 ):
2444 client .publish_event (
2545 pubsub_name = PUBSUB ,
2646 topic_name = TOPIC ,
27- data = json .dumps ({'id' : n , 'message' : 'hello world' }),
47+ data = json .dumps ({'run_id' : run_id , ' id' : n , 'message' : 'hello world' }),
2848 data_content_type = 'application/json' ,
2949 )
3050 time .sleep (1 )
3151
3252 time .sleep (3 )
3353
3454 for n in range (1 , 4 ):
35- state = client .get_state (store_name = STORE , key = f'received-topic-a -{ n } ' )
55+ state = client .get_state (store_name = STORE , key = f'received-{ run_id } -{ n } ' )
3656 assert state .data != b'' , f'Subscriber did not receive message { n } '
3757 msg = json .loads (state .data )
3858 assert msg ['id' ] == n
@@ -44,6 +64,6 @@ def test_publish_event_succeeds(client):
4464 client .publish_event (
4565 pubsub_name = PUBSUB ,
4666 topic_name = TOPIC ,
47- data = json .dumps ({'id' : 99 , 'message' : 'smoke test' }),
67+ data = json .dumps ({'run_id' : uuid . uuid4 (). hex , ' id' : 99 , 'message' : 'smoke test' }),
4868 data_content_type = 'application/json' ,
4969 )
0 commit comments