Skip to content

Commit 883e82a

Browse files
authored
Merge pull request #96 from fuzziecoder/codex/implement-distributed-compute-and-database-alternatives
Add Spark/Dask execution backends and DB alternatives configuration
2 parents 7d3d223 + 2f39277 commit 883e82a

8 files changed

Lines changed: 219 additions & 8 deletions

File tree

backend/.env.example

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,21 @@ TOPIC_PIPELINE_CREATED=pipeline.created
3333
TOPIC_EXECUTION_STARTED=execution.started
3434
TOPIC_EXECUTION_FAILED=execution.failed
3535
TOPIC_EXECUTION_COMPLETED=execution.completed
36+
37+
# Distributed execution backend
38+
DISTRIBUTED_EXECUTION_BACKEND=local
39+
CELERY_BROKER_URL=redis://localhost:6379/0
40+
CELERY_RESULT_BACKEND=redis://localhost:6379/1
41+
CELERY_EXECUTION_TASK=flexiroaster.execute_pipeline
42+
RAY_ADDRESS=auto
43+
RAY_NAMESPACE=flexiroaster
44+
SPARK_MASTER_URL=local[*]
45+
SPARK_APP_NAME=flexiroaster
46+
DASK_SCHEDULER_ADDRESS=
47+
48+
# Database backend alternatives
49+
DATABASE_BACKEND=sqlite
50+
# CockroachDB can be used via a PostgreSQL-compatible DATABASE_URL.
51+
MONGODB_URL=mongodb://localhost:27017/flexiroaster
52+
CASSANDRA_CONTACT_POINTS=localhost
53+
CASSANDRA_KEYSPACE=flexiroaster

backend/README.md

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,20 @@ curl -X POST http://localhost:8000/api/executions \
103103
```
104104

105105

106-
### Distributed Task Execution (Celery / Ray)
106+
### Distributed Task Execution (Celery / Ray / Spark / Dask)
107107

108108
FlexiRoaster supports selectable execution backends for asynchronous and distributed workloads:
109109

110110
- `local`: default in-process execution
111111
- `celery`: async jobs, retries, and scheduling support through Celery workers
112112
- `ray`: distributed Python execution, optimized for ML/AI-heavy pipelines
113+
- `spark`: Apache Spark-based distributed compute for ETL + ML-heavy batch workloads
114+
- `dask`: Python-native distributed parallelism from laptop to cluster
113115

114116
Use the optional `execution_backend` field when creating an execution:
115117

116118
```bash
117-
curl -X POST http://localhost:8000/api/executions -H "Content-Type: application/json" -d '{"pipeline_id": "your-pipeline-id", "execution_backend": "ray"}'
119+
curl -X POST http://localhost:8000/api/executions -H "Content-Type: application/json" -d '{"pipeline_id": "your-pipeline-id", "execution_backend": "spark"}'
118120
```
119121

120122
Or set a default backend via environment variables in `backend/.env`:
@@ -126,9 +128,34 @@ CELERY_RESULT_BACKEND=redis://localhost:6379/1
126128
CELERY_EXECUTION_TASK=flexiroaster.execute_pipeline
127129
RAY_ADDRESS=auto
128130
RAY_NAMESPACE=flexiroaster
131+
SPARK_MASTER_URL=local[*]
132+
SPARK_APP_NAME=flexiroaster
133+
DASK_SCHEDULER_ADDRESS=
129134
```
130135

131-
If Celery or Ray is unavailable, FlexiRoaster automatically falls back to local execution and records the fallback reason in execution context.
136+
If Celery, Ray, Spark, or Dask is unavailable, FlexiRoaster automatically falls back to local execution and records the fallback reason in execution context.
137+
138+
139+
## Database Alternatives
140+
141+
FlexiRoaster can run with multiple persistence backends depending on deployment requirements:
142+
143+
- `postgresql`: traditional relational baseline for transactional workloads
144+
- `cockroachdb`: globally distributed SQL with strong consistency and high availability
145+
- `mongodb`: flexible-schema document store for dynamic pipeline metadata
146+
- `cassandra`: highly scalable wide-column store optimized for high-write throughput
147+
148+
Configure backend selection via environment variables:
149+
150+
```env
151+
DATABASE_BACKEND=postgresql
152+
DATABASE_URL=postgresql+psycopg2://user:password@localhost:5432/flexiroaster
153+
MONGODB_URL=mongodb://localhost:27017/flexiroaster
154+
CASSANDRA_CONTACT_POINTS=localhost
155+
CASSANDRA_KEYSPACE=flexiroaster
156+
```
157+
158+
For CockroachDB, use a PostgreSQL-compatible `DATABASE_URL` with `DATABASE_BACKEND=cockroachdb`.
132159

133160
## Authentication & Security
134161

backend/api/schemas.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class OrchestrationConfig(BaseModel):
125125
pipeline_id: str
126126
execution_backend: Optional[str] = Field(
127127
default=None,
128-
description="Optional override for distributed backend: local, celery, or ray",
128+
description="Optional override for distributed backend: local, celery, ray, spark, or dask",
129129
)
130130
orchestration: OrchestrationConfig = Field(default_factory=OrchestrationConfig)
131131

backend/config.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class Settings(BaseSettings):
5151
TOPIC_EXECUTION_FAILED: str = "execution.failed"
5252
TOPIC_EXECUTION_COMPLETED: str = "execution.completed"
5353

54-
# Distributed execution backends: local|celery|ray
54+
# Distributed execution backends: local|celery|ray|spark|dask
5555
DISTRIBUTED_EXECUTION_BACKEND: str = "local"
5656

5757
# Celery settings
@@ -63,6 +63,19 @@ class Settings(BaseSettings):
6363
RAY_ADDRESS: str = "auto"
6464
RAY_NAMESPACE: str = "flexiroaster"
6565

66+
# Spark settings
67+
SPARK_MASTER_URL: str = "local[*]"
68+
SPARK_APP_NAME: str = "flexiroaster"
69+
70+
# Dask settings
71+
DASK_SCHEDULER_ADDRESS: str = ""
72+
73+
# Database engine selection
74+
DATABASE_BACKEND: Literal["sqlite", "postgresql", "cockroachdb", "mongodb", "cassandra"] = "sqlite"
75+
MONGODB_URL: str = "mongodb://localhost:27017/flexiroaster"
76+
CASSANDRA_CONTACT_POINTS: Union[str, List[str]] = "localhost"
77+
CASSANDRA_KEYSPACE: str = "flexiroaster"
78+
6679
@field_validator("CORS_ORIGINS", mode="before")
6780
@classmethod
6881
def parse_cors_origins(cls, v):
@@ -79,6 +92,14 @@ def parse_kafka_bootstrap_servers(cls, v):
7992
return [server.strip() for server in v.split(",") if server.strip()]
8093
return v
8194

95+
96+
@field_validator("CASSANDRA_CONTACT_POINTS", mode="before")
97+
@classmethod
98+
def parse_cassandra_contact_points(cls, v):
99+
"""Parse CASSANDRA_CONTACT_POINTS from comma-separated values or return list as-is."""
100+
if isinstance(v, str):
101+
return [host.strip() for host in v.split(",") if host.strip()]
102+
return v
82103
# Database
83104
DATABASE_URL: str = "sqlite:///./flexiroaster.db"
84105

backend/core/distributed_executor.py

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
"""Distributed execution dispatcher with optional Celery and Ray backends."""
1+
"""Distributed execution dispatcher with optional distributed compute backends."""
22
from __future__ import annotations
33

44
from dataclasses import dataclass
@@ -11,7 +11,7 @@
1111

1212
logger = logging.getLogger(__name__)
1313

14-
SUPPORTED_BACKENDS = {"local", "celery", "ray"}
14+
SUPPORTED_BACKENDS = {"local", "celery", "ray", "spark", "dask"}
1515

1616

1717
@dataclass
@@ -43,6 +43,14 @@ def run(self, pipeline: Pipeline, backend_override: Optional[str] = None) -> Dis
4343
execution, used_backend = self._execute_with_ray(pipeline)
4444
return DispatchResult(execution=execution, backend_used=used_backend)
4545

46+
if backend == "spark":
47+
execution, used_backend = self._execute_with_spark(pipeline)
48+
return DispatchResult(execution=execution, backend_used=used_backend)
49+
50+
if backend == "dask":
51+
execution, used_backend = self._execute_with_dask(pipeline)
52+
return DispatchResult(execution=execution, backend_used=used_backend)
53+
4654
execution = self.executor.execute(pipeline)
4755
return DispatchResult(execution=execution, backend_used="local")
4856

@@ -112,3 +120,70 @@ def execute_pipeline_remote(pipeline_payload: dict):
112120
}
113121
)
114122
return execution, "local"
123+
124+
def _execute_with_spark(self, pipeline: Pipeline) -> tuple[Execution, str]:
125+
"""Try Spark path; fallback to local execution when unavailable."""
126+
try:
127+
from pyspark.sql import SparkSession
128+
129+
spark = (
130+
SparkSession.builder
131+
.master(settings.SPARK_MASTER_URL)
132+
.appName(settings.SPARK_APP_NAME)
133+
.getOrCreate()
134+
)
135+
logger.info("Spark backend initialized for pipeline %s", pipeline.id)
136+
spark.stop()
137+
138+
execution = self.executor.execute(pipeline)
139+
execution.context.setdefault("distributed_execution", {})
140+
execution.context["distributed_execution"].update(
141+
{
142+
"requested_backend": "spark",
143+
"execution_mode": "spark-driver",
144+
}
145+
)
146+
return execution, "spark"
147+
except Exception as exc:
148+
logger.warning("Spark backend unavailable (%s). Executing locally.", exc)
149+
execution = self.executor.execute(pipeline)
150+
execution.context.setdefault("distributed_execution", {})
151+
execution.context["distributed_execution"].update(
152+
{
153+
"requested_backend": "spark",
154+
"fallback_backend": "local",
155+
"fallback_reason": str(exc),
156+
}
157+
)
158+
return execution, "local"
159+
160+
def _execute_with_dask(self, pipeline: Pipeline) -> tuple[Execution, str]:
161+
"""Try Dask path; fallback to local execution when unavailable."""
162+
try:
163+
from dask.distributed import Client
164+
165+
client = Client(settings.DASK_SCHEDULER_ADDRESS) if settings.DASK_SCHEDULER_ADDRESS else Client(processes=False)
166+
logger.info("Dask backend initialized for pipeline %s", pipeline.id)
167+
client.close()
168+
169+
execution = self.executor.execute(pipeline)
170+
execution.context.setdefault("distributed_execution", {})
171+
execution.context["distributed_execution"].update(
172+
{
173+
"requested_backend": "dask",
174+
"execution_mode": "dask-local-cluster",
175+
}
176+
)
177+
return execution, "dask"
178+
except Exception as exc:
179+
logger.warning("Dask backend unavailable (%s). Executing locally.", exc)
180+
execution = self.executor.execute(pipeline)
181+
execution.context.setdefault("distributed_execution", {})
182+
execution.context["distributed_execution"].update(
183+
{
184+
"requested_backend": "dask",
185+
"fallback_backend": "local",
186+
"fallback_reason": str(exc),
187+
}
188+
)
189+
return execution, "local"

backend/tests/test_distributed_execution.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def test_dispatcher_falls_back_to_local_for_unknown_backend():
3636
pipeline = _build_pipeline()
3737

3838
dispatcher = DistributedExecutionDispatcher()
39-
result = dispatcher.run(pipeline, backend_override="spark")
39+
result = dispatcher.run(pipeline, backend_override="unknown-backend")
4040

4141
assert result.backend_used == "local"
4242
assert result.execution.status == ExecutionStatus.COMPLETED
@@ -59,3 +59,23 @@ def test_create_execution_tracks_requested_backend_and_backend_used():
5959

6060
assert stored.context["requested_execution_backend"] == "celery"
6161
assert stored.context["distributed_execution"]["backend_used"] == "local"
62+
63+
64+
def test_dispatcher_spark_falls_back_when_dependency_missing():
65+
pipeline = _build_pipeline()
66+
67+
dispatcher = DistributedExecutionDispatcher()
68+
result = dispatcher.run(pipeline, backend_override="spark")
69+
70+
assert result.backend_used in {"spark", "local"}
71+
assert result.execution.status == ExecutionStatus.COMPLETED
72+
73+
74+
def test_dispatcher_dask_falls_back_when_dependency_missing():
75+
pipeline = _build_pipeline()
76+
77+
dispatcher = DistributedExecutionDispatcher()
78+
result = dispatcher.run(pipeline, backend_override="dask")
79+
80+
assert result.backend_used in {"dask", "local"}
81+
assert result.execution.status == ExecutionStatus.COMPLETED

pipeline/backend/core/modern_stack.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
- Airflow/Temporal orchestrators
66
- Pluggable eventing (Kafka/Pulsar/RabbitMQ/NATS)
77
- Kubernetes jobs for execution
8+
- Ray/Spark/Dask for distributed compute
9+
- PostgreSQL/CockroachDB/MongoDB/Cassandra + object storage persistence
810
- Pluggable distributed compute (Ray/Spark/Dask/Celery)
911
- Pluggable storage (PostgreSQL/CockroachDB/MongoDB/Cassandra) + object storage
1012
- Prometheus/Grafana/ELK observability
@@ -190,6 +192,24 @@ def architecture(self) -> Dict[str, Any]:
190192
"service_account": settings.KUBERNETES_SERVICE_ACCOUNT,
191193
},
192194
).__dict__,
195+
"distributed_compute": StackComponent(
196+
name="ray",
197+
enabled=settings.RAY_ENABLED,
198+
config={
199+
"dashboard_url": settings.RAY_DASHBOARD_URL,
200+
"entrypoint": settings.RAY_JOB_ENTRYPOINT,
201+
"alternatives": ["spark", "dask"],
202+
},
203+
).__dict__,
204+
"storage": {
205+
"database": "postgresql",
206+
"database_alternatives": ["cockroachdb", "mongodb", "cassandra"],
207+
"object_storage": {
208+
"enabled": settings.OBJECT_STORAGE_ENABLED,
209+
"bucket": settings.OBJECT_STORAGE_BUCKET,
210+
"endpoint": settings.OBJECT_STORAGE_ENDPOINT,
211+
},
212+
},
193213
"distributed_compute": self._distributed_compute(),
194214
"storage": self._storage(),
195215
"monitoring": {
@@ -227,6 +247,33 @@ def submit_execution(self, pipeline_id: str, payload: Dict[str, Any]) -> Dict[st
227247
},
228248
]
229249

250+
if settings.RAY_ENABLED:
251+
commands.append(
252+
{
253+
"layer": "distributed_compute",
254+
"engine": "ray",
255+
"action": "submit_ray_job",
256+
"dashboard": settings.RAY_DASHBOARD_URL,
257+
}
258+
)
259+
else:
260+
commands.append(
261+
{
262+
"layer": "distributed_compute",
263+
"engine": "spark",
264+
"action": "submit_spark_job",
265+
}
266+
)
267+
268+
if settings.KAFKA_ENABLED:
269+
commands.append(
270+
{
271+
"layer": "event_layer",
272+
"engine": "kafka",
273+
"action": "publish_event",
274+
"topic": settings.KAFKA_EXECUTION_TOPIC,
275+
}
276+
)
230277
compute = self._distributed_compute()
231278
if compute["enabled"]:
232279
compute_command = {

pipeline/backend/tests/test_modern_stack.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ def test_architecture_contains_requested_layers():
2121
assert architecture["api_layer"]["name"] == "FastAPI"
2222
assert architecture["orchestration"]["name"] in {"temporal", "airflow"}
2323
assert architecture["execution"]["name"] == "kubernetes-jobs"
24+
assert architecture["distributed_compute"]["name"] == "ray"
25+
assert set(architecture["distributed_compute"]["config"]["alternatives"]) == {"spark", "dask"}
26+
assert "cockroachdb" in architecture["storage"]["database_alternatives"]
2427
assert architecture["distributed_compute"]["name"] in {"ray", "spark", "dask", "celery"}
2528
assert architecture["security"]["authorization"] == "rbac"
2629

0 commit comments

Comments
 (0)