diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/pyproject.toml
index 46edd7ec..fb00cab2 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/pyproject.toml
+++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/pyproject.toml
@@ -20,7 +20,7 @@ classifiers = [
   "Programming Language :: Python :: 3.13",
 ]
 dependencies = [
-  "opentelemetry-api ~= 1.38.0.dev0",
+  "opentelemetry-api ~= 1.41.0.dev0",
   "opentelemetry-instrumentation ~= 0.59b0.dev0",
   "opentelemetry-semantic-conventions ~= 0.59b0.dev0",
   "splunk-otel-util-genai>=0.1.4",
diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/instrumentation.py
index 5afec17a..6445da29 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/instrumentation.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/instrumentation.py
@@ -95,6 +95,18 @@ def _instrument(self, **kwargs):
         meter_provider = kwargs.get("meter_provider")
         logger_provider = kwargs.get("logger_provider")

+        if tracer_provider is None:
+            from opentelemetry import trace
+            tracer_provider = trace.get_tracer_provider()
+
+        if meter_provider is None:
+            from opentelemetry import metrics
+            meter_provider = metrics.get_meter_provider()
+
+        if logger_provider is None:
+            from opentelemetry import _logs
+            logger_provider = _logs.get_logger_provider()
+
         # Get the telemetry handler from util-genai
         self._telemetry_handler = get_telemetry_handler(
             tracer_provider=tracer_provider,
diff --git a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/transport_instrumentor.py b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/transport_instrumentor.py
index 8d0d4fad..12c105e2 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/transport_instrumentor.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-fastmcp/src/opentelemetry/instrumentation/fastmcp/transport_instrumentor.py
@@ -183,11 +183,15 @@ async def traced_handle_request(wrapped, instance, args, kwargs) -> Any:
         if tracestate:
             carrier["tracestate"] = tracestate

+        baggage = getattr(request_meta, "baggage", None)
+        if baggage:
+            carrier["baggage"] = baggage
+
         # Also try model_extra for pydantic v2 extra fields
         if not carrier and hasattr(request_meta, "model_extra"):
             extra = request_meta.model_extra
             if extra:
-                for key in ["traceparent", "tracestate"]:
+                for key in ["traceparent", "tracestate", "baggage"]:
                     if key in extra:
                         carrier[key] = extra[key]

@@ -195,7 +199,7 @@ async def traced_handle_request(wrapped, instance, args, kwargs) -> Any:
         ctx = propagate.extract(carrier)
         token = context.attach(ctx)
         _LOGGER.debug(
-            f"Attached trace context in _handle_request: "
+            f"Attached trace context and baggage in _handle_request: "
            f"carrier={carrier}"
         )

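For context on the carrier keys handled above, here is a minimal sketch (not part of the change) of how the OpenTelemetry propagation API round-trips W3C baggage through a plain dict carrier. It assumes the default global propagators (trace context plus baggage); the workflow value is only illustrative.

from opentelemetry import baggage, propagate

# Producer side: put a value in baggage and inject it into a plain dict carrier.
ctx = baggage.set_baggage("workflow.name", "sre-incident-copilot")
carrier = {}
propagate.inject(carrier, context=ctx)
# carrier now holds a "baggage" entry (and "traceparent" when a span is active).

# Consumer side, which is what traced_handle_request does with request_meta fields:
restored = propagate.extract(carrier)
print(baggage.get_baggage("workflow.name", restored))  # -> "sre-incident-copilot"
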
diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py
index e2346154..c3a17bfc 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py
@@ -586,8 +586,8 @@ def main(manual_instrumentation: bool = False) -> None:
     else:
         print("🔑 Using standard OPENAI_API_KEY authentication")

-    if manual_instrumentation:
-        _configure_manual_instrumentation()
+    #if manual_instrumentation:
+    _configure_manual_instrumentation()

     session_id = str(uuid4())
     user_request = (
diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/requirements.txt
index 91a83b80..c6fae499 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/requirements.txt
+++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/requirements.txt
@@ -5,9 +5,11 @@ python-dotenv>=1.0.0
 deepeval
 litellm

+opentelemetry-exporter-otlp-proto-grpc>=1.38.0
+
 # Splunk OpenTelemetry GenAI packages
-splunk-otel-util-genai==0.1.9
-splunk-otel-util-genai-evals==0.1.7
-splunk-otel-genai-evals-deepeval==0.1.12
-splunk-otel-genai-emitters-splunk==0.1.6
-splunk-otel-instrumentation-langchain==0.1.7
+#splunk-otel-util-genai==0.1.9
+#splunk-otel-util-genai-evals==0.1.7
+#splunk-otel-genai-evals-deepeval==0.1.12
+#splunk-otel-genai-emitters-splunk==0.1.6
+#splunk-otel-instrumentation-langchain==0.1.7
diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/sre_incident_copilot/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/sre_incident_copilot/main.py
index ee410716..2b220e26 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/sre_incident_copilot/main.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/sre_incident_copilot/main.py
@@ -639,8 +639,8 @@ def main():
         sys.exit(1)

     # Configure manual instrumentation if requested
-    if args.manual_instrumentation:
-        _configure_manual_instrumentation(config)
+    #if args.manual_instrumentation:
+    _configure_manual_instrumentation(config)

     # Set up OpenTelemetry environment
     os.environ.setdefault("OTEL_SERVICE_NAME", config.otel_service_name)
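The examples now pull in the OTLP gRPC exporter directly. As a rough sketch of what that dependency provides, the following standalone snippet wires a tracer provider to the exporter; the endpoint is only the conventional local-collector default, not something mandated by the change.

import os

# The exporter honors the standard OTel env vars; this default targets a local
# collector and is only an assumption for the sketch.
os.environ.setdefault("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)

with trace.get_tracer("requirements-smoke-test").start_as_current_span("ping"):
    pass  # span is batched and sent to the collector at OTEL_EXPORTER_OTLP_ENDPOINT
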
diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/sre_incident_copilot/mcp_tools/investigation_agent_mcp.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/sre_incident_copilot/mcp_tools/investigation_agent_mcp.py
index 5fc8dca0..eed7c239 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/sre_incident_copilot/mcp_tools/investigation_agent_mcp.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/sre_incident_copilot/mcp_tools/investigation_agent_mcp.py
@@ -40,72 +40,114 @@ from agents import investigation_agent  # noqa: E402
 from config import Config  # noqa: E402

-mcp = FastMCP("investigation-agent")
-
-
-@mcp.tool()
-async def investigate_incident(
-    service_id: str,
-    investigation_checklist: str,
-    scenario_id: str = None,
-) -> dict:
-    """Investigate an incident by querying metrics, logs, and traces.
-
-    This tool exposes the Investigation Agent as an MCP tool that can be called
-    by other agents or external systems.
-
-    Args:
-        service_id: The service identifier to investigate
-        investigation_checklist: JSON string with investigation steps
-        scenario_id: Optional scenario ID for seeded data
-
-    Returns:
-        Dict containing investigation results with hypotheses and evidence
-    """
-    # Create a minimal state for the agent
-    config = Config.from_env()
-    if scenario_id:
-        config.scenario_id = scenario_id
-
-    state = {
-        "service_id": service_id,
-        "scenario_id": scenario_id,
-        "session_id": f"mcp-{asyncio.get_event_loop().time()}",
-        "triage_result": {
-            "investigation_checklist": json.loads(investigation_checklist)
-            if isinstance(investigation_checklist, str)
-            else investigation_checklist,
-        },
-        "current_agent": "investigation",
-        "hypotheses": [],
-        "confidence_score": 0.0,
-        "eval_metrics": {},
-    }
-
-    try:
-        # Run investigation agent
-        updated_state = investigation_agent(state, config)
-
-        # Extract results
-        investigation_result = updated_state.get("investigation_result", {})
-        hypotheses = updated_state.get("hypotheses", [])
-        confidence_score = updated_state.get("confidence_score", 0.0)
-
-        return {
-            "status": "success",
-            "service_id": service_id,
-            "hypotheses": hypotheses,
-            "investigation_result": investigation_result,
-            "confidence_score": confidence_score,
-            "evidence_count": sum(len(h.get("evidence", [])) for h in hypotheses),
-        }
-    except Exception as e:
-        return {
-            "status": "error",
-            "error": str(e),
+
+def _configure_manual_instrumentation():
+    """Configure manual OpenTelemetry instrumentation."""
+
+    from opentelemetry.sdk.trace import TracerProvider
+    from opentelemetry.sdk.trace.export import BatchSpanProcessor
+    from opentelemetry import _events, _logs, metrics, trace
+    from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
+    from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
+        OTLPMetricExporter,
+    )
+    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
+        OTLPSpanExporter,
+    )
+    from opentelemetry.sdk._events import EventLoggerProvider
+    from opentelemetry.sdk._logs import LoggerProvider
+    from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
+    from opentelemetry.sdk.metrics import MeterProvider
+    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+
+    trace.set_tracer_provider(TracerProvider())
+    trace.get_tracer_provider().add_span_processor(
+        BatchSpanProcessor(OTLPSpanExporter())
+    )
+
+    metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter())
+    metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader]))
+
+    _logs.set_logger_provider(LoggerProvider())
+    _logs.get_logger_provider().add_log_record_processor(
+        BatchLogRecordProcessor(OTLPLogExporter())
+    )
+    _events.set_event_logger_provider(EventLoggerProvider())
+
+    from opentelemetry.instrumentation.langchain import LangchainInstrumentor
+    instrumentor = LangchainInstrumentor()
+    instrumentor.instrument()
+
+    from opentelemetry.instrumentation.fastmcp import FastMCPInstrumentor
+    instrumentor2 = FastMCPInstrumentor()
+    instrumentor2.instrument()
+
+if __name__ == "__main__":
+    _configure_manual_instrumentation()
+    mcp = FastMCP("investigation-agent")
+
+
+    @mcp.tool()
+    async def investigate_incident(
+        service_id: str,
+        investigation_checklist: str,
+        scenario_id: str = None,
+    ) -> dict:
+        """Investigate an incident by querying metrics, logs, and traces.
+
+        This tool exposes the Investigation Agent as an MCP tool that can be called
+        by other agents or external systems.
+
+        Args:
+            service_id: The service identifier to investigate
+            investigation_checklist: JSON string with investigation steps
+            scenario_id: Optional scenario ID for seeded data
+
+        Returns:
+            Dict containing investigation results with hypotheses and evidence
+        """
+        # Create a minimal state for the agent
+        config = Config.from_env()
+        if scenario_id:
+            config.scenario_id = scenario_id
+
+        state = {
             "service_id": service_id,
+            "scenario_id": scenario_id,
+            "session_id": f"mcp-{asyncio.get_event_loop().time()}",
+            "triage_result": {
+                "investigation_checklist": json.loads(investigation_checklist)
+                if isinstance(investigation_checklist, str)
+                else investigation_checklist,
+            },
+            "current_agent": "investigation",
+            "hypotheses": [],
+            "confidence_score": 0.0,
+            "eval_metrics": {},
         }

+        try:
+            # Run investigation agent
+            updated_state = investigation_agent(state, config)
+
+            # Extract results
+            investigation_result = updated_state.get("investigation_result", {})
+            hypotheses = updated_state.get("hypotheses", [])
+            confidence_score = updated_state.get("confidence_score", 0.0)
+
+            return {
+                "status": "success",
+                "service_id": service_id,
+                "hypotheses": hypotheses,
+                "investigation_result": investigation_result,
+                "confidence_score": confidence_score,
+                "evidence_count": sum(len(h.get("evidence", [])) for h in hypotheses),
+            }
+        except Exception as e:
+            return {
+                "status": "error",
+                "error": str(e),
+                "service_id": service_id,
+            }

-if __name__ == "__main__":
     mcp.run(transport="stdio", show_banner=False)
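The manual setup above points everything at OTLP exporters. For local debugging of the MCP server it can help to swap in a console exporter instead; this is a hypothetical variant for illustration, not part of the change.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor


def configure_console_tracing() -> None:
    # Same shape as _configure_manual_instrumentation, but spans print to stdout
    # so no collector is required while iterating on the MCP tool.
    provider = TracerProvider()
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)


configure_console_tracing()
with trace.get_tracer(__name__).start_as_current_span("investigate_incident"):
    pass  # the span is printed as JSON to stdout instead of exported over OTLP
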
diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/sre_incident_copilot/tools.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/sre_incident_copilot/tools.py
index 910836f7..3cc0780c 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/sre_incident_copilot/tools.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/sre_incident_copilot/tools.py
@@ -366,6 +366,12 @@ def notifier(message: str, channel: str = "incidents") -> str:
     }
     return json.dumps(notification, indent=2)

+def _configure_manual_instrumentation():
+    """Configure manual OpenTelemetry instrumentation."""
+
+    from opentelemetry.instrumentation.fastmcp import FastMCPInstrumentor
+    instrumentor2 = FastMCPInstrumentor()
+    instrumentor2.instrument()

 @tool
 def investigation_agent_mcp(
@@ -383,7 +389,7 @@ def investigation_agent_mcp(
     Returns:
         JSON string with investigation results
     """
-
+    _configure_manual_instrumentation()
     mcp_script_path = os.path.join(
         os.path.dirname(__file__), "mcp_tools", "investigation_agent_mcp.py"
     )
diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml
index 03c6b111..273a96f9 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml
+++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml
@@ -24,7 +24,7 @@ classifiers = [
   "Programming Language :: Python :: 3.13",
 ]
 dependencies = [
-  "opentelemetry-api ~= 1.38.0.dev0",
+  "opentelemetry-api ~= 1.41.0.dev0",
   "opentelemetry-instrumentation ~= 0.59b0.dev0",
   "opentelemetry-semantic-conventions ~= 0.59b0.dev0",
   "splunk-otel-util-genai>=0.1.4",
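tools.py above now calls _configure_manual_instrumentation() on every tool invocation. Instrumentors built on BaseInstrumentor generally warn and return when instrument() is called a second time, but an explicit guard keeps the repeated calls quiet; the function and flag names in this sketch are illustrative only.

from opentelemetry.instrumentation.fastmcp import FastMCPInstrumentor

_FASTMCP_INSTRUMENTED = False  # illustrative module-level guard


def _ensure_fastmcp_instrumented() -> None:
    """Instrument FastMCP once per process, however often tools invoke this."""
    global _FASTMCP_INSTRUMENTED
    if _FASTMCP_INSTRUMENTED:
        return
    FastMCPInstrumentor().instrument()
    _FASTMCP_INSTRUMENTED = True
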
diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/__init__.py
index 9cb6ec2e..b1ac6346 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/__init__.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/__init__.py
@@ -69,6 +69,18 @@ def _instrument(self, **kwargs):
         meter_provider = kwargs.get("meter_provider")
         logger_provider = kwargs.get("logger_provider")

+        if tracer_provider is None:
+            from opentelemetry import trace
+            tracer_provider = trace.get_tracer_provider()
+
+        if meter_provider is None:
+            from opentelemetry import metrics
+            meter_provider = metrics.get_meter_provider()
+
+        if logger_provider is None:
+            from opentelemetry import _logs
+            logger_provider = _logs.get_logger_provider()
+
         self._telemetry_handler = get_telemetry_handler(
             tracer_provider=tracer_provider,
             meter_provider=meter_provider,
@@ -83,9 +95,9 @@ def _instrument(self, **kwargs):
         self._callback_handler = langchainCallbackHandler

         wrap_function_wrapper(
-            module="langchain_core.callbacks",
-            name="BaseCallbackManager.__init__",
-            wrapper=_BaseCallbackManagerInitWrapper(langchainCallbackHandler),
+            "langchain_core.callbacks",
+            "BaseCallbackManager.__init__",
+            _BaseCallbackManagerInitWrapper(langchainCallbackHandler),
         )

         if not self.disable_trace_context_propagation:
@@ -99,66 +111,66 @@ def _wrap_openai_functions_for_tracing(self, langchainCallbackHandler):
         if is_package_available("langchain_community"):
             # Wrap langchain_community.llms.openai.BaseOpenAI
             wrap_function_wrapper(
-                module="langchain_community.llms.openai",
-                name="BaseOpenAI._generate",
-                wrapper=openai_tracing_wrapper,
+                "langchain_community.llms.openai",
+                "BaseOpenAI._generate",
+                openai_tracing_wrapper,
             )
             wrap_function_wrapper(
-                module="langchain_community.llms.openai",
-                name="BaseOpenAI._agenerate",
-                wrapper=openai_tracing_wrapper,
+                "langchain_community.llms.openai",
+                "BaseOpenAI._agenerate",
+                openai_tracing_wrapper,
             )
             wrap_function_wrapper(
-                module="langchain_community.llms.openai",
-                name="BaseOpenAI._stream",
-                wrapper=openai_tracing_wrapper,
+                "langchain_community.llms.openai",
+                "BaseOpenAI._stream",
+                openai_tracing_wrapper,
             )
             wrap_function_wrapper(
-                module="langchain_community.llms.openai",
-                name="BaseOpenAI._astream",
-                wrapper=openai_tracing_wrapper,
+                "langchain_community.llms.openai",
+                "BaseOpenAI._astream",
+                openai_tracing_wrapper,
             )

         if is_package_available("langchain_openai"):
             # Wrap langchain_openai.llms.base.BaseOpenAI
             wrap_function_wrapper(
-                module="langchain_openai.llms.base",
-                name="BaseOpenAI._generate",
-                wrapper=openai_tracing_wrapper,
+                "langchain_openai.llms.base",
+                "BaseOpenAI._generate",
+                openai_tracing_wrapper,
             )
             wrap_function_wrapper(
-                module="langchain_openai.llms.base",
-                name="BaseOpenAI._agenerate",
-                wrapper=openai_tracing_wrapper,
+                "langchain_openai.llms.base",
+                "BaseOpenAI._agenerate",
+                openai_tracing_wrapper,
             )
             wrap_function_wrapper(
-                module="langchain_openai.llms.base",
-                name="BaseOpenAI._stream",
-                wrapper=openai_tracing_wrapper,
+                "langchain_openai.llms.base",
+                "BaseOpenAI._stream",
+                openai_tracing_wrapper,
             )
             wrap_function_wrapper(
-                module="langchain_openai.llms.base",
-                name="BaseOpenAI._astream",
-                wrapper=openai_tracing_wrapper,
+                "langchain_openai.llms.base",
+                "BaseOpenAI._astream",
+                openai_tracing_wrapper,
             )
             # langchain_openai.chat_models.base.BaseOpenAI
             wrap_function_wrapper(
-                module="langchain_openai.chat_models.base",
-                name="BaseChatOpenAI._generate",
-                wrapper=openai_tracing_wrapper,
+                "langchain_openai.chat_models.base",
+                "BaseChatOpenAI._generate",
+                openai_tracing_wrapper,
             )
             wrap_function_wrapper(
-                module="langchain_openai.chat_models.base",
-                name="BaseChatOpenAI._agenerate",
-                wrapper=openai_tracing_wrapper,
+                "langchain_openai.chat_models.base",
+                "BaseChatOpenAI._agenerate",
+                openai_tracing_wrapper,
             )

             # Doesn't work :(
@@ -281,9 +293,9 @@ def _embed_query_wrapper(wrapped, instance, args, kwargs):
                     continue

                 wrap_function_wrapper(
-                    module=module,
-                    name=f"{class_name}.{method}",
-                    wrapper=wrapper,
+                    module,
+                    f"{class_name}.{method}",
+                    wrapper,
                 )
             except Exception:  # pragma: no cover
                 pass
diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py
index a13e5108..44423bfa 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py
@@ -499,8 +499,9 @@ def on_chain_start(
         force_workflow = self._handler.should_use_workflow_root(
             workflow_name=workflow_name_override
         )
-
-        if force_workflow:
+        from opentelemetry import baggage
+        workflow_name = baggage.get_baggage("workflow.name")
+        if force_workflow and not workflow_name:
             wf = Workflow(name=workflow_name_override or name, attributes=attrs)
             wf.input_messages = command_input_messages or _make_input_message(
                 inputs
@@ -1041,4 +1042,4 @@ def on_retriever_error(
         parent_run_id: Optional[UUID] = None,
         **_: Any,
     ) -> None:
-        self._fail(run_id, error)
+        self._fail(run_id, error)
\ No newline at end of file
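The calls above switch wrap_function_wrapper from keyword to positional arguments. For reference, a minimal sketch of the wrapt-style contract behind that helper, using json.dumps purely as a stand-in target.

import json
import time

from wrapt import wrap_function_wrapper


def _timing_wrapper(wrapped, instance, args, kwargs):
    """Runs in place of the original; must call `wrapped` to preserve behavior."""
    start = time.perf_counter()
    try:
        return wrapped(*args, **kwargs)
    finally:
        print(f"{wrapped.__name__} took {time.perf_counter() - start:.6f}s")


# Positional form, matching the calls above: (module, name, wrapper).
wrap_function_wrapper("json", "dumps", _timing_wrapper)

json.dumps({"ok": True})  # prints a timing line via the wrapper
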
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py
index 306e836a..4f2bcf1d 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py
@@ -654,6 +654,7 @@ def _start_agent(self, agent: AgentCreation | AgentInvocation) -> None:
         )
         if semconv_subset:
             _apply_gen_ai_semconv_attributes(span, semconv_subset)
+        span.set_attribute(GEN_AI_WORKFLOW_NAME, agent.workflow_name)

     def _finish_agent(self, agent: AgentCreation | AgentInvocation) -> None:
         """Finish an agent span."""
@@ -725,6 +726,7 @@ def _start_step(self, step: Step) -> None:
             _apply_gen_ai_semconv_attributes(
                 span, step.semantic_convention_attributes()
             )
+        span.set_attribute(GEN_AI_WORKFLOW_NAME, step.workflow_name)

     def _finish_step(self, step: Step) -> None:
         """Finish a step span."""
@@ -793,6 +795,7 @@ def _start_tool_call(self, tool: ToolCall) -> None:

         # Apply any supplemental custom attributes
         _apply_custom_attributes(span, getattr(tool, "attributes", None))
+        span.set_attribute(GEN_AI_WORKFLOW_NAME, tool.workflow_name)

     def _finish_tool_call(self, tool: ToolCall) -> None:
         """Finish a tool call span."""
@@ -845,6 +848,7 @@ def _start_embedding(self, embedding: EmbeddingInvocation) -> None:
             span.set_attribute(
                 GEN_AI_EMBEDDINGS_INPUT_TEXTS, embedding.input_texts
             )
+        span.set_attribute(GEN_AI_WORKFLOW_NAME, embedding.workflow_name)

     def _finish_embedding(self, embedding: EmbeddingInvocation) -> None:
         """Finish an embedding span."""
@@ -905,6 +909,7 @@ def _start_retrieval(self, retrieval: RetrievalInvocation) -> None:
             span.set_attribute(GEN_AI_RETRIEVAL_TOP_K, retrieval.top_k)
         if self._capture_content and retrieval.query:
             span.set_attribute(GEN_AI_RETRIEVAL_QUERY_TEXT, retrieval.query)
+        span.set_attribute(GEN_AI_WORKFLOW_NAME, retrieval.workflow_name)

     def _finish_retrieval(self, retrieval: RetrievalInvocation) -> None:
         """Finish a retrieval span."""
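The new attribute writes above are unconditional even when workflow_name is None. As a hedged aside: the OpenTelemetry SDK drops None attribute values with a warning, so optional fields are usually guarded first. A minimal sketch, reusing the key name from the diff.

from typing import Optional

from opentelemetry import trace

GEN_AI_WORKFLOW_NAME = "gen_ai.workflow.name"  # key mirrors the diff


def set_workflow_attr(span: trace.Span, workflow_name: Optional[str]) -> None:
    # Skip None/empty values instead of emitting an invalid attribute.
    if workflow_name:
        span.set_attribute(GEN_AI_WORKFLOW_NAME, workflow_name)
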
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
index 17ac8d40..7f8be697 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
@@ -67,6 +67,7 @@ def genai_debug_log(*_args: Any, **_kwargs: Any) -> None:  # type: ignore

 from opentelemetry import _events as _otel_events
+from opentelemetry import baggage
 from opentelemetry import context as context_api
 from opentelemetry import trace
 from opentelemetry._logs import Logger, LoggerProvider, get_logger
@@ -163,6 +164,27 @@ def is_empty(self) -> bool:
     "_current_genai_span", default=None
 )

+# Context-scoped attributes (PR #4931 pattern).
+# These replace baggage as the primary in-process propagation mechanism.
+# In-process only — not propagated across wire boundaries.
+_ctx_workflow_name: ContextVar[Optional[str]] = ContextVar(
+    "genai_workflow_name", default=None
+)
+_ctx_agent_name: ContextVar[Optional[str]] = ContextVar(
+    "genai_agent_name", default=None
+)
+_ctx_agent_id: ContextVar[Optional[str]] = ContextVar(
+    "genai_agent_id", default=None
+)
+
+
+def _get_ctx_attr(ctx_var: ContextVar[Optional[str]], baggage_key: str) -> Optional[str]:
+    """Read from context-scoped var first, fall back to baggage."""
+    value = ctx_var.get()
+    if value is None:
+        value = baggage.get_baggage(baggage_key)
+    return value
+

 def _is_async_context() -> bool:
     """Return True when called inside a running asyncio event loop."""
@@ -521,7 +543,6 @@ def _get_eval_histogram(canonical_name: str):
             self._evaluation_manager = None
         # Active agent identity stack (name, id) for implicit propagation to nested operations
         # agent_id may be None if not provided by instrumentation
-        self._agent_context_stack: list[tuple[str, Optional[str]]] = []
         self._initialize_default_callbacks()
         self._initialized = True

@@ -628,6 +649,18 @@ def _push_current_span(invocation: GenAI) -> None:
         _current_genai_span.set(span)
         if not _is_async_context():
             ctx = trace.set_span_in_context(span)
+            if isinstance(invocation, Workflow):
+                ctx = baggage.set_baggage("workflow.name", invocation.name, context=ctx)
+                invocation._ctx_tokens = [  # type: ignore[attr-defined]
+                    (_ctx_workflow_name, _ctx_workflow_name.set(invocation.name)),
+                ]
+            if isinstance(invocation, AgentInvocation):
+                ctx = baggage.set_baggage("agent.name", invocation.agent_name, context=ctx)
+                ctx = baggage.set_baggage("agent.id", invocation.agent_id, context=ctx)
+                invocation._ctx_tokens = [  # type: ignore[attr-defined]
+                    (_ctx_agent_name, _ctx_agent_name.set(invocation.agent_name)),
+                    (_ctx_agent_id, _ctx_agent_id.set(invocation.agent_id)),
+                ]
             invocation._otel_context_token = context_api.attach(ctx)  # type: ignore[attr-defined]

     @staticmethod
@@ -648,6 +681,11 @@ def _pop_current_span(invocation: GenAI) -> None:
                 context_api._RUNTIME_CONTEXT.detach(token)  # type: ignore[attr-defined]
             except Exception:  # noqa: BLE001
                 pass
+        for ctx_var, ctx_token in getattr(invocation, "_ctx_tokens", []):
+            try:
+                ctx_var.reset(ctx_token)
+            except Exception:  # noqa: BLE001
+                pass

     def start_llm(
         self,
@@ -660,14 +698,10 @@ def start_llm(
         # Apply GenAI context from contextvars if not already set
         _apply_genai_context(invocation)
         # Implicit agent inheritance
-        if (
-            not invocation.agent_name or not invocation.agent_id
-        ) and self._agent_context_stack:
-            top_name, top_id = self._agent_context_stack[-1]
-            if not invocation.agent_name:
-                invocation.agent_name = top_name
-            if not invocation.agent_id:
-                invocation.agent_id = top_id
+        invocation.workflow_name = _get_ctx_attr(_ctx_workflow_name, "workflow.name")
+        invocation.agent_name = _get_ctx_attr(_ctx_agent_name, "agent.name")
+        invocation.agent_id = _get_ctx_attr(_ctx_agent_id, "agent.id")
+
         self._inherit_parent_span(invocation)
         self._emitter.on_start(invocation)
         self._push_current_span(invocation)
@@ -773,14 +807,11 @@ def start_embedding(
         self._refresh_capture_content()
         # Apply GenAI context from contextvars if not already set
         _apply_genai_context(invocation)
-        if (
-            not invocation.agent_name or not invocation.agent_id
-        ) and self._agent_context_stack:
-            top_name, top_id = self._agent_context_stack[-1]
-            if not invocation.agent_name:
-                invocation.agent_name = top_name
-            if not invocation.agent_id:
-                invocation.agent_id = top_id
+        # Implicit agent inheritance
+        invocation.workflow_name = _get_ctx_attr(_ctx_workflow_name, "workflow.name")
+        invocation.agent_name = _get_ctx_attr(_ctx_agent_name, "agent.name")
+        invocation.agent_id = _get_ctx_attr(_ctx_agent_id, "agent.id")
+
         invocation.start_time = time.time()
         self._inherit_parent_span(invocation)
         self._emitter.on_start(invocation)
@@ -836,14 +867,11 @@ def start_retrieval(
         self._refresh_capture_content()
         # Apply GenAI context from contextvars if not already set
         _apply_genai_context(invocation)
-        if (
-            not invocation.agent_name or not invocation.agent_id
-        ) and self._agent_context_stack:
-            top_name, top_id = self._agent_context_stack[-1]
-            if not invocation.agent_name:
-                invocation.agent_name = top_name
-            if not invocation.agent_id:
-                invocation.agent_id = top_id
+        # Implicit agent inheritance
+        invocation.workflow_name = _get_ctx_attr(_ctx_workflow_name, "workflow.name")
+        invocation.agent_name = _get_ctx_attr(_ctx_agent_name, "agent.name")
+        invocation.agent_id = _get_ctx_attr(_ctx_agent_id, "agent.id")
+
         invocation.start_time = time.time()
         self._inherit_parent_span(invocation)
         self._emitter.on_start(invocation)
@@ -896,14 +924,11 @@ def fail_retrieval(
     def start_tool_call(self, invocation: ToolCall) -> ToolCall:
         """Start a tool call invocation and create a pending span entry."""
         _apply_genai_context(invocation)
-        if (
-            not invocation.agent_name or not invocation.agent_id
-        ) and self._agent_context_stack:
-            top_name, top_id = self._agent_context_stack[-1]
-            if not invocation.agent_name:
-                invocation.agent_name = top_name
-            if not invocation.agent_id:
-                invocation.agent_id = top_id
+        # Implicit agent inheritance
+        invocation.workflow_name = _get_ctx_attr(_ctx_workflow_name, "workflow.name")
+        invocation.agent_name = _get_ctx_attr(_ctx_agent_name, "agent.name")
+        invocation.agent_id = _get_ctx_attr(_ctx_agent_id, "agent.id")
+
         self._inherit_parent_span(invocation)
         self._emitter.on_start(invocation)
         self._push_current_span(invocation)
@@ -1227,19 +1252,14 @@ def start_agent(
         """Start an agent operation (create or invoke) and create a pending span entry."""
         self._refresh_capture_content()
         _apply_genai_context(agent)
+        # Implicit workflow inheritance
+        agent.workflow_name = _get_ctx_attr(_ctx_workflow_name, "workflow.name")
+
         self._inherit_parent_span(agent)
         self._maybe_mark_conversation_root(agent)
         self._emitter.on_start(agent)
         self._push_current_span(agent)
-        # Push agent identity context
-        if isinstance(agent, AgentInvocation):
-            try:
-                if agent.name:
-                    self._agent_context_stack.append(
-                        (agent.name, agent.agent_id)
-                    )
-            except Exception:  # pragma: no cover - defensive
-                pass
+
         return agent

     def stop_agent(
@@ -1263,15 +1283,7 @@ def stop_agent(
                     self._meter_provider.force_flush()  # type: ignore[attr-defined]
             except Exception:
                 pass
-        # Pop context if matches top
-        if isinstance(agent, AgentInvocation):
-            try:
-                if self._agent_context_stack and agent.agent_id is not None:
-                    top_name, top_id = self._agent_context_stack[-1]
-                    if top_name == agent.name and top_id == agent.agent_id:
-                        self._agent_context_stack.pop()
-            except Exception:
-                pass
+
         return agent

     def fail_agent(
@@ -1290,15 +1302,7 @@ def fail_agent(
                     self._meter_provider.force_flush()  # type: ignore[attr-defined]
             except Exception:
                 pass
-        # Pop context if this agent is active
-        if isinstance(agent, AgentInvocation):
-            try:
-                if self._agent_context_stack and agent.agent_id is not None:
-                    top_name, top_id = self._agent_context_stack[-1]
-                    if top_name == agent.name and top_id == agent.agent_id:
-                        self._agent_context_stack.pop()
-            except Exception:
-                pass
+
         return agent

     # Step lifecycle ------------------------------------------------------
@@ -1306,6 +1310,10 @@ def start_step(self, step: Step) -> Step:
         """Start a step and create a pending span entry."""
         self._refresh_capture_content()
         _apply_genai_context(step)
+        # Implicit agent inheritance
+        step.workflow_name = _get_ctx_attr(_ctx_workflow_name, "workflow.name")
+        step.agent_name = _get_ctx_attr(_ctx_agent_name, "agent.name")
+        step.agent_id = _get_ctx_attr(_ctx_agent_id, "agent.id")
         self._inherit_parent_span(step)
         self._emitter.on_start(step)
         self._push_current_span(step)
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py
index f394882c..7cd815df 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py
@@ -103,6 +103,10 @@ class GenAI:
         default=None,
         metadata={"semconv": GenAIAttributes.GEN_AI_DATA_SOURCE_ID},
     )
+    workflow_name: Optional[str] = field(
+        default=None,
+        metadata={"semconv": "gen_ai.workflow.name"},
+    )
     # Association properties for context tracking.
     # Emitted on spans as gen_ai.association.properties..
     association_properties: Dict[str, Any] = field(default_factory=dict)
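The handler changes above replace the agent context stack with ContextVars plus a baggage fallback. A self-contained sketch of that pattern follows; the variable and key names mirror the diff, and the workflow value is illustrative.

from contextvars import ContextVar
from typing import Optional

from opentelemetry import baggage

_ctx_workflow_name: ContextVar[Optional[str]] = ContextVar(
    "genai_workflow_name", default=None
)


def read_workflow_name() -> Optional[str]:
    # ContextVar first (set by start_* on the same task/thread), baggage second
    # (set when the context crossed a process or transport boundary).
    return _ctx_workflow_name.get() or baggage.get_baggage("workflow.name")


# Start of a workflow: keep the reset token so nested work stays scoped.
token = _ctx_workflow_name.set("travel-planner")
try:
    assert read_workflow_name() == "travel-planner"
finally:
    _ctx_workflow_name.reset(token)  # ending the workflow restores the outer value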