Skip to content

Commit 9b5a9dc

Browse files
authored
feat: Metrics to count all non-terminal execution statuses (#186)
### TL;DR Added a metrics polling service that periodically queries the database to emit execution status count gauges for monitoring active (non-terminal) pipeline executions. ![Screenshot 2026-03-23 at 5.24.33 PM.png](https://app.graphite.com/user-attachments/assets/bde3e0ea-123d-484b-abae-23c418332a5b.png) ### What changed? - Added `EXECUTIONS` metric unit to `MetricUnit` enum - Created `execution_status_count` observable gauge to track execution node counts by status - Implemented `PollingService` class in new `metrics_poller.py` module that: - Queries the database every 30 seconds for execution status counts - Only tracks active (non-terminal) statuses like PENDING, RUNNING, etc. - Thread-safely updates gauge observations with current counts - Integrated metrics poller as a daemon thread in the application startup, with automatic detection of OpenTelemetry metrics configuration ### How to test? 1. Set the OpenTelemetry metrics exporter endpoint environment variable (see `examples/observability/otel-jaeger-prometheus`) 2. Start the application and verify the metrics poller thread launches 3. Create pipeline executions in various active statuses 4. Check that the `execution.status.count` gauge emits observations with correct counts for each status 5. Verify that terminal statuses (SUCCEEDED, FAILED) are not included in the gauge metrics ### Why make this change? This enables real-time monitoring of pipeline execution health by providing visibility into how many executions are currently in each active state, which is essential for operational observability.
1 parent 2a77059 commit 9b5a9dc

3 files changed

Lines changed: 124 additions & 2 deletions

File tree

cloud_pipelines_backend/instrumentation/metrics.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class MetricUnit(str, enum.Enum):
4040

4141
SECONDS = "s"
4242
ERRORS = "{error}"
43+
EXECUTIONS = "{execution}"
4344

4445

4546
# ---------------------------------------------------------------------------
@@ -59,6 +60,13 @@ class MetricUnit(str, enum.Enum):
5960
unit=MetricUnit.SECONDS,
6061
)
6162

63+
execution_status_count = orchestrator_meter.create_observable_gauge(
64+
name="execution.status.count",
65+
callbacks=[],
66+
description="Number of execution nodes in each active (non-terminal) status",
67+
unit=MetricUnit.EXECUTIONS,
68+
)
69+
6270

6371
# ---------------------------------------------------------------------------
6472
# SQLAlchemy event listeners
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
"""Metrics polling.
2+
3+
Periodically queries the DB and updates ObservableGauges. Currently emits
4+
execution status counts; add new DB-backed metrics here as needed.
5+
6+
Only fluctuating (non-terminal) statuses are emitted as status count gauges —
7+
terminal statuses like SUCCEEDED and FAILED only ever climb and are not useful
8+
as gauges.
9+
"""
10+
11+
import logging
12+
import time
13+
import typing
14+
15+
import sqlalchemy as sql
16+
from opentelemetry import metrics as otel_metrics
17+
from sqlalchemy import orm
18+
19+
from .. import backend_types_sql as bts
20+
from . import metrics as app_metrics
21+
from .opentelemetry._internal import configuration as otel_configuration
22+
23+
_logger = logging.getLogger(__name__)
24+
25+
26+
# All statuses minus terminal (ended) ones — these fluctuate up and down
27+
_ACTIVE_STATUSES: frozenset[bts.ContainerExecutionStatus] = (
28+
frozenset(bts.ContainerExecutionStatus) - bts.CONTAINER_STATUSES_ENDED
29+
)
30+
31+
32+
def _empty_status_counts() -> dict[str, int]:
33+
return {s.value: 0 for s in _ACTIVE_STATUSES}
34+
35+
36+
class PollingService:
37+
"""Polls the DB periodically and emits execution status count gauges."""
38+
39+
def __init__(
40+
self,
41+
*,
42+
session_factory: typing.Callable[[], orm.Session],
43+
poll_interval_seconds: float = 30.0,
44+
) -> None:
45+
self._session_factory = session_factory
46+
self._poll_interval_seconds = poll_interval_seconds
47+
# Initialize all active statuses to 0
48+
self._counts: dict[str, int] = _empty_status_counts()
49+
# Register our observe method as the gauge callback.
50+
# The OTel SDK stores callbacks in _callbacks; we append after creation
51+
# since create_observable_gauge is called at module load time in metrics.py.
52+
app_metrics.execution_status_count._callbacks.append(self._observe)
53+
54+
def run_loop(self) -> None:
55+
while True:
56+
try:
57+
self._poll()
58+
except Exception:
59+
_logger.exception("Metrics PollingService: error polling DB")
60+
time.sleep(self._poll_interval_seconds)
61+
62+
def _poll(self) -> None:
63+
with self._session_factory() as session:
64+
rows = session.execute(
65+
sql.select(
66+
bts.ExecutionNode.container_execution_status,
67+
sql.func.count().label("count"),
68+
)
69+
.where(
70+
bts.ExecutionNode.container_execution_status.in_(_ACTIVE_STATUSES)
71+
)
72+
.group_by(bts.ExecutionNode.container_execution_status)
73+
).all()
74+
new_counts = _empty_status_counts()
75+
for status, count in rows:
76+
if status is not None:
77+
new_counts[status.value] = count
78+
# CPython: attribute assignment is atomic under the GIL; no lock needed.
79+
# If GIL-free Python is ever adopted, revisit this.
80+
self._counts = new_counts
81+
_logger.debug(f"Metrics PollingService: polled status counts: {new_counts}")
82+
83+
def _observe(
84+
self, _options: otel_metrics.CallbackOptions
85+
) -> typing.Iterable[otel_metrics.Observation]:
86+
counts = self._counts.copy()
87+
for status_value, count in counts.items():
88+
yield otel_metrics.Observation(count, {"execution.status": status_value})
89+
90+
91+
def run(*, db_engine: sql.Engine) -> None:
92+
"""Check OTel config and run the metrics polling loop (blocking).
93+
94+
Logs and returns immediately if no metrics endpoint is configured.
95+
"""
96+
otel_config = otel_configuration.resolve()
97+
if otel_config is None or otel_config.metrics is None:
98+
_logger.info(
99+
f"No OTel metrics endpoint configured"
100+
f" (set {otel_configuration.EnvVar.METRIC_EXPORTER_ENDPOINT})"
101+
f" — metrics poller not starting"
102+
)
103+
return
104+
session_factory = orm.sessionmaker(
105+
autocommit=False, autoflush=False, bind=db_engine
106+
)
107+
PollingService(session_factory=session_factory).run_loop()

start_local.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,13 @@ def run_orchestrator(
211211
# endregion
212212

213213

214+
# region: Metrics poller initialization
215+
216+
from cloud_pipelines_backend.instrumentation import metrics_polling
217+
218+
# endregion
219+
220+
214221
# region: API Server initialization
215222
import contextlib
216223
import threading
@@ -228,9 +235,9 @@ def run_orchestrator(
228235
@contextlib.asynccontextmanager
229236
async def lifespan(app: fastapi.FastAPI):
230237
database_ops.initialize_and_migrate_db(db_engine=db_engine)
238+
threading.Thread(target=run_configured_orchestrator, daemon=True).start()
231239
threading.Thread(
232-
target=run_configured_orchestrator,
233-
daemon=True,
240+
target=metrics_polling.run, kwargs={"db_engine": db_engine}, daemon=True
234241
).start()
235242
if os.environ.get("GOOGLE_CLOUD_SHELL") == "true":
236243
# TODO: Find a way to get fastapi/starlette/uvicorn port

0 commit comments

Comments
 (0)