Skip to content

Commit 1a75ee2

Browse files
authored
Merge pull request #1029 from PolicyEngine/codex/pipeline-run-status-index
Add structured pipeline run index endpoint
2 parents c5a0c0e + f227934 commit 1a75ee2

9 files changed

Lines changed: 442 additions & 10 deletions

File tree

changelog.d/1028.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add structured pipeline run index endpoints for dashboard status discovery.

changelog.d/1028.changed

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Update the pinned PolicyEngine US dependency to 1.696.0.

docs/engineering/skills/pipeline_operations.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,16 @@ The status system reports:
2020
## Status Surfaces
2121

2222
The structured status payload is canonical. The pipeline status sub-app exposes
23-
three Modal functions:
23+
run-level and run-index Modal functions:
2424

2525
- `get_pipeline_status`: Python-callable structured JSON for agents, scripts,
2626
dashboards, and tests. Prefer this for diagnosis and automation.
2727
- `pipeline_status_endpoint`: protected HTTP endpoint returning the same
2828
structured JSON for non-Python clients. Use Modal proxy auth headers.
29+
- `list_pipeline_runs`: Python-callable structured JSON index of recent runs.
30+
Use this for dashboards that need to discover candidate run IDs.
31+
- `pipeline_runs_endpoint`: protected HTTP endpoint returning the same
32+
structured recent-run index for non-Python clients.
2933
- `pipeline_status_snippet`: human-readable text used by
3034
`modal run modal_app/pipeline.py::main --action status`. This is for quick
3135
terminal inspection only and must not be treated as a schema.

modal_app/pipeline_status.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616

1717
from modal_app.images import cpu_image as image # noqa: E402
1818
from modal_app.step_manifests.state import PIPELINE_MOUNT, RUNS_DIR # noqa: E402
19-
from modal_app.step_manifests.status import build_pipeline_status_payload # noqa: E402
19+
from modal_app.step_manifests.status import ( # noqa: E402
20+
build_pipeline_runs_payload,
21+
build_pipeline_status_payload,
22+
)
2023

2124
app = modal.App(
2225
os.environ.get("US_DATA_PIPELINE_STATUS_APP_NAME")
@@ -46,6 +49,22 @@ def get_pipeline_status(
4649
return build_pipeline_status_payload(run_id)
4750

4851

52+
@app.function(
53+
image=image,
54+
timeout=60,
55+
volumes={PIPELINE_MOUNT: pipeline_volume},
56+
)
57+
def list_pipeline_runs(
58+
limit: int = 25,
59+
status: str = "",
60+
branch: str = "",
61+
) -> dict:
62+
"""Get a structured index of recent pipeline runs."""
63+
64+
pipeline_volume.reload()
65+
return build_pipeline_runs_payload(limit=limit, status=status, branch=branch)
66+
67+
4968
@app.function(
5069
image=status_image,
5170
timeout=60,
@@ -65,6 +84,27 @@ def pipeline_status_endpoint(
6584
return build_pipeline_status_payload(run_id)
6685

6786

87+
@app.function(
88+
image=status_image,
89+
timeout=60,
90+
volumes={PIPELINE_MOUNT: pipeline_volume},
91+
)
92+
@modal.fastapi_endpoint(
93+
method="GET",
94+
docs=False,
95+
requires_proxy_auth=True,
96+
)
97+
def pipeline_runs_endpoint(
98+
limit: int = 25,
99+
status: str = "",
100+
branch: str = "",
101+
) -> dict:
102+
"""Protected HTTP endpoint for a structured pipeline run index."""
103+
104+
pipeline_volume.reload()
105+
return build_pipeline_runs_payload(limit=limit, status=status, branch=branch)
106+
107+
68108
@app.function(
69109
image=image,
70110
timeout=60,

modal_app/step_manifests/status.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
from modal_app.step_manifests.specs import RUN_MANIFEST_STEP_IDS, step_title
2828

2929
PIPELINE_STATUS_SCHEMA_VERSION = "1"
30+
DEFAULT_RUNS_LIMIT = 25
31+
MAX_RUNS_LIMIT = 100
3032

3133

3234
def _run_dir(run_id: str, runs_dir: str | Path | None = None) -> Path:
@@ -149,6 +151,176 @@ def _manifest_payload(manifest) -> dict[str, Any]:
149151
}
150152

151153

154+
def _bounded_limit(limit: int | str | None) -> int:
155+
try:
156+
parsed = int(limit if limit is not None else DEFAULT_RUNS_LIMIT)
157+
except (TypeError, ValueError):
158+
parsed = DEFAULT_RUNS_LIMIT
159+
return max(0, min(parsed, MAX_RUNS_LIMIT))
160+
161+
162+
def _index_error_payload(error: dict[str, Any] | None) -> dict[str, Any] | None:
163+
if error is None:
164+
return None
165+
allowed = (
166+
"stage_id",
167+
"substage_id",
168+
"surface",
169+
"error_type",
170+
"message",
171+
"message_truncated",
172+
"record_path",
173+
"latest_path",
174+
"traceback_available",
175+
)
176+
return {key: error[key] for key in allowed if key in error}
177+
178+
179+
def _latest_manifest_payload(
180+
stage_manifests: list[dict[str, Any]],
181+
) -> dict[str, Any] | None:
182+
if not stage_manifests:
183+
return None
184+
item = stage_manifests[-1]
185+
manifest = item["manifest"]
186+
return {
187+
"step_id": item["step_id"],
188+
"stage_id": item["stage_id"],
189+
"substage_id": item["substage_id"],
190+
"title": item["title"],
191+
"status": item["status"],
192+
"started_at": manifest.get("started_at"),
193+
"completed_at": manifest.get("completed_at"),
194+
"duration_s": manifest.get("duration_s"),
195+
"reuse_decision": manifest.get("reuse_decision", "not_applicable"),
196+
}
197+
198+
199+
def _run_index_item(
200+
run_id: str,
201+
*,
202+
runs_dir: str | Path | None = None,
203+
) -> dict[str, Any]:
204+
payload = build_pipeline_status_payload(run_id, runs_dir=runs_dir)
205+
run_manifest = payload.get("run_manifest") or {}
206+
stage_manifests = payload.get("stage_manifests") or []
207+
missing = payload.get("missing_expected_manifest_ids") or []
208+
expected = list(run_manifest.get("known_step_ids") or RUN_MANIFEST_STEP_IDS)
209+
return {
210+
"run_id": payload["run_id"],
211+
"status": payload["status"],
212+
"message": payload["message"],
213+
"branch": run_manifest.get("branch"),
214+
"sha": run_manifest.get("sha"),
215+
"candidate_version": run_manifest.get("candidate_version"),
216+
"release_version": run_manifest.get("release_version"),
217+
"started_at": run_manifest.get("started_at"),
218+
"updated_at": payload.get("updated_at"),
219+
"completed_at": run_manifest.get("completed_at"),
220+
"modal_app_name": payload.get("modal_app_name"),
221+
"modal_environment": payload.get("modal_environment"),
222+
"hf_staging_prefix": run_manifest.get("hf_staging_prefix"),
223+
"github_run_url": (run_manifest.get("run_context") or {}).get("github_run_url"),
224+
"latest_manifest": _latest_manifest_payload(stage_manifests),
225+
"progress": {
226+
"expected_manifests": len(expected),
227+
"present_manifests": len(stage_manifests),
228+
"missing_manifests": len(missing),
229+
},
230+
"error": _index_error_payload(payload.get("error")),
231+
}
232+
233+
234+
def _unreadable_run_index_item(run_id: str, exc: BaseException) -> dict[str, Any]:
235+
message = redacted_bounded_error_text(
236+
f"{type(exc).__name__}: {exc}",
237+
max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS,
238+
).text
239+
return {
240+
"run_id": run_id,
241+
"status": "unreadable",
242+
"message": message,
243+
"branch": None,
244+
"sha": None,
245+
"candidate_version": None,
246+
"release_version": None,
247+
"started_at": None,
248+
"updated_at": None,
249+
"completed_at": None,
250+
"modal_app_name": None,
251+
"modal_environment": None,
252+
"hf_staging_prefix": None,
253+
"github_run_url": None,
254+
"latest_manifest": None,
255+
"progress": {
256+
"expected_manifests": 0,
257+
"present_manifests": 0,
258+
"missing_manifests": 0,
259+
},
260+
"error": {
261+
"error_type": type(exc).__name__,
262+
"message": message,
263+
"traceback_available": False,
264+
},
265+
}
266+
267+
268+
def _run_sort_key(item: dict[str, Any]) -> tuple[str, str]:
269+
return (
270+
str(item.get("updated_at") or item.get("started_at") or ""),
271+
str(item.get("run_id") or ""),
272+
)
273+
274+
275+
def build_pipeline_runs_payload(
276+
*,
277+
limit: int | str | None = DEFAULT_RUNS_LIMIT,
278+
status: str = "",
279+
branch: str = "",
280+
runs_dir: str | Path | None = None,
281+
) -> dict[str, Any]:
282+
"""Build a JSON-serializable index of recent pipeline runs."""
283+
284+
bounded_limit = _bounded_limit(limit)
285+
root = Path(runs_dir) if runs_dir is not None else Path(pipeline_state.RUNS_DIR)
286+
filters = {"status": status or "", "branch": branch or ""}
287+
if not root.exists():
288+
return {
289+
"schema_version": PIPELINE_STATUS_SCHEMA_VERSION,
290+
"count": 0,
291+
"limit": bounded_limit,
292+
"filters": filters,
293+
"runs": [],
294+
}
295+
296+
items = []
297+
for entry in root.iterdir():
298+
if not entry.is_dir():
299+
continue
300+
manifest_path = run_manifest_path(entry)
301+
if not manifest_path.exists():
302+
continue
303+
try:
304+
item = _run_index_item(entry.name, runs_dir=root)
305+
except Exception as exc:
306+
item = _unreadable_run_index_item(entry.name, exc)
307+
if filters["status"] and item.get("status") != filters["status"]:
308+
continue
309+
if filters["branch"] and item.get("branch") != filters["branch"]:
310+
continue
311+
items.append(item)
312+
313+
items.sort(key=_run_sort_key, reverse=True)
314+
runs = items[:bounded_limit]
315+
return {
316+
"schema_version": PIPELINE_STATUS_SCHEMA_VERSION,
317+
"count": len(runs),
318+
"limit": bounded_limit,
319+
"filters": filters,
320+
"runs": runs,
321+
}
322+
323+
152324
def build_pipeline_status_payload(
153325
run_id: str,
154326
*,

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ classifiers = [
2222
"Programming Language :: Python :: 3.14",
2323
]
2424
dependencies = [
25-
"policyengine-us==1.694.0",
25+
"policyengine-us==1.696.0",
2626
# policyengine-core 3.26.1 is the current 3.26.x runtime and includes the fix for
2727
# PolicyEngine/policyengine-core#482 (user-set ETERNITY inputs lost
2828
# after _invalidate_all_caches) and is required by policyengine-us 1.682.1+.

tests/integration/test_modal_pipeline_seams.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,23 @@ def test_pipeline_status_callable_reports_missing_run():
128128
assert result["stage_manifests"] == []
129129

130130

131+
def test_pipeline_runs_callable_reports_structured_index():
132+
_require_modal_tokens()
133+
134+
fn = modal.Function.from_name(
135+
APP_NAME,
136+
"list_pipeline_runs",
137+
environment_name=MODAL_ENVIRONMENT,
138+
)
139+
result = fn.remote(limit=1)
140+
141+
assert result["schema_version"] == "1"
142+
assert result["limit"] == 1
143+
assert result["count"] <= 1
144+
assert isinstance(result["runs"], list)
145+
assert result["filters"] == {"status": "", "branch": ""}
146+
147+
131148
def test_pipeline_status_http_endpoint_reports_missing_run():
132149
_require_modal_tokens()
133150
headers = _modal_proxy_auth_headers()
@@ -155,6 +172,34 @@ def test_pipeline_status_http_endpoint_reports_missing_run():
155172
assert result["error"] is None
156173

157174

175+
def test_pipeline_runs_http_endpoint_reports_structured_index():
176+
_require_modal_tokens()
177+
headers = _modal_proxy_auth_headers()
178+
179+
fn = modal.Function.from_name(
180+
APP_NAME,
181+
"pipeline_runs_endpoint",
182+
environment_name=MODAL_ENVIRONMENT,
183+
)
184+
endpoint = fn.get_web_url()
185+
assert endpoint
186+
187+
response = requests.get(
188+
endpoint,
189+
params={"limit": "1"},
190+
headers=headers,
191+
timeout=30,
192+
)
193+
194+
assert response.status_code == 200, response.text[:500]
195+
result = response.json()
196+
assert result["schema_version"] == "1"
197+
assert result["limit"] == 1
198+
assert result["count"] <= 1
199+
assert isinstance(result["runs"], list)
200+
assert result["filters"] == {"status": "", "branch": ""}
201+
202+
158203
def test_pipeline_status_cli_snippet_reports_missing_run():
159204
_require_modal_tokens()
160205

0 commit comments

Comments
 (0)