|
18 | 18 | from kafka.admin import NewTopic |
19 | 19 | from kafka.consumer.fetcher import ConsumerRecord |
20 | 20 | from rstream import AMQPMessage, Producer |
21 | | -from rstream.exceptions import LeaderNotAvailable |
| 21 | +from rstream.exceptions import ( |
| 22 | + LeaderNotAvailable, |
| 23 | + StreamDoesNotExist, |
| 24 | + StreamNotAvailable, |
| 25 | +) |
22 | 26 |
|
23 | 27 | KAFKA_SETTINGS = {"bootstrap_servers": "kafka:9092"} |
24 | 28 | MQTT_BASE_ROUTE = "mqtt://mqtt:1883?client_id=$CLIENT_ID" |
@@ -418,17 +422,26 @@ async def _create_stream(self): |
418 | 422 | pass # stream may already exist |
419 | 423 | await self._wait_until_stream_ready(self.stream_name) |
420 | 424 |
|
421 | | - # create_stream returns before RabbitMQ elects a leader for the stream. |
422 | | - # If a consumer attaches during that window it fails with |
423 | | - # "Stream does not exist". Wait until the leader is available. |
| 425 | + # create_stream returns before RabbitMQ has finished propagating the |
| 426 | + # stream's metadata and electing a leader. During that window, querying |
| 427 | + # the stream can yield any of: |
| 428 | + # * StreamDoesNotExist — metadata not yet replicated to the broker |
| 429 | + # we're talking to (more likely on a multi-node cluster, but does |
| 430 | + # occur on a single node under load too); |
| 431 | + # * StreamNotAvailable — metadata exists, but the stream's underlying |
| 432 | + # resources aren't ready (response_code 6, leader_ref=65535); |
| 433 | + # * LeaderNotAvailable — leader election in progress. |
| 434 | + # All three are transient. Retry until one of them clears or the |
| 435 | + # timeout fires. Without this, the rabbitmq fixture errors at setup |
| 436 | + # under heavy CI load. |
424 | 437 | async def _wait_until_stream_ready(self, name: str, timeout: float = 30.0): |
425 | 438 | client = await self._producer.default_client |
426 | 439 | deadline = time.monotonic() + timeout |
427 | 440 | while True: |
428 | 441 | try: |
429 | 442 | await client.query_leader_and_replicas(name) |
430 | 443 | return |
431 | | - except LeaderNotAvailable: |
| 444 | + except (StreamDoesNotExist, StreamNotAvailable, LeaderNotAvailable): |
432 | 445 | if time.monotonic() >= deadline: |
433 | 446 | raise |
434 | 447 | await asyncio.sleep(0.1) |
|
0 commit comments