Skip to content

Commit e11fcdd

Browse files
authored
feat: HYBIM-518 Mark root GenAI span with gen_ai.conversation_root + promote root to AgentInvocation (#236)
Changes:

1. gen_ai.conversation_root attribute
Adds a new gen_ai.conversation_root = true boolean span attribute that explicitly marks the invocation-level root GenAI span in a trace. This gives the backend a reliable way to find the "one span" that represents the entire AI invocation — carrying the user's input and the final assistant output — without relying on topology traversal. The attribute is independent of gen_ai.conversation.id: it marks the per-trace entry point both for multi-turn conversations (where conversation.id groups turns) and for single invocations (where no conversation exists).

2. Root span promoted to AgentInvocation (was Workflow)
Root spans now default to AgentInvocation (invoke_agent) instead of Workflow (invoke_workflow). This better reflects that the top-level entity in a LangGraph/CrewAI trace is an agent invocation. Workflow root spans are still available via two opt-in mechanisms:
- Environment variable: OTEL_INSTRUMENTATION_GENAI_ROOT_SPAN_AS_WORKFLOW=true
- LangGraph config metadata: config = {"metadata": {"workflow_name": "My Workflow"}, ...}
When workflow_name is provided in metadata, it also becomes the span name (e.g. "workflow My Workflow" instead of "workflow LangGraph"). The root agent span also includes improved output capture: when the LangGraph state has no AIMessages (a common pattern — nodes update structured state fields), the agent's output falls back to serializing the non-message state fields (triage_result, confidence_score, etc.) as a JSON summary.

3. Resume detection fix
Fixed a timing bug where gen_ai.command="resume" was set on the Python entity AFTER start_agent() had already created the OTel span (so the attribute never appeared). It is now set in the attrs dict BEFORE _start_agent_invocation().

4. Renamed gen_ai.workflow.command → gen_ai.command
The command attribute (used for resume detection with the value "resume") was renamed to be entity-agnostic, since it now applies to both Workflow and AgentInvocation root spans.

5. Interrupt status bubbles up to root span
When a child entity (step, tool, LLM invocation) encounters a GraphInterrupt or NodeInterrupt, the root span now receives the gen_ai.finish_reason="interrupted" attribute. This ensures the root span clearly reflects that the conversation was interrupted, even if the interrupt occurred deeper in the call hierarchy.
1 parent a769522 commit e11fcdd

19 files changed

Lines changed: 1679 additions & 270 deletions

File tree

instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/instrumentation.py

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import json
88
import logging
9+
from contextvars import ContextVar
910
from typing import Any, Collection, Optional
1011

1112
from wrapt import wrap_function_wrapper
@@ -16,7 +17,6 @@
1617
get_telemetry_handler,
1718
)
1819
from opentelemetry.util.genai.types import (
19-
Workflow,
2020
AgentInvocation,
2121
Step,
2222
ToolCall,
@@ -30,6 +30,11 @@
3030

3131
# Global handler instance (singleton)
3232
_handler: Optional[TelemetryHandler] = None
33+
# Active root entity (set during kickoff, used by child wrappers).
34+
# ContextVar ensures thread safety for concurrent kickoffs.
35+
_active_root_entity: ContextVar[Optional[Any]] = ContextVar(
36+
"_active_root_entity", default=None
37+
)
3338

3439
_logger = logging.getLogger(__name__)
3540

@@ -182,44 +187,45 @@ def _safe_unwrap(module: str, name: str):
182187

183188
def _wrap_crew_kickoff(wrapped, instance, args, kwargs):
184189
"""
185-
Wrap Crew.kickoff to create a Workflow span.
190+
Wrap Crew.kickoff to create a root span.
191+
192+
By default creates an AgentInvocation (agent type) at root.
193+
When OTEL_INSTRUMENTATION_GENAI_ROOT_SPAN_AS_WORKFLOW is truthy,
194+
creates a Workflow instead.
186195
187-
Maps to: Workflow type from splunk-otel-util-genai
196+
Uses handler.create_and_start_root() to centralize the Workflow vs
197+
AgentInvocation decision.
188198
"""
189199
try:
190200
handler = _handler
191-
192-
# Create workflow invocation
193-
workflow = Workflow(
194-
name=getattr(instance, "name", None) or "CrewAI Workflow",
195-
workflow_type="crewai.crew",
196-
framework="crewai",
197-
system="crewai",
198-
)
201+
crew_name = getattr(instance, "name", None) or "CrewAI Workflow"
199202

200203
inputs = kwargs.get("inputs")
201204
if inputs is None and args:
202205
inputs = args[0]
203-
if inputs is not None:
204-
workflow.input_messages = _make_input_message(inputs)
205206

206-
# Start the workflow
207-
handler.start_workflow(workflow)
207+
root_entity = handler.create_and_start_root(
208+
crew_name,
209+
workflow_type="crewai.crew",
210+
framework="crewai",
211+
system="crewai",
212+
input_messages=_make_input_message(inputs) if inputs else None,
213+
)
208214
except Exception:
209215
# If instrumentation setup fails, just run the original function
210216
return wrapped(*args, **kwargs)
211217

218+
token = _active_root_entity.set(root_entity)
212219
try:
213220
result = wrapped(*args, **kwargs)
214221

215222
# Capture result information
216223
try:
217224
if result:
218225
if hasattr(result, "raw"):
219-
workflow.output_messages = _make_output_message(result.raw)
226+
root_entity.output_messages = _make_output_message(result.raw)
220227

221-
# Stop the workflow successfully
222-
handler.stop_workflow(workflow)
228+
handler.finish(root_entity)
223229
except Exception:
224230
# Ignore instrumentation errors on success path
225231
pass
@@ -228,10 +234,12 @@ def _wrap_crew_kickoff(wrapped, instance, args, kwargs):
228234
except Exception as exc:
229235
# Wrapped function failed - record error and end span
230236
try:
231-
handler.fail(workflow, Error(message=str(exc), type=type(exc)))
237+
handler.fail(root_entity, Error(message=str(exc), type=type(exc)))
232238
except Exception:
233239
pass
234240
raise
241+
finally:
242+
_active_root_entity.reset(token)
235243

236244

237245
def _wrap_agent_execute_task(wrapped, instance, args, kwargs):
@@ -264,6 +272,11 @@ def _wrap_agent_execute_task(wrapped, instance, args, kwargs):
264272
messages["expected_output"] = expected_output
265273
agent_invocation.input_messages = _make_input_message(messages)
266274

275+
# Communicate parent hierarchy for conversation root detection.
276+
root = _active_root_entity.get()
277+
if root and getattr(root, "span", None):
278+
agent_invocation.parent_span = root.span
279+
267280
# Start the agent invocation
268281
handler.start_agent(agent_invocation)
269282
except Exception:

instrumentation-genai/opentelemetry-instrumentation-crewai/tests/conftest.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,70 @@ def __init__(self):
9898
self.stopped_tool_calls = []
9999
self.failed_entities = []
100100

101+
def _should_use_workflow_root(self, force_workflow=False, workflow_name=None):
    """Decide whether the root entity should be a Workflow.

    Explicit opt-ins take precedence: a truthy ``force_workflow`` or a
    non-empty ``workflow_name`` selects a Workflow root without consulting
    the environment. Otherwise the decision falls back to the
    ``OTEL_INSTRUMENTATION_GENAI_ROOT_SPAN_AS_WORKFLOW`` environment
    variable, which is treated as enabled when its stripped, lower-cased
    value is one of ``1``, ``true``, ``yes`` or ``on``.

    Args:
        force_workflow: When truthy, always choose a Workflow root.
        workflow_name: Optional workflow name; any truthy value also
            selects a Workflow root.

    Returns:
        True if the root should be a Workflow, False otherwise.
    """
    # Imported locally so the method does not depend on a module-level
    # ``import os`` being present in the enclosing file.
    import os

    if force_workflow or workflow_name:
        return True
    val = os.environ.get("OTEL_INSTRUMENTATION_GENAI_ROOT_SPAN_AS_WORKFLOW", "")
    return val.strip().lower() in {"1", "true", "yes", "on"}
107+
108+
def create_and_start_root(
109+
self,
110+
name,
111+
*,
112+
force_workflow=False,
113+
workflow_name=None,
114+
workflow_type=None,
115+
framework=None,
116+
system=None,
117+
input_messages=None,
118+
conversation_id=None,
119+
attributes=None,
120+
):
121+
"""Create and start a root entity (Workflow or AgentInvocation)."""
122+
from opentelemetry.util.genai.types import Workflow, AgentInvocation
123+
124+
use_workflow = self._should_use_workflow_root(force_workflow, workflow_name)
125+
attrs = attributes or {}
126+
127+
if use_workflow:
128+
wf_name = workflow_name or name
129+
entity = Workflow(
130+
name=wf_name,
131+
workflow_type=workflow_type,
132+
framework=framework,
133+
system=system,
134+
attributes=attrs,
135+
)
136+
if input_messages:
137+
entity.input_messages = input_messages
138+
if conversation_id:
139+
entity.conversation_id = conversation_id
140+
self.start_workflow(entity)
141+
else:
142+
entity = AgentInvocation(
143+
name=name,
144+
framework=framework,
145+
system=system,
146+
attributes=attrs,
147+
)
148+
entity.agent_name = name
149+
if input_messages:
150+
entity.input_messages = input_messages
151+
if conversation_id:
152+
entity.conversation_id = conversation_id
153+
self.start_agent(entity)
154+
155+
return entity
156+
157+
def finish(self, entity):
    """Stop ``entity`` via the stop method matching its type.

    Workflow instances are routed to ``stop_workflow``; every other
    entity is routed to ``stop_agent``. The chosen method's return value
    is returned unchanged.
    """
    from opentelemetry.util.genai.types import Workflow

    stop = self.stop_workflow if isinstance(entity, Workflow) else self.stop_agent
    return stop(entity)
164+
101165
def start_workflow(self, workflow):
102166
self.started_workflows.append(workflow)
103167
return workflow

0 commit comments

Comments
 (0)