The IoT Pipeline Monitoring System is a microservices-based platform for monitoring industrial pipelines with Smart Bolt sensors and automated valve control. The system simulates sensor readings (temperature/pressure), performs real-time anomaly detection, and enables automated/manual valve control through multiple interfaces.
Multi-sector (North/South) for geographic distribution
Technology Stack
| Layer | Technologies |
| --- | --- |
| Backend Framework | Python 3.11, CherryPy |
| Message Bus | Eclipse Mosquitto (MQTT), QoS 2 |
| Time-Series DB | InfluxDB v3 Cloud |
| Relational DB | Supabase PostgreSQL |
| Frontend | Flask, Plotly Dash, Bootstrap |
| External APIs | Telegram Bot API |
| Authentication | JWT (HS256), bcrypt |
Architecture Diagrams
1. High-Level System Topology
graph TB
subgraph "Edge Layer"
RPN[Raspberry Pi North<br/>Port 8086<br/>sector-north]
RPS[Raspberry Pi South<br/>Port 8088<br/>sector-south]
end
subgraph "Message Layer"
MQTT[MQTT Broker<br/>Mosquitto<br/>Port 1883]
MB[Message Broker<br/>Content Router]
end
subgraph "Data Layer"
TSDB[TimeSeries DB<br/>Port 8082]
INFLUX[(InfluxDB v3 Cloud<br/>Dual Buckets)]
end
subgraph "Analytics Layer"
AN[Analytics Service<br/>Port 8083]
end
subgraph "Control Layer"
CC[Control Center<br/>Port 8085<br/>Decision Engine]
end
subgraph "Management Layer"
CAT[Resource Catalog<br/>Port 8081<br/>Service Registry]
AM[Account Manager<br/>Port 8084]
SUPA[(Supabase<br/>PostgreSQL)]
end
subgraph "Presentation Layer"
WD[Web Dashboard<br/>Port 5000]
TB[Telegram Bot<br/>Port 8087]
end
subgraph "External"
USER[Users]
TGAPI[Telegram API]
end
RPN <-->|MQTT| MQTT
RPS <-->|MQTT| MQTT
MQTT <--> MB
MB -->|sensors/*| TSDB
TSDB --> INFLUX
TSDB -->|analytics/data/*| AN
AN -->|alerts/*| MQTT
MQTT -->|valves/*| CC
CC -->|commands/*| MQTT
MB -->|sectors/.../commands| RPN
MB -->|sectors/.../commands| RPS
CAT <-->|REST| RPN
CAT <-->|REST| RPS
CAT <-->|REST| TSDB
CAT <-->|REST| AN
CAT <-->|REST| CC
CAT <-->|REST| AM
CAT <-->|REST| TB
CAT <-->|REST| WD
AM <--> SUPA
WD <-->|REST| TSDB
WD <-->|REST| AN
WD <-->|REST| CC
WD <-->|REST| AM
TB <-->|MQTT| MQTT
TB <-->|REST| AM
TB <--> TGAPI
USER --> WD
USER --> TB
style CAT fill:#e3f2fd,stroke:#1976d2
style MQTT fill:#fff3e0,stroke:#f57c00
style MB fill:#fff3e0,stroke:#f57c00
style CC fill:#c8e6c9,stroke:#388e3c
style AN fill:#f3e5f5,stroke:#7b1fa2
style TSDB fill:#e8f5e9,stroke:#388e3c
2. Service Dependency Graph
graph LR
subgraph "Layer 0: Foundation"
CAT[Resource Catalog]
MQTT[MQTT Broker]
end
subgraph "Layer 1: Core"
MB[Message Broker]
AM[Account Manager]
end
subgraph "Layer 2: Data"
TSDB[TimeSeries DB]
RPN[Raspberry Pi North]
RPS[Raspberry Pi South]
end
subgraph "Layer 3: Intelligence"
AN[Analytics]
CC[Control Center]
end
subgraph "Layer 4: Presentation"
WD[Web Dashboard]
TB[Telegram Bot]
end
CAT --> MB
CAT --> AM
CAT --> TSDB
CAT --> RPN
CAT --> RPS
CAT --> AN
CAT --> CC
CAT --> WD
CAT --> TB
MQTT --> MB
MQTT --> TSDB
MQTT --> RPN
MQTT --> RPS
MQTT --> AN
MQTT --> CC
MQTT --> TB
MB --> TSDB
TSDB --> AN
AN --> CC
AM --> WD
AM --> TB
TSDB --> WD
AN --> WD
CC --> WD
style CAT fill:#ffcdd2
style MQTT fill:#ffcdd2
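The dependency graph above can be turned into a valid startup order with a topological sort. The sketch below is only an illustration: it reads each arrow `A --> B` as "B depends on A" (an assumption about the diagram's direction), and `DEPENDENCIES` and `startup_order` are hypothetical names, not part of the project.

```python
from graphlib import TopologicalSorter

# Each service maps to the set of services it depends on.
# Edges are transcribed from the dependency diagram; the direction
# ("A --> B" means B needs A) is an assumption.
DEPENDENCIES = {
    "Resource Catalog": set(),
    "MQTT Broker": set(),
    "Message Broker": {"Resource Catalog", "MQTT Broker"},
    "Account Manager": {"Resource Catalog"},
    "TimeSeries DB": {"Resource Catalog", "MQTT Broker", "Message Broker"},
    "Raspberry Pi North": {"Resource Catalog", "MQTT Broker"},
    "Raspberry Pi South": {"Resource Catalog", "MQTT Broker"},
    "Analytics": {"Resource Catalog", "MQTT Broker", "TimeSeries DB"},
    "Control Center": {"Resource Catalog", "MQTT Broker", "Analytics"},
    "Web Dashboard": {
        "Resource Catalog", "Account Manager",
        "TimeSeries DB", "Analytics", "Control Center",
    },
    "Telegram Bot": {"Resource Catalog", "MQTT Broker", "Account Manager"},
}


def startup_order(dependencies):
    """Return one ordering in which every service starts after its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())


if __name__ == "__main__":
    for position, service in enumerate(startup_order(DEPENDENCIES), start=1):
        print(position, service)
```

Several orderings satisfy the graph, so the exact sequence may differ between runs; the only guarantee is that no service appears before something it depends on.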
3. Complete Data Flow Sequence
sequenceDiagram
participant RP as Raspberry Pi
participant MQTT as MQTT Broker
participant MB as Message Broker
participant TSDB as TimeSeries DB
participant INFLUX as InfluxDB
participant AN as Analytics
participant CC as Control Center
participant TB as Telegram Bot
participant WD as Web Dashboard
Note over RP,WD: 1. Sensor Data Generation & Routing
RP->>MQTT: PUBLISH sectors/{sector}/pipelines/{id}/measurements
MQTT->>MB: Forward message
MB->>MB: Parse payload
MB->>MQTT: PUBLISH sensors/{pipeline_id}
MB->>MQTT: PUBLISH temperature/{pipeline_id}
MB->>MQTT: PUBLISH pressure/{pipeline_id}
MB->>MQTT: PUBLISH valves/{pipeline_id}
Note over TSDB,INFLUX: 2. Data Storage
MQTT->>TSDB: Receive on sensors/+
TSDB->>TSDB: Buffer write (100 points or 5s)
TSDB->>INFLUX: Batch write to bucket
TSDB->>MQTT: PUBLISH analytics/data/{pipeline_id}
Note over AN,CC: 3. Analytics & Alerting
MQTT->>AN: Receive on analytics/data/+
AN->>AN: Detect anomalies
AN->>AN: Calculate risk score
alt Anomaly Detected
AN->>MQTT: PUBLISH alerts/anomalies/{pipeline_id}
MQTT->>TB: Forward to Telegram Bot
TB->>TB: Send Telegram notification
end
Note over CC,RP: 4. Automated Control
MQTT->>CC: Receive on valves/{pipeline_id}
CC->>CC: Evaluate 12 control rules
alt Rule Triggered
CC->>MQTT: PUBLISH commands/valves/{pipeline_id}
MQTT->>MB: Forward command
MB->>MQTT: PUBLISH sectors/{sector}/.../commands/valves
MQTT->>RP: Receive command
RP->>RP: Update valve state
end
Note over WD,TSDB: 5. User Interface Polling
WD->>TSDB: HTTP GET /temperature
TSDB-->>WD: Temperature data
WD->>AN: HTTP GET /risk
AN-->>WD: Risk assessment
WD->>CC: HTTP GET /status
CC-->>WD: Control status
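Step 2 of the sequence above buffers writes until 100 points have accumulated or 5 seconds have passed. A minimal sketch of that flush policy follows; the `BatchBuffer` class, its `tick` method, and the injectable `clock` are hypothetical and only illustrate the idea, not the TimeSeries DB service's actual code.

```python
import time


class BatchBuffer:
    """Collect points and hand them to `flush` in batches.

    A batch is flushed when it reaches `max_points` or when its oldest
    point is at least `max_age_s` seconds old, whichever comes first.
    """

    def __init__(self, flush, max_points=100, max_age_s=5.0, clock=time.monotonic):
        self._flush = flush          # callable that receives a list of points
        self._max_points = max_points
        self._max_age_s = max_age_s
        self._clock = clock          # injectable so the policy can be tested
        self._points = []
        self._first_at = None        # arrival time of the oldest buffered point

    def add(self, point):
        if not self._points:
            self._first_at = self._clock()
        self._points.append(point)
        self._maybe_flush()

    def tick(self):
        """Call periodically so a quiet buffer still flushes on age."""
        self._maybe_flush()

    def _maybe_flush(self):
        if not self._points:
            return
        too_many = len(self._points) >= self._max_points
        too_old = self._clock() - self._first_at >= self._max_age_s
        if too_many or too_old:
            batch, self._points = self._points, []
            self._first_at = None
            self._flush(batch)
```

In a real service `flush` would wrap the batch write to the InfluxDB bucket, and `tick` would be driven by a timer thread or the main loop.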
graph TD
subgraph "Phase 1: Foundation"
MQTT[1. MQTT Broker<br/>mosquitto]
CAT[2. Resource Catalog<br/>Port 8081]
end
subgraph "Phase 2: Core Services"
MB[3. Message Broker]
AM[4. Account Manager<br/>Port 8084]
TSDB[5. TimeSeries DB<br/>Port 8082]
end
subgraph "Phase 3: Edge Devices"
RPN[6. Raspberry Pi North<br/>Port 8086]
RPS[7. Raspberry Pi South<br/>Port 8088]
end
subgraph "Phase 4: Intelligence"
AN[8. Analytics<br/>Port 8083]
CC[9. Control Center<br/>Port 8085]
end
subgraph "Phase 5: Interfaces"
TB[10. Telegram Bot<br/>Port 8087]
WD[11. Web Dashboard<br/>Port 5000]
end
MQTT --> CAT
CAT --> MB
CAT --> AM
CAT --> TSDB
TSDB --> RPN
TSDB --> RPS
RPN --> AN
RPS --> AN
AN --> CC
CC --> TB
CC --> WD
Service Health Monitoring
The Resource Catalog checks the health of each registered service every 30 seconds and records one of four statuses:
| Status | Description |
| --- | --- |
| unknown | Initial state after registration |
| healthy | Health endpoint returned HTTP 200 |
| unhealthy | Health endpoint returned non-200 |
| unreachable | Connection failed or timeout (5s) |
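The status rules above map directly onto the outcome of a single HTTP probe. The following is a rough sketch using only the standard library; `status_from_code` and `probe` are hypothetical helper names, and how the Resource Catalog actually performs its checks is not shown in this document.

```python
import urllib.error
import urllib.request


def status_from_code(code):
    """Map an HTTP status code (None when no response arrived) to a status."""
    if code is None:
        return "unreachable"
    return "healthy" if code == 200 else "unhealthy"


def probe(url, timeout=5.0):
    """Call a health endpoint once and classify the result."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            code = response.status
    except urllib.error.HTTPError as err:
        # The server answered, but with a 4xx/5xx status.
        code = err.code
    except (urllib.error.URLError, OSError):
        # Connection refused, DNS failure, or timeout.
        code = None
    return status_from_code(code)


if __name__ == "__main__":
    print(probe("http://localhost:8081/health"))
```

The `unknown` status is not produced here because it describes a service that has registered but has not been probed yet.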
Data Flow Patterns
1. Sensor Data Fan-Out Pattern
The Message Broker implements a 1-to-N fan-out pattern:
flowchart LR
subgraph Input["1 INPUT MESSAGE"]
A["sectors/sector-north/pipelines/N1/measurements"]
end
subgraph Router["MESSAGE BROKER"]
P["Parse & Route"]
end
subgraph Output["7 OUTPUT MESSAGES"]
B1["sensors/N1"]
B2["temperature/N1"]
B3["temperature/N1/bolt_n1"]
B4["pressure/N1"]
B5["pressure/N1/bolt_n1"]
B6["bolts/N1/bolt_n1"]
B7["valves/N1"]
end
A --> P
P --> B1
P --> B2
P --> B3
P --> B4
P --> B5
P --> B6
P --> B7
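The fan-out above is mostly string manipulation on the incoming topic. As an illustration, the sketch below derives the seven output topics for one bolt reading. The function name `fan_out_topics` is hypothetical, and passing `bolt_id` as an argument is an assumption, since the payload schema is not described here.

```python
def fan_out_topics(source_topic, bolt_id):
    """Return the output topics for one measurement message.

    `source_topic` must look like
    sectors/{sector_id}/pipelines/{pipeline_id}/measurements.
    """
    parts = source_topic.split("/")
    well_formed = (
        len(parts) == 5
        and parts[0] == "sectors"
        and parts[2] == "pipelines"
        and parts[4] == "measurements"
    )
    if not well_formed:
        raise ValueError(f"unexpected measurement topic: {source_topic!r}")

    pipeline_id = parts[3]
    return [
        f"sensors/{pipeline_id}",
        f"temperature/{pipeline_id}",
        f"temperature/{pipeline_id}/{bolt_id}",
        f"pressure/{pipeline_id}",
        f"pressure/{pipeline_id}/{bolt_id}",
        f"bolts/{pipeline_id}/{bolt_id}",
        f"valves/{pipeline_id}",
    ]


if __name__ == "__main__":
    source = "sectors/sector-north/pipelines/N1/measurements"
    for topic in fan_out_topics(source, "bolt_n1"):
        print(topic)
```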
2. Valve Command Flow
flowchart TB
subgraph Sources["Command Sources"]
WD["Web Dashboard<br/>HTTP /manual"]
TB["Telegram Bot<br/>MQTT telegram/commands/+"]
end
subgraph Control["CONTROL CENTER"]
CC["Process Command<br/>Apply Rules<br/>Send Command"]
end
subgraph Router["MESSAGE BROKER"]
MB["Route by Pipeline ID"]
end
subgraph Destinations["Sector-Specific Topics"]
NORTH["sectors/sector-north/.../commands/valves"]
SOUTH["sectors/sector-south/.../commands/valves"]
end
subgraph Edge["Raspberry Pi Simulators"]
RPN["Pi North<br/>Pipelines N1, N2, N3"]
RPS["Pi South<br/>Pipelines S1, S2, S3"]
end
WD -->|HTTP| CC
TB -->|MQTT| CC
CC -->|"commands/valves/{id}"| MB
MB -->|"N* pipelines"| NORTH
MB -->|"S* pipelines"| SOUTH
NORTH --> RPN
SOUTH --> RPS
style CC fill:#c8e6c9
style MB fill:#fff3e0
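Routing by pipeline ID, as drawn above, amounts to choosing a sector from the ID and rewriting the topic. A minimal sketch, assuming the sector is decided by the first letter of the pipeline ID (suggested by the `N*` and `S*` labels) and using the sector-scoped topic pattern `sectors/{sector}/pipelines/{id}/commands/valves`; `SECTOR_BY_PREFIX` and `route_valve_command` are hypothetical names.

```python
# Assumed mapping from pipeline ID prefix to sector.
SECTOR_BY_PREFIX = {"N": "sector-north", "S": "sector-south"}


def route_valve_command(topic):
    """Translate commands/valves/{pipeline_id} into its sector-specific topic."""
    prefix, _, pipeline_id = topic.rpartition("/")
    if prefix != "commands/valves" or not pipeline_id:
        raise ValueError(f"not a valve command topic: {topic!r}")

    sector = SECTOR_BY_PREFIX.get(pipeline_id[0].upper())
    if sector is None:
        raise ValueError(f"no sector known for pipeline {pipeline_id!r}")

    return f"sectors/{sector}/pipelines/{pipeline_id}/commands/valves"


if __name__ == "__main__":
    print(route_valve_command("commands/valves/N2"))
    print(route_valve_command("commands/valves/S1"))
```

A lookup against the Resource Catalog would be a more robust way to resolve the sector than a prefix convention, at the cost of an extra dependency on the hot path.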
3. Alert Distribution Flow
flowchart TB
subgraph Detection["ANALYTICS SERVICE"]
AN["Anomaly Detection<br/>6 Detection Methods"]
end
subgraph AlertTypes["Alert Types"]
A1["Threshold Exceeded"]
A2["Risk Assessment"]
A3["Prediction Warning"]
A4["Health Warning"]
end
subgraph Distribution["MQTT Distribution"]
MQTT["MQTT Broker"]
end
subgraph Consumers["Alert Consumers"]
TB["Telegram Bot<br/>Real-time notifications"]
TSDB["TimeSeries DB<br/>Alert storage"]
WD["Web Dashboard<br/>HTTP polling"]
end
AN --> A1
AN --> A2
AN --> A3
AN --> A4
A1 -->|"alerts/anomalies/{id}"| MQTT
A2 -->|"sectors/.../alerts/risk"| MQTT
A3 -->|"sectors/.../alerts/prediction"| MQTT
A4 -->|"sectors/.../alerts/health"| MQTT
MQTT --> TB
MQTT --> TSDB
TSDB -->|HTTP GET /alerts| WD
MQTT Topic Architecture
Complete Topic Hierarchy
graph TB
subgraph "Raspberry Pi Publications"
T1["sectors/{sector_id}/pipelines/{pipeline_id}/measurements"]
end
subgraph "Message Broker Outputs"
T2["sensors/{pipeline_id}"]
T3["temperature/{pipeline_id}"]
T4["temperature/{pipeline_id}/{bolt_id}"]
T5["pressure/{pipeline_id}"]
T6["pressure/{pipeline_id}/{bolt_id}"]
T7["bolts/{pipeline_id}/{bolt_id}"]
T8["valves/{pipeline_id}"]
end
subgraph "TimeSeries DB Publications"
T9["analytics/data/{pipeline_id}"]
end
subgraph "Analytics Publications"
T10["alerts/anomalies/{pipeline_id}"]
T11["sectors/{sector}/pipelines/{id}/alerts/{type}"]
end
subgraph "Control Center Publications"
T12["commands/valves/{pipeline_id}"]
end
subgraph "Message Broker Routed Commands"
T13["sectors/{sector}/pipelines/{id}/commands/valves"]
end
subgraph "Telegram Bot"
T14["telegram/commands/{pipeline_id}"]
end
T1 --> T2
T1 --> T3
T1 --> T4
T1 --> T5
T1 --> T6
T1 --> T7
T1 --> T8
T2 --> T9
T9 --> T10
T9 --> T11
T8 --> T12
T14 --> T12
T12 --> T13
Topic Reference Table
| Topic Pattern | Publisher | Subscriber(s) | QoS | Description |
| --- | --- | --- | --- | --- |
| `sectors/+/pipelines/+/measurements` | Raspberry Pi | Message Broker | 2 | Raw sensor data |
| `sensors/{pipeline_id}` | Message Broker | TimeSeries DB | 2 | Complete sensor reading |
| `temperature/{pipeline_id}` | Message Broker | TimeSeries DB, Dashboard | 2 | Temperature readings |
| `pressure/{pipeline_id}` | Message Broker | TimeSeries DB, Dashboard | 2 | Pressure readings |
| `valves/{pipeline_id}` | Message Broker | Control Center, TimeSeries DB | 2 | Valve status |
| `analytics/data/{pipeline_id}` | TimeSeries DB | Analytics | 2 | Data for analysis |
| `alerts/anomalies/{pipeline_id}` | Analytics | Telegram Bot, TimeSeries DB | 2 | Real-time alerts |
| `commands/valves/{pipeline_id}` | Control Center | Message Broker | 2 | Valve commands |
| `sectors/.../commands/valves` | Message Broker | Raspberry Pi | 2 | Routed valve commands |
| `telegram/commands/{pipeline_id}` | Telegram Bot | Control Center | 2 | Telegram valve commands |
| `status/valves` | Control Center | TimeSeries DB | 2 | Global valve status |
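Several subscriptions in this reference use the MQTT single-level wildcard `+` (for example `sectors/+/pipelines/+/measurements`). To make the matching rules concrete, here is a small standalone matcher for `+` and the multi-level wildcard `#`. It is a simplified sketch: `topic_matches` is a hypothetical helper, and it ignores special cases such as topics that begin with `$`.

```python
def topic_matches(pattern, topic):
    """Return True if an MQTT subscription pattern matches a concrete topic.

    '+' matches exactly one topic level.
    '#' matches all remaining levels (including none) and must come last.
    """
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")

    for index, level in enumerate(pattern_levels):
        if level == "#":
            # Only valid as the final level of the pattern.
            return index == len(pattern_levels) - 1
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False

    # Without '#', pattern and topic must have the same depth.
    return len(pattern_levels) == len(topic_levels)


if __name__ == "__main__":
    print(topic_matches("sensors/+", "sensors/N1"))
    print(topic_matches("temperature/+", "temperature/N1/bolt_n1"))
```

One consequence worth noting: a subscription to `temperature/+` receives the per-pipeline topic `temperature/N1` but not the per-bolt topic `temperature/N1/bolt_n1`, which sits one level deeper.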
Authentication & Authorization
Authentication Flow
sequenceDiagram
participant User
participant WD as Web Dashboard
participant AM as Account Manager
participant SUPA as Supabase
User->>WD: Enter credentials
WD->>AM: POST /login {username, password}
AM->>AM: bcrypt.verify(password, hash)
alt Password Valid
AM->>AM: Generate JWT (HS256, 24h)
AM->>SUPA: Create session record
AM-->>WD: {token, user: {id, role, sectors}}
WD->>WD: Store in dcc.Store('auth-store')
WD-->>User: Redirect to /overview
else Password Invalid
AM-->>WD: 401 Unauthorized
WD-->>User: Show error message
end
Note over User,SUPA: Subsequent API Calls
User->>WD: Navigate to protected page
WD->>AM: GET /validate {Authorization: Bearer token}
AM->>AM: jwt.decode(token)
AM->>SUPA: Check session active
AM-->>WD: {valid: true, user: {...}}
SECTOR_ID=sector-north # or sector-south
SERVICE_PORT=8086 # or 8088
MQTT_CLIENT_ID=mqtt-raspberry-pi-north # or south
MQTT_BROKER=localhost
MQTT_PORT=1883
CATALOG_URL=http://localhost:8081
PUBLISH_INTERVAL=5
CATALOG_SYNC_INTERVAL=30
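A service can read the variables listed above at startup and fall back to defaults when they are unset. The following is a sketch under assumptions: `PiConfig` and `load_config` are hypothetical names, the defaults simply reuse the values shown above, and both intervals are assumed to be in seconds.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class PiConfig:
    sector_id: str
    service_port: int
    mqtt_client_id: str
    mqtt_broker: str
    mqtt_port: int
    catalog_url: str
    publish_interval: int        # assumed to be seconds
    catalog_sync_interval: int   # assumed to be seconds


def load_config(env=None):
    """Build a PiConfig from a mapping of variables (defaults to os.environ)."""
    env = os.environ if env is None else env
    return PiConfig(
        sector_id=env.get("SECTOR_ID", "sector-north"),
        service_port=int(env.get("SERVICE_PORT", "8086")),
        mqtt_client_id=env.get("MQTT_CLIENT_ID", "mqtt-raspberry-pi-north"),
        mqtt_broker=env.get("MQTT_BROKER", "localhost"),
        mqtt_port=int(env.get("MQTT_PORT", "1883")),
        catalog_url=env.get("CATALOG_URL", "http://localhost:8081"),
        publish_interval=int(env.get("PUBLISH_INTERVAL", "5")),
        catalog_sync_interval=int(env.get("CATALOG_SYNC_INTERVAL", "30")),
    )


if __name__ == "__main__":
    print(load_config())
```

Accepting a plain mapping instead of reading `os.environ` directly keeps the loader easy to test and lets the north and south simulators share the same code with different inputs.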
graph TB
subgraph "Local Machine"
subgraph "System Services"
MQTT[Mosquitto<br/>Port 1883]
end
subgraph "Python Services (venv)"
CAT[Catalog 8081]
TSDB[TimeSeries 8082]
AN[Analytics 8083]
AM[Account 8084]
CC[Control 8085]
RPN[Pi North 8086]
TB[Telegram 8087]
RPS[Pi South 8088]
WD[Dashboard 5000]
end
end
subgraph "Cloud Services"
INFLUX[InfluxDB Cloud]
SUPA[Supabase]
TGAPI[Telegram API]
end
TSDB --> INFLUX
AM --> SUPA
CAT --> SUPA
TB --> TGAPI
Makefile Commands
# Setup
make setup # Create venv, install dependencies

# Individual services
make run-catalog # Start Catalog (8081) - START FIRST
make run-broker # Start Message Broker
make run-timeseries # Start TimeSeries DB (8082)
make run-account # Start Account Manager (8084)
make run-analytics # Start Analytics (8083)
make run-control # Start Control Center (8085)
make run-raspberry-north # Start Raspberry Pi North (8086)
make run-raspberry-south # Start Raspberry Pi South (8088)
make run-telegram # Start Telegram Bot (8087)
make run-dashboard # Start Web Dashboard (5000)

# All services (requires tmux)
make run-all # Launch all services
make stop-all # Stop all services
Port Manager Utility
./port_manager.py status # Check all service status
./port_manager.py start --all # Start all services
./port_manager.py stop --all # Stop all services
./port_manager.py kill --port 8081 # Kill process on specific port
# Check if Catalog is running
curl http://localhost:8081/health
# Check port availability
./port_manager.py status
# Check MQTT broker
brew services list | grep mosquitto
# Service health
curl http://localhost:8081/services/health
# Full catalog
curl http://localhost:8081/catalog
# Pipeline data
curl "http://localhost:8082/temperature?pipeline_id=N1&limit=10"
# Analytics status
curl http://localhost:8083/status
# Control center status
curl http://localhost:8085/status
# Recent alerts
curl "http://localhost:8083/alerts?limit=10"
Multiple Interfaces: Web dashboard + Telegram bot for monitoring and control
The system follows microservices best practices including loose coupling, single responsibility, independent deployment, and centralized configuration management.