@@ -190,6 +190,8 @@ class ClientSession:
190190 session_id : str
191191 closed : bool = False
192192 payloads : list [str ] = field (default_factory = list )
193+ streams_seen : set [str ] = field (default_factory = set )
194+ datagrams_seen : set [str ] = field (default_factory = set )
193195
194196
195197class ClientSessionTopologyHarness :
@@ -285,6 +287,186 @@ def session_for(self, client_id: str, connection_id: str, session_id: str) -> Cl
285287 return session
286288
287289
290+ class ClientSessionRobustnessHarness :
291+ """Carrier-aware runtime harness for T2 pressure, fault, and cleanup proof."""
292+
293+ def __init__ (
294+ self ,
295+ carrier : ProtocolCarrier ,
296+ scope : SessionScope | None = None ,
297+ * ,
298+ max_sessions : int = 32 ,
299+ max_streams : int = 32 ,
300+ max_queue : int = 32 ,
301+ max_message_size : int = 65536 ,
302+ max_datagram_size : int = 1200 ,
303+ ) -> None :
304+ self .carrier = carrier
305+ self .scope = scope
306+ self .max_sessions = max_sessions
307+ self .max_streams = max_streams
308+ self .max_queue = max_queue
309+ self .max_message_size = max_message_size
310+ self .max_datagram_size = max_datagram_size
311+ self .sessions : dict [str , ClientSession ] = {}
312+ self .errors : list [dict [str , Any ]] = []
313+
314+ def open (self , client_id : str , connection_id : str , session_id : str ) -> None :
315+ if len ([session for session in self .sessions .values () if not session .closed ]) >= self .max_sessions :
316+ self .fail_closed (
317+ client_id ,
318+ connection_id ,
319+ session_id ,
320+ ClientTopology .CONCURRENT_CLIENTS ,
321+ "max_sessions_exceeded" ,
322+ )
323+ raise BufferError ("session pressure budget exceeded" )
324+ if session_id in self .sessions and not self .sessions [session_id ].closed :
325+ self .fail_closed (
326+ client_id ,
327+ connection_id ,
328+ session_id ,
329+ ClientTopology .CONCURRENT_CLIENTS ,
330+ "duplicate_session" ,
331+ )
332+ raise ValueError ("duplicate session rejected" )
333+ self .sessions [session_id ] = ClientSession (
334+ client_id = client_id ,
335+ connection_id = connection_id ,
336+ session_id = session_id ,
337+ )
338+
339+ def send (
340+ self ,
341+ client_id : str ,
342+ connection_id : str ,
343+ session_id : str ,
344+ topology : ClientTopology ,
345+ payload : object ,
346+ * ,
347+ stream_id : str | int | None = None ,
348+ datagram_id : str | int | None = None ,
349+ ) -> None :
350+ session = self .session_for (client_id , connection_id , session_id , topology )
351+ identifiers = {"stream_id" : stream_id , "datagram_id" : datagram_id }
352+ if session .closed :
353+ self .fail_closed (client_id , connection_id , session_id , topology , "post_close_send" , ** identifiers )
354+ raise RuntimeError ("post-close send rejected" )
355+ if not isinstance (payload , (str , bytes )) or not payload :
356+ self .fail_closed (client_id , connection_id , session_id , topology , "malformed_payload" , ** identifiers )
357+ raise ValueError ("malformed payload rejected" )
358+ payload_size = len (payload )
359+ if payload_size > self .max_message_size :
360+ self .fail_closed (
361+ client_id ,
362+ connection_id ,
363+ session_id ,
364+ topology ,
365+ "message_pressure_budget_exceeded" ,
366+ ** identifiers ,
367+ )
368+ raise BufferError ("message pressure budget exceeded" )
369+ if datagram_id is not None and payload_size > self .max_datagram_size :
370+ self .fail_closed (
371+ client_id ,
372+ connection_id ,
373+ session_id ,
374+ topology ,
375+ "datagram_pressure_budget_exceeded" ,
376+ ** identifiers ,
377+ )
378+ raise BufferError ("datagram pressure budget exceeded" )
379+ if len (session .payloads ) >= self .max_queue :
380+ self .fail_closed (
381+ client_id ,
382+ connection_id ,
383+ session_id ,
384+ topology ,
385+ "queue_pressure_budget_exceeded" ,
386+ ** identifiers ,
387+ )
388+ raise BufferError ("queue pressure budget exceeded" )
389+ if stream_id is not None :
390+ session .streams_seen .add (str (stream_id ))
391+ if len (session .streams_seen ) > self .max_streams :
392+ self .fail_closed (
393+ client_id ,
394+ connection_id ,
395+ session_id ,
396+ topology ,
397+ "max_streams_exceeded" ,
398+ ** identifiers ,
399+ )
400+ session .streams_seen .remove (str (stream_id ))
401+ raise BufferError ("stream pressure budget exceeded" )
402+ if datagram_id is not None :
403+ session .datagrams_seen .add (str (datagram_id ))
404+ session .payloads .append (payload if isinstance (payload , str ) else payload .decode ("latin1" ))
405+
406+ def close (self , client_id : str , connection_id : str , session_id : str , topology : ClientTopology ) -> None :
407+ self .session_for (client_id , connection_id , session_id , topology ).closed = True
408+
409+ def cancel (self , client_id : str , connection_id : str , session_id : str , topology : ClientTopology ) -> None :
410+ self .session_for (client_id , connection_id , session_id , topology ).closed = True
411+ self .fail_closed (client_id , connection_id , session_id , topology , "cancelled" )
412+
413+ def timeout (self , client_id : str , connection_id : str , session_id : str , topology : ClientTopology ) -> None :
414+ self .session_for (client_id , connection_id , session_id , topology ).closed = True
415+ self .fail_closed (client_id , connection_id , session_id , topology , "timeout" )
416+
417+ def session_for (
418+ self ,
419+ client_id : str ,
420+ connection_id : str ,
421+ session_id : str ,
422+ topology : ClientTopology ,
423+ ) -> ClientSession :
424+ try :
425+ session = self .sessions [session_id ]
426+ except KeyError as exc :
427+ self .fail_closed (client_id , connection_id , session_id , topology , "unknown_session" )
428+ raise KeyError ("unknown session rejected" ) from exc
429+ if session .client_id != client_id or session .connection_id != connection_id :
430+ self .fail_closed (
431+ client_id ,
432+ connection_id ,
433+ session_id ,
434+ topology ,
435+ "cross_client_session_access" ,
436+ )
437+ raise PermissionError ("cross-client or cross-connection session access rejected" )
438+ return session
439+
440+ def fail_closed (
441+ self ,
442+ client_id : str ,
443+ connection_id : str ,
444+ session_id : str ,
445+ topology : ClientTopology ,
446+ error_kind : str ,
447+ ** identifiers : Any ,
448+ ) -> dict [str , Any ]:
449+ identifiers = {key : value for key , value in identifiers .items () if value is not None }
450+ row = build_matrix_row (
451+ protocol_carrier = self .carrier ,
452+ client_topology = topology ,
453+ session_scope = self .scope ,
454+ disposition = CoverageDisposition .FAIL_CLOSED ,
455+ lifecycle_behavior = CoverageDisposition .COVERED ,
456+ identity_isolation = CoverageDisposition .COVERED ,
457+ ordering_behavior = CoverageDisposition .COVERED ,
458+ pressure_mode = CoverageDisposition .COVERED ,
459+ fault_mode = CoverageDisposition .COVERED ,
460+ client_id = client_id ,
461+ connection_id = connection_id ,
462+ session_id = session_id ,
463+ error_kind = error_kind ,
464+ ** identifiers ,
465+ )
466+ self .errors .append (row )
467+ return row
468+
469+
288470def sequential_pair (carrier : ProtocolCarrier , scope : SessionScope | None = None ) -> ClientSessionTopologyHarness :
289471 topology = ClientTopology .SEQUENTIAL_CLIENTS
290472 harness = ClientSessionTopologyHarness (carrier , scope )
@@ -327,6 +509,7 @@ def bounded_interleaved_pair(
327509 "SESSION_SCOPE_VALUES" ,
328510 "BehaviorAxis" ,
329511 "ClientSession" ,
512+ "ClientSessionRobustnessHarness" ,
330513 "ClientSessionTopologyHarness" ,
331514 "ClientTopology" ,
332515 "CoverageDisposition" ,
0 commit comments