Skip to content

Commit fa70d6d

Browse files
authored
Add retroactive OTel execution lifecycle tracing (#252)
## Summary

Emits a root `execution` span and one `execution.status` child span per status history entry when an `ExecutionNode` reaches a terminal state. All span timestamps are derived from the existing status history so durations reflect actual time spent, not when this code ran.

- New module: `cloud_pipelines_backend/instrumentation/execution_tracing.py`
- Hook: `metrics._handle_before_commit` calls `try_emit_execution_trace`
- Orchestrator: calls `otel.setup_providers()` so the exporter is active
- Tests: `InMemorySpanExporter`-backed suite in `tests/instrumentation/`

## Screenshots

![Screenshot 2026-05-22 at 8.09.01 PM.png](https://app.graphite.com/user-attachments/assets/5ab74df1-27e9-4c9d-96d6-09c9a02ff31a.png)
![Screenshot 2026-05-22 at 8.13.33 PM.png](https://app.graphite.com/user-attachments/assets/7426c24f-64d5-4f20-90ea-04b80e56db57.png)
![Screenshot 2026-05-22 at 8.14.49 PM.png](https://app.graphite.com/user-attachments/assets/c730d657-34bd-47a2-97a7-a256039439a7.png)
1 parent ec83b54 commit fa70d6d

4 files changed

Lines changed: 223 additions & 0 deletions

File tree

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
"""Retroactive OTel trace emission for execution lifecycle.
2+
3+
When an ExecutionNode reaches a terminal status, emits a root ``execution``
4+
span covering the full lifetime plus one ``execution.status`` child span per
5+
status entry recorded in the status history. All timestamps are derived from
6+
the history so span durations reflect actual time spent, not when this code
7+
ran.
8+
"""
9+
10+
import datetime
11+
import logging
12+
13+
from opentelemetry import trace
14+
15+
from .. import backend_types_sql as bts
16+
17+
# Module-scoped logger; try_emit_execution_trace reports emission failures
# through it instead of raising.
_logger = logging.getLogger(__name__)
18+
# Tracer obtained once, at import time, from the global OpenTelemetry API
# under the instrumentation name "tangle.orchestrator".
_tracer = trace.get_tracer("tangle.orchestrator")
19+
20+
# Key in ExecutionNode.extra_data under which the status history list is read.
_HISTORY_KEY = bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY
21+
# ``.value`` of every status in bts.CONTAINER_STATUSES_ENDED, frozen for
# membership tests. An execution whose most recent history entry carries one
# of these values is treated as terminal.
_TERMINAL_STATUSES = frozenset(s.value for s in bts.CONTAINER_STATUSES_ENDED)
22+
23+
24+
def _ns(*, dt: datetime.datetime) -> int:
    """Return *dt* as integer nanoseconds since the Unix epoch.

    The OTel SDK takes span start/end times as integer nanoseconds.

    Args:
        dt: The instant to convert. A value without a UTC offset (naive) is
            interpreted as UTC.

    Returns:
        Whole nanoseconds between 1970-01-01T00:00:00Z and *dt*, exact to the
        microsecond resolution of ``datetime``.
    """
    if dt.utcoffset() is None:
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    # Stay in integer arithmetic. ``dt.timestamp() * 1e9`` goes through a
    # float, and present-day epoch nanoseconds (~1.7e18) exceed the 53-bit
    # mantissa, so the float path returns values that are off by up to a few
    # hundred nanoseconds.
    microseconds = (dt - epoch) // datetime.timedelta(microseconds=1)
    return microseconds * 1_000
29+
30+
31+
def try_emit_execution_trace(*, execution: bts.ExecutionNode) -> None:
    """Emit a complete execution trace when *execution* reaches a terminal status.

    Reads the status history stored in ``execution.extra_data`` and, if its
    last entry has a terminal status, emits:

    * one root span named ``execution`` running from the first history
      timestamp to the last, and
    * one child span named ``execution.status <STATUS>`` per history entry,
      running from that entry's ``first_observed_at`` to the next entry's.
      The final entry has no successor, so its span has zero duration.

    No-op when the history is missing, empty, or not yet terminal.

    Args:
        execution: The node whose status history should be turned into spans.

    Returns:
        None. Every exception, including one raised by a malformed history
        entry, is caught and logged so that a tracing failure never affects
        the surrounding SQLAlchemy commit.
    """
    try:
        # The guard lives inside the ``try`` so that a malformed entry (for
        # example one missing the "status" key) is logged rather than raised
        # into the caller.
        history: list = (execution.extra_data or {}).get(_HISTORY_KEY, [])
        if not history or history[-1]["status"] not in _TERMINAL_STATUSES:
            return

        # Validate and convert every entry before starting any span, so a bad
        # entry produces no trace at all rather than a partial one.
        statuses = [entry["status"] for entry in history]
        starts_ns = [
            _ns(dt=datetime.datetime.fromisoformat(entry["first_observed_at"]))
            for entry in history
        ]
        trace_end_ns = starts_ns[-1]
        # Each status lasts until the next one is first observed; the last
        # status ends where the trace ends.
        ends_ns = starts_ns[1:] + [trace_end_ns]

        root = _tracer.start_span(
            "execution",
            attributes={"execution.id": execution.id},
            start_time=starts_ns[0],
        )
        try:
            root_ctx = trace.set_span_in_context(root)
            for status, start_ns, end_ns in zip(statuses, starts_ns, ends_ns):
                attrs: dict[str, object] = {
                    "execution.id": execution.id,
                    "execution.status": status,
                }
                _tracer.start_span(
                    f"execution.status {status}",
                    context=root_ctx,
                    attributes=attrs,
                    start_time=start_ns,
                ).end(end_time=end_ns)
        finally:
            # Always close the root, even if emitting a child span failed, so
            # no span is left open.
            root.end(end_time=trace_end_ns)
    except Exception:
        _logger.exception(f"Failed to emit execution trace for {execution.id!r}")

cloud_pipelines_backend/instrumentation/metrics.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from sqlalchemy import orm
3232

3333
from .. import backend_types_sql
34+
from . import execution_tracing
3435

3536
_logger = logging.getLogger(__name__)
3637

@@ -102,3 +103,4 @@ def _handle_before_commit(session: orm.Session) -> None:
102103
exc_info=True,
103104
)
104105
obj._status_changed = False
106+
execution_tracing.try_emit_execution_trace(execution=obj)

orchestrator_main.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from cloud_pipelines_backend import orchestrator_sql
99
from cloud_pipelines_backend.instrumentation import bugsnag_instrumentation
10+
from cloud_pipelines_backend.instrumentation import opentelemetry as otel
1011
from cloud_pipelines_backend.launchers import kubernetes_launchers
1112
from cloud_pipelines.orchestration.storage_providers import local_storage
1213

@@ -26,6 +27,7 @@ def _build_launcher():
2627
from cloud_pipelines_backend.launchers.skypilot_launchers import (
2728
SkyPilotKubernetesLauncher,
2829
)
30+
2931
return SkyPilotKubernetesLauncher(
3032
infra=os.environ.get("SKYPILOT_INFRA", "kubernetes"),
3133
pool=os.environ.get("SKYPILOT_POOL"),
@@ -36,6 +38,7 @@ def _build_launcher():
3638

3739
from kubernetes import config as k8s_config_lib
3840
from kubernetes import client as k8s_client_lib
41+
3942
try:
4043
k8s_config_lib.load_incluster_config()
4144
except Exception:
@@ -75,6 +78,7 @@ def main():
7578

7679
logger.info("Starting the orchestrator")
7780
bugsnag_instrumentation.setup(service_name="tangle-orchestrator")
81+
otel.setup_providers()
7882

7983
DEFAULT_DATABASE_URI = "sqlite:///db.sqlite"
8084
database_uri = os.environ.get("DATABASE_URI", DEFAULT_DATABASE_URI)
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
"""Tests for execution lifecycle OTel trace emission."""
2+
3+
import datetime
4+
5+
import pytest
6+
from opentelemetry import trace
7+
from opentelemetry.sdk import trace as otel_sdk_trace
8+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
9+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
10+
11+
from cloud_pipelines_backend import backend_types_sql as bts
12+
from cloud_pipelines_backend.instrumentation import execution_tracing
13+
14+
15+
@pytest.fixture()
def span_exporter(monkeypatch: pytest.MonkeyPatch) -> InMemorySpanExporter:
    """Provide a per-test in-memory exporter wired to ``execution_tracing``.

    A fresh ``TracerProvider`` with a ``SimpleSpanProcessor`` feeding the
    exporter is created for each test, and ``execution_tracing._tracer`` is
    monkeypatched to a tracer from that provider. This keeps tests independent
    of global OTel provider state (the module-level ProxyTracer would
    otherwise remain bound to the provider from the first test run).
    """
    memory_exporter = InMemorySpanExporter()
    isolated_provider = otel_sdk_trace.TracerProvider()
    isolated_provider.add_span_processor(SimpleSpanProcessor(memory_exporter))
    isolated_tracer = isolated_provider.get_tracer("tangle.orchestrator")
    monkeypatch.setattr(execution_tracing, "_tracer", isolated_tracer)
    return memory_exporter
30+
31+
32+
def _make_execution(
    *, statuses: list[str], extra: dict | None = None
) -> bts.ExecutionNode:
    """Create an ExecutionNode stub whose status history mirrors *statuses*.

    Entry ``i`` is stamped 2024-01-01T00:00:00+00:00 plus ``5 * i`` minutes.
    Any *extra* items are merged into ``extra_data`` after the history key.

    The node gets a fixed ID because OTel drops None-valued attributes and
    execution.id is only set by the DB insert_default in production.
    """
    base_time = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
    spacing = datetime.timedelta(minutes=5)

    history = []
    for position, status in enumerate(statuses):
        observed_at = base_time + position * spacing
        history.append(
            {"status": status, "first_observed_at": observed_at.isoformat()}
        )

    extra_data = {bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY: history}
    extra_data.update(extra or {})

    node = bts.ExecutionNode(task_spec={})
    node.id = "test-execution-id"
    node.extra_data = extra_data
    return node
57+
58+
59+
class TestTryEmitExecutionTrace:
    """Tests for ``execution_tracing.try_emit_execution_trace``.

    Every test builds an execution with ``_make_execution`` (history entries
    spaced five minutes apart), runs the emitter, and inspects the spans
    captured by the ``span_exporter`` fixture.
    """

    @staticmethod
    def _root_span(exporter: InMemorySpanExporter):
        """Return the finished span named ``execution``."""
        return next(s for s in exporter.get_finished_spans() if s.name == "execution")

    @staticmethod
    def _status_spans(exporter: InMemorySpanExporter) -> list:
        """Return the finished per-status child spans."""
        return [
            s
            for s in exporter.get_finished_spans()
            if s.name.startswith("execution.status ")
        ]

    def test_no_spans_for_non_terminal_execution(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        execution = _make_execution(statuses=["QUEUED", "RUNNING"])
        execution_tracing.try_emit_execution_trace(execution=execution)
        assert span_exporter.get_finished_spans() == ()

    def test_no_spans_for_empty_history(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        execution = _make_execution(statuses=[])
        execution_tracing.try_emit_execution_trace(execution=execution)
        assert span_exporter.get_finished_spans() == ()

    def test_emits_root_and_child_spans_on_terminal(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        names = {s.name for s in span_exporter.get_finished_spans()}
        assert "execution" in names
        assert any(n.startswith("execution.status ") for n in names)

    def test_child_span_count_matches_history(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        assert len(self._status_spans(span_exporter)) == 3

    def test_root_span_has_execution_id_attribute(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        root = self._root_span(span_exporter)
        assert root.attributes["execution.id"] == execution.id

    def test_child_spans_share_trace_id_with_root(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        finished = span_exporter.get_finished_spans()
        trace_ids = {s.context.trace_id for s in finished}
        assert len(trace_ids) == 1

        # A shared trace id alone does not prove the hierarchy: each status
        # span must also name the root span as its parent.
        root = self._root_span(span_exporter)
        for child in self._status_spans(span_exporter):
            assert child.parent is not None
            assert child.parent.span_id == root.context.span_id

    def test_root_span_duration_matches_history(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        root = self._root_span(span_exporter)
        duration_ns = root.end_time - root.start_time
        # Three entries five minutes apart: first to last is ten minutes.
        assert duration_ns == int(
            datetime.timedelta(minutes=10).total_seconds() * 1_000_000_000
        )

    def test_child_span_status_attribute(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        status_spans = self._status_spans(span_exporter)
        assert {s.name for s in status_spans} == {
            "execution.status QUEUED",
            "execution.status SUCCEEDED",
        }
        # Check the attribute itself, not only the span name derived from it.
        assert {s.attributes["execution.status"] for s in status_spans} == {
            "QUEUED",
            "SUCCEEDED",
        }
        for span in status_spans:
            assert span.name == f"execution.status {span.attributes['execution.status']}"

0 commit comments

Comments
 (0)