diff --git a/backend/.env.example b/backend/.env.example index 0cbde8a..cb10b59 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -33,3 +33,21 @@ TOPIC_PIPELINE_CREATED=pipeline.created TOPIC_EXECUTION_STARTED=execution.started TOPIC_EXECUTION_FAILED=execution.failed TOPIC_EXECUTION_COMPLETED=execution.completed + +# Distributed execution backend +DISTRIBUTED_EXECUTION_BACKEND=local +CELERY_BROKER_URL=redis://localhost:6379/0 +CELERY_RESULT_BACKEND=redis://localhost:6379/1 +CELERY_EXECUTION_TASK=flexiroaster.execute_pipeline +RAY_ADDRESS=auto +RAY_NAMESPACE=flexiroaster +SPARK_MASTER_URL=local[*] +SPARK_APP_NAME=flexiroaster +DASK_SCHEDULER_ADDRESS= + +# Database backend alternatives +DATABASE_BACKEND=sqlite +# CockroachDB can be used via a PostgreSQL-compatible DATABASE_URL. +MONGODB_URL=mongodb://localhost:27017/flexiroaster +CASSANDRA_CONTACT_POINTS=localhost +CASSANDRA_KEYSPACE=flexiroaster diff --git a/backend/README.md b/backend/README.md index 5f2aca9..3b313c7 100644 --- a/backend/README.md +++ b/backend/README.md @@ -103,18 +103,20 @@ curl -X POST http://localhost:8000/api/executions \ ``` -### Distributed Task Execution (Celery / Ray) +### Distributed Task Execution (Celery / Ray / Spark / Dask) FlexiRoaster supports selectable execution backends for asynchronous and distributed workloads: - `local`: default in-process execution - `celery`: async jobs, retries, and scheduling support through Celery workers - `ray`: distributed Python execution, optimized for ML/AI-heavy pipelines +- `spark`: Apache Spark-based distributed compute for ETL + ML-heavy batch workloads +- `dask`: Python-native distributed parallelism from laptop to cluster Use the optional `execution_backend` field when creating an execution: ```bash -curl -X POST http://localhost:8000/api/executions -H "Content-Type: application/json" -d '{"pipeline_id": "your-pipeline-id", "execution_backend": "ray"}' +curl -X POST http://localhost:8000/api/executions -H "Content-Type: 
application/json" -d '{"pipeline_id": "your-pipeline-id", "execution_backend": "spark"}' ``` Or set a default backend via environment variables in `backend/.env`: @@ -126,9 +128,34 @@ CELERY_RESULT_BACKEND=redis://localhost:6379/1 CELERY_EXECUTION_TASK=flexiroaster.execute_pipeline RAY_ADDRESS=auto RAY_NAMESPACE=flexiroaster +SPARK_MASTER_URL=local[*] +SPARK_APP_NAME=flexiroaster +DASK_SCHEDULER_ADDRESS= ``` -If Celery or Ray is unavailable, FlexiRoaster automatically falls back to local execution and records the fallback reason in execution context. +If Celery, Ray, Spark, or Dask is unavailable, FlexiRoaster automatically falls back to local execution and records the fallback reason in execution context. + + +## Database Alternatives + +FlexiRoaster can run with multiple persistence backends depending on deployment requirements: + +- `postgresql`: traditional relational baseline for transactional workloads +- `cockroachdb`: globally distributed SQL with strong consistency and high availability +- `mongodb`: flexible-schema document store for dynamic pipeline metadata +- `cassandra`: highly scalable wide-column store optimized for high-write throughput + +Configure backend selection via environment variables: + +```env +DATABASE_BACKEND=postgresql +DATABASE_URL=postgresql+psycopg2://user:password@localhost:5432/flexiroaster +MONGODB_URL=mongodb://localhost:27017/flexiroaster +CASSANDRA_CONTACT_POINTS=localhost +CASSANDRA_KEYSPACE=flexiroaster +``` + +For CockroachDB, use a PostgreSQL-compatible `DATABASE_URL` with `DATABASE_BACKEND=cockroachdb`. 
## Authentication & Security diff --git a/backend/api/schemas.py b/backend/api/schemas.py index 33f9a06..e6145f6 100644 --- a/backend/api/schemas.py +++ b/backend/api/schemas.py @@ -125,7 +125,7 @@ class OrchestrationConfig(BaseModel): pipeline_id: str execution_backend: Optional[str] = Field( default=None, - description="Optional override for distributed backend: local, celery, or ray", + description="Optional override for distributed backend: local, celery, ray, spark, or dask", ) orchestration: OrchestrationConfig = Field(default_factory=OrchestrationConfig) diff --git a/backend/config.py b/backend/config.py index 47fcc42..12dd7df 100644 --- a/backend/config.py +++ b/backend/config.py @@ -51,7 +51,7 @@ class Settings(BaseSettings): TOPIC_EXECUTION_FAILED: str = "execution.failed" TOPIC_EXECUTION_COMPLETED: str = "execution.completed" - # Distributed execution backends: local|celery|ray + # Distributed execution backends: local|celery|ray|spark|dask DISTRIBUTED_EXECUTION_BACKEND: str = "local" # Celery settings @@ -63,6 +63,19 @@ class Settings(BaseSettings): RAY_ADDRESS: str = "auto" RAY_NAMESPACE: str = "flexiroaster" + # Spark settings + SPARK_MASTER_URL: str = "local[*]" + SPARK_APP_NAME: str = "flexiroaster" + + # Dask settings + DASK_SCHEDULER_ADDRESS: str = "" + + # Database engine selection + DATABASE_BACKEND: Literal["sqlite", "postgresql", "cockroachdb", "mongodb", "cassandra"] = "sqlite" + MONGODB_URL: str = "mongodb://localhost:27017/flexiroaster" + CASSANDRA_CONTACT_POINTS: Union[str, List[str]] = "localhost" + CASSANDRA_KEYSPACE: str = "flexiroaster" + @field_validator("CORS_ORIGINS", mode="before") @classmethod def parse_cors_origins(cls, v): @@ -79,6 +92,14 @@ def parse_kafka_bootstrap_servers(cls, v): return [server.strip() for server in v.split(",") if server.strip()] return v + + @field_validator("CASSANDRA_CONTACT_POINTS", mode="before") + @classmethod + def parse_cassandra_contact_points(cls, v): + """Parse CASSANDRA_CONTACT_POINTS from 
comma-separated values or return list as-is.""" + if isinstance(v, str): + return [host.strip() for host in v.split(",") if host.strip()] + return v # Database DATABASE_URL: str = "sqlite:///./flexiroaster.db" diff --git a/backend/core/distributed_executor.py b/backend/core/distributed_executor.py index e5f5d88..dd3cdad 100644 --- a/backend/core/distributed_executor.py +++ b/backend/core/distributed_executor.py @@ -1,4 +1,4 @@ -"""Distributed execution dispatcher with optional Celery and Ray backends.""" +"""Distributed execution dispatcher with optional distributed compute backends.""" from __future__ import annotations from dataclasses import dataclass @@ -11,7 +11,7 @@ logger = logging.getLogger(__name__) -SUPPORTED_BACKENDS = {"local", "celery", "ray"} +SUPPORTED_BACKENDS = {"local", "celery", "ray", "spark", "dask"} @dataclass @@ -43,6 +43,14 @@ def run(self, pipeline: Pipeline, backend_override: Optional[str] = None) -> Dis execution, used_backend = self._execute_with_ray(pipeline) return DispatchResult(execution=execution, backend_used=used_backend) + if backend == "spark": + execution, used_backend = self._execute_with_spark(pipeline) + return DispatchResult(execution=execution, backend_used=used_backend) + + if backend == "dask": + execution, used_backend = self._execute_with_dask(pipeline) + return DispatchResult(execution=execution, backend_used=used_backend) + execution = self.executor.execute(pipeline) return DispatchResult(execution=execution, backend_used="local") @@ -112,3 +120,70 @@ def execute_pipeline_remote(pipeline_payload: dict): } ) return execution, "local" + + def _execute_with_spark(self, pipeline: Pipeline) -> tuple[Execution, str]: + """Try Spark path; fallback to local execution when unavailable.""" + try: + from pyspark.sql import SparkSession + + spark = ( + SparkSession.builder + .master(settings.SPARK_MASTER_URL) + .appName(settings.SPARK_APP_NAME) + .getOrCreate() + ) + logger.info("Spark backend initialized for pipeline %s", 
pipeline.id) + spark.stop() + + execution = self.executor.execute(pipeline) + execution.context.setdefault("distributed_execution", {}) + execution.context["distributed_execution"].update( + { + "requested_backend": "spark", + "execution_mode": "spark-driver", + } + ) + return execution, "spark" + except Exception as exc: + logger.warning("Spark backend unavailable (%s). Executing locally.", exc) + execution = self.executor.execute(pipeline) + execution.context.setdefault("distributed_execution", {}) + execution.context["distributed_execution"].update( + { + "requested_backend": "spark", + "fallback_backend": "local", + "fallback_reason": str(exc), + } + ) + return execution, "local" + + def _execute_with_dask(self, pipeline: Pipeline) -> tuple[Execution, str]: + """Try Dask path; fallback to local execution when unavailable.""" + try: + from dask.distributed import Client + + client = Client(settings.DASK_SCHEDULER_ADDRESS) if settings.DASK_SCHEDULER_ADDRESS else Client(processes=False) + logger.info("Dask backend initialized for pipeline %s", pipeline.id) + client.close() + + execution = self.executor.execute(pipeline) + execution.context.setdefault("distributed_execution", {}) + execution.context["distributed_execution"].update( + { + "requested_backend": "dask", + "execution_mode": "dask-local-cluster", + } + ) + return execution, "dask" + except Exception as exc: + logger.warning("Dask backend unavailable (%s). 
Executing locally.", exc)
+            execution = self.executor.execute(pipeline)
+            execution.context.setdefault("distributed_execution", {})
+            execution.context["distributed_execution"].update(
+                {
+                    "requested_backend": "dask",
+                    "fallback_backend": "local",
+                    "fallback_reason": str(exc),
+                }
+            )
+            return execution, "local"
diff --git a/backend/tests/test_distributed_execution.py b/backend/tests/test_distributed_execution.py
index c59a188..23d0c2d 100644
--- a/backend/tests/test_distributed_execution.py
+++ b/backend/tests/test_distributed_execution.py
@@ -36,7 +36,7 @@ def test_dispatcher_falls_back_to_local_for_unknown_backend():
     pipeline = _build_pipeline()
 
     dispatcher = DistributedExecutionDispatcher()
-    result = dispatcher.run(pipeline, backend_override="spark")
+    result = dispatcher.run(pipeline, backend_override="unknown-backend")
 
     assert result.backend_used == "local"
     assert result.execution.status == ExecutionStatus.COMPLETED
@@ -59,3 +59,23 @@ def test_create_execution_tracks_requested_backend_and_backend_used():
 
     assert stored.context["requested_execution_backend"] == "celery"
     assert stored.context["distributed_execution"]["backend_used"] == "local"
+
+
+def test_dispatcher_spark_falls_back_when_dependency_missing():
+    pipeline = _build_pipeline()
+
+    dispatcher = DistributedExecutionDispatcher()
+    result = dispatcher.run(pipeline, backend_override="spark")
+
+    assert result.backend_used in {"spark", "local"}
+    assert result.execution.status == ExecutionStatus.COMPLETED
+
+
+def test_dispatcher_dask_falls_back_when_dependency_missing():
+    pipeline = _build_pipeline()
+
+    dispatcher = DistributedExecutionDispatcher()
+    result = dispatcher.run(pipeline, backend_override="dask")
+
+    assert result.backend_used in {"dask", "local"}
+    assert result.execution.status == ExecutionStatus.COMPLETED