Skip to content

Commit 9a8af08

Browse files
authored
Store in-flight request headers only for protocol parser (#2723)
1 parent efbcb43 commit 9a8af08

21 files changed

Lines changed: 64 additions & 158 deletions

kafka/protocol/__init__.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
from . import (
2+
produce, fetch, list_offsets, metadata,
3+
commit, find_coordinator, group,
4+
sasl_handshake, api_versions, admin,
5+
init_producer_id, offset_for_leader_epoch,
6+
add_partitions_to_txn, add_offsets_to_txn, end_txn,
7+
txn_offset_commit, sasl_authenticate,
8+
)
9+
110
API_KEYS = {
211
0: 'Produce',
312
1: 'Fetch',
@@ -44,3 +53,47 @@
4453
46: 'ListPartitionReassignments',
4554
48: 'DescribeClientQuotas',
4655
}
56+
57+
# Mapping of Api_key to a tuple of (request_classes, response_classes)
58+
REQUEST_TYPES = {
59+
0: (produce.ProduceRequest, produce.ProduceResponse),
60+
1: (fetch.FetchRequest, fetch.FetchResponse),
61+
2: (list_offsets.ListOffsetsRequest, list_offsets.ListOffsetsResponse),
62+
3: (metadata.MetadataRequest, metadata.MetadataResponse),
63+
8: (commit.OffsetCommitRequest, commit.OffsetCommitResponse),
64+
9: (commit.OffsetFetchRequest, commit.OffsetFetchResponse),
65+
10: (find_coordinator.FindCoordinatorRequest, find_coordinator.FindCoordinatorResponse),
66+
11: (group.JoinGroupRequest, group.JoinGroupResponse),
67+
12: (group.HeartbeatRequest, group.HeartbeatResponse),
68+
13: (group.LeaveGroupRequest, group.LeaveGroupResponse),
69+
14: (group.SyncGroupRequest, group.SyncGroupResponse),
70+
15: (admin.DescribeGroupsRequest, admin.DescribeGroupsResponse),
71+
16: (admin.ListGroupsRequest, admin.ListGroupsResponse),
72+
17: (sasl_handshake.SaslHandshakeRequest, sasl_handshake.SaslHandshakeResponse),
73+
18: (api_versions.ApiVersionsRequest, api_versions.ApiVersionsResponse),
74+
19: (admin.CreateTopicsRequest, admin.CreateTopicsResponse),
75+
20: (admin.DeleteTopicsRequest, admin.DeleteTopicsResponse),
76+
21: (admin.DeleteRecordsRequest, admin.DeleteRecordsResponse),
77+
22: (init_producer_id.InitProducerIdRequest, init_producer_id.InitProducerIdResponse),
78+
23: (offset_for_leader_epoch.OffsetForLeaderEpochRequest, offset_for_leader_epoch.OffsetForLeaderEpochResponse),
79+
24: (add_partitions_to_txn.AddPartitionsToTxnRequest, add_partitions_to_txn.AddPartitionsToTxnResponse),
80+
25: (add_offsets_to_txn.AddOffsetsToTxnRequest, add_offsets_to_txn.AddOffsetsToTxnResponse),
81+
26: (end_txn.EndTxnRequest, end_txn.EndTxnResponse),
82+
28: (txn_offset_commit.TxnOffsetCommitRequest, txn_offset_commit.TxnOffsetCommitResponse),
83+
29: (admin.DescribeAclsRequest, admin.DescribeAclsResponse),
84+
30: (admin.CreateAclsRequest, admin.CreateAclsResponse),
85+
31: (admin.DeleteAclsRequest, admin.DeleteAclsResponse),
86+
32: (admin.DescribeConfigsRequest, admin.DescribeConfigsResponse),
87+
33: (admin.AlterConfigsRequest, admin.AlterConfigsResponse),
88+
36: (sasl_authenticate.SaslAuthenticateRequest, sasl_authenticate.SaslAuthenticateResponse),
89+
37: (admin.CreatePartitionsRequest, admin.CreatePartitionsResponse),
90+
42: (admin.DeleteGroupsRequest, admin.DeleteGroupsResponse)
91+
}
92+
93+
def get_response_class(api_key, api_version):
94+
request_type, response_type = REQUEST_TYPES.get(api_key, (None, None))
95+
if response_type:
96+
if hasattr(response_type, '__getitem__'):
97+
return response_type[api_version]
98+
return response_type
99+
return None

kafka/protocol/add_offsets_to_txn.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ class AddOffsetsToTxnResponse_v2(Response):
2626
class AddOffsetsToTxnRequest_v0(Request):
2727
API_KEY = 25
2828
API_VERSION = 0
29-
RESPONSE_TYPE = AddOffsetsToTxnResponse_v0
3029
SCHEMA = Schema(
3130
('transactional_id', String('utf-8')),
3231
('producer_id', Int64),
@@ -38,14 +37,12 @@ class AddOffsetsToTxnRequest_v0(Request):
3837
class AddOffsetsToTxnRequest_v1(Request):
3938
API_KEY = 25
4039
API_VERSION = 1
41-
RESPONSE_TYPE = AddOffsetsToTxnResponse_v1
4240
SCHEMA = AddOffsetsToTxnRequest_v0.SCHEMA
4341

4442

4543
class AddOffsetsToTxnRequest_v2(Request):
4644
API_KEY = 25
4745
API_VERSION = 2
48-
RESPONSE_TYPE = AddOffsetsToTxnResponse_v2
4946
SCHEMA = AddOffsetsToTxnRequest_v1.SCHEMA
5047

5148

kafka/protocol/add_partitions_to_txn.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ class AddPartitionsToTxnResponse_v2(Response):
2929
class AddPartitionsToTxnRequest_v0(Request):
3030
API_KEY = 24
3131
API_VERSION = 0
32-
RESPONSE_TYPE = AddPartitionsToTxnResponse_v0
3332
SCHEMA = Schema(
3433
('transactional_id', String('utf-8')),
3534
('producer_id', Int64),
@@ -42,14 +41,12 @@ class AddPartitionsToTxnRequest_v0(Request):
4241
class AddPartitionsToTxnRequest_v1(Request):
4342
API_KEY = 24
4443
API_VERSION = 1
45-
RESPONSE_TYPE = AddPartitionsToTxnResponse_v1
4644
SCHEMA = AddPartitionsToTxnRequest_v0.SCHEMA
4745

4846

4947
class AddPartitionsToTxnRequest_v2(Request):
5048
API_KEY = 24
5149
API_VERSION = 2
52-
RESPONSE_TYPE = AddPartitionsToTxnResponse_v2
5350
SCHEMA = AddPartitionsToTxnRequest_v1.SCHEMA
5451

5552

0 commit comments

Comments
 (0)