Skip to content

Commit a0971d3

Browse files
authored
Merge pull request #92 from fuzziecoder/codex/implement-advanced-orchestration-stack-example
Implement modern advanced orchestration stack profile and protected API
2 parents b471463 + 41afd7c commit a0971d3

10 files changed

Lines changed: 490 additions & 7 deletions

File tree

pipeline/README.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
A production-ready pipeline automation system built with:
44

5-
- **Apache Airflow** - Orchestration & scheduling
5+
- **Apache Airflow / Temporal** - Orchestration & scheduling
66
- **FastAPI** - Pipeline execution engine
77
- **Redis** - State management, locks, caching
88
- **Celery** - Distributed task queue
99
- **Kafka** - Event-driven execution streaming
10+
- **Kubernetes Jobs** - Isolated execution runtime
11+
- **Ray** - Distributed compute engine
12+
- **MinIO/Object Storage** - Artifact persistence
1013
- **PostgreSQL** - Persistence
1114
- **AI Safety Module** - Failure prediction & anomaly handling
1215
- **Elasticsearch** - Execution log indexing, fast filtering, analytics
@@ -397,3 +400,13 @@ MIT License
397400
cd pipeline/backend
398401
celery -A core.tasks.celery_app worker --loglevel=info --concurrency=4
399402
```
403+
404+
405+
### Advanced Orchestration Stack API
406+
407+
| Method | Endpoint | Description |
408+
|--------|----------|-------------|
409+
| GET | `/api/advanced-stack/architecture` | Returns the composed stack profile (JWT+RBAC protected) |
410+
| POST | `/api/advanced-stack/executions` | Creates an orchestration envelope for Airflow/Temporal + K8s + Ray + Kafka |
411+
412+
Security model uses JWT bearer auth with RBAC roles (`admin`, `operator`, `viewer`) and pluggable secrets providers (`env`, `aws`, `vault`).
Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
1-
"""API Routes module initialization."""
2-
from api.routes import pipelines, executions, health, monitoring, ai_automation, microservices
31
"""API route module exports."""
42

5-
from api.routes import ai_automation, executions, health, model_infra, monitoring, pipelines
3+
from api.routes import (
4+
advanced_stack,
5+
ai_automation,
6+
executions,
7+
health,
8+
microservices,
9+
model_infra,
10+
monitoring,
11+
pipelines,
12+
)
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""Advanced orchestration stack endpoints."""
2+
from __future__ import annotations
3+
4+
from typing import Any, Dict
5+
6+
from fastapi import APIRouter, Depends
7+
from pydantic import BaseModel, Field
8+
9+
from api.security import AuthContext, require_roles
10+
from core.modern_stack import ModernOrchestrationStack
11+
12+
router = APIRouter(prefix="/advanced-stack", tags=["advanced-stack"])
13+
stack = ModernOrchestrationStack()
14+
15+
16+
class StackExecutionRequest(BaseModel):
17+
pipeline_id: str = Field(..., description="Pipeline identifier to execute")
18+
payload: Dict[str, Any] = Field(default_factory=dict, description="Execution payload")
19+
20+
21+
@router.get("/architecture")
22+
async def get_stack_architecture(
23+
_: AuthContext = Depends(require_roles("admin", "operator", "viewer")),
24+
) -> Dict[str, Any]:
25+
"""Return current high-end orchestration stack architecture."""
26+
return stack.architecture()
27+
28+
29+
@router.post("/executions")
30+
async def submit_stack_execution(
31+
request: StackExecutionRequest,
32+
_: AuthContext = Depends(require_roles("admin", "operator")),
33+
) -> Dict[str, Any]:
34+
"""Submit an execution envelope for orchestration connectors."""
35+
return stack.submit_execution(request.pipeline_id, request.payload)

pipeline/backend/api/security.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
"""JWT + RBAC security helpers for advanced orchestration routes."""
2+
from __future__ import annotations
3+
4+
import base64
5+
import hashlib
6+
import hmac
7+
import json
8+
from dataclasses import dataclass
9+
from datetime import datetime, timezone
10+
from typing import Any, Dict, List
11+
12+
from fastapi import Depends, HTTPException, status
13+
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
14+
15+
from config import settings
16+
17+
# auto_error=False: a missing Authorization header is rejected explicitly in
# get_auth_context (401) instead of by FastAPI's default 403.
bearer_scheme = HTTPBearer(auto_error=False)


@dataclass
class AuthContext:
    """Authenticated identity context extracted from a JWT token."""

    # JWT "sub" claim: the authenticated principal's identifier.
    subject: str
    # Role names taken from the token's "roles" claim; consumed by RBAC guards.
    roles: List[str]
    # Full decoded JWT payload, kept for downstream consumers.
    claims: Dict[str, Any]
27+
28+
29+
def _b64url_decode(data: str) -> bytes:
30+
padding = "=" * (-len(data) % 4)
31+
return base64.urlsafe_b64decode(data + padding)
32+
33+
34+
def _decode_hs256_jwt(token: str, secret: str) -> Dict[str, Any]:
    """Decode and verify a compact JWT token signed with HS256.

    Args:
        token: Compact ``header.payload.signature`` JWT string.
        secret: Shared HMAC-SHA256 signing secret.

    Returns:
        The decoded payload claims dict.

    Raises:
        HTTPException: 401 for any malformed, badly signed, or expired token.
            Decode failures (bad base64url, non-JSON segments) are mapped to
            401 as well, so garbage tokens cannot surface as a 500
            (``base64``/``json`` raise ``ValueError`` subclasses otherwise).
    """
    try:
        header_segment, payload_segment, signature_segment = token.split(".")
    except ValueError as exc:
        raise HTTPException(status_code=401, detail="Malformed JWT token") from exc

    signing_input = f"{header_segment}.{payload_segment}".encode("utf-8")
    expected_signature = hmac.new(
        secret.encode("utf-8"), signing_input, hashlib.sha256
    ).digest()
    try:
        # binascii.Error (bad base64url) is a ValueError subclass.
        supplied_signature = _b64url_decode(signature_segment)
    except ValueError as exc:
        raise HTTPException(status_code=401, detail="Malformed JWT token") from exc

    # Constant-time comparison; signature is checked before the payload is
    # parsed so unverified content is never interpreted.
    if not hmac.compare_digest(expected_signature, supplied_signature):
        raise HTTPException(status_code=401, detail="Invalid JWT signature")

    try:
        # JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        header = json.loads(_b64url_decode(header_segment).decode("utf-8"))
        payload = json.loads(_b64url_decode(payload_segment).decode("utf-8"))
    except ValueError as exc:
        raise HTTPException(status_code=401, detail="Malformed JWT token") from exc

    # Guard the shapes before using dict methods on attacker-supplied JSON.
    if not isinstance(header, dict) or not isinstance(payload, dict):
        raise HTTPException(status_code=401, detail="Malformed JWT token")
    if header.get("alg") != "HS256":
        raise HTTPException(status_code=401, detail="Unsupported JWT algorithm")

    exp = payload.get("exp")
    if exp is not None:
        try:
            expires_at = datetime.fromtimestamp(float(exp), tz=timezone.utc)
        except (TypeError, ValueError) as exc:
            raise HTTPException(status_code=401, detail="Malformed JWT token") from exc
        if datetime.now(tz=timezone.utc) >= expires_at:
            raise HTTPException(status_code=401, detail="JWT token is expired")

    return payload
62+
63+
64+
async def get_auth_context(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
) -> AuthContext:
    """Resolve the bearer token into an AuthContext (subject + roles).

    Responds 503 when no JWT secret is configured, 401 for a missing or
    invalid token.
    """
    # Without a configured secret, no token could ever verify — fail loudly.
    if not settings.JWT_SECRET:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="JWT security is not configured",
        )

    if credentials is None or not credentials.credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing bearer token",
        )

    claims = _decode_hs256_jwt(credentials.credentials, settings.JWT_SECRET)
    # NOTE(review): settings.JWT_ISSUER / JWT_AUDIENCE exist in config but the
    # token's iss/aud claims are never checked here — confirm whether that is
    # intentional.
    subject = claims.get("sub")
    if not subject:
        raise HTTPException(status_code=401, detail="JWT subject is required")

    roles = claims.get("roles") or []
    if not isinstance(roles, list):
        raise HTTPException(status_code=401, detail="JWT roles claim must be a list")

    return AuthContext(subject=subject, roles=roles, claims=claims)
90+
91+
92+
def require_roles(*required_roles: str):
    """Dependency factory enforcing at least one RBAC role.

    With no roles supplied, any authenticated caller is admitted.
    """
    allowed = frozenset(required_roles)

    async def _role_guard(auth: AuthContext = Depends(get_auth_context)) -> AuthContext:
        # Grant access when the caller holds any one of the allowed roles.
        if allowed and allowed.isdisjoint(auth.roles):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Requires one of roles: {', '.join(required_roles)}",
            )
        return auth

    return _role_guard

pipeline/backend/config.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,35 @@ class Settings(BaseSettings):
138138
KAFKA_ENABLED: bool = False
139139
KAFKA_BOOTSTRAP_SERVERS: str = "localhost:9092"
140140
KAFKA_EXECUTION_TOPIC: str = "pipeline.executions"
141+
142+
    # ===================
    # Modern Orchestration Stack
    # ===================
    # Which orchestrator drives workflows; consumed by ModernOrchestrationStack.
    ORCHESTRATION_ENGINE: str = "temporal"  # temporal | airflow

    # Temporal connection parameters.
    TEMPORAL_ENABLED: bool = True
    TEMPORAL_NAMESPACE: str = "default"
    TEMPORAL_TASK_QUEUE: str = "flexiroaster-orchestration"
    TEMPORAL_ADDRESS: str = "localhost:7233"

    # Kubernetes Jobs execution runtime.
    KUBERNETES_NAMESPACE: str = "flexiroaster"
    KUBERNETES_JOB_IMAGE: str = "python:3.11-slim"
    KUBERNETES_SERVICE_ACCOUNT: str = "pipeline-runner"

    # Ray distributed compute layer.
    RAY_ENABLED: bool = True
    RAY_DASHBOARD_URL: str = "http://localhost:8265"
    RAY_JOB_ENTRYPOINT: str = "python -m worker"

    # MinIO / S3-compatible artifact storage.
    OBJECT_STORAGE_ENABLED: bool = True
    OBJECT_STORAGE_BUCKET: str = "pipeline-artifacts"
    OBJECT_STORAGE_ENDPOINT: str = "http://localhost:9000"
    # NOTE(review): dev-only default credentials — must be overridden via env
    # in any non-local deployment.
    OBJECT_STORAGE_ACCESS_KEY: str = "minio"
    OBJECT_STORAGE_SECRET_KEY: str = "minio123"

    # JWT auth: an empty secret makes protected endpoints return 503
    # "JWT security is not configured" (see api/security.py).
    JWT_SECRET: str = ""
    JWT_ISSUER: str = "flexiroaster"
    JWT_AUDIENCE: str = "flexiroaster-api"

    # Backend for pluggable secrets resolution.
    SECRETS_PROVIDER: str = "env"  # env | aws | vault
141170

142171
class Config:
143172
env_file = ".env"
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
"""Modern advanced orchestration stack primitives.
2+
3+
Implements a configurable orchestration profile that combines:
4+
- FastAPI API layer
5+
- Airflow/Temporal orchestrators
6+
- Kafka eventing
7+
- Kubernetes jobs for execution
8+
- Ray for distributed compute
9+
- PostgreSQL + object storage persistence
10+
- Prometheus/Grafana/ELK observability
11+
- JWT + RBAC + secrets manager security
12+
"""
13+
from __future__ import annotations
14+
15+
from dataclasses import dataclass
16+
from datetime import datetime, timezone
17+
from typing import Any, Dict, List
18+
from uuid import uuid4
19+
20+
from config import settings
21+
22+
23+
@dataclass
class StackComponent:
    """One layer of the orchestration stack, as reported by architecture()."""

    # Human-readable component name (e.g. "FastAPI", "kafka", "ray").
    name: str
    # Whether the component is active under the current settings.
    enabled: bool
    # Component-specific configuration snapshot.
    config: Dict[str, Any]
28+
29+
30+
class ModernOrchestrationStack:
    """Service layer for the high-end orchestration stack."""

    @staticmethod
    def _component(name: str, enabled: bool, config: Dict[str, Any]) -> Dict[str, Any]:
        """Build a StackComponent and flatten it to a JSON-friendly dict."""
        return StackComponent(name=name, enabled=enabled, config=config).__dict__

    def architecture(self) -> Dict[str, Any]:
        """Return stack topology and active configuration."""
        component = self._component
        topology: Dict[str, Any] = {}
        topology["api_layer"] = component(
            "FastAPI", True, {"base_path": settings.API_PREFIX}
        )
        topology["orchestration"] = component(
            settings.ORCHESTRATION_ENGINE,
            True,
            {
                "temporal_enabled": settings.TEMPORAL_ENABLED,
                "temporal_address": settings.TEMPORAL_ADDRESS,
                "namespace": settings.TEMPORAL_NAMESPACE,
                "task_queue": settings.TEMPORAL_TASK_QUEUE,
            },
        )
        topology["event_layer"] = component(
            "kafka",
            settings.KAFKA_ENABLED,
            {
                "bootstrap_servers": settings.KAFKA_BOOTSTRAP_SERVERS,
                "topic": settings.KAFKA_EXECUTION_TOPIC,
            },
        )
        topology["execution"] = component(
            "kubernetes-jobs",
            True,
            {
                "namespace": settings.KUBERNETES_NAMESPACE,
                "default_image": settings.KUBERNETES_JOB_IMAGE,
                "service_account": settings.KUBERNETES_SERVICE_ACCOUNT,
            },
        )
        topology["distributed_compute"] = component(
            "ray",
            settings.RAY_ENABLED,
            {
                "dashboard_url": settings.RAY_DASHBOARD_URL,
                "entrypoint": settings.RAY_JOB_ENTRYPOINT,
            },
        )
        topology["storage"] = {
            "database": "postgresql",
            "object_storage": {
                "enabled": settings.OBJECT_STORAGE_ENABLED,
                "bucket": settings.OBJECT_STORAGE_BUCKET,
                "endpoint": settings.OBJECT_STORAGE_ENDPOINT,
            },
        }
        topology["monitoring"] = {
            "metrics": ["prometheus", "grafana"],
            "logs": ["elasticsearch", "logstash", "kibana"],
        }
        topology["security"] = {
            "auth": "jwt",
            "authorization": "rbac",
            "secrets_provider": settings.SECRETS_PROVIDER,
        }
        return topology

    def submit_execution(self, pipeline_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Create a stack-aware execution command envelope.

        This API intentionally stays transport-agnostic and returns metadata
        that can be consumed by workers/connectors for
        Airflow/Temporal/Kubernetes/Ray.
        """
        # Orchestration and Kubernetes execution are always part of the plan.
        commands: List[Dict[str, Any]] = [
            {
                "layer": "orchestration",
                "engine": settings.ORCHESTRATION_ENGINE,
                "action": "start_workflow",
            },
            {
                "layer": "execution",
                "engine": "kubernetes-jobs",
                "action": "submit_job",
                "namespace": settings.KUBERNETES_NAMESPACE,
                "image": settings.KUBERNETES_JOB_IMAGE,
            },
        ]
        # Optional layers join the fan-out only when enabled in settings.
        if settings.RAY_ENABLED:
            commands.append(
                {
                    "layer": "distributed_compute",
                    "engine": "ray",
                    "action": "submit_ray_job",
                    "dashboard": settings.RAY_DASHBOARD_URL,
                }
            )
        if settings.KAFKA_ENABLED:
            commands.append(
                {
                    "layer": "event_layer",
                    "engine": "kafka",
                    "action": "publish_event",
                    "topic": settings.KAFKA_EXECUTION_TOPIC,
                }
            )

        return {
            "execution_id": f"exec-{uuid4()}",
            "pipeline_id": pipeline_id,
            "submitted_at": datetime.now(tz=timezone.utc).isoformat(),
            "status": "accepted",
            "commands": commands,
            "payload": payload,
        }

pipeline/backend/main.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,16 @@
1717
from db import create_tables
1818
from core.redis_state import redis_state_manager
1919
from core.executor import pipeline_executor
20-
from api.routes import pipelines, executions, health, monitoring, ai_automation, microservices
21-
from api.routes import ai_automation, executions, health, model_infra, monitoring, pipelines
22-
from api.routes import pipelines, executions, health, monitoring, ai_automation
20+
from api.routes import (
21+
advanced_stack,
22+
ai_automation,
23+
executions,
24+
health,
25+
microservices,
26+
model_infra,
27+
monitoring,
28+
pipelines,
29+
)
2330
from core.elasticsearch_client import elasticsearch_manager
2431
from observability import setup_observability
2532

@@ -196,6 +203,7 @@ async def root():
196203
app.include_router(ai_automation.router, prefix=settings.API_PREFIX)
197204
app.include_router(microservices.router, prefix=settings.API_PREFIX)
198205
app.include_router(model_infra.router, prefix=settings.API_PREFIX)
206+
app.include_router(advanced_stack.router, prefix=settings.API_PREFIX)
199207

200208

201209
# ===================

0 commit comments

Comments
 (0)