Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pipeline/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

A production-ready pipeline automation system built with:

- **Apache Airflow** - Orchestration & scheduling
- **Apache Airflow / Temporal** - Orchestration & scheduling
- **FastAPI** - Pipeline execution engine
- **Redis** - State management, locks, caching
- **Celery** - Distributed task queue
- **Kafka** - Event-driven execution streaming
- **Kubernetes Jobs** - Isolated execution runtime
- **Ray** - Distributed compute engine
- **MinIO/Object Storage** - Artifact persistence
- **PostgreSQL** - Persistence
- **AI Safety Module** - Failure prediction & anomaly handling
- **Elasticsearch** - Execution log indexing, fast filtering, analytics
Expand Down Expand Up @@ -397,3 +400,13 @@ MIT License
cd pipeline/backend
celery -A core.tasks.celery_app worker --loglevel=info --concurrency=4
```


### Advanced Orchestration Stack API

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/api/advanced-stack/architecture` | Returns the composed stack profile (JWT+RBAC protected) |
| POST | `/api/advanced-stack/executions` | Creates an orchestration envelope for Airflow/Temporal + K8s + Ray + Kafka |

Security model uses JWT bearer auth with RBAC roles (`admin`, `operator`, `viewer`) and pluggable secrets providers (`env`, `aws`, `vault`).
13 changes: 10 additions & 3 deletions pipeline/backend/api/routes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
"""API Routes module initialization."""
from api.routes import pipelines, executions, health, monitoring, ai_automation, microservices
"""API route module exports."""

from api.routes import ai_automation, executions, health, model_infra, monitoring, pipelines
from api.routes import (
advanced_stack,
ai_automation,
executions,
health,
microservices,
model_infra,
monitoring,
pipelines,
)
35 changes: 35 additions & 0 deletions pipeline/backend/api/routes/advanced_stack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Advanced orchestration stack endpoints."""
from __future__ import annotations

from typing import Any, Dict

from fastapi import APIRouter, Depends
from pydantic import BaseModel, Field

from api.security import AuthContext, require_roles
from core.modern_stack import ModernOrchestrationStack

# Module-level singletons: one router for the /advanced-stack namespace and one
# stack service (stateless — it only reads settings) shared by all requests.
router = APIRouter(prefix="/advanced-stack", tags=["advanced-stack"])
stack = ModernOrchestrationStack()


class StackExecutionRequest(BaseModel):
    """Request body for ``POST /advanced-stack/executions``."""

    pipeline_id: str = Field(..., description="Pipeline identifier to execute")
    payload: Dict[str, Any] = Field(default_factory=dict, description="Execution payload")


@router.get("/architecture")
async def get_stack_architecture(
    _: AuthContext = Depends(require_roles("admin", "operator", "viewer")),
) -> Dict[str, Any]:
    """Describe the composed orchestration stack.

    Accessible to every authenticated RBAC role; the auth dependency is only
    a gate, its identity is not used in the response.
    """
    profile = stack.architecture()
    return profile


@router.post("/executions")
async def submit_stack_execution(
    request: StackExecutionRequest,
    _: AuthContext = Depends(require_roles("admin", "operator")),
) -> Dict[str, Any]:
    """Accept an execution request and return its orchestration envelope.

    Restricted to the ``admin`` and ``operator`` roles; the envelope is built
    by the shared stack service and returned verbatim.
    """
    envelope = stack.submit_execution(request.pipeline_id, request.payload)
    return envelope
103 changes: 103 additions & 0 deletions pipeline/backend/api/security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""JWT + RBAC security helpers for advanced orchestration routes."""
from __future__ import annotations

import base64
import hashlib
import hmac
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from config import settings

# auto_error=False: a missing/invalid Authorization header yields None instead
# of an immediate framework error, so get_auth_context can raise its own 401.
bearer_scheme = HTTPBearer(auto_error=False)


@dataclass
class AuthContext:
    """Authenticated identity context extracted from a verified JWT."""

    subject: str  # "sub" claim; required (see get_auth_context)
    roles: List[str]  # "roles" claim; checked by require_roles()
    claims: Dict[str, Any]  # full decoded payload for downstream consumers


def _b64url_decode(data: str) -> bytes:
padding = "=" * (-len(data) % 4)
return base64.urlsafe_b64decode(data + padding)


def _decode_hs256_jwt(token: str, secret: str) -> Dict[str, Any]:
    """Decode and verify a compact JWT token signed with HS256.

    Args:
        token: Compact ``header.payload.signature`` JWT string.
        secret: Shared HS256 signing secret.

    Returns:
        The decoded payload claims dict.

    Raises:
        HTTPException: 401 for every structural, signature, algorithm, or
            expiry failure. Malformed attacker input must never surface as
            an unhandled server error (500).
    """
    try:
        header_segment, payload_segment, signature_segment = token.split(".")
    except ValueError as exc:
        raise HTTPException(status_code=401, detail="Malformed JWT token") from exc

    signing_input = f"{header_segment}.{payload_segment}".encode("utf-8")
    expected_signature = hmac.new(
        secret.encode("utf-8"), signing_input, hashlib.sha256
    ).digest()

    # base64 decoding of the attacker-supplied segment raises binascii.Error
    # (a ValueError subclass) on bad input; map it to 401, not a 500.
    try:
        supplied_signature = _b64url_decode(signature_segment)
    except ValueError as exc:
        raise HTTPException(status_code=401, detail="Malformed JWT token") from exc

    # Signature is checked first (constant-time) with a fixed algorithm, so a
    # forged "alg" header cannot downgrade verification.
    if not hmac.compare_digest(expected_signature, supplied_signature):
        raise HTTPException(status_code=401, detail="Invalid JWT signature")

    # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        header = json.loads(_b64url_decode(header_segment).decode("utf-8"))
        payload = json.loads(_b64url_decode(payload_segment).decode("utf-8"))
    except ValueError as exc:
        raise HTTPException(status_code=401, detail="Malformed JWT token") from exc

    if header.get("alg") != "HS256":
        raise HTTPException(status_code=401, detail="Unsupported JWT algorithm")
    # A valid JWT payload is a JSON object; reject arrays/scalars so callers
    # can rely on dict access (claims.get(...)).
    if not isinstance(payload, dict):
        raise HTTPException(status_code=401, detail="Malformed JWT token")

    exp = payload.get("exp")
    if exp is not None:
        try:
            expires_at = datetime.fromtimestamp(float(exp), tz=timezone.utc)
        except (TypeError, ValueError, OverflowError) as exc:
            raise HTTPException(status_code=401, detail="Malformed JWT token") from exc
        if datetime.now(tz=timezone.utc) >= expires_at:
            raise HTTPException(status_code=401, detail="JWT token is expired")

    return payload


async def get_auth_context(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
) -> AuthContext:
    """Resolve the caller's identity from the Authorization bearer JWT.

    Raises 503 when no JWT secret is configured, and 401 when the token is
    missing, unverifiable, lacks a subject, or carries a non-list roles claim.
    """
    secret = settings.JWT_SECRET
    if not secret:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="JWT security is not configured",
        )

    token = None if credentials is None else credentials.credentials
    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing bearer token",
        )

    claims = _decode_hs256_jwt(token, secret)

    subject = claims.get("sub")
    if not subject:
        raise HTTPException(status_code=401, detail="JWT subject is required")

    roles = claims.get("roles") or []
    if not isinstance(roles, list):
        raise HTTPException(status_code=401, detail="JWT roles claim must be a list")

    return AuthContext(subject=subject, roles=roles, claims=claims)


def require_roles(*required_roles: str):
    """Build a FastAPI dependency enforcing at least one of the given roles.

    With no roles supplied, any authenticated caller is admitted; otherwise
    the caller must hold at least one of *required_roles* or receive a 403.
    """

    async def _role_guard(auth: AuthContext = Depends(get_auth_context)) -> AuthContext:
        granted = set(auth.roles)
        if required_roles and granted.isdisjoint(required_roles):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Requires one of roles: {', '.join(required_roles)}",
            )
        return auth

    return _role_guard
29 changes: 29 additions & 0 deletions pipeline/backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,35 @@ class Settings(BaseSettings):
KAFKA_ENABLED: bool = False
KAFKA_BOOTSTRAP_SERVERS: str = "localhost:9092"
KAFKA_EXECUTION_TOPIC: str = "pipeline.executions"

# ===================
# Modern Orchestration Stack
# ===================
ORCHESTRATION_ENGINE: str = "temporal" # temporal | airflow
TEMPORAL_ENABLED: bool = True
TEMPORAL_NAMESPACE: str = "default"
TEMPORAL_TASK_QUEUE: str = "flexiroaster-orchestration"
TEMPORAL_ADDRESS: str = "localhost:7233"

KUBERNETES_NAMESPACE: str = "flexiroaster"
KUBERNETES_JOB_IMAGE: str = "python:3.11-slim"
KUBERNETES_SERVICE_ACCOUNT: str = "pipeline-runner"

RAY_ENABLED: bool = True
RAY_DASHBOARD_URL: str = "http://localhost:8265"
RAY_JOB_ENTRYPOINT: str = "python -m worker"

OBJECT_STORAGE_ENABLED: bool = True
OBJECT_STORAGE_BUCKET: str = "pipeline-artifacts"
OBJECT_STORAGE_ENDPOINT: str = "http://localhost:9000"
OBJECT_STORAGE_ACCESS_KEY: str = "minio"
OBJECT_STORAGE_SECRET_KEY: str = "minio123"

JWT_SECRET: str = ""
JWT_ISSUER: str = "flexiroaster"
JWT_AUDIENCE: str = "flexiroaster-api"

SECRETS_PROVIDER: str = "env" # env | aws | vault

class Config:
env_file = ".env"
Expand Down
146 changes: 146 additions & 0 deletions pipeline/backend/core/modern_stack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""Modern advanced orchestration stack primitives.

Implements a configurable orchestration profile that combines:
- FastAPI API layer
- Airflow/Temporal orchestrators
- Kafka eventing
- Kubernetes jobs for execution
- Ray for distributed compute
- PostgreSQL + object storage persistence
- Prometheus/Grafana/ELK observability
- JWT + RBAC + secrets manager security
"""
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List
from uuid import uuid4

from config import settings


@dataclass
class StackComponent:
    """Serializable descriptor for one layer of the orchestration stack."""

    name: str  # component name as reported in the architecture profile
    enabled: bool  # whether the layer is active per current settings
    config: Dict[str, Any]  # layer-specific configuration snapshot


class ModernOrchestrationStack:
    """Service layer for the high-end orchestration stack.

    Stateless facade over the application ``settings``: every method reads the
    live configuration, so reported topology always reflects the environment.
    """

    def architecture(self) -> Dict[str, Any]:
        """Return stack topology and active configuration.

        Returns:
            JSON-serializable mapping of layer name -> component profile,
            suitable for direct use as a FastAPI response body.
        """
        # asdict() is the supported dataclass serialization API. Unlike
        # ``__dict__`` it deep-copies nested config (callers cannot mutate a
        # shared dict) and keeps working if StackComponent ever adds slots.
        from dataclasses import asdict

        return {
            "api_layer": asdict(
                StackComponent(
                    name="FastAPI",
                    enabled=True,
                    config={"base_path": settings.API_PREFIX},
                )
            ),
            "orchestration": asdict(
                StackComponent(
                    name=settings.ORCHESTRATION_ENGINE,
                    enabled=True,
                    config={
                        "temporal_enabled": settings.TEMPORAL_ENABLED,
                        "temporal_address": settings.TEMPORAL_ADDRESS,
                        "namespace": settings.TEMPORAL_NAMESPACE,
                        "task_queue": settings.TEMPORAL_TASK_QUEUE,
                    },
                )
            ),
            "event_layer": asdict(
                StackComponent(
                    name="kafka",
                    enabled=settings.KAFKA_ENABLED,
                    config={
                        "bootstrap_servers": settings.KAFKA_BOOTSTRAP_SERVERS,
                        "topic": settings.KAFKA_EXECUTION_TOPIC,
                    },
                )
            ),
            "execution": asdict(
                StackComponent(
                    name="kubernetes-jobs",
                    enabled=True,
                    config={
                        "namespace": settings.KUBERNETES_NAMESPACE,
                        "default_image": settings.KUBERNETES_JOB_IMAGE,
                        "service_account": settings.KUBERNETES_SERVICE_ACCOUNT,
                    },
                )
            ),
            "distributed_compute": asdict(
                StackComponent(
                    name="ray",
                    enabled=settings.RAY_ENABLED,
                    config={
                        "dashboard_url": settings.RAY_DASHBOARD_URL,
                        "entrypoint": settings.RAY_JOB_ENTRYPOINT,
                    },
                )
            ),
            "storage": {
                "database": "postgresql",
                "object_storage": {
                    "enabled": settings.OBJECT_STORAGE_ENABLED,
                    "bucket": settings.OBJECT_STORAGE_BUCKET,
                    "endpoint": settings.OBJECT_STORAGE_ENDPOINT,
                },
            },
            "monitoring": {
                "metrics": ["prometheus", "grafana"],
                "logs": ["elasticsearch", "logstash", "kibana"],
            },
            "security": {
                "auth": "jwt",
                "authorization": "rbac",
                "secrets_provider": settings.SECRETS_PROVIDER,
            },
        }

    def submit_execution(self, pipeline_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Create a stack-aware execution command envelope.

        This API intentionally stays transport-agnostic and returns metadata
        that can be consumed by workers/connectors for
        Airflow/Temporal/Kubernetes/Ray.

        Args:
            pipeline_id: Pipeline identifier the envelope targets.
            payload: Arbitrary execution payload, forwarded verbatim.

        Returns:
            Envelope dict with a unique ``execution_id``, UTC submission
            timestamp, ``status="accepted"``, and ordered connector commands.
        """
        execution_id = f"exec-{uuid4()}"
        now = datetime.now(tz=timezone.utc).isoformat()

        # Mandatory layers first; optional layers are appended only when
        # enabled so disabled connectors never receive commands.
        commands: List[Dict[str, Any]] = [
            {
                "layer": "orchestration",
                "engine": settings.ORCHESTRATION_ENGINE,
                "action": "start_workflow",
            },
            {
                "layer": "execution",
                "engine": "kubernetes-jobs",
                "action": "submit_job",
                "namespace": settings.KUBERNETES_NAMESPACE,
                "image": settings.KUBERNETES_JOB_IMAGE,
            },
        ]

        if settings.RAY_ENABLED:
            commands.append(
                {
                    "layer": "distributed_compute",
                    "engine": "ray",
                    "action": "submit_ray_job",
                    "dashboard": settings.RAY_DASHBOARD_URL,
                }
            )

        if settings.KAFKA_ENABLED:
            commands.append(
                {
                    "layer": "event_layer",
                    "engine": "kafka",
                    "action": "publish_event",
                    "topic": settings.KAFKA_EXECUTION_TOPIC,
                }
            )

        return {
            "execution_id": execution_id,
            "pipeline_id": pipeline_id,
            "submitted_at": now,
            "status": "accepted",
            "commands": commands,
            "payload": payload,
        }
14 changes: 11 additions & 3 deletions pipeline/backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@
from db import create_tables
from core.redis_state import redis_state_manager
from core.executor import pipeline_executor
from api.routes import pipelines, executions, health, monitoring, ai_automation, microservices
from api.routes import ai_automation, executions, health, model_infra, monitoring, pipelines
from api.routes import pipelines, executions, health, monitoring, ai_automation
from api.routes import (
advanced_stack,
ai_automation,
executions,
health,
microservices,
model_infra,
monitoring,
pipelines,
)
from core.elasticsearch_client import elasticsearch_manager
from observability import setup_observability

Expand Down Expand Up @@ -196,6 +203,7 @@ async def root():
app.include_router(ai_automation.router, prefix=settings.API_PREFIX)
app.include_router(microservices.router, prefix=settings.API_PREFIX)
app.include_router(model_infra.router, prefix=settings.API_PREFIX)
app.include_router(advanced_stack.router, prefix=settings.API_PREFIX)


# ===================
Expand Down
Loading
Loading