diff --git a/examples/observability/langfuse_example.py b/examples/observability/langfuse_example.py index c1186600b..12924bbb4 100644 --- a/examples/observability/langfuse_example.py +++ b/examples/observability/langfuse_example.py @@ -1,48 +1,40 @@ """ -Langfuse Integration Example +Langfuse Integration Example (Updated for TraceSinkProtocol) -This example shows how to use Langfuse for LLM observability. +This example shows how to use Langfuse for LLM observability with PraisonAI's native trace infrastructure. Setup: - 1. Sign up at https://langfuse.com/ - 2. Get your API keys from the project settings - 3. Set environment variables: - export LANGFUSE_PUBLIC_KEY=pk-lf-xxx - export LANGFUSE_SECRET_KEY=sk-lf-xxx - 4. Install dependencies: - pip install opentelemetry-sdk opentelemetry-exporter-otlp + pip install "praisonai[langfuse]" + export LANGFUSE_PUBLIC_KEY=pk-lf-xxx + export LANGFUSE_SECRET_KEY=sk-lf-xxx Usage: python langfuse_example.py """ - -import os -from praisonai_tools.observability import obs from praisonaiagents import Agent - -# Initialize Langfuse -success = obs.init( - provider="langfuse", - project_name="praisonai-demo", +from praisonai.observability import LangfuseSink +from praisonaiagents.trace.protocol import ( + TraceEmitter, set_default_emitter ) -if not success: - print("Failed to initialize Langfuse. Check your API keys.") - print("Required: LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY") - exit(1) - -print("Langfuse initialized successfully!") -print(f"View traces at: https://cloud.langfuse.com/") +# Initialize Langfuse observability +sink = LangfuseSink() +emitter = TraceEmitter(sink=sink, enabled=True) +set_default_emitter(emitter) -# Create agent +# Create and run agent — all traces automatically captured agent = Agent( + name="Coder", instructions="You are a helpful coding assistant.", - model="gpt-4o-mini", + llm="openai/gpt-4o-mini", ) -# Run with tracing -with obs.trace("coding-session", user_id="developer-1"): - response = agent.chat("Write a Python function to calculate fibonacci numbers") - print(response) +try: + result = agent.start("Write a Python function to calculate fibonacci numbers") + print(result) +finally: + # Ensure traces are flushed and resources cleaned up + sink.flush() + sink.close() print("\nCheck Langfuse dashboard for traces!") diff --git a/src/praisonai/praisonai/cli/app.py b/src/praisonai/praisonai/cli/app.py index b46324a36..0e242a4a3 100644 --- a/src/praisonai/praisonai/cli/app.py +++ b/src/praisonai/praisonai/cli/app.py @@ -13,6 +13,28 @@ from .state.identifiers import create_context +def _setup_langfuse_observability(*, verbose: bool = False) -> None: + """Set up Langfuse observability by wiring TraceSink to action emitter.""" + try: + from praisonai.observability.langfuse import LangfuseSink + from praisonaiagents.trace.protocol import TraceEmitter, set_default_emitter + + # Create LangfuseSink (auto-reads env vars) + sink = LangfuseSink() + + # Set up action-level trace emitter (sufficient for Phase 1) + emitter = TraceEmitter(sink=sink, enabled=True) + set_default_emitter(emitter) + + except ImportError: + # Gracefully degrade if Langfuse not installed + pass + except Exception as e: + # Avoid breaking CLI if observability setup fails + if verbose: + typer.echo(f"Warning: failed to initialize Langfuse observability: {e}", err=True) + + class OutputFormat(str, Enum): """Output format options.""" text = "text" @@ -38,6 +60,7 @@ class GlobalState: quiet: bool = False verbose: bool = False screen_reader: bool = False + observe: Optional[str] = None output_controller: Optional[OutputController] = None @@ -98,6 +121,13 @@ def main_callback( "--screen-reader", help="Screen reader friendly output (no spinners/panels)", ), + observe: Optional[str] = typer.Option( + None, + "--observe", + "-O", + help="Enable observability (langfuse, langsmith, etc.)", + envvar="PRAISONAI_OBSERVE", + ), ): """ PraisonAI - AI Agents Framework CLI. @@ -110,11 +140,18 @@ def main_callback( state.quiet = quiet state.verbose = verbose state.screen_reader = screen_reader + state.observe = observe # Handle --json alias if json_output: state.output_format = OutputFormat.json + # Validate and set up observability if requested + if observe: + if observe != "langfuse": + raise typer.BadParameter(f"Unsupported observe provider: {observe}") + _setup_langfuse_observability(verbose=verbose) + # Determine output mode if state.quiet: mode = OutputMode.QUIET diff --git a/src/praisonai/praisonai/flow/components/PraisonAI/praisonai_agent.py b/src/praisonai/praisonai/flow/components/PraisonAI/praisonai_agent.py index f1c84f3bb..77e3a7ecb 100644 --- a/src/praisonai/praisonai/flow/components/PraisonAI/praisonai_agent.py +++ b/src/praisonai/praisonai/flow/components/PraisonAI/praisonai_agent.py @@ -405,6 +405,9 @@ def build_agent(self) -> Any: def build_response(self) -> Message: """Execute the agent and return the response as a Message.""" agent = self.build_agent() + + # Wire up observability if configured + self._setup_observability() # Get input value input_value = self.input_value @@ -434,3 +437,9 @@ def _get_llm(self) -> str: if converted: return converted return self.llm + + def _setup_observability(self) -> None: + """Auto-configure observability from environment variables.""" + from praisonai.flow.helpers import setup_langfuse_context_observability + + setup_langfuse_context_observability() diff --git a/src/praisonai/praisonai/flow/components/PraisonAI/praisonai_agents.py b/src/praisonai/praisonai/flow/components/PraisonAI/praisonai_agents.py index bb0c8e7e5..68b63161e 100644 --- a/src/praisonai/praisonai/flow/components/PraisonAI/praisonai_agents.py +++ b/src/praisonai/praisonai/flow/components/PraisonAI/praisonai_agents.py @@ -307,6 +307,9 @@ def build_agents(self) -> Any: def build_response(self) -> Message: """Execute the multi-agent workflow and return the response.""" agents_instance = self.build_agents() + + # Wire up observability if configured + self._setup_observability() # Get input value input_value = self.input_value @@ -326,3 +329,9 @@ def build_response(self) -> Message: async def build_response_async(self) -> Message: """Execute the multi-agent workflow asynchronously.""" return await asyncio.to_thread(self.build_response) + + def _setup_observability(self) -> None: + """Auto-configure observability from environment variables.""" + from praisonai.flow.helpers import setup_langfuse_context_observability + + setup_langfuse_context_observability() diff --git a/src/praisonai/praisonai/flow/helpers.py b/src/praisonai/praisonai/flow/helpers.py index f3acfcb6b..c61d79c7c 100644 --- a/src/praisonai/praisonai/flow/helpers.py +++ b/src/praisonai/praisonai/flow/helpers.py @@ -6,10 +6,17 @@ from __future__ import annotations +import threading from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from collections.abc import Callable + from praisonaiagents.trace.context_events import ContextTraceEmitter +else: + ContextTraceEmitter = Any + +_LANGFUSE_CONTEXT_EMITTER: ContextTraceEmitter | None = None +_LANGFUSE_OBS_LOCK = threading.Lock() def convert_tools(tools: list | None) -> list[Callable] | None: @@ -128,3 +135,39 @@ def build_memory_config( return { "provider": memory_provider, } + + +def setup_langfuse_context_observability() -> None: + """Set up Langfuse context observability once per process.""" + import os + + if os.environ.get("PRAISONAI_OBSERVE", "") != "langfuse": + return + + try: + from praisonai.observability.langfuse import LangfuseSink + from praisonaiagents.trace.context_events import ContextTraceEmitter, set_context_emitter + except ImportError: + return + + global _LANGFUSE_CONTEXT_EMITTER + + with _LANGFUSE_OBS_LOCK: + if _LANGFUSE_CONTEXT_EMITTER is None: + sink = LangfuseSink() + _LANGFUSE_CONTEXT_EMITTER = ContextTraceEmitter(sink=sink, enabled=True) + + set_context_emitter(_LANGFUSE_CONTEXT_EMITTER) + + +def get_langfuse_context_emitter() -> ContextTraceEmitter | None: + """Return the cached Langfuse context emitter.""" + with _LANGFUSE_OBS_LOCK: + return _LANGFUSE_CONTEXT_EMITTER + + +def reset_langfuse_context_observability_for_tests() -> None: + """Reset cached Langfuse context emitter (for tests).""" + global _LANGFUSE_CONTEXT_EMITTER + with _LANGFUSE_OBS_LOCK: + _LANGFUSE_CONTEXT_EMITTER = None diff --git a/src/praisonai/tests/unit/test_observability_setup.py b/src/praisonai/tests/unit/test_observability_setup.py new file mode 100644 index 000000000..e22ec4795 --- /dev/null +++ b/src/praisonai/tests/unit/test_observability_setup.py @@ -0,0 +1,67 @@ +import sys +import types + +from praisonai.cli.app import _setup_langfuse_observability +from praisonai.flow import helpers as flow_helpers + + +def test_flow_langfuse_context_observability_reuses_single_emitter(monkeypatch): + monkeypatch.setenv("PRAISONAI_OBSERVE", "langfuse") + flow_helpers.reset_langfuse_context_observability_for_tests() + + sink_instances = [] + set_calls = [] + + class FakeSink: + def __init__(self): + sink_instances.append(self) + + class FakeContextTraceEmitter: + def __init__(self, sink, enabled): + self.sink = sink + self.enabled = enabled + + fake_langfuse_module = types.ModuleType("praisonai.observability.langfuse") + fake_langfuse_module.LangfuseSink = FakeSink + + fake_context_events_module = types.ModuleType("praisonaiagents.trace.context_events") + fake_context_events_module.ContextTraceEmitter = FakeContextTraceEmitter + fake_context_events_module.set_context_emitter = set_calls.append + + monkeypatch.setitem(sys.modules, "praisonai.observability.langfuse", fake_langfuse_module) + monkeypatch.setitem(sys.modules, "praisonaiagents.trace.context_events", fake_context_events_module) + + flow_helpers.setup_langfuse_context_observability() + flow_helpers.setup_langfuse_context_observability() + + assert len(sink_instances) == 1 + assert len(set_calls) == 2 + assert set_calls[0] is set_calls[1] + assert set_calls[0] is flow_helpers.get_langfuse_context_emitter() + + +def test_setup_langfuse_observability_verbose_logs_warning(monkeypatch, capsys): + class FakeSink: + def __init__(self): + raise RuntimeError("boom") + + fake_langfuse_module = types.ModuleType("praisonai.observability.langfuse") + fake_langfuse_module.LangfuseSink = FakeSink + + fake_protocol_module = types.ModuleType("praisonaiagents.trace.protocol") + fake_protocol_module.TraceEmitter = object + fake_protocol_module.set_default_emitter = lambda *_args, **_kwargs: None + + fake_context_events_module = types.ModuleType("praisonaiagents.trace.context_events") + fake_context_events_module.ContextTraceEmitter = object + fake_context_events_module.set_context_emitter = lambda *_args, **_kwargs: None + + monkeypatch.setitem(sys.modules, "praisonai.observability.langfuse", fake_langfuse_module) + monkeypatch.setitem(sys.modules, "praisonaiagents.trace.protocol", fake_protocol_module) + monkeypatch.setitem(sys.modules, "praisonaiagents.trace.context_events", fake_context_events_module) + + _setup_langfuse_observability(verbose=True) + + captured = capsys.readouterr() + assert "failed to initialize Langfuse observability" in captured.err + assert "boom" in captured.err