Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 22 additions & 30 deletions examples/observability/langfuse_example.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,40 @@
"""
Langfuse Integration Example
Langfuse Integration Example (Updated for TraceSinkProtocol)

This example shows how to use Langfuse for LLM observability.
This example shows how to use Langfuse for LLM observability with PraisonAI's native trace infrastructure.

Setup:
1. Sign up at https://langfuse.com/
2. Get your API keys from the project settings
3. Set environment variables:
export LANGFUSE_PUBLIC_KEY=pk-lf-xxx
export LANGFUSE_SECRET_KEY=sk-lf-xxx
4. Install dependencies:
pip install opentelemetry-sdk opentelemetry-exporter-otlp
pip install "praisonai[langfuse]"
export LANGFUSE_PUBLIC_KEY=pk-lf-xxx
export LANGFUSE_SECRET_KEY=sk-lf-xxx

Usage:
python langfuse_example.py
"""

import os
from praisonai_tools.observability import obs
from praisonaiagents import Agent

# Initialize Langfuse
success = obs.init(
provider="langfuse",
project_name="praisonai-demo",
from praisonai.observability import LangfuseSink
from praisonaiagents.trace.protocol import (
TraceEmitter, set_default_emitter
)

if not success:
print("Failed to initialize Langfuse. Check your API keys.")
print("Required: LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY")
exit(1)

print("Langfuse initialized successfully!")
print(f"View traces at: https://cloud.langfuse.com/")
# Initialize Langfuse observability
sink = LangfuseSink()
emitter = TraceEmitter(sink=sink, enabled=True)
set_default_emitter(emitter)

# Create agent
# Create and run agent — all traces automatically captured
agent = Agent(
name="Coder",
instructions="You are a helpful coding assistant.",
model="gpt-4o-mini",
llm="openai/gpt-4o-mini",
)

# Run with tracing
with obs.trace("coding-session", user_id="developer-1"):
response = agent.chat("Write a Python function to calculate fibonacci numbers")
print(response)
try:
result = agent.start("Write a Python function to calculate fibonacci numbers")
print(result)
finally:
# Ensure traces are flushed and resources cleaned up
sink.flush()
sink.close()

print("\nCheck Langfuse dashboard for traces!")
37 changes: 37 additions & 0 deletions src/praisonai/praisonai/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,28 @@
from .state.identifiers import create_context


def _setup_langfuse_observability(*, verbose: bool = False) -> None:
"""Set up Langfuse observability by wiring TraceSink to action emitter."""
try:
from praisonai.observability.langfuse import LangfuseSink
from praisonaiagents.trace.protocol import TraceEmitter, set_default_emitter

# Create LangfuseSink (auto-reads env vars)
sink = LangfuseSink()

# Set up action-level trace emitter (sufficient for Phase 1)
emitter = TraceEmitter(sink=sink, enabled=True)
set_default_emitter(emitter)

except ImportError:
# Gracefully degrade if Langfuse not installed
pass
except Exception as e:
# Avoid breaking CLI if observability setup fails
if verbose:
typer.echo(f"Warning: failed to initialize Langfuse observability: {e}", err=True)


class OutputFormat(str, Enum):
"""Output format options."""
text = "text"
Expand All @@ -38,6 +60,7 @@ class GlobalState:
quiet: bool = False
verbose: bool = False
screen_reader: bool = False
observe: Optional[str] = None
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new global field state.observe is introduced, but the unit-test fixture that resets global CLI state (src/praisonai/tests/unit/cli/test_typer_cli.py) doesn’t reset this field. That can lead to state leaking between CLI tests. Please update the fixture to also restore state.observe to its default value.

Copilot uses AI. Check for mistakes.
output_controller: Optional[OutputController] = None


Expand Down Expand Up @@ -98,6 +121,13 @@ def main_callback(
"--screen-reader",
help="Screen reader friendly output (no spinners/panels)",
),
observe: Optional[str] = typer.Option(
None,
"--observe",
"-O",
help="Enable observability (langfuse, langsmith, etc.)",
envvar="PRAISONAI_OBSERVE",
),
):
"""
PraisonAI - AI Agents Framework CLI.
Expand All @@ -110,11 +140,18 @@ def main_callback(
state.quiet = quiet
state.verbose = verbose
state.screen_reader = screen_reader
state.observe = observe

# Handle --json alias
if json_output:
state.output_format = OutputFormat.json

# Validate and set up observability if requested
if observe:
if observe != "langfuse":
raise typer.BadParameter(f"Unsupported observe provider: {observe}")
_setup_langfuse_observability(verbose=verbose)

# Determine output mode
if state.quiet:
mode = OutputMode.QUIET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ def build_agent(self) -> Any:
def build_response(self) -> Message:
"""Execute the agent and return the response as a Message."""
agent = self.build_agent()

# Wire up observability if configured
self._setup_observability()

# Get input value
input_value = self.input_value
Expand Down Expand Up @@ -434,3 +437,9 @@ def _get_llm(self) -> str:
if converted:
return converted
return self.llm

def _setup_observability(self) -> None:
    """Auto-configure observability from environment variables."""
    # Lazy module import: the observability machinery is only loaded on demand.
    from praisonai.flow import helpers as _flow_helpers

    # Delegate to the shared helper so every builder wires tracing identically.
    _flow_helpers.setup_langfuse_context_observability()
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ def build_agents(self) -> Any:
def build_response(self) -> Message:
"""Execute the multi-agent workflow and return the response."""
agents_instance = self.build_agents()

# Wire up observability if configured
self._setup_observability()

# Get input value
input_value = self.input_value
Expand All @@ -326,3 +329,9 @@ def build_response(self) -> Message:
async def build_response_async(self) -> Message:
"""Execute the multi-agent workflow asynchronously."""
return await asyncio.to_thread(self.build_response)

def _setup_observability(self) -> None:
    """Auto-configure observability from environment variables."""
    from praisonai.flow.helpers import (
        setup_langfuse_context_observability as _wire_langfuse,
    )

    # One shared, idempotent entry point performs the actual wiring.
    _wire_langfuse()
43 changes: 43 additions & 0 deletions src/praisonai/praisonai/flow/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,17 @@

from __future__ import annotations

import threading
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from collections.abc import Callable
from praisonaiagents.trace.context_events import ContextTraceEmitter
else:
ContextTraceEmitter = Any

_LANGFUSE_CONTEXT_EMITTER: ContextTraceEmitter | None = None
_LANGFUSE_OBS_LOCK = threading.Lock()


def convert_tools(tools: list | None) -> list[Callable] | None:
Expand Down Expand Up @@ -128,3 +135,39 @@ def build_memory_config(
return {
"provider": memory_provider,
}


def setup_langfuse_context_observability() -> None:
    """Install the process-wide Langfuse context emitter (idempotent).

    No-op unless PRAISONAI_OBSERVE=langfuse is set in the environment and
    the optional Langfuse / trace packages can be imported.
    """
    import os

    if os.getenv("PRAISONAI_OBSERVE") != "langfuse":
        return

    try:
        from praisonai.observability.langfuse import LangfuseSink
        from praisonaiagents.trace.context_events import (
            ContextTraceEmitter,
            set_context_emitter,
        )
    except ImportError:
        # Optional dependency missing — observability stays disabled.
        return

    global _LANGFUSE_CONTEXT_EMITTER

    with _LANGFUSE_OBS_LOCK:
        emitter = _LANGFUSE_CONTEXT_EMITTER
        if emitter is None:
            # First call in this process: build and cache a single emitter.
            emitter = ContextTraceEmitter(sink=LangfuseSink(), enabled=True)
            _LANGFUSE_CONTEXT_EMITTER = emitter
        # Re-register on every call so a reset elsewhere is repaired.
        set_context_emitter(emitter)


def get_langfuse_context_emitter() -> ContextTraceEmitter | None:
    """Return the cached Langfuse context emitter, or None if never set up."""
    # Read under the lock so a concurrent setup cannot be observed mid-write.
    _LANGFUSE_OBS_LOCK.acquire()
    try:
        return _LANGFUSE_CONTEXT_EMITTER
    finally:
        _LANGFUSE_OBS_LOCK.release()


def reset_langfuse_context_observability_for_tests() -> None:
    """Clear the cached Langfuse context emitter (test-only hook)."""
    global _LANGFUSE_CONTEXT_EMITTER

    with _LANGFUSE_OBS_LOCK:
        # Forget the cached emitter; the next setup call rebuilds it.
        _LANGFUSE_CONTEXT_EMITTER = None
67 changes: 67 additions & 0 deletions src/praisonai/tests/unit/test_observability_setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import sys
import types

from praisonai.cli.app import _setup_langfuse_observability
from praisonai.flow import helpers as flow_helpers


def test_flow_langfuse_context_observability_reuses_single_emitter(monkeypatch):
    """Setup must create exactly one sink/emitter and re-register it on repeat calls.

    The module-level emitter cache is restored in a ``finally`` block because
    ``monkeypatch`` only undoes env-var and ``sys.modules`` changes — without
    the reset, the fake emitter would leak into subsequent tests.
    """
    monkeypatch.setenv("PRAISONAI_OBSERVE", "langfuse")
    flow_helpers.reset_langfuse_context_observability_for_tests()

    sink_instances = []
    set_calls = []

    class FakeSink:
        def __init__(self):
            sink_instances.append(self)

    class FakeContextTraceEmitter:
        def __init__(self, sink, enabled):
            self.sink = sink
            self.enabled = enabled

    fake_langfuse_module = types.ModuleType("praisonai.observability.langfuse")
    fake_langfuse_module.LangfuseSink = FakeSink

    fake_context_events_module = types.ModuleType("praisonaiagents.trace.context_events")
    fake_context_events_module.ContextTraceEmitter = FakeContextTraceEmitter
    fake_context_events_module.set_context_emitter = set_calls.append

    monkeypatch.setitem(sys.modules, "praisonai.observability.langfuse", fake_langfuse_module)
    monkeypatch.setitem(sys.modules, "praisonaiagents.trace.context_events", fake_context_events_module)

    try:
        flow_helpers.setup_langfuse_context_observability()
        flow_helpers.setup_langfuse_context_observability()

        # One sink built, but the (same) emitter registered on both calls.
        assert len(sink_instances) == 1
        assert len(set_calls) == 2
        assert set_calls[0] is set_calls[1]
        assert set_calls[0] is flow_helpers.get_langfuse_context_emitter()
    finally:
        # Drop the cached fake emitter so later tests start from a clean slate.
        flow_helpers.reset_langfuse_context_observability_for_tests()


def test_setup_langfuse_observability_verbose_logs_warning(monkeypatch, capsys):
    """When sink construction raises, verbose mode must emit a stderr warning."""

    class ExplodingSink:
        def __init__(self):
            raise RuntimeError("boom")

    langfuse_stub = types.ModuleType("praisonai.observability.langfuse")
    langfuse_stub.LangfuseSink = ExplodingSink

    protocol_stub = types.ModuleType("praisonaiagents.trace.protocol")
    protocol_stub.TraceEmitter = object
    protocol_stub.set_default_emitter = lambda *_args, **_kwargs: None

    context_events_stub = types.ModuleType("praisonaiagents.trace.context_events")
    context_events_stub.ContextTraceEmitter = object
    context_events_stub.set_context_emitter = lambda *_args, **_kwargs: None

    for module_name, stub in (
        ("praisonai.observability.langfuse", langfuse_stub),
        ("praisonaiagents.trace.protocol", protocol_stub),
        ("praisonaiagents.trace.context_events", context_events_stub),
    ):
        monkeypatch.setitem(sys.modules, module_name, stub)

    _setup_langfuse_observability(verbose=True)

    err_output = capsys.readouterr().err
    assert "failed to initialize Langfuse observability" in err_output
    assert "boom" in err_output