-
-
Notifications
You must be signed in to change notification settings - Fork 5
Add pluggable event, compute, and database backends to modern stack #95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,10 +3,10 @@ | |
| Implements a configurable orchestration profile that combines: | ||
| - FastAPI API layer | ||
| - Airflow/Temporal orchestrators | ||
| - Kafka eventing | ||
| - Pluggable eventing (Kafka/Pulsar/RabbitMQ/NATS) | ||
| - Kubernetes jobs for execution | ||
| - Ray for distributed compute | ||
| - PostgreSQL + object storage persistence | ||
| - Pluggable distributed compute (Ray/Spark/Dask/Celery) | ||
| - Pluggable storage (PostgreSQL/CockroachDB/MongoDB/Cassandra) + object storage | ||
| - Prometheus/Grafana/ELK observability | ||
| - JWT + RBAC + secrets manager security | ||
| """ | ||
|
|
@@ -30,6 +30,138 @@ class StackComponent: | |
| class ModernOrchestrationStack: | ||
| """Service layer for the high-end orchestration stack.""" | ||
|
|
||
| def _event_layer(self) -> Dict[str, Any]: | ||
| backend = settings.EVENT_BACKEND.lower() | ||
|
|
||
| if backend == "pulsar": | ||
| return StackComponent( | ||
| name="pulsar", | ||
| enabled=settings.PULSAR_ENABLED, | ||
| config={ | ||
| "service_url": settings.PULSAR_SERVICE_URL, | ||
| "topic": settings.PULSAR_EXECUTION_TOPIC, | ||
| "tenant": settings.PULSAR_TENANT, | ||
| "namespace": settings.PULSAR_NAMESPACE, | ||
| "geo_replication_regions": settings.PULSAR_GEO_REPLICATION_REGIONS, | ||
| "multi_tenant": True, | ||
| }, | ||
| ).__dict__ | ||
|
|
||
| if backend == "rabbitmq": | ||
| return StackComponent( | ||
| name="rabbitmq", | ||
| enabled=settings.RABBITMQ_ENABLED, | ||
| config={ | ||
| "url": settings.RABBITMQ_URL, | ||
| "exchange": settings.RABBITMQ_EXCHANGE, | ||
| "queue": settings.RABBITMQ_QUEUE, | ||
| }, | ||
| ).__dict__ | ||
|
|
||
| if backend == "nats": | ||
| return StackComponent( | ||
| name="nats", | ||
| enabled=settings.NATS_ENABLED, | ||
| config={ | ||
| "servers": settings.NATS_SERVERS, | ||
| "subject": settings.NATS_SUBJECT, | ||
| }, | ||
| ).__dict__ | ||
|
|
||
| return StackComponent( | ||
| name="kafka", | ||
| enabled=settings.KAFKA_ENABLED, | ||
| config={ | ||
| "bootstrap_servers": settings.KAFKA_BOOTSTRAP_SERVERS, | ||
| "topic": settings.KAFKA_EXECUTION_TOPIC, | ||
| }, | ||
| ).__dict__ | ||
|
|
||
| def _distributed_compute(self) -> Dict[str, Any]: | ||
| backend = settings.DISTRIBUTED_COMPUTE_BACKEND.lower() | ||
|
|
||
| if backend == "spark": | ||
| return StackComponent( | ||
| name="spark", | ||
| enabled=settings.SPARK_ENABLED, | ||
| config={ | ||
| "master_url": settings.SPARK_MASTER_URL, | ||
| "app_name": settings.SPARK_APP_NAME, | ||
| "supports_batch": True, | ||
| "supports_streaming": settings.SPARK_STREAMING_ENABLED, | ||
| }, | ||
| ).__dict__ | ||
|
|
||
| if backend == "dask": | ||
| return StackComponent( | ||
| name="dask", | ||
| enabled=settings.DASK_ENABLED, | ||
| config={ | ||
| "scheduler_address": settings.DASK_SCHEDULER_ADDRESS, | ||
| "dashboard_url": settings.DASK_DASHBOARD_URL, | ||
| "python_native": True, | ||
| }, | ||
| ).__dict__ | ||
|
|
||
| if backend == "celery": | ||
| return StackComponent( | ||
| name="celery", | ||
| enabled=True, | ||
| config={ | ||
| "broker_url": settings.CELERY_BROKER_URL, | ||
| "result_backend": settings.CELERY_RESULT_BACKEND, | ||
| "task_queue": settings.CELERY_TASK_QUEUE, | ||
| }, | ||
| ).__dict__ | ||
|
|
||
| return StackComponent( | ||
| name="ray", | ||
| enabled=settings.RAY_ENABLED, | ||
| config={ | ||
| "dashboard_url": settings.RAY_DASHBOARD_URL, | ||
| "entrypoint": settings.RAY_JOB_ENTRYPOINT, | ||
| }, | ||
| ).__dict__ | ||
|
|
||
| def _storage(self) -> Dict[str, Any]: | ||
| backend = settings.DATABASE_BACKEND.lower() | ||
| database_config: Dict[str, Any] | ||
|
|
||
| if backend == "cockroachdb": | ||
| database_config = { | ||
| "engine": "cockroachdb", | ||
| "url": settings.COCKROACHDB_URL, | ||
| "strong_consistency": True, | ||
| "high_availability": True, | ||
| } | ||
| elif backend == "mongodb": | ||
| database_config = { | ||
| "engine": "mongodb", | ||
| "url": settings.MONGODB_URL, | ||
| "flexible_schema": True, | ||
| } | ||
| elif backend == "cassandra": | ||
| database_config = { | ||
| "engine": "cassandra", | ||
| "contact_points": settings.CASSANDRA_CONTACT_POINTS, | ||
| "keyspace": settings.CASSANDRA_KEYSPACE, | ||
| "high_write_workloads": True, | ||
| } | ||
| else: | ||
| database_config = { | ||
| "engine": "postgresql", | ||
| "url": settings.DATABASE_URL, | ||
| } | ||
|
Comment on lines
+150
to
+154
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

🔴 Database credentials leaked through architecture API endpoint

Root Cause and Impact

Previously, the `storage` section of the `architecture()` response reported only the database engine name (`"database": "postgresql"`), with no connection details. The new `_storage()` method instead embeds the full connection settings for the selected backend — for example, the default branch returns `"url": settings.DATABASE_URL` (and the other branches return `COCKROACHDB_URL` / `MONGODB_URL`), values which typically contain a username and password.

This dict is returned by `_storage()` and included in the `architecture()` response, which is served to HTTP clients.

Impact: database credentials are exposed to any authenticated user with at least `viewer` role.

Prompt for agents

Was this helpful? React with 👍 or 👎 to provide feedback.
||
|
|
||
| return { | ||
| "database": database_config, | ||
| "object_storage": { | ||
| "enabled": settings.OBJECT_STORAGE_ENABLED, | ||
| "bucket": settings.OBJECT_STORAGE_BUCKET, | ||
| "endpoint": settings.OBJECT_STORAGE_ENDPOINT, | ||
| }, | ||
| } | ||
|
|
||
| def architecture(self) -> Dict[str, Any]: | ||
| """Return stack topology and active configuration.""" | ||
| return { | ||
|
|
@@ -48,14 +180,7 @@ def architecture(self) -> Dict[str, Any]: | |
| "task_queue": settings.TEMPORAL_TASK_QUEUE, | ||
| }, | ||
| ).__dict__, | ||
| "event_layer": StackComponent( | ||
| name="kafka", | ||
| enabled=settings.KAFKA_ENABLED, | ||
| config={ | ||
| "bootstrap_servers": settings.KAFKA_BOOTSTRAP_SERVERS, | ||
| "topic": settings.KAFKA_EXECUTION_TOPIC, | ||
| }, | ||
| ).__dict__, | ||
| "event_layer": self._event_layer(), | ||
| "execution": StackComponent( | ||
| name="kubernetes-jobs", | ||
| enabled=True, | ||
|
|
@@ -65,22 +190,8 @@ def architecture(self) -> Dict[str, Any]: | |
| "service_account": settings.KUBERNETES_SERVICE_ACCOUNT, | ||
| }, | ||
| ).__dict__, | ||
| "distributed_compute": StackComponent( | ||
| name="ray", | ||
| enabled=settings.RAY_ENABLED, | ||
| config={ | ||
| "dashboard_url": settings.RAY_DASHBOARD_URL, | ||
| "entrypoint": settings.RAY_JOB_ENTRYPOINT, | ||
| }, | ||
| ).__dict__, | ||
| "storage": { | ||
| "database": "postgresql", | ||
| "object_storage": { | ||
| "enabled": settings.OBJECT_STORAGE_ENABLED, | ||
| "bucket": settings.OBJECT_STORAGE_BUCKET, | ||
| "endpoint": settings.OBJECT_STORAGE_ENDPOINT, | ||
| }, | ||
| }, | ||
| "distributed_compute": self._distributed_compute(), | ||
| "storage": self._storage(), | ||
| "monitoring": { | ||
| "metrics": ["prometheus", "grafana"], | ||
| "logs": ["elasticsearch", "logstash", "kibana"], | ||
|
|
@@ -116,25 +227,38 @@ def submit_execution(self, pipeline_id: str, payload: Dict[str, Any]) -> Dict[st | |
| }, | ||
| ] | ||
|
|
||
| if settings.RAY_ENABLED: | ||
| commands.append( | ||
| { | ||
| "layer": "distributed_compute", | ||
| "engine": "ray", | ||
| "action": "submit_ray_job", | ||
| "dashboard": settings.RAY_DASHBOARD_URL, | ||
| } | ||
| ) | ||
|
|
||
| if settings.KAFKA_ENABLED: | ||
| commands.append( | ||
| { | ||
| "layer": "event_layer", | ||
| "engine": "kafka", | ||
| "action": "publish_event", | ||
| "topic": settings.KAFKA_EXECUTION_TOPIC, | ||
| } | ||
| ) | ||
| compute = self._distributed_compute() | ||
| if compute["enabled"]: | ||
| compute_command = { | ||
| "layer": "distributed_compute", | ||
| "engine": compute["name"], | ||
| "action": f"submit_{compute['name']}_job", | ||
| } | ||
| if compute["name"] == "ray": | ||
| compute_command["dashboard"] = settings.RAY_DASHBOARD_URL | ||
| if compute["name"] == "spark": | ||
| compute_command["master_url"] = settings.SPARK_MASTER_URL | ||
| if compute["name"] == "dask": | ||
| compute_command["scheduler_address"] = settings.DASK_SCHEDULER_ADDRESS | ||
| commands.append(compute_command) | ||
|
|
||
| event_layer = self._event_layer() | ||
| if event_layer["enabled"]: | ||
| event_command = { | ||
| "layer": "event_layer", | ||
| "engine": event_layer["name"], | ||
| "action": "publish_event", | ||
| } | ||
| if event_layer["name"] == "kafka": | ||
| event_command["topic"] = settings.KAFKA_EXECUTION_TOPIC | ||
| if event_layer["name"] == "pulsar": | ||
| event_command["topic"] = settings.PULSAR_EXECUTION_TOPIC | ||
| if event_layer["name"] == "rabbitmq": | ||
| event_command["exchange"] = settings.RABBITMQ_EXCHANGE | ||
| event_command["routing_key"] = settings.RABBITMQ_QUEUE | ||
| if event_layer["name"] == "nats": | ||
| event_command["subject"] = settings.NATS_SUBJECT | ||
| commands.append(event_command) | ||
|
|
||
| return { | ||
| "execution_id": execution_id, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 RabbitMQ credentials leaked through architecture API endpoint

The `_event_layer()` method includes the full RabbitMQ connection URL (containing credentials) in its return value when `EVENT_BACKEND` is `"rabbitmq"`.

Root Cause and Impact

At `pipeline/backend/core/modern_stack.py:55`, the RabbitMQ config includes `"url": settings.RABBITMQ_URL`. The default value of `RABBITMQ_URL` is `amqp://guest:guest@localhost:5672/` (see `pipeline/backend/config.py:155`), which contains the username and password.

This is returned by `_event_layer()` and included in the `architecture()` response at line 183, which is served to HTTP clients via the `/advanced-stack/architecture` endpoint at `pipeline/backend/api/routes/advanced_stack.py:26`. Any user with `viewer` role can see these credentials.

Impact: RabbitMQ credentials are exposed to any authenticated user with at least `viewer` role through the REST API.

Was this helpful? React with 👍 or 👎 to provide feedback.