diff --git a/src/praisonai-agents/tests/managed/test_managed_trace_events.py b/src/praisonai-agents/tests/managed/test_managed_trace_events.py new file mode 100644 index 000000000..1aecc493f --- /dev/null +++ b/src/praisonai-agents/tests/managed/test_managed_trace_events.py @@ -0,0 +1,243 @@ +""" +Tests for managed agent trace events emission. + +Verifies that AnthropicManagedAgent and LocalManagedAgent emit proper +ContextTraceEmitter events so that langextract/langfuse traces are non-empty. +""" + +import pytest +from unittest.mock import Mock, patch +from praisonaiagents.trace.context_events import ( + ContextListSink, + ContextTraceEmitter, + ContextEventType, + trace_context +) + + +class TestAnthropicManagedAgentTraceEvents: + """Test trace event emission for AnthropicManagedAgent.""" + + def test_execute_sync_emits_trace_events(self): + """Test that _execute_sync emits agent_start, llm_response, and agent_end events.""" + from praisonai.integrations.managed_agents import AnthropicManagedAgent, ManagedConfig + + # Create a mock client and session + mock_client = Mock() + mock_stream = Mock() + mock_stream.__enter__ = Mock(return_value=mock_stream) + mock_stream.__exit__ = Mock(return_value=None) + + # Mock events for the stream + mock_event = Mock() + mock_event.type = "session.status_idle" + mock_stream.__iter__ = Mock(return_value=iter([mock_event])) + + mock_client.beta.sessions.events.stream.return_value = mock_stream + + # Create agent with mocked client + config = ManagedConfig(name="TestAgent", system="Test system") + agent = AnthropicManagedAgent(config=config) + agent._client = mock_client + agent.agent_id = "test_agent_id" + agent.environment_id = "test_env_id" + agent._session_id = "test_session_id" + + # Set up trace sink + sink = ContextListSink() + emitter = ContextTraceEmitter(sink=sink, session_id="test_session", enabled=True) + + with trace_context(emitter): + agent._execute_sync("Write a haiku") + + # Verify events were emitted + events = sink.get_events() + assert len(events) >= 2, f"Expected at least 2 events, got {len(events)}" + + # Check agent_start event + start_events = [e for e in events if e.event_type == ContextEventType.AGENT_START] + assert len(start_events) == 1, f"Expected 1 agent_start event, got {len(start_events)}" + assert start_events[0].agent_name == "TestAgent" + assert start_events[0].data["input"] == "Write a haiku" + assert start_events[0].data["goal"] == "Test system" + + # Check agent_end event + end_events = [e for e in events if e.event_type == ContextEventType.AGENT_END] + assert len(end_events) == 1, f"Expected 1 agent_end event, got {len(end_events)}" + assert end_events[0].agent_name == "TestAgent" + + def test_process_events_emits_tool_events(self): + """Test that _process_events emits tool_call_start and tool_call_end for tool_use events.""" + from praisonai.integrations.managed_agents import AnthropicManagedAgent, ManagedConfig + + # Create agent + config = ManagedConfig(name="TestAgent") + agent = AnthropicManagedAgent(config=config) + + # Mock tool_use event + mock_event = Mock() + mock_event.type = "agent.tool_use" + mock_event.name = "test_tool" + mock_event.id = "tool_123" + mock_event.input = {"query": "test"} + mock_event.needs_confirmation = False + mock_event.usage = None + mock_event.model_usage = None + + # Mock session idle event + mock_idle = Mock() + mock_idle.type = "session.status_idle" + mock_idle.usage = None + mock_idle.model_usage = None + + # Set up trace sink + sink = ContextListSink() + emitter = ContextTraceEmitter(sink=sink, session_id="test_session", enabled=True) + + # Call _process_events with emitter + with trace_context(emitter): + text_parts, tool_log = agent._process_events( + client=Mock(), + session_id="test_session", + stream=[mock_event, mock_idle], + emitter=emitter + ) + + # Verify tool events were emitted + events = sink.get_events() + + start_events = [e for e in events if e.event_type == ContextEventType.TOOL_CALL_START] + assert len(start_events) == 1, f"Expected 1 tool_call_start event, got {len(start_events)}" + assert start_events[0].agent_name == "TestAgent" + assert start_events[0].data["tool_name"] == "test_tool" + assert start_events[0].data["tool_args"] == {"query": "test"} + + end_events = [e for e in events if e.event_type == ContextEventType.TOOL_CALL_END] + assert len(end_events) == 1, f"Expected 1 tool_call_end event, got {len(end_events)}" + assert end_events[0].agent_name == "TestAgent" + assert end_events[0].data["tool_name"] == "test_tool" + assert end_events[0].data["duration_ms"] >= 0 + + +class TestLocalManagedAgentTraceEvents: + """Test trace event emission for LocalManagedAgent.""" + + def test_execute_sync_emits_trace_events(self): + """Test that _execute_sync emits agent_start, llm_response, and agent_end events.""" + from praisonai.integrations.managed_local import LocalManagedAgent, LocalManagedConfig + + # Create agent with minimal config + config = LocalManagedConfig(name="TestAgent", system="Test system", tools=[]) + agent = LocalManagedAgent(config=config) + + # Mock the inner agent + mock_inner_agent = Mock() + mock_inner_agent.chat.return_value = "This is a haiku response" + agent._inner_agent = mock_inner_agent + agent.agent_id = "test_agent_id" + agent.environment_id = "test_env_id" + agent._session_id = "test_session_id" + + # Mock session store methods + agent._persist_message = Mock() + agent._sync_usage = Mock() + agent._persist_state = Mock() + + # Set up trace sink + sink = ContextListSink() + emitter = ContextTraceEmitter(sink=sink, session_id="test_session", enabled=True) + + with trace_context(emitter): + result = agent._execute_sync("Write a haiku") + + assert result == "This is a haiku response" + + # Verify events were emitted + events = sink.get_events() + assert len(events) >= 2, f"Expected at least 2 events, got {len(events)}" + + # Check agent_start event + start_events = [e for e in events if e.event_type == ContextEventType.AGENT_START] + assert len(start_events) == 1, f"Expected 1 agent_start event, got {len(start_events)}" + assert start_events[0].agent_name == "TestAgent" + assert start_events[0].data["input"] == "Write a haiku" + assert start_events[0].data["goal"] == "Test system" + + # Check llm_response event + response_events = [e for e in events if e.event_type == ContextEventType.LLM_RESPONSE] + assert len(response_events) == 1, f"Expected 1 llm_response event, got {len(response_events)}" + assert response_events[0].agent_name == "TestAgent" + assert response_events[0].data["response_content"] == "This is a haiku response" + + # Check agent_end event + end_events = [e for e in events if e.event_type == ContextEventType.AGENT_END] + assert len(end_events) == 1, f"Expected 1 agent_end event, got {len(end_events)}" + assert end_events[0].agent_name == "TestAgent" + + def test_zero_overhead_when_no_emitter(self): + """Test that trace events have zero overhead when no emitter is installed.""" + from praisonai.integrations.managed_local import LocalManagedAgent, LocalManagedConfig + + # Create agent + config = LocalManagedConfig(name="TestAgent", tools=[]) + agent = LocalManagedAgent(config=config) + + # Mock the inner agent + mock_inner_agent = Mock() + mock_inner_agent.chat.return_value = "Response" + agent._inner_agent = mock_inner_agent + + # Mock session methods + agent._persist_message = Mock() + agent._sync_usage = Mock() + agent._persist_state = Mock() + + # Execute without any trace context - should work normally + result = agent._execute_sync("Test prompt") + + assert result == "Response" + mock_inner_agent.chat.assert_called_once_with("Test prompt") + + +class TestRealAgenticTest: + """Real agentic test with actual Agent and managed backend.""" + + @pytest.mark.skipif(True, reason="Gated real agentic test - requires API keys") + def test_agent_with_managed_backend_shows_events(self): + """Real agentic test: Agent(backend=ManagedAgent()).start() with ContextListSink shows ≥ 2 events.""" + from praisonai.integrations.managed_local import LocalManagedAgent, LocalManagedConfig + from praisonaiagents import Agent + + # Create local managed backend + managed_config = LocalManagedConfig( + name="TestAgent", + system="You are a helpful assistant. Respond in exactly one sentence.", + tools=[], # No tools for simple test + ) + managed_backend = LocalManagedAgent(config=managed_config) + + # Create Agent with managed backend + agent = Agent(name="test", backend=managed_backend) + + # Set up trace collection + sink = ContextListSink() + emitter = ContextTraceEmitter(sink=sink, session_id="real_test", enabled=True) + + # Run agent with trace context + with trace_context(emitter): + result = agent.start("Say hi") + + print(f"Agent response: {result}") + + # Verify we got events + events = sink.get_events() + print(f"Collected {len(events)} events:") + for i, event in enumerate(events): + print(f" {i+1}. {event.event_type} - {event.agent_name}") + + assert len(events) >= 2, f"Expected ≥ 2 events for real agentic test, got {len(events)}" + + # Should have at least agent_start and agent_end + event_types = [e.event_type for e in events] + assert ContextEventType.AGENT_START in event_types + assert ContextEventType.AGENT_END in event_types diff --git a/src/praisonai/praisonai/integrations/managed_agents.py b/src/praisonai/praisonai/integrations/managed_agents.py index 185221ac5..714a70152 100644 --- a/src/praisonai/praisonai/integrations/managed_agents.py +++ b/src/praisonai/praisonai/integrations/managed_agents.py @@ -259,7 +259,7 @@ def _ensure_session(self) -> str: # ------------------------------------------------------------------ # Event processing helpers # ------------------------------------------------------------------ - def _process_events(self, client, session_id, stream, *, collect: bool = True, stream_live: bool = False): + def _process_events(self, client, session_id, stream, *, collect: bool = True, stream_live: bool = False, emitter=None): """Walk the SSE stream and return (text_parts, tool_log). Handles: @@ -272,11 +272,14 @@ def _process_events(self, client, session_id, stream, *, collect: bool = True, s Args: stream_live: If True, print text chunks to stdout as they arrive. + emitter: ContextTraceEmitter for trace events. """ import sys as _sys + import time text_parts: List[str] = [] tool_log: List[str] = [] + tool_start_times = {} # Track tool start times for duration calculation for event in stream: etype = getattr(event, "type", None) @@ -292,20 +295,29 @@ def _process_events(self, client, session_id, stream, *, collect: bool = True, s elif etype == "agent.tool_use": name = getattr(event, "name", "unknown") + tool_id = getattr(event, "id", "") + tool_input = getattr(event, "input", {}) + tool_log.append(name) logger.debug("[managed] tool_use: %s", name) if stream_live: _sys.stdout.write(f"\n[Using tool: {name}]\n") _sys.stdout.flush() + # Emit tool_call_start event + if emitter: + agent_name = self._cfg.get("name", "Agent") + emitter.tool_call_start(agent_name, name, tool_input) + tool_start_times[tool_id] = time.time() + # Handle tool confirmation (always_ask policy) if getattr(event, "needs_confirmation", False): approved = True if self.on_tool_confirmation: info = { "name": name, - "input": getattr(event, "input", {}), - "tool_use_id": getattr(event, "id", None), + "input": tool_input, + "tool_use_id": tool_id, } approved = self.on_tool_confirmation(info) # Send confirmation back @@ -313,11 +325,19 @@ def _process_events(self, client, session_id, stream, *, collect: bool = True, s session_id, events=[{ "type": "user.tool_confirmation", - "tool_use_id": getattr(event, "id", ""), + "tool_use_id": tool_id, "allowed": approved, }], ) + # Emit synthetic tool_call_end since Anthropic doesn't provide a direct end event + # We emit this immediately after the tool_use event for now + if emitter and tool_id in tool_start_times: + duration_ms = (time.time() - tool_start_times[tool_id]) * 1000 + agent_name = self._cfg.get("name", "Agent") + emitter.tool_call_end(agent_name, name, duration_ms=duration_ms) + del tool_start_times[tool_id] + elif etype == "agent.custom_tool_use": tool_name = getattr(event, "name", "custom_tool") tool_input = getattr(event, "input", {}) @@ -355,8 +375,12 @@ def _process_events(self, client, session_id, stream, *, collect: bool = True, s # Usage tracking (from event.usage or span.model_usage) usage = getattr(event, "usage", None) or getattr(event, "model_usage", None) if usage: - self.total_input_tokens += getattr(usage, "input_tokens", 0) - self.total_output_tokens += getattr(usage, "output_tokens", 0) + in_t = getattr(usage, "input_tokens", 0) + out_t = getattr(usage, "output_tokens", 0) + if isinstance(in_t, int): + self.total_input_tokens += in_t + if isinstance(out_t, int): + self.total_output_tokens += out_t if tool_log: logger.info("[managed] tools used: %s", tool_log) @@ -382,28 +406,60 @@ def _execute_sync(self, prompt: str, stream_live: bool = False) -> str: (token-by-token streaming). The full text is still returned. """ import sys + + # Get context emitter (zero-overhead when no emitter is installed) + try: + from praisonaiagents.trace.context_events import get_context_emitter + emitter = get_context_emitter() + except ImportError: + emitter = None client = self._get_client() session_id = self._ensure_session() + agent_name = self._cfg.get("name", "Agent") - with client.beta.sessions.events.stream(session_id) as stream: - client.beta.sessions.events.send( - session_id, - events=[{ - "type": "user.message", - "content": [{"type": "text", "text": prompt}], - }], - ) - text_parts, _ = self._process_events( - client, session_id, stream, collect=True, - stream_live=stream_live, - ) + # Emit agent_start event + if emitter: + emitter.agent_start(agent_name, { + "input": prompt, + "goal": self._cfg.get("system", self.instructions) + }) + + try: + with client.beta.sessions.events.stream(session_id) as stream: + client.beta.sessions.events.send( + session_id, + events=[{ + "type": "user.message", + "content": [{"type": "text", "text": prompt}], + }], + ) + text_parts, _ = self._process_events( + client, session_id, stream, collect=True, + stream_live=stream_live, emitter=emitter, + ) - if stream_live: - sys.stdout.write("\n") - sys.stdout.flush() + if stream_live: + sys.stdout.write("\n") + sys.stdout.flush() + + full_response = "".join(text_parts) + + # Emit llm_response event for aggregated text + if emitter and full_response: + emitter.llm_response( + agent_name, + response_content=full_response, + prompt_tokens=self.total_input_tokens, + completion_tokens=self.total_output_tokens + ) - return "".join(text_parts) + return full_response + + finally: + # Emit agent_end event + if emitter: + emitter.agent_end(agent_name) # ------------------------------------------------------------------ # stream() — ManagedBackendProtocol diff --git a/src/praisonai/praisonai/integrations/managed_local.py b/src/praisonai/praisonai/integrations/managed_local.py index 80b641374..ef97fcdf1 100644 --- a/src/praisonai/praisonai/integrations/managed_local.py +++ b/src/praisonai/praisonai/integrations/managed_local.py @@ -547,34 +547,64 @@ def _persist_message(self, role: str, content: str) -> None: def _execute_sync(self, prompt: str, stream_live: bool = False) -> str: """Synchronous execution using PraisonAI Agent.chat().""" + # Get context emitter (zero-overhead when no emitter is installed) + try: + from praisonaiagents.trace.context_events import get_context_emitter + emitter = get_context_emitter() + except ImportError: + emitter = None + agent = self._ensure_agent() self._ensure_session() self._persist_message("user", prompt) + agent_name = self._cfg.get("name", "Agent") - if stream_live: - result_parts = [] - gen = agent.chat(prompt, stream=True) - if hasattr(gen, '__iter__'): - for chunk in gen: - if chunk: - sys.stdout.write(str(chunk)) - sys.stdout.flush() - result_parts.append(str(chunk)) - sys.stdout.write("\n") - sys.stdout.flush() - full = "".join(result_parts) - else: - full = str(gen) if gen else "" - sys.stdout.write(full + "\n") - sys.stdout.flush() - else: - result = agent.chat(prompt) - full = str(result) if result else "" + # Emit agent_start event + if emitter: + emitter.agent_start(agent_name, { + "input": prompt, + "goal": self._cfg.get("system", self.instructions) + }) - self._persist_message("assistant", full) - self._sync_usage() - self._persist_state() - return full + try: + if stream_live: + result_parts = [] + gen = agent.chat(prompt, stream=True) + if hasattr(gen, '__iter__'): + for chunk in gen: + if chunk: + sys.stdout.write(str(chunk)) + sys.stdout.flush() + result_parts.append(str(chunk)) + sys.stdout.write("\n") + sys.stdout.flush() + full = "".join(result_parts) + else: + full = str(gen) if gen else "" + sys.stdout.write(full + "\n") + sys.stdout.flush() + else: + result = agent.chat(prompt) + full = str(result) if result else "" + + # Emit llm_response event for the response + if emitter and full: + emitter.llm_response( + agent_name, + response_content=full, + prompt_tokens=self.total_input_tokens, + completion_tokens=self.total_output_tokens + ) + + self._persist_message("assistant", full) + self._sync_usage() + self._persist_state() + return full + + finally: + # Emit agent_end event + if emitter: + emitter.agent_end(agent_name) # ------------------------------------------------------------------ # stream() — ManagedBackendProtocol