Merged
18 changes: 18 additions & 0 deletions backend/.env.example
@@ -33,3 +33,21 @@ TOPIC_PIPELINE_CREATED=pipeline.created
TOPIC_EXECUTION_STARTED=execution.started
TOPIC_EXECUTION_FAILED=execution.failed
TOPIC_EXECUTION_COMPLETED=execution.completed

# Distributed execution backend
DISTRIBUTED_EXECUTION_BACKEND=local
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/1
CELERY_EXECUTION_TASK=flexiroaster.execute_pipeline
RAY_ADDRESS=auto
RAY_NAMESPACE=flexiroaster
SPARK_MASTER_URL=local[*]
SPARK_APP_NAME=flexiroaster
DASK_SCHEDULER_ADDRESS=

# Database backend alternatives
DATABASE_BACKEND=sqlite
# CockroachDB can be used via a PostgreSQL-compatible DATABASE_URL.
MONGODB_URL=mongodb://localhost:27017/flexiroaster
CASSANDRA_CONTACT_POINTS=localhost
CASSANDRA_KEYSPACE=flexiroaster
33 changes: 30 additions & 3 deletions backend/README.md
@@ -103,18 +103,20 @@ curl -X POST http://localhost:8000/api/executions \
```


### Distributed Task Execution (Celery / Ray)
### Distributed Task Execution (Celery / Ray / Spark / Dask)

FlexiRoaster supports selectable execution backends for asynchronous and distributed workloads:

- `local`: default in-process execution
- `celery`: async jobs, retries, and scheduling support through Celery workers
- `ray`: distributed Python execution, optimized for ML/AI-heavy pipelines
- `spark`: Apache Spark-based distributed compute for ETL + ML-heavy batch workloads
- `dask`: Python-native distributed parallelism from laptop to cluster

Use the optional `execution_backend` field when creating an execution:

```bash
curl -X POST http://localhost:8000/api/executions -H "Content-Type: application/json" -d '{"pipeline_id": "your-pipeline-id", "execution_backend": "ray"}'
curl -X POST http://localhost:8000/api/executions -H "Content-Type: application/json" -d '{"pipeline_id": "your-pipeline-id", "execution_backend": "spark"}'
```

Or set a default backend via environment variables in `backend/.env`:
@@ -126,9 +128,34 @@ CELERY_RESULT_BACKEND=redis://localhost:6379/1
CELERY_EXECUTION_TASK=flexiroaster.execute_pipeline
RAY_ADDRESS=auto
RAY_NAMESPACE=flexiroaster
SPARK_MASTER_URL=local[*]
SPARK_APP_NAME=flexiroaster
DASK_SCHEDULER_ADDRESS=
```

If Celery or Ray is unavailable, FlexiRoaster automatically falls back to local execution and records the fallback reason in execution context.
If Celery, Ray, Spark, or Dask is unavailable, FlexiRoaster automatically falls back to local execution and records the fallback reason in execution context.
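The fallback metadata lives under the `distributed_execution` key of the execution context. A minimal sketch of reading it — the dict shape mirrors what `distributed_executor.py` writes on the fallback path; the error string is a made-up example:

```python
# Illustrative only: this mirrors the metadata recorded by
# backend/core/distributed_executor.py when a backend falls back to local.
context = {
    "distributed_execution": {
        "requested_backend": "spark",
        "fallback_backend": "local",
        "fallback_reason": "No module named 'pyspark'",
    }
}

meta = context.get("distributed_execution", {})
fallback_note = None
if meta.get("fallback_backend"):
    # A fallback occurred: surface why and where the pipeline actually ran.
    fallback_note = (
        f"{meta['requested_backend']} unavailable, ran on "
        f"{meta['fallback_backend']}: {meta['fallback_reason']}"
    )
```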


## Database Alternatives

FlexiRoaster can run with multiple persistence backends depending on deployment requirements:

- `postgresql`: traditional relational baseline for transactional workloads
- `cockroachdb`: globally distributed SQL with strong consistency and high availability
- `mongodb`: flexible-schema document store for dynamic pipeline metadata
- `cassandra`: highly scalable wide-column store optimized for high-write throughput

Configure backend selection via environment variables:

```env
DATABASE_BACKEND=postgresql
DATABASE_URL=postgresql+psycopg2://user:password@localhost:5432/flexiroaster
MONGODB_URL=mongodb://localhost:27017/flexiroaster
CASSANDRA_CONTACT_POINTS=localhost
CASSANDRA_KEYSPACE=flexiroaster
```

For CockroachDB, use a PostgreSQL-compatible `DATABASE_URL` with `DATABASE_BACKEND=cockroachdb`.
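For illustration, such a configuration might look like this — the credentials are placeholders, and `26257` is CockroachDB's default SQL port:

```env
DATABASE_BACKEND=cockroachdb
DATABASE_URL=postgresql+psycopg2://user:password@localhost:26257/flexiroaster
```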

## Authentication & Security

2 changes: 1 addition & 1 deletion backend/api/schemas.py
@@ -125,7 +125,7 @@ class OrchestrationConfig(BaseModel):
pipeline_id: str
execution_backend: Optional[str] = Field(
default=None,
description="Optional override for distributed backend: local, celery, or ray",
description="Optional override for distributed backend: local, celery, ray, spark, or dask",
)
orchestration: OrchestrationConfig = Field(default_factory=OrchestrationConfig)

23 changes: 22 additions & 1 deletion backend/config.py
@@ -51,7 +51,7 @@ class Settings(BaseSettings):
TOPIC_EXECUTION_FAILED: str = "execution.failed"
TOPIC_EXECUTION_COMPLETED: str = "execution.completed"

# Distributed execution backends: local|celery|ray
# Distributed execution backends: local|celery|ray|spark|dask
DISTRIBUTED_EXECUTION_BACKEND: str = "local"

# Celery settings
@@ -63,6 +63,19 @@
RAY_ADDRESS: str = "auto"
RAY_NAMESPACE: str = "flexiroaster"

# Spark settings
SPARK_MASTER_URL: str = "local[*]"
SPARK_APP_NAME: str = "flexiroaster"

# Dask settings
DASK_SCHEDULER_ADDRESS: str = ""

# Database engine selection
DATABASE_BACKEND: Literal["sqlite", "postgresql", "cockroachdb", "mongodb", "cassandra"] = "sqlite"
MONGODB_URL: str = "mongodb://localhost:27017/flexiroaster"
CASSANDRA_CONTACT_POINTS: Union[str, List[str]] = "localhost"
CASSANDRA_KEYSPACE: str = "flexiroaster"

@field_validator("CORS_ORIGINS", mode="before")
@classmethod
def parse_cors_origins(cls, v):
@@ -79,6 +92,14 @@ def parse_kafka_bootstrap_servers(cls, v):
return [server.strip() for server in v.split(",") if server.strip()]
return v


@field_validator("CASSANDRA_CONTACT_POINTS", mode="before")
@classmethod
def parse_cassandra_contact_points(cls, v):
"""Parse CASSANDRA_CONTACT_POINTS from comma-separated values or return list as-is."""
if isinstance(v, str):
return [host.strip() for host in v.split(",") if host.strip()]
return v
# Database
DATABASE_URL: str = "sqlite:///./flexiroaster.db"

79 changes: 77 additions & 2 deletions backend/core/distributed_executor.py
@@ -1,4 +1,4 @@
"""Distributed execution dispatcher with optional Celery and Ray backends."""
"""Distributed execution dispatcher with optional distributed compute backends."""
from __future__ import annotations

from dataclasses import dataclass
@@ -11,7 +11,7 @@

logger = logging.getLogger(__name__)

SUPPORTED_BACKENDS = {"local", "celery", "ray"}
SUPPORTED_BACKENDS = {"local", "celery", "ray", "spark", "dask"}


@dataclass
@@ -43,6 +43,14 @@ def run(self, pipeline: Pipeline, backend_override: Optional[str] = None) -> Dis
execution, used_backend = self._execute_with_ray(pipeline)
return DispatchResult(execution=execution, backend_used=used_backend)

if backend == "spark":
execution, used_backend = self._execute_with_spark(pipeline)
return DispatchResult(execution=execution, backend_used=used_backend)

if backend == "dask":
execution, used_backend = self._execute_with_dask(pipeline)
return DispatchResult(execution=execution, backend_used=used_backend)

execution = self.executor.execute(pipeline)
return DispatchResult(execution=execution, backend_used="local")

@@ -112,3 +120,70 @@ def execute_pipeline_remote(pipeline_payload: dict):
}
)
return execution, "local"

def _execute_with_spark(self, pipeline: Pipeline) -> tuple[Execution, str]:
"""Try Spark path; fallback to local execution when unavailable."""
try:
from pyspark.sql import SparkSession

spark = (
SparkSession.builder
.master(settings.SPARK_MASTER_URL)
.appName(settings.SPARK_APP_NAME)
.getOrCreate()
)
logger.info("Spark backend initialized for pipeline %s", pipeline.id)
spark.stop()

execution = self.executor.execute(pipeline)
execution.context.setdefault("distributed_execution", {})
execution.context["distributed_execution"].update(
{
"requested_backend": "spark",
"execution_mode": "spark-driver",
}
)
Comment on lines +136 to +145

🔴 Spark session is stopped before pipeline execution, making Spark backend useless

In _execute_with_spark, the SparkSession is created and immediately stopped (spark.stop()) before the pipeline is actually executed via self.executor.execute(pipeline). The pipeline never runs on Spark.

Root Cause and Impact

At backend/core/distributed_executor.py:129-136:

spark = (
    SparkSession.builder
    .master(settings.SPARK_MASTER_URL)
    .appName(settings.SPARK_APP_NAME)
    .getOrCreate()
)
logger.info("Spark backend initialized for pipeline %s", pipeline.id)
spark.stop()

The Spark session is stopped on line 136, and then the pipeline is executed locally via self.executor.execute(pipeline) on line 138. The Spark session is only used as a connectivity probe — the actual execution always runs locally. Yet the method reports backend_used="spark" and execution_mode="spark-driver", which is misleading.

The same pattern exists for Dask at lines 164-167: a Dask client is created and immediately closed before local execution.

Impact: When a user requests Spark or Dask execution, the pipeline always runs locally but the metadata falsely claims it ran on Spark/Dask. This is functionally equivalent to local execution with misleading metadata.

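One possible shape for the fix (a sketch, not the project's actual patch): hold the session open in a `try`/`finally` until execution finishes, stopping it only afterwards. A stand-in session class replaces `pyspark` here so the control flow is runnable without the dependency:

```python
class StubSession:
    """Stand-in for pyspark.sql.SparkSession; tracks whether stop() was called."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


def execute_with_session(make_session, run_pipeline):
    """Hold the session open for the whole run, stopping it only afterwards."""
    session = make_session()
    try:
        # In a real fix, the executor would route pipeline stages through
        # `session` here, rather than probing connectivity and running locally.
        return run_pipeline(session)
    finally:
        session.stop()


observed = {}

def fake_pipeline(session):
    # Record that the session is still live while the pipeline runs.
    observed["live_during_run"] = not session.stopped
    return "completed"

result = execute_with_session(StubSession, fake_pipeline)
```

The same lifetime pattern would apply to the Dask `Client`, which is likewise closed before execution in the current diff.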

return execution, "spark"
except Exception as exc:
logger.warning("Spark backend unavailable (%s). Executing locally.", exc)
execution = self.executor.execute(pipeline)
execution.context.setdefault("distributed_execution", {})
execution.context["distributed_execution"].update(
{
"requested_backend": "spark",
"fallback_backend": "local",
"fallback_reason": str(exc),
}
)
return execution, "local"

def _execute_with_dask(self, pipeline: Pipeline) -> tuple[Execution, str]:
"""Try Dask path; fallback to local execution when unavailable."""
try:
from dask.distributed import Client

client = Client(settings.DASK_SCHEDULER_ADDRESS) if settings.DASK_SCHEDULER_ADDRESS else Client(processes=False)
logger.info("Dask backend initialized for pipeline %s", pipeline.id)
client.close()

execution = self.executor.execute(pipeline)
execution.context.setdefault("distributed_execution", {})
execution.context["distributed_execution"].update(
{
"requested_backend": "dask",
"execution_mode": "dask-local-cluster",
}
)
return execution, "dask"
except Exception as exc:
logger.warning("Dask backend unavailable (%s). Executing locally.", exc)
execution = self.executor.execute(pipeline)
execution.context.setdefault("distributed_execution", {})
execution.context["distributed_execution"].update(
{
"requested_backend": "dask",
"fallback_backend": "local",
"fallback_reason": str(exc),
}
)
return execution, "local"
22 changes: 21 additions & 1 deletion backend/tests/test_distributed_execution.py
@@ -36,7 +36,7 @@ def test_dispatcher_falls_back_to_local_for_unknown_backend():
pipeline = _build_pipeline()

dispatcher = DistributedExecutionDispatcher()
result = dispatcher.run(pipeline, backend_override="spark")
result = dispatcher.run(pipeline, backend_override="unknown-backend")

assert result.backend_used == "local"
assert result.execution.status == ExecutionStatus.COMPLETED
@@ -59,3 +59,23 @@ def test_create_execution_tracks_requested_backend_and_backend_used():

assert stored.context["requested_execution_backend"] == "celery"
assert stored.context["distributed_execution"]["backend_used"] == "local"


def test_dispatcher_spark_falls_back_when_dependency_missing():
pipeline = _build_pipeline()

dispatcher = DistributedExecutionDispatcher()
result = dispatcher.run(pipeline, backend_override="spark")

assert result.backend_used in {"spark", "local"}
assert result.execution.status == ExecutionStatus.COMPLETED


def test_dispatcher_dask_falls_back_when_dependency_missing():
pipeline = _build_pipeline()

dispatcher = DistributedExecutionDispatcher()
result = dispatcher.run(pipeline, backend_override="dask")

assert result.backend_used in {"dask", "local"}
assert result.execution.status == ExecutionStatus.COMPLETED
47 changes: 47 additions & 0 deletions pipeline/backend/core/modern_stack.py
@@ -5,6 +5,8 @@
- Airflow/Temporal orchestrators
- Pluggable eventing (Kafka/Pulsar/RabbitMQ/NATS)
- Kubernetes jobs for execution
- Ray/Spark/Dask for distributed compute
- PostgreSQL/CockroachDB/MongoDB/Cassandra + object storage persistence
- Pluggable distributed compute (Ray/Spark/Dask/Celery)
- Pluggable storage (PostgreSQL/CockroachDB/MongoDB/Cassandra) + object storage
- Prometheus/Grafana/ELK observability
@@ -190,6 +192,24 @@ def architecture(self) -> Dict[str, Any]:
"service_account": settings.KUBERNETES_SERVICE_ACCOUNT,
},
).__dict__,
"distributed_compute": StackComponent(
name="ray",
enabled=settings.RAY_ENABLED,
config={
"dashboard_url": settings.RAY_DASHBOARD_URL,
"entrypoint": settings.RAY_JOB_ENTRYPOINT,
"alternatives": ["spark", "dask"],
},
).__dict__,
"storage": {
"database": "postgresql",
"database_alternatives": ["cockroachdb", "mongodb", "cassandra"],
"object_storage": {
"enabled": settings.OBJECT_STORAGE_ENABLED,
"bucket": settings.OBJECT_STORAGE_BUCKET,
"endpoint": settings.OBJECT_STORAGE_ENDPOINT,
},
},
"distributed_compute": self._distributed_compute(),
"storage": self._storage(),
Comment on lines +195 to 214

🔴 Duplicate dictionary keys in architecture() cause new entries to be silently overwritten

The architecture() method defines "distributed_compute" and "storage" keys twice in the same dict literal. In Python, when a dictionary literal has duplicate keys, the last value wins silently.

Root Cause and Impact

The new code at lines 195-212 adds hardcoded entries:

"distributed_compute": StackComponent(
    name="ray", ...,
    config={..., "alternatives": ["spark", "dask"]},
).__dict__,
"storage": {
    "database": "postgresql",
    "database_alternatives": ["cockroachdb", "mongodb", "cassandra"],
    ...
},

But then lines 213-214 define the same keys again:

"distributed_compute": self._distributed_compute(),
"storage": self._storage(),

Since the last value wins, the hardcoded entries (with alternatives and database_alternatives) are completely dead code. The returned dict will never contain config.alternatives or database_alternatives.

Impact: The new test assertions at pipeline/backend/tests/test_modern_stack.py:24-26 will fail at runtime:

  • architecture["distributed_compute"]["config"]["alternatives"] → KeyError
  • architecture["storage"]["database_alternatives"] → KeyError

More broadly, the advertised compute alternatives and database alternatives metadata that this PR intends to expose will never appear in the architecture output.

Prompt for agents
In pipeline/backend/core/modern_stack.py, the architecture() method (lines 167-224) has duplicate dictionary keys "distributed_compute" (lines 195 and 213) and "storage" (lines 204 and 214). The second occurrence silently overwrites the first. You need to either:

1. Remove the first (hardcoded) entries at lines 195-212 and instead merge the "alternatives" and "database_alternatives" metadata into the dynamic _distributed_compute() and _storage() results, OR
2. Remove the second (dynamic) entries at lines 213-214 and keep only the hardcoded ones (but this would lose the dynamic configuration behavior).

The recommended approach is option 1: remove lines 195-212 entirely, and modify _distributed_compute() to include an "alternatives" key in its config, and modify _storage() to include a "database_alternatives" key in its returned dict. This preserves both the dynamic behavior and the new metadata the PR intends to add.
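The last-key-wins behavior, and the merge approach recommended as option 1, can be sketched with plain dicts (`storage_with_alternatives` is an illustrative helper, not part of the module):

```python
# Python dict literals keep only the last value for a repeated key.
architecture = {
    "storage": {"database_alternatives": ["cockroachdb", "mongodb", "cassandra"]},
    "storage": {"database": "postgresql"},  # silently overwrites the entry above
}
assert "database_alternatives" not in architecture["storage"]

# Option 1 from the comment: build the dynamic entry once, then merge the
# new metadata into it instead of declaring the key twice.
def storage_with_alternatives(dynamic_storage):
    merged = dict(dynamic_storage)
    merged["database_alternatives"] = ["cockroachdb", "mongodb", "cassandra"]
    return merged

merged = storage_with_alternatives({"database": "postgresql"})
```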

"monitoring": {
@@ -227,6 +247,33 @@ def submit_execution(self, pipeline_id: str, payload: Dict[str, Any]) -> Dict[st
},
]

if settings.RAY_ENABLED:
commands.append(
{
"layer": "distributed_compute",
"engine": "ray",
"action": "submit_ray_job",
"dashboard": settings.RAY_DASHBOARD_URL,
}
)
else:
commands.append(
{
"layer": "distributed_compute",
"engine": "spark",
"action": "submit_spark_job",
}
)

if settings.KAFKA_ENABLED:
commands.append(
{
"layer": "event_layer",
"engine": "kafka",
"action": "publish_event",
"topic": settings.KAFKA_EXECUTION_TOPIC,
}
)
Comment on lines +250 to +276

🔴 Duplicate distributed_compute and event_layer commands appended in submit_execution()

The submit_execution() method appends hardcoded compute and event commands (lines 250-276) and then the existing dynamic logic (lines 277-308) appends the same types of commands again, resulting in duplicate entries in the commands list.

Root Cause and Impact

The new code at lines 250-266 unconditionally appends a distributed_compute command (either ray or spark based on RAY_ENABLED). Then the existing code at lines 277-290 also appends a distributed_compute command from self._distributed_compute() if it's enabled.

Similarly, lines 268-276 append a kafka event_layer command if KAFKA_ENABLED, and lines 292-308 append an event_layer command from self._event_layer() if it's enabled.

With default settings (RAY_ENABLED=True, KAFKA_ENABLED=False, DISTRIBUTED_COMPUTE_BACKEND=ray), the commands list will contain two distributed_compute commands — one hardcoded ray command and one dynamic ray command from _distributed_compute(). This produces a malformed execution envelope with duplicate instructions.

Impact: Downstream consumers processing the commands list will see duplicate distributed_compute (and potentially event_layer) commands, which could cause double-submission of jobs or confusing metadata.

Prompt for agents
In pipeline/backend/core/modern_stack.py, the submit_execution() method has duplicate command-appending logic. Lines 250-276 add hardcoded distributed_compute and event_layer commands, and then lines 277-308 add dynamic versions of the same commands. Remove the hardcoded block at lines 250-276 (the if/else for RAY_ENABLED and the if for KAFKA_ENABLED) since the existing dynamic logic at lines 277-308 already handles these cases correctly based on the configured backends.
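Beyond removing the hardcoded block, a regression check along these lines could catch the double-append in tests (the `duplicate_layers` helper is hypothetical, not part of the suite):

```python
def duplicate_layers(commands):
    """Return the set of layer names that appear more than once in a command list."""
    seen, dupes = set(), set()
    for command in commands:
        layer = command.get("layer")
        if layer in seen:
            dupes.add(layer)
        seen.add(layer)
    return dupes

# Simulated envelope with the duplication the comment describes: one hardcoded
# and one dynamic distributed_compute command.
commands = [
    {"layer": "distributed_compute", "engine": "ray", "action": "submit_ray_job"},
    {"layer": "distributed_compute", "engine": "ray", "action": "submit_job"},
    {"layer": "event_layer", "engine": "kafka", "action": "publish_event"},
]
```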

compute = self._distributed_compute()
if compute["enabled"]:
compute_command = {
3 changes: 3 additions & 0 deletions pipeline/backend/tests/test_modern_stack.py
@@ -21,6 +21,9 @@ def test_architecture_contains_requested_layers():
assert architecture["api_layer"]["name"] == "FastAPI"
assert architecture["orchestration"]["name"] in {"temporal", "airflow"}
assert architecture["execution"]["name"] == "kubernetes-jobs"
assert architecture["distributed_compute"]["name"] == "ray"
assert set(architecture["distributed_compute"]["config"]["alternatives"]) == {"spark", "dask"}
assert "cockroachdb" in architecture["storage"]["database_alternatives"]
assert architecture["distributed_compute"]["name"] in {"ray", "spark", "dask", "celery"}
assert architecture["security"]["authorization"] == "rbac"
