@@ -177,7 +177,7 @@ def is_terminated(self) -> bool:
177177 """Check if this transport has been explicitly terminated."""
178178 return self ._terminated
179179
180- def close_sse_stream (self , request_id : RequestId ) -> None : # pragma: lax no cover
180+ def close_sse_stream (self , request_id : RequestId ) -> None :
181181 """Close SSE connection for a specific request without terminating the stream.
182182
183183 This method closes the HTTP connection for the specified request, triggering
@@ -200,12 +200,12 @@ def close_sse_stream(self, request_id: RequestId) -> None: # pragma: lax no cov
200200 writer .close ()
201201
202202 # Also close and remove request streams
203- if request_id in self ._request_streams :
203+ if request_id in self ._request_streams : # pragma: no branch
204204 send_stream , receive_stream = self ._request_streams .pop (request_id )
205205 send_stream .close ()
206206 receive_stream .close ()
207207
208- def close_standalone_sse_stream (self ) -> None : # pragma: lax no cover
208+ def close_standalone_sse_stream (self ) -> None :
209209 """Close the standalone GET SSE stream, triggering client reconnection.
210210
211211 This method closes the HTTP connection for the standalone GET stream used
@@ -240,10 +240,10 @@ def _create_session_message(
240240 # Only provide close callbacks when client supports resumability
241241 if self ._event_store and protocol_version >= "2025-11-25" :
242242
243- async def close_stream_callback () -> None : # pragma: lax no cover
243+ async def close_stream_callback () -> None :
244244 self .close_sse_stream (request_id )
245245
246- async def close_standalone_stream_callback () -> None : # pragma: lax no cover
246+ async def close_standalone_stream_callback () -> None :
247247 self .close_standalone_sse_stream ()
248248
249249 metadata = ServerMessageMetadata (
@@ -291,7 +291,7 @@ def _create_error_response(
291291 ) -> Response :
292292 """Create an error response with a simple string message."""
293293 response_headers = {"Content-Type" : CONTENT_TYPE_JSON }
294- if headers : # pragma: lax no cover
294+ if headers :
295295 response_headers .update (headers )
296296
297297 if self .mcp_session_id :
@@ -342,7 +342,7 @@ def _create_event_data(self, event_message: EventMessage) -> dict[str, str]:
342342 }
343343
344344 # If an event ID was provided, include it
345- if event_message .event_id : # pragma: lax no cover
345+ if event_message .event_id :
346346 event_data ["id" ] = event_message .event_id
347347
348348 return event_data
@@ -372,7 +372,7 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
372372 await error_response (scope , receive , send )
373373 return
374374
375- if self ._terminated : # pragma: lax no cover
375+ if self ._terminated :
376376 # If the session has been terminated, return 404 Not Found
377377 response = self ._create_error_response (
378378 "Not Found: Session has been terminated" ,
@@ -387,7 +387,7 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
387387 await self ._handle_get_request (request , send )
388388 elif request .method == "DELETE" :
389389 await self ._handle_delete_request (request , send )
390- else : # pragma: lax no cover
390+ else :
391391 await self ._handle_unsupported_request (request , send )
392392
393393 def _check_accept_headers (self , request : Request ) -> tuple [bool , bool ]:
@@ -467,7 +467,7 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
467467
468468 try :
469469 message = jsonrpc_message_adapter .validate_python (raw_message , by_name = False )
470- except ValidationError as e : # pragma: lax no cover
470+ except ValidationError as e :
471471 response = self ._create_error_response (
472472 f"Validation error: { str (e )} " ,
473473 HTTPStatus .BAD_REQUEST ,
@@ -493,7 +493,7 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
493493 )
494494 await response (scope , receive , send )
495495 return
496- elif not await self ._validate_request_headers (request , send ): # pragma: lax no cover
496+ elif not await self ._validate_request_headers (request , send ):
497497 return
498498
499499 # For notifications and responses only, return 202 Accepted
@@ -633,7 +633,7 @@ async def sse_writer(): # pragma: lax no cover
633633 finally :
634634 await sse_stream_reader .aclose ()
635635
636- except Exception as err : # pragma: lax no cover
636+ except Exception as err : # pragma: no cover
637637 logger .exception ("Error handling POST request" )
638638 response = self ._create_error_response (
639639 f"Error handling POST request: { err } " ,
@@ -659,19 +659,19 @@ async def _handle_get_request(self, request: Request, send: Send) -> None:
659659 # Validate Accept header - must include text/event-stream
660660 _ , has_sse = self ._check_accept_headers (request )
661661
662- if not has_sse : # pragma: lax no cover
662+ if not has_sse :
663663 response = self ._create_error_response (
664664 "Not Acceptable: Client must accept text/event-stream" ,
665665 HTTPStatus .NOT_ACCEPTABLE ,
666666 )
667667 await response (request .scope , request .receive , send )
668668 return
669669
670- if not await self ._validate_request_headers (request , send ): # pragma: lax no cover
670+ if not await self ._validate_request_headers (request , send ):
671671 return
672672
673673 # Handle resumability: check for Last-Event-ID header
674- if last_event_id := request .headers .get (LAST_EVENT_ID_HEADER ): # pragma: lax no cover
674+ if last_event_id := request .headers .get (LAST_EVENT_ID_HEADER ):
675675 await self ._replay_events (last_event_id , request , send )
676676 return
677677
@@ -681,11 +681,11 @@ async def _handle_get_request(self, request: Request, send: Send) -> None:
681681 "Content-Type" : CONTENT_TYPE_SSE ,
682682 }
683683
684- if self .mcp_session_id :
684+ if self .mcp_session_id : # pragma: no branch
685685 headers [MCP_SESSION_ID_HEADER ] = self .mcp_session_id
686686
687687 # Check if we already have an active GET stream
688- if GET_STREAM_KEY in self ._request_streams : # pragma: lax no cover
688+ if GET_STREAM_KEY in self ._request_streams :
689689 response = self ._create_error_response (
690690 "Conflict: Only one SSE stream is allowed per session" ,
691691 HTTPStatus .CONFLICT ,
@@ -714,8 +714,12 @@ async def standalone_sse_writer():
714714 # Send the message via SSE
715715 event_data = self ._create_event_data (event_message )
716716 await sse_stream_writer .send (event_data )
717- except Exception : # pragma: lax no cover
718- logger .exception ("Error in standalone SSE writer" )
717+ except Exception :
718+ # Timing race: if task cancellation reaches this writer before it
719+ # hits the closed stream, we get CancelledError (not caught here).
720+ # If the closed-stream error fires first, this logs. Neither order is
721+ # a bug — both are valid shutdown sequences.
722+ logger .exception ("Error in standalone SSE writer" ) # pragma: lax no cover
719723 finally :
720724 logger .debug ("Closing standalone SSE writer" )
721725 await self ._clean_up_memory_streams (GET_STREAM_KEY )
@@ -791,13 +795,13 @@ async def terminate(self) -> None:
791795 # During cleanup, we catch all exceptions since streams might be in various states
792796 logger .debug (f"Error closing streams: { e } " )
793797
794- async def _handle_unsupported_request (self , request : Request , send : Send ) -> None : # pragma: lax no cover
798+ async def _handle_unsupported_request (self , request : Request , send : Send ) -> None :
795799 """Handle unsupported HTTP methods."""
796800 headers = {
797801 "Content-Type" : CONTENT_TYPE_JSON ,
798802 "Allow" : "GET, POST, DELETE" ,
799803 }
800- if self .mcp_session_id :
804+ if self .mcp_session_id : # pragma: no branch
801805 headers [MCP_SESSION_ID_HEADER ] = self .mcp_session_id
802806
803807 response = self ._create_error_response (
@@ -824,7 +828,7 @@ async def _validate_session(self, request: Request, send: Send) -> bool:
824828 request_session_id = self ._get_session_id (request )
825829
826830 # If no session ID provided but required, return error
827- if not request_session_id : # pragma: lax no cover
831+ if not request_session_id :
828832 response = self ._create_error_response (
829833 "Bad Request: Missing session ID" ,
830834 HTTPStatus .BAD_REQUEST ,
@@ -849,11 +853,11 @@ async def _validate_protocol_version(self, request: Request, send: Send) -> bool
849853 protocol_version = request .headers .get (MCP_PROTOCOL_VERSION_HEADER )
850854
851855 # If no protocol version provided, assume default version
852- if protocol_version is None : # pragma: lax no cover
856+ if protocol_version is None :
853857 protocol_version = DEFAULT_NEGOTIATED_VERSION
854858
855859 # Check if the protocol version is supported
856- if protocol_version not in SUPPORTED_PROTOCOL_VERSIONS : # pragma: lax no cover
860+ if protocol_version not in SUPPORTED_PROTOCOL_VERSIONS :
857861 supported_versions = ", " .join (SUPPORTED_PROTOCOL_VERSIONS )
858862 response = self ._create_error_response (
859863 f"Bad Request: Unsupported protocol version: { protocol_version } . "
@@ -865,13 +869,13 @@ async def _validate_protocol_version(self, request: Request, send: Send) -> bool
865869
866870 return True
867871
868- async def _replay_events (self , last_event_id : str , request : Request , send : Send ) -> None : # pragma: lax no cover
872+ async def _replay_events (self , last_event_id : str , request : Request , send : Send ) -> None :
869873 """Replays events that would have been sent after the specified event ID.
870874
871875 Only used when resumability is enabled.
872876 """
873877 event_store = self ._event_store
874- if not event_store :
878+ if not event_store : # pragma: no cover
875879 return
876880
877881 try :
@@ -881,7 +885,7 @@ async def _replay_events(self, last_event_id: str, request: Request, send: Send)
881885 "Content-Type" : CONTENT_TYPE_SSE ,
882886 }
883887
884- if self .mcp_session_id :
888+ if self .mcp_session_id : # pragma: no branch
885889 headers [MCP_SESSION_ID_HEADER ] = self .mcp_session_id
886890
887891 # Get protocol version from header (already validated in _validate_protocol_version)
@@ -902,7 +906,7 @@ async def send_event(event_message: EventMessage) -> None:
902906 stream_id = await event_store .replay_events_after (last_event_id , send_event )
903907
904908 # If stream ID not in mapping, create it
905- if stream_id and stream_id not in self ._request_streams :
909+ if stream_id and stream_id not in self ._request_streams : # pragma: no branch
906910 # Register SSE writer so close_sse_stream() can close it
907911 self ._sse_stream_writers [stream_id ] = sse_stream_writer
908912
@@ -921,9 +925,9 @@ async def send_event(event_message: EventMessage) -> None:
921925 await sse_stream_writer .send (event_data )
922926 except anyio .ClosedResourceError :
923927 # Expected when close_sse_stream() is called
924- logger .debug ("Replay SSE stream closed by close_sse_stream()" )
928+ logger .debug ("Replay SSE stream closed by close_sse_stream()" ) # pragma: no cover
925929 except Exception :
926- logger .exception ("Error in replay sender" )
930+ logger .exception ("Error in replay sender" ) # pragma: no cover
927931
928932 # Create and start EventSourceResponse
929933 response = EventSourceResponse (
@@ -934,13 +938,13 @@ async def send_event(event_message: EventMessage) -> None:
934938
935939 try :
936940 await response (request .scope , request .receive , send )
937- except Exception :
941+ except Exception : # pragma: no cover
938942 logger .exception ("Error in replay response" )
939943 finally :
940944 await sse_stream_writer .aclose ()
941945 await sse_stream_reader .aclose ()
942946
943- except Exception :
947+ except Exception : # pragma: no cover
944948 logger .exception ("Error replaying events" )
945949 response = self ._create_error_response (
946950 "Error replaying events" ,
@@ -991,7 +995,7 @@ async def message_router():
991995 if isinstance (message , JSONRPCResponse | JSONRPCError ) and message .id is not None :
992996 target_request_id = str (message .id )
993997 # Extract related_request_id from meta if it exists
994- elif ( # pragma: lax no cover
998+ elif (
995999 session_message .metadata is not None
9961000 and isinstance (
9971001 session_message .metadata ,
@@ -1015,10 +1019,16 @@ async def message_router():
10151019 try :
10161020 # Send both the message and the event ID
10171021 await self ._request_streams [request_stream_id ][0 ].send (EventMessage (message , event_id ))
1018- except (anyio .BrokenResourceError , anyio .ClosedResourceError ): # pragma: lax no cover
1022+ except ( # pragma: lax no cover
1023+ # Timing race: if cancellation reaches this coroutine
1024+ # before send() hits the closed stream, CancelledError
1025+ # propagates instead. Either shutdown sequence is valid.
1026+ anyio .BrokenResourceError ,
1027+ anyio .ClosedResourceError ,
1028+ ):
10191029 # Stream might be closed, remove from registry
10201030 self ._request_streams .pop (request_stream_id , None )
1021- else : # pragma: lax no cover
1031+ else :
10221032 logger .debug (
10231033 f"""Request stream { request_stream_id } not found
10241034 for message. Still processing message as the client
0 commit comments