@@ -358,8 +358,14 @@ def consumer_thread(i):
358358 assert i not in consumers
359359 assert i not in stop
360360 stop [i ] = threading .Event ()
361+ # Tight session/request timeouts so close() can't outlast the
362+ # join(timeout=5) below. request_timeout_ms must exceed
363+ # session_timeout_ms, and both must exceed heartbeat_interval_ms
364+ # (default 500 in the consumer_factory fixture).
361365 with kafka_consumer_factory (group_id = group_id ,
362366 client_id = "consumer_thread-%s" % i ,
367+ session_timeout_ms = 3000 ,
368+ request_timeout_ms = 4000 ,
363369 api_version_auto_timeout_ms = 5000 ) as c :
364370 consumers [i ] = c
365371 while not stop [i ].is_set ():
@@ -443,9 +449,14 @@ def consumer_thread(i):
443449
444450 finally :
445451 logging .info ('Shutting down %s consumers' , num_consumers )
452+ # Signal all stops first, then join. Serial stop-then-join causes the
453+ # broker to process N back-to-back rebalances (one per LeaveGroup);
454+ # parallel teardown lets all consumers close concurrently against a
455+ # single rebalance pass and keeps each join() within budget.
446456 for c in range (num_consumers ):
447457 logging .info ('Stopping consumer %s' , c )
448458 stop [c ].set ()
459+ for c in range (num_consumers ):
449460 threads [c ].join (timeout = 5 )
450461 assert not threads [c ].is_alive ()
451462 threads [c ] = None
0 commit comments