Skip to content

Commit 198bb99

Browse files
authored
Merge pull request #437 from PolicyEngine/pr/observability-contracts-and-telemetry-v2
Add observability scaffolding and telemetry propagation to the simulation gateway
2 parents 864a41a + 16b415d commit 198bb99

36 files changed

Lines changed: 983 additions & 124 deletions
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Fixture modules for policyengine-fastapi tests."""
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Fixture modules for ping tests."""

libs/policyengine-fastapi/tests/ping/conftest.py renamed to libs/policyengine-fastapi/fixtures/ping/shared.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
"""Shared fixtures for ping endpoint tests."""
2+
3+
from fastapi import FastAPI
14
from fastapi.testclient import TestClient
25
import pytest
36

4-
from fastapi import FastAPI
57
from policyengine_fastapi import ping
68
from policyengine_fastapi.health import HealthRegistry
79

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
11
"""
22
Re-usable elements that support FastAPI and SQLModel indipendent of this specific app.
33
"""
4+
5+
from .observability import (
6+
ObservabilityConfig as ObservabilityConfig,
7+
SimulationStage as SimulationStage,
8+
TracerCaptureMode as TracerCaptureMode,
9+
build_observability as build_observability,
10+
get_observability as get_observability,
11+
)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""
2+
Shared observability contracts and utilities for API v2 services.
3+
4+
This stays in `policyengine-fastapi` for now because only one active service
5+
needs it and the main value today is shared contract stability, not a separate
6+
distribution boundary. Extract it to a dedicated package only when a second
7+
service has materially different runtime or release needs.
8+
"""
9+
10+
from .config import (
11+
ObservabilityConfig as ObservabilityConfig,
12+
parse_header_value_pairs as parse_header_value_pairs,
13+
)
14+
from .contracts import (
15+
SimulationCompositeTraceResponse as SimulationCompositeTraceResponse,
16+
SimulationLifecycleEvent as SimulationLifecycleEvent,
17+
SimulationRunSummary as SimulationRunSummary,
18+
SimulationTelemetryEnvelope as SimulationTelemetryEnvelope,
19+
SimulationTimelineEntry as SimulationTimelineEntry,
20+
TracerArtifactManifest as TracerArtifactManifest,
21+
VersionStageMetricResponse as VersionStageMetricResponse,
22+
)
23+
from .correlation import (
24+
generate_run_id as generate_run_id,
25+
stable_config_hash as stable_config_hash,
26+
)
27+
from .emitters import (
28+
Observability as Observability,
29+
NoOpObservability as NoOpObservability,
30+
NoOpSpan as NoOpSpan,
31+
)
32+
from .provider import (
33+
build_observability as build_observability,
34+
get_observability as get_observability,
35+
)
36+
from .stages import (
37+
SimulationStage as SimulationStage,
38+
TracerCaptureMode as TracerCaptureMode,
39+
)
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from __future__ import annotations
2+
3+
from dataclasses import dataclass, field
4+
5+
from .stages import TracerCaptureMode
6+
7+
8+
def parse_header_value_pairs(raw: str | None) -> dict[str, str]:
9+
"""Parse OTLP headers from a comma or newline separated key=value string."""
10+
11+
if raw is None:
12+
return {}
13+
14+
stripped = raw.strip()
15+
if not stripped:
16+
return {}
17+
18+
headers: dict[str, str] = {}
19+
for pair in stripped.replace("\n", ",").split(","):
20+
candidate = pair.strip()
21+
if not candidate:
22+
continue
23+
key, separator, value = candidate.partition("=")
24+
if not separator:
25+
raise ValueError(
26+
"Expected OTLP headers in key=value format separated by commas"
27+
)
28+
headers[key.strip()] = value.strip()
29+
30+
return headers
31+
32+
33+
@dataclass(frozen=True)
34+
class ObservabilityConfig:
35+
enabled: bool = False
36+
shadow_mode: bool = True
37+
service_name: str = "policyengine-observability"
38+
environment: str = "production"
39+
otlp_endpoint: str | None = None
40+
otlp_headers: dict[str, str] = field(default_factory=dict)
41+
artifact_bucket: str | None = None
42+
artifact_prefix: str = "simulation-observability"
43+
tracer_capture_mode: TracerCaptureMode = TracerCaptureMode.DISABLED
44+
slow_run_threshold_seconds: float = 30.0
45+
46+
@classmethod
47+
def disabled(cls, service_name: str = "policyengine-observability"):
48+
return cls(enabled=False, service_name=service_name)
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
from __future__ import annotations
2+
3+
from datetime import datetime
4+
from typing import Any
5+
6+
from pydantic import BaseModel, ConfigDict, Field
7+
8+
from .stages import SimulationStage, TracerCaptureMode
9+
10+
11+
class ObservabilityModel(BaseModel):
12+
model_config = ConfigDict(extra="forbid", use_enum_values=True)
13+
14+
15+
class CorrelatedRunFields(ObservabilityModel):
16+
run_id: str
17+
process_id: str | None = None
18+
job_id: str | None = None
19+
trace_id: str | None = None
20+
request_id: str | None = None
21+
country: str | None = None
22+
simulation_kind: str | None = None
23+
geography_code: str | None = None
24+
geography_type: str | None = None
25+
country_package_name: str | None = None
26+
country_package_version: str | None = None
27+
policyengine_version: str | None = None
28+
data_version: str | None = None
29+
modal_app_name: str | None = None
30+
config_hash: str | None = None
31+
32+
33+
class SimulationLifecycleEvent(CorrelatedRunFields):
34+
event_name: str
35+
stage: SimulationStage
36+
status: str
37+
timestamp: datetime
38+
service: str
39+
duration_seconds: float | None = None
40+
details: dict[str, Any] = Field(default_factory=dict)
41+
42+
43+
class TracerArtifactManifest(CorrelatedRunFields):
44+
scenario: str
45+
capture_mode: TracerCaptureMode
46+
artifact_format: str
47+
storage_uri: str
48+
summary_uri: str | None = None
49+
node_count: int = 0
50+
root_count: int = 0
51+
max_depth: int = 0
52+
total_calculation_time_seconds: float = 0.0
53+
total_formula_time_seconds: float = 0.0
54+
generated_at: datetime
55+
56+
57+
class SimulationTelemetryEnvelope(ObservabilityModel):
58+
run_id: str
59+
process_id: str | None = None
60+
request_id: str | None = None
61+
traceparent: str | None = None
62+
requested_at: datetime | None = None
63+
simulation_kind: str | None = None
64+
geography_code: str | None = None
65+
geography_type: str | None = None
66+
config_hash: str | None = None
67+
capture_mode: TracerCaptureMode = TracerCaptureMode.DISABLED
68+
69+
70+
class SimulationRunSummary(CorrelatedRunFields):
71+
status: str
72+
requested_at: datetime | None = None
73+
returned_at: datetime | None = None
74+
total_duration_seconds: float | None = None
75+
76+
77+
class SimulationTimelineEntry(ObservabilityModel):
78+
stage: SimulationStage
79+
started_at: datetime
80+
ended_at: datetime
81+
duration_seconds: float
82+
service: str
83+
84+
85+
class SimulationCompositeTraceResponse(ObservabilityModel):
86+
run: SimulationRunSummary
87+
timeline: list[SimulationTimelineEntry] = Field(default_factory=list)
88+
spans: dict[str, Any] = Field(default_factory=dict)
89+
logs: dict[str, Any] = Field(default_factory=dict)
90+
tracer: dict[str, Any] = Field(default_factory=dict)
91+
92+
93+
class StageRuntimeSummary(ObservabilityModel):
94+
mean_seconds: float | None = None
95+
p50_seconds: float | None = None
96+
p95_seconds: float | None = None
97+
98+
99+
class VersionStageMetrics(ObservabilityModel):
100+
country_package_version: str
101+
launch_source: str = "modal_version_registry"
102+
launched: bool = True
103+
observed_run_count: int = 0
104+
stages: dict[str, StageRuntimeSummary] = Field(default_factory=dict)
105+
106+
107+
class VersionStageMetricResponse(ObservabilityModel):
108+
country: str
109+
window: dict[str, datetime]
110+
versions: list[VersionStageMetrics] = Field(default_factory=list)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from __future__ import annotations
2+
3+
from hashlib import sha256
4+
import json
5+
from typing import Any
6+
from uuid import uuid4
7+
8+
9+
def generate_run_id() -> str:
10+
return str(uuid4())
11+
12+
13+
def stable_config_hash(payload: Any) -> str:
14+
serialised = json.dumps(
15+
payload,
16+
sort_keys=True,
17+
separators=(",", ":"),
18+
default=str,
19+
)
20+
return f"sha256:{sha256(serialised.encode()).hexdigest()}"
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from __future__ import annotations
2+
3+
from contextlib import AbstractContextManager
4+
from dataclasses import dataclass, field
5+
from typing import Protocol
6+
from typing import Any, Mapping
7+
8+
from .config import ObservabilityConfig
9+
from .contracts import SimulationLifecycleEvent, TracerArtifactManifest
10+
11+
12+
class NoOpSpan(AbstractContextManager["NoOpSpan"]):
13+
def __enter__(self):
14+
return self
15+
16+
def __exit__(self, exc_type, exc_value, traceback):
17+
return None
18+
19+
def set_attribute(self, key: str, value: Any) -> None:
20+
return None
21+
22+
def add_event(self, name: str, attributes: Mapping[str, Any] | None = None) -> None:
23+
return None
24+
25+
26+
class Observability(Protocol):
27+
config: ObservabilityConfig
28+
29+
def emit_lifecycle_event(self, event: SimulationLifecycleEvent) -> None: ...
30+
31+
def emit_counter(
32+
self,
33+
name: str,
34+
value: int = 1,
35+
attributes: Mapping[str, str] | None = None,
36+
) -> None: ...
37+
38+
def emit_histogram(
39+
self,
40+
name: str,
41+
value: float,
42+
attributes: Mapping[str, str] | None = None,
43+
) -> None: ...
44+
45+
def record_artifact_manifest(self, manifest: TracerArtifactManifest) -> None: ...
46+
47+
def span(
48+
self, name: str, attributes: Mapping[str, Any] | None = None
49+
) -> NoOpSpan: ...
50+
51+
def flush(self) -> None: ...
52+
53+
54+
@dataclass
55+
class NoOpObservability:
56+
config: ObservabilityConfig = field(default_factory=ObservabilityConfig.disabled)
57+
58+
def emit_lifecycle_event(self, event: SimulationLifecycleEvent) -> None:
59+
return None
60+
61+
def emit_counter(
62+
self,
63+
name: str,
64+
value: int = 1,
65+
attributes: Mapping[str, str] | None = None,
66+
) -> None:
67+
return None
68+
69+
def emit_histogram(
70+
self,
71+
name: str,
72+
value: float,
73+
attributes: Mapping[str, str] | None = None,
74+
) -> None:
75+
return None
76+
77+
def record_artifact_manifest(self, manifest: TracerArtifactManifest) -> None:
78+
return None
79+
80+
def span(self, name: str, attributes: Mapping[str, Any] | None = None) -> NoOpSpan:
81+
return NoOpSpan()
82+
83+
def flush(self) -> None:
84+
return None
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from __future__ import annotations
2+
3+
from .config import ObservabilityConfig
4+
from .emitters import NoOpObservability, Observability
5+
6+
7+
def build_observability(
8+
config: ObservabilityConfig | None = None,
9+
) -> Observability:
10+
"""Central construction point for observability emitters.
11+
12+
Commit 1 intentionally returns a no-op implementation even when enabled.
13+
Later commits can swap in a real backend here without changing callers.
14+
"""
15+
16+
if config is None:
17+
config = ObservabilityConfig.disabled()
18+
19+
return NoOpObservability(config=config)
20+
21+
22+
def get_observability(
23+
config: ObservabilityConfig | None = None,
24+
) -> Observability:
25+
return build_observability(config)

0 commit comments

Comments
 (0)