Skip to content

Commit 34d8f7b

Browse files
dpkpclaude
andauthored
KIP-429: Add on_partitions_lost hook (#3016)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 7417a5d commit 34d8f7b

4 files changed

Lines changed: 308 additions & 3 deletions

File tree

kafka/consumer/subscription_state.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,6 +770,26 @@ def on_partitions_assigned(self, assigned):
770770
"""
771771
pass
772772

773+
def on_partitions_lost(self, lost):
774+
"""KIP-429: called when the consumer has been forcibly removed
775+
from the group (heartbeat session expiry, ``UnknownMemberIdError``,
776+
``IllegalGenerationError``, ``FencedInstanceIdError``) and the
777+
partitions cannot be cleanly committed. ``on_partitions_revoked``
778+
implies the user *can* still commit; ``on_partitions_lost`` makes
779+
explicit that the member has been booted and any in-flight state
780+
for these partitions should be discarded.
781+
782+
Default behaviour is to delegate to ``on_partitions_revoked`` so
783+
listeners written before KIP-429 keep working unchanged. Override
784+
for cleanup that is specific to the forced-eviction case (e.g.
785+
skipping a commit attempt that will fail anyway).
786+
787+
Arguments:
788+
lost (set of TopicPartition): the partitions that were
789+
assigned but have been lost due to forced eviction.
790+
"""
791+
return self.on_partitions_revoked(lost)
792+
773793

774794
class AsyncConsumerRebalanceListener(metaclass=abc.ABCMeta):
775795
"""
@@ -814,3 +834,15 @@ async def on_partitions_assigned(self, assigned):
814834
assigned).
815835
"""
816836
pass
837+
838+
async def on_partitions_lost(self, lost):
839+
"""Async variant of
840+
:meth:`ConsumerRebalanceListener.on_partitions_lost`. Default
841+
implementation awaits ``on_partitions_revoked`` for backward
842+
compatibility with listeners written before KIP-429.
843+
844+
Arguments:
845+
lost (set of TopicPartition): the partitions that were
846+
assigned but have been lost due to forced eviction.
847+
"""
848+
await self.on_partitions_revoked(lost)

kafka/coordinator/base.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,18 @@ def has_member_id(self):
4242
"""
4343
return self.member_id != UNKNOWN_MEMBER_ID
4444

45+
def is_lost(self):
46+
"""True if this generation is effectively the no-generation
47+
sentinel - either the generation_id has been cleared
48+
(DEFAULT_GENERATION_ID) or the member_id has been cleared
49+
(UNKNOWN_MEMBER_ID). Mirrors Java's NO_GENERATION-or-empty-memberId
50+
check in ConsumerCoordinator.onJoinPrepare; used to fire
51+
on_partitions_lost (KIP-429) instead of on_partitions_revoked
52+
when the broker has forcibly removed us from the group.
53+
"""
54+
return (self.generation_id == DEFAULT_GENERATION_ID
55+
or not self.has_member_id())
56+
4557
def __eq__(self, other):
4658
return (self.generation_id == other.generation_id and
4759
self.member_id == other.member_id and
@@ -864,7 +876,14 @@ def rebalance_in_progress(self):
864876
return self.state is MemberState.REBALANCING
865877

866878
def reset_generation(self, member_id=UNKNOWN_MEMBER_ID):
867-
"""Reset the generation and member_id because we have fallen out of the group."""
879+
"""Reset the generation and member_id because we have fallen out of the group.
880+
881+
Arguments:
882+
member_id (str): new local member id to record. Defaults to
883+
``UNKNOWN_MEMBER_ID``. The broker hands back a real member id
884+
on a ``MemberIdRequiredError`` retry; that path passes the
885+
broker-returned id through here.
886+
"""
868887
with self._lock:
869888
self._generation = Generation(DEFAULT_GENERATION_ID, member_id, None)
870889
self.rejoin_needed = True

kafka/coordinator/consumer.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,27 @@ def _perform_assignment(self, leader_id, protocol_name, members):
488488
return group_assignment
489489

490490
async def _on_join_prepare_async(self, generation, member_id, timeout_ms=None):
491+
if self._generation.is_lost():
492+
lost = set(self._subscription.assigned_partitions())
493+
if lost:
494+
log.info("Group %s lost membership; forcibly revoking %s",
495+
self.group_id, lost)
496+
if self._subscription.rebalance_listener:
497+
try:
498+
await self._invoke_rebalance_listener_async(
499+
'on_partitions_lost', lost)
500+
except Exception:
501+
log.exception("User provided subscription rebalance listener %s"
502+
" for group %s failed on_partitions_lost",
503+
self._subscription.rebalance_listener, self.group_id)
504+
self._subscription.assign_from_subscribed([])
505+
self._is_leader = False
506+
self._subscription.reset_group_subscription()
507+
return
508+
# else: generation is lost but we have no partitions to
509+
# lose - this is the initial-join case. Fall through to the
510+
# normal auto-commit + EAGER/COOPERATIVE path.
511+
491512
# commit offsets prior to rebalance if auto-commit enabled
492513
if self.config['enable_auto_commit']:
493514
try:

test/consumer/test_coordinator.py

Lines changed: 235 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1477,7 +1477,10 @@ def test_eager_revokes_everything(self, mocker, coordinator):
14771477
coordinator._subscription.subscribe(topics=['foobar'], listener=listener)
14781478
coordinator._subscription.assign_from_subscribed(
14791479
[TopicPartition('foobar', 0), TopicPartition('foobar', 1)])
1480-
coordinator._manager.run(coordinator._on_join_prepare_async, 0, 'member-foo')
1480+
# Real generation - otherwise is_lost() trips and the lost
1481+
# branch fires on_partitions_lost instead.
1482+
coordinator._generation = Generation(42, 'member-foo', 'range')
1483+
coordinator._manager.run(coordinator._on_join_prepare_async, 42, 'member-foo')
14811484
listener.on_partitions_revoked.assert_called_once_with(
14821485
{TopicPartition('foobar', 0), TopicPartition('foobar', 1)})
14831486

@@ -1489,7 +1492,10 @@ def test_cooperative_skips_global_revoke(self, mocker, client, metrics):
14891492
coord._subscription.subscribe(topics=['foobar'], listener=listener)
14901493
coord._subscription.assign_from_subscribed(
14911494
[TopicPartition('foobar', 0), TopicPartition('foobar', 1)])
1492-
coord._manager.run(coord._on_join_prepare_async, 0, 'member-foo')
1495+
# Real generation - otherwise is_lost() trips and the lost
1496+
# branch fires on_partitions_lost instead.
1497+
coord._generation = Generation(42, 'member-foo', 'cooperative-sticky')
1498+
coord._manager.run(coord._on_join_prepare_async, 42, 'member-foo')
14931499
# KIP-429: no global on_partitions_revoked in the prepare
14941500
# phase - individual partitions are revoked later in
14951501
# _on_join_complete based on the diff.
@@ -1674,6 +1680,233 @@ def test_cooperative_assignment_for_unsubscribed_topic_bails(
16741680
coord.close(timeout_ms=0)
16751681

16761682

1683+
class TestKip429OnPartitionsLost:
1684+
"""When the broker forcibly removes the member (heartbeat /
1685+
commit / sync UnknownMemberId, IllegalGeneration, or fenced
1686+
instance) the next rebalance must surface on_partitions_lost
1687+
instead of on_partitions_revoked - the prior commit attempts
1688+
have already failed, so the user can't safely commit on the
1689+
way out."""
1690+
1691+
def test_default_listener_falls_through_to_revoked(self):
1692+
"""ConsumerRebalanceListener.on_partitions_lost defaults to
1693+
on_partitions_revoked so listeners written before KIP-429
1694+
keep working."""
1695+
calls = []
1696+
1697+
class OldListener(ConsumerRebalanceListener):
1698+
def on_partitions_revoked(self, revoked):
1699+
calls.append(('revoked', revoked))
1700+
def on_partitions_assigned(self, assigned):
1701+
calls.append(('assigned', assigned))
1702+
1703+
lost = {TopicPartition('t', 0)}
1704+
OldListener().on_partitions_lost(lost)
1705+
assert calls == [('revoked', lost)]
1706+
1707+
def test_async_default_listener_falls_through_to_revoked(self, coordinator):
1708+
"""Same default for the async listener: on_partitions_lost
1709+
awaits on_partitions_revoked."""
1710+
calls = []
1711+
1712+
class OldAsync(AsyncConsumerRebalanceListener):
1713+
async def on_partitions_revoked(self, revoked):
1714+
calls.append(('revoked', revoked))
1715+
async def on_partitions_assigned(self, assigned):
1716+
calls.append(('assigned', assigned))
1717+
1718+
lost = {TopicPartition('t', 0)}
1719+
coordinator._manager.run(OldAsync().on_partitions_lost, lost)
1720+
assert calls == [('revoked', lost)]
1721+
1722+
def test_generation_is_lost(self):
1723+
"""Generation.is_lost() mirrors Java's NO_GENERATION-or-empty-memberId
1724+
check used by ConsumerCoordinator.onJoinPrepare for KIP-429."""
1725+
from kafka.coordinator.base import DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID
1726+
# The sentinel itself is lost by construction.
1727+
assert Generation.NO_GENERATION.is_lost() is True
1728+
# A real generation is not lost.
1729+
assert Generation(42, 'mbr-1', 'range').is_lost() is False
1730+
# MemberIdRequiredError retry shape: real member id but generation
1731+
# not yet assigned. Java's OR-check trips this; lost branch is a
1732+
# no-op because assigned_partitions is empty at this point.
1733+
assert Generation(DEFAULT_GENERATION_ID, 'mbr-1', None).is_lost() is True
1734+
# Defensive: a real generation_id with a cleared member_id - shouldn't
1735+
# happen in practice but the check should still trip.
1736+
assert Generation(42, UNKNOWN_MEMBER_ID, 'range').is_lost() is True
1737+
1738+
def test_reset_generation_marks_generation_lost(self, coordinator):
1739+
"""reset_generation() always leaves the live Generation in a
1740+
lost state; ConsumerCoordinator reads that in
1741+
_on_join_prepare_async to fire on_partitions_lost."""
1742+
coordinator._generation = Generation(42, 'mbr-1', 'range')
1743+
assert coordinator._generation.is_lost() is False
1744+
coordinator.reset_generation()
1745+
assert coordinator._generation.is_lost() is True
1746+
1747+
def test_on_join_prepare_fires_lost_and_clears_assignment(
1748+
self, mocker, coordinator):
1749+
"""After a forced eviction, _on_join_prepare_async invokes
1750+
on_partitions_lost with the prior assignment, clears local
1751+
assignment, and skips the eager on_partitions_revoked path."""
1752+
coordinator.config['enable_auto_commit'] = False
1753+
listener = mocker.MagicMock(spec=ConsumerRebalanceListener)
1754+
coordinator._subscription.subscribe(topics=['t'], listener=listener)
1755+
coordinator._subscription.assign_from_subscribed([
1756+
TopicPartition('t', 0), TopicPartition('t', 1)])
1757+
# Simulate the broker booting us.
1758+
coordinator.reset_generation()
1759+
assert coordinator._generation.is_lost() is True
1760+
1761+
coordinator._manager.run(
1762+
coordinator._on_join_prepare_async, 0, 'member-foo')
1763+
1764+
listener.on_partitions_lost.assert_called_once_with(
1765+
{TopicPartition('t', 0), TopicPartition('t', 1)})
1766+
listener.on_partitions_revoked.assert_not_called()
1767+
# Local assignment is cleared so subsequent code doesn't keep
1768+
# treating the lost partitions as owned.
1769+
assert coordinator._subscription.assigned_partitions() == set()
1770+
1771+
def test_on_join_prepare_skips_auto_commit_when_lost(
1772+
self, mocker, coordinator):
1773+
"""A forced eviction means the pre-rebalance commit would fail
1774+
with the same error; skip it instead of logging the spurious
1775+
'likely duplicate delivery' warning."""
1776+
coordinator.config['enable_auto_commit'] = True
1777+
coordinator._subscription.subscribe(topics=['t'])
1778+
coordinator._subscription.assign_from_subscribed([TopicPartition('t', 0)])
1779+
commit_spy = mocker.patch.object(
1780+
coordinator, '_commit_offsets_sync_async')
1781+
coordinator.reset_generation()
1782+
1783+
coordinator._manager.run(
1784+
coordinator._on_join_prepare_async, 0, 'member-foo')
1785+
1786+
commit_spy.assert_not_called()
1787+
1788+
def test_on_join_prepare_after_lost_then_normal(
1789+
self, mocker, coordinator):
1790+
"""A subsequent rebalance against a real (non-lost) generation
1791+
runs the normal prepare path. In production the rejoin that
1792+
follows the lost path lands a real generation in
1793+
_process_join_group_response; here we install one directly."""
1794+
coordinator.config['enable_auto_commit'] = False
1795+
listener = mocker.MagicMock(spec=ConsumerRebalanceListener)
1796+
coordinator._subscription.subscribe(topics=['t'], listener=listener)
1797+
coordinator._subscription.assign_from_subscribed([TopicPartition('t', 0)])
1798+
coordinator.reset_generation()
1799+
1800+
# First call: lost path.
1801+
coordinator._manager.run(
1802+
coordinator._on_join_prepare_async, 0, 'member-foo')
1803+
# Re-assign and install a real generation; the next prepare
1804+
# should fire revoked, not lost.
1805+
coordinator._generation = Generation(42, 'mbr-1', 'range')
1806+
coordinator._subscription.assign_from_subscribed([TopicPartition('t', 0)])
1807+
listener.reset_mock()
1808+
coordinator._manager.run(
1809+
coordinator._on_join_prepare_async, 42, 'mbr-1')
1810+
listener.on_partitions_lost.assert_not_called()
1811+
listener.on_partitions_revoked.assert_called_once_with(
1812+
{TopicPartition('t', 0)})
1813+
1814+
def test_heartbeat_illegal_generation_marks_generation_lost(self, coordinator):
1815+
"""Heartbeat IllegalGenerationError forces reset_generation; the
1816+
live generation must trip is_lost() so the next rebalance fires
1817+
on_partitions_lost."""
1818+
from kafka.protocol.consumer import HeartbeatResponse
1819+
coordinator.coordinator_id = 0
1820+
coordinator._generation = Generation(42, 'mbr-1', 'range')
1821+
# Build a HeartbeatResponse with IllegalGenerationError code.
1822+
response = HeartbeatResponse[0](Errors.IllegalGenerationError.errno)
1823+
with pytest.raises(Errors.IllegalGenerationError):
1824+
coordinator._handle_heartbeat_response(response, time.monotonic())
1825+
assert coordinator._generation.is_lost() is True
1826+
assert coordinator.state == MemberState.UNJOINED
1827+
1828+
def test_heartbeat_unknown_member_id_marks_generation_lost(self, coordinator):
1829+
from kafka.protocol.consumer import HeartbeatResponse
1830+
coordinator.coordinator_id = 0
1831+
coordinator._generation = Generation(42, 'mbr-1', 'range')
1832+
response = HeartbeatResponse[0](Errors.UnknownMemberIdError.errno)
1833+
with pytest.raises(Errors.UnknownMemberIdError):
1834+
coordinator._handle_heartbeat_response(response, time.monotonic())
1835+
assert coordinator._generation.is_lost() is True
1836+
1837+
def test_heartbeat_rebalance_in_progress_keeps_generation(self, coordinator):
1838+
"""RebalanceInProgress is a normal rebalance signal, not a forced
1839+
eviction - the live generation must stay valid."""
1840+
from kafka.protocol.consumer import HeartbeatResponse
1841+
coordinator.coordinator_id = 0
1842+
coordinator._generation = Generation(42, 'mbr-1', 'range')
1843+
response = HeartbeatResponse[0](Errors.RebalanceInProgressError.errno)
1844+
with pytest.raises(Errors.RebalanceInProgressError):
1845+
coordinator._handle_heartbeat_response(response, time.monotonic())
1846+
assert coordinator._generation.is_lost() is False
1847+
1848+
def test_commit_response_illegal_generation_marks_generation_lost(
1849+
self, coordinator, offsets):
1850+
"""OffsetCommit IllegalGeneration forces reset_generation; the
1851+
next rebalance must fire on_partitions_lost."""
1852+
coordinator.coordinator_id = 0
1853+
coordinator._generation = Generation(42, 'mbr-1', 'range')
1854+
response = OffsetCommitResponse[0]([('foobar', [(0, 22), (1, 22)])])
1855+
with pytest.raises(Errors.CommitFailedError):
1856+
coordinator._handle_offset_commit_response(
1857+
offsets, time.monotonic(), response)
1858+
assert coordinator._generation.is_lost() is True
1859+
1860+
def test_sync_group_illegal_generation_marks_generation_lost(self, coordinator):
1861+
"""SyncGroup IllegalGeneration forces reset_generation; the next
1862+
rebalance must fire on_partitions_lost."""
1863+
coordinator.coordinator_id = 0
1864+
coordinator._generation = Generation(42, 'mbr-1', 'range')
1865+
# SyncGroupResponse[0]: (error_code, assignment_bytes)
1866+
response = SyncGroupResponse[0](Errors.IllegalGenerationError.errno, b'')
1867+
with pytest.raises(Errors.IllegalGenerationError):
1868+
coordinator._process_sync_group_response(response, time.monotonic())
1869+
assert coordinator._generation.is_lost() is True
1870+
1871+
def test_lost_async_listener_is_awaited(self, coordinator):
1872+
"""An AsyncConsumerRebalanceListener with on_partitions_lost
1873+
override is awaited from the prepare path."""
1874+
coordinator.config['enable_auto_commit'] = False
1875+
calls = []
1876+
1877+
class AsyncListener(AsyncConsumerRebalanceListener):
1878+
async def on_partitions_revoked(self, revoked):
1879+
calls.append(('revoked', revoked))
1880+
async def on_partitions_assigned(self, assigned):
1881+
calls.append(('assigned', assigned))
1882+
async def on_partitions_lost(self, lost):
1883+
calls.append(('lost', lost))
1884+
1885+
coordinator._subscription.subscribe(topics=['t'], listener=AsyncListener())
1886+
coordinator._subscription.assign_from_subscribed([TopicPartition('t', 0)])
1887+
coordinator.reset_generation()
1888+
coordinator._manager.run(
1889+
coordinator._on_join_prepare_async, 0, 'member-foo')
1890+
1891+
assert calls == [('lost', {TopicPartition('t', 0)})]
1892+
1893+
def test_lost_listener_exception_is_caught(self, mocker, coordinator):
1894+
"""A throwing on_partitions_lost must not abort the prepare path."""
1895+
coordinator.config['enable_auto_commit'] = False
1896+
listener = mocker.MagicMock(spec=ConsumerRebalanceListener)
1897+
listener.on_partitions_lost.side_effect = RuntimeError('listener crash')
1898+
coordinator._subscription.subscribe(topics=['t'], listener=listener)
1899+
coordinator._subscription.assign_from_subscribed([TopicPartition('t', 0)])
1900+
coordinator.reset_generation()
1901+
1902+
# Must not raise; assignment still cleared.
1903+
coordinator._manager.run(
1904+
coordinator._on_join_prepare_async, 0, 'member-foo')
1905+
1906+
listener.on_partitions_lost.assert_called_once()
1907+
assert coordinator._subscription.assigned_partitions() == set()
1908+
1909+
16771910
class TestOnJoinCompleteBailOnInvalidAssignment:
16781911
"""The EAGER branch has the same bail-on-ValueError behaviour as
16791912
COOPERATIVE - if the leader hands us a topic we don't subscribe

0 commit comments

Comments
 (0)