Skip to content

Commit b9b1656

Browse files
committed
refactor: minimal targeted pragmas instead of blanket lax no cover
Remove the lazy approach of downgrading 36 pragmas to `lax no cover` and instead do the work: strip pragmas entirely, run coverage, apply minimal annotations to only what's actually uncovered. Results: - 69 `# pragma: no cover` removed entirely — code is fully covered, pragmas were subprocess-era lies (coverage.py couldn't see into the subprocess, now it sees into the thread) + 21 `# pragma: no cover` — genuinely unhit error paths (empty Host header, invalid UUID, defensive except Exception handlers, unused test tool branches) + 20 `# pragma: no branch` — one-sided conditionals where tests only exercise one path (if self.mcp_session_id:, if ctx.close_sse_stream:) + 2 `# pragma: lax no cover` — timing-race shutdown handlers where coverage depends on cancellation-vs-closed-stream ordering. Confirmed nondeterministic (2/3 runs pass, 1/3 fail strict-no-cover) Net pragma delta: -26. Net coverage: +287 stmts, +110 branches now tracked instead of excluded. Notable findings along the way: - session.py send_resource_updated is 100% covered — pragma removed - except clauses are "covered" by exception-type matching walk even when they don't match; the pragma goes on the body, not the except - transport_security _validate_host/_validate_origin went from whole-function excluded to just the 3 untested branches excluded
1 parent 4c12359 commit b9b1656

File tree

5 files changed

+84
-78
lines changed

5 files changed

+84
-78
lines changed

src/mcp/server/session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ async def send_log_message(
222222
related_request_id,
223223
)
224224

225-
async def send_resource_updated(self, uri: str | AnyUrl) -> None: # pragma: lax no cover
225+
async def send_resource_updated(self, uri: str | AnyUrl) -> None:
226226
"""Send a resource updated notification."""
227227
await self.send_notification(
228228
types.ResourceUpdatedNotification(

src/mcp/server/sse.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ def __init__(self, endpoint: str, security_settings: TransportSecuritySettings |
116116
logger.debug(f"SseServerTransport initialized with endpoint: {endpoint}")
117117

118118
@asynccontextmanager
119-
async def connect_sse(self, scope: Scope, receive: Receive, send: Send): # pragma: lax no cover
120-
if scope["type"] != "http":
119+
async def connect_sse(self, scope: Scope, receive: Receive, send: Send):
120+
if scope["type"] != "http": # pragma: no cover
121121
logger.error("connect_sse received non-HTTP request")
122122
raise ValueError("connect_sse can only handle HTTP requests")
123123

@@ -195,7 +195,7 @@ async def response_wrapper(scope: Scope, receive: Receive, send: Send):
195195
logger.debug("Yielding read and write streams")
196196
yield (read_stream, write_stream)
197197

198-
async def handle_post_message(self, scope: Scope, receive: Receive, send: Send) -> None: # pragma: lax no cover
198+
async def handle_post_message(self, scope: Scope, receive: Receive, send: Send) -> None:
199199
logger.debug("Handling POST message")
200200
request = Request(scope, receive)
201201

@@ -205,15 +205,15 @@ async def handle_post_message(self, scope: Scope, receive: Receive, send: Send)
205205
return await error_response(scope, receive, send)
206206

207207
session_id_param = request.query_params.get("session_id")
208-
if session_id_param is None:
208+
if session_id_param is None: # pragma: no cover
209209
logger.warning("Received request without session_id")
210210
response = Response("session_id is required", status_code=400)
211211
return await response(scope, receive, send)
212212

213213
try:
214214
session_id = UUID(hex=session_id_param)
215215
logger.debug(f"Parsed session ID: {session_id}")
216-
except ValueError:
216+
except ValueError: # pragma: no cover
217217
logger.warning(f"Received invalid session ID: {session_id_param}")
218218
response = Response("Invalid session ID", status_code=400)
219219
return await response(scope, receive, send)
@@ -230,7 +230,7 @@ async def handle_post_message(self, scope: Scope, receive: Receive, send: Send)
230230
try:
231231
message = types.jsonrpc_message_adapter.validate_json(body, by_name=False)
232232
logger.debug(f"Validated client message: {message}")
233-
except ValidationError as err:
233+
except ValidationError as err: # pragma: no cover
234234
logger.exception("Failed to parse message")
235235
response = Response("Could not parse message", status_code=400)
236236
await response(scope, receive, send)

src/mcp/server/streamable_http.py

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ def is_terminated(self) -> bool:
177177
"""Check if this transport has been explicitly terminated."""
178178
return self._terminated
179179

180-
def close_sse_stream(self, request_id: RequestId) -> None: # pragma: lax no cover
180+
def close_sse_stream(self, request_id: RequestId) -> None:
181181
"""Close SSE connection for a specific request without terminating the stream.
182182
183183
This method closes the HTTP connection for the specified request, triggering
@@ -200,12 +200,12 @@ def close_sse_stream(self, request_id: RequestId) -> None: # pragma: lax no cov
200200
writer.close()
201201

202202
# Also close and remove request streams
203-
if request_id in self._request_streams:
203+
if request_id in self._request_streams: # pragma: no branch
204204
send_stream, receive_stream = self._request_streams.pop(request_id)
205205
send_stream.close()
206206
receive_stream.close()
207207

208-
def close_standalone_sse_stream(self) -> None: # pragma: lax no cover
208+
def close_standalone_sse_stream(self) -> None:
209209
"""Close the standalone GET SSE stream, triggering client reconnection.
210210
211211
This method closes the HTTP connection for the standalone GET stream used
@@ -240,10 +240,10 @@ def _create_session_message(
240240
# Only provide close callbacks when client supports resumability
241241
if self._event_store and protocol_version >= "2025-11-25":
242242

243-
async def close_stream_callback() -> None: # pragma: lax no cover
243+
async def close_stream_callback() -> None:
244244
self.close_sse_stream(request_id)
245245

246-
async def close_standalone_stream_callback() -> None: # pragma: lax no cover
246+
async def close_standalone_stream_callback() -> None:
247247
self.close_standalone_sse_stream()
248248

249249
metadata = ServerMessageMetadata(
@@ -291,7 +291,7 @@ def _create_error_response(
291291
) -> Response:
292292
"""Create an error response with a simple string message."""
293293
response_headers = {"Content-Type": CONTENT_TYPE_JSON}
294-
if headers: # pragma: lax no cover
294+
if headers:
295295
response_headers.update(headers)
296296

297297
if self.mcp_session_id:
@@ -342,7 +342,7 @@ def _create_event_data(self, event_message: EventMessage) -> dict[str, str]:
342342
}
343343

344344
# If an event ID was provided, include it
345-
if event_message.event_id: # pragma: lax no cover
345+
if event_message.event_id:
346346
event_data["id"] = event_message.event_id
347347

348348
return event_data
@@ -372,7 +372,7 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
372372
await error_response(scope, receive, send)
373373
return
374374

375-
if self._terminated: # pragma: lax no cover
375+
if self._terminated:
376376
# If the session has been terminated, return 404 Not Found
377377
response = self._create_error_response(
378378
"Not Found: Session has been terminated",
@@ -387,7 +387,7 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
387387
await self._handle_get_request(request, send)
388388
elif request.method == "DELETE":
389389
await self._handle_delete_request(request, send)
390-
else: # pragma: lax no cover
390+
else:
391391
await self._handle_unsupported_request(request, send)
392392

393393
def _check_accept_headers(self, request: Request) -> tuple[bool, bool]:
@@ -467,7 +467,7 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
467467

468468
try:
469469
message = jsonrpc_message_adapter.validate_python(raw_message, by_name=False)
470-
except ValidationError as e: # pragma: lax no cover
470+
except ValidationError as e:
471471
response = self._create_error_response(
472472
f"Validation error: {str(e)}",
473473
HTTPStatus.BAD_REQUEST,
@@ -493,7 +493,7 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
493493
)
494494
await response(scope, receive, send)
495495
return
496-
elif not await self._validate_request_headers(request, send): # pragma: lax no cover
496+
elif not await self._validate_request_headers(request, send):
497497
return
498498

499499
# For notifications and responses only, return 202 Accepted
@@ -633,7 +633,7 @@ async def sse_writer(): # pragma: lax no cover
633633
finally:
634634
await sse_stream_reader.aclose()
635635

636-
except Exception as err: # pragma: lax no cover
636+
except Exception as err: # pragma: no cover
637637
logger.exception("Error handling POST request")
638638
response = self._create_error_response(
639639
f"Error handling POST request: {err}",
@@ -659,19 +659,19 @@ async def _handle_get_request(self, request: Request, send: Send) -> None:
659659
# Validate Accept header - must include text/event-stream
660660
_, has_sse = self._check_accept_headers(request)
661661

662-
if not has_sse: # pragma: lax no cover
662+
if not has_sse:
663663
response = self._create_error_response(
664664
"Not Acceptable: Client must accept text/event-stream",
665665
HTTPStatus.NOT_ACCEPTABLE,
666666
)
667667
await response(request.scope, request.receive, send)
668668
return
669669

670-
if not await self._validate_request_headers(request, send): # pragma: lax no cover
670+
if not await self._validate_request_headers(request, send):
671671
return
672672

673673
# Handle resumability: check for Last-Event-ID header
674-
if last_event_id := request.headers.get(LAST_EVENT_ID_HEADER): # pragma: lax no cover
674+
if last_event_id := request.headers.get(LAST_EVENT_ID_HEADER):
675675
await self._replay_events(last_event_id, request, send)
676676
return
677677

@@ -681,11 +681,11 @@ async def _handle_get_request(self, request: Request, send: Send) -> None:
681681
"Content-Type": CONTENT_TYPE_SSE,
682682
}
683683

684-
if self.mcp_session_id:
684+
if self.mcp_session_id: # pragma: no branch
685685
headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id
686686

687687
# Check if we already have an active GET stream
688-
if GET_STREAM_KEY in self._request_streams: # pragma: lax no cover
688+
if GET_STREAM_KEY in self._request_streams:
689689
response = self._create_error_response(
690690
"Conflict: Only one SSE stream is allowed per session",
691691
HTTPStatus.CONFLICT,
@@ -714,8 +714,12 @@ async def standalone_sse_writer():
714714
# Send the message via SSE
715715
event_data = self._create_event_data(event_message)
716716
await sse_stream_writer.send(event_data)
717-
except Exception: # pragma: lax no cover
718-
logger.exception("Error in standalone SSE writer")
717+
except Exception:
718+
# Timing race: if task cancellation reaches this writer before it
719+
# hits the closed stream, we get CancelledError (not caught here).
720+
# If the closed-stream error fires first, this logs. Neither order is
721+
# a bug — both are valid shutdown sequences.
722+
logger.exception("Error in standalone SSE writer") # pragma: lax no cover
719723
finally:
720724
logger.debug("Closing standalone SSE writer")
721725
await self._clean_up_memory_streams(GET_STREAM_KEY)
@@ -791,13 +795,13 @@ async def terminate(self) -> None:
791795
# During cleanup, we catch all exceptions since streams might be in various states
792796
logger.debug(f"Error closing streams: {e}")
793797

794-
async def _handle_unsupported_request(self, request: Request, send: Send) -> None: # pragma: lax no cover
798+
async def _handle_unsupported_request(self, request: Request, send: Send) -> None:
795799
"""Handle unsupported HTTP methods."""
796800
headers = {
797801
"Content-Type": CONTENT_TYPE_JSON,
798802
"Allow": "GET, POST, DELETE",
799803
}
800-
if self.mcp_session_id:
804+
if self.mcp_session_id: # pragma: no branch
801805
headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id
802806

803807
response = self._create_error_response(
@@ -824,7 +828,7 @@ async def _validate_session(self, request: Request, send: Send) -> bool:
824828
request_session_id = self._get_session_id(request)
825829

826830
# If no session ID provided but required, return error
827-
if not request_session_id: # pragma: lax no cover
831+
if not request_session_id:
828832
response = self._create_error_response(
829833
"Bad Request: Missing session ID",
830834
HTTPStatus.BAD_REQUEST,
@@ -849,11 +853,11 @@ async def _validate_protocol_version(self, request: Request, send: Send) -> bool
849853
protocol_version = request.headers.get(MCP_PROTOCOL_VERSION_HEADER)
850854

851855
# If no protocol version provided, assume default version
852-
if protocol_version is None: # pragma: lax no cover
856+
if protocol_version is None:
853857
protocol_version = DEFAULT_NEGOTIATED_VERSION
854858

855859
# Check if the protocol version is supported
856-
if protocol_version not in SUPPORTED_PROTOCOL_VERSIONS: # pragma: lax no cover
860+
if protocol_version not in SUPPORTED_PROTOCOL_VERSIONS:
857861
supported_versions = ", ".join(SUPPORTED_PROTOCOL_VERSIONS)
858862
response = self._create_error_response(
859863
f"Bad Request: Unsupported protocol version: {protocol_version}. "
@@ -865,13 +869,13 @@ async def _validate_protocol_version(self, request: Request, send: Send) -> bool
865869

866870
return True
867871

868-
async def _replay_events(self, last_event_id: str, request: Request, send: Send) -> None: # pragma: lax no cover
872+
async def _replay_events(self, last_event_id: str, request: Request, send: Send) -> None:
869873
"""Replays events that would have been sent after the specified event ID.
870874
871875
Only used when resumability is enabled.
872876
"""
873877
event_store = self._event_store
874-
if not event_store:
878+
if not event_store: # pragma: no cover
875879
return
876880

877881
try:
@@ -881,7 +885,7 @@ async def _replay_events(self, last_event_id: str, request: Request, send: Send)
881885
"Content-Type": CONTENT_TYPE_SSE,
882886
}
883887

884-
if self.mcp_session_id:
888+
if self.mcp_session_id: # pragma: no branch
885889
headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id
886890

887891
# Get protocol version from header (already validated in _validate_protocol_version)
@@ -902,7 +906,7 @@ async def send_event(event_message: EventMessage) -> None:
902906
stream_id = await event_store.replay_events_after(last_event_id, send_event)
903907

904908
# If stream ID not in mapping, create it
905-
if stream_id and stream_id not in self._request_streams:
909+
if stream_id and stream_id not in self._request_streams: # pragma: no branch
906910
# Register SSE writer so close_sse_stream() can close it
907911
self._sse_stream_writers[stream_id] = sse_stream_writer
908912

@@ -921,9 +925,9 @@ async def send_event(event_message: EventMessage) -> None:
921925
await sse_stream_writer.send(event_data)
922926
except anyio.ClosedResourceError:
923927
# Expected when close_sse_stream() is called
924-
logger.debug("Replay SSE stream closed by close_sse_stream()")
928+
logger.debug("Replay SSE stream closed by close_sse_stream()") # pragma: no cover
925929
except Exception:
926-
logger.exception("Error in replay sender")
930+
logger.exception("Error in replay sender") # pragma: no cover
927931

928932
# Create and start EventSourceResponse
929933
response = EventSourceResponse(
@@ -934,13 +938,13 @@ async def send_event(event_message: EventMessage) -> None:
934938

935939
try:
936940
await response(request.scope, request.receive, send)
937-
except Exception:
941+
except Exception: # pragma: no cover
938942
logger.exception("Error in replay response")
939943
finally:
940944
await sse_stream_writer.aclose()
941945
await sse_stream_reader.aclose()
942946

943-
except Exception:
947+
except Exception: # pragma: no cover
944948
logger.exception("Error replaying events")
945949
response = self._create_error_response(
946950
"Error replaying events",
@@ -991,7 +995,7 @@ async def message_router():
991995
if isinstance(message, JSONRPCResponse | JSONRPCError) and message.id is not None:
992996
target_request_id = str(message.id)
993997
# Extract related_request_id from meta if it exists
994-
elif ( # pragma: lax no cover
998+
elif (
995999
session_message.metadata is not None
9961000
and isinstance(
9971001
session_message.metadata,
@@ -1015,10 +1019,16 @@ async def message_router():
10151019
try:
10161020
# Send both the message and the event ID
10171021
await self._request_streams[request_stream_id][0].send(EventMessage(message, event_id))
1018-
except (anyio.BrokenResourceError, anyio.ClosedResourceError): # pragma: lax no cover
1022+
except ( # pragma: lax no cover
1023+
# Timing race: if cancellation reaches this coroutine
1024+
# before send() hits the closed stream, CancelledError
1025+
# propagates instead. Either shutdown sequence is valid.
1026+
anyio.BrokenResourceError,
1027+
anyio.ClosedResourceError,
1028+
):
10191029
# Stream might be closed, remove from registry
10201030
self._request_streams.pop(request_stream_id, None)
1021-
else: # pragma: lax no cover
1031+
else:
10221032
logger.debug(
10231033
f"""Request stream {request_stream_id} not found
10241034
for message. Still processing message as the client

0 commit comments

Comments
 (0)