Skip to content

Commit 192b727

Browse files
feat: implement mcp.server.operation.duration and mcp.server.session.duration metrics
Implements the two server-side OTel metrics defined in the MCP semantic conventions (https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp), as discussed in #421. Changes: - src/mcp/shared/_otel.py: add a meter and two histograms with the spec-mandated explicit bucket boundaries [0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30, 60, 120, 300]. Expose record_server_operation_duration() and record_server_session_duration() helpers. - src/mcp/server/lowlevel/server.py: record mcp.server.operation.duration in _handle_request() on all exit paths (success, MCPError, cancellation, transport close, raise_exceptions=True). Attributes recorded: mcp.method.name (required), error.type, rpc.response.status_code, gen_ai.tool.name + gen_ai.operation.name (tools/call), gen_ai.prompt.name (prompts/get), mcp.protocol.version. - src/mcp/server/session.py: record mcp.server.session.duration in __aexit__(). Session start time is captured in __aenter__() (not __init__()) so the clock starts when the session is actually active. Cancellation exceptions (transport close) are not treated as errors. - tests/shared/test_otel.py: extend test_client_and_server_instrumentation to assert both metrics are emitted with correct attributes and units. Transport attributes (network.transport, network.protocol.name, network.protocol.version) are not set. ServerSession does not currently know the transport type — adding it would require a new constructor parameter and a corresponding API change. This can be addressed in a follow-up once the transport kind is plumbed through. Generated by Mistral Vibe. Co-Authored-By: Mistral Vibe <vibe@mistral.ai>
1 parent d5b9155 commit 192b727

File tree

4 files changed

+158
-5
lines changed

4 files changed

+158
-5
lines changed

src/mcp/server/lowlevel/server.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ async def main():
3838

3939
import contextvars
4040
import logging
41+
import time
4142
import warnings
4243
from collections.abc import AsyncIterator, Awaitable, Callable
4344
from contextlib import AbstractAsyncContextManager, AsyncExitStack, asynccontextmanager
@@ -66,7 +67,7 @@ async def main():
6667
from mcp.server.streamable_http import EventStore
6768
from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager
6869
from mcp.server.transport_security import TransportSecuritySettings
69-
from mcp.shared._otel import extract_trace_context, otel_span
70+
from mcp.shared._otel import extract_trace_context, otel_span, record_server_operation_duration
7071
from mcp.shared._stream_protocols import ReadStream, WriteStream
7172
from mcp.shared.exceptions import MCPError
7273
from mcp.shared.message import ServerMessageMetadata, SessionMessage
@@ -455,12 +456,34 @@ async def _handle_request(
455456
meta = cast(dict[str, Any] | None, getattr(req.params, "meta", None)) if req.params else None
456457
parent_context = extract_trace_context(meta) if meta is not None else None
457458

459+
mcp_protocol_version: str | None = session.client_params.protocol_version if session.client_params else None
460+
461+
start_time = time.monotonic()
462+
463+
def _record_duration(
464+
*,
465+
error_type: str | None = None,
466+
rpc_response_status_code: str | None = None,
467+
) -> None:
468+
record_server_operation_duration(
469+
time.monotonic() - start_time,
470+
req.method,
471+
error_type=error_type,
472+
rpc_response_status_code=rpc_response_status_code,
473+
tool_name=target if req.method == "tools/call" else None,
474+
prompt_name=target if req.method == "prompts/get" else None,
475+
mcp_protocol_version=mcp_protocol_version,
476+
)
477+
458478
with otel_span(
459479
span_name,
460480
kind=SpanKind.SERVER,
461481
attributes={"mcp.method.name": req.method, "jsonrpc.request.id": message.request_id},
462482
context=parent_context,
463483
) as span:
484+
error_type: str | None = None
485+
rpc_response_status_code: str | None = None
486+
464487
if handler := self._request_handlers.get(req.method):
465488
logger.debug("Dispatching request of type %s", type(req).__name__)
466489

@@ -499,25 +522,38 @@ async def _handle_request(
499522
)
500523
response = await handler(ctx, req.params)
501524
except MCPError as err:
525+
rpc_response_status_code = str(err.error.code)
526+
error_type = rpc_response_status_code
502527
response = err.error
503528
except anyio.get_cancelled_exc_class():
504529
if message.cancelled:
505530
# Client sent CancelledNotification; responder.cancel() already
506531
# sent an error response, so skip the duplicate.
507532
logger.info("Request %s cancelled - duplicate response suppressed", message.request_id)
533+
_record_duration(error_type="cancelled")
508534
return
509535
# Transport-close cancellation from the TG in run(); re-raise so the
510536
# TG swallows its own cancellation.
511537
raise
512538
except Exception as err:
539+
error_type = type(err).__name__
513540
if raise_exceptions: # pragma: no cover
541+
_record_duration(error_type=error_type)
514542
raise err
515543
response = types.ErrorData(code=0, message=str(err))
516544
else: # pragma: no cover
545+
rpc_response_status_code = str(types.METHOD_NOT_FOUND)
546+
error_type = rpc_response_status_code
517547
response = types.ErrorData(code=types.METHOD_NOT_FOUND, message="Method not found")
518548

519-
if isinstance(response, types.ErrorData) and span is not None:
520-
span.set_status(StatusCode.ERROR, response.message)
549+
if isinstance(response, types.ErrorData):
550+
if span is not None:
551+
span.set_status(StatusCode.ERROR, response.message)
552+
# Only set error_type/rpc_response_status_code from response code if not
553+
# already set by an exception.
554+
if error_type is None:
555+
rpc_response_status_code = str(response.code)
556+
error_type = rpc_response_status_code
521557

522558
try:
523559
await message.respond(response)
@@ -529,10 +565,13 @@ async def _handle_request(
529565
# end closed (_receive_loop's async-with exit); Broken if the peer
530566
# end closed first (streamable_http terminate()).
531567
logger.debug("Response for %s dropped - transport closed", message.request_id)
568+
_record_duration(error_type=error_type, rpc_response_status_code=rpc_response_status_code)
532569
return
533570

534571
logger.debug("Response sent")
535572

573+
_record_duration(error_type=error_type, rpc_response_status_code=rpc_response_status_code)
574+
536575
async def _handle_notification(
537576
self,
538577
notify: types.ClientNotification,

src/mcp/server/session.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ async def handle_list_prompts(ctx: RequestContext, params) -> ListPromptsResult:
4040
from mcp.server.experimental.session_features import ExperimentalServerSessionFeatures
4141
from mcp.server.models import InitializationOptions
4242
from mcp.server.validation import validate_sampling_tools, validate_tool_use_result_messages
43+
import time
44+
45+
from mcp.shared._otel import record_server_session_duration
4346
from mcp.shared._stream_protocols import ReadStream, WriteStream
4447
from mcp.shared.exceptions import StatelessModeNotSupported
4548
from mcp.shared.experimental.tasks.capabilities import check_tasks_capability
@@ -96,6 +99,27 @@ def __init__(
9699
ServerRequestResponder
97100
](0)
98101
self._exit_stack.push_async_callback(lambda: self._incoming_message_stream_reader.aclose())
102+
self._session_start_time: float | None = None
103+
104+
async def __aenter__(self) -> "ServerSession":
105+
self._session_start_time = time.monotonic()
106+
return await super().__aenter__()
107+
108+
async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any) -> bool | None:
109+
if self._session_start_time is not None:
110+
duration = time.monotonic() - self._session_start_time
111+
mcp_protocol_version: str | None = (
112+
self._client_params.protocol_version if self._client_params else None
113+
)
114+
# Cancellation exceptions indicate transport close, not a session error.
115+
is_cancellation = exc_val is not None and isinstance(exc_val, anyio.get_cancelled_exc_class())
116+
error_type: str | None = type(exc_val).__name__ if exc_val is not None and not is_cancellation else None
117+
record_server_session_duration(
118+
duration,
119+
error_type=error_type,
120+
mcp_protocol_version=mcp_protocol_version,
121+
)
122+
return await super().__aexit__(exc_type, exc_val, exc_tb)
99123

100124
@property
101125
def _receive_request_adapter(self) -> TypeAdapter[types.ClientRequest]:

src/mcp/shared/_otel.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,32 @@
77
from typing import Any
88

99
from opentelemetry.context import Context
10+
from opentelemetry.metrics import get_meter
1011
from opentelemetry.propagate import extract, inject
1112
from opentelemetry.trace import SpanKind, get_tracer
1213

1314
_tracer = get_tracer("mcp-python-sdk")
15+
_meter = get_meter("mcp-python-sdk")
16+
17+
# Metrics as defined by the OTEL semconv https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/mcp.md
18+
_DURATION_BUCKETS = [0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30, 60, 120, 300]
19+
20+
_server_operation_duration = _meter.create_histogram(
21+
"mcp.server.operation.duration",
22+
unit="s",
23+
description=(
24+
"MCP request or notification duration as observed on the receiver "
25+
"from the time it was received until the result or ack is sent."
26+
),
27+
explicit_bucket_boundaries_advisory=_DURATION_BUCKETS,
28+
)
29+
30+
_server_session_duration = _meter.create_histogram(
31+
"mcp.server.session.duration",
32+
unit="s",
33+
description="The duration of the MCP session as observed on the MCP server.",
34+
explicit_bucket_boundaries_advisory=_DURATION_BUCKETS,
35+
)
1436

1537

1638
@contextmanager
@@ -34,3 +56,44 @@ def inject_trace_context(meta: dict[str, Any]) -> None:
3456
def extract_trace_context(meta: dict[str, Any]) -> Context:
3557
"""Extract W3C trace context from a `_meta` dict."""
3658
return extract(meta)
59+
60+
61+
def record_server_operation_duration(
62+
duration_s: float,
63+
method: str,
64+
*,
65+
error_type: str | None = None,
66+
rpc_response_status_code: str | None = None,
67+
tool_name: str | None = None,
68+
prompt_name: str | None = None,
69+
mcp_protocol_version: str | None = None,
70+
) -> None:
71+
"""Record a data point for mcp.server.operation.duration."""
72+
attributes: dict[str, str] = {"mcp.method.name": method}
73+
if error_type is not None:
74+
attributes["error.type"] = error_type
75+
if rpc_response_status_code is not None:
76+
attributes["rpc.response.status_code"] = rpc_response_status_code
77+
if tool_name is not None:
78+
attributes["gen_ai.tool.name"] = tool_name
79+
attributes["gen_ai.operation.name"] = "execute_tool"
80+
if prompt_name is not None:
81+
attributes["gen_ai.prompt.name"] = prompt_name
82+
if mcp_protocol_version is not None:
83+
attributes["mcp.protocol.version"] = mcp_protocol_version
84+
_server_operation_duration.record(duration_s, attributes)
85+
86+
87+
def record_server_session_duration(
88+
duration_s: float,
89+
*,
90+
error_type: str | None = None,
91+
mcp_protocol_version: str | None = None,
92+
) -> None:
93+
"""Record a data point for mcp.server.session.duration."""
94+
attributes: dict[str, str] = {}
95+
if error_type is not None:
96+
attributes["error.type"] = error_type
97+
if mcp_protocol_version is not None:
98+
attributes["mcp.protocol.version"] = mcp_protocol_version
99+
_server_session_duration.record(duration_s, attributes)

tests/shared/test_otel.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
# Logfire warns about propagated trace context by default (distributed_tracing=None).
1414
# This is expected here since we're testing cross-boundary context propagation.
1515
@pytest.mark.filterwarnings("ignore::RuntimeWarning")
16-
async def test_client_and_server_spans(capfire: CaptureLogfire):
17-
"""Verify that calling a tool produces client and server spans with correct attributes."""
16+
async def test_client_and_server_instrumentation(capfire: CaptureLogfire):
17+
"""Verify that calling a tool produces client and server spans and metrics with correct attributes."""
1818
server = MCPServer("test")
1919

2020
@server.tool()
@@ -42,3 +42,30 @@ def greet(name: str) -> str:
4242

4343
# Server span should be in the same trace as the client span (context propagation).
4444
assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"]
45+
46+
metrics = {m["name"]: m for m in capfire.get_collected_metrics() if m["name"].startswith("mcp.")}
47+
48+
assert "mcp.server.operation.duration" in metrics
49+
assert "mcp.server.session.duration" in metrics
50+
51+
op_metric = metrics["mcp.server.operation.duration"]
52+
assert op_metric["unit"] == "s"
53+
op_points = op_metric["data"]["data_points"]
54+
55+
# tools/call data point
56+
tools_call_point = next(p for p in op_points if p["attributes"]["mcp.method.name"] == "tools/call")
57+
assert tools_call_point["attributes"]["gen_ai.tool.name"] == "greet"
58+
assert tools_call_point["attributes"]["gen_ai.operation.name"] == "execute_tool"
59+
assert tools_call_point["attributes"]["mcp.protocol.version"] == "2025-11-25"
60+
assert tools_call_point["count"] == 1
61+
assert tools_call_point["sum"] > 0
62+
63+
# tools/list is also called during initialization
64+
assert any(p["attributes"]["mcp.method.name"] == "tools/list" for p in op_points)
65+
66+
session_metric = metrics["mcp.server.session.duration"]
67+
assert session_metric["unit"] == "s"
68+
[session_point] = session_metric["data"]["data_points"]
69+
assert session_point["attributes"]["mcp.protocol.version"] == "2025-11-25"
70+
assert session_point["count"] == 1
71+
assert session_point["sum"] > 0

0 commit comments

Comments
 (0)