From 01092a7d26566af302c8005b036a66a818262100 Mon Sep 17 00:00:00 2001 From: Ramkumar <153575188+fuzziecoder@users.noreply.github.com> Date: Wed, 25 Feb 2026 08:35:37 -0500 Subject: [PATCH] Add pluggable event, compute, and database backends --- pipeline/backend/config.py | 34 +++ pipeline/backend/core/modern_stack.py | 216 +++++++++++++++----- pipeline/backend/tests/test_modern_stack.py | 42 +++- 3 files changed, 245 insertions(+), 47 deletions(-) diff --git a/pipeline/backend/config.py b/pipeline/backend/config.py index 8d48f73..4fd4b63 100644 --- a/pipeline/backend/config.py +++ b/pipeline/backend/config.py @@ -142,6 +142,24 @@ class Settings(BaseSettings): KAFKA_TRIGGER_TOPIC: str = "pipeline.triggers" KAFKA_TRIGGER_GROUP: str = "flexiroaster-orchestrator" + # Event & messaging provider settings + EVENT_BACKEND: str = "kafka" # kafka | pulsar | rabbitmq | nats + PULSAR_ENABLED: bool = False + PULSAR_SERVICE_URL: str = "pulsar://localhost:6650" + PULSAR_EXECUTION_TOPIC: str = "persistent://public/default/pipeline.executions" + PULSAR_TENANT: str = "public" + PULSAR_NAMESPACE: str = "default" + PULSAR_GEO_REPLICATION_REGIONS: List[str] = [] + + RABBITMQ_ENABLED: bool = False + RABBITMQ_URL: str = "amqp://guest:guest@localhost:5672/" + RABBITMQ_EXCHANGE: str = "pipeline.executions" + RABBITMQ_QUEUE: str = "pipeline.execution.tasks" + + NATS_ENABLED: bool = False + NATS_SERVERS: List[str] = ["nats://localhost:4222"] + NATS_SUBJECT: str = "pipeline.executions" + # =================== # Enterprise Orchestration # =================== @@ -169,12 +187,28 @@ class Settings(BaseSettings): RAY_DASHBOARD_URL: str = "http://localhost:8265" RAY_JOB_ENTRYPOINT: str = "python -m worker" + DISTRIBUTED_COMPUTE_BACKEND: str = "ray" # ray | celery | spark | dask + SPARK_ENABLED: bool = False + SPARK_MASTER_URL: str = "spark://localhost:7077" + SPARK_APP_NAME: str = "flexiroaster-pipeline" + SPARK_STREAMING_ENABLED: bool = True + + DASK_ENABLED: bool = False + DASK_SCHEDULER_ADDRESS: str = "tcp://localhost:8786" + DASK_DASHBOARD_URL: str = "http://localhost:8787" + OBJECT_STORAGE_ENABLED: bool = True OBJECT_STORAGE_BUCKET: str = "pipeline-artifacts" OBJECT_STORAGE_ENDPOINT: str = "http://localhost:9000" OBJECT_STORAGE_ACCESS_KEY: str = "minio" OBJECT_STORAGE_SECRET_KEY: str = "minio123" + DATABASE_BACKEND: str = "postgresql" # postgresql | cockroachdb | mongodb | cassandra + COCKROACHDB_URL: str = "postgresql://root@localhost:26257/flexiroaster?sslmode=disable" + MONGODB_URL: str = "mongodb://localhost:27017/flexiroaster" + CASSANDRA_CONTACT_POINTS: List[str] = ["localhost"] + CASSANDRA_KEYSPACE: str = "flexiroaster" + JWT_SECRET: str = "" JWT_ISSUER: str = "flexiroaster" JWT_AUDIENCE: str = "flexiroaster-api" diff --git a/pipeline/backend/core/modern_stack.py b/pipeline/backend/core/modern_stack.py index 659ee7e..20b64c2 100644 --- a/pipeline/backend/core/modern_stack.py +++ b/pipeline/backend/core/modern_stack.py @@ -3,10 +3,10 @@ Implements a configurable orchestration profile that combines: - FastAPI API layer - Airflow/Temporal orchestrators -- Kafka eventing +- Pluggable eventing (Kafka/Pulsar/RabbitMQ/NATS) - Kubernetes jobs for execution -- Ray for distributed compute -- PostgreSQL + object storage persistence +- Pluggable distributed compute (Ray/Spark/Dask/Celery) +- Pluggable storage (PostgreSQL/CockroachDB/MongoDB/Cassandra) + object storage - Prometheus/Grafana/ELK observability - JWT + RBAC + secrets manager security """ @@ -30,6 +30,138 @@ class StackComponent: class ModernOrchestrationStack: """Service layer for the high-end orchestration stack.""" + def _event_layer(self) -> Dict[str, Any]: + backend = settings.EVENT_BACKEND.lower() + + if backend == "pulsar": + return StackComponent( + name="pulsar", + enabled=settings.PULSAR_ENABLED, + config={ + "service_url": settings.PULSAR_SERVICE_URL, + "topic": settings.PULSAR_EXECUTION_TOPIC, + "tenant": settings.PULSAR_TENANT, + "namespace": settings.PULSAR_NAMESPACE, + "geo_replication_regions": settings.PULSAR_GEO_REPLICATION_REGIONS, + "multi_tenant": True, + }, + ).__dict__ + + if backend == "rabbitmq": + return StackComponent( + name="rabbitmq", + enabled=settings.RABBITMQ_ENABLED, + config={ + "url": settings.RABBITMQ_URL, + "exchange": settings.RABBITMQ_EXCHANGE, + "queue": settings.RABBITMQ_QUEUE, + }, + ).__dict__ + + if backend == "nats": + return StackComponent( + name="nats", + enabled=settings.NATS_ENABLED, + config={ + "servers": settings.NATS_SERVERS, + "subject": settings.NATS_SUBJECT, + }, + ).__dict__ + + return StackComponent( + name="kafka", + enabled=settings.KAFKA_ENABLED, + config={ + "bootstrap_servers": settings.KAFKA_BOOTSTRAP_SERVERS, + "topic": settings.KAFKA_EXECUTION_TOPIC, + }, + ).__dict__ + + def _distributed_compute(self) -> Dict[str, Any]: + backend = settings.DISTRIBUTED_COMPUTE_BACKEND.lower() + + if backend == "spark": + return StackComponent( + name="spark", + enabled=settings.SPARK_ENABLED, + config={ + "master_url": settings.SPARK_MASTER_URL, + "app_name": settings.SPARK_APP_NAME, + "supports_batch": True, + "supports_streaming": settings.SPARK_STREAMING_ENABLED, + }, + ).__dict__ + + if backend == "dask": + return StackComponent( + name="dask", + enabled=settings.DASK_ENABLED, + config={ + "scheduler_address": settings.DASK_SCHEDULER_ADDRESS, + "dashboard_url": settings.DASK_DASHBOARD_URL, + "python_native": True, + }, + ).__dict__ + + if backend == "celery": + return StackComponent( + name="celery", + enabled=True, + config={ + "broker_url": settings.CELERY_BROKER_URL, + "result_backend": settings.CELERY_RESULT_BACKEND, + "task_queue": settings.CELERY_TASK_QUEUE, + }, + ).__dict__ + + return StackComponent( + name="ray", + enabled=settings.RAY_ENABLED, + config={ + "dashboard_url": settings.RAY_DASHBOARD_URL, + "entrypoint": settings.RAY_JOB_ENTRYPOINT, + }, + ).__dict__ + + def _storage(self) -> Dict[str, Any]: + backend = settings.DATABASE_BACKEND.lower() + database_config: Dict[str, Any] + + if backend == "cockroachdb": + database_config = { + "engine": "cockroachdb", + "url": settings.COCKROACHDB_URL, + "strong_consistency": True, + "high_availability": True, + } + elif backend == "mongodb": + database_config = { + "engine": "mongodb", + "url": settings.MONGODB_URL, + "flexible_schema": True, + } + elif backend == "cassandra": + database_config = { + "engine": "cassandra", + "contact_points": settings.CASSANDRA_CONTACT_POINTS, + "keyspace": settings.CASSANDRA_KEYSPACE, + "high_write_workloads": True, + } + else: + database_config = { + "engine": "postgresql", + "url": settings.DATABASE_URL, + } + + return { + "database": database_config, + "object_storage": { + "enabled": settings.OBJECT_STORAGE_ENABLED, + "bucket": settings.OBJECT_STORAGE_BUCKET, + "endpoint": settings.OBJECT_STORAGE_ENDPOINT, + }, + } + def architecture(self) -> Dict[str, Any]: """Return stack topology and active configuration.""" return { @@ -48,14 +180,7 @@ def architecture(self) -> Dict[str, Any]: "task_queue": settings.TEMPORAL_TASK_QUEUE, }, ).__dict__, - "event_layer": StackComponent( - name="kafka", - enabled=settings.KAFKA_ENABLED, - config={ - "bootstrap_servers": settings.KAFKA_BOOTSTRAP_SERVERS, - "topic": settings.KAFKA_EXECUTION_TOPIC, - }, - ).__dict__, + "event_layer": self._event_layer(), "execution": StackComponent( name="kubernetes-jobs", enabled=True, @@ -65,22 +190,8 @@ def architecture(self) -> Dict[str, Any]: "service_account": settings.KUBERNETES_SERVICE_ACCOUNT, }, ).__dict__, - "distributed_compute": StackComponent( - name="ray", - enabled=settings.RAY_ENABLED, - config={ - "dashboard_url": settings.RAY_DASHBOARD_URL, - "entrypoint": settings.RAY_JOB_ENTRYPOINT, - }, - ).__dict__, - "storage": { - "database": "postgresql", - "object_storage": { - "enabled": settings.OBJECT_STORAGE_ENABLED, - "bucket": settings.OBJECT_STORAGE_BUCKET, - "endpoint": settings.OBJECT_STORAGE_ENDPOINT, - }, - }, + "distributed_compute": self._distributed_compute(), + "storage": self._storage(), "monitoring": { "metrics": ["prometheus", "grafana"], "logs": ["elasticsearch", "logstash", "kibana"], @@ -116,25 +227,38 @@ def submit_execution(self, pipeline_id: str, payload: Dict[str, Any]) -> Dict[st }, ] - if settings.RAY_ENABLED: - commands.append( - { - "layer": "distributed_compute", - "engine": "ray", - "action": "submit_ray_job", - "dashboard": settings.RAY_DASHBOARD_URL, - } - ) - - if settings.KAFKA_ENABLED: - commands.append( - { - "layer": "event_layer", - "engine": "kafka", - "action": "publish_event", - "topic": settings.KAFKA_EXECUTION_TOPIC, - } - ) + compute = self._distributed_compute() + if compute["enabled"]: + compute_command = { + "layer": "distributed_compute", + "engine": compute["name"], + "action": f"submit_{compute['name']}_job", + } + if compute["name"] == "ray": + compute_command["dashboard"] = settings.RAY_DASHBOARD_URL + if compute["name"] == "spark": + compute_command["master_url"] = settings.SPARK_MASTER_URL + if compute["name"] == "dask": + compute_command["scheduler_address"] = settings.DASK_SCHEDULER_ADDRESS + commands.append(compute_command) + + event_layer = self._event_layer() + if event_layer["enabled"]: + event_command = { + "layer": "event_layer", + "engine": event_layer["name"], + "action": "publish_event", + } + if event_layer["name"] == "kafka": + event_command["topic"] = settings.KAFKA_EXECUTION_TOPIC + if event_layer["name"] == "pulsar": + event_command["topic"] = settings.PULSAR_EXECUTION_TOPIC + if event_layer["name"] == "rabbitmq": + event_command["exchange"] = settings.RABBITMQ_EXCHANGE + event_command["routing_key"] = settings.RABBITMQ_QUEUE + if event_layer["name"] == "nats": + event_command["subject"] = settings.NATS_SUBJECT + commands.append(event_command) return { "execution_id": execution_id, diff --git a/pipeline/backend/tests/test_modern_stack.py b/pipeline/backend/tests/test_modern_stack.py index 9eb7d6e..5bbe621 100644 --- a/pipeline/backend/tests/test_modern_stack.py +++ b/pipeline/backend/tests/test_modern_stack.py @@ -10,6 +10,7 @@ sys.modules[spec.name] = modern_stack spec.loader.exec_module(modern_stack) ModernOrchestrationStack = modern_stack.ModernOrchestrationStack +settings = modern_stack.settings def test_architecture_contains_requested_layers(): @@ -20,7 +21,7 @@ def test_architecture_contains_requested_layers(): assert architecture["api_layer"]["name"] == "FastAPI" assert architecture["orchestration"]["name"] in {"temporal", "airflow"} assert architecture["execution"]["name"] == "kubernetes-jobs" - assert architecture["distributed_compute"]["name"] == "ray" + assert architecture["distributed_compute"]["name"] in {"ray", "spark", "dask", "celery"} assert architecture["security"]["authorization"] == "rbac" @@ -32,3 +33,42 @@ def test_submit_execution_returns_envelope(): assert envelope["pipeline_id"] == "pipeline-123" assert envelope["status"] == "accepted" assert any(command["layer"] == "execution" for command in envelope["commands"]) + + +def test_architecture_supports_pulsar_spark_and_cockroachdb(monkeypatch): + monkeypatch.setattr(settings, "EVENT_BACKEND", "pulsar") + monkeypatch.setattr(settings, "PULSAR_ENABLED", True) + monkeypatch.setattr(settings, "DISTRIBUTED_COMPUTE_BACKEND", "spark") + monkeypatch.setattr(settings, "SPARK_ENABLED", True) + monkeypatch.setattr(settings, "DATABASE_BACKEND", "cockroachdb") + + stack = ModernOrchestrationStack() + architecture = stack.architecture() + + assert architecture["event_layer"]["name"] == "pulsar" + assert architecture["event_layer"]["config"]["multi_tenant"] is True + assert architecture["distributed_compute"]["name"] == "spark" + assert architecture["storage"]["database"]["engine"] == "cockroachdb" + + +def test_submit_execution_supports_rabbitmq_and_dask(monkeypatch): + monkeypatch.setattr(settings, "EVENT_BACKEND", "rabbitmq") + monkeypatch.setattr(settings, "RABBITMQ_ENABLED", True) + monkeypatch.setattr(settings, "DISTRIBUTED_COMPUTE_BACKEND", "dask") + monkeypatch.setattr(settings, "DASK_ENABLED", True) + + stack = ModernOrchestrationStack() + envelope = stack.submit_execution("pipeline-abc", {}) + + assert any( + command["layer"] == "event_layer" + and command["engine"] == "rabbitmq" + and command["action"] == "publish_event" + for command in envelope["commands"] + ) + assert any( + command["layer"] == "distributed_compute" + and command["engine"] == "dask" + and command["action"] == "submit_dask_job" + for command in envelope["commands"] + )