Skip to content

Commit 057cc69

Browse files
authored
Add ConsumerProtocol data schemas to kafka.protocol.new.consumer.metadata (#2754)
1 parent 3ac9788 commit 057cc69

6 files changed

Lines changed: 169 additions & 2 deletions

File tree

kafka/coordinator/protocol.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,17 @@ class ConsumerProtocolMemberMetadata_v0(Struct):
1313
class ConsumerProtocolMemberAssignment_v0(Struct):
1414
SCHEMA = Schema(
1515
('version', Int16),
16-
('assignment', Array(
16+
('assigned_partitions', Array(
1717
('topic', String('utf-8')),
1818
('partitions', Array(Int32)))),
1919
('user_data', Bytes))
20+
ALIASES = {
21+
'assignment': 'assigned_partitions',
22+
}
2023

2124
def partitions(self):
2225
return [TopicPartition(topic, partition)
23-
for topic, partitions in self.assignment # pylint: disable-msg=no-member
26+
for topic, partitions in self.assigned_partitions
2427
for partition in partitions]
2528

2629

kafka/protocol/new/consumer/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from .fetch import *
22
from .group import *
3+
from .metadata import *
34
from .offsets import *
45

56

@@ -13,4 +14,5 @@
1314
'HeartbeatRequest', 'HeartbeatResponse',
1415
'OffsetFetchRequest', 'OffsetFetchResponse',
1516
'OffsetCommitRequest', 'OffsetCommitResponse',
17+
'ConsumerProtocolType', 'ConsumerProtocolSubscription', 'ConsumerProtocolAssignment',
1618
]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from ..api_data import ApiData
2+
from kafka.structs import TopicPartition
3+
4+
5+
ConsumerProtocolType = 'consumer'
6+
7+
8+
class ConsumerProtocolSubscription(ApiData): pass
9+
class ConsumerProtocolAssignment(ApiData):
10+
11+
# Compatibility with old manual protocol definition
12+
@property
13+
def assignment(self):
14+
return self.assigned_partitions
15+
16+
@assignment.setter
17+
def assignment(self, value):
18+
self.assigned_partitions = value
19+
20+
def partitions(self):
21+
return [TopicPartition(topic, partition)
22+
for topic, partitions in self.assigned_partitions
23+
for partition in partitions]
24+
25+
26+
__all__ = [
27+
'ConsumerProtocolSubscription', 'ConsumerProtocolAssignment', 'ConsumerProtocolType',
28+
]
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
{
17+
"type": "data",
18+
"name": "ConsumerProtocolAssignment",
19+
// Assignment part of the Consumer Protocol.
20+
//
21+
// The current implementation assumes that future versions will not break compatibility. When
22+
// it encounters a newer version, it parses it using the current format. This basically means
23+
// that new versions cannot remove or reorder any of the existing fields.
24+
//
25+
// Version 2 is to support a new field "GenerationId" in ConsumerProtocolSubscription.
26+
// Version 3 adds rack id to ConsumerProtocolSubscription.
27+
"validVersions": "0-3",
28+
"flexibleVersions": "none",
29+
"fields": [
30+
{ "name": "AssignedPartitions", "type": "[]TopicPartition", "versions": "0+",
31+
"about": "The list of topics and partitions assigned to this consumer.", "fields": [
32+
{ "name": "Topic", "type": "string", "mapKey": true, "versions": "0+", "entityType": "topicName",
33+
"about": "The topic name."},
34+
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
35+
"about": "The list of partitions assigned to this consumer."}
36+
]
37+
},
38+
{ "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
39+
"default": "null", "zeroCopy": true,
40+
"about": "User data."}
41+
]
42+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
{
17+
"type": "data",
18+
"name": "ConsumerProtocolSubscription",
19+
// Subscription part of the Consumer Protocol.
20+
//
21+
// The current implementation assumes that future versions will not break compatibility. When
22+
// it encounters a newer version, it parses it using the current format. This basically means
23+
// that new versions cannot remove or reorder any of the existing fields.
24+
25+
// Version 1 added the "OwnedPartitions" field to allow assigner know what partitions each member owned
26+
// Version 2 added a new field "GenerationId" to indicate if the member has out-of-date ownedPartitions.
27+
// Version 3 adds rack id to enable rack-aware assignment.
28+
"validVersions": "0-3",
29+
"flexibleVersions": "none",
30+
"fields": [
31+
{ "name": "Topics", "type": "[]string", "versions": "0+",
32+
"about": "The topics that the member wants to consume."},
33+
{ "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
34+
"default": "null", "zeroCopy": true,
35+
"about": "User data that will be passed back to the consumer."},
36+
{ "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true,
37+
"about": "The partitions that the member owns.", "fields": [
38+
{ "name": "Topic", "type": "string", "mapKey": true, "versions": "1+", "entityType": "topicName",
39+
"about": "The topic name."},
40+
{ "name": "Partitions", "type": "[]int32", "versions": "1+",
41+
"about": "The partition ids."}
42+
]
43+
},
44+
{ "name": "GenerationId", "type": "int32", "versions": "2+", "default": "-1", "ignorable": true,
45+
"about": "The generation id of the member."},
46+
{ "name": "RackId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "ignorable": true,
47+
"about": "The rack id of the member."}
48+
]
49+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import pytest
2+
3+
from kafka.protocol.new.consumer.metadata import *
4+
5+
6+
@pytest.mark.parametrize('version', range(ConsumerProtocolAssignment.min_version, ConsumerProtocolAssignment.max_version + 1))
7+
def test_consumer_protocol_assignment(version):
8+
assignment = ConsumerProtocolAssignment(
9+
assigned_partitions=[('t0', [0, 2]), ('t1', [1])],
10+
user_data=b'foo\x12',
11+
version=version,
12+
)
13+
encoded = assignment.encode()
14+
decoded = ConsumerProtocolAssignment.decode(encoded)
15+
assert decoded == assignment
16+
assert decoded.version == version
17+
assert len(decoded.assigned_partitions) == 2
18+
assert decoded.assigned_partitions[0].topic == 't0'
19+
assert decoded.assigned_partitions[0].partitions == [0, 2]
20+
assert decoded.assigned_partitions[1].topic == 't1'
21+
assert decoded.assigned_partitions[1].partitions == [1]
22+
assert decoded.user_data == b'foo\x12'
23+
24+
25+
@pytest.mark.parametrize('version', range(ConsumerProtocolSubscription.min_version, ConsumerProtocolSubscription.max_version + 1))
26+
def test_consumer_protocol_subscription(version):
27+
topics = ['t0', 't1']
28+
user_data = b'foo\x12'
29+
subscription = ConsumerProtocolSubscription(
30+
topics=topics,
31+
user_data=user_data,
32+
version=version,
33+
)
34+
encoded = subscription.encode()
35+
decoded = ConsumerProtocolSubscription.decode(encoded)
36+
assert decoded == subscription
37+
assert decoded.version == version
38+
assert decoded.topics == topics
39+
assert decoded.user_data == user_data
40+
41+
42+
def test_consumer_protocol_type():
43+
assert ConsumerProtocolType == 'consumer'

0 commit comments

Comments
 (0)