@@ -10767,3 +10767,211 @@ def __init__(self, context: TestContext, **kwargs: Any):
1076710767 extra_rp_conf = {"schema_registry_use_rpc" : True },
1076810768 ** kwargs ,
1076910769 )
10770+
10771+
10772+ class SchemaRegistryTransportStressTest (SchemaRegistryEndpoints ):
10773+ """
10774+ Stress test for schema registry transport resilience. Performs
10775+ concurrent SR read/write operations while transferring leadership of
10776+ the _schemas topic to verify zero HTTP 500 errors surface to clients.
10777+
10778+ Parameterized over both transport modes so behavior can be compared
10779+ and tuned together.
10780+ """
10781+
10782+ def __init__ (self , context : TestContext , ** kwargs ):
10783+ # extra_rp_conf = {}
10784+ # if "extra_rp_conf" in kwargs:
10785+ # extra_rp_conf.update(kwargs.pop("extra_rp_conf"))
10786+ super ().__init__ (
10787+ context ,
10788+ ** kwargs ,
10789+ )
10790+
10791+ @cluster (num_nodes = 3 )
10792+ def test_no_errors_during_leadership_transfers (self ):
10793+ import threading
10794+
10795+ admin = Admin (self .redpanda )
10796+
10797+ # --- Setup: register initial schemas so reads have data ---
10798+ num_subjects = 3
10799+ schema_ids = []
10800+ subjects = []
10801+ for i in range (num_subjects ):
10802+ subject = f"stress-test-subject-{ i } "
10803+ subjects .append (subject )
10804+ data = json .dumps (
10805+ {
10806+ "schema" : json .dumps (
10807+ {
10808+ "type" : "record" ,
10809+ "name" : f"rec{ i } " ,
10810+ "fields" : [{"name" : "f1" , "type" : "string" }],
10811+ }
10812+ ),
10813+ }
10814+ )
10815+ result = self .sr_client .post_subjects_subject_versions (
10816+ subject = subject , data = data
10817+ )
10818+ assert result .status_code == 200 , (
10819+ f"Setup: failed to register schema: { result .status_code } { result .text } "
10820+ )
10821+ schema_ids .append (result .json ()["id" ])
10822+
10823+ self .logger .info (
10824+ f"Setup complete: { num_subjects } subjects, schema_ids={ schema_ids } "
10825+ )
10826+
10827+ # --- Background workers ---
10828+ errors : list [str ] = []
10829+ stop_event = threading .Event ()
10830+
10831+ def reader_worker ():
10832+ """Continuously read subjects and schemas from random nodes."""
10833+ while not stop_event .is_set ():
10834+ for node in self .redpanda .nodes :
10835+ if stop_event .is_set ():
10836+ break
10837+ hostname = node .account .hostname
10838+ try :
10839+ r = self .sr_client .get_subjects (hostname = hostname )
10840+ if r .status_code == 500 :
10841+ errors .append (f"GET /subjects on { hostname } : 500 { r .text } " )
10842+ for sid in schema_ids :
10843+ if stop_event .is_set ():
10844+ break
10845+ r = self .sr_client .request (
10846+ "GET" ,
10847+ f"schemas/ids/{ sid } " ,
10848+ hostname = hostname ,
10849+ headers = HTTP_GET_HEADERS ,
10850+ )
10851+ if r .status_code == 500 :
10852+ errors .append (
10853+ f"GET /schemas/ids/{ sid } on { hostname } : "
10854+ f"500 { r .text } "
10855+ )
10856+ except Exception as e :
10857+ self .logger .warn (f"Reader exception on { hostname } : { e } " )
10858+
10859+ write_counter = 0
10860+ write_counter_lock = threading .Lock ()
10861+
10862+ def writer_worker ():
10863+ """Continuously register new schema versions."""
10864+ nonlocal write_counter
10865+ while not stop_event .is_set ():
10866+ with write_counter_lock :
10867+ write_counter += 1
10868+ seq = write_counter
10869+ subject = subjects [seq % num_subjects ]
10870+ data = json .dumps (
10871+ {
10872+ "schema" : json .dumps (
10873+ {
10874+ "type" : "record" ,
10875+ "name" : f"rec{ seq % num_subjects } " ,
10876+ "fields" : [
10877+ {"name" : "f1" , "type" : ["null" , "string" ]},
10878+ {
10879+ "name" : f"f_write_{ seq } " ,
10880+ "type" : "string" ,
10881+ "default" : "x" ,
10882+ },
10883+ ],
10884+ }
10885+ ),
10886+ }
10887+ )
10888+ try :
10889+ r = self .sr_client .post_subjects_subject_versions (
10890+ subject = subject , data = data
10891+ )
10892+ if r .status_code == 500 :
10893+ errors .append (
10894+ f"POST /subjects/{ subject } /versions: 500 { r .text } "
10895+ )
10896+ except Exception as e :
10897+ self .logger .warn (f"Writer exception: { e } " )
10898+
10899+ # Pace writes to avoid overwhelming the cluster
10900+ time .sleep (0.5 )
10901+
10902+ # Start 2 reader threads and 1 writer thread
10903+ threads = []
10904+ for _ in range (2 ):
10905+ t = threading .Thread (target = reader_worker , daemon = True )
10906+ t .start ()
10907+ threads .append (t )
10908+ t = threading .Thread (target = writer_worker , daemon = True )
10909+ t .start ()
10910+ threads .append (t )
10911+
10912+ # --- Perturbation: leadership transfers ---
10913+ num_transfers = 20
10914+ for i in range (num_transfers ):
10915+ leader = admin .get_partition_leader (
10916+ namespace = "kafka" , topic = "_schemas" , partition = 0
10917+ )
10918+ self .logger .info (
10919+ f"Transfer { i + 1 } /{ num_transfers } : moving leadership "
10920+ f"from node { leader } "
10921+ )
10922+ admin .partition_transfer_leadership (
10923+ namespace = "kafka" , topic = "_schemas" , partition = 0
10924+ )
10925+
10926+ def leader_changed ():
10927+ new_leader = admin .get_partition_leader (
10928+ namespace = "kafka" , topic = "_schemas" , partition = 0
10929+ )
10930+ return new_leader != leader
10931+
10932+ wait_until (
10933+ leader_changed ,
10934+ timeout_sec = 10 ,
10935+ backoff_sec = 1 ,
10936+ err_msg = "Leadership did not transfer" ,
10937+ )
10938+ # Brief pause between transfers
10939+ time .sleep (1 )
10940+
10941+ # --- Teardown ---
10942+ stop_event .set ()
10943+ for t in threads :
10944+ t .join (timeout = 30 )
10945+
10946+ self .logger .info (
10947+ f"Stress test complete: { num_transfers } leadership transfers, "
10948+ f"{ write_counter } writes attempted, { len (errors )} errors"
10949+ )
10950+ assert len (errors ) == 0 , (
10951+ f"Got { len (errors )} HTTP 500 errors during leadership transfers:\n "
10952+ + "\n " .join (errors [:20 ])
10953+ )
10954+
10955+
10956+ class SchemaRegistryRpcTransportStressTest (SchemaRegistryTransportStressTest ):
10957+ """
10958+ Kafka client transport variant of the leadership transfer stress test.
10959+ """
10960+
10961+ def __init__ (self , context : TestContext ):
10962+ super ().__init__ (
10963+ context ,
10964+ extra_rp_conf = {"schema_registry_use_rpc" : True },
10965+ )
10966+
10967+
10968+ class SchemaRegistryKafkaClientTransportStressTest (SchemaRegistryTransportStressTest ):
10969+ """
10970+ Kafka client transport variant of the leadership transfer stress test.
10971+ """
10972+
10973+ def __init__ (self , context : TestContext ):
10974+ super ().__init__ (
10975+ context ,
10976+ extra_rp_conf = {"schema_registry_use_rpc" : False },
10977+ )
0 commit comments