Skip to content

Commit daede24

Browse files
dpkp and claude authored
Consumer: sleep in poll() if timeout, no records, and no fetchable partitions (#3039)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent f05c6db commit daede24

4 files changed

Lines changed: 138 additions & 6 deletions

File tree

kafka/consumer/fetcher.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,12 @@ def fetch_records(self, max_records=None, update_offsets=True, timeout_ms=None):
173173
Only applies when no records are immediately available.
174174
175175
Returns:
176-
dict[TopicPartition, list[ConsumerRecord]]: records grouped by
177-
partition; may be empty if no records arrived in the budget.
176+
tuple[dict[TopicPartition, list[ConsumerRecord]], bool]:
177+
``(records, idle)``. ``idle`` is True when there were no
178+
buffered records, no in-flight fetches, and no pending
179+
offset-reset task -- i.e. nothing this fetcher could wait
180+
on. Callers in that state should sleep before retrying
181+
instead of busy-looping.
178182
"""
179183
# Drain whatever's already buffered from prior fetch responses.
180184
records, partial = self.fetched_records(
@@ -184,7 +188,7 @@ def fetch_records(self, max_records=None, update_offsets=True, timeout_ms=None):
184188
self.send_fetches()
185189

186190
if records:
187-
return records
191+
return records, False
188192

189193
# No records yet. Block until either an in-flight fetch
190194
# completes (records may have arrived) or a pending offset-reset
@@ -196,7 +200,7 @@ def fetch_records(self, max_records=None, update_offsets=True, timeout_ms=None):
196200
if self._reset_task is not None and not self._reset_task.is_done:
197201
waited_on.append(self._reset_task)
198202
if not waited_on:
199-
return records # nothing pending; nothing to wait for
203+
return records, True # nothing pending; caller should sleep
200204

201205
wakeup = Future()
202206
def _wake(_):
@@ -212,7 +216,7 @@ def _wake(_):
212216

213217
records, _ = self.fetched_records(
214218
max_records, update_offsets=update_offsets)
215-
return records
219+
return records, False
216220

217221
def send_fetches(self):
218222
"""Send FetchRequests for all assigned partitions that do not already have

kafka/consumer/group.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -813,8 +813,16 @@ def _poll_once(self, timer, max_records, update_offsets=True):
813813
if self.config['group_id'] is not None:
814814
poll_timeout_ms = min(poll_timeout_ms, self._coordinator.time_to_next_poll() * 1000)
815815

816-
return self._fetcher.fetch_records(
816+
records, idle = self._fetcher.fetch_records(
817817
max_records, update_offsets=update_offsets, timeout_ms=poll_timeout_ms)
818+
if not records and idle and poll_timeout_ms > 0:
819+
# Nothing fetchable and nothing in flight (no assignment, all
820+
# paused, or no resolvable positions). Sleep for at most
821+
# retry_backoff_ms (never beyond poll_timeout_ms, which is already
822+
# capped by the coordinator's next-action deadline) to avoid busy-looping.
823+
poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms'])
824+
time.sleep(poll_timeout_ms / 1000)
825+
return records
818826

819827
def position(self, partition, timeout_ms=None):
820828
"""Get the offset of the next record that will be fetched

test/consumer/test_consumer.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from kafka import ConsumerGroupMetadata, KafkaConsumer, TopicPartition
44
from kafka.errors import KafkaConfigurationError, IllegalStateError
5+
from kafka.util import Timer
56

67

78
def test_session_timeout_different_from_max_poll_timeout_raises():
@@ -92,3 +93,69 @@ def test_group_metadata_with_group_id_delegates_to_coordinator():
9293
assert gm.generation_id == -1
9394
assert gm.member_id == ''
9495
consumer.close()
96+
97+
98+
def _stub_poll_path(consumer, mocker, fetch_return):
    """Stub every collaborator that _poll_once touches before fetching.

    The coordinator poll is forced to succeed, the offset bookkeeping hooks
    become no-op mocks, and ``fetch_records`` is replaced by a mock that
    returns ``fetch_return``. This leaves the fetch -> sleep handoff as the
    only behavior under test.
    """
    patch = mocker.patch.object

    patch(consumer._coordinator, 'poll', return_value=True)
    patch(consumer, '_refresh_committed_offsets')

    # Offset maintenance on the fetcher is irrelevant here; mock it away.
    for hook in ('reset_offsets_if_needed',
                 'maybe_validate_positions',
                 'validate_offsets_if_needed'):
        patch(consumer._fetcher, hook)

    patch(consumer._fetcher, 'fetch_records', return_value=fetch_return)
108+
109+
110+
def test_poll_once_sleeps_when_fetcher_idle(mocker):
    """An idle fetcher makes _poll_once sleep instead of returning at once.

    When fetch_records reports ``idle=True`` (nothing buffered, nothing in
    flight), _poll_once must call time.sleep so that consumer.poll() does
    not busy-loop on consumers with no fetchable partitions. The sleep is
    bounded by both the remaining poll budget and ``retry_backoff_ms``.
    """
    consumer = KafkaConsumer(api_version=(0, 10, 0))
    try:
        _stub_poll_path(consumer, mocker, fetch_return=({}, True))
        sleep = mocker.patch('kafka.consumer.group.time.sleep')

        records = consumer._poll_once(Timer(1000), max_records=100)

        assert records == {}
        sleep.assert_called_once()
        slept_secs = sleep.call_args[0][0]
        # _poll_once clamps the sleep to min(poll_timeout_ms,
        # retry_backoff_ms), so it can exceed neither the 1.0s Timer
        # budget nor the configured backoff.
        backoff_secs = consumer.config['retry_backoff_ms'] / 1000.0
        assert 0 < slept_secs <= min(1.0, backoff_secs)
    finally:
        consumer.close()
129+
130+
131+
def test_poll_once_does_not_sleep_when_records_returned(mocker):
    """When fetch_records hands back records, _poll_once returns that same
    dict and never calls time.sleep."""
    consumer = KafkaConsumer(api_version=(0, 10, 0))
    try:
        expected = {TopicPartition('foo', 0): [object()]}
        _stub_poll_path(consumer, mocker, fetch_return=(expected, False))
        mock_sleep = mocker.patch('kafka.consumer.group.time.sleep')

        polled = consumer._poll_once(Timer(1000), max_records=100)

        # Identity, not just equality: the fetcher's dict is passed through.
        assert polled is expected
        mock_sleep.assert_not_called()
    finally:
        consumer.close()
145+
146+
147+
def test_poll_once_does_not_sleep_when_fetcher_waited_but_empty(mocker):
    """An empty result with ``idle=False`` must not trigger a sleep.

    ``({}, False)`` means the fetcher already waited on in-flight work and
    came back empty-handed; the next loop iteration should re-check the
    completed fetches without any added delay.
    """
    consumer = KafkaConsumer(api_version=(0, 10, 0))
    try:
        _stub_poll_path(consumer, mocker, fetch_return=({}, False))
        mock_sleep = mocker.patch('kafka.consumer.group.time.sleep')

        polled = consumer._poll_once(Timer(1000), max_records=100)

        assert polled == {}
        mock_sleep.assert_not_called()
    finally:
        consumer.close()

test/consumer/test_fetcher.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,59 @@ def test_fetched_records(fetcher, topic, mocker):
521521
assert partial is False
522522

523523

524+
def test_fetch_records_idle_when_nothing_pending(fetcher, mocker):
    """With nothing buffered, nothing in flight and no reset task,
    fetch_records reports ``idle=True`` so that the caller
    (KafkaConsumer._poll_once) can sleep rather than busy-loop."""
    # Stub out fetch dispatch to mimic a "nothing fetchable" consumer
    # (paused, unassigned, ...) without involving the manager.
    mocker.patch.object(fetcher, 'send_fetches', return_value=None)

    # Preconditions: the fetcher starts with no pending work at all.
    assert not fetcher._fetch_futures
    assert fetcher._reset_task is None

    fetched, is_idle = fetcher.fetch_records(timeout_ms=0)

    assert fetched == {}
    assert is_idle is True
537+
538+
539+
def test_fetch_records_not_idle_when_records_buffered(fetcher, topic):
    """Already-buffered records are returned before the no-work check runs,
    so the result carries ``idle=False``."""
    fetcher.config['check_crcs'] = False
    partition = TopicPartition(topic, 0)
    buffered = _build_completed_fetch(partition, [(None, b'foo', None)])
    fetcher._completed_fetches.append(buffered)

    fetched, is_idle = fetcher.fetch_records(timeout_ms=0)

    assert partition in fetched
    assert is_idle is False
549+
550+
551+
def test_fetch_records_not_idle_when_inflight_fetch_pending(fetcher, mocker):
    """A still-pending fetch future is work worth waiting on, so the result
    is ``idle=False`` even when the wait ends without any records."""
    fetcher._fetch_futures.append(Future())
    mocker.patch.object(fetcher, 'send_fetches', return_value=None)
    # Skip the real blocking wait on the wakeup; behave as though it
    # simply timed out.
    mocker.patch.object(fetcher._net, 'run', return_value=None)

    fetched, is_idle = fetcher.fetch_records(timeout_ms=0)

    assert fetched == {}
    assert is_idle is False
564+
565+
566+
def test_fetch_records_not_idle_when_reset_task_pending(fetcher, mocker):
    """An unfinished offset-reset task is pending work too, so fetch_records
    reports ``idle=False``."""
    fetcher._reset_task = Future()
    for target, attr in ((fetcher, 'send_fetches'), (fetcher._net, 'run')):
        mocker.patch.object(target, attr, return_value=None)

    fetched, is_idle = fetcher.fetch_records(timeout_ms=0)

    assert fetched == {}
    assert is_idle is False
575+
576+
524577
@pytest.mark.parametrize(("fetch_offsets", "fetch_response", "num_partitions"), [
525578
(
526579
{TopicPartition('foo', 0): 0},

0 commit comments

Comments
 (0)