Skip to content

Commit decae68

Browse files
authored
bugsnag: propagate cloud_provider into contextual logging for queued executions (#268)
1 parent 7077bbb commit decae68

3 files changed

Lines changed: 78 additions & 2 deletions

File tree

cloud_pipelines_backend/instrumentation/contextual_logging.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
- execution_id: From ExecutionNode.id - tracks individual execution nodes
1010
- container_execution_id: From ContainerExecution.id - tracks running containers
1111
- user_id: User who initiated the operation
12+
- cloud_provider: From task_spec annotations, set by execution_logging_context for orchestrated runs
1213
- Any other metadata you want to track in logs
1314
1415
Usage:
@@ -23,7 +24,12 @@
2324

2425
import contextvars
2526
from contextlib import contextmanager
26-
from typing import Any, Optional
27+
from typing import TYPE_CHECKING, Any, Optional
28+
29+
if TYPE_CHECKING:
30+
from .. import backend_types_sql as bts
31+
32+
_CLOUD_PROVIDER_ANNOTATION_KEY = "cloud-pipelines.net/orchestration/cloud_provider"
2733

2834
# Single context variable to store all metadata as a dictionary
2935
_context_metadata: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar(
@@ -125,3 +131,19 @@ def logging_context(**metadata: Any):
125131
finally:
126132
# Restore previous metadata
127133
_context_metadata.set(prev_metadata)
134+
135+
136+
def execution_logging_context(execution: "bts.ExecutionNode"):
137+
"""Return a logging context populated with metadata for *execution*.
138+
139+
Always sets ``execution_id``. Also sets ``cloud_provider`` when the
140+
``cloud-pipelines.net/orchestration/cloud_provider`` annotation is present
141+
on the task spec.
142+
"""
143+
ctx: dict[str, str] = {"execution_id": execution.id}
144+
cloud_provider = ((execution.task_spec or {}).get("annotations") or {}).get(
145+
_CLOUD_PROVIDER_ANNOTATION_KEY
146+
)
147+
if cloud_provider is not None:
148+
ctx["cloud_provider"] = cloud_provider
149+
return logging_context(**ctx)

cloud_pipelines_backend/orchestrator_sql.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
131131
self._queued_executions_queue_idle = False
132132
start_timestamp = time.monotonic_ns()
133133

134-
with contextual_logging.logging_context(execution_id=queued_execution.id):
134+
with contextual_logging.execution_logging_context(queued_execution):
135135
_logger.info("Before processing queued execution")
136136
try:
137137
self.internal_process_one_queued_execution(
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
"""Tests for contextual_logging.execution_logging_context."""
2+
3+
from cloud_pipelines_backend import backend_types_sql as bts
4+
from cloud_pipelines_backend.instrumentation import contextual_logging
5+
6+
_CLOUD_PROVIDER_KEY = "cloud-pipelines.net/orchestration/cloud_provider"
7+
8+
9+
def _make_execution(*, task_spec: dict | None = None) -> bts.ExecutionNode:
10+
node = bts.ExecutionNode(task_spec=task_spec or {})
11+
node.id = "test-execution-id"
12+
node.extra_data = {}
13+
return node
14+
15+
16+
class TestExecutionLoggingContext:
17+
def test_always_sets_execution_id(self):
18+
execution = _make_execution()
19+
with contextual_logging.execution_logging_context(execution):
20+
assert (
21+
contextual_logging.get_context_metadata("execution_id")
22+
== "test-execution-id"
23+
)
24+
25+
def test_no_cloud_provider_when_annotation_absent(self):
26+
execution = _make_execution(task_spec={})
27+
with contextual_logging.execution_logging_context(execution):
28+
assert contextual_logging.get_context_metadata("cloud_provider") is None
29+
30+
def test_sets_cloud_provider_from_annotation(self):
31+
execution = _make_execution(
32+
task_spec={"annotations": {_CLOUD_PROVIDER_KEY: "nebius"}}
33+
)
34+
with contextual_logging.execution_logging_context(execution):
35+
assert contextual_logging.get_context_metadata("cloud_provider") == "nebius"
36+
37+
def test_no_cloud_provider_when_task_spec_is_none(self):
38+
execution = _make_execution(task_spec=None)
39+
with contextual_logging.execution_logging_context(execution):
40+
assert contextual_logging.get_context_metadata("cloud_provider") is None
41+
42+
def test_no_cloud_provider_when_annotations_is_none(self):
43+
execution = _make_execution(task_spec={"annotations": None})
44+
with contextual_logging.execution_logging_context(execution):
45+
assert contextual_logging.get_context_metadata("cloud_provider") is None
46+
47+
def test_context_is_cleared_after_block(self):
48+
execution = _make_execution(
49+
task_spec={"annotations": {_CLOUD_PROVIDER_KEY: "gcp"}}
50+
)
51+
with contextual_logging.execution_logging_context(execution):
52+
pass
53+
assert contextual_logging.get_context_metadata("execution_id") is None
54+
assert contextual_logging.get_context_metadata("cloud_provider") is None

0 commit comments

Comments
 (0)