The Realtime Stream Processor is a high-performance, event-driven microservice that processes breaking news and live data streams in real time. Written in Go for low latency and efficient concurrency, it serves as the streaming backbone of the SynthoraAI content curation platform.
┌─────────────────────────────────────────────────────────────┐
│                     Presentation Layer                      │
│   ┌──────────────┐   ┌──────────────┐   ┌──────────────┐    │
│   │   REST API   │   │  WebSocket   │   │   GraphQL    │    │
│   └──────────────┘   └──────────────┘   └──────────────┘    │
└─────────────────────────────────────────────────────────────┘
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                    Business Logic Layer                     │
│   ┌──────────────┐   ┌──────────────┐   ┌──────────────┐    │
│   │  Processor   │   │  AI Client   │   │  Validator   │    │
│   └──────────────┘   └──────────────┘   └──────────────┘    │
└─────────────────────────────────────────────────────────────┘
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                      Data Access Layer                      │
│   ┌──────────────┐   ┌──────────────┐   ┌──────────────┐    │
│   │   MongoDB    │   │    Redis     │   │    Kafka     │    │
│   └──────────────┘   └──────────────┘   └──────────────┘    │
└─────────────────────────────────────────────────────────────┘
- RSS Feed Pollers: Periodically fetch RSS feeds from configured sources
- API Watchers: Monitor REST APIs for new content
- WebSocket Listeners: Real-time event streams from external sources
- Rate Limiting: Token bucket algorithm to prevent API abuse
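The token bucket rate limiting named above can be sketched as follows. This is a minimal illustration, not the service's actual limiter: the `TokenBucket` type, its constructor, and the capacity and refill values are all hypothetical. A production service might instead use an existing limiter such as the one in `golang.org/x/time/rate`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// TokenBucket is a hypothetical limiter: it holds up to capacity tokens
// and refills at refillPerSec tokens per second.
type TokenBucket struct {
	mu           sync.Mutex
	capacity     float64
	tokens       float64
	refillPerSec float64
	last         time.Time
}

// NewTokenBucket returns a bucket that starts full.
func NewTokenBucket(capacity, refillPerSec float64) *TokenBucket {
	return &TokenBucket{
		capacity:     capacity,
		tokens:       capacity,
		refillPerSec: refillPerSec,
		last:         time.Now(),
	}
}

// Allow refills the bucket for the time elapsed since the last call,
// then spends one token if one is available.
func (b *TokenBucket) Allow() bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	now := time.Now()
	b.tokens += now.Sub(b.last).Seconds() * b.refillPerSec
	if b.tokens > b.capacity {
		b.tokens = b.capacity
	}
	b.last = now

	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	// Allow a burst of 2 requests, refilled at 1 request per second.
	limiter := NewTokenBucket(2, 1)
	for i := 1; i <= 3; i++ {
		fmt.Printf("request %d allowed: %v\n", i, limiter.Allow())
	}
}
```

The capacity bounds the burst size, while the refill rate bounds the sustained request rate against an upstream source.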
- Redis Streams: Lightweight, fast message queue for <100K msg/sec
- Apache Kafka: High-throughput distributed queue for >100K msg/sec
- Topic Routing: Priority-based routing (urgent, high, medium, low)
- Consumer Groups: Load balancing across multiple processor instances
- Validation: Schema validation and sanity checks
- Deduplication: Content-based hashing using MinHash/LSH
- AI Enrichment:
  - Summarization (Google Gemini)
  - Sentiment analysis
  - Topic classification
  - Entity extraction
- Persistence: MongoDB document storage
- Broadcasting: WebSocket and GraphQL subscription updates
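To make the deduplication step more concrete, here is a rough MinHash sketch. It assumes 3-word shingles, 64 seeded FNV-1a hash functions, and hypothetical `Signature` and `Similarity` helpers; none of these choices come from the service itself. The LSH banding step, which would index signatures for fast candidate lookup, is omitted.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math"
	"strings"
)

const numHashes = 64 // signature length; an illustrative choice

// shingles splits text into overlapping n-word sequences.
func shingles(text string, n int) []string {
	words := strings.Fields(strings.ToLower(text))
	if len(words) < n {
		return []string{strings.Join(words, " ")}
	}
	out := make([]string, 0, len(words)-n+1)
	for i := 0; i+n <= len(words); i++ {
		out = append(out, strings.Join(words[i:i+n], " "))
	}
	return out
}

// hash64 hashes a shingle with FNV-1a, prefixing a seed so that each
// seed behaves like a different hash function.
func hash64(s string, seed uint64) uint64 {
	var prefix [8]byte
	for i := range prefix {
		prefix[i] = byte(seed >> (8 * uint(i)))
	}
	h := fnv.New64a()
	h.Write(prefix[:])
	h.Write([]byte(s))
	return h.Sum64()
}

// Signature keeps, for each hash function, the minimum hash value
// seen over all shingles of the text.
func Signature(text string) [numHashes]uint64 {
	var sig [numHashes]uint64
	for i := range sig {
		sig[i] = math.MaxUint64
	}
	for _, sh := range shingles(text, 3) {
		for i := range sig {
			if v := hash64(sh, uint64(i)); v < sig[i] {
				sig[i] = v
			}
		}
	}
	return sig
}

// Similarity estimates the Jaccard similarity of two texts as the
// fraction of signature slots that match.
func Similarity(a, b [numHashes]uint64) float64 {
	match := 0
	for i := range a {
		if a[i] == b[i] {
			match++
		}
	}
	return float64(match) / numHashes
}

func main() {
	a := Signature("Senate passes the annual budget bill after a long debate on Tuesday")
	b := Signature("Senate passes the annual budget bill after a lengthy debate on Tuesday")
	c := Signature("Local team wins the championship game in an overtime thriller")
	fmt.Printf("near-duplicate: %.2f\n", Similarity(a, b))
	fmt.Printf("unrelated:      %.2f\n", Similarity(a, c))
}
```

An article would be treated as a duplicate when its estimated similarity to a stored signature exceeds some threshold; the right threshold depends on the content and is not specified here.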
- REST API: Traditional request-response endpoints
- WebSocket Server: Real-time push notifications
- GraphQL: Flexible querying and subscriptions
- Metrics Export: Prometheus-compatible metrics
  ┌─────────────┐
  │ Data Source │
  └──────┬──────┘
         │
         ▼
┌──────────────────┐
│  Source Manager  │◄──────┐
│ (Poller/Watcher) │       │ Retry
└────────┬─────────┘       │
         │                 │
         ▼                 │
┌────────────────────┐     │
│   Message Broker   │─────┘
│   (Redis/Kafka)    │
└────────┬───────────┘
         │
         ▼
┌────────────────────┐
│    Worker Pool     │
│    (Goroutines)    │
└────────┬───────────┘
         │
         ▼
┌────────────────────┐
│ Article Processor  │
│   - Validate       │
│   - Deduplicate    │
│   - AI Process     │
│   - Store          │
└────────┬───────────┘
         │
         ├─────────────────┐
         │                 │
         ▼                 ▼
  ┌─────────────┐   ┌──────────────┐
  │   MongoDB   │   │  WebSocket   │
  │  (Persist)  │   │  Broadcast   │
  └─────────────┘   └──────────────┘
// Concurrent processing with fixed worker count
workers := 20
for i := 0; i < workers; i++ {
	go worker(articleChan)
}
- Prevents cascading failures when external services are down
- Automatic recovery with exponential backoff
- Fallback mechanisms for AI service failures
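The circuit breaker behaviour described above might look roughly like this. It is a sketch under stated assumptions: the `Breaker` type, its thresholds, and the `errUpstream` stand-in error are hypothetical, and the type is not safe for concurrent use without a mutex.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	ErrOpen     = errors.New("circuit open")
	errUpstream = errors.New("upstream unavailable") // stand-in failure for the demo
)

// Breaker opens after maxFailures consecutive failures and stays open
// for a cooldown that doubles on every consecutive trip.
type Breaker struct {
	maxFailures int
	baseDelay   time.Duration
	maxDelay    time.Duration
	failures    int
	trips       int // consecutive times the breaker has opened
	openUntil   time.Time
}

func NewBreaker(maxFailures int, baseDelay, maxDelay time.Duration) *Breaker {
	return &Breaker{maxFailures: maxFailures, baseDelay: baseDelay, maxDelay: maxDelay}
}

// cooldown returns baseDelay * 2^(trips-1), capped at maxDelay.
func (b *Breaker) cooldown() time.Duration {
	d := b.baseDelay
	for i := 1; i < b.trips && d < b.maxDelay; i++ {
		d *= 2
	}
	if d > b.maxDelay {
		d = b.maxDelay
	}
	return d
}

// Call runs fn unless the breaker is open. After a cooldown, the next
// call acts as a probe: success closes the breaker, failure re-opens it.
func (b *Breaker) Call(fn func() error) error {
	now := time.Now()
	if now.Before(b.openUntil) {
		return ErrOpen
	}
	if err := fn(); err != nil {
		b.failures++
		if b.failures >= b.maxFailures || b.trips > 0 {
			b.trips++
			b.failures = 0
			b.openUntil = now.Add(b.cooldown())
		}
		return err
	}
	b.failures, b.trips = 0, 0
	return nil
}

func main() {
	b := NewBreaker(2, time.Second, time.Minute)
	summarize := func() error { return errUpstream } // pretend the AI service is down
	for i := 1; i <= 3; i++ {
		err := b.Call(summarize)
		if errors.Is(err, ErrOpen) {
			// Fallback path: keep the article, skip the AI enrichment.
			fmt.Printf("call %d: circuit open, storing article without summary\n", i)
			continue
		}
		fmt.Printf("call %d: %v\n", i, err)
	}
}
```

Returning a distinct `ErrOpen` lets the caller choose a fallback, such as persisting the article without a summary, instead of waiting on a service that is known to be failing.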
- WebSocket hub maintains subscriber registry
- Topic-based filtering for targeted updates
- Automatic cleanup of stale connections
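The subscriber registry, topic filtering, and stale-connection cleanup listed above can be illustrated without a WebSocket library. In this sketch each `Subscriber` stands in for one connection, and a full send buffer is treated as the signal that a connection is stale. All names are hypothetical, and the real hub (built on Gorilla WebSocket, per the technology table) may detect stale connections differently, for example with ping/pong timeouts.

```go
package main

import (
	"fmt"
	"sync"
)

// Subscriber stands in for one WebSocket connection. A per-connection
// write loop would drain Send and write each message to the socket.
type Subscriber struct {
	Topics map[string]bool
	Send   chan string
}

// Hub keeps the registry of active subscribers.
type Hub struct {
	mu   sync.Mutex
	subs map[*Subscriber]struct{}
}

func NewHub() *Hub {
	return &Hub{subs: make(map[*Subscriber]struct{})}
}

// Subscribe registers a subscriber for the given topics.
func (h *Hub) Subscribe(buffer int, topics ...string) *Subscriber {
	s := &Subscriber{Topics: make(map[string]bool), Send: make(chan string, buffer)}
	for _, t := range topics {
		s.Topics[t] = true
	}
	h.mu.Lock()
	h.subs[s] = struct{}{}
	h.mu.Unlock()
	return s
}

// Publish delivers msg to every subscriber of topic and returns how many
// received it. A subscriber whose buffer is full is removed and closed.
func (h *Hub) Publish(topic, msg string) (delivered int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for s := range h.subs {
		if !s.Topics[topic] {
			continue
		}
		select {
		case s.Send <- msg:
			delivered++
		default:
			delete(h.subs, s)
			close(s.Send)
		}
	}
	return delivered
}

// Count returns the number of registered subscribers.
func (h *Hub) Count() int {
	h.mu.Lock()
	defer h.mu.Unlock()
	return len(h.subs)
}

func main() {
	hub := NewHub()
	politics := hub.Subscribe(4, "politics")
	hub.Subscribe(4, "sports")

	n := hub.Publish("politics", "Budget bill passes")
	fmt.Println("delivered to", n, "subscriber(s):", <-politics.Send)
}
```

The non-blocking send keeps one slow client from stalling broadcasts to everyone else.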
- Abstract data access behind interfaces
- Easy switching between MongoDB, PostgreSQL, etc.
- Testable with mock implementations
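As an illustration of the repository pattern above, the sketch below defines a small storage interface and an in-memory implementation that could serve as a test double. The `Article` fields, the `ArticleRepository` interface, and the `Ingest` helper are assumptions made for the example; a real interface would likely also take a `context.Context` on each method.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Article is a simplified, hypothetical article record.
type Article struct {
	ID     string
	Title  string
	Source string
}

var ErrNotFound = errors.New("article not found")

// ArticleRepository hides the storage backend from the business logic.
type ArticleRepository interface {
	Save(a Article) error
	FindByID(id string) (Article, error)
}

// MemoryRepository is an in-memory implementation, useful in tests.
type MemoryRepository struct {
	mu    sync.RWMutex
	items map[string]Article
}

// Compile-time check that MemoryRepository satisfies the interface.
var _ ArticleRepository = (*MemoryRepository)(nil)

func NewMemoryRepository() *MemoryRepository {
	return &MemoryRepository{items: make(map[string]Article)}
}

func (r *MemoryRepository) Save(a Article) error {
	if a.ID == "" {
		return errors.New("article ID is required")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.items[a.ID] = a
	return nil
}

func (r *MemoryRepository) FindByID(id string) (Article, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	a, ok := r.items[id]
	if !ok {
		return Article{}, ErrNotFound
	}
	return a, nil
}

// Ingest depends only on the interface, so a MongoDB- or
// PostgreSQL-backed repository could be passed in unchanged.
func Ingest(repo ArticleRepository, a Article) error {
	return repo.Save(a)
}

func main() {
	repo := NewMemoryRepository()
	if err := Ingest(repo, Article{ID: "a1", Title: "Budget bill passes", Source: "rss"}); err != nil {
		fmt.Println("ingest failed:", err)
		return
	}
	a, err := repo.FindByID("a1")
	fmt.Println(a.Title, err)
}
```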
- Goroutines: Lightweight threads for parallel processing
- Channels: Safe communication between goroutines
- Worker Pools: Limit concurrent operations to prevent resource exhaustion
- Connection Pooling: Reuse database and broker connections
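A bounded worker pool combining goroutines, channels, and a fixed concurrency limit could be sketched as below. `ProcessAll` and its signature are hypothetical; the point is only to show how a fixed number of goroutines can drain a shared channel and how the caller can wait for completion.

```go
package main

import (
	"fmt"
	"sync"
)

// ProcessAll fans jobs out to a fixed number of workers and returns the
// results in the same order as the input.
func ProcessAll(jobs []string, workers int, process func(string) string) []string {
	indexes := make(chan int)
	results := make([]string, len(jobs))

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range indexes {
				// Each index is received by exactly one worker, so
				// writes to results never overlap.
				results[i] = process(jobs[i])
			}
		}()
	}

	for i := range jobs {
		indexes <- i
	}
	close(indexes) // lets the workers' range loops finish
	wg.Wait()
	return results
}

func main() {
	articles := []string{"a1", "a2", "a3", "a4", "a5"}
	out := ProcessAll(articles, 3, func(id string) string {
		return "processed:" + id
	})
	fmt.Println(out)
}
```

Capping the worker count bounds memory use and the number of simultaneous calls to downstream services, whatever the size of the backlog.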
- Redis: Hot data caching (recent articles, popular topics)
- In-Memory: LRU cache for frequently accessed data
- Content Hashing: Fast duplicate detection
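The in-memory LRU cache mentioned above can be built from a map and a doubly linked list. The following is a minimal, single-goroutine sketch with hypothetical names; a shared cache would need a mutex or an existing concurrent cache library.

```go
package main

import (
	"container/list"
	"fmt"
)

type entry struct {
	key   string
	value string
}

// LRU evicts the least recently used entry once capacity is exceeded.
type LRU struct {
	capacity int
	order    *list.List // front = most recently used
	items    map[string]*list.Element
}

func NewLRU(capacity int) *LRU {
	return &LRU{
		capacity: capacity,
		order:    list.New(),
		items:    make(map[string]*list.Element),
	}
}

// Get returns the cached value and marks the key as recently used.
func (c *LRU) Get(key string) (string, bool) {
	el, ok := c.items[key]
	if !ok {
		return "", false
	}
	c.order.MoveToFront(el)
	return el.Value.(entry).value, true
}

// Put inserts or updates a value, evicting the oldest entry if needed.
func (c *LRU) Put(key, value string) {
	if el, ok := c.items[key]; ok {
		el.Value = entry{key, value}
		c.order.MoveToFront(el)
		return
	}
	c.items[key] = c.order.PushFront(entry{key, value})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(entry).key)
	}
}

func main() {
	cache := NewLRU(2)
	cache.Put("a1", "Budget bill passes")
	cache.Put("a2", "Storm warning issued")
	cache.Get("a1")                        // a1 becomes most recently used
	cache.Put("a3", "Election date set")   // evicts a2
	_, ok := cache.Get("a2")
	fmt.Println("a2 still cached:", ok)
}
```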
- Message Batching: Process multiple articles in single database write
- Network Batching: Combine multiple WebSocket messages
- Bulk Indexing: Batch MongoDB inserts for better throughput
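Batching by size with a time-based flush might be sketched as follows. The `Batch` function, the batch size, and the wait interval are illustrative assumptions; in the service the `flush` callback would correspond to a bulk database write or a combined WebSocket frame.

```go
package main

import (
	"fmt"
	"time"
)

// Batch groups items from in into slices of at most size items. A
// partial batch is flushed when maxWait elapses or when in is closed.
func Batch(in <-chan string, size int, maxWait time.Duration, flush func([]string)) {
	batch := make([]string, 0, size)
	ticker := time.NewTicker(maxWait)
	defer ticker.Stop()

	emit := func() {
		if len(batch) > 0 {
			flush(batch)
			batch = make([]string, 0, size) // fresh slice; flush may keep the old one
		}
	}

	for {
		select {
		case item, ok := <-in:
			if !ok {
				emit() // input closed: flush what is left and stop
				return
			}
			batch = append(batch, item)
			if len(batch) >= size {
				emit()
			}
		case <-ticker.C:
			emit() // do not let a partial batch wait indefinitely
		}
	}
}

func main() {
	in := make(chan string, 5)
	for _, id := range []string{"a1", "a2", "a3", "a4", "a5"} {
		in <- id
	}
	close(in)

	Batch(in, 2, time.Second, func(b []string) {
		fmt.Println("bulk insert:", b)
	})
}
```

Larger batches improve write throughput, while the time-based flush bounds the extra latency that batching adds to any single article.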
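- Memory Limits: GOMEMLIMIT sets a soft memory limit for the Go runtime, reducing the risk of OOM kills
- CPU Limits: GOMAXPROCS caps the number of OS threads executing Go code at once; set it to match the container's CPU quota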
- Graceful Degradation: Reduce features under high load
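# Kubernetes HPA (autoscaling/v2), fields under spec
minReplicas: 3
maxReplicas: 10
metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
- Increase worker pool size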
- Allocate more memory for caching
- Use faster storage (NVMe SSD)
- Shard by source or topic
- Time-based partitioning for historical data
- Read replicas for query scaling
- Retry logic with exponential backoff
- Dead letter queues for failed messages
- Comprehensive error logging
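Retrying with exponential backoff and routing exhausted messages to a dead letter queue could look like the sketch below. The `Retry` and `ProcessWithDLQ` helpers, the attempt count, and the base delay are hypothetical, and the dead letter queue is modelled as a plain slice rather than a Redis stream or Kafka topic.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errPermanent = errors.New("permanent failure") // stand-in error for the demo

// Retry calls fn up to attempts times, sleeping base, 2*base, 4*base, ...
// between failures. It returns the last error if every attempt fails.
func Retry(attempts int, base time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(base << uint(i)) // delay doubles on each retry
		}
	}
	return err
}

// ProcessWithDLQ retries handle for each message and collects the
// messages that still fail in the returned dead letter slice.
func ProcessWithDLQ(msgs []string, handle func(string) error) (dead []string) {
	for _, m := range msgs {
		err := Retry(3, time.Millisecond, func() error { return handle(m) })
		if err != nil {
			dead = append(dead, m)
		}
	}
	return dead
}

func main() {
	dead := ProcessWithDLQ([]string{"a1", "bad", "a2"}, func(id string) error {
		if id == "bad" {
			return errPermanent
		}
		return nil
	})
	fmt.Println("dead letter queue:", dead)
}
```

In practice a random jitter is usually added to each delay so that many instances do not retry in lockstep.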
- Liveness probe: Is the service running?
- Readiness probe: Can it handle traffic?
- Dependency health: Are external services available?
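The three kinds of health check above can be exposed as HTTP handlers. This sketch assumes hypothetical `/livez` and `/readyz` paths and a `Check` function type; the dependency checks are stubs, whereas a real service would ping MongoDB, Redis, or Kafka.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
)

var errDown = errors.New("connection refused") // stand-in failure for the demo

// Check returns nil when a dependency is reachable.
type Check func() error

// Health holds one named check per external dependency.
type Health struct {
	checks map[string]Check
}

// Status runs every check and returns an HTTP status code plus a
// per-dependency report. Any failing check makes the service not ready.
func (h Health) Status() (int, map[string]string) {
	code := http.StatusOK
	report := make(map[string]string)
	for name, check := range h.checks {
		if err := check(); err != nil {
			code = http.StatusServiceUnavailable
			report[name] = err.Error()
		} else {
			report[name] = "ok"
		}
	}
	return code, report
}

// Livez answers the liveness probe: the process is up and serving.
func Livez(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
}

// Readyz answers the readiness probe with the dependency report.
func (h Health) Readyz(w http.ResponseWriter, r *http.Request) {
	code, report := h.Status()
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	json.NewEncoder(w).Encode(report)
}

func main() {
	h := Health{checks: map[string]Check{
		"mongodb": func() error { return nil },
		"redis":   func() error { return errDown },
	}}

	mux := http.NewServeMux()
	mux.HandleFunc("/livez", Livez)
	mux.HandleFunc("/readyz", h.Readyz)
	// A real service would now call http.ListenAndServe(":8080", mux).

	code, report := h.Status()
	fmt.Println(code, report)
}
```

Keeping liveness independent of dependencies avoids restarting healthy pods during an upstream outage, while readiness failures simply take the pod out of rotation.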
- MongoDB replica sets
- Redis AOF persistence
- Kafka replication factor: 3
// Handle SIGTERM/SIGINT
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGTERM, syscall.SIGINT)
<-quit
// Stop accepting new work
sourceManager.Stop()
consumer.Stop()
// Finish in-flight requests, waiting at most 30 seconds
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
	logger.Error("Graceful shutdown failed", zap.Error(err))
}
- Rate limiting per IP/API key
- JWT authentication for protected endpoints
- CORS configuration
- Input validation and sanitization
- TLS/SSL for all connections
- Network policies in Kubernetes
- Private subnets for databases
- Kubernetes secrets for credentials
- External secret stores (AWS Secrets Manager, Vault)
- Rotation of API keys
- Request rate, latency, errors (RED method)
- Queue depth, processing time, throughput
- Resource utilization (CPU, memory, network)
logger.Info("Article processed",
	zap.String("id", article.ID),
	zap.Int64("processing_time_ms", duration),
	zap.String("source", article.Source),
)
- End-to-end request tracing
- Identify bottlenecks in processing pipeline
- Distributed context propagation
- High error rates
- Processing latency spikes
- Queue backlog growth
- Service downtime
| Component | Technology | Rationale |
|---|---|---|
| Language | Go 1.21+ | Performance, concurrency, simplicity |
| Message Queue | Redis/Kafka | Flexibility, proven reliability |
| Database | MongoDB | Document model fits article structure |
| Cache | Redis | Fast, feature-rich |
| AI | Google Gemini | Advanced NLP capabilities |
| WebSocket | Gorilla | Battle-tested, performant |
| GraphQL | gqlgen | Code generation, type safety |
| Metrics | Prometheus | Industry standard, rich ecosystem |
| Container | Docker | Portability, consistency |
| Orchestration | Kubernetes | Auto-scaling, self-healing |
┌─────────────────────────────────────┐
│           Docker Compose            │
│    ┌──────┐   ┌──────┐   ┌──────┐   │
│    │ App  │   │Redis │   │Mongo │   │
│    └──────┘   └──────┘   └──────┘   │
└─────────────────────────────────────┘
┌──────────────────────────────────────────────────────┐
│                   Ingress (NGINX)                    │
└────────────────────┬─────────────────────────────────┘
                     │
        ┌────────────┴────────────┐
        │                         │
        ▼                         ▼
┌────────────────┐      ┌────────────────┐
│     Stream     │      │   Monitoring   │
│   Processor    │◄─────┤     Stack      │
│  (3-10 pods)   │      │  (Prometheus)  │
└────┬─────┬─────┘      └────────────────┘
     │     │
     │     └──────────┐
     │                │
     ▼                ▼
┌─────────┐      ┌─────────┐
│  Redis  │      │ MongoDB │
│ Cluster │      │ Replica │
└─────────┘      └─────────┘
- Machine Learning
  - Custom NLP models for government content
  - Anomaly detection for unusual patterns
  - Recommendation engine
- Advanced Features
  - Multi-language support
  - Image/video processing
  - Audio transcription
- Performance
  - gRPC for internal communication
  - Edge caching with CDN
  - Read-through caching
- Reliability
  - Multi-region deployment
  - Disaster recovery automation
  - Chaos engineering
Last Updated: 2025-11-16 | Version: 1.0.0 | Maintainer: David Nguyen (hoangson091104@gmail.com)