diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index a45909ad1..b8d1378be 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -6,7 +6,7 @@ import time from kafka.coordinator.base import BaseCoordinator, Generation -from kafka.coordinator.assignors.abstract import RebalanceProtocol +from kafka.coordinator.assignors.abstract import RebalanceProtocol, AbstractPartitionAssignor from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor @@ -115,7 +115,10 @@ def __init__(self, client, subscription, **configs): self._assignors = {} for klass in self.config['assignors']: - assignor = klass() + if isinstance(klass, AbstractPartitionAssignor): + assignor = klass + else: + assignor = klass() self._assignors[assignor.name] = assignor # KIP-429: all configured assignors must agree on a single # RebalanceProtocol mode. Mixing EAGER and COOPERATIVE