Skip to content

Commit 2a77059

Browse files
authored
feat: Measure execution status duration (#174)
### Changes

- Adds a `status_updated_at` column to the `ExecutionNode` table to track when the execution status last changed.
- Implements a SQLAlchemy event listener to automatically update the `status_updated_at` timestamp when `container_execution_status` changes.
- Creates the `execution_status_transition_duration` histogram metric to measure the time spent in each execution status.
- Adds a `_transition_execution_status()` helper function to centralize status updates and metric recording across all status transitions.
- Implements database migration logic to add the new `status_updated_at` column to existing tables.
- Replaces direct status assignments throughout the orchestrator with calls to the new transition helper function.

## Show of work

**Note:** Attribute names have since changed to use the `execution_status_` prefix.

![image.png](https://app.graphite.com/user-attachments/assets/9f1ffff7-b4a1-4151-8b84-462e83268f12.png)

Local smoke-test and verification completed ✅
1 parent 3d7c8a9 commit 2a77059

4 files changed

Lines changed: 166 additions & 1 deletion

File tree

cloud_pipelines_backend/backend_types_sql.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,10 @@ class ExecutionNode(_TableBase):
424424
repr=False,
425425
)
426426

427+
_status_changed: bool = dataclasses.field(default=False, init=False, repr=False)
427428

429+
430+
EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY = "container_execution_status_history"
428431
EXECUTION_NODE_EXTRA_DATA_SYSTEM_ERROR_EXCEPTION_MESSAGE_KEY = (
429432
"system_error_exception_message"
430433
)

cloud_pipelines_backend/instrumentation/metrics.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,25 @@
2222
- Instrument: orchestrator_execution_system_errors
2323
"""
2424

25+
import datetime
26+
import enum
27+
import logging
28+
2529
from opentelemetry import metrics as otel_metrics
30+
from sqlalchemy import event as sql_event
31+
from sqlalchemy import orm
32+
33+
from .. import backend_types_sql
34+
35+
_logger = logging.getLogger(__name__)
36+
37+
38+
class MetricUnit(str, enum.Enum):
    """Unit strings (UCUM style) handed to OTel instruments via ``unit=``.

    Members also subclass ``str``, so each member compares equal to its
    plain string value.
    """

    # Durations measured in seconds.
    SECONDS = "s"
    # Count of errors.
    ERRORS = "{error}"
43+
2644

2745
# ---------------------------------------------------------------------------
2846
# tangle.orchestrator
@@ -32,5 +50,47 @@
3250
execution_system_errors = orchestrator_meter.create_counter(
3351
name="execution.system_errors",
3452
description="Number of execution nodes that ended in SYSTEM_ERROR status",
35-
unit="{error}",
53+
unit=MetricUnit.ERRORS,
54+
)
55+
56+
execution_status_transition_duration = orchestrator_meter.create_histogram(
57+
name="execution.status_transition.duration",
58+
description="Duration an execution spent in a status before transitioning to the next status",
59+
unit=MetricUnit.SECONDS,
3660
)
61+
62+
63+
# ---------------------------------------------------------------------------
64+
# SQLAlchemy event listeners
65+
# ---------------------------------------------------------------------------
66+
67+
_HISTORY_KEY = backend_types_sql.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY
68+
69+
70+
@sql_event.listens_for(orm.Session, "before_commit")
def _handle_before_commit(session: orm.Session) -> None:
    """Record status-transition durations for execution nodes being committed.

    For every new or dirty ``ExecutionNode`` in *session* whose
    ``_status_changed`` flag is set, the last two entries of the status
    history stored in ``extra_data`` are compared, and the time between their
    ``first_observed_at`` timestamps is recorded on the
    ``execution_status_transition_duration`` histogram. The previous and
    current status are attached as attributes.

    Recording is best-effort. Any exception raised while reading the history,
    parsing the timestamps or recording the metric is logged and swallowed, so
    instrumentation can never abort the commit itself.

    Args:
        session: The session that is about to commit.
    """
    # Snapshot both collections so the loop does not iterate over live
    # session state.
    for obj in list(session.new) + list(session.dirty):
        if not isinstance(obj, backend_types_sql.ExecutionNode):
            continue
        if not obj._status_changed:
            continue
        try:
            # ``or []`` also covers a history key that is present but None.
            history: list = (obj.extra_data or {}).get(_HISTORY_KEY) or []
            # A single entry has no predecessor, so there is no duration yet.
            if len(history) >= 2:
                prev, curr = history[-2], history[-1]
                prev_time = datetime.datetime.fromisoformat(prev["first_observed_at"])
                curr_time = datetime.datetime.fromisoformat(curr["first_observed_at"])
                execution_status_transition_duration.record(
                    (curr_time - prev_time).total_seconds(),
                    attributes={
                        "execution.status.from": prev["status"],
                        "execution.status.to": curr["status"],
                    },
                )
        except Exception:
            # Malformed history entries (missing keys, unparsable timestamps)
            # are handled here too, not only failures of ``record`` itself.
            _logger.warning(
                "Failed to record status transition metric for execution %r",
                obj.id,
                exc_info=True,
            )
        # Always clear the flag so the same transition is not reported again
        # on a later commit.
        obj._status_changed = False

cloud_pipelines_backend/orchestrator_sql.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010

1111
import sqlalchemy as sql
12+
from sqlalchemy import event as sql_event
1213
from sqlalchemy import orm
1314

1415
from cloud_pipelines.orchestration.storage_providers import (
@@ -1101,3 +1102,35 @@ def _maybe_get_small_artifact_value(
11011102
return text
11021103
except Exception:
11031104
pass
1105+
1106+
1107+
# ---------------------------------------------------------------------------
1108+
# SQLAlchemy event listeners
1109+
# ---------------------------------------------------------------------------
1110+
1111+
_HISTORY_KEY = bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY
1112+
1113+
1114+
@sql_event.listens_for(bts.ExecutionNode.container_execution_status, "set")
def _handle_container_execution_status_set(
    execution: bts.ExecutionNode,
    value: bts.ContainerExecutionStatus | None,
    _old_value: object,
    _initiator: object,
) -> None:
    """Append a status-history entry whenever the execution status is assigned.

    The history is a list stored in ``execution.extra_data`` under
    ``_HISTORY_KEY``. Each entry holds the status value and the UTC ISO-8601
    time at which it was first observed. Assigning ``None``, or re-assigning
    the status already at the end of the history, changes nothing.

    Args:
        execution: The node whose ``container_execution_status`` is being set.
        value: The status being assigned.
        _old_value: Previous attribute value (unused).
        _initiator: SQLAlchemy event initiator (unused).
    """
    if value is None:
        return

    if execution.extra_data is None:
        execution.extra_data = {}

    recorded: list = execution.extra_data.get(_HISTORY_KEY, [])
    new_status = value.value

    # Repeated assignment of the current status is not a transition.
    if recorded and recorded[-1]["status"] == new_status:
        return

    observed_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
    new_entry = {"status": new_status, "first_observed_at": observed_at}

    # Build a fresh dict and a fresh list, then re-assign the attribute,
    # rather than mutating the stored objects in place.
    updated_extra_data = dict(execution.extra_data)
    updated_extra_data[_HISTORY_KEY] = recorded + [new_entry]
    execution.extra_data = updated_extra_data

    # Marks the node for the commit-time metric listener.
    execution._status_changed = True

tests/test_sql_event_listeners.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""Tests for SQLAlchemy event listeners in orchestrator_sql and instrumentation.metrics."""
2+
3+
import unittest.mock
4+
5+
import pytest
6+
from sqlalchemy import orm
7+
8+
from cloud_pipelines_backend import backend_types_sql as bts
9+
from cloud_pipelines_backend import database_ops
10+
from cloud_pipelines_backend import (
11+
orchestrator_sql,
12+
) # noqa: F401 — registers set listener
13+
from cloud_pipelines_backend.instrumentation import (
14+
metrics,
15+
) # noqa: F401 — registers before_commit listener
16+
17+
18+
@pytest.fixture()
def session() -> orm.Session:
    """Yield an ORM session on a fresh ``sqlite://`` engine with all tables created."""
    engine = database_ops.create_db_engine(database_uri="sqlite://")
    bts._TableBase.metadata.create_all(engine)
    with orm.Session(engine) as db_session:
        yield db_session
24+
25+
26+
class TestStatusHistoryListeners:
    """Covers the status-history ``set`` listener and the commit-time metric listener."""

    def test_status_change_appends_history_to_extra_data(
        self, session: orm.Session
    ) -> None:
        """The first status assignment yields exactly one history entry."""
        execution_node = bts.ExecutionNode(task_spec={})
        session.add(execution_node)
        execution_node.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        session.commit()

        recorded = execution_node.extra_data[
            bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY
        ]
        assert len(recorded) == 1
        assert recorded[0]["status"] == bts.ContainerExecutionStatus.QUEUED

    def test_duplicate_status_is_not_appended_to_history(
        self, session: orm.Session
    ) -> None:
        """Assigning the same status twice keeps a single history entry."""
        execution_node = bts.ExecutionNode(task_spec={})
        session.add(execution_node)
        execution_node.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        execution_node.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        session.commit()

        recorded = execution_node.extra_data[
            bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY
        ]
        assert len(recorded) == 1

    def test_second_status_change_records_duration_metric(
        self, session: orm.Session
    ) -> None:
        """Committing a second, different status records one histogram sample."""
        execution_node = bts.ExecutionNode(task_spec={})
        session.add(execution_node)
        execution_node.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        session.commit()

        execution_node.container_execution_status = bts.ContainerExecutionStatus.RUNNING
        # Patch only around the second commit so that exactly that commit's
        # recording is observed.
        with unittest.mock.patch.object(
            metrics.execution_status_transition_duration, "record"
        ) as mock_record:
            session.commit()

        expected_attributes = {
            "execution.status.from": bts.ContainerExecutionStatus.QUEUED,
            "execution.status.to": bts.ContainerExecutionStatus.RUNNING,
        }
        mock_record.assert_called_once()
        assert mock_record.call_args.kwargs["attributes"] == expected_attributes

0 commit comments

Comments
 (0)