diff --git a/src/praisonai/praisonai/agents_generator.py b/src/praisonai/praisonai/agents_generator.py index f126e4b07..d9d0cdc40 100644 --- a/src/praisonai/praisonai/agents_generator.py +++ b/src/praisonai/praisonai/agents_generator.py @@ -1109,7 +1109,6 @@ def _run_praisonai(self, config, topic, tools_dict): if acp_enabled or lsp_enabled: try: import asyncio - import os from praisonai.cli.features.interactive_runtime import InteractiveRuntime, RuntimeConfig from praisonai.cli.features.agent_tools import create_agent_centric_tools import nest_asyncio diff --git a/src/praisonai/praisonai/cli/app.py b/src/praisonai/praisonai/cli/app.py index c5722c706..1d34963bd 100644 --- a/src/praisonai/praisonai/cli/app.py +++ b/src/praisonai/praisonai/cli/app.py @@ -63,10 +63,23 @@ def _setup_langextract_observability(*, verbose: bool = False) -> None: # Ensure sink is closed on exit to write the trace file atexit.register(sink.close) - # Set up action-level trace emitter + # Set up action-level trace emitter (covers RouterAgent / PlanningAgent) emitter = TraceEmitter(sink=sink, enabled=True) set_default_emitter(emitter) - + + # Bridge the context emitter so regular Agent.start / tool calls / LLM + # responses are captured as well. Without this, typical single-agent + # flows produce an empty trace (no agent_start/end, no tool events). 
class _ContextToActionBridge:
    """
    ``ContextTraceSinkProtocol`` adapter that translates core runtime
    ``ContextEvent``s into equivalent ``ActionEvent``s and forwards them
    to a ``LangextractSink``.

    The base agent runtime (``chat_mixin``, ``tool_execution``,
    ``unified_execution_mixin``) publishes lifecycle events only through
    ``ContextTraceEmitter``; this adapter lets the langextract sink
    observe those events without touching the core SDK.
    """

    __slots__ = ("_sink",)

    # Subset of ContextEventType values this bridge understands, kept as
    # plain strings so ContextEventType need not be imported at module
    # load time.
    _CTX_AGENT_START = "agent_start"
    _CTX_AGENT_END = "agent_end"
    _CTX_TOOL_START = "tool_call_start"
    _CTX_TOOL_END = "tool_call_end"
    _CTX_LLM_RESPONSE = "llm_response"

    def __init__(self, sink: "LangextractSink") -> None:
        self._sink = sink

    def emit(self, event: Any) -> None:  # ContextEvent, duck-typed
        """Translate one ContextEvent into an ActionEvent and forward it."""
        raw_type = getattr(event, "event_type", None)
        kind = raw_type.value if hasattr(raw_type, "value") else raw_type
        payload = getattr(event, "data", {}) or {}
        when = getattr(event, "timestamp", 0.0)
        who = getattr(event, "agent_name", None)

        if kind == self._CTX_AGENT_START:
            forwarded = ActionEvent(
                event_type=ActionEventType.AGENT_START.value,
                timestamp=when,
                agent_name=who,
                metadata={"input": payload.get("input") or payload.get("goal") or ""},
            )
        elif kind == self._CTX_AGENT_END:
            forwarded = ActionEvent(
                event_type=ActionEventType.AGENT_END.value,
                timestamp=when,
                agent_name=who,
                status="ok",
            )
        elif kind == self._CTX_TOOL_START:
            forwarded = ActionEvent(
                event_type=ActionEventType.TOOL_START.value,
                timestamp=when,
                agent_name=who,
                tool_name=payload.get("tool_name"),
                tool_args=payload.get("arguments"),
            )
        elif kind == self._CTX_TOOL_END:
            result = payload.get("result")
            forwarded = ActionEvent(
                event_type=ActionEventType.TOOL_END.value,
                timestamp=when,
                agent_name=who,
                tool_name=payload.get("tool_name"),
                duration_ms=(payload.get("duration_ms") or 0.0),
                status=payload.get("status") or "ok",
                tool_result_summary=str(result)[:500] if result is not None else None,
            )
        elif kind == self._CTX_LLM_RESPONSE:
            # Surface the LLM response as an OUTPUT event so the final
            # text shows up in the rendered HTML.
            text = payload.get("response_content") or payload.get("content") or ""
            forwarded = ActionEvent(
                event_type=ActionEventType.OUTPUT.value,
                timestamp=when,
                agent_name=who,
                tool_result_summary=text,
            )
        else:
            # Unrecognized context event types are ignored.
            return

        self._sink.emit(forwarded)

    def flush(self) -> None:
        """No-op: the owning sink manages its own buffering."""

    def close(self) -> None:
        """No-op: the owning LangextractSink handles render/close."""
class TestLangextractContextBridge:
    """Regression tests for the ContextTraceEmitter bridge.

    The base agent runtime (chat_mixin, tool_execution, unified_execution_mixin)
    emits ``ContextEvent``s only. Without the bridge, a single-agent run
    produces zero events in the langextract sink.
    """

    def test_context_sink_returns_bridge(self):
        from praisonai.observability import LangextractSink

        bridge = LangextractSink().context_sink()
        for method in ("emit", "flush", "close"):
            assert hasattr(bridge, method)

    def test_bridge_maps_context_events_to_action_events(self):
        from praisonai.observability import LangextractSink
        from praisonaiagents.trace.context_events import (
            ContextEvent,
            ContextEventType,
        )

        sink = LangextractSink()
        bridge = sink.context_sink()

        # (event type, timestamp, payload) for a minimal single-agent run.
        script = [
            (ContextEventType.AGENT_START, 1.0, {"input": "Write a haiku"}),
            (ContextEventType.TOOL_CALL_START, 2.0,
             {"tool_name": "search", "arguments": {"q": "x"}}),
            (ContextEventType.TOOL_CALL_END, 3.0,
             {"tool_name": "search", "result": "ok", "duration_ms": 12.0}),
            (ContextEventType.LLM_RESPONSE, 4.0, {"response_content": "final haiku"}),
            (ContextEventType.AGENT_END, 5.0, {}),
        ]
        for etype, ts, payload in script:
            bridge.emit(ContextEvent(
                event_type=etype,
                timestamp=ts,
                session_id="s",
                agent_name="writer",
                data=payload,
            ))

        seen = [e.event_type for e in sink._events]
        for expected in ("agent_start", "tool_start", "tool_end", "output", "agent_end"):
            assert expected in seen
        assert sink._source_text == "Write a haiku"

    def test_setup_observability_registers_context_emitter(self):
        """`--observe langextract` must install the bridge on the context emitter."""
        import praisonai.cli.app as cli_app
        from praisonaiagents.trace.context_events import (
            get_context_emitter,
            set_context_emitter,
        )

        saved = get_context_emitter()
        try:
            # Make the test deterministic even when the optional dependency
            # is not installed.
            with patch("importlib.util.find_spec", return_value=object()), \
                 patch("atexit.register"):
                cli_app._setup_langextract_observability(verbose=False)
            assert get_context_emitter().enabled, "context emitter should be enabled after setup"
        finally:
            set_context_emitter(saved)
            assert get_context_emitter() is saved

    def test_setup_observability_without_langextract_leaves_context_emitter_unchanged(self):
        """Setup should be a no-op when optional langextract dependency is unavailable."""
        import praisonai.cli.app as cli_app
        from praisonaiagents.trace.context_events import (
            get_context_emitter,
            set_context_emitter,
        )

        saved = get_context_emitter()
        try:
            with patch("importlib.util.find_spec", return_value=None):
                cli_app._setup_langextract_observability(verbose=False)
            assert get_context_emitter() is saved
        finally:
            set_context_emitter(saved)