Merged
34 changes: 34 additions & 0 deletions pipeline/backend/config.py
@@ -142,6 +142,24 @@ class Settings(BaseSettings):
KAFKA_TRIGGER_TOPIC: str = "pipeline.triggers"
KAFKA_TRIGGER_GROUP: str = "flexiroaster-orchestrator"

# Event & messaging provider settings
EVENT_BACKEND: str = "kafka" # kafka | pulsar | rabbitmq | nats
PULSAR_ENABLED: bool = False
PULSAR_SERVICE_URL: str = "pulsar://localhost:6650"
PULSAR_EXECUTION_TOPIC: str = "persistent://public/default/pipeline.executions"
PULSAR_TENANT: str = "public"
PULSAR_NAMESPACE: str = "default"
PULSAR_GEO_REPLICATION_REGIONS: List[str] = []

RABBITMQ_ENABLED: bool = False
RABBITMQ_URL: str = "amqp://guest:guest@localhost:5672/"
RABBITMQ_EXCHANGE: str = "pipeline.executions"
RABBITMQ_QUEUE: str = "pipeline.execution.tasks"

NATS_ENABLED: bool = False
NATS_SERVERS: List[str] = ["nats://localhost:4222"]
NATS_SUBJECT: str = "pipeline.executions"

# ===================
# Enterprise Orchestration
# ===================
@@ -169,12 +187,28 @@ class Settings(BaseSettings):
RAY_DASHBOARD_URL: str = "http://localhost:8265"
RAY_JOB_ENTRYPOINT: str = "python -m worker"

DISTRIBUTED_COMPUTE_BACKEND: str = "ray" # ray | celery | spark | dask
SPARK_ENABLED: bool = False
SPARK_MASTER_URL: str = "spark://localhost:7077"
SPARK_APP_NAME: str = "flexiroaster-pipeline"
SPARK_STREAMING_ENABLED: bool = True

DASK_ENABLED: bool = False
DASK_SCHEDULER_ADDRESS: str = "tcp://localhost:8786"
DASK_DASHBOARD_URL: str = "http://localhost:8787"

OBJECT_STORAGE_ENABLED: bool = True
OBJECT_STORAGE_BUCKET: str = "pipeline-artifacts"
OBJECT_STORAGE_ENDPOINT: str = "http://localhost:9000"
OBJECT_STORAGE_ACCESS_KEY: str = "minio"
OBJECT_STORAGE_SECRET_KEY: str = "minio123"

DATABASE_BACKEND: str = "postgresql" # postgresql | cockroachdb | mongodb | cassandra
COCKROACHDB_URL: str = "postgresql://root@localhost:26257/flexiroaster?sslmode=disable"
MONGODB_URL: str = "mongodb://localhost:27017/flexiroaster"
CASSANDRA_CONTACT_POINTS: List[str] = ["localhost"]
CASSANDRA_KEYSPACE: str = "flexiroaster"

JWT_SECRET: str = ""
JWT_ISSUER: str = "flexiroaster"
JWT_AUDIENCE: str = "flexiroaster-api"
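
Because these are pydantic BaseSettings fields, a deployment can switch backends through environment variables alone. A minimal sketch of selecting Pulsar eventing and Spark compute, assuming the settings class is importable as backend.config.Settings (the import path is illustrative):

import os

# BaseSettings reads environment variables at instantiation time,
# so the backend switches below need no code changes.
os.environ["EVENT_BACKEND"] = "pulsar"
os.environ["PULSAR_ENABLED"] = "true"
os.environ["DISTRIBUTED_COMPUTE_BACKEND"] = "spark"
os.environ["SPARK_ENABLED"] = "true"

from backend.config import Settings  # illustrative import path

settings = Settings()
assert settings.EVENT_BACKEND == "pulsar"
assert settings.SPARK_ENABLED is True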
216 changes: 170 additions & 46 deletions pipeline/backend/core/modern_stack.py
@@ -3,10 +3,10 @@
Implements a configurable orchestration profile that combines:
- FastAPI API layer
- Airflow/Temporal orchestrators
- Kafka eventing
- Pluggable eventing (Kafka/Pulsar/RabbitMQ/NATS)
- Kubernetes jobs for execution
- Ray for distributed compute
- PostgreSQL + object storage persistence
- Pluggable distributed compute (Ray/Spark/Dask/Celery)
- Pluggable storage (PostgreSQL/CockroachDB/MongoDB/Cassandra) + object storage
- Prometheus/Grafana/ELK observability
- JWT + RBAC + secrets manager security
"""
@@ -30,6 +30,138 @@ class StackComponent:
class ModernOrchestrationStack:
"""Service layer for the high-end orchestration stack."""

def _event_layer(self) -> Dict[str, Any]:
backend = settings.EVENT_BACKEND.lower()

if backend == "pulsar":
return StackComponent(
name="pulsar",
enabled=settings.PULSAR_ENABLED,
config={
"service_url": settings.PULSAR_SERVICE_URL,
"topic": settings.PULSAR_EXECUTION_TOPIC,
"tenant": settings.PULSAR_TENANT,
"namespace": settings.PULSAR_NAMESPACE,
"geo_replication_regions": settings.PULSAR_GEO_REPLICATION_REGIONS,
"multi_tenant": True,
},
).__dict__

if backend == "rabbitmq":
return StackComponent(
name="rabbitmq",
enabled=settings.RABBITMQ_ENABLED,
config={
"url": settings.RABBITMQ_URL,
Comment on lines +54 to +55
🔴 RabbitMQ credentials leaked through architecture API endpoint

The _event_layer() method includes the full RabbitMQ connection URL (containing credentials) in its return value when EVENT_BACKEND is "rabbitmq".

Root Cause and Impact

At pipeline/backend/core/modern_stack.py:55, the RabbitMQ config includes "url": settings.RABBITMQ_URL. The default value of RABBITMQ_URL is amqp://guest:guest@localhost:5672/ (see pipeline/backend/config.py:155), which contains the username and password.

This is returned by _event_layer() and included in the architecture() response at line 183, which is served to HTTP clients via the /advanced-stack/architecture endpoint at pipeline/backend/api/routes/advanced_stack.py:26. Any user with the viewer role can see these credentials.

Impact: RabbitMQ credentials are exposed to any authenticated user with at least viewer role through the REST API.

Suggested change
-                config={
-                    "url": settings.RABBITMQ_URL,
+                config={
+                    "exchange": settings.RABBITMQ_EXCHANGE,

"exchange": settings.RABBITMQ_EXCHANGE,
"queue": settings.RABBITMQ_QUEUE,
},
).__dict__

if backend == "nats":
return StackComponent(
name="nats",
enabled=settings.NATS_ENABLED,
config={
"servers": settings.NATS_SERVERS,
"subject": settings.NATS_SUBJECT,
},
).__dict__

return StackComponent(
name="kafka",
enabled=settings.KAFKA_ENABLED,
config={
"bootstrap_servers": settings.KAFKA_BOOTSTRAP_SERVERS,
"topic": settings.KAFKA_EXECUTION_TOPIC,
},
).__dict__

def _distributed_compute(self) -> Dict[str, Any]:
backend = settings.DISTRIBUTED_COMPUTE_BACKEND.lower()

if backend == "spark":
return StackComponent(
name="spark",
enabled=settings.SPARK_ENABLED,
config={
"master_url": settings.SPARK_MASTER_URL,
"app_name": settings.SPARK_APP_NAME,
"supports_batch": True,
"supports_streaming": settings.SPARK_STREAMING_ENABLED,
},
).__dict__

if backend == "dask":
return StackComponent(
name="dask",
enabled=settings.DASK_ENABLED,
config={
"scheduler_address": settings.DASK_SCHEDULER_ADDRESS,
"dashboard_url": settings.DASK_DASHBOARD_URL,
"python_native": True,
},
).__dict__

if backend == "celery":
return StackComponent(
name="celery",
enabled=True,
config={
"broker_url": settings.CELERY_BROKER_URL,
"result_backend": settings.CELERY_RESULT_BACKEND,
"task_queue": settings.CELERY_TASK_QUEUE,
},
).__dict__

return StackComponent(
name="ray",
enabled=settings.RAY_ENABLED,
config={
"dashboard_url": settings.RAY_DASHBOARD_URL,
"entrypoint": settings.RAY_JOB_ENTRYPOINT,
},
).__dict__

def _storage(self) -> Dict[str, Any]:
backend = settings.DATABASE_BACKEND.lower()
database_config: Dict[str, Any]

if backend == "cockroachdb":
database_config = {
"engine": "cockroachdb",
"url": settings.COCKROACHDB_URL,
"strong_consistency": True,
"high_availability": True,
}
elif backend == "mongodb":
database_config = {
"engine": "mongodb",
"url": settings.MONGODB_URL,
"flexible_schema": True,
}
elif backend == "cassandra":
database_config = {
"engine": "cassandra",
"contact_points": settings.CASSANDRA_CONTACT_POINTS,
"keyspace": settings.CASSANDRA_KEYSPACE,
"high_write_workloads": True,
}
else:
database_config = {
"engine": "postgresql",
"url": settings.DATABASE_URL,
}
Comment on lines +150 to +154

🔴 Database credentials leaked through architecture API endpoint

The _storage() method includes the full database connection URL (containing credentials) in its return value, which is served directly to API consumers via the /advanced-stack/architecture endpoint.

Root Cause and Impact

Previously, architecture() returned just the string "postgresql" for the database field. The new _storage() method now returns settings.DATABASE_URL (line 153), settings.COCKROACHDB_URL (line 133), or settings.MONGODB_URL (line 140) — all of which contain full connection strings with embedded credentials.

For example, the default DATABASE_URL is postgresql+psycopg2://airflow:airflow@localhost:5432/flexiroaster and COCKROACHDB_URL is postgresql://root@localhost:26257/flexiroaster?sslmode=disable.

This dict is returned by architecture() at pipeline/backend/core/modern_stack.py:194 and served to HTTP clients at pipeline/backend/api/routes/advanced_stack.py:26. The endpoint is accessible to users with the viewer role (pipeline/backend/api/routes/advanced_stack.py:23), meaning even low-privilege users can see database credentials.

Impact: Database credentials are exposed to any authenticated user with at least viewer role through the REST API.

Prompt for agents
In pipeline/backend/core/modern_stack.py, the _storage() method (lines 126-163) should not include raw database connection URLs in its return value, as this data is served through the public API endpoint at pipeline/backend/api/routes/advanced_stack.py:26. Remove the 'url' key from the database_config dicts for postgresql (line 153), cockroachdb (line 133), and mongodb (line 140). Instead, include only non-sensitive metadata like the engine name. For example, the postgresql case should be: database_config = {"engine": "postgresql"} without the url field. Apply the same pattern to cockroachdb and mongodb cases.
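
Following that prompt, each URL-bearing branch would carry engine metadata only. A minimal sketch of the sanitized _storage() cases under those assumptions:

# Sanitized variant: expose engine capabilities, never connection URLs.
if backend == "cockroachdb":
    database_config = {
        "engine": "cockroachdb",
        "strong_consistency": True,
        "high_availability": True,
    }
elif backend == "mongodb":
    database_config = {"engine": "mongodb", "flexible_schema": True}
elif backend == "cassandra":
    # Contact points are hostnames, not credentials, so they can stay.
    database_config = {
        "engine": "cassandra",
        "contact_points": settings.CASSANDRA_CONTACT_POINTS,
        "keyspace": settings.CASSANDRA_KEYSPACE,
    }
else:
    database_config = {"engine": "postgresql"}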


return {
"database": database_config,
"object_storage": {
"enabled": settings.OBJECT_STORAGE_ENABLED,
"bucket": settings.OBJECT_STORAGE_BUCKET,
"endpoint": settings.OBJECT_STORAGE_ENDPOINT,
},
}

def architecture(self) -> Dict[str, Any]:
"""Return stack topology and active configuration."""
return {
@@ -48,14 +180,7 @@ def architecture(self) -> Dict[str, Any]:
"task_queue": settings.TEMPORAL_TASK_QUEUE,
},
).__dict__,
"event_layer": StackComponent(
name="kafka",
enabled=settings.KAFKA_ENABLED,
config={
"bootstrap_servers": settings.KAFKA_BOOTSTRAP_SERVERS,
"topic": settings.KAFKA_EXECUTION_TOPIC,
},
).__dict__,
"event_layer": self._event_layer(),
"execution": StackComponent(
name="kubernetes-jobs",
enabled=True,
@@ -65,22 +190,8 @@
"service_account": settings.KUBERNETES_SERVICE_ACCOUNT,
},
).__dict__,
"distributed_compute": StackComponent(
name="ray",
enabled=settings.RAY_ENABLED,
config={
"dashboard_url": settings.RAY_DASHBOARD_URL,
"entrypoint": settings.RAY_JOB_ENTRYPOINT,
},
).__dict__,
"storage": {
"database": "postgresql",
"object_storage": {
"enabled": settings.OBJECT_STORAGE_ENABLED,
"bucket": settings.OBJECT_STORAGE_BUCKET,
"endpoint": settings.OBJECT_STORAGE_ENDPOINT,
},
},
"distributed_compute": self._distributed_compute(),
"storage": self._storage(),
"monitoring": {
"metrics": ["prometheus", "grafana"],
"logs": ["elasticsearch", "logstash", "kibana"],
@@ -116,25 +227,38 @@ def submit_execution(self, pipeline_id: str, payload: Dict[str, Any]) -> Dict[st
},
]

if settings.RAY_ENABLED:
commands.append(
{
"layer": "distributed_compute",
"engine": "ray",
"action": "submit_ray_job",
"dashboard": settings.RAY_DASHBOARD_URL,
}
)

if settings.KAFKA_ENABLED:
commands.append(
{
"layer": "event_layer",
"engine": "kafka",
"action": "publish_event",
"topic": settings.KAFKA_EXECUTION_TOPIC,
}
)
compute = self._distributed_compute()
if compute["enabled"]:
compute_command = {
"layer": "distributed_compute",
"engine": compute["name"],
"action": f"submit_{compute['name']}_job",
}
if compute["name"] == "ray":
compute_command["dashboard"] = settings.RAY_DASHBOARD_URL
if compute["name"] == "spark":
compute_command["master_url"] = settings.SPARK_MASTER_URL
if compute["name"] == "dask":
compute_command["scheduler_address"] = settings.DASK_SCHEDULER_ADDRESS
commands.append(compute_command)

event_layer = self._event_layer()
if event_layer["enabled"]:
event_command = {
"layer": "event_layer",
"engine": event_layer["name"],
"action": "publish_event",
}
if event_layer["name"] == "kafka":
event_command["topic"] = settings.KAFKA_EXECUTION_TOPIC
if event_layer["name"] == "pulsar":
event_command["topic"] = settings.PULSAR_EXECUTION_TOPIC
if event_layer["name"] == "rabbitmq":
event_command["exchange"] = settings.RABBITMQ_EXCHANGE
event_command["routing_key"] = settings.RABBITMQ_QUEUE
if event_layer["name"] == "nats":
event_command["subject"] = settings.NATS_SUBJECT
commands.append(event_command)

return {
"execution_id": execution_id,
42 changes: 41 additions & 1 deletion pipeline/backend/tests/test_modern_stack.py
@@ -10,6 +10,7 @@
sys.modules[spec.name] = modern_stack
spec.loader.exec_module(modern_stack)
ModernOrchestrationStack = modern_stack.ModernOrchestrationStack
settings = modern_stack.settings


def test_architecture_contains_requested_layers():
Expand All @@ -20,7 +21,7 @@ def test_architecture_contains_requested_layers():
assert architecture["api_layer"]["name"] == "FastAPI"
assert architecture["orchestration"]["name"] in {"temporal", "airflow"}
assert architecture["execution"]["name"] == "kubernetes-jobs"
assert architecture["distributed_compute"]["name"] == "ray"
assert architecture["distributed_compute"]["name"] in {"ray", "spark", "dask", "celery"}
assert architecture["security"]["authorization"] == "rbac"


@@ -32,3 +33,42 @@ def test_submit_execution_returns_envelope():
assert envelope["pipeline_id"] == "pipeline-123"
assert envelope["status"] == "accepted"
assert any(command["layer"] == "execution" for command in envelope["commands"])


def test_architecture_supports_pulsar_spark_and_cockroachdb(monkeypatch):
monkeypatch.setattr(settings, "EVENT_BACKEND", "pulsar")
monkeypatch.setattr(settings, "PULSAR_ENABLED", True)
monkeypatch.setattr(settings, "DISTRIBUTED_COMPUTE_BACKEND", "spark")
monkeypatch.setattr(settings, "SPARK_ENABLED", True)
monkeypatch.setattr(settings, "DATABASE_BACKEND", "cockroachdb")

stack = ModernOrchestrationStack()
architecture = stack.architecture()

assert architecture["event_layer"]["name"] == "pulsar"
assert architecture["event_layer"]["config"]["multi_tenant"] is True
assert architecture["distributed_compute"]["name"] == "spark"
assert architecture["storage"]["database"]["engine"] == "cockroachdb"


def test_submit_execution_supports_rabbitmq_and_dask(monkeypatch):
monkeypatch.setattr(settings, "EVENT_BACKEND", "rabbitmq")
monkeypatch.setattr(settings, "RABBITMQ_ENABLED", True)
monkeypatch.setattr(settings, "DISTRIBUTED_COMPUTE_BACKEND", "dask")
monkeypatch.setattr(settings, "DASK_ENABLED", True)

stack = ModernOrchestrationStack()
envelope = stack.submit_execution("pipeline-abc", {})

assert any(
command["layer"] == "event_layer"
and command["engine"] == "rabbitmq"
and command["action"] == "publish_event"
for command in envelope["commands"]
)
assert any(
command["layer"] == "distributed_compute"
and command["engine"] == "dask"
and command["action"] == "submit_dask_job"
for command in envelope["commands"]
)
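
The remaining backends would follow the same pattern; a sketch of a NATS-path test, not part of this PR:

def test_submit_execution_supports_nats(monkeypatch):
    monkeypatch.setattr(settings, "EVENT_BACKEND", "nats")
    monkeypatch.setattr(settings, "NATS_ENABLED", True)

    stack = ModernOrchestrationStack()
    envelope = stack.submit_execution("pipeline-xyz", {})

    # submit_execution routes the publish command through _event_layer(),
    # so the command carries the configured NATS subject.
    assert any(
        command["layer"] == "event_layer"
        and command["engine"] == "nats"
        and command["subject"] == settings.NATS_SUBJECT
        for command in envelope["commands"]
    )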