Skip to content

Commit 8839d52

Browse files
authored
Dynamic protocol classes using upstream json schemas (#2727)
1 parent 10e3fa5 commit 8839d52

119 files changed

Lines changed: 7076 additions & 8 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

kafka/protocol/new/__init__.py

Whitespace-only changes.
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from .acl import *
2+
from .cluster import *
3+
from .groups import *
4+
from .topics import *
5+
6+
__all__ = [
7+
'CreateAclsRequest', 'CreateAclsResponse',
8+
'DeleteAclsRequest', 'DeleteAclsResponse',
9+
'DescribeAclsRequest', 'DescribeAclsResponse',
10+
'ACLResourceType', 'ACLOperation',
11+
'ACLPermissionType', 'ACLResourcePatternType',
12+
13+
'DescribeClusterRequest', 'DescribeClusterResponse',
14+
'DescribeConfigsRequest', 'DescribeConfigsResponse',
15+
'AlterConfigsRequest', 'AlterConfigsResponse',
16+
'DescribeLogDirsRequest', 'DescribeLogDirsResponse',
17+
'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType',
18+
19+
'DescribeGroupsRequest', 'DescribeGroupsResponse',
20+
'ListGroupsRequest', 'ListGroupsResponse',
21+
'DeleteGroupsRequest', 'DeleteGroupsResponse',
22+
23+
'CreateTopicsRequest', 'CreateTopicsResponse',
24+
'DeleteTopicsRequest', 'DeleteTopicsResponse',
25+
'CreatePartitionsRequest', 'CreatePartitionsResponse',
26+
'DeleteRecordsRequest', 'DeleteRecordsResponse',
27+
]

kafka/protocol/new/admin/acl.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
from enum import IntEnum
2+
3+
from ..api_message import ApiMessage
4+
5+
6+
class CreateAclsRequest(ApiMessage): pass
7+
class CreateAclsResponse(ApiMessage): pass
8+
9+
class DeleteAclsRequest(ApiMessage): pass
10+
class DeleteAclsResponse(ApiMessage): pass
11+
12+
class DescribeAclsRequest(ApiMessage): pass
13+
class DescribeAclsResponse(ApiMessage): pass
14+
15+
16+
class ACLResourceType(IntEnum):
17+
"""Type of kafka resource to set ACL for
18+
19+
The ANY value is only valid in a filter context
20+
"""
21+
UNKNOWN = 0,
22+
ANY = 1,
23+
CLUSTER = 4,
24+
DELEGATION_TOKEN = 6,
25+
GROUP = 3,
26+
TOPIC = 2,
27+
TRANSACTIONAL_ID = 5
28+
29+
30+
class ACLOperation(IntEnum):
31+
"""Type of operation
32+
33+
The ANY value is only valid in a filter context
34+
"""
35+
UNKNOWN = 0,
36+
ANY = 1,
37+
ALL = 2,
38+
READ = 3,
39+
WRITE = 4,
40+
CREATE = 5,
41+
DELETE = 6,
42+
ALTER = 7,
43+
DESCRIBE = 8,
44+
CLUSTER_ACTION = 9,
45+
DESCRIBE_CONFIGS = 10,
46+
ALTER_CONFIGS = 11,
47+
IDEMPOTENT_WRITE = 12,
48+
CREATE_TOKENS = 13,
49+
DESCRIBE_TOKENS = 14
50+
51+
52+
class ACLPermissionType(IntEnum):
53+
"""An enumerated type of permissions
54+
55+
The ANY value is only valid in a filter context
56+
"""
57+
UNKNOWN = 0,
58+
ANY = 1,
59+
DENY = 2,
60+
ALLOW = 3
61+
62+
63+
class ACLResourcePatternType(IntEnum):
64+
"""An enumerated type of resource patterns
65+
66+
More details on the pattern types and how they work
67+
can be found in KIP-290 (Support for prefixed ACLs)
68+
https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs
69+
"""
70+
UNKNOWN = 0,
71+
ANY = 1,
72+
MATCH = 2,
73+
LITERAL = 3,
74+
PREFIXED = 4
75+
76+
77+
__all__ = [
78+
'CreateAclsRequest', 'CreateAclsResponse',
79+
'DeleteAclsRequest', 'DeleteAclsResponse',
80+
'DescribeAclsRequest', 'DescribeAclsResponse',
81+
'ACLResourceType', 'ACLOperation',
82+
'ACLPermissionType', 'ACLResourcePatternType',
83+
]
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from enum import IntEnum
2+
3+
from ..api_message import ApiMessage
4+
5+
6+
class DescribeClusterRequest(ApiMessage): pass
7+
class DescribeClusterResponse(ApiMessage): pass
8+
9+
class DescribeConfigsRequest(ApiMessage): pass
10+
class DescribeConfigsResponse(ApiMessage): pass
11+
12+
class AlterConfigsRequest(ApiMessage): pass
13+
class AlterConfigsResponse(ApiMessage): pass
14+
15+
class DescribeLogDirsRequest(ApiMessage): pass
16+
class DescribeLogDirsResponse(ApiMessage): pass
17+
18+
class ElectLeadersRequest(ApiMessage): pass
19+
class ElectLeadersResponse(ApiMessage): pass
20+
21+
class ElectionType(IntEnum):
22+
""" Leader election type
23+
"""
24+
25+
PREFERRED = 0,
26+
UNCLEAN = 1
27+
28+
29+
__all__ = [
30+
'DescribeClusterRequest', 'DescribeClusterResponse',
31+
'DescribeConfigsRequest', 'DescribeConfigsResponse',
32+
'AlterConfigsRequest', 'AlterConfigsResponse',
33+
'DescribeLogDirsRequest', 'DescribeLogDirsResponse',
34+
'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType',
35+
]

kafka/protocol/new/admin/groups.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from ..api_message import ApiMessage
2+
3+
4+
class DescribeGroupsRequest(ApiMessage): pass
5+
class DescribeGroupsResponse(ApiMessage): pass
6+
7+
class ListGroupsRequest(ApiMessage): pass
8+
class ListGroupsResponse(ApiMessage): pass
9+
10+
class DeleteGroupsRequest(ApiMessage): pass
11+
class DeleteGroupsResponse(ApiMessage): pass
12+
13+
14+
__all__ = [
15+
'DescribeGroupsRequest', 'DescribeGroupsResponse',
16+
'ListGroupsRequest', 'ListGroupsResponse',
17+
'DeleteGroupsRequest', 'DeleteGroupsResponse',
18+
]

kafka/protocol/new/admin/topics.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from ..api_message import ApiMessage
2+
3+
4+
class CreateTopicsRequest(ApiMessage): pass
5+
class CreateTopicsResponse(ApiMessage): pass
6+
7+
class DeleteTopicsRequest(ApiMessage): pass
8+
class DeleteTopicsResponse(ApiMessage): pass
9+
10+
class CreatePartitionsRequest(ApiMessage): pass
11+
class CreatePartitionsResponse(ApiMessage): pass
12+
13+
class DeleteRecordsRequest(ApiMessage): pass
14+
class DeleteRecordsResponse(ApiMessage): pass
15+
16+
17+
__all__ = [
18+
'CreateTopicsRequest', 'CreateTopicsResponse',
19+
'DeleteTopicsRequest', 'DeleteTopicsResponse',
20+
'CreatePartitionsRequest', 'CreatePartitionsResponse',
21+
'DeleteRecordsRequest', 'DeleteRecordsResponse',
22+
]

kafka/protocol/new/api_header.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
from .data_container import DataContainer, SlotsBuilder
2+
from .schemas import BaseField, StructField, load_json
3+
4+
5+
class ApiHeaderMeta(SlotsBuilder):
6+
def __new__(metacls, name, bases, attrs, **kw):
7+
if kw.get('init', True):
8+
json = load_json(name)
9+
attrs['_json'] = json
10+
attrs['_struct'] = StructField(json)
11+
return super().__new__(metacls, name, bases, attrs, **kw)
12+
13+
14+
class ApiHeader(DataContainer, metaclass=ApiHeaderMeta, init=False):
15+
__slots__ = ()
16+
17+
def __init_subclass__(cls, **kw):
18+
super().__init_subclass__(**kw)
19+
if kw.get('init', True):
20+
# pylint: disable=E1101
21+
assert cls._json['type'] == 'header'
22+
cls._flexible_versions = BaseField.parse_versions(cls._json['flexibleVersions'])
23+
cls._valid_versions = BaseField.parse_versions(cls._json['validVersions'])
24+
25+
def encode(self, flexible=False):
26+
# Request versions are 1-2, Response versions are 0-1
27+
version = self._flexible_versions[0] if flexible else self._valid_versions[0] # pylint: disable=E1136
28+
# compact=False is probably wrong,
29+
# but it works to make sure that the client_id request header field
30+
# is never encoded as compact (required to support ApiVersionsRequest for unsupported version)
31+
return super().encode(version=version, compact=False, tagged=flexible)
32+
33+
@classmethod
34+
def decode(cls, data, flexible=False):
35+
# Request versions are 1-2, Response versions are 0-1
36+
version = cls._flexible_versions[0] if flexible else cls._valid_versions[0] # pylint: disable=E1136
37+
return cls._struct.decode(data, version=version, compact=False, tagged=flexible, data_class=cls)
38+
39+
40+
class ResponseClassRegistry:
41+
_response_class_registry = {}
42+
43+
@classmethod
44+
def register_response_class(cls, response_class):
45+
cls._response_class_registry[response_class.API_KEY] = response_class
46+
47+
@classmethod
48+
def get_response_class(cls, request_header):
49+
response_class = cls._response_class_registry.get(request_header.request_api_key)
50+
if response_class is not None:
51+
return response_class[request_header.request_api_version]
52+
53+
54+
class RequestHeader(ApiHeader):
55+
def get_response_class(self):
56+
return ResponseClassRegistry.get_response_class(self)
57+
58+
59+
class ResponseHeader(ApiHeader): pass

0 commit comments

Comments
 (0)