# Add Spark/Dask execution backends and DB alternatives configuration #96
```diff
@@ -5,6 +5,8 @@
 - Airflow/Temporal orchestrators
 - Pluggable eventing (Kafka/Pulsar/RabbitMQ/NATS)
 - Kubernetes jobs for execution
 - Ray/Spark/Dask for distributed compute
 - PostgreSQL/CockroachDB/MongoDB/Cassandra + object storage persistence
+- Pluggable distributed compute (Ray/Spark/Dask/Celery)
+- Pluggable storage (PostgreSQL/CockroachDB/MongoDB/Cassandra) + object storage
 - Prometheus/Grafana/ELK observability
```
```diff
@@ -190,6 +192,24 @@ def architecture(self) -> Dict[str, Any]:
                 "service_account": settings.KUBERNETES_SERVICE_ACCOUNT,
             },
         ).__dict__,
+        "distributed_compute": StackComponent(
+            name="ray",
+            enabled=settings.RAY_ENABLED,
+            config={
+                "dashboard_url": settings.RAY_DASHBOARD_URL,
+                "entrypoint": settings.RAY_JOB_ENTRYPOINT,
+                "alternatives": ["spark", "dask"],
+            },
+        ).__dict__,
+        "storage": {
+            "database": "postgresql",
+            "database_alternatives": ["cockroachdb", "mongodb", "cassandra"],
+            "object_storage": {
+                "enabled": settings.OBJECT_STORAGE_ENABLED,
+                "bucket": settings.OBJECT_STORAGE_BUCKET,
+                "endpoint": settings.OBJECT_STORAGE_ENDPOINT,
+            },
+        },
         "distributed_compute": self._distributed_compute(),
         "storage": self._storage(),
```
**Comment on lines +195 to 214**

🔴 **Duplicate dictionary keys in the `architecture` dictionary**

**Root Cause and Impact**

The new code at lines 195-212 adds hardcoded entries:

```python
"distributed_compute": StackComponent(
    name="ray", ...,
    config={..., "alternatives": ["spark", "dask"]},
).__dict__,
"storage": {
    "database": "postgresql",
    "database_alternatives": ["cockroachdb", "mongodb", "cassandra"],
    ...
},
```

But lines 213-214 then define the same keys again:

```python
"distributed_compute": self._distributed_compute(),
"storage": self._storage(),
```

In a Python dict literal the last value wins, so the hardcoded entries are silently discarded in favour of the helper-method results.

**Impact:** The new test assertions exercise the hardcoded entries, which never survive into the final dictionary. More broadly, the advertised compute alternatives and database alternatives metadata that this PR intends to expose will never appear in the architecture output.
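The "last value wins" behaviour described above is plain Python dict-literal semantics and can be reproduced in isolation. A minimal sketch with placeholder values, not the project's actual settings:

```python
# Duplicate keys in a dict literal: Python raises no error and
# silently keeps only the last value assigned to each key.
architecture = {
    "distributed_compute": {"name": "ray", "alternatives": ["spark", "dask"]},
    "storage": {"database": "postgresql"},
    # Later duplicates overwrite the entries above:
    "distributed_compute": {"name": "from-helper"},
    "storage": {"database": "from-helper"},
}

print(architecture["distributed_compute"])                    # → {'name': 'from-helper'}
print("alternatives" in architecture["distributed_compute"])  # → False
print(len(architecture))                                      # → 2
```

This is why the hardcoded `alternatives` metadata can never reach consumers of the returned dictionary: it is overwritten before the literal finishes evaluating.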
The hunk continues with unchanged context:

```diff
         "monitoring": {
```
```diff
@@ -227,6 +247,33 @@ def submit_execution(self, pipeline_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
             },
         ]

+        if settings.RAY_ENABLED:
+            commands.append(
+                {
+                    "layer": "distributed_compute",
+                    "engine": "ray",
+                    "action": "submit_ray_job",
+                    "dashboard": settings.RAY_DASHBOARD_URL,
+                }
+            )
+        else:
+            commands.append(
+                {
+                    "layer": "distributed_compute",
+                    "engine": "spark",
+                    "action": "submit_spark_job",
+                }
+            )
+
+        if settings.KAFKA_ENABLED:
+            commands.append(
+                {
+                    "layer": "event_layer",
+                    "engine": "kafka",
+                    "action": "publish_event",
+                    "topic": settings.KAFKA_EXECUTION_TOPIC,
+                }
+            )
```
|
Comment on lines
+250
to
+276
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 Duplicate The Root Cause and ImpactThe new code at lines 250-266 unconditionally appends a Similarly, lines 268-276 append a With default settings ( Impact: Downstream consumers processing the commands list will see duplicate Prompt for agentsWas this helpful? React with 👍 or 👎 to provide feedback. |
The pre-existing code that also appends a compute command:

```diff
         compute = self._distributed_compute()
         if compute["enabled"]:
             compute_command = {
```
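One way to guard against the double-append described in the comment is to collapse the list to one command per layer before returning it. A sketch under the assumption that downstream consumers expect at most one command per layer; `dedupe_by_layer` is a hypothetical helper, not part of the PR:

```python
def dedupe_by_layer(commands):
    """Hypothetical guard: keep only the last command emitted for each layer."""
    by_layer = {}
    for cmd in commands:
        by_layer[cmd["layer"]] = cmd  # later commands replace earlier ones
    return list(by_layer.values())

commands = [
    # emitted by the new unconditional RAY_ENABLED branch:
    {"layer": "distributed_compute", "engine": "ray", "action": "submit_ray_job"},
    {"layer": "event_layer", "engine": "kafka", "action": "publish_event"},
    # emitted again by the pre-existing compute path:
    {"layer": "distributed_compute", "engine": "ray", "action": "submit_job"},
]

deduped = dedupe_by_layer(commands)
assert sum(c["layer"] == "distributed_compute" for c in deduped) == 1
```

The cleaner fix, of course, is to delete the redundant branch rather than dedupe after the fact; the sketch only shows the invariant a fix should restore.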
🔴 **Spark session is stopped before pipeline execution, making the Spark backend useless**

In `_execute_with_spark`, the SparkSession is created and immediately stopped (`spark.stop()`) before the pipeline is actually executed via `self.executor.execute(pipeline)`. The pipeline never runs on Spark.

**Root Cause and Impact**

At `backend/core/distributed_executor.py:129-136`, the Spark session is stopped on line 136, and the pipeline is then executed locally via `self.executor.execute(pipeline)` on line 138. The Spark session is only used as a connectivity probe; the actual execution always runs locally. Yet the method reports `backend_used="spark"` and `execution_mode="spark-driver"`, which is misleading.

The same pattern exists for Dask at lines 164-167: a Dask client is created and immediately closed before local execution.

**Impact:** When a user requests Spark or Dask execution, the pipeline always runs locally but the metadata falsely claims it ran on Spark/Dask. This is functionally equivalent to local execution with misleading metadata.
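The fix is to keep the session alive for the duration of the run and stop it only afterwards, typically with `try`/`finally`. A minimal sketch of the required ordering, using stand-in callables instead of a real SparkSession since only the lifetime management is being illustrated:

```python
calls = []

def start_session():
    calls.append("start")
    return "session"           # stand-in for SparkSession.builder...getOrCreate()

def execute_pipeline(session):
    calls.append("execute")    # stand-in for self.executor.execute(pipeline)

def stop_session(session):
    calls.append("stop")       # stand-in for spark.stop()

session = start_session()
try:
    execute_pipeline(session)  # must run while the session is still alive
finally:
    stop_session(session)      # tear down only after execution completes

print(calls)  # → ['start', 'execute', 'stop']
```

The buggy code in the PR effectively produces the order `start, stop, execute`, at which point the session contributes nothing to the run and the `backend_used="spark"` metadata is inaccurate.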