Skip to content

Commit a467c0d

Browse files
authored
Merge pull request #77 from fuzziecoder/codex/implement-elasticsearch-for-logs-and-analytics
Add Redis service layer and Elasticsearch log indexing/search; update config and monitoring APIs
2 parents b3710fa + fc361f9 commit a467c0d

11 files changed

Lines changed: 384 additions & 2 deletions

File tree

pipeline/README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ A production-ready pipeline automation system built with:
77
- **Redis** - State management, locks, caching
88
- **PostgreSQL** - Persistence
99
- **AI Safety Module** - Failure prediction & anomaly handling
10+
- **Elasticsearch** - Execution log indexing, fast filtering, analytics
1011
- **BentoML + Feast + Kubeflow** - End-to-end model infrastructure
1112
- **Prometheus + Grafana** - Metrics collection and dashboards
1213
- **ELK Stack (Elasticsearch, Logstash, Kibana)** - Centralized logging
@@ -228,6 +229,8 @@ curl -X POST http://localhost:8000/api/executions/pipeline-xxx/execute
228229
|----------|---------|-------------|
229230
| `DATABASE_URL` | - | PostgreSQL connection string |
230231
| `REDIS_URL` | `redis://localhost:6379/0` | Redis connection string |
232+
| `ELASTICSEARCH_URL` | `http://localhost:9200` | Elasticsearch connection string |
233+
| `ELASTICSEARCH_LOGS_INDEX` | `flexiroaster-execution-logs` | Logs index for search |
231234
| `EXECUTOR_MAX_RETRIES` | `3` | Max retries per stage |
232235
| `EXECUTOR_STAGE_TIMEOUT` | `120` | Stage timeout in seconds |
233236
| `AI_BLOCK_HIGH_RISK` | `false` | Block high-risk executions |
@@ -289,8 +292,9 @@ pipeline/
289292
|-----------|----------------|
290293
| **Airflow** | Scheduling, retries, dependencies |
291294
| **FastAPI** | Business logic, execution, AI safety |
292-
| **Redis** | Locks, caching, real-time state |
293-
| **PostgreSQL** | History, definitions, logs |
295+
| **Redis** | Locks, caching, real-time state, rate limiting, sessions, job queue |
296+
| **PostgreSQL** | History, definitions, canonical execution records |
297+
| **Elasticsearch** | Execution log indexing, fast search, filtering, analytics |
294298

295299
### Fail-Safe Design
296300

pipeline/backend/api/routes/health.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from db.database import check_database_health
1818
from core.redis_state import redis_state_manager
1919
from config import settings
20+
from core.elasticsearch_client import elasticsearch_manager
2021

2122
logger = logging.getLogger(__name__)
2223

@@ -38,6 +39,7 @@ async def health_check(
3839
- Application
3940
- Database
4041
- Redis
42+
- Elasticsearch
4143
"""
4244
services = {}
4345
overall_status = "healthy"
@@ -63,6 +65,15 @@ async def health_check(
6365
services["redis"].details["note"] = "Running in fallback mode"
6466
else:
6567
overall_status = "degraded"
68+
69+
# Check Elasticsearch
70+
elastic_health = await elasticsearch_manager.health_check()
71+
services["elasticsearch"] = ServiceHealth(
72+
status=elastic_health.get("status", "unknown"),
73+
details=elastic_health
74+
)
75+
if elastic_health.get("status") not in ["healthy", "disabled"]:
76+
overall_status = "degraded"
6677

6778
return HealthResponse(
6879
status=overall_status,

pipeline/backend/api/routes/monitoring.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from fastapi import APIRouter, HTTPException
66

77
from ai.monitoring_engine import monitoring_engine
8+
from core.elasticsearch_client import elasticsearch_manager
9+
from core.redis_services import redis_service_layer
810

911
router = APIRouter(prefix="/monitoring", tags=["monitoring"])
1012

@@ -51,3 +53,57 @@ async def get_monitoring_snapshot(pipeline_id: str):
5153
},
5254
"generated_at": snapshot.generated_at.isoformat(),
5355
}
56+
57+
58+
@router.get("/logs/search", response_model=Dict[str, Any])
async def search_execution_logs(
    query: str,
    pipeline_id: str | None = None,
    execution_id: str | None = None,
    level: str | None = None,
    limit: int = 100,
):
    """Search indexed execution logs in Elasticsearch for fast filtering and analytics.

    Args:
        query: Full-text query matched against log message and metadata.
        pipeline_id: Optional exact-match filter on pipeline id.
        execution_id: Optional exact-match filter on execution id.
        level: Optional single log level to filter by.
        limit: Maximum number of hits to return; clamped to 1..1000.
    """
    # Clamp limit so a caller cannot request an unbounded (or negative)
    # result set from Elasticsearch.
    limit = max(1, min(limit, 1000))
    levels = [level] if level else None
    results = await elasticsearch_manager.search_logs(
        query=query,
        pipeline_id=pipeline_id,
        execution_id=execution_id,
        levels=levels,
        limit=limit,
    )
    return {"query": query, "count": len(results), "results": results}
76+
77+
78+
@router.post("/cache/session/{session_id}", response_model=Dict[str, Any])
async def cache_session(session_id: str, payload: Dict[str, Any]):
    """Store session payload in Redis-backed session storage."""
    stored = await redis_service_layer.set_session(session_id, payload)
    return {"success": stored, "session_id": session_id}
83+
84+
85+
@router.get("/cache/session/{session_id}", response_model=Dict[str, Any])
async def fetch_session(session_id: str):
    """Fetch session payload from Redis-backed session storage."""
    stored_session = await redis_service_layer.get_session(session_id)
    return {"session_id": session_id, "session": stored_session}
90+
91+
92+
@router.post("/rate-limit/{identifier}", response_model=Dict[str, Any])
async def check_rate_limit(identifier: str):
    """Apply Redis-backed rate limiting check for an identifier."""
    verdict = await redis_service_layer.check_rate_limit(identifier)
    return verdict
96+
97+
98+
@router.post("/jobs/enqueue", response_model=Dict[str, Any])
async def enqueue_background_job(payload: Dict[str, Any]):
    """Push a background job payload into Redis list-based broker queue."""
    queued = await redis_service_layer.enqueue_job(payload)
    return {"success": queued}
103+
104+
105+
@router.post("/jobs/dequeue", response_model=Dict[str, Any])
async def dequeue_background_job():
    """Pop a background job payload from Redis broker queue."""
    next_job = await redis_service_layer.dequeue_job()
    return {"job": next_job, "found": next_job is not None}

pipeline/backend/config.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ class Settings(BaseSettings):
5353
REDIS_SOCKET_TIMEOUT: float = 5.0
5454
REDIS_RETRY_ON_TIMEOUT: bool = True
5555
REDIS_DECODE_RESPONSES: bool = True
56+
REDIS_SESSION_TTL: int = 86400 # 24 hours
57+
REDIS_RATE_LIMIT_WINDOW_SECONDS: int = 60
58+
REDIS_RATE_LIMIT_MAX_REQUESTS: int = 120
59+
60+
# Redis queue/broker settings
61+
REDIS_JOB_QUEUE_KEY: str = "flexiroaster:jobs:default"
62+
REDIS_JOB_QUEUE_TIMEOUT: int = 5
5663

5764
# Execution Lock Settings
5865
EXECUTION_LOCK_TTL: int = 3600 # 1 hour
@@ -95,6 +102,17 @@ class Settings(BaseSettings):
95102
LOG_LEVEL: str = "INFO"
96103
LOG_FORMAT: str = "json" # "json" or "text"
97104
LOG_FILE: Optional[str] = None
105+
106+
# ===================
107+
# Elasticsearch Settings
108+
# ===================
109+
ELASTICSEARCH_ENABLED: bool = True
110+
ELASTICSEARCH_URL: str = "http://localhost:9200"
111+
ELASTICSEARCH_USERNAME: Optional[str] = None
112+
ELASTICSEARCH_PASSWORD: Optional[str] = None
113+
ELASTICSEARCH_VERIFY_CERTS: bool = True
114+
ELASTICSEARCH_LOGS_INDEX: str = "flexiroaster-execution-logs"
115+
ELASTICSEARCH_REQUEST_TIMEOUT: int = 10
98116

99117
# ===================
100118
# Observability Settings
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
"""
2+
Elasticsearch client utilities for FlexiRoaster.
3+
Handles log indexing and search/filter operations for execution analytics.
4+
"""
5+
import logging
6+
from typing import Optional, Dict, Any, List
7+
8+
from elasticsearch import AsyncElasticsearch
9+
10+
from config import settings
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class ElasticsearchManager:
    """Manages the Elasticsearch client lifecycle and execution-log indexing.

    Degrades gracefully: when Elasticsearch is disabled or unreachable, all
    operations become safe no-ops (indexing returns False, search returns []),
    so the rest of the application keeps working without log analytics.
    """

    def __init__(self) -> None:
        # Client is created lazily in initialize(); None while disconnected.
        self._client: Optional[AsyncElasticsearch] = None
        self._available: bool = False

    @property
    def is_available(self) -> bool:
        """True once initialize() succeeded and close() has not been called."""
        return self._available

    async def initialize(self) -> bool:
        """Connect to Elasticsearch and make sure the logs index exists.

        Returns:
            True when the cluster is reachable and the index is ready;
            False when disabled via config or the cluster is unreachable.
            Never raises — failures are logged and the manager stays unavailable.
        """
        if not settings.ELASTICSEARCH_ENABLED:
            logger.info("Elasticsearch disabled via configuration")
            self._available = False
            return False

        try:
            auth = None
            if settings.ELASTICSEARCH_USERNAME and settings.ELASTICSEARCH_PASSWORD:
                auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)

            self._client = AsyncElasticsearch(
                hosts=[settings.ELASTICSEARCH_URL],
                basic_auth=auth,
                verify_certs=settings.ELASTICSEARCH_VERIFY_CERTS,
                request_timeout=settings.ELASTICSEARCH_REQUEST_TIMEOUT,
            )

            # ping() verifies connectivity before we attempt index management.
            await self._client.ping()
            await self.ensure_index()

            self._available = True
            logger.info("Elasticsearch initialized")
            return True
        except Exception as e:
            logger.warning(f"Elasticsearch unavailable, continuing without indexing: {e}")
            self._available = False
            return False

    async def close(self) -> None:
        """Close the client connection and mark the manager unavailable."""
        if self._client:
            await self._client.close()
            # Drop the reference so a later health_check() reports
            # "disconnected" instead of calling into a closed client.
            self._client = None
        self._available = False

    async def ensure_index(self) -> None:
        """Create the execution-logs index with explicit mappings if it is missing."""
        if not self._client:
            return

        index_name = settings.ELASTICSEARCH_LOGS_INDEX
        exists = await self._client.indices.exists(index=index_name)
        if exists:
            return

        await self._client.indices.create(
            index=index_name,
            mappings={
                "properties": {
                    "timestamp": {"type": "date"},
                    "execution_id": {"type": "keyword"},
                    "pipeline_id": {"type": "keyword"},
                    "stage_id": {"type": "keyword"},
                    "level": {"type": "keyword"},
                    "message": {"type": "text"},
                    "metadata": {"type": "object", "enabled": True},
                }
            },
        )

    async def health_check(self) -> Dict[str, Any]:
        """Report Elasticsearch health for the /health endpoint.

        Returns a dict whose "status" is one of:
        "disabled", "disconnected", "healthy", or "unhealthy".
        """
        if not settings.ELASTICSEARCH_ENABLED:
            return {"status": "disabled"}

        if not self._client:
            return {"status": "disconnected"}

        try:
            health = await self._client.cluster.health()
            return {
                "status": "healthy",
                "cluster_status": health.get("status"),
                "number_of_nodes": health.get("number_of_nodes"),
            }
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}

    async def index_execution_log(self, document: Dict[str, Any]) -> bool:
        """Index one execution-log document; best-effort, never raises.

        Returns True when the document was accepted, False when Elasticsearch
        is unavailable or the indexing call failed.
        """
        if not self._available or not self._client:
            return False

        try:
            await self._client.index(index=settings.ELASTICSEARCH_LOGS_INDEX, document=document)
            return True
        except Exception as e:
            logger.warning(f"Failed to index log in Elasticsearch: {e}")
            return False

    async def search_logs(
        self,
        query: str,
        pipeline_id: Optional[str] = None,
        execution_id: Optional[str] = None,
        levels: Optional[List[str]] = None,
        limit: int = 100,
    ) -> List[Dict[str, Any]]:
        """Full-text search over indexed execution logs, newest first.

        Args:
            query: Text matched against the message and metadata fields.
            pipeline_id: Optional exact-match filter on pipeline id.
            execution_id: Optional exact-match filter on execution id.
            levels: Optional list of log levels to filter by.
            limit: Maximum number of hits to return.

        Returns:
            A list of log documents ([] when unavailable or on search failure,
            mirroring the best-effort contract of index_execution_log).
        """
        if not self._available or not self._client:
            return []

        filters: List[Dict[str, Any]] = []
        if pipeline_id:
            filters.append({"term": {"pipeline_id": pipeline_id}})
        if execution_id:
            filters.append({"term": {"execution_id": execution_id}})
        if levels:
            filters.append({"terms": {"level": levels}})

        try:
            # Use 8.x-style top-level kwargs (consistent with the mappings=
            # kwarg in ensure_index) instead of the deprecated body= parameter.
            response = await self._client.search(
                index=settings.ELASTICSEARCH_LOGS_INDEX,
                size=limit,
                query={
                    "bool": {
                        "must": [{"multi_match": {"query": query, "fields": ["message", "metadata.*"]}}],
                        "filter": filters,
                    }
                },
                sort=[{"timestamp": {"order": "desc"}}],
            )
        except Exception as e:
            # Search must not take the monitoring API down with it.
            logger.warning(f"Failed to search logs in Elasticsearch: {e}")
            return []

        return [hit.get("_source", {}) for hit in response.get("hits", {}).get("hits", [])]


elasticsearch_manager = ElasticsearchManager()
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""
2+
Redis-backed services for sessions, rate limiting, and background job brokering.
3+
"""
4+
import json
5+
from datetime import datetime
6+
from typing import Any, Dict, Optional
7+
8+
from config import settings
9+
from core.redis_state import redis_state_manager
10+
11+
12+
class RedisServiceLayer:
    """Redis-backed application services: session storage, fixed-window rate
    limiting, and a simple list-based background-job queue.

    Every method short-circuits to a safe result (False / None / permissive
    verdict) when Redis is unavailable, so callers never crash on an outage.
    """

    SESSION_KEY = "flexiroaster:session:{session_id}"
    RATE_LIMIT_KEY = "flexiroaster:ratelimit:{identifier}:{window}"

    async def set_session(self, session_id: str, payload: Dict[str, Any], ttl: Optional[int] = None) -> bool:
        """Serialize and store a session payload with a TTL (config default if omitted)."""
        if not redis_state_manager.is_available:
            return False

        expiry = ttl or settings.REDIS_SESSION_TTL
        session_key = self.SESSION_KEY.format(session_id=session_id)
        await redis_state_manager.client.set(session_key, json.dumps(payload), ex=expiry)
        return True

    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Load and deserialize a session payload; None when missing or Redis is down."""
        if not redis_state_manager.is_available:
            return None

        raw = await redis_state_manager.client.get(self.SESSION_KEY.format(session_id=session_id))
        return json.loads(raw) if raw else None

    async def check_rate_limit(
        self,
        identifier: str,
        max_requests: Optional[int] = None,
        window_seconds: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Fixed-window rate limit check keyed by identifier.

        Returns a verdict dict; permissive ("allowed": True, "remaining": -1)
        when Redis is unavailable.
        """
        window = window_seconds or settings.REDIS_RATE_LIMIT_WINDOW_SECONDS
        if not redis_state_manager.is_available:
            return {"allowed": True, "remaining": -1, "window_seconds": window}

        limit = max_requests or settings.REDIS_RATE_LIMIT_MAX_REQUESTS
        # Bucket the clock into fixed windows so the counter key rolls over.
        bucket = int(datetime.now().timestamp() // window)
        counter_key = self.RATE_LIMIT_KEY.format(identifier=identifier, window=bucket)

        used = await redis_state_manager.client.incr(counter_key)
        if used == 1:
            # First hit in this window: start the expiry clock on the counter.
            await redis_state_manager.client.expire(counter_key, window)

        return {
            "allowed": used <= limit,
            "remaining": max(limit - used, 0),
            "window_seconds": window,
            "used": used,
            "limit": limit,
        }

    async def enqueue_job(self, payload: Dict[str, Any], queue_key: Optional[str] = None) -> bool:
        """Push a JSON-serialized job onto the broker list (LPUSH)."""
        if not redis_state_manager.is_available:
            return False

        await redis_state_manager.client.lpush(
            queue_key or settings.REDIS_JOB_QUEUE_KEY, json.dumps(payload)
        )
        return True

    async def dequeue_job(self, queue_key: Optional[str] = None, timeout: Optional[int] = None) -> Optional[Dict[str, Any]]:
        """Blocking-pop one job from the broker list (BRPOP); None on timeout."""
        if not redis_state_manager.is_available:
            return None

        popped = await redis_state_manager.client.brpop(
            queue_key or settings.REDIS_JOB_QUEUE_KEY,
            timeout=timeout or settings.REDIS_JOB_QUEUE_TIMEOUT,
        )
        # BRPOP yields a (key, value) pair, or None when the wait timed out.
        return json.loads(popped[1]) if popped else None


redis_service_layer = RedisServiceLayer()

0 commit comments

Comments
 (0)