Skip to content

Commit 479f3e9

Browse files
authored
Add CPU, memory, and accelerator resource requests to PENDING span (#258)
## Add resource attributes to PENDING execution spans

Adds the `execution.resources.cpu`, `execution.resources.memory`, and `execution.resources.accelerators` span attributes, sourced from `task_spec` annotations. They make it possible to distinguish workload types (e.g. Nebius H100 vs GKE L4) and to correlate resource requests with execution timing. The attributes are attached exclusively to the `PENDING` status span, where resource reservation is most meaningful. The function `try_emit_execution_trace` has been renamed to `emit_execution_trace` to better reflect its role.

## Screenshots

![Screenshot 2026-05-22 at 8.13.33 PM.png](https://app.graphite.com/user-attachments/assets/cc876c2d-d001-41ec-93bf-2834ef50af53.png)
1 parent 7336e2e commit 479f3e9

3 files changed

Lines changed: 91 additions & 3 deletions

File tree

cloud_pipelines_backend/instrumentation/execution_tracing.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from opentelemetry.trace import StatusCode
1515

1616
from .. import backend_types_sql as bts
17+
from ..launchers import kubernetes_launchers
1718

1819
_logger = logging.getLogger(__name__)
1920
_tracer = trace.get_tracer("tangle.orchestrator")
@@ -144,6 +145,23 @@ def _pipeline_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]:
144145
return {"execution.parent_id": execution.parent_execution_id}
145146

146147

148+
def _resource_attrs(*, execution: bts.ExecutionNode, status: str) -> dict[str, object]:
    """Return CPU, memory, and accelerator requests for the PENDING span.

    The values are read from the ``annotations`` mapping of
    ``execution.task_spec`` using the annotation keys defined in
    ``kubernetes_launchers``.

    Args:
        execution: Execution node whose ``task_spec`` may carry resource
            annotations.
        status: Status of the span being built; only the PENDING status
            produces any attributes.

    Returns:
        A dict with any of ``execution.resources.cpu``,
        ``execution.resources.memory`` and
        ``execution.resources.accelerators``. Annotations that are absent or
        falsy (e.g. an empty string) are omitted. The dict is empty for every
        non-PENDING status, and when ``task_spec`` or its ``annotations``
        entry is missing or ``None``.
    """
    if status != bts.ContainerExecutionStatus.PENDING:
        return {}
    # Use ``or {}`` instead of a ``.get`` default: the default only applies
    # when the key is missing, so an explicit ``"annotations": None`` would
    # otherwise raise AttributeError on the lookups below.
    annotations: dict = (execution.task_spec or {}).get("annotations") or {}
    attrs: dict[str, object] = {}
    if cpu := annotations.get(kubernetes_launchers.RESOURCES_CPU_ANNOTATION_KEY):
        attrs["execution.resources.cpu"] = cpu
    if memory := annotations.get(kubernetes_launchers.RESOURCES_MEMORY_ANNOTATION_KEY):
        attrs["execution.resources.memory"] = memory
    if accelerators := annotations.get(
        kubernetes_launchers.RESOURCES_ACCELERATORS_ANNOTATION_KEY
    ):
        attrs["execution.resources.accelerators"] = accelerators
    return attrs
163+
164+
147165
def _ns(*, dt: datetime.datetime) -> int:
148166
"""Return *dt* as nanoseconds since the Unix epoch (required by OTel SDK)."""
149167
if dt.tzinfo is None:
@@ -189,13 +207,18 @@ def emit_execution_trace(*, execution: bts.ExecutionNode) -> None:
189207
"execution.status": entry["status"],
190208
**_error_attrs(execution=execution, status=entry["status"]),
191209
**_launcher_pod_attrs(execution=execution, status=entry["status"]),
210+
**_resource_attrs(execution=execution, status=entry["status"]),
192211
}
212+
start_ns = _ns(dt=t_start)
213+
end_ns = _ns(dt=t_end)
214+
if end_ns <= start_ns:
215+
end_ns = start_ns + 1
193216
_tracer.start_span(
194217
f"execution.status {entry['status']}",
195218
context=root_ctx,
196219
attributes=attrs,
197-
start_time=_ns(dt=t_start),
198-
).end(end_time=_ns(dt=t_end))
220+
start_time=start_ns,
221+
).end(end_time=end_ns)
199222

200223
if history[-1]["status"] in _ERROR_TERMINAL_STATUSES:
201224
root.set_status(status=StatusCode.ERROR)

cloud_pipelines_backend/instrumentation/metrics.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,4 @@ def _handle_before_commit(session: orm.Session) -> None:
103103
exc_info=True,
104104
)
105105
obj._status_changed = False
106-
execution_tracing.try_emit_execution_trace(execution=obj)
106+
execution_tracing.emit_execution_trace(execution=obj)

tests/instrumentation/test_execution_tracing.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,3 +396,68 @@ def test_root_execution_omits_parent_id_when_absent(
396396
s for s in span_exporter.get_finished_spans() if s.name == "execution"
397397
)
398398
assert "execution.parent_id" not in (root.attributes or {})
399+
400+
401+
class TestResourceAttrs:
    """Resource-request attributes on the per-status execution spans."""

    def test_pending_span_carries_cpu_and_memory(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """CPU and memory annotations are copied onto the PENDING span."""
        from cloud_pipelines_backend.launchers import kubernetes_launchers

        execution = _make_execution(
            statuses=["QUEUED", "PENDING", "RUNNING", "SUCCEEDED"]
        )
        execution.task_spec = {
            "annotations": {
                kubernetes_launchers.RESOURCES_CPU_ANNOTATION_KEY: "4",
                kubernetes_launchers.RESOURCES_MEMORY_ANNOTATION_KEY: "16Gi",
            }
        }
        execution_tracing.emit_execution_trace(execution=execution)

        # ``attributes`` is guarded with ``or {}`` so a span without
        # attributes cannot break the lookup.
        pending_span = next(
            s
            for s in span_exporter.get_finished_spans()
            if (s.attributes or {}).get("execution.status") == "PENDING"
        )
        assert pending_span.attributes["execution.resources.cpu"] == "4"
        assert pending_span.attributes["execution.resources.memory"] == "16Gi"

    def test_pending_span_carries_accelerators_when_present(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """The accelerators annotation is copied onto the PENDING span."""
        from cloud_pipelines_backend.launchers import kubernetes_launchers

        execution = _make_execution(statuses=["QUEUED", "PENDING", "SUCCEEDED"])
        execution.task_spec = {
            "annotations": {
                kubernetes_launchers.RESOURCES_ACCELERATORS_ANNOTATION_KEY: '{"H100": 1}',
            }
        }
        execution_tracing.emit_execution_trace(execution=execution)

        pending_span = next(
            s
            for s in span_exporter.get_finished_spans()
            if (s.attributes or {}).get("execution.status") == "PENDING"
        )
        assert (
            pending_span.attributes["execution.resources.accelerators"] == '{"H100": 1}'
        )

    def test_non_pending_spans_have_no_resource_attrs(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """No span carries resource attributes when there is no PENDING status."""
        from cloud_pipelines_backend.launchers import kubernetes_launchers

        execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"])
        # Set every resource annotation so that each absence assertion below
        # is meaningful rather than trivially true.
        execution.task_spec = {
            "annotations": {
                kubernetes_launchers.RESOURCES_CPU_ANNOTATION_KEY: "4",
                kubernetes_launchers.RESOURCES_MEMORY_ANNOTATION_KEY: "16Gi",
                kubernetes_launchers.RESOURCES_ACCELERATORS_ANNOTATION_KEY: '{"H100": 1}',
            }
        }
        execution_tracing.emit_execution_trace(execution=execution)

        for span in span_exporter.get_finished_spans():
            span_attrs = span.attributes or {}
            assert "execution.resources.cpu" not in span_attrs
            assert "execution.resources.memory" not in span_attrs
            assert "execution.resources.accelerators" not in span_attrs

0 commit comments

Comments
 (0)