-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy path_otel.py
More file actions
37 lines (27 loc) · 1.04 KB
/
_otel.py
File metadata and controls
37 lines (27 loc) · 1.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
"""OpenTelemetry helpers for MCP."""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any
from opentelemetry.context import Context
from opentelemetry.propagate import extract, inject
from opentelemetry.trace import SpanKind, get_tracer
_tracer = get_tracer("mcp-python-sdk")
@contextmanager
def otel_span(
name: str,
*,
kind: str = "INTERNAL",
attributes: dict[str, Any] | None = None,
context: Context | None = None,
) -> Iterator[Any]:
"""Create an OTel span."""
span_kind = getattr(SpanKind, kind, SpanKind.INTERNAL)
with _tracer.start_as_current_span(name, kind=span_kind, attributes=attributes, context=context) as span:
yield span
def inject_trace_context(meta: dict[str, Any]) -> None:
"""Inject W3C trace context (traceparent/tracestate) into a `_meta` dict."""
inject(meta)
def extract_trace_context(meta: dict[str, Any]) -> Context:
"""Extract W3C trace context from a `_meta` dict."""
return extract(meta)