Skip to content

Commit 1b38ef5

Browse files
authored
Use new protocol in kafka.producer (#2765)
1 parent c51a6c9 commit 1b38ef5

4 files changed

Lines changed: 13 additions & 12 deletions

File tree

kafka/producer/sender.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@
99
from kafka.metrics.measurable import AnonMeasurable
1010
from kafka.metrics.stats import Avg, Max, Rate
1111
from kafka.producer.transaction_manager import ProducerIdAndEpoch
12-
from kafka.protocol.init_producer_id import InitProducerIdRequest
13-
from kafka.protocol.produce import ProduceRequest
12+
from kafka.protocol.new.producer import InitProducerIdRequest, ProduceRequest
1413
from kafka.structs import TopicPartition
1514
from kafka.version import __version__
1615

kafka/producer/transaction_manager.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@
66
import threading
77

88
import kafka.errors as Errors
9-
from kafka.protocol.add_offsets_to_txn import AddOffsetsToTxnRequest
10-
from kafka.protocol.add_partitions_to_txn import AddPartitionsToTxnRequest
11-
from kafka.protocol.end_txn import EndTxnRequest
12-
from kafka.protocol.find_coordinator import FindCoordinatorRequest
13-
from kafka.protocol.init_producer_id import InitProducerIdRequest
14-
from kafka.protocol.txn_offset_commit import TxnOffsetCommitRequest
9+
from kafka.protocol.new.metadata import FindCoordinatorRequest
10+
from kafka.protocol.new.producer import (
11+
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest,
12+
EndTxnRequest, InitProducerIdRequest, TxnOffsetCommitRequest,
13+
)
1514
from kafka.structs import TopicPartition
1615

1716

@@ -942,7 +941,7 @@ def handle_response(self, response):
942941
log.debug("Successfully added offsets for %s from consumer group %s to transaction.",
943942
tp, self.consumer_group_id)
944943
del self.transaction_manager._pending_txn_offset_commits[tp]
945-
elif error in (errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError, Errors.RequestTimedOutError):
944+
elif error in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError, Errors.RequestTimedOutError):
946945
retriable_failure = True
947946
lookup_coordinator = True
948947
elif error is Errors.UnknownTopicOrPartitionError:

test/integration/test_producer_integration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ def test_transactional_producer_offsets(kafka_broker):
194194
with producer_factory(bootstrap_servers=connect_str, transactional_id='testing') as producer:
195195
producer.init_transactions()
196196
producer.begin_transaction()
197+
producer.send('transactional_test_topic', partition=0, value=b'msg1').get()
197198
producer.send_offsets_to_transaction(offsets, 'txn-test-group')
198199
producer.commit_transaction()
199200

test/test_sender.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import kafka.errors as Errors
1414
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
1515
from kafka.producer.kafka import KafkaProducer
16-
from kafka.protocol.produce import ProduceRequest
16+
from kafka.protocol.new.producer import ProduceRequest
1717
from kafka.producer.future import FutureRecordMetadata
1818
from kafka.producer.producer_batch import ProducerBatch
1919
from kafka.producer.record_accumulator import RecordAccumulator
@@ -64,7 +64,8 @@ def test_produce_request(sender, api_version, produce_version):
6464
magic = KafkaProducer.max_usable_produce_magic(api_version)
6565
batch = producer_batch(magic=magic)
6666
produce_request = sender._produce_request(0, 0, 0, [batch])
67-
assert isinstance(produce_request, ProduceRequest[produce_version])
67+
assert isinstance(produce_request, ProduceRequest)
68+
assert produce_request.version == produce_version
6869

6970

7071
@pytest.mark.parametrize(("api_version", "produce_version"), [
@@ -81,7 +82,8 @@ def test_create_produce_requests(sender, api_version, produce_version):
8182
produce_requests_by_node = sender._create_produce_requests(batches_by_node)
8283
assert len(produce_requests_by_node) == 3
8384
for node in range(3):
84-
assert isinstance(produce_requests_by_node[node], ProduceRequest[produce_version])
85+
assert isinstance(produce_requests_by_node[node], ProduceRequest)
86+
assert produce_requests_by_node[node].version == produce_version
8587

8688

8789
def test_complete_batch_success(sender):

0 commit comments

Comments
 (0)