Commit 4a420ec

Merge pull request #87 from fuzziecoder/codex/implement-real-time-streaming-pipelines
Add Kafka/Redpanda streaming backend support
2 parents 5fe2a2d + a5168db commit 4a420ec

5 files changed

Lines changed: 74 additions & 8 deletions

backend/.env.example

Lines changed: 10 additions & 0 deletions
@@ -23,3 +23,13 @@ SENSITIVE_FIELDS=password,token,secret,api_key,authorization,credit_card,ssn
 # Development default: http://localhost:3000,http://localhost:5173,http://127.0.0.1:5173
 # Production example: https://yourdomain.com,https://app.yourdomain.com
 CORS_ORIGINS=http://localhost:3000,http://localhost:5173,http://127.0.0.1:5173
+
+# Event streaming (Kafka-compatible, supports Apache Kafka and Redpanda)
+ENABLE_EVENT_STREAMING=False
+EVENT_STREAM_BACKEND=kafka
+KAFKA_BOOTSTRAP_SERVERS=localhost:9092
+KAFKA_CLIENT_ID=flexiroaster-backend
+TOPIC_PIPELINE_CREATED=pipeline.created
+TOPIC_EXECUTION_STARTED=execution.started
+TOPIC_EXECUTION_FAILED=execution.failed
+TOPIC_EXECUTION_COMPLETED=execution.completed
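These defaults point the backend at a local Kafka-compatible broker. As a minimal sketch (not part of this commit), connectivity to the configured bootstrap servers could be smoke-tested with kafka-python, assuming that package is installed and the variables above are exported in the environment:

```python
# Hypothetical connectivity check, not part of this commit.
# Assumes kafka-python is installed and the env vars above are exported.
import os

from kafka import KafkaProducer
from kafka.errors import KafkaError

bootstrap = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
client_id = os.getenv("KAFKA_CLIENT_ID", "flexiroaster-backend")

try:
    # The constructor contacts the bootstrap servers and raises if none respond.
    producer = KafkaProducer(bootstrap_servers=bootstrap, client_id=client_id)
    producer.close()
    print(f"Broker reachable at {bootstrap}")
except KafkaError as exc:
    print(f"Broker unreachable at {bootstrap}: {exc}")
```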

backend/README.md

Lines changed: 19 additions & 3 deletions
@@ -136,19 +136,35 @@ cp backend/.env.example backend/.env
 
 ## Event-Driven Architecture (Advanced Setup)
 
-FlexiRoaster supports Kafka-backed domain events for loose coupling, high scalability, audit-friendly workflows, and real-time analytics.
+FlexiRoaster supports Kafka-compatible domain events (Apache Kafka or Redpanda) for loose coupling, high scalability, audit-friendly workflows, and real-time analytics.
+
+
+#### Apache Kafka
+- Event-driven triggers
+- High-throughput ingestion
+- Real-time pipeline activation
+
+#### Redpanda
+- Kafka-compatible
+- Lower operational complexity
+
+Use this layer when:
+- Pipelines should trigger from events
+- You need real-time monitoring
+- You process millions of records
 
 ### Published topics
 - `pipeline.created`
 - `execution.started`
 - `execution.failed`
 - `execution.completed`
 
-### Enable Kafka publishing
+### Enable streaming publishing
 Set the following environment variables in `backend/.env`:
 
 ```env
 ENABLE_EVENT_STREAMING=true
+EVENT_STREAM_BACKEND=kafka
 KAFKA_BOOTSTRAP_SERVERS=localhost:9092
 KAFKA_CLIENT_ID=flexiroaster-backend
 TOPIC_PIPELINE_CREATED=pipeline.created
@@ -157,7 +173,7 @@ TOPIC_EXECUTION_FAILED=execution.failed
 TOPIC_EXECUTION_COMPLETED=execution.completed
 ```
 
-If Kafka is unavailable, the backend falls back to structured application logs for events so local development continues to work.
+If Kafka/Redpanda is unavailable, the backend falls back to structured application logs for events so local development continues to work.
 
## Next Steps
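The README section above documents the published topics and the envelope fields. As an illustrative sketch (not part of this commit), a downstream service could consume those topics with kafka-python; the group id and JSON deserialization are assumptions (this diff does not show the producer's serializer), while the topic names, broker address, and envelope keys mirror the defaults in this diff:

```python
# Illustrative consumer, not part of this commit. Topic names, broker address,
# and envelope keys mirror this diff; the group_id and the assumption that the
# producer serializes the envelope as JSON are hypothetical.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "pipeline.created",
    "execution.started",
    "execution.failed",
    "execution.completed",
    bootstrap_servers="localhost:9092",
    group_id="flexiroaster-analytics",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

for message in consumer:
    # Envelope fields per backend/events/publisher.py in this commit:
    # event_type, stream_backend, published_at, payload.
    envelope = message.value
    print(message.topic, envelope["published_at"], envelope["payload"])
```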

backend/config.py

Lines changed: 3 additions & 2 deletions
@@ -3,7 +3,7 @@
 """
 from pydantic_settings import BaseSettings
 from pydantic import field_validator
-from typing import List, Optional, Union
+from typing import List, Literal, Optional, Union
 
 
 class Settings(BaseSettings):
@@ -40,8 +40,9 @@ class Settings(BaseSettings):
     KEYCLOAK_ENABLED: bool = False
     KEYCLOAK_ISSUER: Optional[str] = None
     KEYCLOAK_CLIENT_ID: Optional[str] = None
-    # Event-driven architecture (Kafka)
+    # Event-driven architecture (Kafka / Redpanda)
     ENABLE_EVENT_STREAMING: bool = False
+    EVENT_STREAM_BACKEND: Literal["kafka", "redpanda"] = "kafka"
     KAFKA_BOOTSTRAP_SERVERS: Union[str, List[str]] = "localhost:9092"
     KAFKA_CLIENT_ID: str = "flexiroaster-backend"
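The new `EVENT_STREAM_BACKEND` field uses a `Literal` type, so pydantic rejects any value other than "kafka" or "redpanda" when settings are loaded. A trimmed stand-in (not the project's real Settings class) showing that behavior, assuming pydantic v2 and pydantic-settings as already imported by config.py:

```python
# Trimmed stand-in to illustrate the Literal validation; not backend/config.py.
from typing import Literal

from pydantic import ValidationError
from pydantic_settings import BaseSettings


class DemoSettings(BaseSettings):
    ENABLE_EVENT_STREAMING: bool = False
    EVENT_STREAM_BACKEND: Literal["kafka", "redpanda"] = "kafka"


print(DemoSettings(EVENT_STREAM_BACKEND="redpanda").EVENT_STREAM_BACKEND)  # redpanda

try:
    DemoSettings(EVENT_STREAM_BACKEND="pulsar")  # not allowed by the Literal
except ValidationError as exc:
    print(exc.errors()[0]["loc"], exc.errors()[0]["type"])
```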

backend/events/publisher.py

Lines changed: 4 additions & 3 deletions
@@ -1,4 +1,4 @@
-"""Kafka-backed event publisher with graceful fallback for local/dev environments."""
+"""Kafka/Redpanda-backed event publisher with graceful fallback for local/dev environments."""
 
 from __future__ import annotations
 
@@ -36,7 +36,7 @@ def _ensure_producer(self) -> Optional[Any]:
             return self._producer
 
         if KafkaProducer is None:
-            logger.warning("Event streaming enabled but kafka-python is not installed.")
+            logger.warning("Event streaming enabled for %s but kafka-python is not installed.", settings.EVENT_STREAM_BACKEND)
             return None
 
         try:
@@ -48,13 +48,14 @@ def _ensure_producer(self) -> Optional[Any]:
             )
             return self._producer
         except Exception as exc:  # pragma: no cover - depends on external broker
-            logger.warning("Failed to initialize Kafka producer: %s", exc)
+            logger.warning("Failed to initialize %s producer: %s", settings.EVENT_STREAM_BACKEND, exc)
             return None
 
     def publish(self, topic: str, key: Optional[str], payload: Dict[str, Any]) -> None:
         """Publish a single event with standard envelope metadata."""
         event = {
             "event_type": topic,
+            "stream_backend": settings.EVENT_STREAM_BACKEND,
             "published_at": datetime.now().isoformat(),
             "payload": payload,
         }
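As a usage sketch (not part of this commit): the `publish` signature and envelope keys come from the diff above, while the call site, the import of a module-level `settings` from backend.config, and the `TOPIC_PIPELINE_CREATED` attribute are assumptions based on the .env example:

```python
# Hypothetical call site. EventPublisher.publish and the envelope fields come
# from this commit; the backend.config settings import path and the
# TOPIC_PIPELINE_CREATED attribute are assumptions.
from backend.config import settings
from backend.events.publisher import EventPublisher

publisher = EventPublisher()


def on_pipeline_created(pipeline_id: str, name: str) -> None:
    # With ENABLE_EVENT_STREAMING=true this produces to the configured broker;
    # otherwise the publisher falls back to structured application logs.
    publisher.publish(
        settings.TOPIC_PIPELINE_CREATED,
        key=pipeline_id,
        payload={"id": pipeline_id, "name": name},
    )
```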
Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+from backend.events.publisher import EventPublisher
+
+
+class _StubFuture:
+    pass
+
+
+class _StubProducer:
+    def __init__(self, *args, **kwargs):
+        self.sent = []
+
+    def send(self, topic, key=None, value=None):
+        self.sent.append({"topic": topic, "key": key, "value": value})
+        return _StubFuture()
+
+    def flush(self, timeout=None):
+        return None
+
+
+def test_publish_adds_stream_backend(monkeypatch):
+    monkeypatch.setattr("backend.events.publisher.KafkaProducer", _StubProducer)
+    monkeypatch.setattr("backend.events.publisher.settings.ENABLE_EVENT_STREAMING", True)
+    monkeypatch.setattr("backend.events.publisher.settings.EVENT_STREAM_BACKEND", "redpanda")
+
+    publisher = EventPublisher()
+    publisher.publish("pipeline.created", "pipeline-1", {"id": "pipeline-1"})
+
+    sent = publisher._producer.sent  # type: ignore[attr-defined]
+    assert sent[0]["topic"] == "pipeline.created"
+    assert sent[0]["value"]["stream_backend"] == "redpanda"
+
+
+def test_publish_noop_when_streaming_disabled(monkeypatch):
+    monkeypatch.setattr("backend.events.publisher.settings.ENABLE_EVENT_STREAMING", False)
+    publisher = EventPublisher()
+    publisher.publish("execution.started", "execution-1", {"id": "execution-1"})
+
+    assert publisher._producer is None
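Because these tests swap `_StubProducer` in for `KafkaProducer` via `monkeypatch`, they exercise the publisher without a running broker, matching the graceful-fallback behavior described in the publisher module.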
