2525 ConfigResource ,
2626)
2727
28-
2928BOOTSTRAP_SERVERS = 'localhost:9092'
3029POLL_TIMEOUT = 10.0 # share session warm-up takes a few seconds
3130
@@ -39,8 +38,7 @@ def _unique_id():
3938
4039def _broker_reachable ():
4140 try :
42- p = Producer ({'bootstrap.servers' : BOOTSTRAP_SERVERS ,
43- 'socket.timeout.ms' : 2000 })
41+ p = Producer ({'bootstrap.servers' : BOOTSTRAP_SERVERS , 'socket.timeout.ms' : 2000 })
4442 return len (p .list_topics (timeout = 2.0 ).brokers ) > 0
4543 except Exception :
4644 return False
@@ -174,8 +172,7 @@ def test_implicit_mode_autocommits_on_next_poll():
174172 _warmup (sc2 )
175173 leftovers = _collect (sc2 , 1 , timeout = 5.0 )
176174 assert leftovers == [], (
177- f'expected no redelivery, got { [m .value () for m in leftovers ]} '
178- f'(produced { produced } )'
175+ f'expected no redelivery, got { [m .value () for m in leftovers ]} ' f'(produced { produced } )'
179176 )
180177 finally :
181178 sc2 .close ()
@@ -379,10 +376,7 @@ def _set_group_configs(group_id, configs):
379376 admin = AdminClient ({'bootstrap.servers' : BOOTSTRAP_SERVERS })
380377 res = ConfigResource (ConfigResource .Type .GROUP , group_id )
381378 for name , value in configs .items ():
382- res .add_incremental_config (
383- ConfigEntry (name , str (value ),
384- incremental_operation = AlterConfigOpType .SET )
385- )
379+ res .add_incremental_config (ConfigEntry (name , str (value ), incremental_operation = AlterConfigOpType .SET ))
386380 for f in admin .incremental_alter_configs ([res ]).values ():
387381 f .result (timeout = 10 )
388382
@@ -400,8 +394,7 @@ def test_lock_elapsed_acknowledge_does_not_consume_record():
400394 _create_topic (topic )
401395
402396 try :
403- _set_group_configs (group ,
404- {'share.record.lock.duration.ms' : SHORT_LOCK_MS })
397+ _set_group_configs (group , {'share.record.lock.duration.ms' : SHORT_LOCK_MS })
405398 except KafkaException as e :
406399 pytest .skip (f'cannot lower share group lock duration: { e } ' )
407400
@@ -438,10 +431,7 @@ def test_lock_elapsed_acknowledge_does_not_consume_record():
438431 except KafkaException :
439432 continue
440433 for m in batch :
441- if (
442- m .error () is None
443- and (m .topic (), m .partition (), m .offset ()) == coords
444- ):
434+ if m .error () is None and (m .topic (), m .partition (), m .offset ()) == coords :
445435 seen_again = True
446436 sc .acknowledge (m , AcknowledgeType .ACCEPT )
447437 break
@@ -463,8 +453,7 @@ def test_lock_elapsed_record_redelivered_to_same_consumer():
463453 _create_topic (topic )
464454
465455 try :
466- _set_group_configs (group ,
467- {'share.record.lock.duration.ms' : SHORT_LOCK_MS })
456+ _set_group_configs (group , {'share.record.lock.duration.ms' : SHORT_LOCK_MS })
468457 except KafkaException as e :
469458 pytest .skip (f'cannot lower share group lock duration: { e } ' )
470459
@@ -491,9 +480,7 @@ def test_lock_elapsed_record_redelivered_to_same_consumer():
491480 seen_again = False
492481 while not seen_again and time .monotonic () < deadline :
493482 for m in sc .poll (timeout = 2.0 ):
494- if m .error () is None and (
495- m .topic (), m .partition (), m .offset ()
496- ) == coords :
483+ if m .error () is None and (m .topic (), m .partition (), m .offset ()) == coords :
497484 seen_again = True
498485 sc .acknowledge (m , AcknowledgeType .ACCEPT )
499486 break
@@ -543,10 +530,7 @@ def test_delivery_attempt_limit_archives_record():
543530 deadline = time .monotonic () + 10.0
544531 while time .monotonic () < deadline :
545532 for m in sc .poll (timeout = 2.0 ):
546- if (
547- m .error () is None
548- and (m .topic (), m .partition (), m .offset ()) == coords
549- ):
533+ if m .error () is None and (m .topic (), m .partition (), m .offset ()) == coords :
550534 pytest .fail (
551535 f'record { coords } was redelivered after hitting '
552536 f'delivery.attempt.limit — broker should have archived it'
@@ -555,7 +539,6 @@ def test_delivery_attempt_limit_archives_record():
555539 sc .close ()
556540
557541
558-
559542@pytest .mark .integration
560543@broker_required
561544def test_open_transaction_stalls_share_group ():
@@ -566,18 +549,19 @@ def test_open_transaction_stalls_share_group():
566549 group = f'g_{ _unique_id ()} '
567550 _create_topic (topic )
568551
569- txn_producer = Producer ({
570- 'bootstrap.servers' : BOOTSTRAP_SERVERS ,
571- 'transactional.id' : f'txn_{ _unique_id ()} ' ,
572- })
552+ txn_producer = Producer (
553+ {
554+ 'bootstrap.servers' : BOOTSTRAP_SERVERS ,
555+ 'transactional.id' : f'txn_{ _unique_id ()} ' ,
556+ }
557+ )
573558 try :
574559 txn_producer .init_transactions (10 )
575560 except KafkaException as e :
576561 pytest .skip (f'broker does not support transactions: { e } ' )
577562
578563 try :
579- _set_group_configs (group ,
580- {'share.isolation.level' : 'read_committed' })
564+ _set_group_configs (group , {'share.isolation.level' : 'read_committed' })
581565 except KafkaException as e :
582566 pytest .skip (f'cannot set share.isolation.level on group: { e } ' )
583567
@@ -595,15 +579,13 @@ def test_open_transaction_stalls_share_group():
595579 # While the transaction is open, read_committed must NOT deliver.
596580 stalled = _collect (sc , 1 , timeout = 5.0 )
597581 assert stalled == [], (
598- f'open transaction did not stall the share group: '
599- f'received { [m .value () for m in stalled ]} '
582+ f'open transaction did not stall the share group: ' f'received { [m .value () for m in stalled ]} '
600583 )
601584
602585 # Commit the transaction — records should now flow.
603586 txn_producer .commit_transaction (10 )
604587
605- msgs = _collect (sc , 3 , POLL_TIMEOUT ,
606- ack_type = AcknowledgeType .ACCEPT )
588+ msgs = _collect (sc , 3 , POLL_TIMEOUT , ack_type = AcknowledgeType .ACCEPT )
607589 assert len (msgs ) == 3 , f'expected 3 msgs after commit, got { len (msgs )} '
608590 finally :
609591 sc .close ()
0 commit comments