Skip to content

Commit c75bb5f

Browse files
authored
Cleanup and expand coverage for non-admin new protocol tests (#2749)
1 parent 30da55a commit c75bb5f

8 files changed

Lines changed: 614 additions & 546 deletions

File tree

test/protocol/new/consumer/test_new_fetch.py

Lines changed: 111 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,118 @@
55
from kafka.protocol.new.consumer import FetchRequest, FetchResponse
66

77

8+
@pytest.mark.parametrize("version", range(FetchRequest.min_version, FetchRequest.max_version + 1))
9+
def test_fetch_request_roundtrip(version):
10+
ReplicaState = FetchRequest.ReplicaState
11+
Topic = FetchRequest.FetchTopic
12+
Partition = Topic.FetchPartition
13+
ForgottenTopic = FetchRequest.ForgottenTopic
14+
request = FetchRequest(
15+
cluster_id='cluster' if version >= 12 else None,
16+
replica_id=12 if version <= 14 else -1,
17+
replica_state=ReplicaState(
18+
replica_id=12,
19+
replica_epoch=345,
20+
) if version >= 15 else None,
21+
max_wait_ms=500,
22+
min_bytes=1,
23+
isolation_level=2 if version >= 4 else 0,
24+
session_id=12 if version >= 7 else 0,
25+
session_epoch=34 if version >= 7 else -1,
26+
topics=[
27+
Topic(
28+
topic="test-topic" if version <= 12 else '',
29+
topic_id=uuid.uuid4() if version >= 13 else None,
30+
partitions=[
31+
Partition(
32+
partition=0,
33+
current_leader_epoch=6 if version >= 9 else -1,
34+
fetch_offset=100,
35+
last_fetched_epoch=8 if version >= 12 else -1,
36+
log_start_offset=3 if version >= 5 else -1,
37+
partition_max_bytes=1024,
38+
replica_directory_id=uuid.uuid4() if version >= 17 else None,
39+
)
40+
],
41+
),
42+
],
43+
forgotten_topics_data=[
44+
ForgottenTopic(
45+
topic='foo' if version <= 12 else '',
46+
topic_id=uuid.uuid4() if version >= 13 else None,
47+
partitions=[1, 2, 3],
48+
),
49+
] if version >= 7 else [],
50+
rack_id='Z' if version >= 11 else '',
51+
)
52+
encoded = request.encode(version=version)
53+
decoded = FetchRequest.decode(encoded, version=version)
54+
assert decoded == request
55+
56+
57+
@pytest.mark.parametrize("version", range(FetchResponse.min_version, FetchResponse.max_version + 1))
58+
def test_fetch_response_roundtrip(version):
59+
Response = FetchResponse.FetchableTopicResponse
60+
PartitionData = Response.PartitionData
61+
EpochEndOffset = PartitionData.EpochEndOffset
62+
LeaderIdAndEpoch = PartitionData.LeaderIdAndEpoch
63+
SnapshotId = PartitionData.SnapshotId
64+
AbortedTransaction = PartitionData.AbortedTransaction
65+
NodeEndpoint = FetchResponse.NodeEndpoint
66+
response = FetchResponse(
67+
throttle_time_ms=100 if version >= 1 else 0,
68+
error_code=13 if version >= 7 else 0,
69+
session_id=12345 if version >= 7 else 0,
70+
responses=[
71+
Response(
72+
topic="test-topic" if version <= 12 else '',
73+
topic_id=uuid.uuid4() if version >= 13 else None,
74+
partitions=[
75+
PartitionData(
76+
partition_index=0,
77+
error_code=0,
78+
high_watermark=1000,
79+
last_stable_offset=3 if version >= 4 else -1,
80+
log_start_offset=25 if version >= 5 else -1,
81+
diverging_epoch=EpochEndOffset(
82+
epoch=1000,
83+
end_offset=3,
84+
) if version >= 12 else None,
85+
current_leader=LeaderIdAndEpoch(
86+
leader_id=5,
87+
leader_epoch=99,
88+
) if version >= 12 else None,
89+
snapshot_id=SnapshotId(
90+
end_offset=44,
91+
epoch=88,
92+
) if version >= 12 else None,
93+
aborted_transactions=[
94+
AbortedTransaction(
95+
producer_id=3,
96+
first_offset=9,
97+
),
98+
] if version >= 4 else [],
99+
preferred_read_replica=12 if version >= 11 else -1,
100+
records=b"some records"
101+
)
102+
]
103+
)
104+
],
105+
node_endpoints=[
106+
NodeEndpoint(
107+
node_id=12,
108+
host='foo',
109+
port=1000,
110+
rack='ZZ',
111+
),
112+
] if version >= 16 else [],
113+
)
114+
encoded = response.encode(version=version)
115+
decoded = FetchResponse.decode(encoded, version=version)
116+
assert decoded == response
117+
118+
8119
def test_fetch_request_v15_hex():
9-
# Hex dump provided by user
10120
expected_hex = "0001000f0000007b00096d792d636c69656e7400000001f400000001000003e800123456780000007f01010100"
11121
expected_bytes = binascii.unhexlify(expected_hex)
12122

@@ -34,95 +144,3 @@ def test_fetch_request_v15_hex():
34144
assert decoded.session_id == 0x12345678
35145
assert decoded.session_epoch == 0x7f
36146
assert decoded.max_wait_ms == 500
37-
38-
39-
@pytest.mark.parametrize("version", range(FetchRequest.min_version, FetchRequest.max_version + 1))
40-
def test_fetch_request_roundtrip(version):
41-
# Topic data needs to match the version's requirements (Topic vs TopicId)
42-
topic_data = []
43-
if version < 13:
44-
topic_data = [
45-
FetchRequest.FetchTopic(
46-
topic="test-topic",
47-
partitions=[
48-
FetchRequest.FetchTopic.FetchPartition(
49-
partition=0,
50-
fetch_offset=100,
51-
partition_max_bytes=1024
52-
)
53-
]
54-
)
55-
]
56-
else:
57-
topic_id = uuid.uuid4()
58-
topic_data = [
59-
FetchRequest.FetchTopic(
60-
topic_id=topic_id,
61-
partitions=[
62-
FetchRequest.FetchTopic.FetchPartition(
63-
partition=0,
64-
fetch_offset=100,
65-
partition_max_bytes=1024
66-
)
67-
]
68-
)
69-
]
70-
71-
data = FetchRequest(
72-
replica_id=-1,
73-
max_wait_ms=500,
74-
min_bytes=1,
75-
topics=topic_data
76-
)
77-
78-
encoded = data.encode(version=version)
79-
decoded = FetchRequest.decode(encoded, version=version)
80-
81-
assert decoded == data
82-
83-
84-
@pytest.mark.parametrize("version", range(FetchResponse.min_version, FetchResponse.max_version + 1))
85-
def test_fetch_response_roundtrip(version):
86-
# Mocking some response data
87-
resp_topic_data = []
88-
if version < 13:
89-
resp_topic_data = [
90-
FetchResponse.FetchableTopicResponse(
91-
topic="test-topic",
92-
partitions=[
93-
FetchResponse.FetchableTopicResponse.PartitionData(
94-
partition_index=0,
95-
error_code=0,
96-
high_watermark=1000,
97-
records=b"some records"
98-
)
99-
]
100-
)
101-
]
102-
else:
103-
topic_id = uuid.uuid4()
104-
resp_topic_data = [
105-
FetchResponse.FetchableTopicResponse(
106-
topic_id=topic_id,
107-
partitions=[
108-
FetchResponse.FetchableTopicResponse.PartitionData(
109-
partition_index=0,
110-
error_code=0,
111-
high_watermark=1000,
112-
records=b"some records"
113-
)
114-
]
115-
)
116-
]
117-
118-
data = FetchResponse(
119-
throttle_time_ms=100 if version >= 1 else 0,
120-
error_code=0,
121-
session_id=12345 if version >= 7 else 0,
122-
responses=resp_topic_data
123-
)
124-
125-
encoded = data.encode(version=version)
126-
decoded = FetchResponse.decode(encoded, version=version)
127-
128-
assert decoded == data

0 commit comments

Comments
 (0)