Skip to content

Commit 5bd64ba

Browse files
authored
Add StickyAssignorUserData json schema (#2755)
1 parent 057cc69 commit 5bd64ba

4 files changed

Lines changed: 53 additions & 3 deletions

File tree

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
{
17+
"type": "data",
18+
"name": "StickyAssignorUserData",
19+
// StickyAssignor currently always encodes with version 1.
20+
// To decode, versions are attempted in reverse order until one succeeds.
21+
// If no decoding is possible, the assignor ignores the previous user data.
22+
23+
// Version 1 added the "generation" field
24+
"validVersions": "0-1",
25+
"flexibleVersions": "none",
26+
"fields": [
27+
{ "name": "PreviousAssignment", "type": "[]TopicPartition", "versions": "0+", "fields": [
28+
{ "name": "Topic", "type": "string", "mapKey": true, "versions": "0+", "entityType": "topicName",
29+
"about": "The topic name."},
30+
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
31+
"about": "The partition ids."}
32+
]
33+
},
34+
{ "name": "Generation", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true,
35+
"about": "The generation id of the previous assignment."}
36+
]
37+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from kafka.protocol.new.api_data import ApiData
2+
3+
4+
class StickyAssignorUserData(ApiData, load_json=__package__):
5+
def __init__(self, *args, **kw):
6+
if 'version' not in kw:
7+
kw['version'] = 1
8+
super().__init__(*args, **kw)

kafka/protocol/new/api_data.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
class JsonSchemaData(SlotsBuilder):
1212
def __new__(metacls, name, bases, attrs, **kw):
1313
if kw.get('init', True):
14-
json = load_json(name)
14+
json = load_json(name, package=kw.get('load_json'))
1515
if 'json_patch' in attrs:
1616
json = attrs['json_patch'].__func__(metacls, json)
1717
attrs['_json'] = json

kafka/protocol/new/schemas/load_json.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
import importlib.resources
2+
import inspect
23
import json
34
import re
45

56

6-
def load_json(msg_type):
7+
def load_json(msg_type, package=None):
8+
if package is None:
9+
package = __package__ + '.resources'
10+
elif inspect.ismodule(package):
11+
package = package.__package__
712
COMMENTS_REGEX = r"(?m)((?:^\s*//.*\n?)+)"
813
# Raises FileNotFoundError if not found
9-
msg_json = importlib.resources.read_text(__package__ + '.resources', msg_type + '.json')
14+
msg_json = importlib.resources.read_text(package, msg_type + '.json')
1015
data = json.loads(re.sub(COMMENTS_REGEX, '', msg_json))
1116
comments = re.findall(COMMENTS_REGEX, msg_json)
1217
if comments:

0 commit comments

Comments
 (0)