Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,33 @@ TOPIC_EXECUTION_COMPLETED=execution.completed

If Kafka/Redpanda is unavailable, the backend falls back to structured application logs for events so local development continues to work.


## Monitoring & Observability

FlexiRoaster includes a production-ready observability baseline:

- **Prometheus** metrics scrape endpoint: `GET /metrics`
- **Grafana** dashboards (using Prometheus datasource)
- **Elasticsearch + Logstash** centralized JSON logs

Key telemetry includes:
- Pipeline latency (`flexiroaster_pipeline_execution_latency_seconds`)
- Failure rates (`flexiroaster_pipeline_failure_rate`)
- Resource usage (`flexiroaster_process_cpu_percent`, `flexiroaster_process_memory_rss_bytes`)
- SLA tracking (`flexiroaster_pipeline_sla_breaches_total`)

Runtime knobs (environment variables):

```env
ENABLE_PROMETHEUS_METRICS=true
PIPELINE_SLA_TARGET_SECONDS=30
ENABLE_LOGSTASH_LOGGING=true
LOGSTASH_HOST=logstash
LOGSTASH_PORT=5000
```

Use `docker compose up` to launch backend + monitoring stack (Prometheus/Grafana/Elasticsearch/Logstash).

## Next Steps

1. Install Python and dependencies
Expand Down
15 changes: 15 additions & 0 deletions backend/api/middleware/logging_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from starlette.responses import Response

from backend.config import settings
from backend.observability import observability_metrics


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -104,6 +105,7 @@ async def dispatch(self, request: Request, call_next) -> Response:
request_id = request.headers.get("x-request-id", str(uuid.uuid4())[:8])

# Timing
timer = observability_metrics.start_timer()
start = time.perf_counter()

# Request info
Expand Down Expand Up @@ -154,6 +156,13 @@ async def dispatch(self, request: Request, call_next) -> Response:
request_id, method, path, status, phrase, duration_ms, length,
)

observability_metrics.observe_http_request(
method=method,
path=path,
status_code=status,
duration_seconds=timer.elapsed_seconds(),
)

response.headers["X-Request-ID"] = request_id
return response

Expand All @@ -164,4 +173,10 @@ async def dispatch(self, request: Request, call_next) -> Response:
request_id, method, path, duration_ms, str(exc),
exc_info=True,
)
observability_metrics.observe_http_request(
method=method,
path=path,
status_code=500,
duration_seconds=timer.elapsed_seconds(),
)
raise
41 changes: 41 additions & 0 deletions backend/api/routes/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from backend.config import settings
from backend.core.executor import PipelineExecutor
from backend.events import get_event_publisher
from backend.observability import observability_metrics
from backend.models.pipeline import Execution, ExecutionStatus

router = APIRouter(prefix="/executions", tags=["executions"])
Expand All @@ -38,6 +39,26 @@
TERMINAL_STATUSES = {ExecutionStatus.COMPLETED, ExecutionStatus.FAILED, ExecutionStatus.CANCELLED}


def _pipeline_failure_rate_percent(pipeline_id: str) -> float:
    """Return the percentage of terminal executions of *pipeline_id* that failed.

    Only executions in a terminal state (completed/failed/cancelled) are
    counted; with no terminal executions the rate is defined as 0.0.
    """
    total = 0
    failed = 0
    for execution in executions_db.values():
        if execution.pipeline_id != pipeline_id:
            continue
        if execution.status not in TERMINAL_STATUSES:
            continue
        total += 1
        if execution.status == ExecutionStatus.FAILED:
            failed += 1

    if total == 0:
        return 0.0
    return (failed / total) * 100


def _update_active_executions_metric() -> None:
    """Push the current count of in-flight (pending/running) executions to metrics."""
    in_flight = {ExecutionStatus.PENDING, ExecutionStatus.RUNNING}
    active_count = sum(
        1 for execution in executions_db.values() if execution.status in in_flight
    )
    observability_metrics.set_active_executions(active_count)
ORCHESTRATION_SCHEMA_TO_CORE = {
OrchestrationEngineSchema.LOCAL: OrchestrationEngine.LOCAL,
OrchestrationEngineSchema.AIRFLOW: OrchestrationEngine.AIRFLOW,
Expand Down Expand Up @@ -116,6 +137,7 @@ def initialize_execution(
context=context or {},
)
executions_db[execution_id] = execution
_update_active_executions_metric()

get_event_publisher().publish(
topic=settings.TOPIC_EXECUTION_STARTED,
Expand Down Expand Up @@ -165,6 +187,16 @@ async def execute_pipeline_background(
result.id = execution_id
executions_db[execution_id] = result

observability_metrics.observe_execution_outcome(
pipeline_id=result.pipeline_id,
status=result.status.value,
duration_seconds=result.duration,
failure_rate_percent=_pipeline_failure_rate_percent(result.pipeline_id),
sla_target_seconds=settings.PIPELINE_SLA_TARGET_SECONDS,
)
_update_active_executions_metric()
observability_metrics.observe_process_resources()

if result.status == ExecutionStatus.COMPLETED:
get_event_publisher().publish(
topic=settings.TOPIC_EXECUTION_COMPLETED,
Expand Down Expand Up @@ -198,6 +230,15 @@ async def execute_pipeline_background(
executions_db[execution_id].status = ExecutionStatus.FAILED
executions_db[execution_id].error = str(e)
failed_execution = executions_db[execution_id]
observability_metrics.observe_execution_outcome(
pipeline_id=failed_execution.pipeline_id,
status=failed_execution.status.value,
duration_seconds=failed_execution.duration,
failure_rate_percent=_pipeline_failure_rate_percent(failed_execution.pipeline_id),
sla_target_seconds=settings.PIPELINE_SLA_TARGET_SECONDS,
)
_update_active_executions_metric()
observability_metrics.observe_process_resources()
get_event_publisher().publish(
topic=settings.TOPIC_EXECUTION_FAILED,
key=failed_execution.id,
Expand Down
31 changes: 23 additions & 8 deletions backend/api/routes/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from fastapi import APIRouter
from datetime import datetime, timedelta
from typing import List
import random
import os

from backend.api.schemas import SystemMetricsResponse, MetricResponse, MetricsHistoryResponse
from backend.observability import observability_metrics

router = APIRouter(prefix="/metrics", tags=["metrics"])

Expand Down Expand Up @@ -67,9 +68,23 @@ async def get_metrics():
]
pipeline_throughput = len(recent_executions)

# Simulated CPU and memory (would be real in production)
cpu_usage = random.uniform(20, 80)
memory_usage = random.uniform(30, 70)
# Resource usage snapshots
observability_metrics.observe_process_resources()

try:
load1, _, _ = os.getloadavg()
cpu_usage = max(min(load1 * 100, 100.0), 0.0)
Comment on lines +75 to +76
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 CPU usage metric derived from load average is not a valid CPU percentage

Both backend/api/routes/metrics.py:75-76 and backend/observability/metrics.py:139-141 compute CPU usage as load1 * 100, treating the 1-minute load average from os.getloadavg() as if it were a CPU utilization fraction.

Detailed Explanation

The 1-minute load average represents the average number of processes in the system's run queue — it is not a fraction of CPU capacity. On a multi-core system a load of 4.0 is normal (not 400% CPU). Conversely, a load of 0.5 on an otherwise idle 16-core machine would be reported as 50% CPU usage.

In backend/api/routes/metrics.py:75-76:

load1, _, _ = os.getloadavg()
cpu_usage = max(min(load1 * 100, 100.0), 0.0)

And the same logic in backend/observability/metrics.py:139-141:

load1, _, _ = os.getloadavg()
cpu_guess = max(load1 * 100.0, 0.0)
self.process_cpu_percent.set(cpu_guess)

Note that the Prometheus gauge version at metrics.py:140 doesn't even apply min(..., 100.0), so the gauge can report values above 100 (e.g., load of 2.0 → 200.0), which violates the "percent" semantic of the metric name flexiroaster_process_cpu_percent.

Impact: CPU usage metrics will be misleading in dashboards and SLA tracking. On single-core systems with low load this might look plausible, masking the fact that the numbers are semantically wrong on multi-core systems.

Prompt for agents
In both backend/api/routes/metrics.py (lines 74-78) and backend/observability/metrics.py (lines 138-141), the load average is incorrectly used as CPU percentage. To fix:

1. Divide load1 by the number of CPUs to get a utilization ratio: use os.cpu_count() or multiprocessing.cpu_count().
2. Apply: cpu_usage = max(min((load1 / os.cpu_count()) * 100.0, 100.0), 0.0)

This converts the load average into a rough per-CPU utilization percentage that makes sense on multi-core systems. Alternatively, if psutil is available, prefer psutil.cpu_percent() which gives the actual CPU utilization.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

except Exception:
cpu_usage = 0.0

try:
import resource

rss_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
rss_mb = rss_kb / 1024 if rss_kb < 10_000_000 else rss_kb / (1024 * 1024)
memory_usage = max(min((rss_mb / 1024) * 100, 100.0), 0.0)
except Exception:
memory_usage = 0.0

return SystemMetricsResponse(
cpu_usage=cpu_usage,
Expand Down Expand Up @@ -137,16 +152,16 @@ async def get_metrics_history(

# Generate sample values based on metric type
if metric == "throughput":
value = random.uniform(800, 2200)
value = 800 + (i * 120) % 1400
unit = "req/min"
elif metric == "cpu":
value = random.uniform(20, 80)
value = 20 + (i * 7) % 60
unit = "%"
elif metric == "memory":
value = random.uniform(30, 70)
value = 30 + (i * 5) % 40
unit = "%"
else:
value = random.uniform(0, 100)
value = (i * 11) % 100
unit = None

metrics.append(MetricResponse(
Expand Down
9 changes: 9 additions & 0 deletions backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ def parse_kafka_bootstrap_servers(cls, v):
LOG_REQUEST_BODY: bool = False
LOG_RESPONSE_BODY: bool = False
SENSITIVE_FIELDS: List[str] = ["password", "token", "secret", "api_key", "authorization", "credit_card", "ssn"]

# Observability
ENABLE_PROMETHEUS_METRICS: bool = True
PIPELINE_SLA_TARGET_SECONDS: float = 30.0

# Centralized logs (Logstash TCP)
ENABLE_LOGSTASH_LOGGING: bool = False
LOGSTASH_HOST: str = "localhost"
LOGSTASH_PORT: int = 5000

class Config:
env_file = ".env"
Expand Down
14 changes: 13 additions & 1 deletion backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@
import uvicorn
from fastapi import Depends, FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, Response

from backend.api.middleware.rate_limit_middleware import RateLimitMiddleware
from backend.api.routes import airflow, executions, metrics, pipelines
from backend.api.routes.auth import router as auth_router
from backend.api.security import require_roles
from backend.config import settings
from backend.services.secrets import secret_manager
from backend.observability import observability_metrics
from backend.observability.logging import configure_logstash_logging

# Create FastAPI app
app = FastAPI(
Expand Down Expand Up @@ -44,6 +46,9 @@

app.add_middleware(RequestLoggingMiddleware)

# Optional centralized log shipping
configure_logstash_logging()


@app.on_event("startup")
async def load_runtime_secrets() -> None:
Expand Down Expand Up @@ -79,6 +84,13 @@ async def health_check():
}


@app.get("/metrics", include_in_schema=False)
async def prometheus_metrics():
    """Prometheus scrape endpoint for infrastructure monitoring."""
    body, content_type = observability_metrics.prometheus_payload()
    # Hand the exposition-format text back verbatim with the datasource's
    # declared content type so Prometheus can parse it.
    return Response(content=body, media_type=content_type)


# Root endpoint
@app.get("/", tags=["root"])
async def root():
Expand Down
5 changes: 5 additions & 0 deletions backend/observability/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Observability utilities for metrics and logging."""

from backend.observability.metrics import observability_metrics

__all__ = ["observability_metrics"]
41 changes: 41 additions & 0 deletions backend/observability/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Centralized logging helpers for shipping logs to Logstash/Elasticsearch."""
from __future__ import annotations

import json
import logging
import logging.handlers
from datetime import datetime, timezone

from backend.config import settings


class JsonTcpLogstashHandler(logging.handlers.SocketHandler):
    """Ship structured JSON log documents over TCP to a Logstash input."""

    def makePickle(self, record: logging.LogRecord) -> bytes:  # noqa: N802
        """Serialize *record* as one newline-terminated UTF-8 JSON document.

        Overrides SocketHandler's default pickle framing: Logstash's TCP
        input expects newline-delimited JSON, not pickled LogRecords.
        """
        payload = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "service": "flexiroaster-backend",
            "logger": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
            "pathname": record.pathname,
            "lineno": record.lineno,
        }
        # default=str keeps serialization from raising on exotic values.
        encoded = json.dumps(payload, default=str)
        return f"{encoded}\n".encode("utf-8")


def configure_logstash_logging() -> bool:
    """Attach a Logstash TCP handler to the API logger when enabled.

    Returns True when a handler is installed (or already present),
    False when log shipping is disabled via settings. Idempotent: a
    second call never stacks a duplicate handler.
    """
    if not settings.ENABLE_LOGSTASH_LOGGING:
        return False

    api_logger = logging.getLogger("flexiroaster.api")
    already_attached = any(
        isinstance(existing, JsonTcpLogstashHandler)
        for existing in api_logger.handlers
    )
    if already_attached:
        return True

    tcp_handler = JsonTcpLogstashHandler(settings.LOGSTASH_HOST, settings.LOGSTASH_PORT)
    # Fall back to INFO when LOG_LEVEL names an unknown level.
    tcp_handler.setLevel(getattr(logging, settings.LOG_LEVEL.upper(), logging.INFO))
    api_logger.addHandler(tcp_handler)
    return True
Loading
Loading