|
4 | 4 | import pytest |
5 | 5 |
|
6 | 6 | from kafka.client_async import KafkaClient |
7 | | -from kafka.consumer.subscription_state import ( |
8 | | - SubscriptionState, ConsumerRebalanceListener) |
| 7 | +from kafka.consumer.subscription_state import SubscriptionState, ConsumerRebalanceListener |
9 | 8 | from kafka.coordinator.assignors.range import RangePartitionAssignor |
10 | 9 | from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor |
11 | 10 | from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor |
|
17 | 16 | import kafka.errors as Errors |
18 | 17 | from kafka.future import Future |
19 | 18 | from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS |
20 | | -from kafka.protocol.commit import ( |
| 19 | +from kafka.protocol.new.consumer import ( |
21 | 20 | OffsetCommitRequest, OffsetCommitResponse, |
22 | | - OffsetFetchRequest, OffsetFetchResponse) |
| 21 | + OffsetFetchRequest, OffsetFetchResponse, |
| 22 | +) |
23 | 23 | from kafka.protocol.group import GroupMember |
24 | | -from kafka.protocol.metadata import MetadataResponse |
| 24 | +from kafka.protocol.new.metadata import MetadataResponse |
25 | 25 | from kafka.structs import OffsetAndMetadata, TopicPartition |
26 | 26 | from kafka.util import WeakMethod |
27 | 27 |
|
@@ -446,23 +446,23 @@ def test_send_offset_commit_request_fail(mocker, patched_coord, offsets): |
446 | 446 | assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError) |
447 | 447 |
|
448 | 448 |
|
449 | | -@pytest.mark.parametrize('api_version,req_type', [ |
450 | | - ((0, 8, 1), OffsetCommitRequest[0]), |
451 | | - ((0, 8, 2), OffsetCommitRequest[1]), |
452 | | - ((0, 9), OffsetCommitRequest[2]), |
453 | | - ((0, 11), OffsetCommitRequest[3]), |
454 | | - ((2, 0), OffsetCommitRequest[4]), |
455 | | - ((2, 1), OffsetCommitRequest[6]), |
| 449 | +@pytest.mark.parametrize('api_version,version', [ |
| 450 | + ((0, 8, 1), 0), |
| 451 | + ((0, 8, 2), 1), |
| 452 | + ((0, 9), 2), |
| 453 | + ((0, 11), 3), |
| 454 | + ((2, 0), 4), |
| 455 | + ((2, 1), 6), |
456 | 456 | ]) |
457 | 457 | def test_send_offset_commit_request_versions(patched_coord, offsets, |
458 | | - api_version, req_type): |
| 458 | + api_version, version): |
459 | 459 | expect_node = 0 |
460 | 460 | patched_coord._client._api_versions = BROKER_API_VERSIONS[api_version] |
461 | 461 |
|
462 | 462 | patched_coord._send_offset_commit_request(offsets) |
463 | 463 | (node, request), _ = patched_coord._client.send.call_args |
464 | 464 | assert node == expect_node, 'Unexpected coordinator node' |
465 | | - assert isinstance(request, req_type) |
| 465 | + assert request.API_VERSION == version |
466 | 466 |
|
467 | 467 |
|
468 | 468 | def test_send_offset_commit_request_failure(patched_coord, offsets): |
@@ -559,25 +559,25 @@ def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions): |
559 | 559 | assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError) |
560 | 560 |
|
561 | 561 |
|
562 | | -@pytest.mark.parametrize('api_version,req_type', [ |
563 | | - ((0, 8, 1), OffsetFetchRequest[0]), |
564 | | - ((0, 8, 2), OffsetFetchRequest[1]), |
565 | | - ((0, 9), OffsetFetchRequest[1]), |
566 | | - ((0, 10, 2), OffsetFetchRequest[2]), |
567 | | - ((0, 11), OffsetFetchRequest[3]), |
568 | | - ((2, 0), OffsetFetchRequest[4]), |
569 | | - ((2, 1), OffsetFetchRequest[5]), |
| 562 | +@pytest.mark.parametrize('api_version,version', [ |
| 563 | + ((0, 8, 1), 0), |
| 564 | + ((0, 8, 2), 1), |
| 565 | + ((0, 9), 1), |
| 566 | + ((0, 10, 2), 2), |
| 567 | + ((0, 11), 3), |
| 568 | + ((2, 0), 4), |
| 569 | + ((2, 1), 5), |
570 | 570 | ]) |
571 | 571 | def test_send_offset_fetch_request_versions(patched_coord, partitions, |
572 | | - api_version, req_type): |
| 572 | + api_version, version): |
573 | 573 | # assuming fixture sets coordinator=0, least_loaded_node=1 |
574 | 574 | expect_node = 0 |
575 | 575 | patched_coord._client._api_versions = BROKER_API_VERSIONS[api_version] |
576 | 576 |
|
577 | 577 | patched_coord._send_offset_fetch_request(partitions) |
578 | 578 | (node, request), _ = patched_coord._client.send.call_args |
579 | 579 | assert node == expect_node, 'Unexpected coordinator node' |
580 | | - assert isinstance(request, req_type) |
| 580 | + assert request.API_VERSION == version |
581 | 581 |
|
582 | 582 |
|
583 | 583 | def test_send_offset_fetch_request_failure(patched_coord, partitions): |
|
0 commit comments