Skip to content

Commit ae95a6b

Browse files
nagkumar91Copilot
andcommitted
feat(crewai): add memory operation tracing (remember/recall/forget/reset)
Add GenAI memory semantic convention spans for CrewAI's unified memory system: - Memory.remember() → update_memory span - Captures importance, scope, namespace, update_strategy (merge) - Records memory ID from returned MemoryRecord - Memory.recall() → search_memory span - Captures query (opt-in), scope, result count - Infers memory type from categories - Memory.forget() → delete_memory span - Captures scope, individual record ID when deleting single records - Reports deleted_count from return value - Memory.reset() → delete_memory span - Scope-level deletion with reset indicator All wrappers: - Set gen_ai.operation.name, gen_ai.system, gen_ai.provider.name - Infer gen_ai.memory.scope from MemoryScope._root path - Record gen_ai.client.operation.duration metric - Set error.type on failures - Gate content/query capture behind OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT Aligned with GenAI memory semantic conventions: open-telemetry/semantic-conventions#3250 11 new tests covering all 4 operations + content capture + error handling. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent c2974c9 commit ae95a6b

2 files changed

Lines changed: 513 additions & 0 deletions

File tree

packages/opentelemetry-instrumentation-crewai/opentelemetry/instrumentation/crewai/instrumentation.py

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,39 @@
1010
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
1111
from opentelemetry.instrumentation.crewai.version import __version__
1212
from opentelemetry.semconv._incubating.attributes import (
13+
error_attributes as ErrorAttributes,
1314
gen_ai_attributes as GenAIAttributes,
1415
)
1516
from opentelemetry.semconv_ai import SpanAttributes, TraceloopSpanKindValues, Meters
1617
from .crewai_span_attributes import CrewAISpanAttributes, set_span_attribute
1718

1819
_instruments = ("crewai >= 0.70.0",)
1920

21+
# GenAI memory semantic convention attribute keys (fallback to string
22+
# literals when the installed semconv package doesn't define them yet).
23+
_GEN_AI_OPERATION_NAME = getattr(GenAIAttributes, "GEN_AI_OPERATION_NAME", "gen_ai.operation.name")
24+
_GEN_AI_PROVIDER_NAME = getattr(GenAIAttributes, "GEN_AI_PROVIDER_NAME", "gen_ai.provider.name")
25+
_GEN_AI_MEMORY_SCOPE = getattr(GenAIAttributes, "GEN_AI_MEMORY_SCOPE", "gen_ai.memory.scope")
26+
_GEN_AI_MEMORY_TYPE = getattr(GenAIAttributes, "GEN_AI_MEMORY_TYPE", "gen_ai.memory.type")
27+
_GEN_AI_MEMORY_QUERY = getattr(GenAIAttributes, "GEN_AI_MEMORY_QUERY", "gen_ai.memory.query")
28+
_GEN_AI_MEMORY_CONTENT = getattr(GenAIAttributes, "GEN_AI_MEMORY_CONTENT", "gen_ai.memory.content")
29+
_GEN_AI_MEMORY_NAMESPACE = getattr(GenAIAttributes, "GEN_AI_MEMORY_NAMESPACE", "gen_ai.memory.namespace")
30+
_GEN_AI_MEMORY_SEARCH_RESULT_COUNT = getattr(
31+
GenAIAttributes, "GEN_AI_MEMORY_SEARCH_RESULT_COUNT", "gen_ai.memory.search.result.count"
32+
)
33+
_GEN_AI_MEMORY_UPDATE_STRATEGY = getattr(
34+
GenAIAttributes, "GEN_AI_MEMORY_UPDATE_STRATEGY", "gen_ai.memory.update.strategy"
35+
)
36+
_GEN_AI_MEMORY_IMPORTANCE = getattr(GenAIAttributes, "GEN_AI_MEMORY_IMPORTANCE", "gen_ai.memory.importance")
37+
_ERROR_TYPE = getattr(ErrorAttributes, "ERROR_TYPE", "error.type")
38+
39+
_PROVIDER = "crewai"
40+
41+
42+
def _capture_content() -> bool:
43+
"""Check if memory content capture is enabled."""
44+
return os.environ.get("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "").lower() in ("true", "1")
45+
2046

2147
class CrewAIInstrumentor(BaseInstrumentor):
2248

@@ -44,12 +70,40 @@ def _instrument(self, **kwargs):
4470
wrap_function_wrapper("crewai.llm", "LLM.call",
4571
wrap_llm_call(tracer, duration_histogram, token_histogram))
4672

73+
# Memory operations (crewai.memory.unified_memory.Memory)
74+
try:
75+
wrap_function_wrapper(
76+
"crewai.memory.unified_memory", "Memory.remember",
77+
wrap_memory_remember(tracer, duration_histogram))
78+
wrap_function_wrapper(
79+
"crewai.memory.unified_memory", "Memory.recall",
80+
wrap_memory_recall(tracer, duration_histogram))
81+
wrap_function_wrapper(
82+
"crewai.memory.unified_memory", "Memory.forget",
83+
wrap_memory_forget(tracer, duration_histogram))
84+
wrap_function_wrapper(
85+
"crewai.memory.unified_memory", "Memory.reset",
86+
wrap_memory_reset(tracer, duration_histogram))
87+
except Exception:
88+
# CrewAI versions before unified_memory may not have these classes
89+
pass
90+
4791
def _uninstrument(self, **kwargs):
4892
unwrap("crewai.crew.Crew", "kickoff")
4993
unwrap("crewai.agent.Agent", "execute_task")
5094
unwrap("crewai.task.Task", "execute_sync")
5195
unwrap("crewai.llm.LLM", "call")
5296

97+
# Memory unwrap (ignore if not wrapped)
98+
try:
99+
from crewai.memory.unified_memory import Memory as UnifiedMemory
100+
unwrap(UnifiedMemory, "remember")
101+
unwrap(UnifiedMemory, "recall")
102+
unwrap(UnifiedMemory, "forget")
103+
unwrap(UnifiedMemory, "reset")
104+
except Exception:
105+
pass
106+
53107

54108
def with_tracer_wrapper(func):
55109
"""Helper for providing tracer for wrapper functions."""
@@ -199,3 +253,229 @@ def _create_metrics(meter: Meter):
199253
)
200254

201255
return token_histogram, duration_histogram
256+
257+
258+
# ---------------------------------------------------------------------------
259+
# Memory operation wrappers — aligned with GenAI memory semantic conventions
260+
# ---------------------------------------------------------------------------
261+
262+
263+
def _infer_memory_scope(instance) -> str:
264+
"""Infer memory scope from the Memory instance or its MemoryScope wrapper."""
265+
# MemoryScope has a _root attribute like "/agent/1" or "/user/123"
266+
root = getattr(instance, "_root", None)
267+
if root:
268+
parts = root.strip("/").split("/")
269+
if parts:
270+
first = parts[0].lower()
271+
if first in ("user", "agent", "session", "team", "global"):
272+
return first
273+
return "agent"
274+
275+
276+
def _infer_memory_type(kwargs) -> str:
277+
"""Infer memory type from kwargs categories hint, defaulting to long_term."""
278+
categories = kwargs.get("categories")
279+
if categories and isinstance(categories, list):
280+
for cat in categories:
281+
cl = str(cat).lower()
282+
if "short" in cl:
283+
return "short_term"
284+
if "entity" in cl:
285+
return "entity"
286+
return "long_term"
287+
288+
289+
def _set_memory_error(span, exc):
290+
"""Record error details on the span."""
291+
error_type = type(exc).__qualname__
292+
span.set_status(Status(StatusCode.ERROR, str(exc)))
293+
set_span_attribute(span, _ERROR_TYPE, error_type)
294+
return error_type
295+
296+
297+
def _record_memory_duration(duration_histogram, duration_s, operation, error_type=None):
298+
"""Record memory operation duration metric."""
299+
if not duration_histogram:
300+
return
301+
attrs = {
302+
_GEN_AI_OPERATION_NAME: operation,
303+
GenAIAttributes.GEN_AI_SYSTEM: _PROVIDER,
304+
}
305+
if error_type:
306+
attrs[_ERROR_TYPE] = error_type
307+
duration_histogram.record(max(duration_s, 0.0), attributes=attrs)
308+
309+
310+
def wrap_memory_remember(tracer: Tracer, duration_histogram: Histogram):
311+
"""Wrap Memory.remember() → update_memory span."""
312+
def _wrapper(wrapped, instance, args, kwargs):
313+
operation = "update_memory"
314+
span_name = f"{operation} {_PROVIDER}"
315+
error_type = None
316+
start_time = time.time()
317+
with tracer.start_as_current_span(
318+
span_name, kind=SpanKind.CLIENT,
319+
attributes={GenAIAttributes.GEN_AI_SYSTEM: _PROVIDER}
320+
) as span:
321+
set_span_attribute(span, _GEN_AI_OPERATION_NAME, operation)
322+
set_span_attribute(span, _GEN_AI_PROVIDER_NAME, _PROVIDER)
323+
set_span_attribute(span, _GEN_AI_MEMORY_SCOPE, _infer_memory_scope(instance))
324+
set_span_attribute(span, _GEN_AI_MEMORY_TYPE, _infer_memory_type(kwargs))
325+
set_span_attribute(span, _GEN_AI_MEMORY_UPDATE_STRATEGY, "merge")
326+
327+
# Namespace from source kwarg
328+
source = kwargs.get("source")
329+
if source:
330+
set_span_attribute(span, _GEN_AI_MEMORY_NAMESPACE, str(source))
331+
332+
# Scope path
333+
scope = kwargs.get("scope")
334+
if scope:
335+
set_span_attribute(span, "crewai.memory.scope_path", str(scope))
336+
337+
importance = kwargs.get("importance")
338+
if importance is not None:
339+
set_span_attribute(span, _GEN_AI_MEMORY_IMPORTANCE, float(importance))
340+
341+
# Content (opt-in)
342+
if _capture_content() and args:
343+
content = args[0] if args else kwargs.get("content")
344+
if content and isinstance(content, str):
345+
set_span_attribute(span, _GEN_AI_MEMORY_CONTENT, content)
346+
347+
try:
348+
result = wrapped(*args, **kwargs)
349+
span.set_status(Status(StatusCode.OK))
350+
# MemoryRecord has an id attribute
351+
if result and hasattr(result, "id"):
352+
set_span_attribute(span, "gen_ai.memory.id", str(result.id))
353+
return result
354+
except Exception as ex:
355+
error_type = _set_memory_error(span, ex)
356+
raise
357+
finally:
358+
_record_memory_duration(
359+
duration_histogram, time.time() - start_time, operation, error_type
360+
)
361+
return _wrapper
362+
363+
364+
def wrap_memory_recall(tracer: Tracer, duration_histogram: Histogram):
365+
"""Wrap Memory.recall() → search_memory span."""
366+
def _wrapper(wrapped, instance, args, kwargs):
367+
operation = "search_memory"
368+
span_name = f"{operation} {_PROVIDER}"
369+
error_type = None
370+
start_time = time.time()
371+
with tracer.start_as_current_span(
372+
span_name, kind=SpanKind.CLIENT,
373+
attributes={GenAIAttributes.GEN_AI_SYSTEM: _PROVIDER}
374+
) as span:
375+
set_span_attribute(span, _GEN_AI_OPERATION_NAME, operation)
376+
set_span_attribute(span, _GEN_AI_PROVIDER_NAME, _PROVIDER)
377+
set_span_attribute(span, _GEN_AI_MEMORY_SCOPE, _infer_memory_scope(instance))
378+
set_span_attribute(span, _GEN_AI_MEMORY_TYPE, _infer_memory_type(kwargs))
379+
380+
# Query (opt-in)
381+
query = args[0] if args else kwargs.get("query")
382+
if _capture_content() and query and isinstance(query, str):
383+
set_span_attribute(span, _GEN_AI_MEMORY_QUERY, query)
384+
385+
# Scope path
386+
scope = kwargs.get("scope")
387+
if scope:
388+
set_span_attribute(span, "crewai.memory.scope_path", str(scope))
389+
390+
source = kwargs.get("source")
391+
if source:
392+
set_span_attribute(span, _GEN_AI_MEMORY_NAMESPACE, str(source))
393+
394+
try:
395+
result = wrapped(*args, **kwargs)
396+
span.set_status(Status(StatusCode.OK))
397+
if isinstance(result, list):
398+
set_span_attribute(span, _GEN_AI_MEMORY_SEARCH_RESULT_COUNT, len(result))
399+
return result
400+
except Exception as ex:
401+
error_type = _set_memory_error(span, ex)
402+
raise
403+
finally:
404+
_record_memory_duration(
405+
duration_histogram, time.time() - start_time, operation, error_type
406+
)
407+
return _wrapper
408+
409+
410+
def wrap_memory_forget(tracer: Tracer, duration_histogram: Histogram):
411+
"""Wrap Memory.forget() → delete_memory span."""
412+
def _wrapper(wrapped, instance, args, kwargs):
413+
operation = "delete_memory"
414+
span_name = f"{operation} {_PROVIDER}"
415+
error_type = None
416+
start_time = time.time()
417+
with tracer.start_as_current_span(
418+
span_name, kind=SpanKind.CLIENT,
419+
attributes={GenAIAttributes.GEN_AI_SYSTEM: _PROVIDER}
420+
) as span:
421+
set_span_attribute(span, _GEN_AI_OPERATION_NAME, operation)
422+
set_span_attribute(span, _GEN_AI_PROVIDER_NAME, _PROVIDER)
423+
set_span_attribute(span, _GEN_AI_MEMORY_SCOPE, _infer_memory_scope(instance))
424+
425+
scope = kwargs.get("scope")
426+
if scope:
427+
set_span_attribute(span, "crewai.memory.scope_path", str(scope))
428+
429+
record_ids = kwargs.get("record_ids")
430+
if record_ids and isinstance(record_ids, list) and len(record_ids) == 1:
431+
set_span_attribute(span, "gen_ai.memory.id", str(record_ids[0]))
432+
433+
try:
434+
result = wrapped(*args, **kwargs)
435+
span.set_status(Status(StatusCode.OK))
436+
# forget() returns number of deleted records
437+
if isinstance(result, int):
438+
set_span_attribute(span, "crewai.memory.deleted_count", result)
439+
return result
440+
except Exception as ex:
441+
error_type = _set_memory_error(span, ex)
442+
raise
443+
finally:
444+
_record_memory_duration(
445+
duration_histogram, time.time() - start_time, operation, error_type
446+
)
447+
return _wrapper
448+
449+
450+
def wrap_memory_reset(tracer: Tracer, duration_histogram: Histogram):
451+
"""Wrap Memory.reset() → delete_memory span (scope-level wipe)."""
452+
def _wrapper(wrapped, instance, args, kwargs):
453+
operation = "delete_memory"
454+
span_name = f"{operation} {_PROVIDER}"
455+
error_type = None
456+
start_time = time.time()
457+
with tracer.start_as_current_span(
458+
span_name, kind=SpanKind.CLIENT,
459+
attributes={GenAIAttributes.GEN_AI_SYSTEM: _PROVIDER}
460+
) as span:
461+
set_span_attribute(span, _GEN_AI_OPERATION_NAME, operation)
462+
set_span_attribute(span, _GEN_AI_PROVIDER_NAME, _PROVIDER)
463+
set_span_attribute(span, _GEN_AI_MEMORY_SCOPE, _infer_memory_scope(instance))
464+
set_span_attribute(span, "crewai.memory.reset", True)
465+
466+
scope = kwargs.get("scope")
467+
if scope:
468+
set_span_attribute(span, "crewai.memory.scope_path", str(scope))
469+
470+
try:
471+
result = wrapped(*args, **kwargs)
472+
span.set_status(Status(StatusCode.OK))
473+
return result
474+
except Exception as ex:
475+
error_type = _set_memory_error(span, ex)
476+
raise
477+
finally:
478+
_record_memory_duration(
479+
duration_histogram, time.time() - start_time, operation, error_type
480+
)
481+
return _wrapper

0 commit comments

Comments
 (0)