Skip to content

Commit aa5f9e1

Browse files
committed
1 support CrewAI uninstrument;
2 change unit test to adapt code.
1 parent 6239454 commit aa5f9e1

9 files changed

Lines changed: 946 additions & 710 deletions

File tree

instrumentation-loongsuite/loongsuite-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/__init__.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,24 @@
3939
to_output_message,
4040
)
4141
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
42+
from opentelemetry.instrumentation.utils import unwrap
4243
from opentelemetry.semconv._incubating.attributes import gen_ai_attributes
4344
from opentelemetry.trace import SpanKind, Status, StatusCode
4445
from opentelemetry.util.genai.completion_hook import load_completion_hook
4546

47+
try:
48+
import crewai.agent
49+
import crewai.crew
50+
import crewai.flow.flow
51+
import crewai.task
52+
import crewai.tools.tool_usage
53+
54+
_CREWAI_LOADED = True
55+
except (ImportError, Exception):
56+
_CREWAI_LOADED = False
57+
4658
logger = logging.getLogger(__name__)
4759

48-
MAX_MESSAGE_SIZE = 10000 # characters
4960

5061
# Context keys for tracking
5162
_CREWAI_SPAN_KEY = context_api.create_key("crewai_span")
@@ -128,7 +139,21 @@ def _instrument(self, **kwargs: Any) -> None:
128139
logger.debug(f"Could not wrap ToolUsage._use: {e}")
129140

130141
def _uninstrument(self, **kwargs: Any) -> None:
131-
pass
142+
"""Uninstrument CrewAI framework."""
143+
if not _CREWAI_LOADED:
144+
logger.debug(
145+
"CrewAI modules were not available for uninstrumentation."
146+
)
147+
return
148+
try:
149+
unwrap(crewai.crew.Crew, "kickoff")
150+
unwrap(crewai.flow.flow.Flow, "kickoff_async")
151+
unwrap(crewai.agent.Agent, "execute_task")
152+
unwrap(crewai.task.Task, "execute_sync")
153+
unwrap(crewai.tools.tool_usage.ToolUsage, "_use")
154+
155+
except Exception as e:
156+
logger.debug(f"Error during uninstrumenting: {e}")
132157

133158

134159
class _CrewKickoffWrapper:

instrumentation-loongsuite/loongsuite-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/utils.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import dataclasses
22
import json
3+
import logging
34
from base64 import b64encode
4-
from functools import partial
55
from typing import Any, Dict, List, Optional
66

77
from opentelemetry.trace import Span
@@ -14,6 +14,10 @@
1414
ToolCall,
1515
)
1616

17+
logger = logging.getLogger(__name__)
18+
19+
MAX_MESSAGE_SIZE = 10000
20+
1721

1822
class _GenAiJsonEncoder(json.JSONEncoder):
1923
def default(self, o: Any) -> Any:
@@ -22,9 +26,36 @@ def default(self, o: Any) -> Any:
2226
return super().default(o)
2327

2428

25-
gen_ai_json_dumps = partial(
26-
json.dumps, separators=(",", ":"), cls=_GenAiJsonEncoder
27-
)
29+
def _safe_json_dumps(
30+
obj: Any, default: str = "", max_size: int = MAX_MESSAGE_SIZE
31+
) -> str:
32+
"""
33+
Safely serialize object to JSON string with size limit.
34+
"""
35+
if obj is None:
36+
return default
37+
38+
# Fast path for simple types
39+
if isinstance(obj, (str, int, float, bool)):
40+
result = str(obj)
41+
else:
42+
try:
43+
result = json.dumps(
44+
obj,
45+
separators=(",", ":"),
46+
cls=_GenAiJsonEncoder,
47+
ensure_ascii=False,
48+
)
49+
except Exception as e:
50+
logger.debug(f"Failed to serialize to JSON: {e}")
51+
result = str(obj)
52+
53+
if len(result) > max_size:
54+
return result[:max_size] + "...[truncated]"
55+
return result
56+
57+
58+
gen_ai_json_dumps = _safe_json_dumps
2859

2960
GEN_AI_INPUT_MESSAGES = "gen_ai.input.messages"
3061
GEN_AI_OUTPUT_MESSAGES = "gen_ai.output.messages"

instrumentation-loongsuite/loongsuite-instrumentation-crewai/tests/test_agent_workflow.py

Lines changed: 112 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,22 @@
1515
"""
1616
Test cases for Agent workflow orchestration and multi-agent collaboration in CrewAI.
1717
18-
Business Demo Description:
19-
This test suite uses CrewAI framework to test complex Agent workflows including:
20-
- Sequential task execution
21-
- Hierarchical task delegation
22-
- Multi-agent collaboration
23-
- Agent lifecycle management
18+
This test suite verifies complex Agent interaction patterns, ensuring that the
19+
trace hierarchy and span relationships correctly represent the execution flow.
2420
"""
2521

26-
import pysqlite3
22+
import json
23+
import os
2724
import sys
28-
sys.modules["sqlite3"] = pysqlite3
2925

30-
import os
31-
import unittest
32-
from crewai import Agent, Task, Crew, Process
33-
from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor
26+
import pysqlite3
27+
from crewai import Agent, Crew, Process, Task
3428

3529
from opentelemetry.instrumentation.crewai import CrewAIInstrumentor
3630
from opentelemetry.test.test_base import TestBase
3731

32+
sys.modules["sqlite3"] = pysqlite3
33+
3834

3935
class TestAgentWorkflow(TestBase):
4036
"""Test Agent workflow orchestration scenarios."""
@@ -43,46 +39,48 @@ def setUp(self):
4339
"""Setup test resources."""
4440
super().setUp()
4541
# Set up environment variables
46-
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder")
47-
os.environ["DASHSCOPE_API_KEY"] = os.getenv("DASHSCOPE_API_KEY", "test-dashscope-key-placeholder")
48-
os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1"
49-
os.environ["DASHSCOPE_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1"
50-
42+
os.environ["OPENAI_API_KEY"] = os.environ.get(
43+
"OPENAI_API_KEY", "fake-key"
44+
)
45+
os.environ["DASHSCOPE_API_KEY"] = os.environ.get(
46+
"DASHSCOPE_API_KEY", "fake-key"
47+
)
48+
os.environ["OPENAI_API_BASE"] = (
49+
"https://dashscope.aliyuncs.com/compatible-mode/v1"
50+
)
51+
os.environ["DASHSCOPE_API_BASE"] = (
52+
"https://dashscope.aliyuncs.com/compatible-mode/v1"
53+
)
54+
55+
os.environ["CREWAI_TRACING_ENABLED"] = "false"
5156
self.instrumentor = CrewAIInstrumentor()
52-
self.instrumentor.instrument()
57+
self.instrumentor.instrument(tracer_provider=self.tracer_provider)
5358

54-
self.litellm_instrumentor = LiteLLMInstrumentor()
55-
self.litellm_instrumentor.instrument()
56-
5759
# Test data
58-
self.model_name = "dashscope/qwen-turbo"
59-
60+
self.model_name = "openai/qwen-turbo"
61+
6062
def tearDown(self):
6163
"""Cleanup test resources."""
6264
with self.disable_logging():
6365
self.instrumentor.uninstrument()
64-
self.litellm_instrumentor.uninstrument()
6566
super().tearDown()
66-
from aliyun.sdk.extension.arms.semconv.metrics import SingletonMeta
67-
SingletonMeta.reset()
6867

6968
def test_sequential_workflow(self):
7069
"""
7170
Test sequential workflow with multiple agents and tasks.
72-
71+
7372
Business Demo:
7473
- Creates a Crew with 3 Agents (Researcher, Analyst, Writer)
7574
- Executes 3 Tasks sequentially
7675
- Each task is handled by a different agent
7776
- Performs 3 LLM calls (one per task)
78-
77+
7978
Verification:
8079
- 1 CHAIN span for Crew.kickoff
81-
- 3 AGENT spans (one per agent execution)
8280
- 3 TASK spans (one per task)
83-
- 3 LLM spans
84-
- Span hierarchy: CHAIN -> TASK -> AGENT -> LLM
85-
- Metrics: genai_calls_count=3, duration and tokens for each call
81+
- 3 AGENT spans (one per agent execution)
82+
- Span hierarchy: Crew -> Task -> Agent
83+
- Trace continuity across all sequential operations
8684
"""
8785
# Create Agents
8886
researcher = Agent(
@@ -140,45 +138,68 @@ def test_sequential_workflow(self):
140138

141139
# Verify spans
142140
spans = self.memory_exporter.get_finished_spans()
143-
144-
chain_spans = [s for s in spans if s.attributes.get("gen_ai.span.kind") == "CHAIN"]
145-
task_spans = [s for s in spans if s.attributes.get("gen_ai.span.kind") == "TASK"]
141+
142+
chain_spans = [
143+
s
144+
for s in spans
145+
if s.attributes.get("gen_ai.operation.name") == "crew.kickoff"
146+
]
147+
task_spans = [
148+
s
149+
for s in spans
150+
if s.attributes.get("gen_ai.operation.name") == "task.execute"
151+
]
152+
agent_spans = [
153+
s
154+
for s in spans
155+
if s.attributes.get("gen_ai.operation.name") == "agent.execute"
156+
]
146157

147158
# Verify span counts
148-
self.assertGreaterEqual(len(chain_spans), 1, "Expected at least 1 CHAIN span")
149-
self.assertGreaterEqual(len(task_spans), 3, f"Expected at least 3 TASK spans, got {len(task_spans)}")
150-
159+
self.assertGreaterEqual(
160+
len(chain_spans), 1, "Expected at least 1 CHAIN span"
161+
)
162+
self.assertGreaterEqual(
163+
len(task_spans),
164+
3,
165+
f"Expected at least 3 TASK spans, got {len(task_spans)}",
166+
)
167+
self.assertGreaterEqual(
168+
len(agent_spans),
169+
3,
170+
f"Expected at least 3 AGENT spans, got {len(agent_spans)}",
171+
)
172+
151173
# Verify CHAIN span has proper attributes
152174
chain_span = chain_spans[0]
153-
self.assertEqual(chain_span.attributes.get("gen_ai.span.kind"), "CHAIN")
154-
self.assertIsNotNone(chain_span.attributes.get("gen_ai.operation.name"))
155-
self.assertIsNotNone(chain_span.attributes.get("output.value"))
156-
157-
# Verify metrics show 3 LLM calls
158-
metrics = self.memory_metrics_reader.get_metrics_data()
159-
160-
for resource_metrics in metrics.resource_metrics:
161-
for scope_metrics in resource_metrics.scope_metrics:
162-
for metric in scope_metrics.metrics:
163-
if metric.name == "genai_calls_count":
164-
for data_point in metric.data.data_points:
165-
# Should have at least 3 calls
166-
self.assertGreaterEqual(data_point.value, 3, f"Expected at least 3 LLM calls, got {data_point.value}")
175+
self.assertEqual(chain_span.attributes.get("gen_ai.system"), "crewai")
176+
self.assertEqual(
177+
chain_span.attributes.get("gen_ai.operation.name"), "crew.kickoff"
178+
)
179+
180+
# Verify result is captured in OpenTelemetry GenAI format
181+
output_messages_json = chain_span.attributes.get(
182+
"gen_ai.output.messages"
183+
)
184+
self.assertIsNotNone(output_messages_json)
185+
output_messages = json.loads(output_messages_json)
186+
self.assertGreater(len(output_messages), 0)
187+
self.assertIn("role", output_messages[0])
188+
self.assertIn("parts", output_messages[0])
167189

168190
def test_multi_agent_collaboration(self):
169191
"""
170192
Test multi-agent collaboration scenario.
171-
193+
172194
Business Demo:
173195
- Creates a Crew with 2 Agents working together
174196
- Agents share context and collaborate on tasks
175197
- Executes 2 Tasks with agent collaboration
176198
- Performs multiple LLM calls with shared context
177-
199+
178200
Verification:
179201
- Multiple AGENT spans with proper context
180202
- All spans share the same trace context
181-
- Metrics: genai_calls_count reflects all collaborative LLM calls
182203
"""
183204
# Create collaborative agents
184205
designer = Agent(
@@ -222,25 +243,35 @@ def test_multi_agent_collaboration(self):
222243

223244
# Verify spans
224245
spans = self.memory_exporter.get_finished_spans()
225-
246+
226247
# Verify all spans share the same trace
227248
trace_ids = set(span.context.trace_id for span in spans)
228-
self.assertEqual(len(trace_ids), 1, "All spans should share the same trace ID")
249+
self.assertEqual(
250+
len(trace_ids), 1, "All spans should share the same trace ID"
251+
)
252+
253+
agent_spans = [
254+
s
255+
for s in spans
256+
if s.attributes.get("gen_ai.operation.name") == "agent.execute"
257+
]
229258

230-
agent_spans = [s for s in spans if s.attributes.get("gen_ai.span.kind") == "AGENT"]
231-
232259
# Should have multiple agent spans for collaboration
233-
self.assertGreaterEqual(len(agent_spans), 2, f"Expected at least 2 AGENT spans, got {len(agent_spans)}")
260+
self.assertGreaterEqual(
261+
len(agent_spans),
262+
2,
263+
f"Expected at least 2 AGENT spans, got {len(agent_spans)}",
264+
)
234265

235266
def test_hierarchical_workflow(self):
236267
"""
237268
Test hierarchical workflow with manager delegation.
238-
269+
239270
Business Demo:
240271
- Creates a Crew with hierarchical process
241272
- Manager agent delegates tasks to worker agents
242273
- Executes tasks with delegation pattern
243-
274+
244275
Verification:
245276
- CHAIN span for overall workflow
246277
- Multiple AGENT spans showing delegation hierarchy
@@ -278,25 +309,26 @@ def test_hierarchical_workflow(self):
278309

279310
# Create Crew with hierarchical process
280311
# Note: Hierarchical process requires a manager_llm
281-
try:
282-
crew = Crew(
283-
agents=[worker1, worker2],
284-
tasks=[task1, task2],
285-
process=Process.hierarchical,
286-
manager_llm=self.model_name,
287-
verbose=True,
288-
)
289-
290-
crew.kickoff()
291-
except Exception as e:
292-
# Hierarchical process may not be fully supported in test environment
293-
raise unittest.SkipTest(f"Hierarchical process not supported: {e}")
312+
crew = Crew(
313+
agents=[worker1, worker2],
314+
tasks=[task1, task2],
315+
process=Process.hierarchical,
316+
manager_llm=self.model_name,
317+
verbose=True,
318+
)
319+
320+
crew.kickoff()
294321

295322
# Verify spans
296323
spans = self.memory_exporter.get_finished_spans()
297-
298-
chain_spans = [s for s in spans if s.attributes.get("gen_ai.span.kind") == "CHAIN"]
299-
300-
# Should have CHAIN span for hierarchical workflow
301-
self.assertGreaterEqual(len(chain_spans), 1, "Expected at least 1 CHAIN span")
302324

325+
chain_spans = [
326+
s
327+
for s in spans
328+
if s.attributes.get("gen_ai.operation.name") == "crew.kickoff"
329+
]
330+
331+
# Should have CHAIN span for hierarchical workflow
332+
self.assertGreaterEqual(
333+
len(chain_spans), 1, "Expected at least 1 CHAIN span"
334+
)

0 commit comments

Comments
 (0)