Skip to content

Commit 2f4b81e

Browse files
authored
KIP-735: Increase default consumer session_timeout_ms from 10s to 45s (#3030)
1 parent 10a4f45 commit 2f4b81e

5 files changed

Lines changed: 66 additions & 73 deletions

File tree

kafka/consumer/group.py

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ class KafkaConsumer:
166166
from the group and initiate a rebalance. Note that the value must
167167
be in the allowable range as configured in the broker configuration
168168
by group.min.session.timeout.ms and group.max.session.timeout.ms.
169-
Default: 10000
169+
Default: 45000 for brokers 3.0+, otherwise 30000.
170170
heartbeat_interval_ms (int): The expected time in milliseconds
171171
between heartbeats to the consumer coordinator when using
172172
Kafka's group management facilities. Heartbeats are used to ensure
@@ -317,7 +317,7 @@ class KafkaConsumer:
317317
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
318318
'max_poll_records': 500,
319319
'max_poll_interval_ms': 300000,
320-
'session_timeout_ms': 10000,
320+
'session_timeout_ms': 45000,
321321
'heartbeat_interval_ms': 3000,
322322
'receive_buffer_bytes': None,
323323
'send_buffer_bytes': None,
@@ -354,11 +354,7 @@ class KafkaConsumer:
354354
'socks5_proxy': None, # deprecated
355355
'kafka_client': KafkaNetClient,
356356
}
357-
# Pre-0.10.1 brokers don't separate session_timeout_ms from
358-
# max_poll_interval_ms; both default to this value when neither is
359-
# user-supplied. Kept under request_timeout_ms (30s) so the strict
360-
# request > session check below doesn't fire on the default path.
361-
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 25000
357+
DEFAULT_SESSION_TIMEOUT_MS_PRE_KIP_735 = 30000
362358

363359
def __init__(self, *topics, **configs):
364360
# Only check for extra config keys in top-level class
@@ -416,21 +412,21 @@ def __init__(self, *topics, **configs):
416412
# Coordinator configurations are different for older brokers
417413
# max_poll_interval_ms is not supported directly -- it must the be
418414
# the same as session_timeout_ms. If the user provides one of them,
419-
# use it for both. Otherwise use the old default of 30secs
420-
if self.config['api_version'] < (0, 10, 1):
421-
if 'session_timeout_ms' not in configs:
422-
if 'max_poll_interval_ms' in configs:
423-
self.config['session_timeout_ms'] = configs['max_poll_interval_ms']
424-
else:
425-
self.config['session_timeout_ms'] = self.DEFAULT_SESSION_TIMEOUT_MS_0_9
426-
if 'max_poll_interval_ms' not in configs:
427-
self.config['max_poll_interval_ms'] = self.config['session_timeout_ms']
415+
# use it for both.
416+
user_supplied_session_timeout = 'session_timeout_ms' in configs
417+
user_supplied_max_poll_interval = 'max_poll_interval_ms' in configs
428418

429-
if self.config['group_id'] is not None:
430-
if self.config['request_timeout_ms'] <= self.config['session_timeout_ms']:
431-
raise KafkaConfigurationError(
432-
"Request timeout (%s) must be larger than session timeout (%s)" %
433-
(self.config['request_timeout_ms'], self.config['session_timeout_ms']))
419+
if not user_supplied_session_timeout:
420+
if self.config['api_version'] < (0, 10, 1) and user_supplied_max_poll_interval:
421+
self.config['session_timeout_ms'] = self.config['max_poll_interval_ms']
422+
423+
elif self.config['api_version'] < (3, 0):
424+
# Prior to 3.0 the broker-side default max session timeout was 30000
425+
self.config['session_timeout_ms'] = self.DEFAULT_SESSION_TIMEOUT_MS_PRE_KIP_735
426+
427+
if not user_supplied_max_poll_interval:
428+
if self.config['api_version'] < (0, 10, 1):
429+
self.config['max_poll_interval_ms'] = self.config['session_timeout_ms']
434430

435431
if self.config['group_instance_id'] is not None:
436432
if self.config['group_id'] is None:

kafka/coordinator/base.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class BaseCoordinator(metaclass=abc.ABCMeta):
109109
DEFAULT_CONFIG = {
110110
'group_id': 'kafka-python-default-group',
111111
'group_instance_id': None,
112-
'session_timeout_ms': 10000,
112+
'session_timeout_ms': 45000,
113113
'heartbeat_interval_ms': 3000,
114114
'max_poll_interval_ms': 300000,
115115
'request_timeout_ms': 30000,
@@ -118,15 +118,31 @@ class BaseCoordinator(metaclass=abc.ABCMeta):
118118
'metrics': None,
119119
'metric_group_prefix': '',
120120
}
121+
DEFAULT_SESSION_TIMEOUT_MS_PRE_KIP_735 = 30000
121122

122123
def __init__(self, client, **configs):
123124
"""
124125
Keyword Arguments:
125126
group_id (str): name of the consumer group to join for dynamic
126127
partition assignment (if enabled), and to use for fetching and
127128
committing offsets. Default: 'kafka-python-default-group'
129+
group_instance_id (str): A unique identifier of the consumer instance
130+
provided by end user. Only non-empty strings are permitted. If set,
131+
the consumer is treated as a static member, which means that only
132+
one instance with this ID is allowed in the consumer group at any
133+
time. This can be used in combination with a larger session timeout
134+
to avoid group rebalances caused by transient unavailability (e.g.
135+
process restarts). If not set, the consumer will join the group as
136+
a dynamic member, which is the traditional behavior. Default: None
128137
session_timeout_ms (int): The timeout used to detect failures when
129-
using Kafka's group management facilities. Default: 30000
138+
using Kafka's group management facilities. The consumer sends
139+
periodic heartbeats to indicate its liveness to the broker. If
140+
no heartbeats are received by the broker before the expiration of
141+
this session timeout, then the broker will remove this consumer
142+
from the group and initiate a rebalance. Note that the value must
143+
be in the allowable range as configured in the broker configuration
144+
by group.min.session.timeout.ms and group.max.session.timeout.ms.
145+
Default: 45000 for brokers 3.0+, otherwise 30000.
130146
heartbeat_interval_ms (int): The expected time in milliseconds
131147
between heartbeats to the consumer coordinator when using
132148
Kafka's group management feature. Heartbeats are used to ensure
@@ -144,6 +160,29 @@ def __init__(self, client, **configs):
144160
if key in configs:
145161
self.config[key] = configs[key]
146162

163+
# Coordinator configurations are different for older brokers
164+
# max_poll_interval_ms is not supported directly -- it must the be
165+
# the same as session_timeout_ms. If the user provides one of them,
166+
# use it for both.
167+
user_supplied_session_timeout = 'session_timeout_ms' in configs
168+
user_supplied_max_poll_interval = 'max_poll_interval_ms' in configs
169+
170+
if not user_supplied_session_timeout:
171+
if self.config['api_version'] < (0, 10, 1) and user_supplied_max_poll_interval:
172+
self.config['session_timeout_ms'] = self.config['max_poll_interval_ms']
173+
174+
elif self.config['api_version'] < (3, 0):
175+
# Prior to 3.0 the broker-side default max session timeout was 30000
176+
self.config['session_timeout_ms'] = self.DEFAULT_SESSION_TIMEOUT_MS_PRE_KIP_735
177+
178+
if not user_supplied_max_poll_interval:
179+
if self.config['api_version'] < (0, 10, 1):
180+
self.config['max_poll_interval_ms'] = self.config['session_timeout_ms']
181+
182+
if self.config['group_instance_id'] is not None:
183+
if self.config['group_id'] is None:
184+
raise Errors.KafkaConfigurationError("group_instance_id requires group_id")
185+
147186
if self.config['api_version'] < (0, 10, 1):
148187
if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']:
149188
raise Errors.KafkaConfigurationError("Broker version %s does not support "

kafka/coordinator/consumer.py

Lines changed: 4 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -27,24 +27,16 @@
2727

2828
class ConsumerCoordinator(BaseCoordinator):
2929
"""This class manages the coordination process with the consumer coordinator."""
30-
DEFAULT_CONFIG = {
31-
'group_id': 'kafka-python-default-group',
32-
'group_instance_id': None,
30+
DEFAULT_CONFIG = BaseCoordinator.DEFAULT_CONFIG.copy()
31+
DEFAULT_CONFIG.update({
3332
'enable_auto_commit': True,
3433
'auto_commit_interval_ms': 5000,
3534
'default_offset_commit_callback': None,
3635
'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor, StickyPartitionAssignor),
37-
'session_timeout_ms': 10000,
38-
'heartbeat_interval_ms': 3000,
39-
'max_poll_interval_ms': 300000,
40-
'request_timeout_ms': 30000,
41-
'retry_backoff_ms': 100,
42-
'api_version': (0, 10, 1),
4336
'exclude_internal_topics': True,
4437
'isolation_level': 'read_uncommitted',
45-
'metrics': None,
4638
'metric_group_prefix': 'consumer'
47-
}
39+
})
4840

4941
def __init__(self, client, subscription, **configs):
5042
"""Initialize the coordination manager.
@@ -53,14 +45,6 @@ def __init__(self, client, subscription, **configs):
5345
group_id (str): name of the consumer group to join for dynamic
5446
partition assignment (if enabled), and to use for fetching and
5547
committing offsets. Default: 'kafka-python-default-group'
56-
group_instance_id (str): A unique identifier of the consumer instance
57-
provided by end user. Only non-empty strings are permitted. If set,
58-
the consumer is treated as a static member, which means that only
59-
one instance with this ID is allowed in the consumer group at any
60-
time. This can be used in combination with a larger session timeout
61-
to avoid group rebalances caused by transient unavailability (e.g.
62-
process restarts). If not set, the consumer will join the group as
63-
a dynamic member, which is the traditional behavior. Default: None
6448
enable_auto_commit (bool): If true the consumer's offset will be
6549
periodically committed in the background. Default: True.
6650
auto_commit_interval_ms (int): milliseconds between automatic
@@ -72,17 +56,6 @@ def __init__(self, client, subscription, **configs):
7256
assignors (list): List of objects to use to distribute partition
7357
ownership amongst consumer instances when group management is
7458
used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor, StickyPartitionAssignor]
75-
heartbeat_interval_ms (int): The expected time in milliseconds
76-
between heartbeats to the consumer coordinator when using
77-
Kafka's group management feature. Heartbeats are used to ensure
78-
that the consumer's session stays active and to facilitate
79-
rebalancing when new consumers join or leave the group. The
80-
value must be set lower than session_timeout_ms, but typically
81-
should be set no higher than 1/3 of that value. It can be
82-
adjusted even lower to control the expected time for normal
83-
rebalances. Default: 3000
84-
session_timeout_ms (int): The timeout used to detect failures when
85-
using Kafka's group management facilities. Default: 30000
8659
retry_backoff_ms (int): Milliseconds to backoff when retrying on
8760
errors. Default: 100.
8861
exclude_internal_topics (bool): Whether records from internal topics
@@ -121,11 +94,6 @@ def __init__(self, client, subscription, **configs):
12194
if self._use_group_apis:
12295
if not self.config['assignors']:
12396
raise Errors.KafkaConfigurationError('Coordinator requires assignors')
124-
if self.config['api_version'] < (0, 10, 1):
125-
if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']:
126-
raise Errors.KafkaConfigurationError("Broker version %s does not support "
127-
"different values for max_poll_interval_ms "
128-
"and session_timeout_ms")
12997

13098
if self.config['enable_auto_commit']:
13199
if not self._use_offset_apis:
@@ -264,7 +232,7 @@ def _lookup_assignor(self, name):
264232
# warning. Sync listeners on the IO loop will block heartbeats while
265233
# they run; even async ones delay rebalance progress. 1s is a soft
266234
# ceiling: well below default heartbeat_interval_ms (3s) and
267-
# session_timeout_ms (10s).
235+
# session_timeout_ms (45s).
268236
_REBALANCE_LISTENER_WARN_SECS = 1.0
269237

270238
async def _invoke_rebalance_listener_async(self, method_name, arg):

test/consumer/test_consumer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
from kafka.errors import KafkaConfigurationError, IllegalStateError
55

66

7-
def test_session_timeout_larger_than_request_timeout_raises():
7+
def test_session_timeout_different_from_max_poll_timeout_raises():
88
with pytest.raises(KafkaConfigurationError):
9-
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=50000, request_timeout_ms=40000)
9+
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=50000, max_poll_timeout_ms=40000)
1010

1111

1212
def test_fetch_max_wait_larger_than_request_timeout_raises():

test/consumer/test_coordinator.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,7 @@
3434
def coordinator(broker, client, metrics):
3535
coord = ConsumerCoordinator(client, SubscriptionState(),
3636
metrics=metrics,
37-
api_version=broker.broker_version,
38-
max_poll_interval_ms=300000 if broker.broker_version >= (0, 10, 1) else 10000,
39-
session_timeout_ms=10000)
37+
api_version=broker.broker_version)
4038
try:
4139
yield coord
4240
finally:
@@ -560,8 +558,6 @@ def test_maybe_auto_commit_offsets_sync(mocker, client, api_version, group_id, e
560558
mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
561559
coordinator = ConsumerCoordinator(client, SubscriptionState(),
562560
api_version=api_version,
563-
session_timeout_ms=30000,
564-
max_poll_interval_ms=30000,
565561
enable_auto_commit=enable,
566562
group_id=group_id)
567563
commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync',
@@ -864,9 +860,7 @@ def test_send_offset_fetch_request_sets_require_stable(
864860
coord = ConsumerCoordinator(client, SubscriptionState(),
865861
metrics=metrics,
866862
api_version=broker.broker_version,
867-
isolation_level=isolation_level,
868-
max_poll_interval_ms=300000,
869-
session_timeout_ms=10000)
863+
isolation_level=isolation_level)
870864
try:
871865
client._manager.bootstrap(timeout_ms=5000)
872866
coord._subscription.subscribe(topics=['foobar'])
@@ -1571,8 +1565,6 @@ def _cooperative_coordinator(client, metrics):
15711565
client, SubscriptionState(),
15721566
metrics=metrics,
15731567
api_version=(2, 4),
1574-
max_poll_interval_ms=300000,
1575-
session_timeout_ms=10000,
15761568
assignors=(CooperativeStickyAssignor,))
15771569

15781570

@@ -1605,8 +1597,6 @@ def test_rejects_mixed_protocols(self, client, metrics):
16051597
client, SubscriptionState(),
16061598
metrics=metrics,
16071599
api_version=(2, 4),
1608-
max_poll_interval_ms=300000,
1609-
session_timeout_ms=10000,
16101600
assignors=(RangePartitionAssignor, CooperativeStickyAssignor))
16111601

16121602

0 commit comments

Comments
 (0)