Skip to content

Commit 29c6f44

Browse files
authored
Use new protocol in kafka.consumer (Fetch/ListOffsets) (#2767)
1 parent bb263b7 commit 29c6f44

4 files changed

Lines changed: 10 additions & 9 deletions

File tree

kafka/consumer/fetcher.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import kafka.errors as Errors
99
from kafka.future import Future
1010
from kafka.metrics.stats import Avg, Count, Max, Rate
11-
from kafka.protocol.fetch import FetchRequest, AbortedTransaction
12-
from kafka.protocol.list_offsets import (
11+
from kafka.protocol.new.consumer import FetchRequest
12+
from kafka.protocol.new.consumer import (
1313
ListOffsetsRequest, OffsetResetStrategy, UNKNOWN_OFFSET
1414
)
1515
from kafka.record import MemoryRecords
@@ -910,7 +910,7 @@ class PartitionRecords(object):
910910
def __init__(self, fetch_offset, tp, records,
911911
key_deserializer=None, value_deserializer=None,
912912
check_crcs=True, isolation_level=READ_UNCOMMITTED,
913-
aborted_transactions=None, # raw data from response / list of (producer_id, first_offset) tuples
913+
aborted_transactions=None, # AbortedTransaction data from FetchResponse
914914
metric_aggregator=None, on_drain=lambda x: None):
915915
self.fetch_offset = fetch_offset
916916
self.topic_partition = tp
@@ -921,8 +921,7 @@ def __init__(self, fetch_offset, tp, records,
921921
self.isolation_level = isolation_level
922922
self.aborted_producer_ids = set()
923923
self.aborted_transactions = collections.deque(
924-
sorted([AbortedTransaction(*data) for data in aborted_transactions] if aborted_transactions else [],
925-
key=lambda txn: txn.first_offset)
924+
sorted(aborted_transactions or [], key=lambda txn: txn.first_offset)
926925
)
927926
self.metric_aggregator = metric_aggregator
928927
self.check_crcs = check_crcs

kafka/consumer/group.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from kafka.coordinator.assignors.range import RangePartitionAssignor
1414
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
1515
from kafka.metrics import MetricConfig, Metrics
16-
from kafka.protocol.list_offsets import OffsetResetStrategy
16+
from kafka.protocol.new.consumer import OffsetResetStrategy
1717
from kafka.structs import OffsetAndMetadata, TopicPartition
1818
from kafka.util import Timer
1919
from kafka.version import __version__

kafka/consumer/subscription_state.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import time
1010

1111
import kafka.errors as Errors
12-
from kafka.protocol.list_offsets import OffsetResetStrategy
12+
from kafka.protocol.new.consumer import OffsetResetStrategy
1313
from kafka.structs import OffsetAndMetadata
1414
from kafka.util import ensure_valid_topic_name, synchronized
1515

test/test_fetcher.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
import kafka.errors as Errors
1515
from kafka.future import Future
1616
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
17-
from kafka.protocol.fetch import FetchRequest, FetchResponse
18-
from kafka.protocol.list_offsets import ListOffsetsResponse, OffsetResetStrategy
17+
from kafka.protocol.new.consumer import (
18+
FetchRequest, FetchResponse,
19+
ListOffsetsResponse, OffsetResetStrategy,
20+
)
1921
from kafka.errors import (
2022
StaleMetadata, NotLeaderForPartitionError,
2123
UnknownTopicOrPartitionError, OffsetOutOfRangeError

0 commit comments

Comments
 (0)