diff --git a/backend/__pycache__/main.cpython-313.pyc b/backend/__pycache__/main.cpython-313.pyc new file mode 100644 index 0000000..4d28e32 Binary files /dev/null and b/backend/__pycache__/main.cpython-313.pyc differ diff --git a/backend/api/__pycache__/schemas.cpython-313.pyc b/backend/api/__pycache__/schemas.cpython-313.pyc new file mode 100644 index 0000000..908061f Binary files /dev/null and b/backend/api/__pycache__/schemas.cpython-313.pyc differ diff --git a/backend/api/routes/__pycache__/ai.cpython-311.pyc b/backend/api/routes/__pycache__/ai.cpython-311.pyc new file mode 100644 index 0000000..6b88850 Binary files /dev/null and b/backend/api/routes/__pycache__/ai.cpython-311.pyc differ diff --git a/backend/api/routes/__pycache__/ai.cpython-313.pyc b/backend/api/routes/__pycache__/ai.cpython-313.pyc new file mode 100644 index 0000000..51a0e7e Binary files /dev/null and b/backend/api/routes/__pycache__/ai.cpython-313.pyc differ diff --git a/backend/core/__pycache__/executor.cpython-313.pyc b/backend/core/__pycache__/executor.cpython-313.pyc new file mode 100644 index 0000000..59746e5 Binary files /dev/null and b/backend/core/__pycache__/executor.cpython-313.pyc differ diff --git a/backend/core/executor.py b/backend/core/executor.py index 11c9b47..dc79c36 100644 --- a/backend/core/executor.py +++ b/backend/core/executor.py @@ -4,10 +4,18 @@ """ import traceback from datetime import datetime +import time from typing import Dict, Any, Optional from backend.models.pipeline import Pipeline, Stage, Execution, ExecutionStatus, LogLevel, StageType from backend.core.pipeline_engine import PipelineEngine +from backend.monitoring.metrics import ( + pipeline_executions_total, + pipeline_failures_total, + pipeline_execution_duration_seconds, + stage_execution_duration_seconds, + pipeline_active_executions +) class PipelineExecutor: @@ -36,6 +44,10 @@ def execute(self, pipeline: Pipeline) -> Execution: execution.status = ExecutionStatus.RUNNING execution.add_log(None, LogLevel.INFO, f"Starting pipeline execution: {pipeline.name}") + # Track active pipeline start + pipeline_active_executions.labels(pipeline_id=pipeline.id).inc() + start_time = time.time() + try: # Get execution order (topological sort) execution_order = self.engine.get_execution_order(pipeline) @@ -60,6 +72,12 @@ def execute(self, pipeline: Pipeline) -> Execution: f"Pipeline completed successfully in {execution.duration:.2f}s" ) + # Record success metrics + duration = time.time() - start_time + pipeline_executions_total.labels(pipeline_id=pipeline.id, status='success').inc() + pipeline_execution_duration_seconds.labels(pipeline_id=pipeline.id).observe(duration) + pipeline_active_executions.labels(pipeline_id=pipeline.id).dec() + except Exception as e: # Handle execution failure execution.status = ExecutionStatus.FAILED @@ -71,6 +89,13 @@ def execute(self, pipeline: Pipeline) -> Execution: f"Pipeline execution failed: {str(e)}", metadata={"traceback": traceback.format_exc()} ) + + # Record failure metrics + duration = time.time() - start_time + pipeline_executions_total.labels(pipeline_id=pipeline.id, status='failed').inc() + pipeline_failures_total.labels(pipeline_id=pipeline.id).inc() + pipeline_execution_duration_seconds.labels(pipeline_id=pipeline.id).observe(duration) + pipeline_active_executions.labels(pipeline_id=pipeline.id).dec() return execution @@ -83,6 +108,7 @@ def _execute_stage(self, stage: Stage, execution: Execution) -> None: execution: Current execution context """ execution.add_log(stage.id, LogLevel.INFO, f"Starting stage: {stage.name}") + stage_start_time = time.time() try: # Execute based on stage type @@ -107,6 +133,14 @@ def _execute_stage(self, stage: Stage, execution: Execution) -> None: metadata={"result_keys": list(result.keys()) if isinstance(result, dict) else None} ) + # Record stage success metrics + stage_duration = time.time() - stage_start_time + stage_execution_duration_seconds.labels( + pipeline_id=execution.pipeline_id, + stage_name=stage.name, + status='success' + ).observe(stage_duration) + except Exception as e: execution.add_log( stage.id, @@ -114,6 +148,14 @@ def _execute_stage(self, stage: Stage, execution: Execution) -> None: f"Stage failed: {str(e)}", metadata={"traceback": traceback.format_exc()} ) + + # Record stage failure metrics + stage_duration = time.time() - stage_start_time + stage_execution_duration_seconds.labels( + pipeline_id=execution.pipeline_id, + stage_name=stage.name, + status='failed' + ).observe(stage_duration) raise def _execute_input_stage(self, stage: Stage, execution: Execution) -> Dict[str, Any]: diff --git a/backend/db/__pycache__/crud.cpython-313.pyc b/backend/db/__pycache__/crud.cpython-313.pyc new file mode 100644 index 0000000..6448398 Binary files /dev/null and b/backend/db/__pycache__/crud.cpython-313.pyc differ diff --git a/backend/main.py b/backend/main.py index 0eda646..85dfff4 100644 --- a/backend/main.py +++ b/backend/main.py @@ -4,7 +4,8 @@ """ from fastapi import Depends, FastAPI, Request from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, Response +from prometheus_client import generate_latest, CONTENT_TYPE_LATEST from datetime import datetime import uvicorn @@ -71,6 +72,13 @@ async def health_check(): } +# Prometheus Metrics scraping endpoint +@app.get("/metrics", tags=["monitoring"]) +async def prometheus_metrics(): + """Prometheus metrics endpoint""" + return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST) + + # Root endpoint @app.get("/", tags=["root"]) async def root(): diff --git a/backend/monitoring/__pycache__/metrics.cpython-311.pyc b/backend/monitoring/__pycache__/metrics.cpython-311.pyc new file mode 100644 index 0000000..f378721 Binary files /dev/null and b/backend/monitoring/__pycache__/metrics.cpython-311.pyc differ diff --git a/backend/monitoring/__pycache__/metrics.cpython-313.pyc b/backend/monitoring/__pycache__/metrics.cpython-313.pyc new file mode 100644 index 0000000..7d433c2 Binary files /dev/null and b/backend/monitoring/__pycache__/metrics.cpython-313.pyc differ diff --git a/backend/monitoring/metrics.py b/backend/monitoring/metrics.py new file mode 100644 index 0000000..e4ce94b --- /dev/null +++ b/backend/monitoring/metrics.py @@ -0,0 +1,52 @@ +""" +Prometheus Metrics Definitions for Pipeline Monitoring +""" +import os +from prometheus_client import Counter, Histogram, Gauge, REGISTRY + +# Disable the default process/platform metrics for cleaner app-specific dashboards if desired +# from prometheus_client import process_collector, platform_collector, gc_collector +# REGISTRY.unregister(process_collector.ProcessCollector()) +# REGISTRY.unregister(platform_collector.PlatformCollector()) +# REGISTRY.unregister(gc_collector.GCCollector()) + +# --- Counters --- +# Tracks the total number of pipeline executions, labeled by pipeline_id and final status +pipeline_executions_total = Counter( + 'pipeline_executions_total', + 'Total number of pipeline executions', + ['pipeline_id', 'status'] +) + +# Tracks the total number of explicit failures, labeled by pipeline_id +pipeline_failures_total = Counter( + 'pipeline_failures_total', + 'Total number of failed pipeline executions', + ['pipeline_id'] +) + +# --- Histograms --- +# Tracks the distribution of total end-to-end execution duration for a pipeline +# Buckets: 1s, 5s, 15s, 30s, 1m, 2m, 5m, 10m, 15m, 30m, 1h, +Inf +pipeline_execution_duration_seconds = Histogram( + 'pipeline_execution_duration_seconds', + 'Pipeline execution duration in seconds', + ['pipeline_id'], + buckets=(1.0, 5.0, 15.0, 30.0, 60.0, 120.0, 300.0, 600.0, 900.0, 1800.0, 3600.0, float('inf')) +) + +# Tracks the duration of individual stages within a pipeline +stage_execution_duration_seconds = Histogram( + 'stage_execution_duration_seconds', + 'Stage execution duration in seconds', + ['pipeline_id', 'stage_name', 'status'], + buckets=(0.1, 0.5, 1.0, 5.0, 15.0, 30.0, 60.0, 120.0, 300.0, float('inf')) +) + +# --- Gauges --- +# Tracks the current number of concurrently running pipelines +pipeline_active_executions = Gauge( + 'pipeline_active_executions', + 'Number of currently running pipeline executions', + ['pipeline_id'] +) diff --git a/docs/monitoring/grafana_dashboard.json b/docs/monitoring/grafana_dashboard.json new file mode 100644 index 0000000..ece18f6 --- /dev/null +++ b/docs/monitoring/grafana_dashboard.json @@ -0,0 +1,149 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "links": [], + "liveNow": false, + "panels": [ + { + "collapsed": false, + "gridPos": { + "h": 8, + "w": 6, + "x": 0, + "y": 0 + }, + "id": 1, + "panels": [], + "title": "Pipeline Success Rate (%)", + "type": "stat", + "datasource": "Prometheus", + "targets": [ + { + "expr": "sum(rate(pipeline_executions_total{status=\"success\"}[5m])) / sum(rate(pipeline_executions_total[5m])) * 100", + "refId": "A" + } + ], + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + } + }, + { + "collapsed": false, + "gridPos": { + "h": 8, + "w": 6, + "x": 6, + "y": 0 + }, + "id": 2, + "panels": [], + "title": "Active Pipeline Executions", + "type": "gauge", + "datasource": "Prometheus", + "targets": [ + { + "expr": "sum(pipeline_active_executions)", + "refId": "A" + } + ], + "options": { + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "text": {} + } + }, + { + "collapsed": false, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 3, + "panels": [], + "title": "Pipeline Execution Duration (95th Percentile)", + "type": "timeseries", + "datasource": "Prometheus", + "targets": [ + { + "expr": "histogram_quantile(0.95, sum(rate(pipeline_execution_duration_seconds_bucket[5m])) by (le, pipeline_id))", + "legendFormat": "{{pipeline_id}} (95% CI)", + "refId": "A" + } + ], + "options": { + "tooltip": { + "mode": "single", + "sort": "none" + } + } + }, + { + "collapsed": false, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 8 + }, + "id": 4, + "panels": [], + "title": "Global Pipeline Throughput (Executions per Minute)", + "type": "timeseries", + "datasource": "Prometheus", + "targets": [ + { + "expr": "sum(rate(pipeline_executions_total[1m])) * 60", + "legendFormat": "Executions / Min", + "refId": "A" + } + ], + "options": { + "tooltip": { + "mode": "single", + "sort": "none" + } + } + } + ], + "refresh": "5s", + "schemaVersion": 38, + "style": "dark", + "tags": [ + "flexiroaster", + "pipelines" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "FlexiRoaster Pipeline Monitoring" +} \ No newline at end of file