diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/CHANGELOG.md index 8c6a17f0..88f4a18b 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/CHANGELOG.md @@ -5,18 +5,30 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [0.1.2] +## [0.2.0] ### Added +- **FastMCP 3.x support** — **Breaking**: targets `fastmcp >= 3.0.0, < 4` (previously `>= 2.0.0, <= 2.14.7`). - **Server session lifecycle tracking** — `server_instrumentor` now wraps `mcp.server.lowlevel.Server.run` with an `AgentInvocation(agent_type="mcp_server")` to track server session duration, enabling `mcp.server.session.duration` metric emission via `MetricsEmitter`. +- **`resources/read` and `prompts/get` instrumentation** — Server and client-side hooks for `FastMCP.read_resource` / `Client.read_resource` and `FastMCP.get_prompt` / `Client.get_prompt`. Produces `MCPOperation` spans with `{mcp.method.name} {target}` naming. +- **Transport context bridge** — `MCPRequestContext` ContextVar populated by the transport instrumentor on the server side, allowing the server instrumentor to read `jsonrpc.request.id`, `network.transport`, etc. +- **Transport detection** — Client automatically detects `pipe` vs `tcp` transport from `Client.transport` type. +- **Baggage propagation** — Transport instrumentor now extracts W3C `baggage` header alongside `traceparent`/`tracestate`. ### Changed -- Pinned compatibility to `fastmcp >= 2.0.0, <= 2.14.7` and `splunk-otel-util-genai <= 0.1.8` to avoid runtime incompatibilities introduced by newer upstream releases. 
+- **`list_tools` uses `MCPOperation` instead of `Step`** — Client `list_tools` now produces a `tools/list` span via `MCPOperation` with proper MCP semconv naming and `SpanKind.CLIENT`, instead of the previous `Step` type. +- **Server hooks on `FastMCP.call_tool`** — Tool call hook targets `FastMCP.call_tool` directly with re-entrant guard for FastMCP 3.x middleware recursion. +- **Renamed `mcp_server_name` → `sdot_mcp_server_name`** — **Breaking**: callers using `mcp_server_name=` on `MCPToolCall` must update to `sdot_mcp_server_name=`. ### Fixed - **MCP session attributes for duration metrics** — `client_instrumentor` now sets `network.transport` and `error.type` on the `AgentInvocation` attributes dict so that `MetricsEmitter` can record `mcp.client.session.duration` with proper semconv attributes. - **MCP span naming aligned with OTel MCP semantic conventions** — Tool call spans now use `tools/call {tool_name}` format with `SpanKind.CLIENT` (client-side) or `SpanKind.SERVER` (server-side), matching the [OTel MCP semconv spec](https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/). Previously used `execute_tool {tool_name}` with `SpanKind.INTERNAL`. +## [0.1.2] + +### Changed +- Pinned compatibility to `fastmcp >= 2.0.0, <= 2.14.7` and `splunk-otel-util-genai <= 0.1.8` to avoid runtime incompatibilities introduced by newer upstream releases. + ## [0.1.1] - 2026-01-27 ### Changed diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/README.rst b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/README.rst index 793567df..5097d6b1 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/README.rst +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/README.rst @@ -9,6 +9,26 @@ OpenTelemetry FastMCP Instrumentation This library provides automatic instrumentation for `FastMCP `_, a Python library for building Model Context Protocol (MCP) servers. +Compatibility Matrix +-------------------- + +.. 
list-table:: + :header-rows: 1 + :widths: 20 20 20 20 + + * - Instrumentation + - fastmcp + - util-genai + - Notes + * - 0.1.1 + - 2.x (jlowin/fastmcp) + - <= 0.1.9 + - PR #147. Wraps ``ToolManager.call_tool``. + * - 0.2.0 + - >= 3.0.0, < 4 + - >= 0.1.12 + - Wraps ``FastMCP.call_tool``, ``read_resource``, ``render_prompt``. Breaking change from 0.1.x. + Installation ------------ @@ -58,8 +78,16 @@ The following environment variables control the instrumentation behavior: What is Instrumented -------------------- -Server-side: -~~~~~~~~~~~~ +Server-side (v0.2.0 — FastMCP 3.x): +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +- FastMCP server initialization +- Tool execution via ``FastMCP.call_tool`` +- Resource reads via ``FastMCP.read_resource`` +- Prompt rendering via ``FastMCP.render_prompt`` + +Server-side (v0.1.x — FastMCP 2.x): +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - FastMCP server initialization - Tool execution via ``ToolManager.call_tool`` @@ -80,13 +108,70 @@ Trace Context Propagation ------------------------- The instrumentation automatically propagates W3C TraceContext (traceparent, tracestate) -between MCP client and server processes. This enables distributed tracing across -process boundaries: +and baggage between MCP client and server processes. This enables distributed tracing +across process boundaries: - Client spans and server spans share the same ``trace_id`` - Server tool execution spans are children of client tool call spans - No code changes required in your MCP server or client +Transport bridge (``transport_instrumentor.py``) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The MCP Python SDK v1.x (current stable, up to 1.27.0) does not natively +propagate OpenTelemetry context. 
This instrumentation includes a +**transport-layer bridge** (``transport_instrumentor.py``) that: + +- **Client side**: wraps ``BaseSession.send_request`` to inject ``traceparent``, + ``tracestate``, and ``baggage`` into ``params.meta`` (serialized as ``_meta`` + on the wire). +- **Server side**: wraps ``Server._handle_request`` to extract trace context + from ``request_meta`` and populate an ``MCPRequestContext`` (via + ``ContextVar``) for the server instrumentor to read transport-level attributes + like ``jsonrpc.request.id`` and ``network.transport``. + +Upstream native support (mcp v2.x) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Native OTel support has been merged to the upstream SDK's ``main`` branch, +targeting **v2.x** (not yet released as of Apr 2026): + +- `#2298 `_ + (merged Mar 31) — propagate ``contextvars.Context`` through anyio streams. + Supersedes `#1996 `_ + (closed). +- `#2381 `_ + (merged Mar 31) — native CLIENT + SERVER spans, W3C trace-context + inject/extract via ``params.meta``, and ``opentelemetry-api`` as a mandatory + dependency. + +Related open/draft PRs that may further extend the native support: + +- `#2093 `_ + — enhanced inject logic (open). +- `#2133 `_ + — enhanced extract logic (draft, depends on #2298). +- `#2132 `_ + — richer CLIENT span attributes (draft, depends on #2298). + +Migration plan +^^^^^^^^^^^^^^ + +Once ``mcp >= 2.x`` is released and the minimum supported version is raised: + +- ``_send_request_wrapper`` (client-side inject) can be **removed**. +- The trace-context extract/attach portion of ``_server_handle_request_wrapper`` + can be **removed**. The ``MCPRequestContext`` population + (``jsonrpc.request.id``, ``network.transport``) should remain because the + v2.x native spans (per #2381) only surface ``mcp.method.name`` and + ``jsonrpc.request.id``; ``network.transport`` is not included. + Re-evaluate as the upstream spans mature. +- ``_extract_carrier_from_meta`` can be **removed**. 
+ +A feature-detection guard (similar to ``_has_native_telemetry`` in the server +instrumentor) should be added so the wrappers gracefully become no-ops when +running against ``mcp >= 2.x``, allowing a wider version range. + Telemetry --------- @@ -114,6 +199,8 @@ When content capture is enabled: References ---------- -- `FastMCP `_ +- `FastMCP 3.x `_ (>= 3.0.0) +- `FastMCP 2.x `_ (<= 2.14.7) - `Model Context Protocol `_ +- `OpenTelemetry GenAI MCP Semantic Conventions `_ - `OpenTelemetry Project `_ diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/client.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/client.py index 10928592..6d0d0d93 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/client.py +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/client.py @@ -113,9 +113,10 @@ async def run_calculator_demo(server_url: str | None = None): Args: server_url: Optional URL of external MCP server (e.g., http://localhost:8000/sse). - If not provided, spawns server.py as a subprocess. + If not provided, spawns server_instrumented.py as a subprocess. """ from fastmcp import Client + from fastmcp.client.transports.stdio import PythonStdioTransport print("\n" + "=" * 60) print("MCP Calculator Client - OpenTelemetry Instrumentation Demo") @@ -126,12 +127,21 @@ async def run_calculator_demo(server_url: str | None = None): print(f"\n🌐 Connecting to external server: {server_url}") server_target = server_url else: - # Spawn server as subprocess - server_script = Path(__file__).parent / "server.py" + # Spawn instrumented server as subprocess. + # MCP SDK's default env only inherits a small allowlist (HOME, PATH, + # etc.), so OTEL_* vars must be passed explicitly. 
+ server_script = Path(__file__).parent / "server_instrumented.py" if not server_script.exists(): raise FileNotFoundError(f"Server script not found: {server_script}") print(f"\n📡 Spawning server subprocess: {server_script.name}") - server_target = server_script + server_env = { + k: v + for k, v in os.environ.items() + if k.startswith(("OTEL_", "VIRTUAL_ENV", "FASTMCP_")) + or k in ("HOME", "PATH", "SHELL", "TERM", "USER", "LOGNAME") + } + server_env["OTEL_SERVICE_NAME"] = "mcp-calculator-server" + server_target = PythonStdioTransport(script_path=server_script, env=server_env) # Connect to the server using FastMCP Client async with Client(server_target) as client: diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/prompt/client.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/prompt/client.py new file mode 100644 index 00000000..c1bb86dc --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/prompt/client.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +"""MCP Weather Prompt Client. + +Connects to the weather prompt server and demonstrates prompt operations +with OpenTelemetry instrumentation capturing traces and metrics. 
+ +Expected spans: + - "prompts/list" (SpanKind.CLIENT) + - "prompts/get weather_forecast" (SpanKind.CLIENT) + - "prompts/get travel_packing_advice" (SpanKind.CLIENT) + +Usage: + # Spawn server as subprocess (single terminal) + OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" \\ + OTEL_SERVICE_NAME="mcp-prompt-demo" \\ + OTEL_INSTRUMENTATION_GENAI_EMITTERS="span_metric" \\ + python client.py --wait 10 + + # Connect to external SSE server + python client.py --server-url http://localhost:8001/sse --wait 10 +""" + +import argparse +import asyncio +import os +import sys +from pathlib import Path + + +def setup_telemetry(): + """Set up OpenTelemetry with OTLP and/or console exporters.""" + from opentelemetry import trace, metrics + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.resources import Resource + + resource = Resource.create( + {"service.name": os.environ.get("OTEL_SERVICE_NAME", "mcp-prompt-demo")} + ) + + trace_provider = TracerProvider(resource=resource) + metric_readers = [] + + otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + if otlp_endpoint: + try: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, + ) + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + + trace_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) + metric_readers.append(PeriodicExportingMetricReader(OTLPMetricExporter())) + print(f" OTLP exporter -> {otlp_endpoint}", file=sys.stderr) + except ImportError: + print(" OTLP not available", file=sys.stderr) + + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, + ) + + trace_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + print(" Console 
exporter enabled", file=sys.stderr) + + trace.set_tracer_provider(trace_provider) + if metric_readers: + metrics.set_meter_provider( + MeterProvider(resource=resource, metric_readers=metric_readers) + ) + + from opentelemetry.instrumentation.fastmcp import FastMCPInstrumentor + + FastMCPInstrumentor().instrument() + print(" FastMCP instrumentation applied", file=sys.stderr) + + +async def run_prompt_demo(server_url: str | None = None): + """Connect to the weather prompt server and exercise prompt operations.""" + from fastmcp import Client + + print("\n" + "=" * 60) + print(" MCP Prompt Demo - OpenTelemetry Instrumentation") + print("=" * 60) + + if server_url: + target = server_url + print(f"\n Connecting to: {server_url}") + else: + target = Path(__file__).parent / "server.py" + print(f"\n Spawning server: {target.name}") + + async with Client(target) as client: + print(" Connected!\n") + + # 1. List prompts + print(" Step 1: List prompts") + print(" " + "-" * 40) + prompts = await client.list_prompts() + for p in prompts: + args = ", ".join(a.name for a in (p.arguments or [])) + print(f" {p.name}({args})") + print() + + # 2. Get weather_forecast prompt + print(" Step 2: Get weather_forecast for London") + print(" " + "-" * 40) + result = await client.get_prompt( + "weather_forecast", arguments={"city": "London"} + ) + for msg in result.messages: + role = msg.role + text = ( + msg.content.text if hasattr(msg.content, "text") else str(msg.content) + ) + print(f" [{role}] {text[:120]}") + print() + + # 3. 
Get travel_packing_advice prompt + print(" Step 3: Get travel_packing_advice for Tokyo, 5 days") + print(" " + "-" * 40) + result = await client.get_prompt( + "travel_packing_advice", + arguments={"destination": "Tokyo", "days": "5"}, + ) + for msg in result.messages: + role = msg.role + text = ( + msg.content.text if hasattr(msg.content, "text") else str(msg.content) + ) + print(f" [{role}] {text[:120]}") + print() + + print("=" * 60) + print(" Prompt demo completed!") + print("=" * 60) + + +async def main(wait_seconds: int = 0, server_url: str | None = None): + setup_telemetry() + print() + + try: + await run_prompt_demo(server_url=server_url) + except Exception as e: + print(f"\n Demo failed: {e}") + import traceback + + traceback.print_exc() + + if wait_seconds > 0: + print(f"\n Waiting {wait_seconds}s for telemetry flush...") + await asyncio.sleep(wait_seconds) + print(" Done") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="MCP Prompt Client") + parser.add_argument("--server-url", type=str, default=None) + parser.add_argument("--wait", type=int, default=0) + args = parser.parse_args() + + asyncio.run(main(wait_seconds=args.wait, server_url=args.server_url)) diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/prompt/server.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/prompt/server.py new file mode 100644 index 00000000..c55660ac --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/prompt/server.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +"""MCP Weather Prompt Server. + +Exposes templatized prompts that fetch weather data from wttr.in +and return them as structured MCP prompt messages. 
+ +Usage: + # stdio mode (for subprocess spawning by client) + python server.py + + # SSE mode (for external client connections) + python server.py --sse --port 8001 +""" + +import urllib.request + +from fastmcp import FastMCP + +mcp = FastMCP("Weather Prompt Server") + + +def _fetch_weather(city: str) -> str: + """Fetch plain-text weather from wttr.in.""" + url = f"https://wttr.in/{urllib.request.quote(city)}?format=3" + try: + with urllib.request.urlopen(url, timeout=10) as resp: + return resp.read().decode("utf-8").strip() + except Exception as e: + return f"Unable to fetch weather for {city}: {e}" + + +@mcp.prompt() +def weather_forecast(city: str) -> str: + """Get a weather forecast summary for a city. + + Fetches live weather from wttr.in and returns it as a prompt + that an LLM can use to provide a natural-language forecast. + + Args: + city: City name (e.g. "London", "San Francisco", "Tokyo") + """ + weather = _fetch_weather(city) + return ( + f"Here is the current weather data for {city}:\n\n" + f" {weather}\n\n" + f"Please provide a brief, friendly forecast summary for {city} " + f"based on the data above." + ) + + +@mcp.prompt() +def travel_packing_advice(destination: str, days: int) -> str: + """Get packing advice based on weather at a travel destination. + + Fetches weather and returns a prompt asking for packing recommendations. + + Args: + destination: Travel destination city + days: Number of days for the trip + """ + weather = _fetch_weather(destination) + return ( + f"I'm traveling to {destination} for {days} days.\n\n" + f"Current weather: {weather}\n\n" + f"Based on this weather, what should I pack? " + f"Please provide a concise packing list." 
+ ) + + +if __name__ == "__main__": + import argparse + import sys + + parser = argparse.ArgumentParser(description="MCP Weather Prompt Server") + parser.add_argument("--sse", action="store_true", help="Run in SSE mode") + parser.add_argument("--port", type=int, default=8001, help="SSE port") + args = parser.parse_args() + + if args.sse: + print( + f"Starting SSE server at http://localhost:{args.port}/sse", + file=sys.stderr, + ) + mcp.run(transport="sse", host="localhost", port=args.port) + else: + mcp.run() diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/resource/client.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/resource/client.py new file mode 100644 index 00000000..343236d6 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/resource/client.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 +"""MCP System Dashboard Resource Client. + +Connects to the system dashboard server and demonstrates resource operations +with OpenTelemetry instrumentation. 
+ +Expected spans: + - "resources/list" (SpanKind.CLIENT) + - "resources/read system://info" (SpanKind.CLIENT) + - "resources/read system://uptime" (SpanKind.CLIENT) + - "resources/read system://env/HOME" (SpanKind.CLIENT) + +Usage: + OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" \\ + OTEL_SERVICE_NAME="mcp-resource-demo" \\ + OTEL_INSTRUMENTATION_GENAI_EMITTERS="span_metric" \\ + python client.py --wait 10 + + # Or connect to external SSE server + python client.py --server-url http://localhost:8002/sse --wait 10 +""" + +import argparse +import asyncio +import os +import sys +from pathlib import Path + + +def setup_telemetry(): + """Set up OpenTelemetry with OTLP and/or console exporters.""" + from opentelemetry import trace, metrics + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.resources import Resource + + resource = Resource.create( + {"service.name": os.environ.get("OTEL_SERVICE_NAME", "mcp-resource-demo")} + ) + + trace_provider = TracerProvider(resource=resource) + metric_readers = [] + + otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + if otlp_endpoint: + try: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, + ) + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + + trace_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) + metric_readers.append(PeriodicExportingMetricReader(OTLPMetricExporter())) + print(f" OTLP exporter -> {otlp_endpoint}", file=sys.stderr) + except ImportError: + print(" OTLP not available", file=sys.stderr) + + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, + ) + + trace_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + 
print(" Console exporter enabled", file=sys.stderr) + + trace.set_tracer_provider(trace_provider) + if metric_readers: + metrics.set_meter_provider( + MeterProvider(resource=resource, metric_readers=metric_readers) + ) + + from opentelemetry.instrumentation.fastmcp import FastMCPInstrumentor + + FastMCPInstrumentor().instrument() + print(" FastMCP instrumentation applied", file=sys.stderr) + + +async def run_resource_demo(server_url: str | None = None): + """Connect to the system dashboard server and exercise resource operations.""" + from fastmcp import Client + + print("\n" + "=" * 60) + print(" MCP Resource Demo - OpenTelemetry Instrumentation") + print("=" * 60) + + if server_url: + target = server_url + print(f"\n Connecting to: {server_url}") + else: + target = Path(__file__).parent / "server.py" + print(f"\n Spawning server: {target.name}") + + async with Client(target) as client: + print(" Connected!\n") + + # 1. List resources + print(" Step 1: List resources") + print(" " + "-" * 40) + resources = await client.list_resources() + for r in resources: + print(f" {r.uri} ({r.name})") + print() + + # Also list resource templates + templates = await client.list_resource_templates() + if templates: + print(" Resource templates:") + for t in templates: + print(f" {t.uriTemplate} ({t.name})") + print() + + # 2. Read static resource: system://info + print(" Step 2: Read system://info") + print(" " + "-" * 40) + contents = await client.read_resource("system://info") + for c in contents: + text = c.text if hasattr(c, "text") else str(c) + for line in text.strip().splitlines(): + print(f" {line}") + print() + + # 3. Read static resource: system://uptime + print(" Step 3: Read system://uptime") + print(" " + "-" * 40) + contents = await client.read_resource("system://uptime") + for c in contents: + text = c.text if hasattr(c, "text") else str(c) + for line in text.strip().splitlines(): + print(f" {line}") + print() + + # 4. 
Read template resource: system://env/HOME + print(" Step 4: Read system://env/HOME") + print(" " + "-" * 40) + contents = await client.read_resource("system://env/HOME") + for c in contents: + text = c.text if hasattr(c, "text") else str(c) + print(f" {text.strip()}") + print() + + # 5. Read template resource: system://env/USER + print(" Step 5: Read system://env/USER") + print(" " + "-" * 40) + contents = await client.read_resource("system://env/USER") + for c in contents: + text = c.text if hasattr(c, "text") else str(c) + print(f" {text.strip()}") + print() + + print("=" * 60) + print(" Resource demo completed!") + print("=" * 60) + + +async def main(wait_seconds: int = 0, server_url: str | None = None): + setup_telemetry() + print() + + try: + await run_resource_demo(server_url=server_url) + except Exception as e: + print(f"\n Demo failed: {e}") + import traceback + + traceback.print_exc() + + if wait_seconds > 0: + print(f"\n Waiting {wait_seconds}s for telemetry flush...") + await asyncio.sleep(wait_seconds) + print(" Done") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="MCP Resource Client") + parser.add_argument("--server-url", type=str, default=None) + parser.add_argument("--wait", type=int, default=0) + args = parser.parse_args() + + asyncio.run(main(wait_seconds=args.wait, server_url=args.server_url)) diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/resource/server.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/resource/server.py new file mode 100644 index 00000000..29365f8e --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/resource/server.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +"""MCP System Dashboard Resource Server. 
+ +Exposes system information as MCP resources: + - system://info (static) — OS / Python / hostname + - system://uptime (static) — system uptime + - system://env/{var_name} (template) — environment variable lookup + +Usage: + python server.py # stdio + python server.py --sse --port 8002 # SSE +""" + +import os +import platform +import time + +from fastmcp import FastMCP + +mcp = FastMCP("System Dashboard Server") + +_START_TIME = time.time() + + +@mcp.resource("system://info") +def system_info() -> str: + """Basic system information. + + Returns hostname, OS, architecture, and Python version. + """ + return ( + f"hostname: {platform.node()}\n" + f"os: {platform.system()} {platform.release()}\n" + f"arch: {platform.machine()}\n" + f"python: {platform.python_version()}\n" + f"pid: {os.getpid()}" + ) + + +@mcp.resource("system://uptime") +def system_uptime() -> str: + """Server process uptime since start. + + Returns uptime in seconds and a human-readable format. + """ + elapsed = time.time() - _START_TIME + hours, remainder = divmod(int(elapsed), 3600) + minutes, seconds = divmod(remainder, 60) + return ( + f"uptime_seconds: {elapsed:.1f}\nuptime_human: {hours}h {minutes}m {seconds}s" + ) + + +@mcp.resource("system://env/{var_name}") +def env_variable(var_name: str) -> str: + """Read an environment variable by name. + + Args: + var_name: Name of the environment variable to read. 
+ """ + value = os.environ.get(var_name) + if value is None: + return f"{var_name} is not set" + return f"{var_name}={value}" + + +if __name__ == "__main__": + import argparse + import sys + + parser = argparse.ArgumentParser(description="MCP System Dashboard Server") + parser.add_argument("--sse", action="store_true", help="Run in SSE mode") + parser.add_argument("--port", type=int, default=8002, help="SSE port") + args = parser.parse_args() + + if args.sse: + print( + f"Starting SSE server at http://localhost:{args.port}/sse", + file=sys.stderr, + ) + mcp.run(transport="sse", host="localhost", port=args.port) + else: + mcp.run() diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/run_demo.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/run_demo.py index c9fd4204..121b03e0 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/run_demo.py +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/run_demo.py @@ -131,9 +131,12 @@ def setup_telemetry(console_output: bool = False): async def run_demo(): """Run the end-to-end demo.""" from fastmcp import Client + from fastmcp.client.transports.stdio import PythonStdioTransport - # Get the server script path - server_script = Path(__file__).parent / "server.py" + # Spawn the instrumented server. + # MCP SDK's default env only inherits a small allowlist (HOME, PATH, + # etc.), so OTEL_* vars must be passed explicitly. 
+ server_script = Path(__file__).parent / "server_instrumented.py" if not server_script.exists(): raise FileNotFoundError(f"Server script not found: {server_script}") @@ -141,8 +144,17 @@ async def run_demo(): print(f"📡 Connecting to Calculator Server ({server_script.name})...") print() + server_env = { + k: v + for k, v in os.environ.items() + if k.startswith(("OTEL_", "VIRTUAL_ENV")) + or k in ("HOME", "PATH", "SHELL", "TERM", "USER", "LOGNAME") + } + server_env["OTEL_SERVICE_NAME"] = "mcp-calculator-server" + server_target = PythonStdioTransport(script_path=server_script, env=server_env) + # Connect to the server - async with Client(server_script) as client: + async with Client(server_target) as client: print("✅ Connected!\n") # Step 1: List available tools diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/server_instrumented.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/server_instrumented.py index 645f1658..6eed9a9c 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/server_instrumented.py +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/e2e/server_instrumented.py @@ -37,7 +37,7 @@ def setup_telemetry(): from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import Resource - service_name = os.environ.get("OTEL_SERVICE_NAME", "mcp-calculator-server") + service_name = os.environ.get("MCP_SERVER_SERVICE_NAME", "mcp-calculator-server") # Create resource with service info resource = Resource.create( diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/weather_agent/README.md b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/weather_agent/README.md new file mode 100644 index 00000000..cc4565c8 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/weather_agent/README.md @@ -0,0 +1,115 @@ +# Weather Agent — MCP + LLM Agentic 
Loop + +This example demonstrates a full agentic loop where an **Agent** (MCP client) orchestrates +between an **MCP Server** (spawned as subprocess) and an **LLM** (OpenAI) to answer weather +and packing questions. + +## Sequence Diagram + +```mermaid +sequenceDiagram + participant Agent as Agent (MCP Client) + participant MCP as MCP Server (subprocess) + participant LLM as LLM (OpenAI) + + Note over Agent,MCP: Spawns MCP Server subprocess and connects via stdio + activate MCP + MCP ->> MCP: Initialize and ready + deactivate MCP + + Agent ->> MCP: initialize: {clientInfo, protocolVersion} + activate MCP + MCP -->> Agent: response: {capabilities, serverInfo} + deactivate MCP + + Agent ->> MCP: tools/list + activate MCP + MCP -->> Agent: response: {tools: [{name: "get_weather", ...}, {name: "get_packing_suggestions", ...}]} + deactivate MCP + + Agent ->> LLM: user: "I'm traveling to Tokyo for 5 days. What's the weather and what should I pack?" + activate LLM + Note over LLM: Determines it needs weather data first + + LLM ->> Agent: tool_call: get_weather(city="Tokyo") + deactivate LLM + activate Agent + + Agent ->> MCP: tools/call: get_weather {"city": "Tokyo"} + activate MCP + MCP -->> Agent: result: {"temperature_celsius": 22, "condition": "Partly cloudy", ...} + deactivate MCP + + Agent ->> LLM: [messages + tool result: "22°C, Partly cloudy"] + deactivate Agent + activate LLM + Note over LLM: Now has weather data, calls packing tool + + LLM ->> Agent: tool_call: get_packing_suggestions(temperature_celsius=22, condition="Partly cloudy", trip_days=5) + deactivate LLM + activate Agent + + Agent ->> MCP: tools/call: get_packing_suggestions {"temperature_celsius": 22, "condition": "Partly cloudy", "trip_days": 5} + activate MCP + MCP -->> Agent: result: {"clothing": [...], "accessories": [...], "tip": "..."} + deactivate MCP + + Agent ->> LLM: [messages + tool result: packing list] + deactivate Agent + activate LLM + Note over LLM: Has all information, produces final 
answer + + LLM ->> Agent: "Tokyo will be 22°C and partly cloudy. Pack light layers, jeans, sneakers..." + deactivate LLM + + Note over Agent: Displays final answer to user + + Agent ->> MCP: Close stdin + activate MCP + MCP ->> MCP: Exits + deactivate MCP +``` + +## Running the Example + +### Prerequisites + +```bash +pip install openai fastmcp +pip install -e ../../ # Install FastMCP instrumentation +export NVIDIA_API_KEY="nvapi-..." +``` + +### Quick Start + +```bash +# With console trace output +python weather_agent.py --console + +# Custom query +python weather_agent.py --query "What's the weather in London and what should I pack for a rainy day?" --console + +# With OTLP export (e.g., to Splunk O11y) +export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" +export OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT="true" +python weather_agent.py --wait 10 +``` + +The agent uses **NVIDIA Nemotron** (`nvidia/llama-3.3-nemotron-super-49b-v1`) via the +OpenAI-compatible API at `https://integrate.api.nvidia.com/v1`. 
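The agentic loop in the sequence diagram above can be sketched with the LLM and MCP server stubbed out. In the real example these are an OpenAI-compatible client and a FastMCP `Client`; everything below is an illustrative stand-in, not the example's actual code.

```python
# Hedged sketch of the agent loop: call the LLM, execute any requested
# tool via MCP, feed the result back, repeat until a final answer.
# fake_llm and fake_mcp_call_tool are stubs for illustration only.
def fake_llm(messages):
    # First turn: request a tool call. Second turn: produce the answer.
    if not any(m["role"] == "tool" for m in messages):
        return {"tool_call": {"name": "get_weather", "arguments": {"city": "Tokyo"}}}
    return {"content": "Tokyo is 22C and partly cloudy; pack light layers."}


def fake_mcp_call_tool(name, arguments):
    # Stand-in for `await client.call_tool(name, arguments)`.
    return {"temperature_celsius": 22, "condition": "Partly cloudy"}


def run_agent(query):
    messages = [{"role": "user", "content": query}]
    while True:
        reply = fake_llm(messages)
        if "tool_call" in reply:
            call = reply["tool_call"]
            result = fake_mcp_call_tool(call["name"], call["arguments"])
            messages.append({"role": "tool", "content": str(result)})
            continue  # loop back with the tool result appended
        return reply["content"]


print(run_agent("What's the weather in Tokyo?"))
```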
+
+### What Gets Instrumented
+
+The OpenTelemetry instrumentation captures:
+
+| Signal | Description |
+|--------|-------------|
+| `mcp.session` | Span covering the full lifecycle of the MCP client session |
+| `tools/list` | Span for the tool discovery call |
+| `tools/call get_weather` | Span for an individual tool invocation with args/result |
+| `tools/call get_packing_suggestions` | Span for the second tool invocation |
+| `mcp.server.session.duration` | Metric recording server-side session duration |
+
+With `OTEL_INSTRUMENTATION_GENAI_EMITTERS="span_metric"`, you also get:
+- `mcp.client.tool.duration` histogram
+- `mcp.server.tool.duration` histogram
diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/weather_agent/weather_agent.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/weather_agent/weather_agent.py
new file mode 100644
index 00000000..a3a80672
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/weather_agent/weather_agent.py
@@ -0,0 +1,327 @@
+#!/usr/bin/env python3
+"""
+Weather Agent — MCP Client + LLM Orchestration
+
+Demonstrates the full agentic loop:
+
+    ┌────────────┐          ┌────────────┐      ┌─────┐
+    │   Agent    │◄────────►│ MCP Server │      │ LLM │
+    │(MCP Client)│   stdio  │(subprocess)│      │     │
+    └─────┬──────┘          └────────────┘      └──┬──┘
+          │                                        │
+          │ 1. initialize / tools/list             │
+          │ 2. user query ────────────────────────►│
+          │ 3. tool_call ◄─────────────────────────│
+          │ 4. call_tool (MCP) ────►               │
+          │ 5. tool result ◄──────                 │
+          │ 6. [messages + result] ───────────────►│
+          │ 7. final answer ◄──────────────────────│
+          └────────────────────────────────────────┘
+
+Usage:
+    export NVIDIA_API_KEY="nvapi-..."
+ + # --- Manual instrumentation (sets up providers in-process) --- + python weather_agent.py --manual --console + python weather_agent.py --manual --wait 10 + + # --- Zero-code instrumentation (providers configured by the wrapper) --- + opentelemetry-instrument python weather_agent.py --wait 10 + + # Custom query: + python weather_agent.py --manual --query "What's the weather in London?" +""" + +import argparse +import asyncio +import json +import os +import sys +from pathlib import Path + + +def setup_telemetry(console_output: bool = False): + """Configure OpenTelemetry providers and apply FastMCP instrumentation.""" + from opentelemetry import trace, metrics + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.resources import Resource + + resource = Resource.create( + {"service.name": os.environ.get("OTEL_SERVICE_NAME", "weather-agent")} + ) + + trace_provider = TracerProvider(resource=resource) + metric_readers = [] + + if console_output: + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, + ) + from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + PeriodicExportingMetricReader, + ) + + trace_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + metric_readers.append( + PeriodicExportingMetricReader( + ConsoleMetricExporter(), export_interval_millis=5000 + ) + ) + + otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + if otlp_endpoint: + try: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, + ) + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + + trace_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) + 
metric_readers.append(PeriodicExportingMetricReader(OTLPMetricExporter())) + except ImportError: + print("OTLP exporters not available", file=sys.stderr) + + trace.set_tracer_provider(trace_provider) + if metric_readers: + metrics.set_meter_provider( + MeterProvider(resource=resource, metric_readers=metric_readers) + ) + + from opentelemetry.instrumentation.fastmcp import FastMCPInstrumentor + from opentelemetry.instrumentation.openai_v2 import OpenAIInstrumentor + + FastMCPInstrumentor().instrument() + OpenAIInstrumentor().instrument() + + +def _normalize_packing_args(args: dict) -> dict: + """Normalize LLM-generated args to match the tool's exact parameter names. + + LLMs frequently rename or nest parameters. This bridges the gap between + what the LLM sends and what the MCP tool schema expects. + """ + # Flatten nested weather_data dict if the LLM wrapped everything + if "weather_data" in args and isinstance(args["weather_data"], dict): + args = {**args, **args.pop("weather_data")} + + return { + "temperature": args.get("temperature") + or args.get("temperature_celsius") + or args.get("temp", 20), + "condition": args.get("condition") or args.get("weather_condition", "Clear"), + "days": args.get("days") + or args.get("trip_days") + or args.get("trip_duration", 3), + } + + +async def run_agent(user_query: str, manual: bool = True): + """ + Run the weather agent loop: + 1. Connect to weather MCP server (spawned as subprocess via stdio) + 2. Discover available tools + 3. Send user query + tool definitions to LLM + 4. Execute any tool calls the LLM requests via MCP + 5. 
Return tool results to LLM for final answer + """ + from fastmcp import Client + from fastmcp.client.transports import StdioTransport + from openai import OpenAI + + openai = OpenAI( + base_url="https://integrate.api.nvidia.com/v1", + api_key=os.environ["NVIDIA_API_KEY"], + ) + server_script = str(Path(__file__).parent / "weather_server.py") + + print(f"\n{'=' * 60}") + print("Weather Agent — MCP + LLM Agentic Loop") + print(f"{'=' * 60}") + print(f"\nUser: {user_query}") + + # MCP SDK only inherits a small env allowlist (HOME, PATH, …) when + # env=None, so OTEL_* vars must be passed explicitly to the server. + server_env = { + k: v + for k, v in os.environ.items() + if k.startswith(("OTEL_", "NVIDIA_", "VIRTUAL_ENV", "FASTMCP_")) + or k in ("HOME", "PATH", "SHELL", "TERM", "USER", "LOGNAME") + } + server_env["OTEL_SERVICE_NAME"] = "weather-mcp-server" + + if manual: + # Manual: server runs its own setup_server_telemetry() + transport = StdioTransport( + command=sys.executable, + args=[server_script, "--manual"], + env=server_env, + ) + else: + # Zero-code: opentelemetry-instrument auto-configures providers + # and discovers FastMCPInstrumentor via entry points. 
+ transport = StdioTransport( + command="opentelemetry-instrument", + args=[sys.executable, server_script], + env=server_env, + ) + + # --- Step 1: Connect to MCP server (spawns subprocess, stdio transport) --- + async with Client(transport) as client: + print(f"\n✅ Connected to MCP server: {Path(server_script).name}") + + # --- Step 2: Discover tools --- + tools = await client.list_tools() + print(f"📋 Tools available: {[t.name for t in tools]}") + + openai_tools = [] + for tool in tools: + schema = tool.inputSchema if hasattr(tool, "inputSchema") else {} + openai_tools.append( + { + "type": "function", + "function": { + "name": tool.name, + "description": tool.description or "", + "parameters": schema, + }, + } + ) + + # --- Step 3: Send user query to LLM with tool definitions --- + messages = [ + { + "role": "system", + "content": ( + "You are a helpful travel assistant. Use the available tools to " + "get weather information and packing suggestions.\n\n" + "WORKFLOW:\n" + "1. Call get_weather with the city name.\n" + "2. Call get_packing_suggestions using EXACTLY these parameters:\n" + " - temperature: the numeric celsius value from the weather result\n" + " - condition: the condition string from the weather result\n" + " - days: the number of trip days\n\n" + "IMPORTANT: Pass individual parameters, NOT a nested object." 
+ ), + }, + {"role": "user", "content": user_query}, + ] + + # --- Agentic loop: LLM calls until it produces a final text response --- + while True: + response = openai.chat.completions.create( + model="nvidia/llama-3.3-nemotron-super-49b-v1", + messages=messages, + tools=openai_tools, + tool_choice="auto", + ) + + assistant_message = response.choices[0].message + + if not assistant_message.tool_calls: + print(f"\n🤖 Assistant: {assistant_message.content}") + break + + # --- Step 4: Execute tool calls via MCP --- + messages.append(assistant_message.model_dump()) + + for tool_call in assistant_message.tool_calls: + fn_name = tool_call.function.name + fn_args = json.loads(tool_call.function.arguments) + + if fn_name == "get_packing_suggestions": + fn_args = _normalize_packing_args(fn_args) + + print(f"\n🔧 Tool call: {fn_name}({fn_args})") + + try: + result = await client.call_tool(fn_name, fn_args) + + if hasattr(result, "content") and result.content: + content = result.content[0] + result_text = ( + content.text if hasattr(content, "text") else str(content) + ) + else: + result_text = str(result) + except Exception as e: + result_text = f"ERROR: {e}" + + print(f" ➜ Result: {result_text[:120]}...") + + # --- Step 5: Append tool result for next LLM call --- + messages.append( + { + "role": "tool", + "tool_call_id": tool_call.id, + "content": result_text, + } + ) + + print(f"\n{'=' * 60}") + print("✅ Agent completed") + print(f"{'=' * 60}\n") + + +async def main( + manual: bool = True, + console_output: bool = False, + wait_seconds: int = 0, + query: str | None = None, +): + if manual: + setup_telemetry(console_output=console_output) + + user_query = ( + query + or "I'm traveling to Tokyo for 5 days next week. What's the weather like and what should I pack?" 
+ ) + + try: + await run_agent(user_query, manual=manual) + except KeyboardInterrupt: + print("\n⏹️ Agent interrupted") + except Exception as e: + print(f"\n❌ Agent failed: {e}") + import traceback + + traceback.print_exc() + + if wait_seconds > 0: + print(f"⏳ Waiting {wait_seconds}s for telemetry flush...") + await asyncio.sleep(wait_seconds) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Weather Agent — MCP + LLM Demo") + parser.add_argument( + "--manual", + action="store_true", + help="Manual instrumentation (set up providers in-process). " + "Omit to use zero-code via opentelemetry-instrument.", + ) + parser.add_argument( + "--console", + action="store_true", + help="Enable console exporters (only with --manual)", + ) + parser.add_argument( + "--wait", type=int, default=0, help="Wait seconds for telemetry flush" + ) + parser.add_argument("--query", type=str, default=None, help="Custom user query") + args = parser.parse_args() + + asyncio.run( + main( + manual=args.manual, + console_output=args.console, + wait_seconds=args.wait, + query=args.query, + ) + ) diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/weather_agent/weather_server.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/weather_agent/weather_server.py new file mode 100644 index 00000000..a2a7baa8 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/examples/weather_agent/weather_server.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +""" +Weather MCP Server + +Provides weather and travel packing tools via the Model Context Protocol. +Designed to be spawned as a subprocess by the weather agent. + +Usage: + # --- Manual instrumentation (agent passes --manual) --- + python weather_server.py --manual + + # --- Zero-code instrumentation --- + opentelemetry-instrument python weather_server.py + +Telemetry setup reads OTEL env vars (OTEL_EXPORTER_OTLP_ENDPOINT, etc.) 
+propagated by the client process so both sides export to the same collector. +""" + +import os +import sys + + +def setup_server_telemetry(): + """Configure OTel providers and FastMCP instrumentation for the server.""" + from opentelemetry import trace, metrics + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.resources import Resource + + resource = Resource.create( + {"service.name": os.environ.get("OTEL_SERVICE_NAME", "weather-mcp-server")} + ) + + trace_provider = TracerProvider(resource=resource) + metric_readers = [] + + otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + if otlp_endpoint: + try: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, + ) + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + + trace_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) + metric_readers.append(PeriodicExportingMetricReader(OTLPMetricExporter())) + except ImportError: + print("OTLP exporters not available in server", file=sys.stderr) + + if os.environ.get("OTEL_SERVER_CONSOLE_EXPORT"): + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, + ) + + trace_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + + trace.set_tracer_provider(trace_provider) + if metric_readers: + metrics.set_meter_provider( + MeterProvider(resource=resource, metric_readers=metric_readers) + ) + + from opentelemetry.instrumentation.fastmcp import FastMCPInstrumentor + + FastMCPInstrumentor().instrument() + + +# Apply manual telemetry only when explicitly requested. +# Zero-code mode (opentelemetry-instrument) sets up providers automatically. 
+if "--manual" in sys.argv: + sys.argv.remove("--manual") + setup_server_telemetry() + + +from fastmcp import FastMCP # noqa: E402 + +mcp = FastMCP("Weather Server") + +MOCK_WEATHER = { + "tokyo": { + "temp_c": 22, + "condition": "Partly cloudy", + "humidity": 65, + "wind_kph": 12, + }, + "london": {"temp_c": 14, "condition": "Rainy", "humidity": 82, "wind_kph": 20}, + "new york": {"temp_c": 18, "condition": "Sunny", "humidity": 55, "wind_kph": 8}, + "paris": {"temp_c": 16, "condition": "Overcast", "humidity": 70, "wind_kph": 15}, + "sydney": {"temp_c": 26, "condition": "Sunny", "humidity": 45, "wind_kph": 10}, +} + + +@mcp.tool() +def get_weather(city: str) -> dict: + """Get current weather for a city. + + Args: + city: Name of the city to get weather for. + + Returns: + Weather data including temperature, condition, humidity, and wind speed. + """ + key = city.lower().strip() + if key in MOCK_WEATHER: + data = MOCK_WEATHER[key] + return { + "city": city, + "temperature_celsius": data["temp_c"], + "condition": data["condition"], + "humidity_percent": data["humidity"], + "wind_kph": data["wind_kph"], + } + return { + "city": city, + "temperature_celsius": 20, + "condition": "Clear", + "humidity_percent": 50, + "wind_kph": 10, + } + + +@mcp.tool() +def get_packing_suggestions(temperature: float, condition: str, days: int = 3) -> dict: + """Get packing suggestions based on weather conditions. + + Args: + temperature: Expected temperature in Celsius at the destination. + condition: Weather condition (e.g. Sunny, Rainy, Overcast, Partly cloudy). + days: Number of days for the trip. + + Returns: + Categorized packing suggestions. 
+ """ + essentials = ["passport", "phone charger", "toiletries", "medications"] + clothing = [] + accessories = [] + + if temperature >= 25: + clothing = ["t-shirts", "shorts", "light dresses", "sandals", "swimwear"] + accessories = ["sunglasses", "sunscreen SPF 50", "hat", "reusable water bottle"] + elif temperature >= 15: + clothing = [ + "light layers", + "jeans", + "long-sleeve shirts", + "sneakers", + "light jacket", + ] + accessories = ["sunglasses", "sunscreen SPF 30", "daypack"] + else: + clothing = [ + "warm layers", + "sweaters", + "thermal underwear", + "boots", + "warm coat", + ] + accessories = ["scarf", "gloves", "beanie", "hand warmers"] + + if "rain" in condition.lower(): + accessories.extend(["umbrella", "waterproof jacket", "waterproof bag cover"]) + if "wind" in condition.lower() or "storm" in condition.lower(): + accessories.append("windbreaker") + + return { + "essentials": essentials, + "clothing": [ + f"{days}x {item}" if "shirt" in item or "t-shirt" in item else item + for item in clothing + ], + "accessories": accessories, + "tip": f"Pack for {temperature}°C and {condition.lower()} conditions. 
Layers are always a good idea!", + } + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="Weather MCP Server") + parser.add_argument( + "--transport", + choices=["stdio", "http", "streamable-http", "sse"], + default="stdio", + help="Transport protocol (default: stdio)", + ) + parser.add_argument( + "--host", default="0.0.0.0", help="Host to bind (default: 0.0.0.0)" + ) + parser.add_argument( + "--port", type=int, default=8000, help="Port to bind (default: 8000)" + ) + args = parser.parse_args() + + if args.transport in ("http", "streamable-http"): + mcp.run(transport="streamable-http", host=args.host, port=args.port) + else: + mcp.run(transport=args.transport) diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/pyproject.toml index 361db17c..a498e9ac 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/pyproject.toml +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/pyproject.toml @@ -28,11 +28,15 @@ dependencies = [ ] [project.optional-dependencies] +# Note: fastmcp transitively depends on the `mcp` SDK (currently v1.x). +# When mcp >= 2.x ships (with native OTel support via PRs #2298/#2381), +# the transport_instrumentor bridge can be removed via runtime feature +# detection — no version pin change needed here. See README.rst for details. 
instruments = [ - "fastmcp >= 2.0.0, <= 2.14.7", + "fastmcp >= 3.0.0, < 4", ] test = [ - "fastmcp >= 2.0.0, <= 2.14.7", + "fastmcp >= 3.0.0, < 4", "pytest >= 7.0.0", "pytest-asyncio >= 0.21.0", ] diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/_mcp_context.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/_mcp_context.py new file mode 100644 index 00000000..ebe4d24e --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/_mcp_context.py @@ -0,0 +1,52 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ContextVar bridge for passing MCP transport metadata to instrumentor layers. + +The transport instrumentor populates this context on the server side so that +the server instrumentor can read transport-level attributes (jsonrpc.request.id, +network.transport, etc.) without coupling directly to transport internals. 
+""" + +from __future__ import annotations + +from contextvars import ContextVar +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class MCPRequestContext: + """Transport-layer metadata extracted from an incoming MCP request.""" + + jsonrpc_request_id: Optional[str] = None + mcp_method_name: Optional[str] = None + network_transport: Optional[str] = None + + +_mcp_request_context: ContextVar[Optional[MCPRequestContext]] = ContextVar( + "mcp_request_context", default=None +) + + +def set_mcp_request_context(ctx: MCPRequestContext) -> None: + _mcp_request_context.set(ctx) + + +def get_mcp_request_context() -> Optional[MCPRequestContext]: + return _mcp_request_context.get() + + +def clear_mcp_request_context() -> None: + _mcp_request_context.set(None) diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/client_instrumentor.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/client_instrumentor.py index 5c08ed07..b5569f96 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/client_instrumentor.py +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/client_instrumentor.py @@ -15,18 +15,19 @@ """FastMCP client-side instrumentation.""" import logging -import time +from typing import Any, Callable from wrapt import register_post_import_hook, wrap_function_wrapper from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( AgentInvocation, - Step, Error, + MCPOperation, MCPToolCall, ) from opentelemetry.instrumentation.fastmcp.utils import ( + detect_transport, safe_serialize, should_capture_content, truncate_if_needed, @@ -35,13 +36,41 @@ _LOGGER = logging.getLogger(__name__) +def _traced_mcp_operation( + handler: TelemetryHandler, + build_op: Callable[[Any, str], 
MCPOperation], +) -> Callable: + """Generic async wrapper for MCPOperation lifecycle. + + ``build_op(instance, transport)`` must return a ready-to-start + :class:`MCPOperation`. Duration is tracked by the handler via + ``start_time`` / ``end_time`` on the dataclass. + """ + + async def wrapper(wrapped, instance, args, kwargs): + transport = detect_transport(instance) + op = build_op(instance, transport) + + handler.start_mcp_operation(op) + try: + result = await wrapped(*args, **kwargs) + handler.stop_mcp_operation(op) + return result + except Exception as e: + op.is_error = True + handler.fail_mcp_operation(op, Error(type=type(e), message=str(e))) + raise + + return wrapper + + class ClientInstrumentor: """Handles FastMCP client-side instrumentation. Instruments: - FastMCP Client.__aenter__: Start client session trace - FastMCP Client.__aexit__: End client session trace - - MCP client operations (list_tools, call_tool) + - Client.call_tool, list_tools, read_resource, get_prompt """ def __init__(self, telemetry_handler: TelemetryHandler): @@ -50,52 +79,32 @@ def __init__(self, telemetry_handler: TelemetryHandler): def instrument(self): """Apply FastMCP client-side instrumentation.""" - # Instrument FastMCP Client session lifecycle - register_post_import_hook( - lambda _: wrap_function_wrapper( - "fastmcp.client", - "Client.__aenter__", - self._client_enter_wrapper(), - ), - "fastmcp.client", - ) - - register_post_import_hook( - lambda _: wrap_function_wrapper( - "fastmcp.client", - "Client.__aexit__", - self._client_exit_wrapper(), - ), - "fastmcp.client", - ) - - # Instrument client tool operations - register_post_import_hook( - lambda _: wrap_function_wrapper( - "fastmcp.client", - "Client.call_tool", - self._client_call_tool_wrapper(), - ), - "fastmcp.client", - ) - - register_post_import_hook( - lambda _: wrap_function_wrapper( + hooks = { + "Client.__aenter__": self._client_enter_wrapper(), + "Client.__aexit__": self._client_exit_wrapper(), + 
"Client.call_tool": self._client_call_tool_wrapper(), + "Client.list_tools": self._client_list_tools_wrapper(), + "Client.read_resource": self._client_read_resource_wrapper(), + "Client.get_prompt": self._client_get_prompt_wrapper(), + } + for target, wrapper in hooks.items(): + register_post_import_hook( + lambda _, t=target, w=wrapper: wrap_function_wrapper( + "fastmcp.client", t, w + ), "fastmcp.client", - "Client.list_tools", - self._client_list_tools_wrapper(), - ), - "fastmcp.client", - ) + ) def uninstrument(self): """Remove FastMCP client-side instrumentation. Note: wrapt doesn't provide a clean way to unwrap post-import hooks. - This is a known limitation. """ pass + # ------------------------------------------------------------------ + # Session lifecycle + # ------------------------------------------------------------------ def _client_enter_wrapper(self): """Wrapper for FastMCP Client.__aenter__ to start a session trace.""" instrumentor = self @@ -103,10 +112,8 @@ def _client_enter_wrapper(self): async def traced_enter(wrapped, instance, args, kwargs): try: - # Call original result = await wrapped(*args, **kwargs) - # Create an AgentInvocation to represent the client session session = AgentInvocation( name="mcp.client", agent_type="mcp_client", @@ -116,15 +123,12 @@ async def traced_enter(wrapped, instance, args, kwargs): session.attributes["gen_ai.operation.name"] = "mcp.client_session" session.attributes["network.transport"] = "pipe" # stdio = pipe - # Store session by instance id instrumentor._active_sessions[id(instance)] = session - - # Start agent invocation handler.start_agent(session) return result except Exception as e: - _LOGGER.debug(f"Error in client enter wrapper: {e}", exc_info=True) + _LOGGER.debug("Error in client enter wrapper: %s", e, exc_info=True) return await wrapped(*args, **kwargs) return traced_enter @@ -136,10 +140,7 @@ def _client_exit_wrapper(self): async def traced_exit(wrapped, instance, args, kwargs): try: - # Get active 
session session = instrumentor._active_sessions.pop(id(instance), None) - - # Check if exit was due to an exception exc_type = args[0] if args else None if session: @@ -161,54 +162,44 @@ async def traced_exit(wrapped, instance, args, kwargs): return await wrapped(*args, **kwargs) except Exception as e: - _LOGGER.debug(f"Error in client exit wrapper: {e}", exc_info=True) + _LOGGER.debug("Error in client exit wrapper: %s", e, exc_info=True) return await wrapped(*args, **kwargs) return traced_exit + # ------------------------------------------------------------------ + # tools/call (unique logic — not DRY-able with MCPOperation helpers) + # ------------------------------------------------------------------ def _client_call_tool_wrapper(self): - """Wrapper for FastMCP Client.call_tool. - - Uses ToolCall (not Step) to enable MCP-specific metrics: - - mcp.client.operation.duration - - mcp.tool.output.size - """ + """Wrapper for FastMCP Client.call_tool.""" handler = self._handler instrumentor = self async def traced_call_tool(wrapped, instance, args, kwargs): import uuid - # Extract tool name tool_name = args[0] if args else kwargs.get("name", "unknown") tool_args = args[1] if len(args) > 1 else kwargs.get("arguments", {}) - # Get parent agent invocation for context parent_session = instrumentor._active_sessions.get(id(instance)) + transport = detect_transport(instance) - # Create a MCPToolCall for proper MCP metrics emission tool_call = MCPToolCall( name=tool_name, arguments=tool_args, id=str(uuid.uuid4()), framework="fastmcp", provider="mcp", - # Per execute_tool semconv: tool_type indicates type of tool - # MCP tools are "extension" - executed on agent-side calling external APIs tool_type="extension", - # MCP semantic convention fields for metrics mcp_method_name="tools/call", - network_transport="pipe", # stdio = pipe - is_client=True, # This is client-side + network_transport=transport, + is_client=True, ) - # Link to parent agent if available if parent_session: 
tool_call.agent_name = parent_session.name tool_call.agent_id = parent_session.agent_id - # arguments is already set in constructor above - # If content capture is enabled and args are complex, serialize them if should_capture_content() and tool_args: try: serialized = safe_serialize(tool_args) @@ -219,13 +210,9 @@ async def traced_call_tool(wrapped, instance, args, kwargs): handler.start_tool_call(tool_call) - start_time = time.time() try: result = await wrapped(*args, **kwargs) - duration = time.time() - start_time - tool_call.duration_s = duration - output_size = 0 if result: try: @@ -237,68 +224,89 @@ async def traced_call_tool(wrapped, instance, args, kwargs): except Exception: pass - # Track output size for LLM context awareness tool_call.output_size_bytes = output_size handler.stop_tool_call(tool_call) return result except Exception as e: - duration = time.time() - start_time - tool_call.duration_s = duration tool_call.is_error = True handler.fail_tool_call(tool_call, Error(type=type(e), message=str(e))) raise return traced_call_tool + # ------------------------------------------------------------------ + # MCPOperation wrappers (list, read, get) + # ------------------------------------------------------------------ def _client_list_tools_wrapper(self): """Wrapper for FastMCP Client.list_tools.""" + return _traced_mcp_operation( + self._handler, + lambda _inst, transport: MCPOperation( + target="", + mcp_method_name="tools/list", + network_transport=transport, + is_client=True, + framework="fastmcp", + system="mcp", + ), + ) + + def _client_read_resource_wrapper(self): + """Wrapper for FastMCP Client.read_resource.""" handler = self._handler - async def traced_list_tools(wrapped, instance, args, kwargs): - # Create a Step to represent the list_tools operation - # Using MCP semantic convention attribute names - step = Step( - name="list_tools", - step_type="admin", - source="agent", + async def traced_read_resource(wrapped, instance, args, kwargs): + uri = 
str(args[0]) if args else str(kwargs.get("uri", "")) + transport = detect_transport(instance) + op = MCPOperation( + target=uri, + mcp_method_name="resources/read", + network_transport=transport, + mcp_resource_uri=uri or None, + is_client=True, framework="fastmcp", system="mcp", ) - step.attributes["mcp.method.name"] = "tools/list" - step.attributes["network.transport"] = "pipe" # stdio = pipe - - handler.start_step(step) - start_time = time.time() + handler.start_mcp_operation(op) try: result = await wrapped(*args, **kwargs) + handler.stop_mcp_operation(op) + return result + except Exception as e: + op.is_error = True + handler.fail_mcp_operation(op, Error(type=type(e), message=str(e))) + raise - duration = time.time() - start_time - step.attributes["mcp.client.operation.duration_s"] = duration + return traced_read_resource - # Capture tool names (metadata, not content - always captured) - if result: - try: - if hasattr(result, "tools"): - tool_names = [ - t.name for t in result.tools if hasattr(t, "name") - ] - step.attributes["mcp.tools.discovered"] = safe_serialize( - tool_names - ) - except Exception: - pass + def _client_get_prompt_wrapper(self): + """Wrapper for FastMCP Client.get_prompt.""" + handler = self._handler - handler.stop_step(step) - return result + async def traced_get_prompt(wrapped, instance, args, kwargs): + prompt_name = str(args[0]) if args else str(kwargs.get("name", "")) + transport = detect_transport(instance) + op = MCPOperation( + target=prompt_name, + mcp_method_name="prompts/get", + network_transport=transport, + gen_ai_prompt_name=prompt_name or None, + is_client=True, + framework="fastmcp", + system="mcp", + ) + handler.start_mcp_operation(op) + try: + result = await wrapped(*args, **kwargs) + handler.stop_mcp_operation(op) + return result except Exception as e: - duration = time.time() - start_time - step.attributes["mcp.client.operation.duration_s"] = duration - step.attributes["error.type"] = type(e).__name__ - 
handler.fail_step(step, Error(type=type(e), message=str(e))) + op.is_error = True + handler.fail_mcp_operation(op, Error(type=type(e), message=str(e))) raise - return traced_list_tools + return traced_get_prompt diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/instrumentation.py index c466b0bf..d8ce3434 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/instrumentation.py +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/instrumentation.py @@ -36,7 +36,7 @@ _LOGGER = logging.getLogger(__name__) -_instruments = ("fastmcp >= 2.0.0, <= 2.14.7",) +_instruments = ("fastmcp >= 3.0.0, < 4",) class FastMCPInstrumentor(BaseInstrumentor): diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/server_instrumentor.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/server_instrumentor.py index c6f561b8..8a4ae015 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/server_instrumentor.py +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/server_instrumentor.py @@ -15,7 +15,7 @@ """FastMCP server-side instrumentation.""" import logging -import time +from contextvars import ContextVar from typing import Optional from uuid import uuid4 @@ -24,11 +24,16 @@ from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( AgentInvocation, + Error, + MCPOperation, MCPToolCall, Workflow, - Error, +) +from opentelemetry.instrumentation.fastmcp._mcp_context import ( + get_mcp_request_context, ) from 
opentelemetry.instrumentation.fastmcp.utils import ( + detect_transport, extract_tool_info, extract_result_content, should_capture_content, @@ -37,14 +42,32 @@ _LOGGER = logging.getLogger(__name__) +# FastMCP 3.x call_tool recurses through middleware (run_middleware=True → +# middleware → self.call_tool(..., run_middleware=False)). This guard +# ensures only the outermost call creates a span. +_IN_TOOL_CALL: ContextVar[bool] = ContextVar("_in_tool_call", default=False) + + +def _enrich_from_request_context(op: MCPOperation) -> None: + """Copy transport-layer metadata from the ContextVar into an operation.""" + ctx = get_mcp_request_context() + if ctx is None: + return + if ctx.jsonrpc_request_id and op.jsonrpc_request_id is None: + op.jsonrpc_request_id = ctx.jsonrpc_request_id + if ctx.network_transport and op.network_transport is None: + op.network_transport = ctx.network_transport + class ServerInstrumentor: - """Handles FastMCP server-side instrumentation. + """Handles FastMCP 3.x server-side instrumentation. Instruments: - FastMCP.__init__: Capture server name for context - Server.run: Track server session lifecycle (mcp.server.session.duration) - - ToolManager.call_tool: Trace individual tool executions + - FastMCP.call_tool: Trace tool executions + - FastMCP.read_resource: Trace resource reads + - FastMCP.render_prompt: Trace prompt rendering """ def __init__(self, telemetry_handler: TelemetryHandler): @@ -54,7 +77,6 @@ def __init__(self, telemetry_handler: TelemetryHandler): def instrument(self): """Apply FastMCP server-side instrumentation.""" - # Instrument FastMCP.__init__ to capture server name register_post_import_hook( lambda _: wrap_function_wrapper( "fastmcp", @@ -74,21 +96,38 @@ def instrument(self): "mcp.server.lowlevel.server", ) - # Instrument ToolManager.call_tool for tool execution + # Wrap FastMCP server methods (3.x API surface). 
register_post_import_hook( lambda _: wrap_function_wrapper( - "fastmcp.tools.tool_manager", - "ToolManager.call_tool", + "fastmcp.server.server", + "FastMCP.call_tool", self._tool_call_wrapper(), ), - "fastmcp.tools.tool_manager", + "fastmcp.server.server", + ) + + register_post_import_hook( + lambda _: wrap_function_wrapper( + "fastmcp.server.server", + "FastMCP.read_resource", + self._read_resource_wrapper(), + ), + "fastmcp.server.server", + ) + + register_post_import_hook( + lambda _: wrap_function_wrapper( + "fastmcp.server.server", + "FastMCP.render_prompt", + self._render_prompt_wrapper(), + ), + "fastmcp.server.server", ) def uninstrument(self): """Remove FastMCP server-side instrumentation. Note: wrapt doesn't provide a clean way to unwrap post-import hooks. - This is a known limitation. """ pass @@ -100,7 +139,6 @@ def traced_init(wrapped, instance, args, kwargs): try: result = wrapped(*args, **kwargs) - # Extract server name from args or kwargs if args and len(args) > 0: instrumentor._server_name = f"{args[0]}" elif "name" in kwargs: @@ -109,11 +147,12 @@ def traced_init(wrapped, instance, args, kwargs): instrumentor._server_name = "mcp_server" _LOGGER.debug( - f"FastMCP server initialized: {instrumentor._server_name}" + "FastMCP server initialized: %s", + instrumentor._server_name, ) return result except Exception as e: - _LOGGER.debug(f"Error in FastMCP init wrapper: {e}", exc_info=True) + _LOGGER.debug("Error in FastMCP init wrapper: %s", e, exc_info=True) return wrapped(*args, **kwargs) return traced_init @@ -154,81 +193,153 @@ async def traced_server_run(wrapped, instance, args, kwargs): return traced_server_run + # ------------------------------------------------------------------ + # tools/call + # ------------------------------------------------------------------ def _tool_call_wrapper(self): """Create wrapper for FastMCP tool execution.""" instrumentor = self handler = self._handler async def traced_tool_call(wrapped, instance, args, kwargs): - 
# Extract tool information + if _IN_TOOL_CALL.get(): + return await wrapped(*args, **kwargs) + tool_name, tool_arguments = extract_tool_info(args, kwargs) + transport = detect_transport(instance) - # Create MCPToolCall entity with MCP semantic convention attributes tool_call = MCPToolCall( name=tool_name, arguments=tool_arguments, id=str(uuid4()), framework="fastmcp", system="mcp", - # Per execute_tool semconv: tool_type indicates type of tool - # MCP tools are "extension" - executed on agent-side calling external APIs tool_type="extension", - # MCP semantic convention fields - mcp_method_name="tools/call", # Per OTel semconv for tool calls - network_transport="pipe", # stdio transport = pipe - mcp_server_name=instrumentor._server_name, # Server name from init - is_client=False, # This is server-side + mcp_method_name="tools/call", + network_transport=transport, + sdot_mcp_server_name=instrumentor._server_name, + is_client=False, ) + _enrich_from_request_context(tool_call) - # Capture input if content capture is enabled - # Note: arguments field has semconv_content metadata, will be applied if capture enabled - - # Start tool tracking handler.start_tool_call(tool_call) + token = _IN_TOOL_CALL.set(True) - start_time = time.time() try: - # Execute the original tool call result = await wrapped(*args, **kwargs) - # Record duration for metrics - tool_call.duration_s = time.time() - start_time - - # Capture output if content capture is enabled if result: try: output_content = extract_result_content(result) if output_content: - # Track output size for metrics (impacts LLM context) tool_call.output_size_bytes = len( output_content.encode("utf-8") ) if should_capture_content(): - # Use tool_result field - span emitter applies via semconv_content tool_call.tool_result = truncate_if_needed( output_content ) except Exception as e: - _LOGGER.debug(f"Error capturing tool output: {e}") + _LOGGER.debug("Error capturing tool output: %s", e) - # Check for error in result (tool_error 
per semconv) if hasattr(result, "isError") and result.isError: tool_call.is_error = True tool_call.error_type = "tool_error" - # Stop tool tracking (success) handler.stop_tool_call(tool_call) - return result except Exception as e: - # Record error with duration - tool_call.duration_s = time.time() - start_time tool_call.is_error = True tool_call.error_type = type(e).__name__ - - # Fail tool tracking handler.fail_tool_call(tool_call, Error(type=type(e), message=str(e))) raise + finally: + _IN_TOOL_CALL.reset(token) return traced_tool_call + + # ------------------------------------------------------------------ + # resources/read + # ------------------------------------------------------------------ + def _read_resource_wrapper(self): + """Wrapper for FastMCP.read_resource.""" + instrumentor = self + handler = self._handler + + async def traced_read_resource(wrapped, instance, args, kwargs): + uri = str(args[0]) if args else str(kwargs.get("uri", "")) + transport = detect_transport(instance) + + op = MCPOperation( + target=uri, + mcp_method_name="resources/read", + network_transport=transport, + mcp_resource_uri=uri or None, + sdot_mcp_server_name=instrumentor._server_name, + is_client=False, + framework="fastmcp", + system="mcp", + ) + _enrich_from_request_context(op) + + handler.start_mcp_operation(op) + + try: + result = await wrapped(*args, **kwargs) + handler.stop_mcp_operation(op) + return result + except Exception as e: + op.is_error = True + handler.fail_mcp_operation(op, Error(type=type(e), message=str(e))) + raise + + return traced_read_resource + + # ------------------------------------------------------------------ + # prompts/get (maps to FastMCP.render_prompt, not get_prompt) + # ------------------------------------------------------------------ + def _render_prompt_wrapper(self): + """Wrapper for FastMCP.render_prompt. + + ``render_prompt`` is the server-side handler for the MCP + ``prompts/get`` protocol method. 
``get_prompt`` is only the + internal prompt definition lookup and is never invoked by + the MCP request path. + """ + instrumentor = self + handler = self._handler + + async def traced_render_prompt(wrapped, instance, args, kwargs): + prompt_name = str(args[0]) if args else str(kwargs.get("name", "")) + transport = detect_transport(instance) + + op = MCPOperation( + target=prompt_name, + mcp_method_name="prompts/get", + network_transport=transport, + gen_ai_prompt_name=prompt_name or None, + sdot_mcp_server_name=instrumentor._server_name, + is_client=False, + framework="fastmcp", + system="mcp", + ) + _enrich_from_request_context(op) + + handler.start_mcp_operation(op) + + try: + result = await wrapped(*args, **kwargs) + handler.stop_mcp_operation(op) + return result + except Exception as e: + op.is_error = True + handler.fail_mcp_operation(op, Error(type=type(e), message=str(e))) + raise + + return traced_render_prompt + + # Kept for backward compatibility with existing tests + def _get_prompt_wrapper(self): + """Deprecated: use _render_prompt_wrapper instead.""" + return self._render_prompt_wrapper() diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/transport_instrumentor.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/transport_instrumentor.py index 8d0d4fad..f2bf3a65 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/transport_instrumentor.py +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/transport_instrumentor.py @@ -15,12 +15,16 @@ """ MCP Transport layer instrumentation for trace context propagation. -This module instruments the low-level MCP SDK transport to ensure trace context -(traceparent, tracestate) is propagated between client and server processes. 
- -Approach: -- Client side: Wrap BaseSession.send_request to inject trace context into _meta -- Server side: Wrap ServerSession._received_request to extract from request_meta +Temporary bridge for ``mcp`` v1.x which does not natively propagate +OpenTelemetry context. Can be removed once ``mcp >= 2.x`` is adopted +(native support landed on the upstream ``main`` branch via PRs #2298 and +#2381). See the package README for upstream tracking details. + +- Client side: wraps ``BaseSession.send_request`` to inject trace context + into ``params.meta`` (serialized as ``_meta``). +- Server side: wraps ``Server._handle_request`` to extract trace context + from ``request_meta`` and populate ``MCPRequestContext`` for downstream + instrumentors. """ import logging @@ -30,13 +34,46 @@ from opentelemetry import context, propagate +from opentelemetry.instrumentation.fastmcp._mcp_context import ( + MCPRequestContext, + clear_mcp_request_context, + set_mcp_request_context, +) +from opentelemetry.instrumentation.fastmcp.utils import detect_transport + _LOGGER = logging.getLogger(__name__) +def _extract_carrier_from_meta(request_meta: Any) -> dict[str, str]: + """Build a W3C carrier dict from a Pydantic Meta object. + + Checks both first-class attributes and ``model_extra`` so that + traceparent, tracestate, and baggage are all captured. + + TODO: Remove when mcp >= 2.x is adopted (see README). + """ + carrier: dict[str, str] = {} + if request_meta is None: + return carrier + + for key in ("traceparent", "tracestate", "baggage"): + val = getattr(request_meta, key, None) + if val: + carrier[key] = val + + if hasattr(request_meta, "model_extra"): + extra = request_meta.model_extra or {} + for key in ("traceparent", "tracestate", "baggage"): + if key not in carrier and key in extra: + carrier[key] = extra[key] + + return carrier + + class TransportInstrumentor: """Instruments MCP transport layer for trace context propagation. 
-    This handles the low-level MCP SDK to ensure traces are properly correlated
+    Handles the low-level MCP SDK to ensure traces are properly correlated
     across client/server process boundaries.
     """

@@ -48,7 +85,6 @@ def instrument(self):
         if self._instrumented:
             return

-        # Wrap client-side request sending to inject trace context
         register_post_import_hook(
             lambda _: wrap_function_wrapper(
                 "mcp.shared.session",
@@ -58,9 +94,6 @@
             "mcp.shared.session",
         )

-        # Wrap server-side request handling to extract trace context
-        # Use Server._handle_request which is invoked for each request
-        # and has access to message.request_meta with the traceparent
         register_post_import_hook(
             lambda _: wrap_function_wrapper(
                 "mcp.server.lowlevel",
@@ -77,29 +110,22 @@ def uninstrument(self):
         """Remove MCP transport instrumentation.

         Note: wrapt doesn't provide a clean way to unwrap post-import hooks.
-        This is a known limitation.
         """
         self._instrumented = False

     def _send_request_wrapper(self):
         """Wrapper for BaseSession.send_request to inject trace context.

-        This runs on the client side before sending any MCP request.
-        Injects traceparent/tracestate into the request's params.meta field.
+        Runs on the client side before sending any MCP request. Injects
+        traceparent, tracestate, and baggage into params.meta.

-        The MCP SDK request structure:
-        - ClientRequest is a discriminated union with a 'root' attribute
-        - request.root contains the actual request (CallToolRequest, etc.)
-        - request.root.params.meta (aliased as _meta in JSON) is a Meta object
-        - Meta has extra='allow', so we can add traceparent/tracestate
+        TODO: Remove when mcp >= 2.x is adopted (see README).
""" async def traced_send_request(wrapped, instance, args, kwargs) -> Any: try: - # args[0] is the request wrapper (ClientRequest) request = args[0] if args else kwargs.get("request") - # Handle discriminated union pattern: ClientRequest has 'root' actual_request = request if hasattr(request, "root"): actual_request = request.root @@ -111,12 +137,8 @@ async def traced_send_request(wrapped, instance, args, kwargs) -> Any: ): params = actual_request.params - # Create or get the meta object - # In pydantic models, 'meta' is the Python attribute, - # '_meta' is the JSON alias if hasattr(params, "meta"): if params.meta is None: - # Create a new Meta object try: from mcp.types import RequestParams @@ -125,10 +147,7 @@ async def traced_send_request(wrapped, instance, args, kwargs) -> Any: pass if params.meta is not None: - # Inject trace context into meta - # Meta allows extra fields, so we can set - # traceparent and tracestate directly - carrier = {} + carrier: dict[str, str] = {} propagate.inject(carrier) if carrier: @@ -137,13 +156,14 @@ async def traced_send_request(wrapped, instance, args, kwargs) -> Any: method = getattr(actual_request, "method", "unknown") _LOGGER.debug( - f"Injected trace context into MCP request: " - f"{method}, carrier={carrier}" + "Injected trace context into MCP request: " + "%s, carrier=%s", + method, + carrier, ) except Exception as e: - _LOGGER.debug(f"Error injecting trace context: {e}", exc_info=True) + _LOGGER.debug("Error injecting trace context: %s", e, exc_info=True) - # Call original method return await wrapped(*args, **kwargs) return traced_send_request @@ -151,62 +171,61 @@ async def traced_send_request(wrapped, instance, args, kwargs) -> Any: def _server_handle_request_wrapper(self): """Wrapper for Server._handle_request to extract trace context. - This runs on the server side when handling an MCP request. 
- The method signature is: - _handle_request(self, message, req, session, lifespan_context, raise_exceptions) + Runs on the server side. Extracts W3C context from ``request_meta`` + and populates :class:`MCPRequestContext` for the server instrumentor. + + TODO: Trace-context extract/attach removable when mcp >= 2.x is + adopted; MCPRequestContext population must remain (see README). - message: RequestResponder with request_meta containing traceparent - req: The actual request (CallToolRequest, etc.) + Method signature: + _handle_request(self, message, req, session, lifespan_context, + raise_exceptions) """ async def traced_handle_request(wrapped, instance, args, kwargs) -> Any: token = None try: - # args[0] is the message (RequestResponder) message = args[0] if args else kwargs.get("message") + req = args[1] if len(args) > 1 else kwargs.get("req") + + carrier: dict[str, str] = {} + jsonrpc_id: str | None = None + method_name: str | None = None + transport = detect_transport(instance) if message and hasattr(message, "request_meta"): - request_meta = message.request_meta - - if request_meta is not None: - # Extract trace context from request_meta - # The meta object may have traceparent/tracestate as attributes - carrier = {} - - # Try to get traceparent and tracestate from meta - # First check as attribute (getattr handles pydantic properly) - traceparent = getattr(request_meta, "traceparent", None) - if traceparent: - carrier["traceparent"] = traceparent - - tracestate = getattr(request_meta, "tracestate", None) - if tracestate: - carrier["tracestate"] = tracestate - - # Also try model_extra for pydantic v2 extra fields - if not carrier and hasattr(request_meta, "model_extra"): - extra = request_meta.model_extra - if extra: - for key in ["traceparent", "tracestate"]: - if key in extra: - carrier[key] = extra[key] - - if carrier: - ctx = propagate.extract(carrier) - token = context.attach(ctx) - _LOGGER.debug( - f"Attached trace context in _handle_request: " - 
f"carrier={carrier}" - ) + carrier = _extract_carrier_from_meta(message.request_meta) + + if message and hasattr(message, "request_id"): + raw_id = message.request_id + if raw_id is not None: + jsonrpc_id = str(raw_id) + + if req is not None: + method_name = getattr(req, "method", None) + + if carrier: + ctx = propagate.extract(carrier) + token = context.attach(ctx) + _LOGGER.debug( + "Attached trace context in _handle_request: carrier=%s", + carrier, + ) + + mcp_ctx = MCPRequestContext( + jsonrpc_request_id=jsonrpc_id, + mcp_method_name=method_name, + network_transport=transport, + ) + set_mcp_request_context(mcp_ctx) except Exception as e: - _LOGGER.debug(f"Error extracting trace context: {e}", exc_info=True) + _LOGGER.debug("Error extracting trace context: %s", e, exc_info=True) try: - # Call original method with attached context return await wrapped(*args, **kwargs) finally: - # Detach context after request handling + clear_mcp_request_context() if token is not None: try: context.detach(token) @@ -215,7 +234,7 @@ async def traced_handle_request(wrapped, instance, args, kwargs) -> Any: return traced_handle_request - # Keep old wrapper for backwards compatibility / tests + # kept for backward compatibility with existing tests def _server_received_request_wrapper(self): """Legacy wrapper - replaced by _server_handle_request_wrapper.""" return self._server_handle_request_wrapper() diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/utils.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/utils.py index 0ca2305a..9289d31b 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/utils.py +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/utils.py @@ -18,6 +18,7 @@ import json import logging import os +from importlib import import_module 
from typing import Any, Optional

 _LOGGER = logging.getLogger(__name__)
@@ -142,6 +143,40 @@ def is_instrumentation_enabled() -> bool:
     return env_value in ("true", "1", "yes", "on")


+def detect_transport(instance: object) -> str:
+    """Best-effort transport detection from a FastMCP Client or MCP Server.
+
+    Detection strategy (checked in order):
+    1. Client-side: inspect the ``instance.transport`` class name for
+       SSE, streamable, or HTTP markers.
+    2. Server-side: read ``fastmcp.settings.transport``, which reflects the
+       value passed to ``FastMCP.run()`` (or its env-var default).
+    3. Fallback: ``"pipe"`` (stdio).
+
+    Returns:
+        ``"tcp"`` for SSE / streamable-HTTP transports, ``"pipe"`` otherwise.
+    """
+    _TCP_KEYWORDS = ("sse", "streamable", "http")
+
+    try:
+        transport_obj = getattr(instance, "transport", None)
+        if transport_obj is not None:
+            cls_name = type(transport_obj).__name__.lower()
+            if any(kw in cls_name for kw in _TCP_KEYWORDS):
+                return "tcp"
+    except Exception:
+        pass
+
+    try:
+        _settings = import_module("fastmcp.settings")
+        val = getattr(_settings, "transport", None)
+        if isinstance(val, str) and any(kw in val.lower() for kw in _TCP_KEYWORDS):
+            return "tcp"
+    except Exception:
+        pass
+
+    return "pipe"
+
+
 def extract_tool_info(args: tuple, kwargs: dict) -> tuple[str, Any]:
     """Extract tool name and arguments from call_tool parameters.
diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/version.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/version.py index b3f47562..d3ec452c 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/version.py +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/version.py @@ -1 +1 @@ -__version__ = "0.1.2" +__version__ = "0.2.0" diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/tests/test_client_instrumentor.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/tests/test_client_instrumentor.py index 4ef314f5..31b72a80 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/tests/test_client_instrumentor.py +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/tests/test_client_instrumentor.py @@ -141,28 +141,24 @@ async def test_client_call_tool_wrapper_failure(self, mock_telemetry_handler): @pytest.mark.asyncio async def test_client_list_tools_wrapper(self, mock_telemetry_handler): - """Test client list_tools wrapper.""" + """Test client list_tools wrapper uses MCPOperation.""" instrumentor = ClientInstrumentor(mock_telemetry_handler) wrapper = instrumentor._client_list_tools_wrapper() - # Create mock result with tools - class MockTool: - def __init__(self, name): - self.name = name - mock_result = MagicMock() - mock_result.tools = [MockTool("tool1"), MockTool("tool2")] + mock_result.tools = [MagicMock(name="tool1"), MagicMock(name="tool2")] mock_wrapped = AsyncMock(return_value=mock_result) result = await wrapper(mock_wrapped, MagicMock(), (), {}) assert result == mock_result - assert mock_telemetry_handler.start_step.called - assert mock_telemetry_handler.stop_step.called - - # Verify Step attributes - MCP semantic conventions - step = 
mock_telemetry_handler.start_step.call_args[0][0] - assert step.name == "list_tools" - assert step.step_type == "admin" - assert step.attributes["mcp.method.name"] == "tools/list" - assert step.attributes["network.transport"] == "pipe" + assert mock_telemetry_handler.start_mcp_operation.called + assert mock_telemetry_handler.stop_mcp_operation.called + + from opentelemetry.util.genai.types import MCPOperation + + op = mock_telemetry_handler.start_mcp_operation.call_args[0][0] + assert isinstance(op, MCPOperation) + assert op.mcp_method_name == "tools/list" + assert op.is_client is True + assert op.network_transport == "pipe" diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/tests/test_server_instrumentor.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/tests/test_server_instrumentor.py index 5e360ff0..3de12204 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/tests/test_server_instrumentor.py +++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/tests/test_server_instrumentor.py @@ -1,17 +1,3 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- """Tests for FastMCP server-side instrumentation.""" import pytest @@ -20,7 +6,7 @@ from opentelemetry.instrumentation.fastmcp.server_instrumentor import ( ServerInstrumentor, ) -from opentelemetry.util.genai.types import MCPToolCall +from opentelemetry.util.genai.types import MCPOperation, MCPToolCall class TestServerInstrumentor: @@ -37,11 +23,9 @@ def test_fastmcp_init_wrapper_with_args(self, mock_telemetry_handler): instrumentor = ServerInstrumentor(mock_telemetry_handler) wrapper = instrumentor._fastmcp_init_wrapper() - # Mock the wrapped function mock_wrapped = MagicMock(return_value=None) mock_instance = MagicMock() - # Call with positional arg wrapper(mock_wrapped, mock_instance, ("my-server",), {}) assert instrumentor._server_name == "my-server" @@ -59,6 +43,9 @@ def test_fastmcp_init_wrapper_with_kwargs(self, mock_telemetry_handler): assert instrumentor._server_name == "kwargs-server" + # ------------------------------------------------------------------ + # tools/call + # ------------------------------------------------------------------ @pytest.mark.asyncio async def test_tool_call_wrapper_success(self, mock_telemetry_handler): """Test tool call wrapper for successful execution.""" @@ -66,33 +53,27 @@ async def test_tool_call_wrapper_success(self, mock_telemetry_handler): instrumentor._server_name = "test-server" wrapper = instrumentor._tool_call_wrapper() - # Mock the wrapped async function mock_result = MagicMock() mock_result.content = [MagicMock(text="result text")] mock_result.isError = False mock_wrapped = AsyncMock(return_value=mock_result) mock_instance = MagicMock() - # Call the wrapper result = await wrapper( mock_wrapped, mock_instance, ("my_tool", {"arg1": "value1"}), {} ) - # Verify handler was called assert mock_telemetry_handler.start_tool_call.called assert mock_telemetry_handler.stop_tool_call.called assert not mock_telemetry_handler.fail_tool_call.called - - # Verify result is returned assert result == mock_result - # Verify 
MCPToolCall was created with correct attributes tool_call = mock_telemetry_handler.start_tool_call.call_args[0][0] assert isinstance(tool_call, MCPToolCall) assert tool_call.name == "my_tool" assert tool_call.framework == "fastmcp" assert tool_call.system == "mcp" - assert tool_call.mcp_server_name == "test-server" + assert tool_call.sdot_mcp_server_name == "test-server" @pytest.mark.asyncio async def test_tool_call_wrapper_failure(self, mock_telemetry_handler): @@ -101,17 +82,17 @@ async def test_tool_call_wrapper_failure(self, mock_telemetry_handler): instrumentor._server_name = "test-server" wrapper = instrumentor._tool_call_wrapper() - # Mock the wrapped async function that raises mock_wrapped = AsyncMock(side_effect=ValueError("Test error")) mock_instance = MagicMock() - # Call and expect exception with pytest.raises(ValueError, match="Test error"): await wrapper( - mock_wrapped, mock_instance, ("failing_tool",), {"arguments": {}} + mock_wrapped, + mock_instance, + ("failing_tool",), + {"arguments": {}}, ) - # Verify fail_tool_call was called assert mock_telemetry_handler.start_tool_call.called assert mock_telemetry_handler.fail_tool_call.called assert not mock_telemetry_handler.stop_tool_call.called @@ -119,23 +100,23 @@ async def test_tool_call_wrapper_failure(self, mock_telemetry_handler): @pytest.mark.asyncio async def test_tool_call_wrapper_with_content_capture(self, mock_telemetry_handler): """Test tool call wrapper with content capture enabled.""" - instrumentor = ServerInstrumentor(mock_telemetry_handler) - wrapper = instrumentor._tool_call_wrapper() - - mock_result = MagicMock() - mock_result.content = [MagicMock(text="captured output")] - mock_wrapped = AsyncMock(return_value=mock_result) - with patch.dict( - "os.environ", {"OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT": "true"} + "os.environ", + {"OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT": "true"}, ): + instrumentor = ServerInstrumentor(mock_telemetry_handler) + wrapper = 
instrumentor._tool_call_wrapper() + + mock_result = MagicMock() + mock_result.content = [MagicMock(text="captured output")] + mock_wrapped = AsyncMock(return_value=mock_result) + await wrapper( mock_wrapped, MagicMock(), ("tool_name", {"input": "data"}), {} ) - # Verify the call succeeded and tool was properly tracked - assert mock_telemetry_handler.start_tool_call.called - assert mock_telemetry_handler.stop_tool_call.called + assert mock_telemetry_handler.start_tool_call.called + assert mock_telemetry_handler.stop_tool_call.called @pytest.mark.asyncio async def test_tool_call_wrapper_with_error_result(self, mock_telemetry_handler): @@ -143,7 +124,6 @@ async def test_tool_call_wrapper_with_error_result(self, mock_telemetry_handler) instrumentor = ServerInstrumentor(mock_telemetry_handler) wrapper = instrumentor._tool_call_wrapper() - # Create result with isError=True mock_result = MagicMock() mock_result.isError = True mock_result.content = [MagicMock(text="error message")] @@ -153,12 +133,86 @@ async def test_tool_call_wrapper_with_error_result(self, mock_telemetry_handler) mock_wrapped, MagicMock(), ("error_tool",), {"arguments": {}} ) - # Result should still be returned assert result == mock_result - - # stop_tool_call should be called (not fail_tool_call since no exception) assert mock_telemetry_handler.stop_tool_call.called - # Verify error_type field was set per MCP semconv tool_call = mock_telemetry_handler.stop_tool_call.call_args[0][0] assert tool_call.error_type == "tool_error" + + # ------------------------------------------------------------------ + # resources/read + # ------------------------------------------------------------------ + @pytest.mark.asyncio + async def test_read_resource_wrapper_success(self, mock_telemetry_handler): + """Test read_resource wrapper creates correct MCPOperation.""" + instrumentor = ServerInstrumentor(mock_telemetry_handler) + instrumentor._server_name = "res-server" + wrapper = instrumentor._read_resource_wrapper() + + 
+        mock_wrapped = AsyncMock(return_value="resource content")
+        result = await wrapper(mock_wrapped, MagicMock(), ("system://info",), {})
+
+        assert result == "resource content"
+        assert mock_telemetry_handler.start_mcp_operation.called
+        assert mock_telemetry_handler.stop_mcp_operation.called
+
+        op = mock_telemetry_handler.start_mcp_operation.call_args[0][0]
+        assert isinstance(op, MCPOperation)
+        assert op.mcp_method_name == "resources/read"
+        assert op.target == "system://info"
+        assert op.mcp_resource_uri == "system://info"
+        assert op.sdot_mcp_server_name == "res-server"
+        assert op.is_client is False
+
+    @pytest.mark.asyncio
+    async def test_read_resource_wrapper_failure(self, mock_telemetry_handler):
+        """Test read_resource wrapper handles exceptions."""
+        instrumentor = ServerInstrumentor(mock_telemetry_handler)
+        wrapper = instrumentor._read_resource_wrapper()
+
+        mock_wrapped = AsyncMock(side_effect=FileNotFoundError("not found"))
+
+        with pytest.raises(FileNotFoundError):
+            await wrapper(mock_wrapped, MagicMock(), ("system://missing",), {})
+
+        assert mock_telemetry_handler.start_mcp_operation.called
+        assert mock_telemetry_handler.fail_mcp_operation.called
+
+    # ------------------------------------------------------------------
+    # prompts/get (render_prompt)
+    # ------------------------------------------------------------------
+    @pytest.mark.asyncio
+    async def test_render_prompt_wrapper_success(self, mock_telemetry_handler):
+        """Test render_prompt wrapper creates correct MCPOperation."""
+        instrumentor = ServerInstrumentor(mock_telemetry_handler)
+        instrumentor._server_name = "prompt-server"
+        wrapper = instrumentor._render_prompt_wrapper()
+
+        mock_wrapped = AsyncMock(return_value="rendered prompt")
+        result = await wrapper(mock_wrapped, MagicMock(), ("weather_forecast",), {})
+
+        assert result == "rendered prompt"
+        assert mock_telemetry_handler.start_mcp_operation.called
+        assert mock_telemetry_handler.stop_mcp_operation.called
+
+        op = mock_telemetry_handler.start_mcp_operation.call_args[0][0]
+        assert isinstance(op, MCPOperation)
+        assert op.mcp_method_name == "prompts/get"
+        assert op.target == "weather_forecast"
+        assert op.gen_ai_prompt_name == "weather_forecast"
+        assert op.sdot_mcp_server_name == "prompt-server"
+        assert op.is_client is False
+
+    @pytest.mark.asyncio
+    async def test_render_prompt_wrapper_failure(self, mock_telemetry_handler):
+        """Test render_prompt wrapper handles exceptions."""
+        instrumentor = ServerInstrumentor(mock_telemetry_handler)
+        wrapper = instrumentor._render_prompt_wrapper()
+
+        mock_wrapped = AsyncMock(side_effect=KeyError("no such prompt"))
+
+        with pytest.raises(KeyError):
+            await wrapper(mock_wrapped, MagicMock(), ("nonexistent",), {})
+
+        assert mock_telemetry_handler.start_mcp_operation.called
+        assert mock_telemetry_handler.fail_mcp_operation.called
diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/tests/test_utils.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/tests/test_utils.py
index 0c0cf69a..bd3ecf04 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/tests/test_utils.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/tests/test_utils.py
@@ -24,6 +24,7 @@
     is_instrumentation_enabled,
     extract_tool_info,
     extract_result_content,
+    detect_transport,
 )
@@ -233,3 +234,72 @@ class MockResult:
     result = extract_result_content(MockResult())
     assert result is not None
     assert "item_value" in result
+
+
+class TestDetectTransport:
+    """Tests for detect_transport function."""
+
+    def test_plain_object_defaults_to_pipe(self):
+        """Objects without transport attributes fall back to pipe."""
+        assert detect_transport(object()) == "pipe"
+
+    def test_sse_transport_attribute(self):
+        """Instance with an SSE-like transport attribute returns tcp."""
+
+        class SSETransport:
+            pass
+
+        class ClientWithSSE:
+            transport = SSETransport()
+
+        ClientWithSSE.transport.__class__.__name__ = "SSETransport"
+        assert detect_transport(ClientWithSSE()) == "tcp"
+
+    def test_streamable_transport_attribute(self):
+        """Instance with a streamable-HTTP transport returns tcp."""
+
+        class StreamableHTTPTransport:
+            pass
+
+        class ClientWithStreamable:
+            transport = StreamableHTTPTransport()
+
+        assert detect_transport(ClientWithStreamable()) == "tcp"
+
+    def test_stdio_transport_attribute(self):
+        """Instance with a stdio transport returns pipe."""
+
+        class StdioTransport:
+            pass
+
+        class ClientWithStdio:
+            transport = StdioTransport()
+
+        assert detect_transport(ClientWithStdio()) == "pipe"
+
+    def test_falls_back_to_fastmcp_settings(self):
+        """When instance has no transport attr, reads fastmcp.settings."""
+        with patch(
+            "opentelemetry.instrumentation.fastmcp.utils.import_module"
+        ) as mock_import:
+            mock_settings = type("Settings", (), {"transport": "sse"})()
+            mock_import.return_value = mock_settings
+            assert detect_transport(object()) == "tcp"
+
+    def test_fastmcp_settings_http(self):
+        """fastmcp.settings.transport='http' detected as tcp."""
+        with patch(
+            "opentelemetry.instrumentation.fastmcp.utils.import_module"
+        ) as mock_import:
+            mock_settings = type("Settings", (), {"transport": "http"})()
+            mock_import.return_value = mock_settings
+            assert detect_transport(object()) == "tcp"
+
+    def test_fastmcp_settings_stdio(self):
+        """fastmcp.settings.transport='stdio' stays as pipe."""
+        with patch(
+            "opentelemetry.instrumentation.fastmcp.utils.import_module"
+        ) as mock_import:
+            mock_settings = type("Settings", (), {"transport": "stdio"})()
+            mock_import.return_value = mock_settings
+            assert detect_transport(object()) == "pipe"
diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md
index 39e8f271..2071e240 100644
--- a/util/opentelemetry-util-genai/CHANGELOG.md
+++ b/util/opentelemetry-util-genai/CHANGELOG.md
@@ -10,10 +10,22 @@ All notable changes to this repository are documented in this file.
 ## Version 0.1.13
 
+### Fixed
+- **OTel context now attached in async contexts** — Removed the `_is_async_context()` guard from `_push_current_span`. Context is now always attached via `context_api.attach()` regardless of sync/async, enabling downstream instrumentations (HTTP, DB, MCP transport) to see the correct parent span. `_pop_current_span` handles detach failures gracefully for cross-task / `copy_context` scenarios.
+
 ### Added
+- **`MCPOperation` type** — New dataclass for non-tool-call MCP operations (`tools/list`, `resources/read`, `prompts/get`, etc.). Produces spans with `{mcp.method.name} {target}` naming and CLIENT/SERVER SpanKind.
+- **New MCP semconv attributes** — `jsonrpc.request.id`, `rpc.response.status_code`, `mcp.resource.uri`, `gen_ai.prompt.name`, `network.protocol.name`, `network.protocol.version`, `server.address`, `server.port`, `client.address`, `client.port` on both `MCPOperation` and `MCPToolCall`.
+- **`MCPRequestContext` ContextVar bridge** — Passes transport-layer metadata (jsonrpc ID, network transport) from the transport instrumentor to the server instrumentor.
+- **TelemetryHandler MCPOperation lifecycle** — `start_mcp_operation`, `stop_mcp_operation`, `fail_mcp_operation` methods.
+- **SpanEmitter MCPOperation dispatch** — Unified `_start_mcp_operation`/`_finish_mcp_operation`/`_error_mcp_operation` handles both `MCPToolCall` and `MCPOperation`.
+- **MetricsEmitter MCPOperation dispatch** — MCP duration metrics now emitted for all MCP operations (not just tool calls), including on the error path.
 - Added `explicit_bucket_boundaries_advisory` to `gen_ai.evaluation.client.usage.cost` histogram with cost-appropriate bucket boundaries.
 
 ### Changed
+- **`MCPToolCall` now inherits from `MCPOperation`** — MRO: `MCPToolCall → MCPOperation → ToolCall → GenAI`. All MCP transport/protocol attributes are defined on `MCPOperation` and inherited by `MCPToolCall`.
+- **Renamed `mcp_server_name` → `sdot_mcp_server_name`** — SDOT-custom attribute now uses `sdot.mcp.server_name` to distinguish from OTel semconv attributes. **Breaking**: callers using `mcp_server_name=` must update.
+- **Renamed `_record_mcp_tool_metrics` → `_record_mcp_operation_metrics`** — Generalized to handle all MCP operation types.
 - Trimmed `EvaluationMonitoringEmitter` docstring to only list metrics managed by the emitter (duration and cost).
 
 ### Fixed
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/metrics.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/metrics.py
index 6d9e5889..8fbfcc2a 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/metrics.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/metrics.py
@@ -18,6 +18,8 @@
     EmbeddingInvocation,
     Error,
     LLMInvocation,
+    MCPOperation,
+    MCPToolCall,
     RetrievalInvocation,
     ToolCall,
     Workflow,
@@ -143,35 +145,14 @@ def on_end(self, obj: Any) -> None:
                 ttfc, attributes=metric_attrs, context=context
             )
             return
-        if isinstance(obj, ToolCall):
-            tool_invocation = obj
-            metric_attrs = _get_metric_attributes(
-                tool_invocation.name,
-                None,
-                GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value,
-                tool_invocation.provider,
-                tool_invocation.framework,
-            )
-            # Add agent context if available
-            if tool_invocation.agent_name:
-                metric_attrs[GenAI.GEN_AI_AGENT_NAME] = (
-                    tool_invocation.agent_name
-                )
-            if tool_invocation.agent_id:
-                metric_attrs[GenAI.GEN_AI_AGENT_ID] = tool_invocation.agent_id
-
-            # Add session context if configured
-            metric_attrs.update(get_context_metric_attributes(tool_invocation))
-
-            _record_duration(
-                self._duration_histogram,
-                tool_invocation,
-                metric_attrs,
-                span=getattr(tool_invocation, "span", None),
-            )
+        if isinstance(obj, MCPOperation):
+            self._record_mcp_operation_metrics(obj)
+            if isinstance(obj, MCPToolCall) and obj.is_client:
+                self._record_execute_tool_metrics(obj)
+            return
-            # Record MCP-specific metrics if this is an MCP tool call
-            self._record_mcp_tool_metrics(tool_invocation)
+        if isinstance(obj, ToolCall):
+            self._record_execute_tool_metrics(obj)
 
         if isinstance(obj, EmbeddingInvocation):
             embedding_invocation = obj
@@ -242,16 +223,56 @@ def on_error(self, error: Error, obj: Any) -> None:
                 self._duration_histogram, llm_invocation, metric_attrs
             )
             return
+
+        if isinstance(obj, MCPOperation):
+            obj.is_error = True
+            if getattr(error, "type", None) is not None:
+                obj.mcp_error_type = error.type.__qualname__
+            self._record_mcp_operation_metrics(obj)
+            if isinstance(obj, MCPToolCall) and obj.is_client:
+                metric_attrs = _get_metric_attributes(
+                    None,
+                    None,
+                    GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value,
+                    obj.provider,
+                    obj.framework,
+                )
+                metric_attrs[GenAI.GEN_AI_TOOL_NAME] = obj.name
+                if obj.agent_name:
+                    metric_attrs[GenAI.GEN_AI_AGENT_NAME] = obj.agent_name
+                if obj.agent_id:
+                    metric_attrs[GenAI.GEN_AI_AGENT_ID] = obj.agent_id
+                if getattr(error, "type", None) is not None:
+                    metric_attrs[ErrorAttributes.ERROR_TYPE] = (
+                        error.type.__qualname__
+                    )
+                metric_attrs.update(get_context_metric_attributes(obj))
+                duration = obj.duration_s
+                if duration is None and obj.end_time is not None:
+                    duration = obj.end_time - obj.start_time
+                if duration is not None:
+                    context = None
+                    span = getattr(obj, "span", None)
+                    if span is not None:
+                        try:
+                            context = trace.set_span_in_context(span)
+                        except (TypeError, ValueError, AttributeError):
+                            context = None
+                    self._duration_histogram.record(
+                        duration, attributes=metric_attrs, context=context
+                    )
+            return
+
         if isinstance(obj, ToolCall):
             tool_invocation = obj
             metric_attrs = _get_metric_attributes(
-                tool_invocation.name,
+                None,
                 None,
                 GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value,
                 tool_invocation.provider,
                 tool_invocation.framework,
             )
-            # Add agent context if available
+            metric_attrs[GenAI.GEN_AI_TOOL_NAME] = tool_invocation.name
             if tool_invocation.agent_name:
                 metric_attrs[GenAI.GEN_AI_AGENT_NAME] = (
                     tool_invocation.agent_name
@@ -311,6 +332,7 @@ def handles(self, obj: Any) -> bool:
             (
                 LLMInvocation,
                 ToolCall,
+                MCPOperation,
                 Workflow,
                 AgentInvocation,
                 EmbeddingInvocation,
@@ -480,83 +502,84 @@ def _record_retrieval_metrics(
                 duration, attributes=metric_attrs, context=context
             )
 
-    def _record_mcp_tool_metrics(self, tool: ToolCall) -> None:
-        """Record MCP-specific metrics for tool calls.
+    def _record_execute_tool_metrics(self, tool: ToolCall) -> None:
+        """Record ``gen_ai.client.operation.duration`` for an execute_tool."""
+        metric_attrs = _get_metric_attributes(
+            None,
+            None,
+            GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value,
+            tool.provider,
+            tool.framework,
+        )
+        metric_attrs[GenAI.GEN_AI_TOOL_NAME] = tool.name
+        if tool.agent_name:
+            metric_attrs[GenAI.GEN_AI_AGENT_NAME] = tool.agent_name
+        if tool.agent_id:
+            metric_attrs[GenAI.GEN_AI_AGENT_ID] = tool.agent_id
+        metric_attrs.update(get_context_metric_attributes(tool))
+        _record_duration(
+            self._duration_histogram,
+            tool,
+            metric_attrs,
+            span=getattr(tool, "span", None),
+        )
+
+    def _record_mcp_operation_metrics(self, op: MCPOperation) -> None:
+        """Record MCP-specific metrics for any MCP operation.
 
         Per OTel semconv: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp
 
         Metrics:
         - mcp.client.operation.duration / mcp.server.operation.duration
         - mcp.tool.output.size (custom: tracks output bytes for LLM context)
-
-        Required attributes:
-        - mcp.method.name
-
-        Conditionally required (for tool calls):
-        - gen_ai.tool.name
-        - error.type (if operation fails)
-
-        Recommended (low-cardinality):
-        - gen_ai.operation.name (set to "execute_tool" for tool calls)
-        - network.transport
-        - mcp.protocol.version
         """
-        # Only emit MCP metrics if mcp_method_name is set
-        # (indicates this is an MCP tool call, not a generic ToolCall)
-        if not tool.mcp_method_name:
+        if not op.mcp_method_name:
             return
 
-        # Build MCP metric attributes per semconv
-        # Only low-cardinality attributes should be included
         mcp_attrs: dict[str, Any] = {
-            # Required
-            "mcp.method.name": tool.mcp_method_name,
-            # Conditionally required for tool calls
-            GenAI.GEN_AI_TOOL_NAME: tool.name,
+            "mcp.method.name": op.mcp_method_name,
         }
 
-        # Recommended: gen_ai.operation.name (only for tool calls)
-        if tool.mcp_method_name == "tools/call":
-            mcp_attrs[GenAI.GEN_AI_OPERATION_NAME] = (
-                GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value
-            )
-
-        # Recommended: network.transport
-        if tool.network_transport:
-            mcp_attrs["network.transport"] = tool.network_transport
-
-        # Recommended: mcp.protocol.version
-        if tool.mcp_protocol_version:
-            mcp_attrs["mcp.protocol.version"] = tool.mcp_protocol_version
+        if isinstance(op, MCPToolCall):
+            mcp_attrs[GenAI.GEN_AI_TOOL_NAME] = op.name
+            if op.mcp_method_name == "tools/call":
+                mcp_attrs[GenAI.GEN_AI_OPERATION_NAME] = (
+                    GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value
+                )
 
-        # Conditionally required: error.type (if operation fails)
-        if tool.is_error:
-            mcp_attrs["error.type"] = "tool_error"
+        if op.network_transport:
+            mcp_attrs["network.transport"] = op.network_transport
+        if op.mcp_protocol_version:
+            mcp_attrs["mcp.protocol.version"] = op.mcp_protocol_version
+        if op.is_error and op.mcp_error_type:
+            mcp_attrs["error.type"] = op.mcp_error_type
+        elif op.is_error:
+            mcp_attrs["error.type"] = "operation_error"
 
-        # Get span context for metric correlation
         context = None
-        span = getattr(tool, "span", None)
+        span = getattr(op, "span", None)
        if span is not None:
             try:
                 context = trace.set_span_in_context(span)
             except (ValueError, RuntimeError):
                 context = None
 
-        # Record duration metric
-        duration = tool.duration_s
-        if duration is None and tool.end_time is not None:
-            duration = tool.end_time - tool.start_time
+        duration = op.duration_s
+        if duration is None and op.end_time is not None:
+            duration = op.end_time - op.start_time
         if duration is not None:
-            # Choose client or server histogram based on is_client flag
             histogram = (
                 self._mcp_client_operation_duration
-                if tool.is_client
+                if op.is_client
                 else self._mcp_server_operation_duration
             )
             histogram.record(duration, attributes=mcp_attrs, context=context)
 
-        # Record output size metric (useful for tracking LLM context growth)
-        if tool.output_size_bytes is not None and tool.output_size_bytes > 0:
+        if (
+            isinstance(op, MCPToolCall)
+            and op.output_size_bytes is not None
+            and op.output_size_bytes > 0
+        ):
             self._mcp_tool_output_size.record(
-                tool.output_size_bytes, attributes=mcp_attrs, context=context
+                op.output_size_bytes, attributes=mcp_attrs, context=context
             )
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py
index 01fc0e14..bb7304ed 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py
@@ -59,6 +59,7 @@
     Error,
     ErrorClassification,
     LLMInvocation,
+    MCPOperation,
     MCPToolCall,
     RetrievalInvocation,
     Step,
@@ -174,6 +175,22 @@ def _apply_tool_semconv_attributes(
         pass
 
 
+def _apply_mcp_semconv_attributes(span: Span, op: "MCPOperation") -> None:
+    """Apply semconv attributes from MCPOperation dataclass fields.
+
+    Like ``_apply_tool_semconv_attributes`` but for non-tool-call MCP
+    operations (list, read, get, etc.).
+    """
+    for data_field in dataclass_fields(op):
+        semconv_key = data_field.metadata.get("semconv")
+        if semconv_key:
+            value = getattr(op, data_field.name)
+            if value is not None:
+                sanitized = _sanitize_span_attribute_value(value)
+                if sanitized is not None:
+                    span.set_attribute(semconv_key, sanitized)
+
+
 def _apply_custom_attributes(
     span: Span,
     attributes: Optional[dict[str, Any]],
@@ -420,7 +437,9 @@ def on_start(
             self._start_agent(invocation)
         elif isinstance(invocation, Step):
             self._start_step(invocation)
-        # Handle existing types
+        # MCPOperation check before ToolCall (MCPToolCall inherits both)
+        elif isinstance(invocation, MCPOperation):
+            self._start_mcp_operation(invocation)
         elif isinstance(invocation, ToolCall):
             self._start_tool_call(invocation)
         elif isinstance(invocation, EmbeddingInvocation):
@@ -454,6 +473,8 @@ def on_end(self, invocation: LLMInvocation | EmbeddingInvocation) -> None:
             self._finish_agent(invocation)
         elif isinstance(invocation, Step):
             self._finish_step(invocation)
+        elif isinstance(invocation, MCPOperation):
+            self._finish_mcp_operation(invocation)
         elif isinstance(invocation, ToolCall):
             self._finish_tool_call(invocation)
         elif isinstance(invocation, EmbeddingInvocation):
@@ -524,6 +545,8 @@ def on_error(
             self._error_agent(error, invocation)
         elif isinstance(invocation, Step):
             self._error_step(error, invocation)
+        elif isinstance(invocation, MCPOperation):
+            self._error_mcp_operation(error, invocation)
         elif isinstance(invocation, ToolCall):
             self._error_tool_call(error, invocation)
         elif isinstance(invocation, EmbeddingInvocation):
@@ -788,59 +811,88 @@ def _error_step(self, error: Error, step: Step) -> None:
         )
         span.end()
 
-    # ---- Tool Call lifecycle ---------------------------------------------
-    def _start_tool_call(self, tool: ToolCall) -> None:
-        """Start a tool call span per execute_tool semantic conventions.
-
-        Span name: execute_tool {gen_ai.tool.name}
-        Span kind: INTERNAL
-        See: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-spans.md#execute-tool-span
+    # ---- MCP operation lifecycle ------------------------------------------
+    def _start_mcp_operation(self, op: MCPOperation) -> None:
+        """Start a span for any MCP operation (tool call or non-tool-call).
 
-        MCPToolCall instances are dispatched to _start_mcp_tool_call for
-        MCP semconv span naming ({mcp.method.name} {target}) and SpanKind.
+        Span name: ``{mcp.method.name} {target}`` where target is
+        ``ToolCall.name`` for ``MCPToolCall`` or ``MCPOperation.target``
+        for other operations.
+        SpanKind: CLIENT if ``is_client`` else SERVER.
+        See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/
         """
-        if isinstance(tool, MCPToolCall):
-            self._start_mcp_tool_call(tool)
-            return
+        method = op.mcp_method_name or ""
+        if isinstance(op, MCPToolCall):
+            target = op.name or ""
+        else:
+            target = op.target or ""
+        span_name = f"{method} {target}".strip()
 
-        span_name = f"execute_tool {tool.name}"
-        parent_span = getattr(tool, "parent_span", None)
+        kind = SpanKind.CLIENT if op.is_client else SpanKind.SERVER
+
+        parent_span = getattr(op, "parent_span", None)
         parent_ctx = (
             trace.set_span_in_context(parent_span)
             if parent_span is not None
             else None
         )
+
         span = self._tracer.start_span(
-            span_name,
-            kind=SpanKind.INTERNAL,
-            context=parent_ctx,
+            span_name, kind=kind, context=parent_ctx
         )
-        self._add_span_to_invocation(tool, span)
+        self._add_span_to_invocation(op, span)
 
-        # Required: gen_ai.operation.name = "execute_tool"
-        span.set_attribute(
-            GenAI.GEN_AI_OPERATION_NAME,
-            GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value,
-        )
+        if isinstance(op, MCPToolCall):
+            span.set_attribute(
+                GenAI.GEN_AI_OPERATION_NAME,
+                GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value,
+            )
+            _apply_tool_semconv_attributes(span, op, self._capture_content)
+        else:
+            _apply_mcp_semconv_attributes(span, op)
 
-        # Apply all semconv attributes from dataclass fields
-        # This handles gen_ai.tool.*, mcp.*, network.*, error.type
-        _apply_tool_semconv_attributes(span, tool, self._capture_content)
+        _apply_custom_attributes(span, getattr(op, "attributes", None))
 
-        # Apply any supplemental custom attributes
-        _apply_custom_attributes(span, getattr(tool, "attributes", None))
+    def _finish_mcp_operation(self, op: MCPOperation) -> None:
+        """Finish a span for any MCP operation."""
+        span = op.span
+        if span is None:
+            return
+        is_recording = hasattr(span, "is_recording") and span.is_recording()
+        if not is_recording:
+            return
 
-    def _start_mcp_tool_call(self, tool: MCPToolCall) -> None:
-        """Start an MCP tool call span per MCP semantic conventions.
+        if isinstance(op, MCPToolCall):
+            _apply_tool_semconv_attributes(span, op, self._capture_content)
+        else:
+            _apply_mcp_semconv_attributes(span, op)
 
-        Span name: {mcp.method.name} {gen_ai.tool.name} (e.g. "tools/call add")
-        Span kind: CLIENT (client-side) or SERVER (server-side)
-        See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/
-        """
-        method = tool.mcp_method_name or "tools/call"
-        span_name = f"{method} {tool.name}"
-        kind = SpanKind.CLIENT if tool.is_client else SpanKind.SERVER
+        if op.rpc_response_status_code is not None:
+            span.set_attribute(
+                "rpc.response.status_code", op.rpc_response_status_code
+            )
+        span.end()
 
+    def _error_mcp_operation(self, error: Error, op: MCPOperation) -> None:
+        """Handle error for any MCP operation."""
+        span = op.span
+        if span is None:
+            return
+        self._apply_error_status(span, error)
+        if isinstance(op, MCPToolCall):
+            _apply_tool_semconv_attributes(span, op, self._capture_content)
+        else:
+            _apply_mcp_semconv_attributes(span, op)
+        span.end()
+
+    # ---- Tool Call lifecycle ---------------------------------------------
+    def _start_tool_call(self, tool: ToolCall) -> None:
+        """Start a tool call span per execute_tool semantic conventions.
+
+        Span name: execute_tool {gen_ai.tool.name}
+        Span kind: INTERNAL
+        """
+        span_name = f"execute_tool {tool.name}"
         parent_span = getattr(tool, "parent_span", None)
         parent_ctx = (
             trace.set_span_in_context(parent_span)
@@ -849,7 +901,7 @@ def _start_mcp_tool_call(self, tool: MCPToolCall) -> None:
         )
         span = self._tracer.start_span(
             span_name,
-            kind=kind,
+            kind=SpanKind.INTERNAL,
             context=parent_ctx,
         )
         self._add_span_to_invocation(tool, span)
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
index 16234097..e019adf5 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
@@ -48,7 +48,6 @@
 #     handler.fail_llm(invocation, Error(type="...", message="..."))
 """
-import asyncio
 import logging
 import os
 import threading
@@ -96,6 +95,7 @@ def genai_debug_log(*_args: Any, **_kwargs: Any) -> None:  # type: ignore
     GenAI,
     InputMessage,
     LLMInvocation,
+    MCPOperation,
     RetrievalInvocation,
     Step,
     ToolCall,
@@ -165,15 +165,6 @@ def is_empty(self) -> bool:
     )
 
 
-def _is_async_context() -> bool:
-    """Return True when called inside a running asyncio event loop."""
-    try:
-        asyncio.get_running_loop()
-        return True
-    except RuntimeError:
-        return False
-
-
 def set_genai_context(
     conversation_id: Optional[str] = None,
     properties: Optional[dict[str, Any]] = None,
@@ -632,16 +623,15 @@ def _inherit_parent_span(invocation: GenAI) -> None:
     def _push_current_span(invocation: GenAI) -> None:
         """After span creation, track this span as current for child resolution.
 
-        In sync contexts, also attach to OTel context so downstream
-        non-GenAI instrumentations (HTTP, DB) see the correct parent.
-        Skipped in async contexts to avoid cross-task detach errors.
+        Also attach to OTel context so downstream instrumentations (HTTP, DB,
+        MCP transport) see the correct parent. ``_pop_current_span`` handles
+        detach failures gracefully for cross-task / ``copy_context`` scenarios.
         """
         span = getattr(invocation, "span", None)
         if span is not None:
             _current_genai_span.set(span)
-            if not _is_async_context():
-                ctx = trace.set_span_in_context(span)
-                invocation._otel_context_token = context_api.attach(ctx)  # type: ignore[attr-defined]
+            ctx = trace.set_span_in_context(span)
+            invocation._otel_context_token = context_api.attach(ctx)  # type: ignore[attr-defined]
 
     @staticmethod
     def _pop_current_span(invocation: GenAI) -> None:
@@ -943,6 +933,39 @@ def fail_tool_call(self, invocation: ToolCall, error: Error) -> ToolCall:
         self._pop_current_span(invocation)
         return invocation
 
+    # MCPOperation lifecycle (non-tool-call MCP operations) ----------------
+    def start_mcp_operation(self, op: MCPOperation) -> MCPOperation:
+        """Start a non-tool-call MCP operation (list, read, get, etc.)."""
+        _apply_genai_context(op)
+        if (
+            not op.agent_name or not op.agent_id
+        ) and self._agent_context_stack:
+            top_name, top_id = self._agent_context_stack[-1]
+            if not op.agent_name:
+                op.agent_name = top_name
+            if not op.agent_id:
+                op.agent_id = top_id
+        self._inherit_parent_span(op)
+        self._emitter.on_start(op)
+        self._push_current_span(op)
+        return op
+
+    def stop_mcp_operation(self, op: MCPOperation) -> MCPOperation:
+        """Finalize a non-tool-call MCP operation successfully."""
+        op.end_time = timeit.default_timer()
+        self._emitter.on_end(op)
+        self._pop_current_span(op)
+        return op
+
+    def fail_mcp_operation(
+        self, op: MCPOperation, error: Error
+    ) -> MCPOperation:
+        """Fail a non-tool-call MCP operation."""
+        op.end_time = timeit.default_timer()
+        self._emitter.on_error(error, op)
+        self._pop_current_span(op)
+        return op
+
     @staticmethod
     def _maybe_mark_conversation_root(entity: GenAI) -> None:
         """Auto-mark entity as conversation root when no parent span exists.
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py
index d28f4727..342a4789 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py
@@ -202,57 +202,106 @@ class ToolCall(GenAI):
 
 
 @dataclass()
-class MCPToolCall(ToolCall):
-    """Represents an MCP (Model Context Protocol) tool call invocation.
-
-    Extends ToolCall with MCP-specific semantic conventions:
-    https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp
-
-    MCP Semantic Convention Attributes:
-    - mcp.method.name: The name of the request or notification method
-    - mcp.session.id: Session identifier for the MCP connection
-    - mcp.protocol.version: MCP protocol version
-    - mcp.server.name: Name of the MCP server
-    - network.transport: Transport type ("pipe" for stdio, "tcp" for HTTP)
-
-    Metrics-only fields (not span attributes):
-    - output_size_bytes: Output size for metrics tracking
-    - output_size_tokens: Token count for metrics tracking
-    - duration_s: Duration for standalone metrics emission
+class MCPOperation(GenAI):
+    """Represents any MCP protocol operation (non-tool-call).
+
+    Covers tools/list, resources/read, resources/list, prompts/get,
+    prompts/list, initialize, ping, etc.
+    See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp
+
+    Span name: ``{mcp.method.name} {target}`` (target omitted when empty).
+    SpanKind: CLIENT if ``is_client`` else SERVER.
     """
 
-    # mcp.method.name: The name of the request or notification method
+    target: str = ""
+
+    # --- Required ---
     mcp_method_name: Optional[str] = field(
         default=None,
         metadata={"semconv": "mcp.method.name"},
     )
-    # network.transport: "pipe" for stdio, "tcp" for HTTP
-    network_transport: Optional[str] = field(
+
+    # --- Conditionally required ---
+    jsonrpc_request_id: Optional[str] = field(
         default=None,
-        metadata={"semconv": "network.transport"},
+        metadata={"semconv": "jsonrpc.request.id"},
     )
-    # mcp.session.id: Session identifier
-    mcp_session_id: Optional[str] = field(
+    mcp_resource_uri: Optional[str] = field(
         default=None,
-        metadata={"semconv": "mcp.session.id"},
+        metadata={"semconv": "mcp.resource.uri"},
+    )
+    gen_ai_prompt_name: Optional[str] = field(
+        default=None,
+        metadata={"semconv": "gen_ai.prompt.name"},
     )
-    # mcp.protocol.version: MCP protocol version
+    rpc_response_status_code: Optional[str] = field(
+        default=None,
+        metadata={"semconv": "rpc.response.status_code"},
+    )
+
+    # --- Recommended ---
     mcp_protocol_version: Optional[str] = field(
         default=None,
         metadata={"semconv": "mcp.protocol.version"},
     )
-    # mcp.server.name: Name of the MCP server
-    mcp_server_name: Optional[str] = field(
+    mcp_session_id: Optional[str] = field(
         default=None,
-        metadata={"semconv": "mcp.server.name"},
+        metadata={"semconv": "mcp.session.id"},
     )
-    # Metrics-only fields (no semconv metadata - not span attributes)
-    output_size_bytes: Optional[int] = None
-    output_size_tokens: Optional[int] = None
-    duration_s: Optional[float] = None
-    # Internal state tracking (no semconv)
+    network_transport: Optional[str] = field(
+        default=None,
+        metadata={"semconv": "network.transport"},
+    )
+    network_protocol_name: Optional[str] = field(
+        default=None,
+        metadata={"semconv": "network.protocol.name"},
+    )
+    network_protocol_version: Optional[str] = field(
+        default=None,
+        metadata={"semconv": "network.protocol.version"},
+    )
+    server_address: Optional[str] = field(
+        default=None,
+        metadata={"semconv": "server.address"},
+    )
+    server_port: Optional[int] = field(
+        default=None,
+        metadata={"semconv": "server.port"},
+    )
+    client_address: Optional[str] = field(
+        default=None,
+        metadata={"semconv": "client.address"},
+    )
+    client_port: Optional[int] = field(
+        default=None,
+        metadata={"semconv": "client.port"},
+    )
+
+    # --- SDOT custom (not in OTel semconv) ---
+    sdot_mcp_server_name: Optional[str] = field(
+        default=None,
+        metadata={"semconv": "sdot.mcp.server_name"},
+    )
+
+    # --- Internal (no semconv) ---
     is_client: bool = True
+    duration_s: Optional[float] = None
     is_error: bool = False
+    mcp_error_type: Optional[str] = None
+
+
+@dataclass()
+class MCPToolCall(MCPOperation, ToolCall):
+    """MCP tool call operation (``tools/call``).
+
+    Inherits MCP transport/protocol attributes from :class:`MCPOperation`
+    and tool content fields (arguments, result, etc.) from :class:`ToolCall`.
+
+    MRO: MCPToolCall -> MCPOperation -> ToolCall -> GenAI
+    """
+
+    output_size_bytes: Optional[int] = None
+    output_size_tokens: Optional[int] = None
 
 
 @dataclass()
@@ -657,6 +706,9 @@ class Step(GenAI):
     "AgentCreation",
     "AgentInvocation",
     "Step",
+    # MCP types
+    "MCPOperation",
+    "MCPToolCall",
     # Security semconv constant (Cisco AI Defense) - re-exported from attributes
     "GEN_AI_SECURITY_EVENT_ID",
 ]
diff --git a/util/opentelemetry-util-genai/tests/test_tool_call_span_attributes.py b/util/opentelemetry-util-genai/tests/test_tool_call_span_attributes.py
index 114cb4f8..bc36b403 100644
--- a/util/opentelemetry-util-genai/tests/test_tool_call_span_attributes.py
+++ b/util/opentelemetry-util-genai/tests/test_tool_call_span_attributes.py
@@ -1,5 +1,7 @@
-"""Tests for ToolCall and MCPToolCall span attributes, naming, and SpanKind."""
+"""Tests for ToolCall, MCPToolCall, and MCPOperation span attributes."""
 
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import InMemoryMetricReader
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.sdk.trace.export import SimpleSpanProcessor
 from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
@@ -9,8 +11,14 @@
     gen_ai_attributes as GenAI,
 )
 from opentelemetry.trace import SpanKind
+from opentelemetry.util.genai.emitters.metrics import MetricsEmitter
 from opentelemetry.util.genai.emitters.span import SpanEmitter
-from opentelemetry.util.genai.types import MCPToolCall, ToolCall
+from opentelemetry.util.genai.types import (
+    Error,
+    MCPOperation,
+    MCPToolCall,
+    ToolCall,
+)
 
 
 def _make_emitter():
@@ -131,7 +139,10 @@ def test_mcp_tool_call_semconv_attributes():
         network_transport="pipe",
         mcp_session_id="sess-123",
         mcp_protocol_version="2025-03-26",
-        mcp_server_name="math-tools",
+        sdot_mcp_server_name="math-tools",
+        jsonrpc_request_id="42",
+        server_address="localhost",
+        server_port=8080,
         is_client=False,
     )
     emitter.on_start(call)
@@ -143,8 +154,11 @@ def test_mcp_tool_call_semconv_attributes():
     assert attrs.get("network.transport") == "pipe"
     assert attrs.get("mcp.session.id") == "sess-123"
     assert attrs.get("mcp.protocol.version") == "2025-03-26"
-    assert attrs.get("mcp.server.name") == "math-tools"
+    assert attrs.get("sdot.mcp.server_name") == "math-tools"
     assert attrs.get("gen_ai.tool.name") == "add"
+    assert attrs.get("jsonrpc.request.id") == "42"
+    assert attrs.get("server.address") == "localhost"
+    assert attrs.get("server.port") == 8080
 
 
 def test_mcp_tool_call_defaults_method_name():
@@ -160,4 +174,295 @@ def test_mcp_tool_call_defaults_method_name():
 
     spans = exporter.get_finished_spans()
     assert len(spans) == 1
-    assert spans[0].name == "tools/call add"
+    # When mcp_method_name is None, span name is just the tool name
+    assert spans[0].name == "add"
+
+
+# --- MCPOperation tests ---
+
+
+def test_mcp_operation_list_tools_span():
+    """MCPOperation for tools/list produces correct span name and CLIENT kind."""
+    emitter, exporter = _make_emitter()
+    op = MCPOperation(
+        target="",
+        mcp_method_name="tools/list",
+        network_transport="pipe",
+        is_client=True,
+    )
+    emitter.on_start(op)
+    emitter.on_end(op)
+
+    spans = exporter.get_finished_spans()
+    assert len(spans) == 1
+    assert spans[0].name == "tools/list"
+    assert spans[0].kind == SpanKind.CLIENT
+
+
+def test_mcp_operation_resources_read_span():
+    """MCPOperation for resources/read has resource URI as target."""
+    emitter, exporter = _make_emitter()
+    op = MCPOperation(
+        target="file:///config.json",
+        mcp_method_name="resources/read",
+        network_transport="pipe",
+        mcp_resource_uri="file:///config.json",
+        is_client=True,
+    )
+    emitter.on_start(op)
+    emitter.on_end(op)
+
+    spans = exporter.get_finished_spans()
+    assert len(spans) == 1
+    assert spans[0].name == "resources/read file:///config.json"
+    assert spans[0].kind == SpanKind.CLIENT
+    attrs = dict(spans[0].attributes)
+    assert attrs.get("mcp.resource.uri") == "file:///config.json"
+
+
+def test_mcp_operation_prompts_get_span():
+    """MCPOperation for prompts/get has prompt name as target."""
+    emitter, exporter = _make_emitter()
+    op = MCPOperation(
+        target="summarize",
+        mcp_method_name="prompts/get",
+        network_transport="pipe",
+        gen_ai_prompt_name="summarize",
+        is_client=False,
+    )
+    emitter.on_start(op)
+    emitter.on_end(op)
+
+    spans = exporter.get_finished_spans()
+    assert len(spans) == 1
+    assert spans[0].name == "prompts/get summarize"
+    assert spans[0].kind == SpanKind.SERVER
+    attrs = dict(spans[0].attributes)
+    assert attrs.get("gen_ai.prompt.name") == "summarize"
+
+
+def test_mcp_operation_server_kind():
+    """MCPOperation with is_client=False produces SERVER SpanKind."""
+    emitter, exporter = _make_emitter()
+    op = MCPOperation(
+        target="",
+        mcp_method_name="tools/list",
+        is_client=False,
+    )
+    emitter.on_start(op)
+    emitter.on_end(op)
+
+    spans = exporter.get_finished_spans()
+    assert len(spans) == 1
+    assert spans[0].kind == SpanKind.SERVER
+
+
+def test_mcp_operation_new_semconv_attrs():
+    """MCPOperation emits all
new GAP-2 semconv attributes.""" + emitter, exporter = _make_emitter() + op = MCPOperation( + target="", + mcp_method_name="tools/list", + network_transport="tcp", + network_protocol_name="http", + network_protocol_version="2", + server_address="mcp.example.com", + server_port=443, + client_address="10.0.0.1", + client_port=54321, + mcp_session_id="sess-abc", + jsonrpc_request_id="7", + is_client=True, + ) + emitter.on_start(op) + emitter.on_end(op) + + spans = exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs.get("network.transport") == "tcp" + assert attrs.get("network.protocol.name") == "http" + assert attrs.get("network.protocol.version") == "2" + assert attrs.get("server.address") == "mcp.example.com" + assert attrs.get("server.port") == 443 + assert attrs.get("client.address") == "10.0.0.1" + assert attrs.get("client.port") == 54321 + assert attrs.get("mcp.session.id") == "sess-abc" + assert attrs.get("jsonrpc.request.id") == "7" + + +# --- MCPToolCall metrics error-path tests --- + + +def _make_metrics_emitter(): + reader = InMemoryMetricReader() + meter_provider = MeterProvider(metric_readers=[reader]) + meter = meter_provider.get_meter(__name__) + emitter = MetricsEmitter(meter=meter) + return emitter, reader, meter_provider + + +def _collect_metric_names(reader, meter_provider): + """Flush and return the set of metric names that have data points.""" + try: + meter_provider.force_flush() + except Exception: + pass + names = set() + for metric in reader.get_metrics_data().resource_metrics: + for sm in metric.scope_metrics: + for m in sm.metrics: + if hasattr(m, "data") and getattr(m.data, "data_points", None): + if len(m.data.data_points) > 0: + names.add(m.name) + return names + + +def test_mcp_tool_call_error_records_generic_duration(): + """on_error for MCPToolCall must record both MCP and generic duration.""" + emitter, reader, meter_provider = _make_metrics_emitter() + tool = MCPToolCall( + name="divide", + id="tc-1", + 
mcp_method_name="tools/call", + network_transport="pipe", + is_client=True, + framework="fastmcp", + ) + tool.duration_s = 0.05 + + emitter.on_error( + Error(type=ValueError, message="division by zero"), + tool, + ) + + names = _collect_metric_names(reader, meter_provider) + assert "mcp.client.operation.duration" in names, ( + "MCP-specific duration metric should be recorded on error" + ) + assert "gen_ai.client.operation.duration" in names, ( + "Generic execute-tool duration metric should also be recorded on error" + ) + + +def test_mcp_operation_error_does_not_record_generic_duration(): + """on_error for plain MCPOperation (non-tool) should only record MCP metric.""" + emitter, reader, meter_provider = _make_metrics_emitter() + op = MCPOperation( + target="", + mcp_method_name="tools/list", + network_transport="pipe", + is_client=True, + ) + op.duration_s = 0.03 + + emitter.on_error(Error(type=RuntimeError, message="oops"), op) + + names = _collect_metric_names(reader, meter_provider) + assert "mcp.client.operation.duration" in names + assert "gen_ai.client.operation.duration" not in names + + +# --- MetricsEmitter.handles() tests --- + + +def test_handles_accepts_mcp_operation(): + """MetricsEmitter.handles() returns True for plain MCPOperation.""" + emitter, _, _ = _make_metrics_emitter() + op = MCPOperation(target="", mcp_method_name="tools/list", is_client=True) + assert emitter.handles(op) is True + + +def test_handles_accepts_mcp_tool_call(): + """MetricsEmitter.handles() returns True for MCPToolCall.""" + emitter, _, _ = _make_metrics_emitter() + tc = MCPToolCall(name="add", mcp_method_name="tools/call", is_client=True) + assert emitter.handles(tc) is True + + +def test_handles_accepts_plain_tool_call(): + """MetricsEmitter.handles() returns True for plain ToolCall.""" + emitter, _, _ = _make_metrics_emitter() + tc = ToolCall(name="summarize", id="tc-1") + assert emitter.handles(tc) is True + + +# --- MCPOperation on_end metrics tests --- + + +def 
_collect_metric_data_points(reader, meter_provider, metric_name): + """Flush and return data points for a specific metric.""" + try: + meter_provider.force_flush() + except Exception: + pass + for resource_metrics in reader.get_metrics_data().resource_metrics: + for sm in resource_metrics.scope_metrics: + for m in sm.metrics: + if m.name == metric_name and hasattr(m, "data"): + return list(m.data.data_points) + return [] + + +def test_mcp_operation_on_end_records_client_duration(): + """on_end for plain MCPOperation records mcp.client.operation.duration.""" + emitter, reader, meter_provider = _make_metrics_emitter() + op = MCPOperation( + target="", + mcp_method_name="tools/list", + network_transport="pipe", + is_client=True, + ) + op.duration_s = 0.05 + + emitter.on_end(op) + + names = _collect_metric_names(reader, meter_provider) + assert "mcp.client.operation.duration" in names + + +def test_mcp_operation_on_end_records_server_duration(): + """on_end for server-side MCPOperation records mcp.server.operation.duration.""" + emitter, reader, meter_provider = _make_metrics_emitter() + op = MCPOperation( + target="", + mcp_method_name="resources/read", + network_transport="tcp", + is_client=False, + ) + op.duration_s = 0.02 + + emitter.on_end(op) + + names = _collect_metric_names(reader, meter_provider) + assert "mcp.server.operation.duration" in names + + +def test_mcp_operation_metrics_have_correct_method_name(): + """MCP operation metrics include mcp.method.name attribute.""" + emitter, reader, meter_provider = _make_metrics_emitter() + + for method in ( + "tools/list", + "resources/read", + "prompts/get", + "prompts/list", + ): + op = MCPOperation( + target="", + mcp_method_name=method, + network_transport="pipe", + is_client=True, + ) + op.duration_s = 0.01 + emitter.on_end(op) + + data_points = _collect_metric_data_points( + reader, meter_provider, "mcp.client.operation.duration" + ) + recorded_methods = { + dp.attributes.get("mcp.method.name") for dp in 
data_points + } + assert "tools/list" in recorded_methods + assert "resources/read" in recorded_methods + assert "prompts/get" in recorded_methods + assert "prompts/list" in recorded_methods