Skip to content

Commit 7fe6fcf

Browse files
authored
Prefer raised Exceptions to assert / AssertionError (#3042)
1 parent 9995ffb commit 7fe6fcf

23 files changed

Lines changed: 217 additions & 103 deletions

kafka/cli/admin/partitions/list_offsets.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,12 @@ def command(cls, client, args):
4242
@classmethod
4343
def _parse_partition_specs(cls, client, args):
4444
if args.partitions:
45-
assert not args.topic and not args.spec, "Either --partition or (--topic and --spec) is supported, but not both."
45+
if args.topic or args.spec:
46+
raise ValueError("Either --partition or (--topic and --spec) is supported, but not both.")
4647
partitions = args.partitions
4748
else:
48-
assert args.topic and args.spec, "Both --topic and --spec must be provided."
49+
if not args.topic or not args.spec:
50+
raise ValueError("Both --topic and --spec must be provided.")
4951
partitions = [f'{args.topic}:*:{args.spec}']
5052
tp_offsets = {}
5153
for entry in partitions:

kafka/codec.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,8 @@ def lz4f_decode(payload):
258258

259259
def lz4_encode_old_kafka(payload):
260260
"""Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
261-
assert xxhash is not None
261+
if xxhash is None:
262+
raise RuntimeError('pip install xxhash for lz4 encoding with 0.8/0.9 brokers')
262263
data = lz4_encode(payload)
263264
header_size = 7
264265
flg = data[4]
@@ -288,7 +289,8 @@ def lz4_encode_old_kafka(payload):
288289

289290

290291
def lz4_decode_old_kafka(payload):
291-
assert xxhash is not None
292+
if xxhash is None:
293+
raise RuntimeError('pip install xxhash for lz4 encoding with 0.8/0.9 brokers')
292294
# Kafka's LZ4 code has a bug in its header checksum implementation
293295
header_size = 7
294296
if isinstance(payload[4], int):

kafka/consumer/fetcher.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ def fetched_records(self, max_records=None, update_offsets=True):
460460
configured max_partition_fetch_bytes
461461
TopicAuthorizationError: if consumer is not authorized to fetch
462462
messages from the topic
463+
ValueError: if max_records is <= 0
463464
464465
Returns: (records (dict), partial (bool))
465466
records: {TopicPartition: [messages]}
@@ -469,7 +470,8 @@ def fetched_records(self, max_records=None, update_offsets=True):
469470
"""
470471
if max_records is None:
471472
max_records = self.config['max_poll_records']
472-
assert max_records > 0
473+
if max_records <= 0:
474+
raise ValueError('max_records must be > 0')
473475

474476
if self._next_in_line_exception_metadata is not None:
475477
exc_meta = self._next_in_line_exception_metadata
@@ -783,6 +785,7 @@ def _handle_list_offsets_response(self, response):
783785
784786
Raises:
785787
TopicAuthorizationFailedError: if any topic returned an auth error
788+
ValueError: if ListOffsetsResponse v0 and > 1 offset returned
786789
"""
787790
fetched_offsets = dict()
788791
partitions_to_retry = set()
@@ -795,7 +798,8 @@ def _handle_list_offsets_response(self, response):
795798
if error_type is Errors.NoError:
796799
if response.API_VERSION == 0:
797800
offsets = partition_info.old_style_offsets
798-
assert len(offsets) <= 1, 'Expected ListOffsetsResponse with one offset'
801+
if len(offsets) > 1:
802+
raise ValueError('Expected ListOffsetsResponse with one offset')
799803
offset = offsets[0] if offsets else UNKNOWN_OFFSET
800804
else:
801805
offset = partition_info.offset

kafka/consumer/group.py

Lines changed: 66 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -608,11 +608,17 @@ def commit_async(self, offsets=None, callback=None):
608608
struct. This callback can be used to trigger custom actions when
609609
a commit request completes.
610610
611+
Raises:
612+
IncompatibleBrokerVersion: if broker version < 0.8.1
613+
IllegalStateError: if group_id is None
614+
611615
Returns:
612616
kafka.future.Future
613617
"""
614-
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
615-
assert self.config['group_id'] is not None, 'Requires group_id'
618+
if self.config['api_version'] < (0, 8, 1):
619+
raise Errors.IncompatibleBrokerVersion('Requires >= Kafka 0.8.1')
620+
if self.config['group_id'] is None:
621+
raise Errors.IllegalStateError('Requires group_id')
616622
if offsets is None:
617623
offsets = self._subscription.all_consumed_offsets()
618624
log.debug("Committing offsets: %s", offsets)
@@ -639,9 +645,15 @@ def commit(self, offsets=None, timeout_ms=None):
639645
offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict
640646
to commit with the configured group_id. Defaults to currently
641647
consumed offsets for all subscribed partitions.
648+
649+
Raises:
650+
IncompatibleBrokerVersion: if broker version < 0.8.1
651+
IllegalStateError: if group_id is None
642652
"""
643-
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
644-
assert self.config['group_id'] is not None, 'Requires group_id'
653+
if self.config['api_version'] < (0, 8, 1):
654+
raise Errors.IncompatibleBrokerVersion('Requires >= Kafka 0.8.1')
655+
if self.config['group_id'] is None:
656+
raise Errors.IllegalStateError('Requires group_id')
645657
if offsets is None:
646658
offsets = self._subscription.all_consumed_offsets()
647659
self._coordinator.commit_offsets_sync(offsets, timeout_ms=timeout_ms)
@@ -680,11 +692,16 @@ def committed(self, partition, metadata=False, timeout_ms=None):
680692
The last committed offset (int or OffsetAndMetadata), or None if there was no prior commit.
681693
682694
Raises:
683-
KafkaTimeoutError if timeout_ms provided
684-
BrokerResponseErrors if OffsetFetchRequest raises an error.
695+
IncompatibleBrokerVersion: if broker version < 0.8.1
696+
IllegalStateError: if group_id is None
697+
TypeError: if partition is not TopicPartition
698+
KafkaTimeoutError: if timeout_ms provided
699+
BrokerResponseError: if OffsetFetchRequest raises an error.
685700
"""
686-
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
687-
assert self.config['group_id'] is not None, 'Requires group_id'
701+
if self.config['api_version'] < (0, 8, 1):
702+
raise Errors.IncompatibleBrokerVersion('Requires >= Kafka 0.8.1')
703+
if self.config['group_id'] is None:
704+
raise Errors.IllegalStateError('Requires group_id')
688705
if not isinstance(partition, TopicPartition):
689706
raise TypeError('partition must be a TopicPartition namedtuple')
690707
committed = self._coordinator.fetch_committed_offsets([partition], timeout_ms=timeout_ms)
@@ -755,6 +772,11 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
755772
in a single call to :meth:`~kafka.KafkaConsumer.poll`.
756773
Default: Inherit value from max_poll_records.
757774
775+
Raises:
776+
ValueError: if timeout is < 0 or max_records <= 0.
777+
TypeError: if max_records is not int.
778+
IllegalStateError: if consumer already closed.
779+
758780
Returns:
759781
dict[TopicPartition, list[ConsumerRecord]]: records since the last
760782
fetch for the subscribed list of topics and partitions.
@@ -765,12 +787,16 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
765787
# updated until the iterator returns each record to the user. As such,
766788
# the argument is not documented and should not be relied on by library
767789
# users to not break in the future.
768-
assert timeout_ms >= 0, 'Timeout must not be negative'
790+
if timeout_ms < 0:
791+
raise ValueError('Timeout must not be negative')
769792
if max_records is None:
770793
max_records = self.config['max_poll_records']
771-
assert isinstance(max_records, int), 'max_records must be an integer'
772-
assert max_records > 0, 'max_records must be positive'
773-
assert not self._closed, 'KafkaConsumer is closed'
794+
if not isinstance(max_records, int):
795+
raise TypeError('max_records must be an integer')
796+
if max_records <= 0:
797+
raise ValueError('max_records must be positive')
798+
if self._closed:
799+
raise Errors.IllegalStateError('KafkaConsumer is closed')
774800

775801
# Poll for new data until the timeout expires
776802
timer = Timer(timeout_ms)
@@ -830,12 +856,17 @@ def position(self, partition, timeout_ms=None):
830856
Arguments:
831857
partition (TopicPartition): Partition to check
832858
859+
Raises:
860+
TypeError: if partition is not a TopicPartition.
861+
IllegalStateError: if partition is not assigned.
862+
833863
Returns:
834864
int: Offset or None
835865
"""
836866
if not isinstance(partition, TopicPartition):
837867
raise TypeError('partition must be a TopicPartition namedtuple')
838-
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
868+
if not self._subscription.is_assigned(partition):
869+
raise Errors.IllegalStateError('Partition is not assigned')
839870

840871
timer = Timer(timeout_ms)
841872
# Phase 1: blocking refresh of committed offsets (network round-trip
@@ -883,12 +914,17 @@ def highwater(self, partition):
883914
Arguments:
884915
partition (TopicPartition): Partition to check
885916
917+
Raises:
918+
TypeError: if partition is not a TopicPartition.
919+
IllegalStateError: if partition is not assigned.
920+
886921
Returns:
887922
int or None: Offset if available
888923
"""
889924
if not isinstance(partition, TopicPartition):
890925
raise TypeError('partition must be a TopicPartition namedtuple')
891-
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
926+
if not self._subscription.is_assigned(partition):
927+
raise Errors.IllegalStateError('Partition is not assigned')
892928
return self._subscription.assignment[partition].highwater
893929

894930
def pause(self, *partitions):
@@ -950,13 +986,15 @@ def seek(self, partition, offset):
950986
offset (int): Message offset in partition
951987
952988
Raises:
953-
AssertionError: If offset is not an int >= 0; or if partition is not
989+
ValueError: If offset is not an int >= 0; or if partition is not
954990
currently assigned.
955991
"""
956992
if not isinstance(partition, TopicPartition):
957993
raise TypeError('partition must be a TopicPartition namedtuple')
958-
assert isinstance(offset, int) and offset >= 0, 'Offset must be >= 0'
959-
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
994+
if not isinstance(offset, int) or offset < 0:
995+
raise ValueError('Offset must be int >= 0')
996+
if partition not in self._subscription.assigned_partitions():
997+
raise ValueError('Unassigned partition')
960998
log.debug("Seeking to offset %s for partition %s", offset, partition)
961999
self._subscription.assignment[partition].seek(offset)
9621000
self._iterator = None
@@ -969,17 +1007,19 @@ def seek_to_beginning(self, *partitions):
9691007
default to all assigned partitions.
9701008
9711009
Raises:
972-
AssertionError: If any partition is not currently assigned, or if
1010+
ValueError: If any partition is not currently assigned, or if
9731011
no partitions are assigned.
9741012
"""
9751013
if not all([isinstance(p, TopicPartition) for p in partitions]):
9761014
raise TypeError('partitions must be TopicPartition namedtuples')
9771015
if not partitions:
9781016
partitions = self._subscription.assigned_partitions()
979-
assert partitions, 'No partitions are currently assigned'
1017+
if not partitions:
1018+
raise ValueError('No partitions are currently assigned')
9801019
else:
9811020
for p in partitions:
982-
assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
1021+
if p not in self._subscription.assigned_partitions():
1022+
raise ValueError('Unassigned partition: %s' % (p,))
9831023

9841024
for tp in partitions:
9851025
log.debug("Seeking to beginning of partition %s", tp)
@@ -994,17 +1034,19 @@ def seek_to_end(self, *partitions):
9941034
default to all assigned partitions.
9951035
9961036
Raises:
997-
AssertionError: If any partition is not currently assigned, or if
1037+
ValueError: If any partition is not currently assigned, or if
9981038
no partitions are assigned.
9991039
"""
10001040
if not all([isinstance(p, TopicPartition) for p in partitions]):
10011041
raise TypeError('partitions must be TopicPartition namedtuples')
10021042
if not partitions:
10031043
partitions = self._subscription.assigned_partitions()
1004-
assert partitions, 'No partitions are currently assigned'
1044+
if not partitions:
1045+
raise ValueError('No partitions are currently assigned')
10051046
else:
10061047
for p in partitions:
1007-
assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
1048+
if p not in self._subscription.assigned_partitions():
1049+
raise ValueError('Unassigned partition: %s' % (p,))
10081050

10091051
for tp in partitions:
10101052
log.debug("Seeking to end of partition %s", tp)
@@ -1049,7 +1091,7 @@ def subscribe(self, topics=(), pattern=None, listener=None):
10491091
Raises:
10501092
IllegalStateError: If called after previously calling
10511093
:meth:`~kafka.KafkaConsumer.assign`.
1052-
AssertionError: If neither topics or pattern is provided.
1094+
ValueError: If neither topics or pattern is provided.
10531095
TypeError: If listener is not a ConsumerRebalanceListener.
10541096
"""
10551097
# SubscriptionState handles error checking

kafka/consumer/subscription_state.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,15 @@ def subscribe(self, topics=(), pattern=None, listener=None):
118118
any listener set in a previous call to subscribe. It is
119119
guaranteed, however, that the partitions revoked/assigned
120120
through this interface are from topics subscribed in this call.
121+
122+
Raises:
123+
ValueError: if neither topics nor pattern provided.
124+
IllegalStateError: if both topics and pattern provided.
125+
TypeError: if topics is not a list/sequence, or listener is not
126+
a AsyncConsumerRebalanceListener or ConsumerRebalanceListener.
121127
"""
122-
assert topics or pattern, 'Must provide topics or pattern'
128+
if not topics and not pattern:
129+
raise ValueError('Must provide topics or pattern')
123130
if (topics and pattern):
124131
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
125132

@@ -191,7 +198,8 @@ def reset_group_subscription(self):
191198
"""Reset the group's subscription to only contain topics subscribed by this consumer."""
192199
if not self.partitions_auto_assigned():
193200
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
194-
assert self.subscription is not None, 'Subscription required'
201+
if self.subscription is None:
202+
raise Errors.IllegalStateError('Subscription required')
195203
self._group_subscription.intersection_update(self.subscription)
196204

197205
@synchronized
@@ -571,8 +579,10 @@ def __init__(self):
571579
self._pending_revocation = False
572580

573581
def _set_position(self, offset):
574-
assert self.has_valid_position, 'Valid position required'
575-
assert isinstance(offset, OffsetAndMetadata)
582+
if not self.has_valid_position:
583+
raise Errors.IllegalStateError('Valid position required')
584+
if not isinstance(offset, OffsetAndMetadata):
585+
raise TypeError('offset must be OffsetAndMetadata')
576586
self._position = offset
577587

578588
def _get_position(self):
@@ -581,7 +591,8 @@ def _get_position(self):
581591
position = property(_get_position, _set_position, None, "last position")
582592

583593
def reset(self, strategy):
584-
assert strategy is not None
594+
if strategy is None:
595+
raise ValueError('strategy cannot be None')
585596
self.reset_strategy = strategy
586597
self._position = None
587598
self.next_allowed_retry_time = None

kafka/coordinator/assignors/sticky/partition_movements.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ def move_partition(self, partition, old_consumer, new_consumer):
4949
if partition in self.partition_movements:
5050
# this partition has previously moved
5151
existing_pair = self._remove_movement_record_of_partition(partition)
52-
assert existing_pair.dst_member_id == old_consumer
52+
if existing_pair.dst_member_id != old_consumer:
53+
raise ValueError()
5354
if existing_pair.src_member_id != new_consumer:
5455
# the partition is not moving back to its previous consumer
5556
self._add_partition_movement_record(
@@ -63,7 +64,8 @@ def get_partition_to_be_moved(self, partition, old_consumer, new_consumer):
6364
return partition
6465
if partition in self.partition_movements:
6566
# this partition has previously moved
66-
assert old_consumer == self.partition_movements[partition].dst_member_id
67+
if old_consumer != self.partition_movements[partition].dst_member_id:
68+
raise ValueError()
6769
old_consumer = self.partition_movements[partition].src_member_id
6870
reverse_pair = ConsumerPair(src_member_id=new_consumer, dst_member_id=old_consumer)
6971
if reverse_pair not in self.partition_movements_by_topic[partition.topic]:

0 commit comments

Comments
 (0)