Skip to content

Commit edb0f19

Browse files
dpkpclaude
andauthored
Consumer: handle retriable errors in offsets_for_times (#3022)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 75c0ad7 commit edb0f19

1 file changed

Lines changed: 10 additions & 3 deletions

File tree

kafka/consumer/fetcher.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -349,15 +349,22 @@ async def _fetch_offsets_by_times_async(self, timestamps, timeout_ms=None):
349349
offsets, retry = await self._manager.wait_for(future, timer.timeout_ms)
350350
except Errors.KafkaTimeoutError:
351351
break
352-
except Exception as exc:
353-
if not getattr(exc, 'retriable', False):
352+
except Errors.KafkaError as exc:
353+
if not exc.retriable:
354354
raise
355-
if getattr(exc, 'invalid_metadata', False) or self._manager.cluster.need_update:
355+
if exc.invalid_metadata or self._manager.cluster.need_update:
356356
refresh_future = self._manager.cluster.request_update()
357357
try:
358358
await self._manager.wait_for(refresh_future, timer.timeout_ms)
359359
except Errors.KafkaTimeoutError:
360360
break
361+
except Errors.KafkaError as refresh_exc:
362+
if not refresh_exc.retriable:
363+
raise
364+
delay = self.config['retry_backoff_ms'] / 1000
365+
if timer.timeout_ms is not None:
366+
delay = min(delay, timer.timeout_ms / 1000)
367+
await self._manager._net.sleep(delay)
361368
else:
362369
delay = self.config['retry_backoff_ms'] / 1000
363370
if timer.timeout_ms is not None:

0 commit comments

Comments
 (0)