Skip to content

Commit 21b0575

Browse files
authored
Merge pull request #90 from fuzziecoder/codex/implement-monitoring-and-observability-stack
Implement Prometheus metrics and ELK/Grafana observability stack
2 parents 68284bf + 705f784 commit 21b0575

11 files changed

Lines changed: 369 additions & 9 deletions

File tree

backend/README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,33 @@ TOPIC_EXECUTION_COMPLETED=execution.completed
202202

203203
If Kafka/Redpanda is unavailable, the backend falls back to structured application logs for events so local development continues to work.
204204

205+
206+
## Monitoring & Observability

FlexiRoaster now includes a production observability baseline:

- **Prometheus** metrics scrape endpoint: `GET /metrics`
- **Grafana** dashboards (using Prometheus datasource)
- **Elasticsearch + Logstash** centralized JSON logs

Key telemetry includes:
- Pipeline latency (`flexiroaster_pipeline_execution_latency_seconds`)
- Failure rates (`flexiroaster_pipeline_failure_rate`)
- Resource usage (`flexiroaster_process_cpu_percent`, `flexiroaster_process_memory_rss_bytes`)
- SLA tracking (`flexiroaster_pipeline_sla_breaches_total`)

Runtime knobs (environment variables):
```env
ENABLE_PROMETHEUS_METRICS=true
PIPELINE_SLA_TARGET_SECONDS=30
ENABLE_LOGSTASH_LOGGING=true
LOGSTASH_HOST=logstash
LOGSTASH_PORT=5000
```
229+
230+
Use `docker compose up` to launch backend + monitoring stack (Prometheus/Grafana/Elasticsearch/Logstash).
231+
205232
## Next Steps
206233

207234
1. Install Python and dependencies

backend/api/middleware/logging_middleware.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from starlette.responses import Response
2020

2121
from backend.config import settings
22+
from backend.observability import observability_metrics
2223

2324

2425
# ---------------------------------------------------------------------------
@@ -104,6 +105,7 @@ async def dispatch(self, request: Request, call_next) -> Response:
104105
request_id = request.headers.get("x-request-id", str(uuid.uuid4())[:8])
105106

106107
# Timing
108+
timer = observability_metrics.start_timer()
107109
start = time.perf_counter()
108110

109111
# Request info
@@ -154,6 +156,13 @@ async def dispatch(self, request: Request, call_next) -> Response:
154156
request_id, method, path, status, phrase, duration_ms, length,
155157
)
156158

159+
observability_metrics.observe_http_request(
160+
method=method,
161+
path=path,
162+
status_code=status,
163+
duration_seconds=timer.elapsed_seconds(),
164+
)
165+
157166
response.headers["X-Request-ID"] = request_id
158167
return response
159168

@@ -164,4 +173,10 @@ async def dispatch(self, request: Request, call_next) -> Response:
164173
request_id, method, path, duration_ms, str(exc),
165174
exc_info=True,
166175
)
176+
observability_metrics.observe_http_request(
177+
method=method,
178+
path=path,
179+
status_code=500,
180+
duration_seconds=timer.elapsed_seconds(),
181+
)
167182
raise

backend/api/routes/executions.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from backend.config import settings
2626
from backend.core.executor import PipelineExecutor
2727
from backend.events import get_event_publisher
28+
from backend.observability import observability_metrics
2829
from backend.models.pipeline import Execution, ExecutionStatus
2930

3031
router = APIRouter(prefix="/executions", tags=["executions"])
@@ -38,6 +39,26 @@
3839
TERMINAL_STATUSES = {ExecutionStatus.COMPLETED, ExecutionStatus.FAILED, ExecutionStatus.CANCELLED}
3940

4041

42+
def _pipeline_failure_rate_percent(pipeline_id: str) -> float:
    """Return the percentage of terminal executions of *pipeline_id* that failed.

    Scans the in-memory execution store and considers only executions that
    have reached a terminal status. Returns 0.0 when the pipeline has no
    terminal executions yet (avoids division by zero).
    """
    terminal_statuses = [
        e.status
        for e in executions_db.values()
        if e.pipeline_id == pipeline_id and e.status in TERMINAL_STATUSES
    ]
    if not terminal_statuses:
        return 0.0

    failed_count = sum(1 for s in terminal_statuses if s == ExecutionStatus.FAILED)
    return (failed_count / len(terminal_statuses)) * 100
54+
55+
56+
def _update_active_executions_metric() -> None:
    """Refresh the active-executions gauge from the in-memory store.

    An execution counts as active while it is still pending or running.
    """
    non_terminal = {ExecutionStatus.PENDING, ExecutionStatus.RUNNING}
    active = sum(
        1 for e in executions_db.values() if e.status in non_terminal
    )
    observability_metrics.set_active_executions(active)
4162
ORCHESTRATION_SCHEMA_TO_CORE = {
4263
OrchestrationEngineSchema.LOCAL: OrchestrationEngine.LOCAL,
4364
OrchestrationEngineSchema.AIRFLOW: OrchestrationEngine.AIRFLOW,
@@ -116,6 +137,7 @@ def initialize_execution(
116137
context=context or {},
117138
)
118139
executions_db[execution_id] = execution
140+
_update_active_executions_metric()
119141

120142
get_event_publisher().publish(
121143
topic=settings.TOPIC_EXECUTION_STARTED,
@@ -165,6 +187,16 @@ async def execute_pipeline_background(
165187
result.id = execution_id
166188
executions_db[execution_id] = result
167189

190+
observability_metrics.observe_execution_outcome(
191+
pipeline_id=result.pipeline_id,
192+
status=result.status.value,
193+
duration_seconds=result.duration,
194+
failure_rate_percent=_pipeline_failure_rate_percent(result.pipeline_id),
195+
sla_target_seconds=settings.PIPELINE_SLA_TARGET_SECONDS,
196+
)
197+
_update_active_executions_metric()
198+
observability_metrics.observe_process_resources()
199+
168200
if result.status == ExecutionStatus.COMPLETED:
169201
get_event_publisher().publish(
170202
topic=settings.TOPIC_EXECUTION_COMPLETED,
@@ -198,6 +230,15 @@ async def execute_pipeline_background(
198230
executions_db[execution_id].status = ExecutionStatus.FAILED
199231
executions_db[execution_id].error = str(e)
200232
failed_execution = executions_db[execution_id]
233+
observability_metrics.observe_execution_outcome(
234+
pipeline_id=failed_execution.pipeline_id,
235+
status=failed_execution.status.value,
236+
duration_seconds=failed_execution.duration,
237+
failure_rate_percent=_pipeline_failure_rate_percent(failed_execution.pipeline_id),
238+
sla_target_seconds=settings.PIPELINE_SLA_TARGET_SECONDS,
239+
)
240+
_update_active_executions_metric()
241+
observability_metrics.observe_process_resources()
201242
get_event_publisher().publish(
202243
topic=settings.TOPIC_EXECUTION_FAILED,
203244
key=failed_execution.id,

backend/api/routes/metrics.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
from fastapi import APIRouter
66
from datetime import datetime, timedelta
77
from typing import List
8-
import random
8+
import os
99

1010
from backend.api.schemas import SystemMetricsResponse, MetricResponse, MetricsHistoryResponse
11+
from backend.observability import observability_metrics
1112

1213
router = APIRouter(prefix="/metrics", tags=["metrics"])
1314

@@ -67,9 +68,23 @@ async def get_metrics():
6768
]
6869
pipeline_throughput = len(recent_executions)
6970

70-
# Simulated CPU and memory (would be real in production)
71-
cpu_usage = random.uniform(20, 80)
72-
memory_usage = random.uniform(30, 70)
71+
# Resource usage snapshots
72+
observability_metrics.observe_process_resources()
73+
74+
try:
75+
load1, _, _ = os.getloadavg()
76+
cpu_usage = max(min(load1 * 100, 100.0), 0.0)
77+
except Exception:
78+
cpu_usage = 0.0
79+
80+
try:
81+
import resource
82+
83+
rss_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
84+
rss_mb = rss_kb / 1024 if rss_kb < 10_000_000 else rss_kb / (1024 * 1024)
85+
memory_usage = max(min((rss_mb / 1024) * 100, 100.0), 0.0)
86+
except Exception:
87+
memory_usage = 0.0
7388

7489
return SystemMetricsResponse(
7590
cpu_usage=cpu_usage,
@@ -137,16 +152,16 @@ async def get_metrics_history(
137152

138153
# Generate sample values based on metric type
139154
if metric == "throughput":
140-
value = random.uniform(800, 2200)
155+
value = 800 + (i * 120) % 1400
141156
unit = "req/min"
142157
elif metric == "cpu":
143-
value = random.uniform(20, 80)
158+
value = 20 + (i * 7) % 60
144159
unit = "%"
145160
elif metric == "memory":
146-
value = random.uniform(30, 70)
161+
value = 30 + (i * 5) % 40
147162
unit = "%"
148163
else:
149-
value = random.uniform(0, 100)
164+
value = (i * 11) % 100
150165
unit = None
151166

152167
metrics.append(MetricResponse(

backend/config.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,15 @@ def parse_kafka_bootstrap_servers(cls, v):
9292
LOG_REQUEST_BODY: bool = False
9393
LOG_RESPONSE_BODY: bool = False
9494
SENSITIVE_FIELDS: List[str] = ["password", "token", "secret", "api_key", "authorization", "credit_card", "ssn"]
95+
96+
# Observability
97+
ENABLE_PROMETHEUS_METRICS: bool = True
98+
PIPELINE_SLA_TARGET_SECONDS: float = 30.0
99+
100+
# Centralized logs (Logstash TCP)
101+
ENABLE_LOGSTASH_LOGGING: bool = False
102+
LOGSTASH_HOST: str = "localhost"
103+
LOGSTASH_PORT: int = 5000
95104

96105
class Config:
97106
env_file = ".env"

backend/main.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@
77
import uvicorn
88
from fastapi import Depends, FastAPI, Request
99
from fastapi.middleware.cors import CORSMiddleware
10-
from fastapi.responses import JSONResponse
10+
from fastapi.responses import JSONResponse, Response
1111

1212
from backend.api.middleware.rate_limit_middleware import RateLimitMiddleware
1313
from backend.api.routes import airflow, executions, metrics, pipelines
1414
from backend.api.routes.auth import router as auth_router
1515
from backend.api.security import require_roles
1616
from backend.config import settings
1717
from backend.services.secrets import secret_manager
18+
from backend.observability import observability_metrics
19+
from backend.observability.logging import configure_logstash_logging
1820

1921
# Create FastAPI app
2022
app = FastAPI(
@@ -44,6 +46,9 @@
4446

4547
app.add_middleware(RequestLoggingMiddleware)
4648

49+
# Optional centralized log shipping
50+
configure_logstash_logging()
51+
4752

4853
@app.on_event("startup")
4954
async def load_runtime_secrets() -> None:
@@ -79,6 +84,13 @@ async def health_check():
7984
}
8085

8186

87+
@app.get("/metrics", include_in_schema=False)
88+
async def prometheus_metrics():
89+
"""Prometheus scrape endpoint for infrastructure monitoring."""
90+
payload, content_type = observability_metrics.prometheus_payload()
91+
return Response(content=payload, media_type=content_type)
92+
93+
8294
# Root endpoint
8395
@app.get("/", tags=["root"])
8496
async def root():

backend/observability/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""Observability utilities for metrics and logging."""
2+
3+
from backend.observability.metrics import observability_metrics
4+
5+
__all__ = ["observability_metrics"]

backend/observability/logging.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
"""Centralized logging helpers for shipping logs to Logstash/Elasticsearch."""
2+
from __future__ import annotations
3+
4+
import json
5+
import logging
6+
import logging.handlers
7+
from datetime import datetime, timezone
8+
9+
from backend.config import settings
10+
11+
12+
class JsonTcpLogstashHandler(logging.handlers.SocketHandler):
    """Ship structured JSON log lines over TCP to a Logstash input.

    Overrides ``makePickle`` so that each record is serialized as one
    newline-terminated JSON document (Logstash ``json_lines``-friendly)
    instead of Python's default length-prefixed pickle framing.
    """

    def makePickle(self, record: logging.LogRecord) -> bytes:  # noqa: N802
        """Serialize *record* as a UTF-8 encoded JSON line."""
        # Timestamp reflects serialization time (UTC), not record.created.
        payload = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "service": "flexiroaster-backend",
            "logger": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
            "pathname": record.pathname,
            "lineno": record.lineno,
        }
        # default=str keeps serialization from ever raising on odd values.
        line = json.dumps(payload, default=str) + "\n"
        return line.encode("utf-8")
26+
27+
28+
def configure_logstash_logging() -> bool:
    """Attach a Logstash TCP handler to the API logger when enabled.

    Returns True when a handler is (or already was) attached, False when
    ``ENABLE_LOGSTASH_LOGGING`` is off. Idempotent: calling this twice does
    not add a duplicate handler.
    """
    if not settings.ENABLE_LOGSTASH_LOGGING:
        return False

    api_logger = logging.getLogger("flexiroaster.api")
    if any(isinstance(h, JsonTcpLogstashHandler) for h in api_logger.handlers):
        return True

    tcp_handler = JsonTcpLogstashHandler(settings.LOGSTASH_HOST, settings.LOGSTASH_PORT)
    # Fall back to INFO when LOG_LEVEL is not a recognized level name.
    tcp_handler.setLevel(getattr(logging, settings.LOG_LEVEL.upper(), logging.INFO))
    api_logger.addHandler(tcp_handler)
    return True

0 commit comments

Comments
 (0)