1717from kafka import KafkaAdminClient , KafkaConsumer , KafkaProducer
1818from kafka .admin import NewTopic
1919from kafka .consumer .fetcher import ConsumerRecord
20+ from rstream import AMQPMessage , Producer
21+ from rstream .exceptions import LeaderNotAvailable
2022
2123KAFKA_SETTINGS = {"bootstrap_servers" : "kafka:9092" }
2224MQTT_BASE_ROUTE = "mqtt://mqtt:1883?client_id=$CLIENT_ID"
@@ -403,8 +405,6 @@ def _run(self, coro):
403405 return fut .result ()
404406
405407 async def _create_stream (self ):
406- from rstream import Producer
407-
408408 self ._producer = Producer (
409409 host = RABBITMQ_HOST ,
410410 port = RABBITMQ_PORT ,
@@ -416,13 +416,48 @@ async def _create_stream(self):
416416 await self ._producer .create_stream (self .stream_name )
417417 except Exception :
418418 pass # stream may already exist
419+ await self ._wait_until_stream_ready (self .stream_name )
420+
421+ # create_stream returns before RabbitMQ elects a leader for the stream.
422+ # If a consumer attaches during that window it fails with
423+ # "Stream does not exist". Wait until the leader is available.
424+ async def _wait_until_stream_ready (self , name : str , timeout : float = 30.0 ):
425+ client = await self ._producer .default_client
426+ deadline = time .monotonic () + timeout
427+ while True :
428+ try :
429+ await client .query_leader_and_replicas (name )
430+ return
431+ except LeaderNotAvailable :
432+ if time .monotonic () >= deadline :
433+ raise
434+ await asyncio .sleep (0.1 )
419435
420436 async def _cleanup (self ):
437+ # Close the producer first so any buffered messages are flushed
438+ # while the stream still exists. Otherwise the flush that runs
439+ # inside close() races against delete_stream and fails with
440+ # StreamDoesNotExist.
441+ try :
442+ await self ._producer .close ()
443+ except Exception :
444+ pass
445+ cleanup_producer = Producer (
446+ host = RABBITMQ_HOST ,
447+ port = RABBITMQ_PORT ,
448+ username = RABBITMQ_USER ,
449+ password = RABBITMQ_PASSWORD ,
450+ )
421451 try :
422- await self ._producer .delete_stream (self .stream_name )
452+ await cleanup_producer .start ()
453+ await cleanup_producer .delete_stream (self .stream_name )
423454 except Exception :
424455 pass
425- await self ._producer .close ()
456+ finally :
457+ try :
458+ await cleanup_producer .close ()
459+ except Exception :
460+ pass
426461
427462 def create_stream (self , name : str ) -> None :
428463 self ._run (self ._create_stream_by_name (name ))
@@ -432,6 +467,7 @@ async def _create_stream_by_name(self, name: str):
432467 await self ._producer .create_stream (name )
433468 except Exception :
434469 pass
470+ await self ._wait_until_stream_ready (name )
435471
436472 def delete_stream (self , name : str ) -> None :
437473 self ._run (self ._delete_stream_by_name (name ))
@@ -446,7 +482,5 @@ def send(self, message: str) -> None:
446482 self ._run (self ._send_async (message ))
447483
448484 async def _send_async (self , message : str ) -> None :
449- from rstream import AMQPMessage
450-
451485 amqp_message = AMQPMessage (body = message .encode ())
452486 await self ._producer .send (self .stream_name , amqp_message )
0 commit comments