Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/opentelemetry-instrumentation-mcp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,21 @@ To disable logging, set the `TRACELOOP_TRACE_CONTENT` environment variable to `f
```bash
TRACELOOP_TRACE_CONTENT=false
```

### What is and isn't gated

The `TRACELOOP_TRACE_CONTENT` flag only gates **content**. Operational **metadata** is
always traced so that spans remain useful for monitoring even when content capture is off.

| Always traced (metadata) | Gated behind `TRACELOOP_TRACE_CONTENT=true` (content) |
| --- | --- |
| Method / tool name (span name and `traceloop.entity.name`) | Tool arguments / prompt (`traceloop.entity.input`) |
| Span kind (`traceloop.span.kind`) | Tool / method result (`traceloop.entity.output`) |
| Request id (`mcp.request.id`) | Serialized response payload (`mcp.response.value`) |
| Duration (span start/end) | Error message text (span status description, recorded exception) |
| Status code (`OK` / `ERROR`) | |
| Error class (`error.type`) | |

When content is disabled, a failed call still produces an `ERROR` span carrying its
`error.type`, but the status description is replaced with a generic, content-free message so
that the tool's error text and arguments cannot leak through it.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from wrapt import register_post_import_hook, wrap_function_wrapper

from .utils import dont_throw
from .utils import dont_throw, should_send_prompts, SUPPRESSED_ERROR_DESCRIPTION


class FastMCPInstrumentor:
Expand Down Expand Up @@ -81,16 +81,27 @@ async def traced_method(wrapped, instance, args, kwargs):

entity_name = tool_key if tool_key else "unknown_tool"

# Create parent server.mcp span
with self._tracer.start_as_current_span("mcp.server") as mcp_span:
# Create parent server.mcp span. Disable the SDK's automatic
# exception recording/status so that, when content tracing is off,
# the exception message and stacktrace (which can echo tool content)
# are not leaked onto the span - we set status/record explicitly below.
with self._tracer.start_as_current_span(
"mcp.server",
record_exception=False,
set_status_on_exception=False,
) as mcp_span:
mcp_span.set_attribute(SpanAttributes.TRACELOOP_SPAN_KIND, "server")
mcp_span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, "mcp.server")
if self._server_name:
mcp_span.set_attribute(SpanAttributes.TRACELOOP_WORKFLOW_NAME, self._server_name)

# Create nested tool span
span_name = f"{entity_name}.tool"
with self._tracer.start_as_current_span(span_name) as tool_span:
with self._tracer.start_as_current_span(
span_name,
record_exception=False,
set_status_on_exception=False,
) as tool_span:
tool_span.set_attribute(SpanAttributes.TRACELOOP_SPAN_KIND, TraceloopSpanKindValues.TOOL.value)
tool_span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, entity_name)
if self._server_name:
Expand All @@ -111,13 +122,21 @@ async def traced_method(wrapped, instance, args, kwargs):
try:
result = await wrapped(*args, **kwargs)
except Exception as e:
# The exception message/stacktrace can echo tool content
# (the tool's error text or arguments), so only record it
# when content tracing is enabled. The ERROR status and
# error type are kept either way so failures stay visible.
tool_span.set_attribute(ERROR_TYPE, type(e).__name__)
tool_span.record_exception(e)
tool_span.set_status(Status(StatusCode.ERROR, str(e)))

mcp_span.set_attribute(ERROR_TYPE, type(e).__name__)
mcp_span.record_exception(e)
mcp_span.set_status(Status(StatusCode.ERROR, str(e)))
if self._should_send_prompts():
tool_span.record_exception(e)
tool_span.set_status(Status(StatusCode.ERROR, str(e)))
mcp_span.record_exception(e)
mcp_span.set_status(Status(StatusCode.ERROR, str(e)))
else:
suppressed = f"{type(e).__name__} {SUPPRESSED_ERROR_DESCRIPTION}"
tool_span.set_status(Status(StatusCode.ERROR, suppressed))
mcp_span.set_status(Status(StatusCode.ERROR, suppressed))
raise

try:
Expand Down Expand Up @@ -153,11 +172,8 @@ async def traced_method(wrapped, instance, args, kwargs):

return traced_method

def _should_send_prompts(self):
"""Check if content tracing is enabled (matches traceloop SDK)"""
return (
os.getenv("TRACELOOP_TRACE_CONTENT") or "true"
).lower() == "true"
def _should_send_prompts(self) -> bool:
return should_send_prompts()

def _get_json_encoder(self):
"""Get JSON encoder class (simplified - traceloop SDK uses custom JSONEncoder)"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE

from opentelemetry.instrumentation.mcp.version import __version__
from opentelemetry.instrumentation.mcp.utils import dont_throw, Config
from opentelemetry.instrumentation.mcp.utils import (
dont_throw,
Config,
should_send_prompts,
SUPPRESSED_ERROR_DESCRIPTION,
)
from opentelemetry.instrumentation.mcp.fastmcp_instrumentation import (
FastMCPInstrumentor,
)
Expand Down Expand Up @@ -118,6 +123,9 @@ def _uninstrument(self, **kwargs):
unwrap("mcp.server.stdio", "stdio_server")
self._fastmcp_instrumentor.uninstrument()

def _should_send_prompts(self) -> bool:
return should_send_prompts()

def _transport_wrapper(self, tracer):
@asynccontextmanager
async def traced_method(
Expand Down Expand Up @@ -282,35 +290,51 @@ async def _handle_tool_call(self, tracer, method, params, args, kwargs, wrapped)
except Exception:
pass

with tracer.start_as_current_span(span_name) as span:
# Disable the SDK's automatic exception recording/status so that, when
# content tracing is off, the exception message and stacktrace (which can
# echo tool content) are not leaked onto the span on re-raise - we set
# status/record explicitly in _execute_and_handle_result.
with tracer.start_as_current_span(
span_name,
record_exception=False,
set_status_on_exception=False,
) as span:
# Set tool-specific attributes
span.set_attribute(
SpanAttributes.TRACELOOP_SPAN_KIND, TraceloopSpanKindValues.TOOL.value
)
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, entity_name)

# Add input
clean_input = self._extract_clean_input(method, params)
if clean_input:
try:
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_INPUT, json.dumps(clean_input)
)
except (TypeError, ValueError):
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_INPUT, str(clean_input)
)
if self._should_send_prompts():
clean_input = self._extract_clean_input(method, params)
if clean_input:
try:
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_INPUT, json.dumps(clean_input)
)
except (TypeError, ValueError):
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_INPUT, str(clean_input)
)

return await self._execute_and_handle_result(
span, method, args, kwargs, wrapped, clean_output=True
)

async def _handle_mcp_method(self, tracer, method, args, kwargs, wrapped):
"""Handle non-tool MCP methods with simple serialization"""
with tracer.start_as_current_span(f"{method}.mcp") as span:
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_INPUT, f"{serialize(args[0])}"
)
# See _handle_tool_call: disable the SDK's automatic exception
# recording/status so suppressed error content is not re-leaked on re-raise.
with tracer.start_as_current_span(
f"{method}.mcp",
record_exception=False,
set_status_on_exception=False,
) as span:
if self._should_send_prompts():
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_INPUT, f"{serialize(args[0])}"
)
return await self._execute_and_handle_result(
span, method, args, kwargs, wrapped, clean_output=False
)
Expand All @@ -322,37 +346,51 @@ async def _execute_and_handle_result(
try:
result = await wrapped(*args, **kwargs)
# Add output
if clean_output:
clean_output_data = self._extract_clean_output(method, result)
if clean_output_data:
try:
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
json.dumps(clean_output_data),
)
except (TypeError, ValueError):
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
str(clean_output_data),
)
else:
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_OUTPUT, serialize(result)
)
if self._should_send_prompts():
if clean_output:
clean_output_data = self._extract_clean_output(method, result)
if clean_output_data:
try:
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
json.dumps(clean_output_data),
)
except (TypeError, ValueError):
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
str(clean_output_data),
)
else:
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_OUTPUT, serialize(result)
)
# Handle errors
if hasattr(result, "isError") and result.isError:
span.set_attribute(ERROR_TYPE, "tool_error")
if len(result.content) > 0:
if self._should_send_prompts() and len(result.content) > 0:
span.set_status(
Status(StatusCode.ERROR, f"{result.content[0].text}")
)
else:
span.set_status(
Status(StatusCode.ERROR, SUPPRESSED_ERROR_DESCRIPTION)
)
else:
span.set_status(Status(StatusCode.OK))
return result
except Exception as e:
span.set_attribute(ERROR_TYPE, type(e).__name__)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
# The exception message/stacktrace can echo tool content (e.g. the
# tool's error text or arguments), so only record it when content
# tracing is enabled. The ERROR status and error type are kept either
# way so failures remain visible.
if self._should_send_prompts():
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
else:
span.set_status(
Status(StatusCode.ERROR, f"{type(e).__name__} {SUPPRESSED_ERROR_DESCRIPTION}")
)
raise

def _extract_clean_input(self, method: str, params: Any) -> dict:
Expand Down Expand Up @@ -565,17 +603,23 @@ async def send(self, item: Any) -> Any:

with self._tracer.start_as_current_span("ResponseStreamWriter") as span:
if hasattr(request, "result"):
span.set_attribute(
SpanAttributes.MCP_RESPONSE_VALUE, f"{serialize(request.result)}"
)
if should_send_prompts():
span.set_attribute(
SpanAttributes.MCP_RESPONSE_VALUE, f"{serialize(request.result)}"
)
if "isError" in request.result:
if request.result["isError"] is True:
span.set_status(
Status(
StatusCode.ERROR,
f"{request.result['content'][0]['text']}",
if should_send_prompts():
span.set_status(
Status(
StatusCode.ERROR,
f"{request.result['content'][0]['text']}",
)
)
else:
span.set_status(
Status(StatusCode.ERROR, SUPPRESSED_ERROR_DESCRIPTION)
)
)
if hasattr(request, "id"):
span.set_attribute(SpanAttributes.MCP_REQUEST_ID, f"{request.id}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,24 @@

import asyncio
import logging
import os
import traceback


class Config:
exception_logger = None


# Generic, content-free status description used when TRACELOOP_TRACE_CONTENT is
# disabled, so error spans still convey that a call failed without leaking the
# tool's error message or arguments.
SUPPRESSED_ERROR_DESCRIPTION = "error (content suppressed by TRACELOOP_TRACE_CONTENT)"


def should_send_prompts() -> bool:
return (os.getenv("TRACELOOP_TRACE_CONTENT") or "true").lower() == "true"


def dont_throw(func):
"""
A decorator that wraps the passed in function and logs exceptions instead of throwing them.
Expand Down
Loading