diff --git a/pipeline/README.md b/pipeline/README.md
index d812b15..b60479e 100644
--- a/pipeline/README.md
+++ b/pipeline/README.md
@@ -2,11 +2,14 @@
 
 A production-ready pipeline automation system built with:
 
-- **Apache Airflow** - Orchestration & scheduling
+- **Apache Airflow / Temporal** - Orchestration & scheduling
 - **FastAPI** - Pipeline execution engine
 - **Redis** - State management, locks, caching
 - **Celery** - Distributed task queue
 - **Kafka** - Event-driven execution streaming
+- **Kubernetes Jobs** - Isolated execution runtime
+- **Ray** - Distributed compute engine
+- **MinIO/Object Storage** - Artifact persistence
 - **PostgreSQL** - Persistence
 - **AI Safety Module** - Failure prediction & anomaly handling
 - **Elasticsearch** - Execution log indexing, fast filtering, analytics
@@ -397,3 +400,13 @@ MIT License
 cd pipeline/backend
 celery -A core.tasks.celery_app worker --loglevel=info --concurrency=4
 ```
+
+
+### Advanced Orchestration Stack API
+
+| Method | Endpoint | Description |
+|--------|----------|-------------|
+| GET | `/api/advanced-stack/architecture` | Returns the composed stack profile (JWT+RBAC protected) |
+| POST | `/api/advanced-stack/executions` | Creates an orchestration envelope for Airflow/Temporal + K8s + Ray + Kafka |
+
+Security model uses JWT bearer auth with RBAC roles (`admin`, `operator`, `viewer`) and pluggable secrets providers (`env`, `aws`, `vault`).
diff --git a/pipeline/backend/api/routes/__init__.py b/pipeline/backend/api/routes/__init__.py
index fc24fdf..f2b2ac0 100644
--- a/pipeline/backend/api/routes/__init__.py
+++ b/pipeline/backend/api/routes/__init__.py
@@ -1,5 +1,12 @@
-"""API Routes module initialization."""
-from api.routes import pipelines, executions, health, monitoring, ai_automation, microservices
 """API route module exports."""
-from api.routes import ai_automation, executions, health, model_infra, monitoring, pipelines
+from api.routes import (
+    advanced_stack,
+    ai_automation,
+    executions,
+    health,
+    microservices,
+    model_infra,
+    monitoring,
+    pipelines,
+)
 
diff --git a/pipeline/backend/api/routes/advanced_stack.py b/pipeline/backend/api/routes/advanced_stack.py
new file mode 100644
index 0000000..8a2e281
--- /dev/null
+++ b/pipeline/backend/api/routes/advanced_stack.py
@@ -0,0 +1,35 @@
+"""Advanced orchestration stack endpoints."""
+from __future__ import annotations
+
+from typing import Any, Dict
+
+from fastapi import APIRouter, Depends
+from pydantic import BaseModel, Field
+
+from api.security import AuthContext, require_roles
+from core.modern_stack import ModernOrchestrationStack
+
+router = APIRouter(prefix="/advanced-stack", tags=["advanced-stack"])
+stack = ModernOrchestrationStack()
+
+
+class StackExecutionRequest(BaseModel):
+    pipeline_id: str = Field(..., description="Pipeline identifier to execute")
+    payload: Dict[str, Any] = Field(default_factory=dict, description="Execution payload")
+
+
+@router.get("/architecture")
+async def get_stack_architecture(
+    _: AuthContext = Depends(require_roles("admin", "operator", "viewer")),
+) -> Dict[str, Any]:
+    """Return current high-end orchestration stack architecture."""
+    return stack.architecture()
+
+
+@router.post("/executions")
+async def submit_stack_execution(
+    request: StackExecutionRequest,
+    _: AuthContext = Depends(require_roles("admin", "operator")),
+) -> Dict[str, Any]:
+    """Submit an execution envelope for orchestration connectors."""
+    return stack.submit_execution(request.pipeline_id, request.payload)
diff --git a/pipeline/backend/api/security.py b/pipeline/backend/api/security.py
new file mode 100644
index 0000000..8904c8d
--- /dev/null
+++ b/pipeline/backend/api/security.py
@@ -0,0 +1,103 @@
+"""JWT + RBAC security helpers for advanced orchestration routes."""
+from __future__ import annotations
+
+import base64
+import hashlib
+import hmac
+import json
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from typing import Any, Dict, List
+
+from fastapi import Depends, HTTPException, status
+from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
+
+from config import settings
+
+bearer_scheme = HTTPBearer(auto_error=False)
+
+
+@dataclass
+class AuthContext:
+    """Authenticated identity context extracted from a JWT token."""
+
+    subject: str
+    roles: List[str]
+    claims: Dict[str, Any]
+
+
+def _b64url_decode(data: str) -> bytes:
+    padding = "=" * (-len(data) % 4)
+    return base64.urlsafe_b64decode(data + padding)
+
+
+def _decode_hs256_jwt(token: str, secret: str) -> Dict[str, Any]:
+    """Decode and verify a compact JWT token signed with HS256."""
+    try:
+        header_segment, payload_segment, signature_segment = token.split(".")
+    except ValueError as exc:
+        raise HTTPException(status_code=401, detail="Malformed JWT token") from exc
+
+    signing_input = f"{header_segment}.{payload_segment}".encode("utf-8")
+    expected_signature = hmac.new(
+        secret.encode("utf-8"), signing_input, hashlib.sha256
+    ).digest()
+    supplied_signature = _b64url_decode(signature_segment)
+
+    if not hmac.compare_digest(expected_signature, supplied_signature):
+        raise HTTPException(status_code=401, detail="Invalid JWT signature")
+
+    header = json.loads(_b64url_decode(header_segment).decode("utf-8"))
+    if header.get("alg") != "HS256":
+        raise HTTPException(status_code=401, detail="Unsupported JWT algorithm")
+
+    payload = json.loads(_b64url_decode(payload_segment).decode("utf-8"))
+    exp = payload.get("exp")
+    if exp is not None:
+        expires_at = datetime.fromtimestamp(float(exp), tz=timezone.utc)
+        if datetime.now(tz=timezone.utc) >= expires_at:
+            raise HTTPException(status_code=401, detail="JWT token is expired")
+
+    return payload
+
+
+async def get_auth_context(
+    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
+) -> AuthContext:
+    """Extract JWT identity and roles from Authorization bearer token."""
+    if not settings.JWT_SECRET:
+        raise HTTPException(
+            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+            detail="JWT security is not configured",
+        )
+
+    if credentials is None or not credentials.credentials:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail="Missing bearer token",
+        )
+
+    claims = _decode_hs256_jwt(credentials.credentials, settings.JWT_SECRET)
+    subject = claims.get("sub")
+    roles = claims.get("roles") or []
+
+    if not subject:
+        raise HTTPException(status_code=401, detail="JWT subject is required")
+    if not isinstance(roles, list):
+        raise HTTPException(status_code=401, detail="JWT roles claim must be a list")
+
+    return AuthContext(subject=subject, roles=roles, claims=claims)
+
+
+def require_roles(*required_roles: str):
+    """Dependency factory enforcing at least one RBAC role."""
+
+    async def _role_guard(auth: AuthContext = Depends(get_auth_context)) -> AuthContext:
+        if required_roles and not any(role in auth.roles for role in required_roles):
+            raise HTTPException(
+                status_code=status.HTTP_403_FORBIDDEN,
+                detail=f"Requires one of roles: {', '.join(required_roles)}",
+            )
+        return auth
+
+    return _role_guard
diff --git a/pipeline/backend/config.py b/pipeline/backend/config.py
index c8705bd..938cabc 100644
--- a/pipeline/backend/config.py
+++ b/pipeline/backend/config.py
@@ -138,6 +138,35 @@ class Settings(BaseSettings):
     KAFKA_ENABLED: bool = False
     KAFKA_BOOTSTRAP_SERVERS: str = "localhost:9092"
     KAFKA_EXECUTION_TOPIC: str = "pipeline.executions"
+
+    # ===================
+    # Modern Orchestration Stack
+    # ===================
+    ORCHESTRATION_ENGINE: str = "temporal"  # temporal | airflow
+    TEMPORAL_ENABLED: bool = True
+    TEMPORAL_NAMESPACE: str = "default"
+    TEMPORAL_TASK_QUEUE: str = "flexiroaster-orchestration"
+    TEMPORAL_ADDRESS: str = "localhost:7233"
+
+    KUBERNETES_NAMESPACE: str = "flexiroaster"
+    KUBERNETES_JOB_IMAGE: str = "python:3.11-slim"
+    KUBERNETES_SERVICE_ACCOUNT: str = "pipeline-runner"
+
+    RAY_ENABLED: bool = True
+    RAY_DASHBOARD_URL: str = "http://localhost:8265"
+    RAY_JOB_ENTRYPOINT: str = "python -m worker"
+
+    OBJECT_STORAGE_ENABLED: bool = True
+    OBJECT_STORAGE_BUCKET: str = "pipeline-artifacts"
+    OBJECT_STORAGE_ENDPOINT: str = "http://localhost:9000"
+    OBJECT_STORAGE_ACCESS_KEY: str = "minio"
+    OBJECT_STORAGE_SECRET_KEY: str = "minio123"
+
+    JWT_SECRET: str = ""
+    JWT_ISSUER: str = "flexiroaster"
+    JWT_AUDIENCE: str = "flexiroaster-api"
+
+    SECRETS_PROVIDER: str = "env"  # env | aws | vault
 
     class Config:
         env_file = ".env"
diff --git a/pipeline/backend/core/modern_stack.py b/pipeline/backend/core/modern_stack.py
new file mode 100644
index 0000000..659ee7e
--- /dev/null
+++ b/pipeline/backend/core/modern_stack.py
@@ -0,0 +1,146 @@
+"""Modern advanced orchestration stack primitives.
+
+Implements a configurable orchestration profile that combines:
+- FastAPI API layer
+- Airflow/Temporal orchestrators
+- Kafka eventing
+- Kubernetes jobs for execution
+- Ray for distributed compute
+- PostgreSQL + object storage persistence
+- Prometheus/Grafana/ELK observability
+- JWT + RBAC + secrets manager security
+"""
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from typing import Any, Dict, List
+from uuid import uuid4
+
+from config import settings
+
+
+@dataclass
+class StackComponent:
+    name: str
+    enabled: bool
+    config: Dict[str, Any]
+
+
+class ModernOrchestrationStack:
+    """Service layer for the high-end orchestration stack."""
+
+    def architecture(self) -> Dict[str, Any]:
+        """Return stack topology and active configuration."""
+        return {
+            "api_layer": StackComponent(
+                name="FastAPI",
+                enabled=True,
+                config={"base_path": settings.API_PREFIX},
+            ).__dict__,
+            "orchestration": StackComponent(
+                name=settings.ORCHESTRATION_ENGINE,
+                enabled=True,
+                config={
+                    "temporal_enabled": settings.TEMPORAL_ENABLED,
+                    "temporal_address": settings.TEMPORAL_ADDRESS,
+                    "namespace": settings.TEMPORAL_NAMESPACE,
+                    "task_queue": settings.TEMPORAL_TASK_QUEUE,
+                },
+            ).__dict__,
+            "event_layer": StackComponent(
+                name="kafka",
+                enabled=settings.KAFKA_ENABLED,
+                config={
+                    "bootstrap_servers": settings.KAFKA_BOOTSTRAP_SERVERS,
+                    "topic": settings.KAFKA_EXECUTION_TOPIC,
+                },
+            ).__dict__,
+            "execution": StackComponent(
+                name="kubernetes-jobs",
+                enabled=True,
+                config={
+                    "namespace": settings.KUBERNETES_NAMESPACE,
+                    "default_image": settings.KUBERNETES_JOB_IMAGE,
+                    "service_account": settings.KUBERNETES_SERVICE_ACCOUNT,
+                },
+            ).__dict__,
+            "distributed_compute": StackComponent(
+                name="ray",
+                enabled=settings.RAY_ENABLED,
+                config={
+                    "dashboard_url": settings.RAY_DASHBOARD_URL,
+                    "entrypoint": settings.RAY_JOB_ENTRYPOINT,
+                },
+            ).__dict__,
+            "storage": {
+                "database": "postgresql",
+                "object_storage": {
+                    "enabled": settings.OBJECT_STORAGE_ENABLED,
+                    "bucket": settings.OBJECT_STORAGE_BUCKET,
+                    "endpoint": settings.OBJECT_STORAGE_ENDPOINT,
+                },
+            },
+            "monitoring": {
+                "metrics": ["prometheus", "grafana"],
+                "logs": ["elasticsearch", "logstash", "kibana"],
+            },
+            "security": {
+                "auth": "jwt",
+                "authorization": "rbac",
+                "secrets_provider": settings.SECRETS_PROVIDER,
+            },
+        }
+
+    def submit_execution(self, pipeline_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
+        """Create a stack-aware execution command envelope.
+
+        This API intentionally stays transport-agnostic and returns metadata that
+        can be consumed by workers/connectors for Airflow/Temporal/Kubernetes/Ray.
+        """
+        execution_id = f"exec-{uuid4()}"
+        now = datetime.now(tz=timezone.utc).isoformat()
+
+        commands: List[Dict[str, Any]] = [
+            {
+                "layer": "orchestration",
+                "engine": settings.ORCHESTRATION_ENGINE,
+                "action": "start_workflow",
+            },
+            {
+                "layer": "execution",
+                "engine": "kubernetes-jobs",
+                "action": "submit_job",
+                "namespace": settings.KUBERNETES_NAMESPACE,
+                "image": settings.KUBERNETES_JOB_IMAGE,
+            },
+        ]
+
+        if settings.RAY_ENABLED:
+            commands.append(
+                {
+                    "layer": "distributed_compute",
+                    "engine": "ray",
+                    "action": "submit_ray_job",
+                    "dashboard": settings.RAY_DASHBOARD_URL,
+                }
+            )
+
+        if settings.KAFKA_ENABLED:
+            commands.append(
+                {
+                    "layer": "event_layer",
+                    "engine": "kafka",
+                    "action": "publish_event",
+                    "topic": settings.KAFKA_EXECUTION_TOPIC,
+                }
+            )
+
+        return {
+            "execution_id": execution_id,
+            "pipeline_id": pipeline_id,
+            "submitted_at": now,
+            "status": "accepted",
+            "commands": commands,
+            "payload": payload,
+        }
diff --git a/pipeline/backend/main.py b/pipeline/backend/main.py
index d8a8e8d..1ca40f5 100644
--- a/pipeline/backend/main.py
+++ b/pipeline/backend/main.py
@@ -17,9 +17,16 @@
 from db import create_tables
 from core.redis_state import redis_state_manager
 from core.executor import pipeline_executor
-from api.routes import pipelines, executions, health, monitoring, ai_automation, microservices
-from api.routes import ai_automation, executions, health, model_infra, monitoring, pipelines
-from api.routes import pipelines, executions, health, monitoring, ai_automation
+from api.routes import (
+    advanced_stack,
+    ai_automation,
+    executions,
+    health,
+    microservices,
+    model_infra,
+    monitoring,
+    pipelines,
+)
 from core.elasticsearch_client import elasticsearch_manager
 from observability import setup_observability
 
@@ -196,6 +203,7 @@ async def root():
 app.include_router(ai_automation.router, prefix=settings.API_PREFIX)
 app.include_router(microservices.router, prefix=settings.API_PREFIX)
 app.include_router(model_infra.router, prefix=settings.API_PREFIX)
+app.include_router(advanced_stack.router, prefix=settings.API_PREFIX)
 
 
 # ===================
diff --git a/pipeline/backend/requirements.txt b/pipeline/backend/requirements.txt
index 8db7b76..21cee6d 100644
--- a/pipeline/backend/requirements.txt
+++ b/pipeline/backend/requirements.txt
@@ -49,3 +49,9 @@ kafka-python==2.0.2
 # gRPC internal service communication
 grpcio==1.65.5
 grpcio-tools==1.65.5
+
+# Advanced orchestration stack
+temporalio==1.8.0
+kubernetes==30.1.0
+ray[default]==2.37.0
+boto3==1.35.17
diff --git a/pipeline/backend/tests/test_modern_stack.py b/pipeline/backend/tests/test_modern_stack.py
new file mode 100644
index 0000000..9eb7d6e
--- /dev/null
+++ b/pipeline/backend/tests/test_modern_stack.py
@@ -0,0 +1,34 @@
+from importlib.util import module_from_spec, spec_from_file_location
+from pathlib import Path
+import sys
+
+
+module_path = Path(__file__).resolve().parents[1] / "core" / "modern_stack.py"
+spec = spec_from_file_location("modern_stack", module_path)
+assert spec and spec.loader
+modern_stack = module_from_spec(spec)
+sys.modules[spec.name] = modern_stack
+spec.loader.exec_module(modern_stack)
+ModernOrchestrationStack = modern_stack.ModernOrchestrationStack
+
+
+def test_architecture_contains_requested_layers():
+    stack = ModernOrchestrationStack()
+
+    architecture = stack.architecture()
+
+    assert architecture["api_layer"]["name"] == "FastAPI"
+    assert architecture["orchestration"]["name"] in {"temporal", "airflow"}
+    assert architecture["execution"]["name"] == "kubernetes-jobs"
+    assert architecture["distributed_compute"]["name"] == "ray"
+    assert architecture["security"]["authorization"] == "rbac"
+
+
+def test_submit_execution_returns_envelope():
+    stack = ModernOrchestrationStack()
+
+    envelope = stack.submit_execution("pipeline-123", {"run_type": "full"})
+
+    assert envelope["pipeline_id"] == "pipeline-123"
+    assert envelope["status"] == "accepted"
+    assert any(command["layer"] == "execution" for command in envelope["commands"])
diff --git a/pipeline/docker-compose.yml b/pipeline/docker-compose.yml
index 2882271..373efff 100644
--- a/pipeline/docker-compose.yml
+++ b/pipeline/docker-compose.yml
@@ -124,6 +124,24 @@
       SENTRY_ENVIRONMENT: ${SENTRY_ENVIRONMENT:-local}
       SENTRY_TRACES_SAMPLE_RATE: ${SENTRY_TRACES_SAMPLE_RATE:-0.1}
       SENTRY_PROFILES_SAMPLE_RATE: ${SENTRY_PROFILES_SAMPLE_RATE:-0.1}
+      ORCHESTRATION_ENGINE: temporal
+      TEMPORAL_ENABLED: "true"
+      TEMPORAL_ADDRESS: temporal:7233
+      TEMPORAL_NAMESPACE: default
+      TEMPORAL_TASK_QUEUE: flexiroaster-orchestration
+      KAFKA_ENABLED: "true"
+      KAFKA_BOOTSTRAP_SERVERS: kafka:29092
+      KUBERNETES_NAMESPACE: flexiroaster
+      KUBERNETES_JOB_IMAGE: python:3.11-slim
+      RAY_ENABLED: "true"
+      RAY_DASHBOARD_URL: http://ray-head:8265
+      OBJECT_STORAGE_ENABLED: "true"
+      OBJECT_STORAGE_BUCKET: pipeline-artifacts
+      OBJECT_STORAGE_ENDPOINT: http://minio:9000
+      OBJECT_STORAGE_ACCESS_KEY: ${OBJECT_STORAGE_ACCESS_KEY:-minio}
+      OBJECT_STORAGE_SECRET_KEY: ${OBJECT_STORAGE_SECRET_KEY:-minio123}
+      JWT_SECRET: ${JWT_SECRET:-change-me-in-production}
+      SECRETS_PROVIDER: env
     ports:
       - "8000:8000"
    depends_on:
@@ -309,7 +327,91 @@
         condition: service_healthy
     restart: always
 
+
+  # Kafka (Event Layer)
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.6.1
+    container_name: flexiroaster-zookeeper
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - "2181:2181"
+    restart: always
+
+  kafka:
+    image: confluentinc/cp-kafka:7.6.1
+    container_name: flexiroaster-kafka
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    restart: always
+
+  # Temporal (Alternative Orchestration)
+  temporal:
+    image: temporalio/auto-setup:1.25.0
+    container_name: flexiroaster-temporal
+    depends_on:
+      postgres:
+        condition: service_healthy
+    environment:
+      DB: postgresql
+      DB_PORT: 5432
+      POSTGRES_USER: airflow
+      POSTGRES_PWD: airflow
+      POSTGRES_SEEDS: postgres
+      DYNAMIC_CONFIG_FILE_PATH: config/dynamicconfig/development-sql.yaml
+    ports:
+      - "7233:7233"
+    restart: always
+
+  temporal-ui:
+    image: temporalio/ui:2.29.1
+    container_name: flexiroaster-temporal-ui
+    environment:
+      TEMPORAL_ADDRESS: temporal:7233
+    ports:
+      - "8088:8080"
+    depends_on:
+      - temporal
+    restart: always
+
+  # Ray Distributed Compute
+  ray-head:
+    image: rayproject/ray:2.37.0
+    container_name: flexiroaster-ray-head
+    command: ray start --head --dashboard-host=0.0.0.0 --port=6378 --dashboard-port=8265 --block
+    ports:
+      - "8265:8265"
+      - "6378:6378"
+    restart: always
+
+  # Object Storage
+  minio:
+    image: minio/minio:RELEASE.2024-09-13T20-26-02Z
+    container_name: flexiroaster-minio
+    command: server /data --console-address ":9001"
+    environment:
+      MINIO_ROOT_USER: minio
+      MINIO_ROOT_PASSWORD: minio123
+    volumes:
+      - minio-data:/data
+    ports:
+      - "9000:9000"
+      - "9001:9001"
+    restart: always
+
 volumes:
+  minio-data:
   postgres-data:
   redis-data:
   prometheus-data: