@@ -336,24 +336,30 @@ class KafkaProducer:
336336 or other configuration forbids use of all the specified ciphers),
337337 an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
338338 api_version (tuple): Specify which Kafka API version to use. If set to
339- None, the client will attempt to determine the broker version via
339+ None, the client will infer the broker version from the results of
340340 ApiVersionsRequest API or, for brokers earlier than 0.10, probing
341- various known APIs. Dynamic version checking is performed eagerly
342- during __init__ and can raise KafkaTimeoutError if no connection
343- was made before timeout (see api_version_auto_timeout_ms below).
344- Different versions enable different functionality.
341+ various known APIs. Different versions enable different functionality.
345342
346343 Examples:
347- (3, 9 ) most recent broker release, enable all supported features
344+ (4, 2 ) most recent broker release, enable all supported features
348345 (0, 11) enables message format v2 (internal)
349346 (0, 10, 0) enables sasl authentication and message format v1
350- (0, 8, 0) enables basic functionality only
347+ (0, 9) enables full group coordination features with automatic
348+ partition assignment and rebalancing,
349+ (0, 8, 2) enables kafka-storage offset commits with manual
350+ partition assignment only,
351+ (0, 8, 1) enables zookeeper-storage offset commits with manual
352+ partition assignment only,
353+ (0, 8, 0) enables basic functionality but requires manual
354+ partition assignment and offset management.
351355
352356 Default: None
353- api_version_auto_timeout_ms (int): number of milliseconds to throw a
354- timeout exception from the constructor when checking the broker
355- api version. Only applies if api_version set to None.
356- Default: 2000
357+ bootstrap_timeout_ms (int): number of milliseconds to wait for first
358+ successful cluster bootstrap. If provided, an attempt to bootstrap
359+ will raise KafkaTimeoutError if it is unable to fetch cluster
360+ metadata before the configured timeout. Note that bootstrap will
361+ be called eagerly from __init__() if api_version is None.
362+ Default: 30000
357363 metric_reporters (list): A list of classes to use as metrics reporters.
358364 Implementing the AbstractMetricsReporter interface allows plugging
359365 in classes that will be notified of new metric creation. Default: []
@@ -429,7 +435,7 @@ class KafkaProducer:
429435 'ssl_password' : None ,
430436 'ssl_ciphers' : None ,
431437 'api_version' : None ,
432- 'api_version_auto_timeout_ms ' : 2000 ,
438+ 'bootstrap_timeout_ms ' : 30000 ,
433439 'metric_reporters' : [],
434440 'metrics_enabled' : True ,
435441 'metrics_num_samples' : 2 ,
@@ -506,9 +512,14 @@ def __init__(self, **configs):
506512 metrics = self ._metrics , metric_group_prefix = 'producer' ,
507513 wakeup_timeout_ms = self .config ['max_block_ms' ],
508514 ** self .config )
515+ manager = client ._manager
509516
510- # Get auto-discovered / normalized version from client
511- self .config ['api_version' ] = client .get_broker_version (timeout_ms = self .config ['api_version_auto_timeout_ms' ])
517+ # We currently depend on eager-resolution of api_version.
518+ # If it wasn't provided as a config option, we need to bootstrap
519+ # to get it.
520+ if manager .broker_version_data is None :
521+ manager .bootstrap (self .config ['bootstrap_timeout_ms' ])
522+ self .config ['api_version' ] = manager .broker_version
512523
513524 if self .config ['compression_type' ] == 'lz4' :
514525 assert self .config ['api_version' ] >= (0 , 8 , 2 ), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
@@ -525,7 +536,7 @@ def __init__(self, **configs):
525536 assert checker (), "Libraries for {} compression codec not found" .format (ct )
526537 self .config ['compression_attrs' ] = compression_attrs
527538
528- self ._metadata = client .cluster
539+ self ._metadata = manager .cluster
529540 self ._transaction_manager = None
530541 self ._init_transactions_result = None
531542
0 commit comments