Skip to content

Commit ded4f6b

Browse files
author
Marco Moser
committed
fix(aiokafka): keep Faust assignor for changelog tables and update admin request compatibility
Ensure worker consumers use the Faust assignor whenever changelog tables are present, even with zero standby replicas, so table recovery mapping remains correct. Also update metadata/create-topics request construction for newer aiokafka APIs.
1 parent 47201a6 commit ded4f6b

4 files changed

Lines changed: 38 additions & 14 deletions

File tree

faust/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
22
"""Python Stream processing."""
3+
34
# :copyright: (c) 2017-2020, Robinhood Markets, Inc.
45
# All rights reserved.
56
# :license: BSD (3 Clause), see LICENSE for more details.

faust/stores/aerospike.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def _get(self, key: bytes) -> Optional[bytes]:
9898
key = (self.namespace, self.table_name, key)
9999
fun = self.client.get
100100
try:
101-
(key, meta, bins) = self.aerospike_fun_call_with_retry(fun=fun, key=key)
101+
key, meta, bins = self.aerospike_fun_call_with_retry(fun=fun, key=key)
102102
if bins:
103103
return bins[self.BIN_KEY]
104104
return None
@@ -173,7 +173,7 @@ def _itervalues(self) -> Iterator[bytes]:
173173
fun=fun, namespace=self.namespace, set=self.table_name
174174
)
175175
for result in scan.results():
176-
(key, meta, bins) = result
176+
key, meta, bins = result
177177
if bins:
178178
yield bins[self.BIN_KEY]
179179
else:
@@ -193,8 +193,8 @@ def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]:
193193
fun=fun, namespace=self.namespace, set=self.table_name
194194
)
195195
for result in scan.results():
196-
(key_data, meta, bins) = result
197-
(ns, set, policy, key) = key_data
196+
key_data, meta, bins = result
197+
ns, set, policy, key = key_data
198198

199199
if bins:
200200
bins = bins[self.BIN_KEY]
@@ -214,7 +214,7 @@ def _contains(self, key: bytes) -> bool:
214214
try:
215215
if self.app.conf.store_check_exists:
216216
key = (self.namespace, self.table_name, key)
217-
(key, meta) = self.aerospike_fun_call_with_retry(
217+
key, meta = self.aerospike_fun_call_with_retry(
218218
fun=self.client.exists, key=key
219219
)
220220
if meta:

faust/transport/drivers/aiokafka.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@
2727
import aiokafka
2828
import aiokafka.abc
2929
import opentracing
30-
from packaging.version import Version
31-
32-
_AIOKAFKA_HAS_API_VERSION = Version(aiokafka.__version__) < Version("0.13.0")
3330
from aiokafka import TopicPartition
3431
from aiokafka.consumer.group_coordinator import OffsetCommitRequest
3532
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
@@ -45,7 +42,7 @@
4542
)
4643
from aiokafka.partitioner import DefaultPartitioner, murmur2
4744
from aiokafka.protocol.admin import CreateTopicsRequest
48-
from aiokafka.protocol.metadata import MetadataRequest_v1
45+
from aiokafka.protocol.metadata import MetadataRequest
4946
from aiokafka.structs import OffsetAndMetadata, TopicPartition as _TopicPartition
5047
from aiokafka.util import parse_kafka_version
5148
from mode import Service, get_logger
@@ -55,6 +52,7 @@
5552
from mode.utils.objects import cached_property
5653
from mode.utils.times import Seconds, humanize_seconds_ago, want_seconds
5754
from opentracing.ext import tags
55+
from packaging.version import Version
5856
from yarl import URL
5957

6058
from faust.auth import (
@@ -96,6 +94,8 @@
9694

9795
logger = get_logger(__name__)
9896

97+
_AIOKAFKA_HAS_API_VERSION = Version(aiokafka.__version__) < Version("0.13.0")
98+
9999
DEFAULT_GENERATION_ID = OffsetCommitRequest.DEFAULT_GENERATION_ID
100100

101101
TOPIC_LENGTH_MAX = 249
@@ -511,9 +511,13 @@ def _create_worker_consumer(
511511
conf = self.app.conf
512512
if self.consumer.in_transaction:
513513
isolation_level = "read_committed"
514+
# Table recovery depends on app.assignor state to map changelog
515+
# active/standby partitions. Keep Faust assignor enabled whenever
516+
# this app has changelog tables configured.
517+
has_changelog_tables = bool(self.app.tables.changelog_topics)
514518
self._assignor = (
515519
self.app.assignor
516-
if self.app.conf.table_standby_replicas > 0
520+
if self.app.conf.table_standby_replicas > 0 or has_changelog_tables
517521
else RoundRobinPartitionAssignor
518522
)
519523
auth_settings = credentials_to_aiokafka_auth(
@@ -1513,7 +1517,7 @@ async def _get_controller_node(
15131517
for node_id in nodes:
15141518
if node_id is None:
15151519
raise NotReady("Not connected to Kafka Broker")
1516-
request = MetadataRequest_v1([])
1520+
request = MetadataRequest([])
15171521
wait_result = await owner.wait(
15181522
client.send(node_id, request),
15191523
timeout=timeout,
@@ -1546,7 +1550,6 @@ async def _really_create_topic(
15461550
owner.log.debug("Topic %r exists, skipping creation.", topic)
15471551
return
15481552

1549-
protocol_version = 1
15501553
extra_configs = config or {}
15511554
config = self._topic_config(retention, compacting, deleting)
15521555
config.update(extra_configs)
@@ -1563,7 +1566,7 @@ async def _really_create_topic(
15631566
else:
15641567
raise Exception("Controller node is None")
15651568

1566-
request = CreateTopicsRequest[protocol_version](
1569+
request = CreateTopicsRequest(
15671570
[(topic, partitions, replication, [], list(config.items()))],
15681571
timeout,
15691572
False,

tests/unit/transport/drivers/test_aiokafka.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
import faust
1818
from faust import auth
1919
from faust.exceptions import ImproperlyConfigured, NotReady
20-
from faust.transport.drivers.aiokafka import _AIOKAFKA_HAS_API_VERSION
2120
from faust.sensors.monitor import Monitor
2221
from faust.transport.drivers import aiokafka as mod
2322
from faust.transport.drivers.aiokafka import (
23+
_AIOKAFKA_HAS_API_VERSION,
2424
SLOW_PROCESSING_CAUSE_AGENT,
2525
SLOW_PROCESSING_CAUSE_STREAM,
2626
SLOW_PROCESSING_EXPLAINED,
@@ -796,6 +796,26 @@ def test__create_worker_consumer__transaction(self, *, cthread, app):
796796
isolation_level="read_committed",
797797
)
798798

799+
def test__create_worker_consumer__uses_roundrobin_without_tables(
800+
self, *, cthread, app
801+
):
802+
app.conf.table_standby_replicas = 0
803+
app.tables._changelogs.clear()
804+
transport = cthread.transport
805+
with patch("aiokafka.AIOKafkaConsumer"):
806+
cthread._create_worker_consumer(transport)
807+
assert cthread._assignor is mod.RoundRobinPartitionAssignor
808+
809+
def test__create_worker_consumer__uses_faust_assignor_with_changelog_topics(
810+
self, *, cthread, app
811+
):
812+
app.conf.table_standby_replicas = 0
813+
app.tables._changelogs["app-foo-changelog"] = Mock(name="table")
814+
transport = cthread.transport
815+
with patch("aiokafka.AIOKafkaConsumer"):
816+
cthread._create_worker_consumer(transport)
817+
assert cthread._assignor is app.assignor
818+
799819
def assert_create_worker_consumer(
800820
self,
801821
cthread,

0 commit comments

Comments
 (0)