Skip to content

Latest commit

 

History

History
1312 lines (1080 loc) · 36.2 KB

File metadata and controls

1312 lines (1080 loc) · 36.2 KB

IoT Pipeline Monitoring System - Complete Architecture

Author: Mahdi Rajaee Version: 2.0 Last Updated: 2025-12-13


Table of Contents

  1. System Overview
  2. Architecture Diagrams
  3. Service Catalog
  4. Data Flow Patterns
  5. MQTT Topic Architecture
  6. Authentication & Authorization
  7. Control Loop Architecture
  8. Database Architecture
  9. Configuration Management
  10. Deployment Architecture
  11. Security Architecture
  12. Troubleshooting Guide

System Overview

The IoT Pipeline Monitoring System is a microservices-based platform for monitoring industrial pipelines with Smart Bolt sensors and automated valve control. The system simulates sensor readings (temperature/pressure), performs real-time anomaly detection, and enables automated/manual valve control through multiple interfaces.

Key Characteristics

Aspect Description
Architecture 10 independent microservices
Communication MQTT (message bus) + REST APIs
Service Discovery Centralized catalog with health monitoring
Storage InfluxDB v3 Cloud (time-series) + Supabase PostgreSQL (users)
Frontend Web Dashboard (Dash) + Telegram Bot
Deployment Multi-sector (North/South) for geographic distribution

Technology Stack

Layer Technologies
Backend Framework Python 3.11, CherryPy
Message Bus Eclipse Mosquitto (MQTT), QoS 2
Time-Series DB InfluxDB v3 Cloud
Relational DB Supabase PostgreSQL
Frontend Flask, Plotly Dash, Bootstrap
External APIs Telegram Bot API
Authentication JWT (HS256), bcrypt

Architecture Diagrams

1. High-Level System Topology

graph TB
    subgraph "Edge Layer"
        RPN[Raspberry Pi North<br/>Port 8086<br/>sector-north]
        RPS[Raspberry Pi South<br/>Port 8088<br/>sector-south]
    end

    subgraph "Message Layer"
        MQTT[MQTT Broker<br/>Mosquitto<br/>Port 1883]
        MB[Message Broker<br/>Content Router]
    end

    subgraph "Data Layer"
        TSDB[TimeSeries DB<br/>Port 8082]
        INFLUX[(InfluxDB v3 Cloud<br/>Dual Buckets)]
    end

    subgraph "Analytics Layer"
        AN[Analytics Service<br/>Port 8083]
    end

    subgraph "Control Layer"
        CC[Control Center<br/>Port 8085<br/>Decision Engine]
    end

    subgraph "Management Layer"
        CAT[Resource Catalog<br/>Port 8081<br/>Service Registry]
        AM[Account Manager<br/>Port 8084]
        SUPA[(Supabase<br/>PostgreSQL)]
    end

    subgraph "Presentation Layer"
        WD[Web Dashboard<br/>Port 5000]
        TB[Telegram Bot<br/>Port 8087]
    end

    subgraph "External"
        USER[Users]
        TGAPI[Telegram API]
    end

    RPN <-->|MQTT| MQTT
    RPS <-->|MQTT| MQTT
    MQTT <--> MB
    MB -->|sensors/*| TSDB
    TSDB --> INFLUX
    TSDB -->|analytics/data/*| AN
    AN -->|alerts/*| MQTT
    MQTT -->|valves/*| CC
    CC -->|commands/*| MQTT
    MB -->|sectors/.../commands| RPN
    MB -->|sectors/.../commands| RPS

    CAT <-->|REST| RPN
    CAT <-->|REST| RPS
    CAT <-->|REST| TSDB
    CAT <-->|REST| AN
    CAT <-->|REST| CC
    CAT <-->|REST| AM
    CAT <-->|REST| TB
    CAT <-->|REST| WD

    AM <--> SUPA

    WD <-->|REST| TSDB
    WD <-->|REST| AN
    WD <-->|REST| CC
    WD <-->|REST| AM

    TB <-->|MQTT| MQTT
    TB <-->|REST| AM
    TB <--> TGAPI

    USER --> WD
    USER --> TB

    style CAT fill:#e3f2fd,stroke:#1976d2
    style MQTT fill:#fff3e0,stroke:#f57c00
    style MB fill:#fff3e0,stroke:#f57c00
    style CC fill:#c8e6c9,stroke:#388e3c
    style AN fill:#f3e5f5,stroke:#7b1fa2
    style TSDB fill:#e8f5e9,stroke:#388e3c
Loading

2. Service Dependency Graph

graph LR
    subgraph "Layer 0: Foundation"
        CAT[Resource Catalog]
        MQTT[MQTT Broker]
    end

    subgraph "Layer 1: Core"
        MB[Message Broker]
        AM[Account Manager]
    end

    subgraph "Layer 2: Data"
        TSDB[TimeSeries DB]
        RPN[Raspberry Pi North]
        RPS[Raspberry Pi South]
    end

    subgraph "Layer 3: Intelligence"
        AN[Analytics]
        CC[Control Center]
    end

    subgraph "Layer 4: Presentation"
        WD[Web Dashboard]
        TB[Telegram Bot]
    end

    CAT --> MB
    CAT --> AM
    CAT --> TSDB
    CAT --> RPN
    CAT --> RPS
    CAT --> AN
    CAT --> CC
    CAT --> WD
    CAT --> TB

    MQTT --> MB
    MQTT --> TSDB
    MQTT --> RPN
    MQTT --> RPS
    MQTT --> AN
    MQTT --> CC
    MQTT --> TB

    MB --> TSDB
    TSDB --> AN
    AN --> CC

    AM --> WD
    AM --> TB

    TSDB --> WD
    AN --> WD
    CC --> WD

    style CAT fill:#ffcdd2
    style MQTT fill:#ffcdd2
Loading

3. Complete Data Flow Sequence

sequenceDiagram
    participant RP as Raspberry Pi
    participant MQTT as MQTT Broker
    participant MB as Message Broker
    participant TSDB as TimeSeries DB
    participant INFLUX as InfluxDB
    participant AN as Analytics
    participant CC as Control Center
    participant TB as Telegram Bot
    participant WD as Web Dashboard

    Note over RP,WD: 1. Sensor Data Generation & Routing
    RP->>MQTT: PUBLISH sectors/{sector}/pipelines/{id}/measurements
    MQTT->>MB: Forward message
    MB->>MB: Parse payload
    MB->>MQTT: PUBLISH sensors/{pipeline_id}
    MB->>MQTT: PUBLISH temperature/{pipeline_id}
    MB->>MQTT: PUBLISH pressure/{pipeline_id}
    MB->>MQTT: PUBLISH valves/{pipeline_id}

    Note over TSDB,INFLUX: 2. Data Storage
    MQTT->>TSDB: Receive on sensors/+
    TSDB->>TSDB: Buffer write (100 points or 5s)
    TSDB->>INFLUX: Batch write to bucket
    TSDB->>MQTT: PUBLISH analytics/data/{pipeline_id}

    Note over AN,CC: 3. Analytics & Alerting
    MQTT->>AN: Receive on analytics/data/+
    AN->>AN: Detect anomalies
    AN->>AN: Calculate risk score

    alt Anomaly Detected
        AN->>MQTT: PUBLISH alerts/anomalies/{pipeline_id}
        MQTT->>TB: Forward to Telegram Bot
        TB->>TB: Send Telegram notification
    end

    Note over CC,RP: 4. Automated Control
    MQTT->>CC: Receive on valves/{pipeline_id}
    CC->>CC: Evaluate 12 control rules

    alt Rule Triggered
        CC->>MQTT: PUBLISH commands/valves/{pipeline_id}
        MQTT->>MB: Forward command
        MB->>MQTT: PUBLISH sectors/{sector}/.../commands/valves
        MQTT->>RP: Receive command
        RP->>RP: Update valve state
    end

    Note over WD,TSDB: 5. User Interface Polling
    WD->>TSDB: HTTP GET /temperature
    TSDB-->>WD: Temperature data
    WD->>AN: HTTP GET /risk
    AN-->>WD: Risk assessment
    WD->>CC: HTTP GET /status
    CC-->>WD: Control status
Loading

4. Sector-Based Multi-Instance Architecture

graph TB
    subgraph "Sector North"
        RPN[Raspberry Pi North<br/>Port 8086]
        PN1[Pipeline N1]
        PN2[Pipeline N2]
        PN3[Pipeline N3]
        BN[Bolts: bolt_n1, bolt_n2, bolt_n3]
        VN[Valves: valve_n1, valve_n2, valve_n3]
    end

    subgraph "Sector South"
        RPS[Raspberry Pi South<br/>Port 8088]
        PS1[Pipeline S1]
        PS2[Pipeline S2]
        PS3[Pipeline S3]
        BS[Bolts: bolt_s1, bolt_s2, bolt_s3]
        VS[Valves: valve_s1, valve_s2, valve_s3]
    end

    subgraph "Storage"
        BUCKET_N[(InfluxDB<br/>smartboltbucket-north)]
        BUCKET_S[(InfluxDB<br/>smartboltbucket-south)]
    end

    RPN --> PN1
    RPN --> PN2
    RPN --> PN3
    PN1 --> BN
    PN1 --> VN

    RPS --> PS1
    RPS --> PS2
    RPS --> PS3
    PS1 --> BS
    PS1 --> VS

    PN1 -.->|sector-north data| BUCKET_N
    PS1 -.->|sector-south data| BUCKET_S

    style RPN fill:#e3f2fd
    style RPS fill:#ffebee
    style BUCKET_N fill:#e3f2fd
    style BUCKET_S fill:#ffebee
Loading

Service Catalog

Complete Service Registry

Service Port Framework Database MQTT Description
Resource Catalog 8081 CherryPy catalog.json, Supabase - Central service registry & configuration hub
Message Broker - Python - Publisher/Subscriber Pure MQTT content-based router
TimeSeries DB 8082 CherryPy InfluxDB v3 Cloud Subscriber/Publisher MQTT to InfluxDB bridge
Analytics 8083 CherryPy In-memory cache Subscriber/Publisher Anomaly detection & risk assessment
Account Manager 8084 CherryPy Supabase PostgreSQL - Authentication & authorization
Control Center 8085 CherryPy In-memory Subscriber/Publisher Automated decision engine
Raspberry Pi North 8086 CherryPy JSON files Publisher/Subscriber Sensor simulator (sector-north)
Telegram Bot 8087 CherryPy + python-telegram-bot - Subscriber/Publisher Mobile notifications & control
Raspberry Pi South 8088 CherryPy JSON files Publisher/Subscriber Sensor simulator (sector-south)
Web Dashboard 5000 Flask + Dash - - Browser-based monitoring UI

Service Startup Order

graph TD
    subgraph "Phase 1: Foundation"
        MQTT[1. MQTT Broker<br/>mosquitto]
        CAT[2. Resource Catalog<br/>Port 8081]
    end

    subgraph "Phase 2: Core Services"
        MB[3. Message Broker]
        AM[4. Account Manager<br/>Port 8084]
        TSDB[5. TimeSeries DB<br/>Port 8082]
    end

    subgraph "Phase 3: Edge Devices"
        RPN[6. Raspberry Pi North<br/>Port 8086]
        RPS[7. Raspberry Pi South<br/>Port 8088]
    end

    subgraph "Phase 4: Intelligence"
        AN[8. Analytics<br/>Port 8083]
        CC[9. Control Center<br/>Port 8085]
    end

    subgraph "Phase 5: Interfaces"
        TB[10. Telegram Bot<br/>Port 8087]
        WD[11. Web Dashboard<br/>Port 5000]
    end

    MQTT --> CAT
    CAT --> MB
    CAT --> AM
    CAT --> TSDB
    TSDB --> RPN
    TSDB --> RPS
    RPN --> AN
    RPS --> AN
    AN --> CC
    CC --> TB
    CC --> WD
Loading

Service Health Monitoring

Resource Catalog performs health checks every 30 seconds:

Status Description
unknown Initial state after registration
healthy Health endpoint returned HTTP 200
unhealthy Health endpoint returned non-200
unreachable Connection failed or timeout (5s)

Data Flow Patterns

1. Sensor Data Fan-Out Pattern

The Message Broker implements a 1-to-N fan-out pattern:

flowchart LR
    subgraph Input["1 INPUT MESSAGE"]
        A["sectors/sector-north/pipelines/N1/measurements"]
    end

    subgraph Router["MESSAGE BROKER"]
        P["Parse & Route"]
    end

    subgraph Output["7 OUTPUT MESSAGES"]
        B1["sensors/N1"]
        B2["temperature/N1"]
        B3["temperature/N1/bolt_n1"]
        B4["pressure/N1"]
        B5["pressure/N1/bolt_n1"]
        B6["bolts/N1/bolt_n1"]
        B7["valves/N1"]
    end

    A --> P
    P --> B1
    P --> B2
    P --> B3
    P --> B4
    P --> B5
    P --> B6
    P --> B7
Loading

2. Valve Command Flow

flowchart TB
    subgraph Sources["Command Sources"]
        WD["Web Dashboard<br/>HTTP /manual"]
        TB["Telegram Bot<br/>MQTT telegram/commands/+"]
    end

    subgraph Control["CONTROL CENTER"]
        CC["Process Command<br/>Apply Rules<br/>Send Command"]
    end

    subgraph Router["MESSAGE BROKER"]
        MB["Route by Pipeline ID"]
    end

    subgraph Destinations["Sector-Specific Topics"]
        NORTH["sectors/sector-north/.../commands/valves"]
        SOUTH["sectors/sector-south/.../commands/valves"]
    end

    subgraph Edge["Raspberry Pi Simulators"]
        RPN["Pi North<br/>Pipelines N1, N2, N3"]
        RPS["Pi South<br/>Pipelines S1, S2, S3"]
    end

    WD -->|HTTP| CC
    TB -->|MQTT| CC
    CC -->|"commands/valves/{id}"| MB
    MB -->|"N* pipelines"| NORTH
    MB -->|"S* pipelines"| SOUTH
    NORTH --> RPN
    SOUTH --> RPS

    style CC fill:#c8e6c9
    style MB fill:#fff3e0
Loading

3. Alert Distribution Flow

flowchart TB
    subgraph Detection["ANALYTICS SERVICE"]
        AN["Anomaly Detection<br/>6 Detection Methods"]
    end

    subgraph AlertTypes["Alert Types"]
        A1["Threshold Exceeded"]
        A2["Risk Assessment"]
        A3["Prediction Warning"]
        A4["Health Warning"]
    end

    subgraph Distribution["MQTT Distribution"]
        MQTT["MQTT Broker"]
    end

    subgraph Consumers["Alert Consumers"]
        TB["Telegram Bot<br/>Real-time notifications"]
        TSDB["TimeSeries DB<br/>Alert storage"]
        WD["Web Dashboard<br/>HTTP polling"]
    end

    AN --> A1
    AN --> A2
    AN --> A3
    AN --> A4

    A1 -->|"alerts/anomalies/{id}"| MQTT
    A2 -->|"sectors/.../alerts/risk"| MQTT
    A3 -->|"sectors/.../alerts/prediction"| MQTT
    A4 -->|"sectors/.../alerts/health"| MQTT

    MQTT --> TB
    MQTT --> TSDB
    TSDB -->|HTTP GET /alerts| WD
Loading

MQTT Topic Architecture

Complete Topic Hierarchy

graph TB
    subgraph "Raspberry Pi Publications"
        T1["sectors/{sector_id}/pipelines/{pipeline_id}/measurements"]
    end

    subgraph "Message Broker Outputs"
        T2["sensors/{pipeline_id}"]
        T3["temperature/{pipeline_id}"]
        T4["temperature/{pipeline_id}/{bolt_id}"]
        T5["pressure/{pipeline_id}"]
        T6["pressure/{pipeline_id}/{bolt_id}"]
        T7["bolts/{pipeline_id}/{bolt_id}"]
        T8["valves/{pipeline_id}"]
    end

    subgraph "TimeSeries DB Publications"
        T9["analytics/data/{pipeline_id}"]
    end

    subgraph "Analytics Publications"
        T10["alerts/anomalies/{pipeline_id}"]
        T11["sectors/{sector}/pipelines/{id}/alerts/{type}"]
    end

    subgraph "Control Center Publications"
        T12["commands/valves/{pipeline_id}"]
    end

    subgraph "Message Broker Routed Commands"
        T13["sectors/{sector}/pipelines/{id}/commands/valves"]
    end

    subgraph "Telegram Bot"
        T14["telegram/commands/{pipeline_id}"]
    end

    T1 --> T2
    T1 --> T3
    T1 --> T4
    T1 --> T5
    T1 --> T6
    T1 --> T7
    T1 --> T8

    T2 --> T9
    T9 --> T10
    T9 --> T11
    T8 --> T12
    T14 --> T12
    T12 --> T13
Loading

Topic Reference Table

Topic Pattern Publisher Subscriber(s) QoS Description
sectors/+/pipelines/+/measurements Raspberry Pi Message Broker 2 Raw sensor data
sensors/{pipeline_id} Message Broker TimeSeries DB 2 Complete sensor reading
temperature/{pipeline_id} Message Broker TimeSeries DB, Dashboard 2 Temperature readings
pressure/{pipeline_id} Message Broker TimeSeries DB, Dashboard 2 Pressure readings
valves/{pipeline_id} Message Broker Control Center, TimeSeries DB 2 Valve status
analytics/data/{pipeline_id} TimeSeries DB Analytics 2 Data for analysis
alerts/anomalies/{pipeline_id} Analytics Telegram Bot, TimeSeries DB 2 Real-time alerts
commands/valves/{pipeline_id} Control Center Message Broker 2 Valve commands
sectors/.../commands/valves Message Broker Raspberry Pi 2 Routed valve commands
telegram/commands/{pipeline_id} Telegram Bot Control Center 2 Telegram valve commands
status/valves Control Center TimeSeries DB 2 Global valve status

Authentication & Authorization

Authentication Flow

sequenceDiagram
    participant User
    participant WD as Web Dashboard
    participant AM as Account Manager
    participant SUPA as Supabase

    User->>WD: Enter credentials
    WD->>AM: POST /login {username, password}
    AM->>AM: bcrypt.verify(password, hash)

    alt Password Valid
        AM->>AM: Generate JWT (HS256, 24h)
        AM->>SUPA: Create session record
        AM-->>WD: {token, user: {id, role, sectors}}
        WD->>WD: Store in dcc.Store('auth-store')
        WD-->>User: Redirect to /overview
    else Password Invalid
        AM-->>WD: 401 Unauthorized
        WD-->>User: Show error message
    end

    Note over User,SUPA: Subsequent API Calls
    User->>WD: Navigate to protected page
    WD->>AM: GET /validate {Authorization: Bearer token}
    AM->>AM: jwt.decode(token)
    AM->>SUPA: Check session active
    AM-->>WD: {valid: true, user: {...}}
Loading

Role-Based Access Control

graph TB
    subgraph "Role Hierarchy"
        ADMIN["Admin (Level 2)"]
        OPERATOR["Operator (Level 1)"]
        VIEWER["Viewer (Level 0)"]
    end

    subgraph "Permissions"
        P_VIEW["view"]
        P_CONTROL["control"]
        P_EMERGENCY["emergency"]
        P_CONFIGURE["configure"]
        P_USERS["manage_users"]
        P_PIPELINES["manage_pipelines"]
    end

    ADMIN --> P_VIEW
    ADMIN --> P_CONTROL
    ADMIN --> P_EMERGENCY
    ADMIN --> P_CONFIGURE
    ADMIN --> P_USERS
    ADMIN --> P_PIPELINES

    OPERATOR --> P_VIEW
    OPERATOR --> P_CONTROL
    OPERATOR --> P_EMERGENCY

    VIEWER --> P_VIEW

    style ADMIN fill:#ffcdd2
    style OPERATOR fill:#fff9c4
    style VIEWER fill:#c8e6c9
Loading

Page Access Matrix

Page Route Viewer Operator Admin
Landing / Public Public Public
Login /login Public Public Public
Overview /overview Yes Yes Yes
Pipelines /pipelines Yes Yes Yes
Alerts /alerts Yes Yes Yes
Analytics /analytics Yes Yes Yes
Control /control No Yes Yes
Users /users No No Yes
Pipeline Management /pipeline-management No No Yes

Sector-Based Data Filtering

Users can only see data from their assigned sectors:

if role == 'admin':
    sectors = ['sector-north', 'sector-south']  # All sectors
else:
    sectors = user.sectors  # Only assigned sectors

Control Loop Architecture

Decision Engine Flow

flowchart TB
    subgraph Input["Analytics Data"]
        DATA["Pipeline Analytics<br/>Temperature, Pressure<br/>Risk Score, Health Score"]
    end

    subgraph Rules["12 Control Rules (Priority Order)"]
        R1["100: critical_risk → EMERGENCY_SHUTDOWN"]
        R2["95: high_pressure_relief → OPEN_VALVE"]
        R3["90: high_temperature_hazard → CLOSE_VALVE"]
        R4["88: leak_detected → CLOSE_VALVE"]
        R5["85: sensor_failure → CLOSE_VALVE"]
        R6["80: prediction_threshold_breach → ALERT"]
        R7["75: multiple_anomalies → ALERT"]
        R8["70: combined_high_anomaly → CLOSE_VALVE"]
        R9["60: rapid_change_detected → ALERT"]
        R10["50: low_health_score → CLOSE_VALVE"]
        R11["40: moderate_risk → ALERT"]
        R12["30: system_recovery → OPEN_VALVE"]
    end

    subgraph Actions["Action Types"]
        A1["NO_ACTION"]
        A2["OPEN_VALVE"]
        A3["CLOSE_VALVE"]
        A4["ALERT_OPERATOR"]
        A5["EMERGENCY_SHUTDOWN"]
    end

    subgraph Execution["Command Execution"]
        CMD["ValveCommander<br/>MQTT QoS 2<br/>Emergency: QoS 2<br/>Normal: QoS 1"]
    end

    DATA --> R1
    R1 --> R2
    R2 --> R3
    R3 --> R4
    R4 --> R5
    R5 --> R6
    R6 --> R7
    R7 --> R8
    R8 --> R9
    R9 --> R10
    R10 --> R11
    R11 --> R12

    R1 --> A5
    R2 --> A2
    R3 --> A3
    R12 --> A2

    A2 --> CMD
    A3 --> CMD
    A5 --> CMD

    style R1 fill:#ffcdd2
    style R2 fill:#ffe0b2
    style R3 fill:#fff9c4
    style A5 fill:#ffcdd2
Loading

Control Rules Reference

Rule Name Priority Condition Action
critical_risk 100 risk_score >= 90 EMERGENCY_SHUTDOWN
high_pressure_relief 95 pressure > 115 PSI OPEN_VALVE
high_temperature_hazard 90 temperature > 48°C CLOSE_VALVE
leak_detected 88 leak_probability > 0.8 CLOSE_VALVE
sensor_failure 85 sensor_status == "failed" CLOSE_VALVE
prediction_threshold_breach 80 predicted_temp > 45°C ALERT_OPERATOR
multiple_anomalies 75 anomaly_count >= 3 ALERT_OPERATOR
combined_high_anomaly 70 temp > 42°C AND pressure > 105 PSI CLOSE_VALVE
rapid_change_detected 60 temp_delta > 10°C/min ALERT_OPERATOR
low_health_score 50 health_score < 30 CLOSE_VALVE
moderate_risk 40 50 <= risk_score < 70 ALERT_OPERATOR
system_recovery 30 all_normal AND health > 80 OPEN_VALVE

Valve State Machine

stateDiagram-v2
    [*] --> closed : Initial State

    closed --> opening : OPEN command
    opening --> open : transition_complete (3s)
    opening --> error : failure (2%)

    open --> closing : CLOSE command
    closing --> closed : transition_complete (3s)
    closing --> error : failure (2%)

    error --> closing : CLOSE command
    error --> opening : OPEN command

    note right of opening
        Position: 0% → 100%
        Duration: 3 seconds
    end note

    note right of closing
        Position: 100% → 0%
        Duration: 3 seconds
    end note
Loading

Database Architecture

InfluxDB v3 Cloud (Time-Series)

graph TB
    subgraph "InfluxDB v3 Cloud (eu-central-1)"
        subgraph "Bucket: smartboltbucket-north"
            M1N["Measurement: temperature<br/>Tags: pipeline_id, bolt_id, sector_id<br/>Fields: value (float)"]
            M2N["Measurement: pressure<br/>Tags: pipeline_id, bolt_id, sector_id<br/>Fields: value (float)"]
            M3N["Measurement: valve_status<br/>Tags: pipeline_id, valve_id, sector_id<br/>Fields: state (string)"]
            M4N["Measurement: anomalies<br/>Tags: pipeline_id, bolt_id, severity<br/>Fields: description, temperature, pressure"]
        end

        subgraph "Bucket: smartboltbucket-south"
            M1S["Measurement: temperature"]
            M2S["Measurement: pressure"]
            M3S["Measurement: valve_status"]
            M4S["Measurement: anomalies"]
        end
    end

    style M1N fill:#e3f2fd
    style M1S fill:#ffebee
Loading

Supabase PostgreSQL (Relational)

erDiagram
    iot_users {
        int id PK
        string username UK
        string password_hash
        string email UK
        string role
        boolean is_active
        int telegram_chat_id
        timestamp created_at
        timestamp updated_at
    }

    iot_sessions {
        int id PK
        int user_id FK
        string token UK
        timestamp expires_at
        timestamp created_at
        boolean is_active
    }

    iot_user_pipelines {
        int id PK
        int user_id FK
        string sector_id
        string pipeline_id
        timestamp assigned_at
    }

    iot_pipelines {
        string id PK
        string sector_id
        string location
        string description
        string status
        timestamp created_at
    }

    iot_bolts {
        string id PK
        string pipeline_id FK
        string status
    }

    iot_valves {
        string id PK
        string pipeline_id FK
        string status
    }

    iot_users ||--o{ iot_sessions : "has"
    iot_users ||--o{ iot_user_pipelines : "assigned"
    iot_pipelines ||--o{ iot_bolts : "contains"
    iot_pipelines ||--o{ iot_valves : "contains"
Loading

Write Buffering Strategy

Parameter Value Description
Buffer Size 100 points Flush when buffer reaches capacity
Buffer Timeout 5 seconds Flush after timeout
Batch Size 100 InfluxDB client batch size
Flush Interval 5000ms InfluxDB client flush interval

Configuration Management

Environment Variables by Service

Resource Catalog (8081)

CHERRYPY_HOST=127.0.0.1
CHERRYPY_PORT=8081
SUPABASE_URL=https://xxx.supabase.co
SUPABASE_KEY=xxx
HEALTH_CHECK_INTERVAL=30

TimeSeries DB (8082)

CHERRYPY_PORT=8082
INFLUXDB_URL=https://eu-central-1-1.aws.cloud2.influxdata.com
INFLUXDB_TOKEN=xxx
INFLUXDB_ORG=xxx
INFLUXDB_BUCKET_NORTH=smartboltbucket-north
INFLUXDB_BUCKET_SOUTH=smartboltbucket-south
MQTT_BROKER=localhost
MQTT_PORT=1883

Analytics (8083)

CHERRYPY_PORT=8083
CATALOG_URL=http://localhost:8081
TIMESERIES_DB_URL=http://localhost:8082
MQTT_BROKER=localhost
MQTT_PORT=1883
ANALYSIS_CACHE_TTL=300
ALERT_THRESHOLD_TEMP=45.0
CRITICAL_THRESHOLD_TEMP=60.0

Account Manager (8084)

CHERRYPY_PORT=8084
SUPABASE_URL=https://xxx.supabase.co
SUPABASE_KEY=xxx
JWT_SECRET_KEY=xxx
JWT_ALGORITHM=HS256
JWT_EXPIRATION_HOURS=24
SESSION_CLEANUP_INTERVAL=3600

Control Center (8085)

CHERRYPY_PORT=8085
CATALOG_URL=http://localhost:8081
ANALYTICS_URL=http://localhost:8083
MQTT_BROKER=localhost
MQTT_PORT=1883
MONITORING_INTERVAL=30

Raspberry Pi North (8086) / South (8088)

SECTOR_ID=sector-north  # or sector-south
SERVICE_PORT=8086       # or 8088
MQTT_CLIENT_ID=mqtt-raspberry-pi-north  # or south
MQTT_BROKER=localhost
MQTT_PORT=1883
CATALOG_URL=http://localhost:8081
PUBLISH_INTERVAL=5
CATALOG_SYNC_INTERVAL=30

Telegram Bot (8087)

CHERRYPY_PORT=8087
TELEGRAM_TOKEN=xxx
MQTT_BROKER=localhost
MQTT_PORT=1883
ACCOUNT_MANAGER_URL=http://localhost:8084
ALERT_COOLDOWN=30

Web Dashboard (5000)

FLASK_PORT=5000
CATALOG_URL=http://localhost:8081
TIMESERIES_DB_URL=http://localhost:8082
ANALYTICS_URL=http://localhost:8083
ACCOUNT_MANAGER_URL=http://localhost:8084
CONTROL_CENTER_URL=http://localhost:8085
AUTO_REFRESH_INTERVAL=5000

Threshold Configuration (Centralized)

{
  "thresholds": {
    "temperature": {
      "min_normal": 20.0,
      "max_normal": 40.0,
      "alert": 42.0,
      "critical": 48.0
    },
    "pressure": {
      "min_normal": 90.0,
      "max_normal": 110.0,
      "alert": 105.0,
      "critical": 115.0
    }
  }
}

Deployment Architecture

Local Development

graph TB
    subgraph "Local Machine"
        subgraph "System Services"
            MQTT[Mosquitto<br/>Port 1883]
        end

        subgraph "Python Services (venv)"
            CAT[Catalog 8081]
            TSDB[TimeSeries 8082]
            AN[Analytics 8083]
            AM[Account 8084]
            CC[Control 8085]
            RPN[Pi North 8086]
            TB[Telegram 8087]
            RPS[Pi South 8088]
            WD[Dashboard 5000]
        end
    end

    subgraph "Cloud Services"
        INFLUX[InfluxDB Cloud]
        SUPA[Supabase]
        TGAPI[Telegram API]
    end

    TSDB --> INFLUX
    AM --> SUPA
    CAT --> SUPA
    TB --> TGAPI
Loading

Makefile Commands

# Setup
make setup              # Create venv, install dependencies

# Individual services
make run-catalog        # Start Catalog (8081) - START FIRST
make run-broker         # Start Message Broker
make run-timeseries     # Start TimeSeries DB (8082)
make run-account        # Start Account Manager (8084)
make run-analytics      # Start Analytics (8083)
make run-control        # Start Control Center (8085)
make run-raspberry-north # Start Raspberry Pi North (8086)
make run-raspberry-south # Start Raspberry Pi South (8088)
make run-telegram       # Start Telegram Bot (8087)
make run-dashboard      # Start Web Dashboard (5000)

# All services (requires tmux)
make run-all            # Launch all services
make stop-all           # Stop all services

Port Manager Utility

./port_manager.py status           # Check all service status
./port_manager.py start --all      # Start all services
./port_manager.py stop --all       # Stop all services
./port_manager.py kill --port 8081 # Kill process on specific port

Security Architecture

Security Layers

graph TB
    subgraph "Authentication Layer"
        JWT[JWT Tokens<br/>HS256, 24h expiry]
        BCRYPT[bcrypt Password Hashing<br/>12 rounds]
        SESSION[Session Management<br/>Hourly cleanup]
    end

    subgraph "Authorization Layer"
        RBAC[Role-Based Access<br/>admin/operator/viewer]
        SECTOR[Sector-Based Filtering]
        PERMISSION[Permission Matrix<br/>10 action types]
    end

    subgraph "Communication Layer"
        HTTPS[HTTPS<br/>Production only]
        MQTT_QOS[MQTT QoS 2<br/>Exactly-once delivery]
    end

    subgraph "Data Layer"
        HASH[Password Hashing]
        TOKEN_STORE[Token Storage<br/>Supabase]
    end

    JWT --> RBAC
    BCRYPT --> JWT
    SESSION --> JWT
    RBAC --> SECTOR
    SECTOR --> PERMISSION
Loading

Security Considerations

Area Current State Production Recommendation
MQTT Authentication None Add TLS + username/password
MQTT Encryption None Enable TLS
API Authentication JWT Add rate limiting
Password Storage bcrypt (12 rounds) Sufficient
Default Admin admin/ict2022 MUST CHANGE
CORS Enabled (all origins) Restrict to specific origins
Session Expiry 24 hours Consider shorter for sensitive ops

Troubleshooting Guide

Common Issues

Service Won't Start

# Check if Catalog is running
curl http://localhost:8081/health

# Check port availability
./port_manager.py status

# Check MQTT broker
brew services list | grep mosquitto

MQTT Messages Not Flowing

# Monitor all topics
mosquitto_sub -h localhost -p 1883 -t '#' -v

# Monitor specific topics
mosquitto_sub -h localhost -p 1883 -t 'sensors/#' -v
mosquitto_sub -h localhost -p 1883 -t 'alerts/#' -v
mosquitto_sub -h localhost -p 1883 -t 'commands/#' -v

No Data in InfluxDB

  1. Verify TimeSeries DB Connector is running
  2. Check MQTT subscription active
  3. Verify InfluxDB credentials in .env
  4. Check Raspberry Pi is publishing data

Authentication Failing

  1. Check Account Manager running on port 8084
  2. Verify JWT token not expired (24h default)
  3. Check iot_users table has user
  4. Verify password hash format

Debug Commands

# Service health
curl http://localhost:8081/services/health

# Full catalog
curl http://localhost:8081/catalog

# Pipeline data
curl "http://localhost:8082/temperature?pipeline_id=N1&limit=10"

# Analytics status
curl http://localhost:8083/status

# Control center status
curl http://localhost:8085/status

# Recent alerts
curl "http://localhost:8083/alerts?limit=10"

Log Locations

Service Log Method
All Python services stdout/stderr (console)
Mosquitto /opt/homebrew/var/log/mosquitto/mosquitto.log
tmux sessions tmux attach -t iot-{service}

File Structure

IoT_final_version_3_DEC/
├── SYSTEM_ARCHITECTURE.md           # This document
├── Makefile                         # Build and run commands
├── port_manager.py                  # Port management utility
├── requirements.txt                 # Global dependencies
│
├── catalog/                         # Resource Catalog Service
│   ├── main.py                      # REST API (648 lines)
│   ├── service_registry.py          # Service registration (105 lines)
│   ├── device_manager.py            # Device management (582 lines)
│   ├── config_manager.py            # Configuration (127 lines)
│   ├── catalog.json                 # Data storage
│   └── CATALOG_SERVICE_ARCHITECTURE.md
│
├── message_broker/                  # Message Broker Service
│   ├── main.py                      # MQTT router
│   ├── mqtt_router.py               # Pattern matching
│   ├── config_manager.py            # Config sync
│   └── MESSAGE_BROKER_ARCHITECTURE.md
│
├── timeSeriesDbConnector/           # TimeSeries DB Connector
│   ├── main.py                      # REST API
│   ├── influxdb3_storage.py         # InfluxDB integration
│   ├── mqtt_subscriber.py           # MQTT subscription
│   ├── data_models.py               # Data classes
│   └── TIMESERIES_DB_CONNECTOR_ARCHITECTURE.md
│
├── analytics/                       # Analytics Service
│   ├── main.py                      # REST API (800 lines)
│   ├── analytics_engine.py          # Statistical analysis (353 lines)
│   ├── anomaly_detector.py          # Anomaly detection (473 lines)
│   ├── mqtt_publisher.py            # Alert publishing (192 lines)
│   ├── catalog_client.py            # Catalog integration (200 lines)
│   └── ANALYTICS_SERVICE_ARCHITECTURE.md
│
├── account_manager/                 # Account Manager Service
│   ├── main.py                      # REST API
│   ├── supabase_db.py               # Database operations
│   ├── auth.py                      # Authentication
│   └── ACCOUNT_MANAGER_ARCHITECTURE.md
│
├── control_center/                  # Control Center Service
│   ├── main.py                      # REST API (468 lines)
│   ├── decision_engine.py           # Decision logic (239 lines)
│   ├── control_rules.py             # 12 control rules (305 lines)
│   ├── valve_commander.py           # MQTT commands (283 lines)
│   ├── analytics_client.py          # Analytics integration (230 lines)
│   ├── auth_client.py               # Auth client (91 lines)
│   └── CONTROL_CENTER_ARCHITECTURE.md
│
├── raspberry-pi-north/              # Raspberry Pi North Simulator
│   ├── main.py                      # REST API
│   ├── sensor_simulator.py          # Simulation engine
│   ├── pipeline_manager.py          # Pipeline management
│   ├── data_generator.py            # Physics simulation
│   ├── mqtt_publisher.py            # MQTT communication
│   └── RASPBERRY_PI_SIMULATOR_ARCHITECTURE.md
│
├── raspberry-pi-south/              # Raspberry Pi South Simulator
│   ├── main.py                      # REST API
│   ├── sensor_simulator.py          # Simulation engine
│   ├── pipeline_manager.py          # Pipeline management
│   ├── data_generator.py            # Physics simulation
│   ├── mqtt_publisher.py            # MQTT communication
│   └── RASPBERRY_PI_SIMULATOR_ARCHITECTURE.md
│
├── telegram_bot/                    # Telegram Bot Service
│   ├── main.py                      # REST API
│   ├── bot_handler.py               # Bot commands (14 commands)
│   ├── mqtt_client.py               # MQTT integration
│   ├── auth_client.py               # Authentication
│   └── TELEGRAM_BOT_ARCHITECTURE.md
│
├── web_dashboard/                   # Web Dashboard
│   ├── app.py                       # Main Flask/Dash app (266 lines)
│   ├── components/
│   │   ├── auth.py                  # Auth manager (121 lines)
│   │   ├── layouts.py               # Common UI (269 lines)
│   │   └── service_client.py        # API client (467 lines)
│   ├── pages/
│   │   ├── landing.py               # Landing page (453 lines)
│   │   ├── overview.py              # Overview dashboard (507 lines)
│   │   ├── pipelines.py             # Pipeline monitoring (588 lines)
│   │   ├── alerts.py                # Alerts page (475 lines)
│   │   ├── analytics.py             # Analytics page (563 lines)
│   │   ├── control.py               # Control panel (583 lines)
│   │   ├── users.py                 # User management (467 lines)
│   │   └── pipeline_management.py   # Pipeline CRUD (568 lines)
│   └── WEB_DASHBOARD_ARCHITECTURE.md
│
├── common_utils/                    # Shared Utilities
│   └── ResourceCatalogClient        # Service registration helper
│
└── DevelopmentDOC/                  # Development Documentation
    └── 2025-12-13/                  # Daily logs

Summary

The IoT Pipeline Monitoring System is a comprehensive microservices architecture that demonstrates:

  1. Service Discovery: Centralized catalog with health monitoring
  2. Event-Driven Architecture: MQTT-based message routing with QoS 2
  3. Real-Time Analytics: Multi-method anomaly detection and risk assessment
  4. Automated Control: Rule-based decision engine with 12 configurable rules
  5. Multi-Sector Deployment: Geographic distribution with sector-based isolation
  6. Role-Based Security: JWT authentication with granular permissions
  7. Dual Storage: Time-series (InfluxDB) + relational (Supabase) databases
  8. Multiple Interfaces: Web dashboard + Telegram bot for monitoring and control

The system follows microservices best practices including loose coupling, single responsibility, independent deployment, and centralized configuration management.