File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -138,6 +138,7 @@ def create_kafka_consumer() -> LaunchpadKafkaConsumer:
138138 consumer = arroyo_consumer ,
139139 topic = topic ,
140140 processor_factory = strategy_factory ,
141+ join_timeout = config .join_timeout_seconds , # Drop in-flight work during rebalance before Kafka times out
141142 )
142143 return LaunchpadKafkaConsumer (processor , healthcheck_path , strategy_factory )
143144
@@ -293,6 +294,7 @@ class KafkaConfig:
293294 sasl_mechanism : str | None
294295 sasl_username : str | None
295296 sasl_password : str | None
297+ join_timeout_seconds : float
296298
297299
298300def get_kafka_config () -> KafkaConfig :
@@ -327,4 +329,5 @@ def get_kafka_config() -> KafkaConfig:
327329 sasl_mechanism = os .environ .get ("KAFKA_SASL_MECHANISM" , None ),
328330 sasl_username = os .environ .get ("KAFKA_SASL_USERNAME" , None ),
329331 sasl_password = os .environ .get ("KAFKA_SASL_PASSWORD" , None ),
332+ join_timeout_seconds = float (os .getenv ("KAFKA_JOIN_TIMEOUT_SECONDS" , "10" )),
330333 )
You can’t perform that action at this time.
0 commit comments