Skip to content

Commit db036dc

Browse files
HadhemiDD, AAraKKe, and claude
authored
[Dispatcher] Task Test Runner (DataDog#23518)
* Create dispatcher task test runner (squashed) * lint * cleanup * address requests from Claude agent review * fix changelog * Restructure github_async into a lazy-loaded package (DataDog#23685) Splits the single-file `ddev/utils/github_async.py` into a package with the client and HTTP-shape primitives in `client.py` and one model per submodule under `models/`. Both `__init__.py` files use PEP 562 module-level `__getattr__` so importing one symbol only loads the submodule that defines it -- in particular `from ddev.utils.github_async.models import PullRequest` does not pull in the workflow or comment models. Adds two new endpoints used by upcoming work: - `AsyncGitHubClient.create_pull_request(owner, repo, title, head, base, body, draft)` - `AsyncGitHubClient.add_labels_to_issue(owner, repo, issue_number, labels)` Expands the `PullRequest` model with the typical fields callers need (id, state, draft, title, body, user, assignees, requested_reviewers, labels, created_at/updated_at/closed_at/merged_at, head, base) plus three small sub-models (`GitHubUser`, `Label`, `PullRequestRef`). Only `number` and `html_url` are required; the rest default to None/[] so partial payloads parse fine and `extra='ignore'` keeps the schema forward-compatible. Adds `FakeAsyncGitHubClient` and the `fake_async_github` pytest fixture in `tests/helpers/github_async.py`. The fake records every call and offers `mock_response(method, response, /, *, once=False, **match_kwargs)` for stubbing replies. Responses can be `BaseException` instances (raised), `GitHubResponse` instances (passed through), or inner data (auto-wrapped). `once=True` adds to a per-method FIFO queue so tests can model retry sequences. Sticky mocks (no `once`) match the most-recent registration. `assert_called_with` / `assert_called_once_with` perform strict-equality checks on kwargs; `assert_all_responses_consumed()` asserts the one-shot queue was drained. 
* update with async client changes * add return_run_details * restore get_pull_request * Emit a failed BatchFinished when a test workflow times out Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * remove retry * download retry * lint * lint * Remove Jira reference from changelog * Retry artifact downloads with exponential backoff Wrap download_artifact in a stamina retry_context so the whole operation (redirect resolution + download + extract) is retried on transient network failures, retryable server responses (429/5xx), and corrupt/truncated zips. Retrying the full operation re-resolves a fresh signed URL each attempt, avoiding stale-URL 403s. 404/403 and zip-slip errors are not retried. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * request change * remove retry * fix tests --------- Co-authored-by: Juanpe Araque <juanpedro.araque@datadoghq.com> Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent bab3889 commit db036dc

11 files changed

Lines changed: 1381 additions & 13 deletions

File tree

ddev/changelog.d/23518.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add TaskTestRunner processor for the CI dispatcher.
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
# (C) Datadog, Inc. 2026-present
2+
# All rights reserved
3+
# Licensed under a 3-clause BSD style license (see LICENSE)
4+
from __future__ import annotations
5+
6+
import asyncio
7+
import dataclasses
8+
import json
9+
import logging
10+
from dataclasses import dataclass
11+
from pathlib import Path
12+
from typing import Any, Literal
13+
14+
from ddev.cli.ci.tests.messages import BatchFinished, TestBatch
15+
from ddev.event_bus.orchestrator import AsyncProcessor
16+
from ddev.utils.github_async import AsyncGitHubClient, GitHubResponse
17+
from ddev.utils.github_async.models import WorkflowRun
18+
19+
20+
def _conclusion_to_status(conclusion: str | None) -> Literal["success", "failure", "skipped"]:
    """Translate a GitHub Actions conclusion into the status carried by ``BatchFinished``.

    ``"success"`` and ``"skipped"`` pass through unchanged; every other value,
    including ``None``, is reported as ``"failure"``.

    Note: the check run is closed with ``"neutral"`` when the conclusion is ``None``,
    whereas this function returns ``"failure"`` for the same input. The difference is
    deliberate — BatchFinished consumers want a binary outcome, while the check UI
    prefers an explicit ``"neutral"`` badge.
    """
    # Start from the catch-all outcome and only upgrade on an exact match.
    status: Literal["success", "failure", "skipped"] = "failure"
    if conclusion == "success":
        status = "success"
    elif conclusion == "skipped":
        status = "skipped"
    return status
32+
33+
34+
@dataclasses.dataclass(frozen=True)
class TestRunnerOptions:
    """Immutable settings shared by every batch a ``TaskTestRunner`` processes."""

    # Repository coordinates passed to every GitHub client call.
    owner: str
    repo: str
    # Workflow to dispatch for each batch.
    workflow_id: str | int
    # Git ref the workflow dispatch is created against.
    ref: str
    # Commit the per-batch check run is attached to (sent as ``head_sha``).
    base_sha: str
    # Forwarded to the workflow as its ``checkout_sha`` input.
    checkout_sha: str
    # Parent directory for downloads; each run gets a ``<run_id>`` subdirectory.
    artifacts_base_path: Path
    # Delay between two consecutive workflow status polls.
    poll_interval_seconds: float = 30.0
46+
47+
48+
class TaskTestRunner(AsyncProcessor[TestBatch]):
    """
    Processor that drives one ``test-batch.yaml`` workflow run per ``TestBatch``.

    For each message it dispatches the workflow, opens an ``in_progress`` check run
    that links to the workflow page, waits until the run reports ``completed``,
    downloads the run's artifacts, closes the check run, and submits a
    ``BatchFinished``.
    """

    def __init__(self, name: str, client: AsyncGitHubClient, options: TestRunnerOptions) -> None:
        """Keep the GitHub client and options, and create a logger suffixed with ``name``."""
        super().__init__(name)
        self._options = options
        self._client = client
        self._logger = logging.getLogger(f"{__name__}.{name}")

    async def process_message(self, message: TestBatch) -> None:
        """
        Run the workflow for ``message`` and submit the resulting ``BatchFinished``.

        Once the check run exists it is always closed: with the workflow conclusion
        (``"neutral"`` when the run reports none) if that conclusion was read, or with
        ``"cancelled"`` if an exception interrupts the wait before then. A failure to
        close the check run is logged and swallowed. Exceptions raised by the dispatch,
        the initial run lookup, the check-run creation, or the wait itself propagate to
        the caller, and no ``BatchFinished`` is submitted in those cases.
        """
        opts = self._options
        gh = self._client
        workflow_inputs = self._build_inputs(message)
        # Structured-logging context; enriched in place as identifiers become known.
        log_extra: dict[str, Any] = {"batch_id": message.id}

        dispatched = await gh.create_workflow_dispatch(
            opts.owner, opts.repo, opts.workflow_id, ref=opts.ref, inputs=workflow_inputs
        )
        run_id = dispatched.data.workflow_run_id
        log_extra["run_id"] = run_id
        self._logger.info("Dispatched batch", extra=log_extra)

        run = await gh.get_workflow_run(opts.owner, opts.repo, run_id)
        workflow_url = run.data.html_url
        log_extra["workflow_url"] = workflow_url

        check_run = await gh.create_check_run(
            opts.owner,
            opts.repo,
            name=f"test-batch/{message.id}",
            head_sha=opts.base_sha,
            status="in_progress",
            details_url=workflow_url,
        )
        check_run_id = check_run.data.id
        log_extra["check_run_id"] = check_run_id
        self._logger.info("Check run created", extra=log_extra)

        # Used to close the check run if the guarded section raises before the
        # workflow's own conclusion has been read.
        check_conclusion: str = "cancelled"
        outcome: BatchFinished | None = None
        try:
            if run.data.status == "completed":
                self._logger.info("Workflow completed", extra=log_extra)
            else:
                run = await self._poll_until_complete(run_id, log_extra)

            workflow_conclusion = run.data.conclusion
            if workflow_conclusion is None:
                self._logger.warning("Workflow completed with null conclusion", extra=log_extra)
            # The check run gets "neutral" for a missing conclusion; BatchFinished
            # maps the same case through _conclusion_to_status instead.
            check_conclusion = workflow_conclusion or "neutral"

            artifacts_dir = await self._download_artifacts(run_id, log_extra)
            self._logger.info("Artifacts downloaded", extra=log_extra)

            outcome = BatchFinished(
                id=message.id,
                status=_conclusion_to_status(workflow_conclusion),
                run_id=run_id,
                workflow_url=workflow_url,
                artifacts_path=str(artifacts_dir),
            )
        finally:
            close_extra = {**log_extra, "conclusion": check_conclusion}
            try:
                await gh.update_check_run(
                    opts.owner,
                    opts.repo,
                    check_run_id,
                    status="completed",
                    conclusion=check_conclusion,
                    details_url=workflow_url,
                )
                self._logger.info("Check run closed", extra=close_extra)
            except Exception:
                # Best effort: a check run that cannot be closed must not mask the batch result.
                self._logger.exception("Failed to close check run", extra=close_extra)

        if outcome is None:
            return
        self.submit_message(outcome)
        self._logger.info("BatchFinished emitted", extra=log_extra)

    async def _poll_until_complete(self, run_id: int, log_extra: dict[str, Any]) -> GitHubResponse[WorkflowRun]:
        """
        Re-fetch the workflow run until its status is ``"completed"`` and return that response.

        Sleeps ``poll_interval_seconds`` before every fetch. There is no upper bound on
        the number of polls; the loop only ends on completion or when an exception
        (including task cancellation) is raised.
        """
        opts = self._options
        while True:
            await asyncio.sleep(opts.poll_interval_seconds)
            latest = await self._client.get_workflow_run(opts.owner, opts.repo, run_id)
            if latest.data.status != "completed":
                continue
            self._logger.info("Workflow completed", extra=log_extra)
            return latest

    def _build_inputs(self, message: TestBatch) -> dict[str, str]:
        """Build the workflow-dispatch inputs; list-valued fields are sent as JSON strings."""
        payload: dict[str, str] = {
            "batch_id": message.id,
            "checkout_sha": self._options.checkout_sha,
        }
        payload["integrations"] = json.dumps(message.integrations)
        # Jobs are dataclass instances, so they are converted to plain dicts before encoding.
        job_dicts = [dataclasses.asdict(job) for job in message.job_list]
        payload["job_list"] = json.dumps(job_dicts)
        return payload

    async def _download_artifacts(self, run_id: int, log_extra: dict[str, Any]) -> Path:
        """
        Download every usable artifact of ``run_id`` and return the run's artifact directory.

        Each artifact is written to ``<artifacts_base_path>/<run_id>/<artifact id>-<artifact name>``.
        Expired artifacts and artifacts without a download URL are skipped. The whole
        operation is best effort: per-artifact download errors and errors while listing
        artifacts are logged as warnings and never raised, so the returned path may be
        incomplete or may not have been created.
        """
        opts = self._options
        destination = opts.artifacts_base_path / str(run_id)
        failed: list[tuple[int, str]] = []
        try:
            async for page in self._client.list_workflow_run_artifacts(opts.owner, opts.repo, run_id):
                for artifact in page.data.artifacts:
                    download_url = artifact.archive_download_url

                    # Decide whether the artifact is skipped, and with which log message.
                    skip_message: str | None = None
                    if artifact.expired:
                        skip_message = "Skipping expired artifact %s (%s)"
                    elif not download_url:
                        skip_message = "Skipping artifact %s (%s) without download URL"
                    if skip_message is not None:
                        self._logger.info(skip_message, artifact.id, artifact.name, extra=log_extra)
                        continue

                    target = destination / f"{artifact.id}-{artifact.name}"
                    try:
                        await self._client.download_artifact(download_url, target)
                        self._logger.info("Downloaded artifact %s -> %s", artifact.id, target, extra=log_extra)
                    except Exception as exc:
                        # One bad artifact must not prevent the remaining ones from downloading.
                        self._logger.warning(
                            "Failed to download artifact %s (%s): %s",
                            artifact.id,
                            artifact.name,
                            exc,
                            extra=log_extra,
                        )
                        failed.append((artifact.id, artifact.name))
        except Exception:
            self._logger.warning("Failed to list workflow run artifacts", extra=log_extra, exc_info=True)

        if not failed:
            return destination
        self._logger.warning(
            "Artifact download had %s failures: %s",
            len(failed),
            failed,
            extra=log_extra,
        )
        return destination

0 commit comments

Comments
 (0)