Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 61 additions & 5 deletions backend/apps/monitoring_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,19 @@

import logging
from http import HTTPStatus
from typing import Annotated, Optional
from typing import Annotated, Any

from fastapi import APIRouter, Header, HTTPException, Query
from sqlalchemy import text

from consts.const import (
ENABLE_TELEMETRY,
GRAFANA_PORT,
LANGFUSE_PORT,
MONITORING_PROVIDER,
PHOENIX_PORT,
SKYWALKING_UI_PORT,
)
from consts.model import ConversationResponse
from database.client import get_monitoring_db_session
from utils.auth_utils import get_current_user_id
Expand All @@ -21,19 +29,57 @@
router = APIRouter(prefix="/monitoring")


def _normalize_monitoring_provider(value: str | None) -> str:
return str(value or "otlp").strip().lower()


def _build_monitoring_ui(
provider: str,
) -> tuple[str | None, str | None, str | None]:
"""Map MONITORING_PROVIDER to a monitoring UI port and path."""
if provider == "grafana":
path = "/d/nexent-llm-agent/nexent-agent-trace-monitoring?orgId=1"
return GRAFANA_PORT, path, "Grafana"
if provider == "phoenix":
return PHOENIX_PORT, "/", "Phoenix"
if provider == "langfuse":
return LANGFUSE_PORT, "/project/nexent", "Langfuse"
if provider == "langsmith":
return None, None, "LangSmith"
if provider == "skywalking":
return SKYWALKING_UI_PORT, "/", "SkyWalking"
return None, None, None


def get_monitoring_status() -> dict[str, Any]:
"""Return telemetry state and the monitoring UI entrypoint for frontend use."""
telemetry_enabled = ENABLE_TELEMETRY
provider = _normalize_monitoring_provider(MONITORING_PROVIDER)
dashboard_port, dashboard_path, provider_name = _build_monitoring_ui(provider)

return {
"telemetry_enabled": telemetry_enabled,
"provider": provider,
"provider_name": provider_name,
"ui_enabled": telemetry_enabled and bool(dashboard_port),
"dashboard_port": dashboard_port,
"dashboard_path": dashboard_path,
}


def _compute_time_range_filter(time_range: str) -> str:
"""Convert time_range parameter to SQL timestamp condition."""
hours = {"24h": 24, "7d": 168, "30d": 720}.get(time_range, 24)
return f"m.create_time >= NOW() - INTERVAL '{hours} hours'"


def _query_model_metrics_from_db(
time_range: str, tenant_id: Optional[str] = None
) -> list[dict]:
time_range: str, tenant_id: str | None = None
) -> list[dict[str, Any]]:
time_filter = _compute_time_range_filter(time_range)

tenant_filter = ""
params = {}
params: dict[str, str] = {}
if tenant_id:
tenant_filter = "AND m.tenant_id = :tenant_id"
params["tenant_id"] = tenant_id
Expand Down Expand Up @@ -96,7 +142,7 @@ async def list_models_endpoint(
page: Annotated[int, Query(ge=1, description="Page number")] = 1,
page_size: Annotated[int, Query(
ge=1, le=100, description="Items per page")] = 20,
authorization: Annotated[Optional[str], Header()] = None,
authorization: Annotated[str | None, Header()] = None,
):
"""List all models with aggregated monitoring metrics from database."""
try:
Expand All @@ -113,3 +159,13 @@ async def list_models_endpoint(
logger.error(f"Failed to list monitoring models: {str(e)}")
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))


@router.get("/status", response_model=ConversationResponse)
async def get_monitoring_status_endpoint():
"""Return whether monitoring UI should be shown in the frontend."""
return ConversationResponse(
code=0,
message="success",
data=get_monitoring_status(),
)
75 changes: 62 additions & 13 deletions backend/consts/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,19 +316,68 @@ class VectorDatabaseType(str, Enum):
THINK_END_PATTERN = "</think>"


# Telemetry and Monitoring Configuration
ENABLE_TELEMETRY = os.getenv("ENABLE_TELEMETRY", "false").lower() == "true"
SERVICE_NAME = os.getenv("SERVICE_NAME", "nexent-backend")
JAEGER_ENDPOINT = os.getenv(
"JAEGER_ENDPOINT", "http://localhost:14268/api/traces")
PROMETHEUS_PORT = int(os.getenv("PROMETHEUS_PORT", "8000"))
TELEMETRY_SAMPLE_RATE = float(os.getenv("TELEMETRY_SAMPLE_RATE", "1.0"))

# Performance monitoring thresholds
LLM_SLOW_REQUEST_THRESHOLD_SECONDS = float(
os.getenv("LLM_SLOW_REQUEST_THRESHOLD_SECONDS", "5.0"))
LLM_SLOW_TOKEN_RATE_THRESHOLD = float(
os.getenv("LLM_SLOW_TOKEN_RATE_THRESHOLD", "10.0")) # tokens per second
# Telemetry and Monitoring Configuration (OTLP Protocol)
MONITORING_PROVIDER = os.getenv("MONITORING_PROVIDER", "")
ENABLE_TELEMETRY_RAW = os.getenv("ENABLE_TELEMETRY")
ENABLE_TELEMETRY = (ENABLE_TELEMETRY_RAW or "false").lower() == "true"
OTEL_SERVICE_NAME_RAW = os.getenv("OTEL_SERVICE_NAME")
OTEL_SERVICE_NAME = OTEL_SERVICE_NAME_RAW or "nexent-backend"
OTEL_EXPORTER_OTLP_ENDPOINT_RAW = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
OTEL_EXPORTER_OTLP_ENDPOINT = OTEL_EXPORTER_OTLP_ENDPOINT_RAW or "http://localhost:4318"
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "")
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", "")
OTEL_EXPORTER_OTLP_PROTOCOL_RAW = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL")
OTEL_EXPORTER_OTLP_PROTOCOL = OTEL_EXPORTER_OTLP_PROTOCOL_RAW or "http"
OTEL_EXPORTER_OTLP_HEADERS_RAW = os.getenv("OTEL_EXPORTER_OTLP_HEADERS")
OTEL_EXPORTER_OTLP_HEADERS = OTEL_EXPORTER_OTLP_HEADERS_RAW or ""
OTEL_EXPORTER_OTLP_AUTHORIZATION = os.getenv("OTEL_EXPORTER_OTLP_AUTHORIZATION", "")
OTEL_EXPORTER_OTLP_X_API_KEY = os.getenv("OTEL_EXPORTER_OTLP_X_API_KEY", "")
OTEL_EXPORTER_OTLP_LANGFUSE_INGESTION_VERSION = os.getenv(
"OTEL_EXPORTER_OTLP_LANGFUSE_INGESTION_VERSION", "")
LANGSMITH_API_KEY = os.getenv("LANGSMITH_API_KEY", "")
LANGSMITH_PROJECT = os.getenv("LANGSMITH_PROJECT", "")
OTEL_EXPORTER_OTLP_METRICS_ENABLED_RAW = os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENABLED")
OTEL_EXPORTER_OTLP_METRICS_ENABLED = (
OTEL_EXPORTER_OTLP_METRICS_ENABLED_RAW or "true").lower() == "true"
MONITORING_INSTRUMENT_FASTAPI_RAW = os.getenv("MONITORING_INSTRUMENT_FASTAPI")
MONITORING_INSTRUMENT_FASTAPI = (
MONITORING_INSTRUMENT_FASTAPI_RAW or "true").lower() == "true"
MONITORING_INSTRUMENT_REQUESTS_RAW = os.getenv("MONITORING_INSTRUMENT_REQUESTS")
MONITORING_INSTRUMENT_REQUESTS = (
MONITORING_INSTRUMENT_REQUESTS_RAW or "false").lower() == "true"
MONITORING_FASTAPI_EXCLUDED_URLS = os.getenv("MONITORING_FASTAPI_EXCLUDED_URLS", "")
MONITORING_FASTAPI_EXCLUDE_SPANS = os.getenv("MONITORING_FASTAPI_EXCLUDE_SPANS", "receive,send")
MONITORING_PROJECT_NAME = os.getenv("MONITORING_PROJECT_NAME", "")
PHOENIX_PORT = os.getenv("PHOENIX_PORT", "6006")
LANGFUSE_PORT = os.getenv("LANGFUSE_PORT", "3001")
GRAFANA_PORT = os.getenv("GRAFANA_PORT", "3002")
SKYWALKING_UI_PORT = os.getenv("SKYWALKING_UI_PORT", "8080")
TELEMETRY_SAMPLE_RATE_RAW = os.getenv("TELEMETRY_SAMPLE_RATE")
TELEMETRY_SAMPLE_RATE = float(TELEMETRY_SAMPLE_RATE_RAW or "1.0")

# Parse OTLP headers into dict format
def _parse_otlp_headers(headers_str: str) -> dict:
"""Parse OTLP headers string into dict. Format: 'key1=value1,key2=value2'"""
if not headers_str:
return {}
headers = {}
for pair in headers_str.split(","):
if "=" in pair:
key, value = pair.split("=", 1)
headers[key.strip()] = value.strip()
return headers

OTLP_HEADERS = _parse_otlp_headers(OTEL_EXPORTER_OTLP_HEADERS)
if OTEL_EXPORTER_OTLP_AUTHORIZATION:
OTLP_HEADERS["Authorization"] = OTEL_EXPORTER_OTLP_AUTHORIZATION
if OTEL_EXPORTER_OTLP_X_API_KEY:
OTLP_HEADERS["x-api-key"] = OTEL_EXPORTER_OTLP_X_API_KEY
elif LANGSMITH_API_KEY:
OTLP_HEADERS["x-api-key"] = LANGSMITH_API_KEY
if LANGSMITH_PROJECT:
OTLP_HEADERS["Langsmith-Project"] = LANGSMITH_PROJECT
if OTEL_EXPORTER_OTLP_LANGFUSE_INGESTION_VERSION:
OTLP_HEADERS["x-langfuse-ingestion-version"] = OTEL_EXPORTER_OTLP_LANGFUSE_INGESTION_VERSION


DEFAULT_ZH_TITLE = "新对话"
Expand Down
30 changes: 29 additions & 1 deletion backend/services/agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
from utils.llm_utils import call_llm_for_system_prompt

# Monitoring utilities: expose monitoring context for downstream observers
from nexent.monitor import set_monitoring_context
from nexent.monitor import OPENINFERENCE_SPAN_KIND_CHAIN, set_monitoring_context

# Import monitoring utilities
from utils.monitoring import monitoring_manager
Expand Down Expand Up @@ -1815,7 +1815,7 @@


@monitoring_manager.monitor_endpoint("agent_service.run_agent_stream", exclude_params=["authorization"])
async def run_agent_stream(

Check failure on line 1818 in backend/services/agent_service.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ4bbd53SehsUYuHnVqM&open=AZ4bbd53SehsUYuHnVqM&pullRequest=2969
agent_request: AgentRequest,
http_request: Request,
authorization: str,
Expand Down Expand Up @@ -1875,6 +1875,20 @@
agent_id=agent_request.agent_id,
conversation_id=agent_request.conversation_id,
)
monitoring_manager.set_openinference_agent_context(
agent_id=agent_request.agent_id,
conversation_id=agent_request.conversation_id,
user_id=resolved_user_id,
tenant_id=resolved_tenant_id,
query=agent_request.query,
is_debug=agent_request.is_debug,
extra_metadata={
"language": language,
"history_count": len(agent_request.history) if agent_request.history else 0,
"minio_files_count": len(agent_request.minio_files) if agent_request.minio_files else 0,
},
span_kind=OPENINFERENCE_SPAN_KIND_CHAIN,
)

# Step 2: Save user message (if needed)
if not agent_request.is_debug and not skip_user_save:
Expand Down Expand Up @@ -1912,6 +1926,20 @@

memory_duration = time.time() - memory_start_time
memory_enabled = memory_ctx_preview.user_config.memory_switch
monitoring_manager.set_openinference_agent_context(
agent_id=agent_request.agent_id,
conversation_id=agent_request.conversation_id,
user_id=resolved_user_id,
tenant_id=resolved_tenant_id,
query=agent_request.query,
is_debug=agent_request.is_debug,
memory_enabled=memory_enabled,
extra_metadata={
"language": language,
"agent_share_option": getattr(memory_ctx_preview.user_config, "agent_share_option", "unknown"),
},
span_kind=OPENINFERENCE_SPAN_KIND_CHAIN,
)
monitoring_manager.add_span_event("memory_context_build.completed", {
"duration": memory_duration,
"memory_enabled": memory_enabled,
Expand Down
92 changes: 52 additions & 40 deletions backend/utils/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
Global Monitoring Manager for Backend

This module initializes and configures the global monitoring manager instance
with backend environment variables. All other backend modules should import
`monitoring_manager` directly from this module.
with backend environment variables using OTLP protocol. All other backend modules
should import `monitoring_manager` directly from this module.

Usage:
from utils.monitoring import monitoring_manager

@monitoring_manager.monitor_endpoint("my_service.my_function")
async def my_function():
return {"status": "ok"}
Expand All @@ -17,67 +17,79 @@ async def my_function():
MonitoringConfig,
get_monitoring_manager
)
# Import configuration from backend (support both relative and absolute imports)
try:
# Try relative import first (when running from backend directory)
from consts.const import (
ENABLE_TELEMETRY,
SERVICE_NAME,
JAEGER_ENDPOINT,
PROMETHEUS_PORT,
TELEMETRY_SAMPLE_RATE,
LLM_SLOW_REQUEST_THRESHOLD_SECONDS,
LLM_SLOW_TOKEN_RATE_THRESHOLD
MONITORING_PROVIDER,
MONITORING_PROJECT_NAME,
OTEL_SERVICE_NAME,
OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
OTEL_EXPORTER_OTLP_PROTOCOL,
OTEL_EXPORTER_OTLP_METRICS_ENABLED,
MONITORING_INSTRUMENT_FASTAPI,
MONITORING_INSTRUMENT_REQUESTS,
MONITORING_FASTAPI_EXCLUDED_URLS,
MONITORING_FASTAPI_EXCLUDE_SPANS,
OTLP_HEADERS,
TELEMETRY_SAMPLE_RATE
)
except ImportError:
# Fallback to absolute import (when running from project root)
from backend.consts.const import (
ENABLE_TELEMETRY,
SERVICE_NAME,
JAEGER_ENDPOINT,
PROMETHEUS_PORT,
TELEMETRY_SAMPLE_RATE,
LLM_SLOW_REQUEST_THRESHOLD_SECONDS,
LLM_SLOW_TOKEN_RATE_THRESHOLD
MONITORING_PROVIDER,
MONITORING_PROJECT_NAME,
OTEL_SERVICE_NAME,
OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
OTEL_EXPORTER_OTLP_PROTOCOL,
OTEL_EXPORTER_OTLP_METRICS_ENABLED,
MONITORING_INSTRUMENT_FASTAPI,
MONITORING_INSTRUMENT_REQUESTS,
MONITORING_FASTAPI_EXCLUDED_URLS,
MONITORING_FASTAPI_EXCLUDE_SPANS,
OTLP_HEADERS,
TELEMETRY_SAMPLE_RATE
)

import logging

logger = logging.getLogger(__name__)

# ============================================================================
# Global Monitoring Manager Instance
# ============================================================================

# Get the global monitoring manager instance
monitoring_manager = get_monitoring_manager()

# Initialize monitoring configuration immediately when this module is imported


def _initialize_monitoring():
"""Initialize monitoring configuration with backend environment variables."""
"""Initialize monitoring configuration with OTLP settings."""
config = MonitoringConfig(
enable_telemetry=ENABLE_TELEMETRY,
service_name=SERVICE_NAME,
jaeger_endpoint=JAEGER_ENDPOINT,
prometheus_port=PROMETHEUS_PORT,
telemetry_sample_rate=TELEMETRY_SAMPLE_RATE,
llm_slow_request_threshold_seconds=LLM_SLOW_REQUEST_THRESHOLD_SECONDS,
llm_slow_token_rate_threshold=LLM_SLOW_TOKEN_RATE_THRESHOLD
service_name=OTEL_SERVICE_NAME,
provider=MONITORING_PROVIDER or "otlp",
otlp_endpoint=OTEL_EXPORTER_OTLP_ENDPOINT,
otlp_traces_endpoint=OTEL_EXPORTER_OTLP_TRACES_ENDPOINT or None,
otlp_metrics_endpoint=OTEL_EXPORTER_OTLP_METRICS_ENDPOINT or None,
otlp_protocol=OTEL_EXPORTER_OTLP_PROTOCOL,
otlp_headers=OTLP_HEADERS,
export_metrics=OTEL_EXPORTER_OTLP_METRICS_ENABLED,
instrument_fastapi=MONITORING_INSTRUMENT_FASTAPI,
instrument_requests=MONITORING_INSTRUMENT_REQUESTS,
fastapi_excluded_urls=MONITORING_FASTAPI_EXCLUDED_URLS,
fastapi_exclude_spans=MONITORING_FASTAPI_EXCLUDE_SPANS,
project_name=MONITORING_PROJECT_NAME or None,
telemetry_sample_rate=TELEMETRY_SAMPLE_RATE
)

# Configure the SDK monitoring system using the singleton
monitoring_manager.configure(config)
logger.info(
f"Global monitoring initialized: service_name={SERVICE_NAME}, enable_telemetry={ENABLE_TELEMETRY}")
f"OTLP monitoring initialized: service_name={OTEL_SERVICE_NAME}, "
f"enable_telemetry={config.enable_telemetry}, provider={config.provider}, "
f"endpoint={config.otlp_endpoint}, trace_endpoint={config.get_trace_endpoint()}, "
f"protocol={OTEL_EXPORTER_OTLP_PROTOCOL}"
)


# Initialize monitoring when module is imported
_initialize_monitoring()


# Export the global monitoring manager instance
__all__ = [
'monitoring_manager'
]
__all__ = ['monitoring_manager']
1 change: 1 addition & 0 deletions doc/docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ export default defineConfig({
],
},
{ text: "性能监控", link: "/zh/sdk/monitoring" },
{ text: "OpenTelemetry 设计", link: "/zh/sdk/opentelemetry-design" },
{ text: "向量数据库", link: "/zh/sdk/vector-database" },
{ text: "数据处理", link: "/zh/sdk/data-process" },
],
Expand Down
2 changes: 1 addition & 1 deletion doc/docs/en/getting-started/software-architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ Real-time Input → Streaming Endpoint → Async Processing
- **High Availability**: Multi-service redundancy, health checks, auto-restart
- **High Performance**: Async processing, Redis caching, vector search optimization
- **High Concurrency**: Distributed architecture, load balancing
- **Monitoring Friendly**: Prometheus metrics, Jaeger tracing, structured logging
- **Monitoring Friendly**: OpenTelemetry observability, Grafana Tempo tracing, structured logging

### 🔧 Developer Friendly
- **Modular Development**: Clean layered architecture (App → Service → Database)
Expand Down
Loading
Loading