@@ -10791,3 +10791,265 @@ def __init__(self, context: TestContext, **kwargs: Any):
1079110791 extra_rp_conf = {"schema_registry_use_rpc" : True },
1079210792 ** kwargs ,
1079310793 )
10794+
10795+
class SchemaRegistryTransportStressTest(SchemaRegistryEndpoints):
    """
    Stress test for schema registry transport resilience.

    Performs concurrent SR read/write operations while repeatedly
    transferring leadership of the _schemas topic, then verifies that the
    rate of HTTP 500 responses surfaced to clients stays below 1% and that
    every node serves reads again once the transfers have stopped.

    Subclasses select the transport mode through the
    ``schema_registry_use_rpc`` cluster property so the behavior of both
    transports can be compared and tuned together.
    """

    def __init__(self, context: TestContext, **kwargs: Any):
        super().__init__(
            context,
            **kwargs,
        )

    @cluster(num_nodes=3)
    def test_no_errors_during_leadership_transfers(self):
        """
        Drive SR traffic from background threads during 20 leadership
        transfers of ``kafka/_schemas/0`` and assert a sub-1% HTTP 500 rate.

        Raises:
            AssertionError: if setup registration fails, a worker thread does
                not stop in time, or the 500 rate reaches the 1% threshold.
                Failures raised by ``wait_until`` (leadership not moving, or
                nodes not recovering) propagate unchanged.
        """
        # Function-scope import: only this test needs threading.
        import threading

        admin = Admin(self.redpanda)

        # --- Setup: register initial schemas so reads have data ---
        num_subjects = 3
        schema_ids = []
        subjects = []
        for i in range(num_subjects):
            subject = f"stress-test-subject-{i}"
            subjects.append(subject)
            data = json.dumps(
                {
                    "schema": json.dumps(
                        {
                            "type": "record",
                            "name": f"rec{i}",
                            "fields": [{"name": "f1", "type": "string"}],
                        }
                    ),
                }
            )
            result = self.sr_client.post_subjects_subject_versions(
                subject=subject, data=data
            )
            assert result.status_code == 200, (
                f"Setup: failed to register schema: {result.status_code} {result.text}"
            )
            schema_ids.append(result.json()["id"])

        self.logger.info(
            f"Setup complete: {num_subjects} subjects, schema_ids={schema_ids}"
        )

        # --- Background workers ---
        request_counter = 0
        request_counter_lock = threading.Lock()
        errors: list[str] = []
        stop_event = threading.Event()

        def count_request():
            """Record one attempted request (counted before it is sent)."""
            nonlocal request_counter
            with request_counter_lock:
                request_counter += 1

        # Short timeout so threads don't block teardown.
        req_timeout = 10

        def reader_worker():
            """Continuously read subjects and schemas from every node in turn."""
            while not stop_event.is_set():
                for node in self.redpanda.nodes:
                    if stop_event.is_set():
                        break
                    hostname = node.account.hostname
                    try:
                        count_request()
                        r = self.sr_client.get_subjects(
                            hostname=hostname, timeout=req_timeout
                        )
                        if r.status_code == 500:
                            errors.append(f"GET /subjects on {hostname}: 500 {r.text}")
                        for sid in schema_ids:
                            if stop_event.is_set():
                                break
                            count_request()
                            r = self.sr_client.request(
                                "GET",
                                f"schemas/ids/{sid}",
                                hostname=hostname,
                                headers=HTTP_GET_HEADERS,
                                timeout=req_timeout,
                            )
                            if r.status_code == 500:
                                errors.append(
                                    f"GET /schemas/ids/{sid} on {hostname}: "
                                    f"500 {r.text}"
                                )
                    except Exception as e:
                        # Best effort: client-side failures (timeouts,
                        # connection resets) are logged but not treated as
                        # HTTP 500s; only server responses feed `errors`.
                        self.logger.warning(f"Reader exception on {hostname}: {e}")

        def writer_worker():
            """Continuously register new schema versions."""
            seq = 0
            while not stop_event.is_set():
                seq += 1
                subject = subjects[seq % num_subjects]
                data = json.dumps(
                    {
                        "schema": json.dumps(
                            {
                                "type": "record",
                                "name": f"rec{seq % num_subjects}",
                                "fields": [
                                    {"name": "f1", "type": ["null", "string"]},
                                    {
                                        "name": f"f_write_{seq}",
                                        "type": "string",
                                        "default": "x",
                                    },
                                ],
                            }
                        ),
                    }
                )
                try:
                    count_request()
                    r = self.sr_client.post_subjects_subject_versions(
                        subject=subject,
                        data=data,
                        timeout=req_timeout,
                    )
                    if r.status_code == 500:
                        errors.append(
                            f"POST /subjects/{subject}/versions: 500 {r.text}"
                        )
                except Exception as e:
                    # Best effort, same policy as the reader.
                    self.logger.warning(f"Writer exception: {e}")

                # Pace writes to avoid overwhelming the cluster. Using
                # stop_event.wait lets teardown cancel the pause instead of
                # running out a full 0.5s of sleep.
                if stop_event.wait(0.5):
                    break

        # 2 reader threads and 1 writer thread. daemon=True so a worker that
        # outlives its join timeout cannot keep the test process alive.
        threads = [
            threading.Thread(target=reader_worker, daemon=True) for _ in range(2)
        ]
        threads.append(threading.Thread(target=writer_worker, daemon=True))

        num_transfers = 20
        try:
            for t in threads:
                t.start()

            # --- Perturbation: leadership transfers ---
            node_ids = [self.redpanda.node_id(n) for n in self.redpanda.nodes]
            for i in range(num_transfers):
                leader = admin.get_partition_leader(
                    namespace="kafka", topic="_schemas", partition=0
                )
                # Pick a specific target so the same node doesn't re-elect itself.
                targets = [n for n in node_ids if n != leader]
                target = targets[i % len(targets)]
                self.logger.info(
                    f"Transfer {i + 1}/{num_transfers}: moving leadership "
                    f"from node {leader} to node {target}"
                )
                admin.partition_transfer_leadership(
                    namespace="kafka",
                    topic="_schemas",
                    partition=0,
                    target_id=target,
                )

                # Wait for the specific target to become leader, not just "any
                # leader other than the old one" — this is a stronger
                # stabilization signal and makes the post-transfer cooldown
                # unnecessary.
                wait_until(
                    lambda: admin.get_partition_leader(
                        namespace="kafka", topic="_schemas", partition=0
                    )
                    == target,
                    timeout_sec=10,
                    backoff_sec=1,
                    err_msg=f"Leadership did not transfer to node {target}",
                )
        finally:
            # --- Teardown ---
            # Runs on failure too: without this, an exception raised by a
            # transfer would leave the workers looping forever because
            # nothing else ever sets stop_event.
            stop_event.set()
            for t in threads:
                # is_alive() is False for threads that never started, which
                # must not be joined.
                if t.is_alive():
                    t.join(timeout=req_timeout + 5)

        alive = [t for t in threads if t.is_alive()]
        assert not alive, f"{len(alive)} worker thread(s) still alive after join"

        total_requests = request_counter
        error_rate = len(errors) / total_requests if total_requests else 0
        self.logger.info(
            f"Stress test complete: {num_transfers} leadership transfers, "
            f"{total_requests} requests, {len(errors)} errors "
            f"({error_rate:.2%})"
        )

        # A small number of transient 500s during rapid leadership
        # transfers is acceptable — the internal retry budget can be
        # exhausted if a transfer is slow to propagate. The important
        # thing is that the error rate is low: the system recovers
        # quickly and subsequent requests succeed. A real retry-path
        # regression spikes well above 1%, so this catches meaningful
        # breakage without false-positiving on CI noise.
        assert error_rate < 0.01, (
            f"Error rate {error_rate:.2%} exceeds 1% threshold "
            f"({len(errors)} errors in {total_requests} requests):\n"
            + "\n".join(errors[:20])
        )

        # Verify the system recovers after transfers complete: every
        # node must be able to serve a basic read within a reasonable
        # window. wait_until absorbs transient CI slowness while still
        # catching real breakage.
        def all_nodes_healthy():
            """Return True only if GET /subjects answers 200 on every node."""
            for node in self.redpanda.nodes:
                r = self.sr_client.get_subjects(
                    hostname=node.account.hostname, timeout=req_timeout
                )
                if r.status_code != 200:
                    return False
            return True

        wait_until(
            all_nodes_healthy,
            timeout_sec=30,
            backoff_sec=2,
            err_msg="Schema registry did not recover on all nodes "
            "after leadership transfers",
        )
11032+
11033+
class SchemaRegistryRpcTransportStressTest(SchemaRegistryTransportStressTest):
    """
    Leadership transfer stress test run against the RPC transport, i.e.
    with ``schema_registry_use_rpc`` set to True.
    """

    def __init__(self, context: TestContext):
        # Cluster property override that selects the RPC transport.
        rpc_transport_conf = {"schema_registry_use_rpc": True}
        super().__init__(context, extra_rp_conf=rpc_transport_conf)
11044+
11045+
class SchemaRegistryKafkaClientTransportStressTest(SchemaRegistryTransportStressTest):
    """
    Leadership transfer stress test run against the Kafka client
    transport, i.e. with ``schema_registry_use_rpc`` set to False.
    """

    def __init__(self, context: TestContext):
        # Cluster property override that selects the Kafka client transport.
        kafka_client_transport_conf = {"schema_registry_use_rpc": False}
        super().__init__(context, extra_rp_conf=kafka_client_transport_conf)
0 commit comments