diff --git a/backend/README.md b/backend/README.md index 4915d73..5f2aca9 100644 --- a/backend/README.md +++ b/backend/README.md @@ -202,6 +202,33 @@ TOPIC_EXECUTION_COMPLETED=execution.completed If Kafka/Redpanda is unavailable, the backend falls back to structured application logs for events so local development continues to work. + +## Monitoring & Observability + +FlexiRoaster now includes a production observability baseline: + +- **Prometheus** metrics scrape endpoint: `GET /metrics` +- **Grafana** dashboards (using the Prometheus datasource) +- **Elasticsearch + Logstash** centralized JSON logs + +Key telemetry includes: +- Pipeline latency (`flexiroaster_pipeline_execution_latency_seconds`) +- Failure rate as a 0-100 percentage (`flexiroaster_pipeline_failure_rate`) +- Resource usage (`flexiroaster_process_cpu_percent`, `flexiroaster_process_memory_rss_bytes`) +- SLA tracking (`flexiroaster_pipeline_sla_breaches_total`) + +Runtime knobs (environment variables): + +```env +ENABLE_PROMETHEUS_METRICS=true +PIPELINE_SLA_TARGET_SECONDS=30 +ENABLE_LOGSTASH_LOGGING=true +LOGSTASH_HOST=logstash +LOGSTASH_PORT=5000 +``` + +Run `docker compose up` to launch the backend together with the monitoring stack (Prometheus/Grafana/Elasticsearch/Logstash). + ## Next Steps 1. 
Install Python and dependencies diff --git a/backend/api/middleware/logging_middleware.py b/backend/api/middleware/logging_middleware.py index e1f7575..55d67d7 100644 --- a/backend/api/middleware/logging_middleware.py +++ b/backend/api/middleware/logging_middleware.py @@ -19,6 +19,7 @@ from starlette.responses import Response from backend.config import settings +from backend.observability import observability_metrics # --------------------------------------------------------------------------- @@ -104,6 +105,7 @@ async def dispatch(self, request: Request, call_next) -> Response: request_id = request.headers.get("x-request-id", str(uuid.uuid4())[:8]) # Timing + timer = observability_metrics.start_timer() start = time.perf_counter() # Request info @@ -154,6 +156,13 @@ async def dispatch(self, request: Request, call_next) -> Response: request_id, method, path, status, phrase, duration_ms, length, ) + observability_metrics.observe_http_request( + method=method, + path=path, + status_code=status, + duration_seconds=timer.elapsed_seconds(), + ) + response.headers["X-Request-ID"] = request_id return response @@ -164,4 +173,10 @@ async def dispatch(self, request: Request, call_next) -> Response: request_id, method, path, duration_ms, str(exc), exc_info=True, ) + observability_metrics.observe_http_request( + method=method, + path=path, + status_code=500, + duration_seconds=timer.elapsed_seconds(), + ) raise diff --git a/backend/api/routes/executions.py b/backend/api/routes/executions.py index 9a2de65..9c16567 100644 --- a/backend/api/routes/executions.py +++ b/backend/api/routes/executions.py @@ -25,6 +25,7 @@ from backend.config import settings from backend.core.executor import PipelineExecutor from backend.events import get_event_publisher +from backend.observability import observability_metrics from backend.models.pipeline import Execution, ExecutionStatus router = APIRouter(prefix="/executions", tags=["executions"]) @@ -38,6 +39,26 @@ TERMINAL_STATUSES = 
def _pipeline_failure_rate_percent(pipeline_id: str) -> float:
    """Return the failure rate (0-100) of one pipeline's finished executions.

    Only executions whose status is in ``TERMINAL_STATUSES`` are counted, so
    the denominator is every terminal execution of the pipeline rather than
    just the successfully completed ones.

    Args:
        pipeline_id: Identifier of the pipeline whose executions are inspected.

    Returns:
        ``failed / terminal * 100``, or ``0.0`` when the pipeline has no
        terminal executions yet.
    """
    terminal_count = 0
    failed_count = 0
    # Single pass over the store; no intermediate lists are materialized.
    for execution in executions_db.values():
        if execution.pipeline_id != pipeline_id:
            continue
        if execution.status not in TERMINAL_STATUSES:
            continue
        terminal_count += 1
        if execution.status == ExecutionStatus.FAILED:
            failed_count += 1

    if not terminal_count:
        return 0.0
    return (failed_count / terminal_count) * 100


def _update_active_executions_metric() -> None:
    """Publish the number of pending or running executions to the gauge."""
    active_statuses = {ExecutionStatus.PENDING, ExecutionStatus.RUNNING}
    active_count = sum(
        1 for execution in executions_db.values()
        if execution.status in active_statuses
    )
    observability_metrics.set_active_executions(active_count)


executions_db[execution_id] + observability_metrics.observe_execution_outcome( + pipeline_id=failed_execution.pipeline_id, + status=failed_execution.status.value, + duration_seconds=failed_execution.duration, + failure_rate_percent=_pipeline_failure_rate_percent(failed_execution.pipeline_id), + sla_target_seconds=settings.PIPELINE_SLA_TARGET_SECONDS, + ) + _update_active_executions_metric() + observability_metrics.observe_process_resources() get_event_publisher().publish( topic=settings.TOPIC_EXECUTION_FAILED, key=failed_execution.id, diff --git a/backend/api/routes/metrics.py b/backend/api/routes/metrics.py index 97c63fc..6da24ce 100644 --- a/backend/api/routes/metrics.py +++ b/backend/api/routes/metrics.py @@ -5,9 +5,10 @@ from fastapi import APIRouter from datetime import datetime, timedelta from typing import List -import random +import os from backend.api.schemas import SystemMetricsResponse, MetricResponse, MetricsHistoryResponse +from backend.observability import observability_metrics router = APIRouter(prefix="/metrics", tags=["metrics"]) @@ -67,9 +68,23 @@ async def get_metrics(): ] pipeline_throughput = len(recent_executions) - # Simulated CPU and memory (would be real in production) - cpu_usage = random.uniform(20, 80) - memory_usage = random.uniform(30, 70) + # Resource usage snapshots + observability_metrics.observe_process_resources() + + try: + load1, _, _ = os.getloadavg() + cpu_usage = max(min(load1 * 100, 100.0), 0.0) + except Exception: + cpu_usage = 0.0 + + try: + import resource + + rss_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + rss_mb = rss_kb / 1024 if rss_kb < 10_000_000 else rss_kb / (1024 * 1024) + memory_usage = max(min((rss_mb / 1024) * 100, 100.0), 0.0) + except Exception: + memory_usage = 0.0 return SystemMetricsResponse( cpu_usage=cpu_usage, @@ -137,16 +152,16 @@ async def get_metrics_history( # Generate sample values based on metric type if metric == "throughput": - value = random.uniform(800, 2200) + value = 800 + (i 
* 120) % 1400 unit = "req/min" elif metric == "cpu": - value = random.uniform(20, 80) + value = 20 + (i * 7) % 60 unit = "%" elif metric == "memory": - value = random.uniform(30, 70) + value = 30 + (i * 5) % 40 unit = "%" else: - value = random.uniform(0, 100) + value = (i * 11) % 100 unit = None metrics.append(MetricResponse( diff --git a/backend/config.py b/backend/config.py index 67f638d..47fcc42 100644 --- a/backend/config.py +++ b/backend/config.py @@ -92,6 +92,15 @@ def parse_kafka_bootstrap_servers(cls, v): LOG_REQUEST_BODY: bool = False LOG_RESPONSE_BODY: bool = False SENSITIVE_FIELDS: List[str] = ["password", "token", "secret", "api_key", "authorization", "credit_card", "ssn"] + + # Observability + ENABLE_PROMETHEUS_METRICS: bool = True + PIPELINE_SLA_TARGET_SECONDS: float = 30.0 + + # Centralized logs (Logstash TCP) + ENABLE_LOGSTASH_LOGGING: bool = False + LOGSTASH_HOST: str = "localhost" + LOGSTASH_PORT: int = 5000 class Config: env_file = ".env" diff --git a/backend/main.py b/backend/main.py index 5e42002..9204fa3 100644 --- a/backend/main.py +++ b/backend/main.py @@ -7,7 +7,7 @@ import uvicorn from fastapi import Depends, FastAPI, Request from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, Response from backend.api.middleware.rate_limit_middleware import RateLimitMiddleware from backend.api.routes import airflow, executions, metrics, pipelines @@ -15,6 +15,8 @@ from backend.api.security import require_roles from backend.config import settings from backend.services.secrets import secret_manager +from backend.observability import observability_metrics +from backend.observability.logging import configure_logstash_logging # Create FastAPI app app = FastAPI( @@ -44,6 +46,9 @@ app.add_middleware(RequestLoggingMiddleware) +# Optional centralized log shipping +configure_logstash_logging() + @app.on_event("startup") async def load_runtime_secrets() -> None: @@ -79,6 +84,13 
class JsonTcpLogstashHandler(logging.handlers.SocketHandler):
    """Send structured JSON logs over TCP, one JSON document per line.

    ``SocketHandler`` normally ships pickled records; overriding
    ``makePickle`` swaps that wire format for newline-delimited JSON.
    NOTE(review): presumably the Logstash TCP input uses a ``json_lines``
    style codec -- confirm against the Logstash pipeline configuration.
    """

    def makePickle(self, record: logging.LogRecord) -> bytes:  # noqa: N802
        """Serialize *record* as a UTF-8 encoded JSON line.

        Args:
            record: The log record being emitted.

        Returns:
            The JSON document followed by a single ``"\\n"``, as bytes.
        """
        document = {
            # Use the time the event was logged, not the time it is
            # serialized: the socket handler may send it later than that.
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "service": "flexiroaster-backend",
            "logger": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
            "pathname": record.pathname,
            "lineno": record.lineno,
        }

        # Keep tracebacks: without this, records logged with exc_info lose
        # the exception entirely once they leave the process.
        exc_info = record.exc_info
        if record.exc_text:
            document["exception"] = record.exc_text
        elif isinstance(exc_info, tuple) and exc_info[0] is not None:
            document["exception"] = logging.Formatter().formatException(exc_info)
        if record.stack_info:
            document["stack"] = record.stack_info

        # default=str keeps serialization best-effort for non-JSON values.
        return (json.dumps(document, default=str) + "\n").encode("utf-8")
class ObservabilityMetrics:
    """Holds the Prometheus metric objects and the helpers that update them.

    The metric objects are created without an explicit registry, and
    ``prometheus_payload`` calls ``generate_latest()`` without one, so both
    sides rely on the ``prometheus_client`` default registry.
    """

    def __init__(self) -> None:
        """Create every HTTP, pipeline and process metric."""
        self.http_requests_total = Counter(
            "flexiroaster_http_requests_total",
            "Total HTTP requests served",
            ["method", "path", "status"],
        )
        self.http_request_latency_seconds = Histogram(
            "flexiroaster_http_request_latency_seconds",
            "HTTP request latency in seconds",
            ["method", "path", "status"],
            buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10),
        )

        self.pipeline_executions_total = Counter(
            "flexiroaster_pipeline_executions_total",
            "Total number of pipeline execution outcomes",
            ["pipeline_id", "status"],
        )
        self.pipeline_execution_latency_seconds = Histogram(
            "flexiroaster_pipeline_execution_latency_seconds",
            "Pipeline execution latency in seconds",
            ["pipeline_id", "status"],
            buckets=(0.1, 0.5, 1, 2, 5, 10, 20, 30, 60, 120, 300, 600),
        )

        self.pipeline_failure_rate = Gauge(
            "flexiroaster_pipeline_failure_rate",
            "Pipeline failure rate as a percentage (0-100)",
            ["pipeline_id"],
        )
        self.pipeline_sla_breach_total = Counter(
            "flexiroaster_pipeline_sla_breaches_total",
            "Count of executions that exceeded SLA target latency",
            ["pipeline_id"],
        )

        self.active_executions = Gauge(
            "flexiroaster_active_executions",
            "Number of active (pending/running) executions",
        )
        self.process_cpu_percent = Gauge(
            "flexiroaster_process_cpu_percent",
            "Process CPU usage percent",
        )
        self.process_memory_rss_bytes = Gauge(
            "flexiroaster_process_memory_rss_bytes",
            "Process resident memory usage in bytes",
        )

        # psutil computes CPU usage relative to the previous call on the SAME
        # Process object, so the handle is created lazily and then reused.
        self._psutil_process = None

    @staticmethod
    def prometheus_payload() -> tuple[bytes, str]:
        """Return latest Prometheus exposition bytes and content type."""
        return generate_latest(), CONTENT_TYPE_LATEST

    @staticmethod
    def start_timer() -> "_Timer":
        """Return a timer that started counting at the moment of the call."""
        return _Timer.start()

    @staticmethod
    def _normalize_path(path: str) -> str:
        """Collapse identifier-like path segments to ``:id``.

        Every distinct label value creates a new time series, so raw URLs
        that embed execution or pipeline identifiers would grow the series
        count without bound. A segment counts as an identifier when it is
        all digits, or when it is at least eight hexadecimal characters once
        dashes are removed (UUIDs and their prefixes). Paths that are
        already route templates pass through unchanged.
        """
        hex_chars = "0123456789abcdefABCDEF"

        def looks_like_id(segment: str) -> bool:
            if segment.isdigit():
                return True
            compact = segment.replace("-", "")
            return len(compact) >= 8 and all(ch in hex_chars for ch in compact)

        return "/".join(
            ":id" if looks_like_id(segment) else segment
            for segment in path.split("/")
        )

    @staticmethod
    def _maxrss_to_bytes(ru_maxrss: float, platform: Optional[str] = None) -> int:
        """Convert ``ru_maxrss`` to bytes using the platform's documented unit.

        macOS reports bytes; Linux and the BSDs report kilobytes. Guessing
        the unit from the magnitude misreads very large Linux processes and
        very small macOS ones, so the platform name decides instead.

        Args:
            ru_maxrss: Raw value from ``resource.getrusage``.
            platform: ``sys.platform``-style name; detected when omitted.
        """
        if platform is None:
            import sys

            platform = sys.platform
        if platform == "darwin":
            return int(ru_maxrss)
        return int(ru_maxrss * 1024)

    def observe_http_request(self, method: str, path: str, status_code: int, duration_seconds: float) -> None:
        """Count one HTTP request and record its latency.

        Args:
            method: HTTP method of the request.
            path: Request path; identifier-like segments are collapsed
                before the value is used as a label.
            status_code: Response status code, stored as a string label.
            duration_seconds: Time spent serving the request.
        """
        status = str(status_code)
        label_path = self._normalize_path(path)
        self.http_requests_total.labels(method=method, path=label_path, status=status).inc()
        self.http_request_latency_seconds.labels(
            method=method, path=label_path, status=status
        ).observe(duration_seconds)

    def observe_execution_outcome(
        self,
        pipeline_id: str,
        status: str,
        duration_seconds: Optional[float],
        failure_rate_percent: float,
        sla_target_seconds: float,
    ) -> None:
        """Record the result of one finished pipeline execution.

        Args:
            pipeline_id: Pipeline the execution belongs to.
            status: Final execution status, used as a label value.
            duration_seconds: Execution duration; latency and SLA metrics are
                skipped when it is ``None``.
            failure_rate_percent: Current failure rate, clamped to 0-100.
            sla_target_seconds: Durations strictly above this count as an
                SLA breach.
        """
        self.pipeline_executions_total.labels(pipeline_id=pipeline_id, status=status).inc()
        self.pipeline_failure_rate.labels(pipeline_id=pipeline_id).set(
            max(min(failure_rate_percent, 100.0), 0.0)
        )

        if duration_seconds is None:
            return

        safe_duration = max(duration_seconds, 0.0)
        self.pipeline_execution_latency_seconds.labels(
            pipeline_id=pipeline_id,
            status=status,
        ).observe(safe_duration)
        if safe_duration > sla_target_seconds:
            self.pipeline_sla_breach_total.labels(pipeline_id=pipeline_id).inc()

    def set_active_executions(self, count: int) -> None:
        """Set the active-executions gauge, never below zero."""
        self.active_executions.set(max(count, 0))

    def observe_process_resources(self) -> None:
        """Capture CPU and memory usage of the current process, best effort.

        psutil is preferred when installed. Otherwise memory falls back to
        ``resource.getrusage`` (which reports the PEAK resident set size, not
        the current one) and CPU falls back to the 1-minute system load
        average scaled by core count. A gauge is left untouched when no
        source is available for it.
        """
        cpu_percent = None
        rss_bytes = None

        try:
            import psutil  # optional dependency
        except ImportError:
            psutil = None

        if psutil is not None:
            try:
                process = getattr(self, "_psutil_process", None)
                if process is None:
                    process = psutil.Process(os.getpid())
                    self._psutil_process = process
                # Non-blocking: measures usage since the previous call on this
                # handle; the very first sample is 0.0 by psutil's contract.
                cpu_percent = process.cpu_percent(interval=None)
                rss_bytes = process.memory_info().rss
            except (psutil.Error, OSError):
                cpu_percent = None
                rss_bytes = None

        if rss_bytes is None:
            try:
                import resource

                rss_bytes = self._maxrss_to_bytes(
                    resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                )
            except (ImportError, OSError, ValueError):
                # The resource module is unavailable on some platforms.
                rss_bytes = None

        if cpu_percent is None:
            try:
                load1, _, _ = os.getloadavg()
            except (AttributeError, OSError):
                # getloadavg is missing or unobtainable on this platform.
                pass
            else:
                # Load average is runnable tasks, not a percentage: scale by
                # core count and cap so the gauge stays within 0-100.
                cores = os.cpu_count() or 1
                cpu_percent = min(max(load1 / cores * 100.0, 0.0), 100.0)

        if rss_bytes is not None:
            self.process_memory_rss_bytes.set(max(int(rss_bytes), 0))
        if cpu_percent is not None:
            self.process_cpu_percent.set(max(float(cpu_percent), 0.0))
prometheus: + image: prom/prometheus:v2.54.1 + ports: + - "9090:9090" + volumes: + - ./pipeline/monitoring/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + + grafana: + image: grafana/grafana:11.2.2 + ports: + - "3000:3000" + environment: + GF_SECURITY_ADMIN_USER: admin + GF_SECURITY_ADMIN_PASSWORD: admin + GF_USERS_ALLOW_SIGN_UP: "false" + depends_on: + - prometheus + + elasticsearch: + image: docker.elastic.co/elasticsearch/elasticsearch:8.15.1 + environment: + ES_JAVA_OPTS: "-Xms512m -Xmx512m" + discovery.type: single-node + xpack.security.enabled: "false" + volumes: + - ./pipeline/monitoring/elasticsearch/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml:ro + ports: + - "9200:9200" + + logstash: + image: docker.elastic.co/logstash/logstash:8.15.1 + volumes: + - ./pipeline/monitoring/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro + ports: + - "5000:5000" + - "5044:5044" + depends_on: + - elasticsearch