SynthoraAI Realtime Stream Processor - Architecture

Overview

The Realtime Stream Processor is a high-performance, event-driven microservice designed to process breaking news and live data streams in real time. Built with Go for high throughput and low latency, it serves as the streaming backbone for the SynthoraAI content curation platform.

Core Architecture

1. Layered Architecture

┌─────────────────────────────────────────────────────────────┐
│                    Presentation Layer                        │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │   REST API   │  │   WebSocket  │  │   GraphQL    │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
└─────────────────────────────────────────────────────────────┘
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                     Business Logic Layer                     │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │  Processor   │  │  AI Client   │  │  Validator   │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
└─────────────────────────────────────────────────────────────┘
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                      Data Access Layer                       │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │   MongoDB    │  │    Redis     │  │    Kafka     │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
└─────────────────────────────────────────────────────────────┘

2. Component Architecture

Data Ingestion Layer

  • RSS Feed Pollers: Periodically fetch RSS feeds from configured sources
  • API Watchers: Monitor REST APIs for new content
  • WebSocket Listeners: Real-time event streams from external sources
  • Rate Limiting: Token bucket algorithm to prevent API abuse

Message Broker Layer

  • Redis Streams: Lightweight, fast message queue for <100K msg/sec
  • Apache Kafka: High-throughput distributed queue for >100K msg/sec
  • Topic Routing: Priority-based routing (urgent, high, medium, low)
  • Consumer Groups: Load balancing across multiple processor instances
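Priority-based topic routing amounts to mapping each message's priority tier to a dedicated stream or topic, so consumers can drain urgent traffic first. The sketch below assumes a hypothetical `articles.<priority>` naming scheme; the real topic names are not specified in this document.

```go
package main

import "fmt"

// Priority mirrors the urgent/high/medium/low tiers described above.
type Priority int

const (
	Low Priority = iota
	Medium
	High
	Urgent
)

// String returns the tier name; unknown values fall back to "low".
func (p Priority) String() string {
	switch p {
	case Urgent:
		return "urgent"
	case High:
		return "high"
	case Medium:
		return "medium"
	default:
		return "low"
	}
}

// TopicFor maps a priority to a Redis stream key or Kafka topic name.
// The "articles.<priority>" convention is hypothetical.
func TopicFor(p Priority) string {
	return fmt.Sprintf("articles.%s", p) // %s uses Priority.String()
}
```

Falling back to the lowest tier for unknown values keeps malformed messages from jumping the queue.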

Processing Pipeline

  1. Validation: Schema validation and sanity checks
  2. Deduplication: Near-duplicate detection using MinHash signatures and locality-sensitive hashing (LSH)
  3. AI Enrichment:
    • Summarization (Google Gemini)
    • Sentiment analysis
    • Topic classification
    • Entity extraction
  4. Persistence: MongoDB document storage
  5. Broadcasting: WebSocket and GraphQL subscription updates
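The MinHash step of deduplication can be sketched as follows: split each article into word shingles, keep the minimum hash per simulated hash function, and estimate Jaccard similarity as the fraction of matching signature slots. This is an illustrative sketch, not the service's implementation. The per-function seeding of FNV-1a is a simplifying assumption, and the LSH banding step (bucketing signature bands so only likely candidates are compared) is omitted.

```go
package main

import (
	"hash/fnv"
	"strings"
)

// shingles returns the set of k-word shingles (overlapping word n-grams) in text.
// Texts shorter than k words yield an empty set.
func shingles(text string, k int) map[string]struct{} {
	words := strings.Fields(strings.ToLower(text))
	set := make(map[string]struct{})
	for i := 0; i+k <= len(words); i++ {
		set[strings.Join(words[i:i+k], " ")] = struct{}{}
	}
	return set
}

// MinHashSignature computes n minimum hash values over the text's shingles.
// Each "hash function" is FNV-1a with a different two-byte seed prefix.
func MinHashSignature(text string, n, k int) []uint64 {
	sig := make([]uint64, n)
	for i := range sig {
		sig[i] = ^uint64(0) // start at the maximum so any real hash is smaller
	}
	for sh := range shingles(text, k) {
		for i := 0; i < n; i++ {
			h := fnv.New64a()
			h.Write([]byte{byte(i), byte(i >> 8)}) // per-function seed
			h.Write([]byte(sh))
			if v := h.Sum64(); v < sig[i] {
				sig[i] = v
			}
		}
	}
	return sig
}

// EstimateJaccard returns the fraction of signature slots that agree.
func EstimateJaccard(a, b []uint64) float64 {
	if len(a) != len(b) || len(a) == 0 {
		return 0
	}
	matches := 0
	for i := range a {
		if a[i] == b[i] {
			matches++
		}
	}
	return float64(matches) / float64(len(a))
}
```

A processor would treat articles whose estimated similarity exceeds some threshold as duplicates; the threshold value is a tuning decision this document does not specify.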

Output Layer

  • REST API: Traditional request-response endpoints
  • WebSocket Server: Real-time push notifications
  • GraphQL: Flexible querying and subscriptions
  • Metrics Export: Prometheus-compatible metrics

3. Data Flow

┌─────────────┐
│ Data Source │
└──────┬──────┘
       │
       ▼
┌──────────────────┐
│ Source Manager   │◄──────┐
│ (Poller/Watcher) │       │ Retry
└────────┬─────────┘       │
         │                 │
         ▼                 │
┌────────────────────┐     │
│ Message Broker     │─────┘
│ (Redis/Kafka)      │
└────────┬───────────┘
         │
         ▼
┌────────────────────┐
│ Worker Pool        │
│ (Goroutines)       │
└────────┬───────────┘
         │
         ▼
┌────────────────────┐
│ Article Processor  │
│ - Validate         │
│ - Deduplicate      │
│ - AI Process       │
│ - Store            │
└────────┬───────────┘
         │
         ├─────────────────┐
         │                 │
         ▼                 ▼
┌─────────────┐    ┌──────────────┐
│  MongoDB    │    │  WebSocket   │
│  (Persist)  │    │  Broadcast   │
└─────────────┘    └──────────────┘

Design Patterns

1. Worker Pool Pattern

// Concurrent processing with fixed worker count
workers := 20
for i := 0; i < workers; i++ {
    go worker(articleChan)
}
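The fragment above does not show how workers stop or how callers learn that processing has finished. A common Go shape, sketched here with a hypothetical trimmed-down `Article` type and a caller-supplied `process` function, is to range over a closed input channel and close the output channel once a `sync.WaitGroup` reports that every worker has exited.

```go
package main

import "sync"

// Article is a hypothetical, trimmed-down stand-in for the service's article type.
type Article struct {
	ID    string
	Title string
}

// RunPool starts `workers` goroutines that drain `in`, apply `process`, and send
// results on the returned channel. The output channel is closed once `in` is
// closed and every worker has returned, so callers can simply range over it.
func RunPool(workers int, in <-chan Article, process func(Article) Article) <-chan Article {
	out := make(chan Article)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for a := range in { // exits when `in` is closed and drained
				out <- process(a)
			}
		}()
	}
	// Close `out` only after all workers exit, so no worker sends on a closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
```

The fixed worker count bounds concurrency (and therefore load on MongoDB and the AI service) regardless of how fast messages arrive.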

2. Circuit Breaker Pattern

  • Prevents cascading failures when external services are down
  • Automatic recovery with exponential backoff
  • Fallback mechanisms for AI service failures

3. Publisher-Subscriber Pattern

  • WebSocket hub maintains subscriber registry
  • Topic-based filtering for targeted updates
  • Automatic cleanup of stale connections
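The hub's registry, topic filtering, and stale-connection cleanup can be sketched with plain channels. In this illustration a buffered `chan string` stands in for a Gorilla WebSocket connection, and treating a full buffer as "stale" is an assumed policy, not necessarily the service's rule.

```go
package main

import "sync"

// Hub is a minimal topic-based publish/subscribe registry (hypothetical type).
type Hub struct {
	mu   sync.Mutex
	subs map[string]map[chan string]struct{} // topic -> set of subscriber channels
}

func NewHub() *Hub {
	return &Hub{subs: make(map[string]map[chan string]struct{})}
}

// Subscribe registers a new subscriber for topic and returns its channel.
func (h *Hub) Subscribe(topic string, buf int) chan string {
	ch := make(chan string, buf)
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.subs[topic] == nil {
		h.subs[topic] = make(map[chan string]struct{})
	}
	h.subs[topic][ch] = struct{}{}
	return ch
}

// Unsubscribe removes a subscriber and closes its channel (e.g., on disconnect).
func (h *Hub) Unsubscribe(topic string, ch chan string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.subs[topic][ch]; ok {
		delete(h.subs[topic], ch)
		close(ch)
	}
}

// Publish delivers msg to every subscriber of topic without blocking. A subscriber
// whose buffer is full is treated as stale and removed, so one slow client cannot
// stall the broadcast for everyone else.
func (h *Hub) Publish(topic, msg string) (delivered int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for ch := range h.subs[topic] {
		select {
		case ch <- msg:
			delivered++
		default:
			delete(h.subs[topic], ch) // deleting during range is safe in Go
			close(ch)
		}
	}
	return delivered
}
```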

4. Repository Pattern

  • Abstract data access behind interfaces
  • Swap storage backends (e.g., MongoDB, PostgreSQL) without changing business logic
  • Testable with mock implementations

Performance Optimizations

1. Concurrency

  • Goroutines: Lightweight threads for parallel processing
  • Channels: Safe communication between goroutines
  • Worker Pools: Limit concurrent operations to prevent resource exhaustion
  • Connection Pooling: Reuse database and broker connections

2. Caching

  • Redis: Hot data caching (recent articles, popular topics)
  • In-Memory: LRU cache for frequently accessed data
  • Content Hashing: Fast duplicate detection
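The in-memory LRU cache can be built from the standard library's `container/list` plus a map. This is a minimal sketch (hypothetical type, string keys assumed) and it is not safe for concurrent use; a shared instance would need a mutex around `Get` and `Put`.

```go
package main

import "container/list"

// LRU is a fixed-capacity least-recently-used cache (not goroutine-safe).
type LRU struct {
	capacity int
	ll       *list.List               // front = most recently used
	items    map[string]*list.Element // key -> element holding *entry
}

type entry struct {
	key   string
	value any
}

func NewLRU(capacity int) *LRU {
	return &LRU{capacity: capacity, ll: list.New(), items: make(map[string]*list.Element)}
}

// Get returns the cached value and marks the key as most recently used.
func (c *LRU) Get(key string) (any, bool) {
	if el, ok := c.items[key]; ok {
		c.ll.MoveToFront(el)
		return el.Value.(*entry).value, true
	}
	return nil, false
}

// Put inserts or updates key, evicting the least recently used entry when full.
func (c *LRU) Put(key string, value any) {
	if el, ok := c.items[key]; ok {
		el.Value.(*entry).value = value
		c.ll.MoveToFront(el)
		return
	}
	c.items[key] = c.ll.PushFront(&entry{key: key, value: value})
	if c.ll.Len() > c.capacity {
		oldest := c.ll.Back()
		c.ll.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
}
```

Both operations are O(1): the map finds the list element, and the list keeps recency order.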

3. Batching

  • Message Batching: Process multiple articles in a single database write
  • Network Batching: Combine multiple WebSocket messages into one frame
  • Bulk Inserts: Batch MongoDB writes (e.g., InsertMany) for higher throughput

4. Resource Management

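  • Memory Limits: GOMEMLIMIT sets a soft heap limit so the GC works harder before the container limit is reached (it reduces, but does not eliminate, OOM risk)
  • CPU Limits: GOMAXPROCS caps the OS threads executing Go code; match it to the container's CPU quota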
  • Graceful Degradation: Reduce features under high load

Scalability

Horizontal Scaling

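# Kubernetes HPA (autoscaling/v2 spec excerpt)
minReplicas: 3
maxReplicas: 10
metrics:
  - type: Resource
    resource: {name: cpu, target: {type: Utilization, averageUtilization: 70}}
  - type: Resource
    resource: {name: memory, target: {type: Utilization, averageUtilization: 80}}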

Vertical Scaling

  • Increase worker pool size
  • Allocate more memory for caching
  • Use faster storage (NVMe SSD)

Database Sharding

  • Shard by source or topic
  • Time-based partitioning for historical data
  • Read replicas for query scaling

Reliability

1. Error Handling

  • Retry logic with exponential backoff
  • Dead letter queues for failed messages
  • Comprehensive error logging

2. Health Checks

  • Liveness probe: Is the service running?
  • Readiness probe: Can it handle traffic?
  • Dependency health: Are external services available?

3. Data Durability

  • MongoDB replica sets
  • Redis AOF persistence
  • Kafka replication factor: 3

4. Graceful Shutdown

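// Handle SIGTERM/SIGINT (buffered so a signal is not dropped)
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGTERM, syscall.SIGINT)
<-quit

// Stop accepting new work
sourceManager.Stop()
consumer.Stop()

// Finish in-flight requests, giving up after 30 seconds
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
    logger.Error("graceful shutdown did not complete", zap.Error(err))
}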

Security

1. API Security

  • Rate limiting per IP/API key
  • JWT authentication for protected endpoints
  • CORS configuration
  • Input validation and sanitization

2. Network Security

  • TLS/SSL for all connections
  • Network policies in Kubernetes
  • Private subnets for databases

3. Secrets Management

  • Kubernetes secrets for credentials
  • External secret stores (AWS Secrets Manager, Vault)
  • Rotation of API keys

Monitoring & Observability

1. Metrics (Prometheus)

  • Request rate, latency, errors (RED method)
  • Queue depth, processing time, throughput
  • Resource utilization (CPU, memory, network)

2. Logging (Structured)

logger.Info("Article processed",
    zap.String("id", article.ID),
    zap.Int64("processing_time_ms", duration.Milliseconds()), // duration is a time.Duration
    zap.String("source", article.Source),
)

3. Tracing (Jaeger)

  • End-to-end request tracing
  • Identify bottlenecks in processing pipeline
  • Distributed context propagation

4. Alerting

  • High error rates
  • Processing latency spikes
  • Queue backlog growth
  • Service downtime

Technology Choices

| Component | Technology | Rationale |
|---|---|---|
| Language | Go 1.21+ | Performance, concurrency, simplicity |
| Message Queue | Redis Streams / Apache Kafka | Flexibility, proven reliability |
| Database | MongoDB | Document model fits article structure |
| Cache | Redis | Fast, feature-rich |
| AI | Google Gemini | Advanced NLP capabilities |
| WebSocket | Gorilla WebSocket | Battle-tested, performant |
| GraphQL | gqlgen | Code generation, type safety |
| Metrics | Prometheus | Industry standard, rich ecosystem |
| Container | Docker | Portability, consistency |
| Orchestration | Kubernetes | Auto-scaling, self-healing |

Deployment Architecture

Development

┌─────────────────────────────────────┐
│         Docker Compose              │
│  ┌──────┐  ┌──────┐  ┌──────┐      │
│  │ App  │  │Redis │  │Mongo │      │
│  └──────┘  └──────┘  └──────┘      │
└─────────────────────────────────────┘

Production (Kubernetes)

┌──────────────────────────────────────────────────────┐
│                  Ingress (NGINX)                      │
└────────────────────┬─────────────────────────────────┘
                     │
        ┌────────────┴────────────┐
        │                         │
        ▼                         ▼
┌────────────────┐      ┌────────────────┐
│  Stream        │      │  Monitoring    │
│  Processor     │◄─────┤  Stack         │
│  (3-10 pods)   │      │  (Prometheus)  │
└────┬─────┬─────┘      └────────────────┘
     │     │
     │     └──────────┐
     │                │
     ▼                ▼
┌─────────┐     ┌─────────┐
│ Redis   │     │ MongoDB │
│ Cluster │     │ Replica │
└─────────┘     └─────────┘

Future Enhancements

  1. Machine Learning

    • Custom NLP models for government content
    • Anomaly detection for unusual patterns
    • Recommendation engine
  2. Advanced Features

    • Multi-language support
    • Image/video processing
    • Audio transcription
  3. Performance

    • gRPC for internal communication
    • Edge caching with CDN
    • Read-through caching
  4. Reliability

    • Multi-region deployment
    • Disaster recovery automation
    • Chaos engineering

Last Updated: 2025-11-16 | Version: 1.0.0 | Maintainer: David Nguyen (hoangson091104@gmail.com)