Skip to content

Commit 28b031d

Browse files
authored
Automated flaky test remediation (open-telemetry#18494)
1 parent 9c60ea4 commit 28b031d

7 files changed

Lines changed: 938 additions & 0 deletions

File tree

Lines changed: 375 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,375 @@
1+
#!/usr/bin/env python3
2+
"""Identify the most-flaky JUnit test from Develocity over a recent window.
3+
4+
Uses the dashboard's internal data endpoints (the same ones the SPA fetches)
5+
since the public ``/api/builds`` endpoint requires an access key with the
6+
"Access build data via the API" scope, which the org's
7+
``DEVELOCITY_ACCESS_KEY`` does not have. The dashboard endpoints are
8+
unauthenticated and return JSON.
9+
10+
Reads the skip list from ``build/flaky-test-remediation/skip.txt`` (one Develocity
11+
test container/class name per line) and writes ``build/flaky-test-remediation/selected.json``.
12+
"""
13+
14+
import json
15+
import os
16+
import subprocess
17+
import sys
18+
import time
19+
from pathlib import Path
20+
from urllib.parse import urlencode
21+
from urllib.request import Request, urlopen
22+
23+
from _paths import OUT_DIR, SELECTED, SKIP
24+
25+
# Base URL used when the DEVELOCITY_URL environment variable is not set.
DEFAULT_DEVELOCITY_URL = "https://develocity.opentelemetry.io"
# Sent as the ``rootProjectNames`` query parameter on every dashboard request.
PROJECT_NAME = "opentelemetry-java-instrumentation"
# Repository root, resolved by running git when this module is imported;
# check_output raises if the git command exits non-zero.
WORKSPACE_ROOT = Path(subprocess.check_output(
    ["git", "rev-parse", "--show-toplevel"], text=True).strip())
# File extensions considered when looking up a test class's source file.
SOURCE_EXTS = (".java", ".groovy", ".kt")

# Length of the look-back window, in days, ending at the current time.
WINDOW_DAYS = 7
# Minimum summed flaky count for a container or method to be considered.
MIN_FLAKY = 5
# Outcome values treated as evidence of flakiness when scanning results.
FLAKY_OUTCOMES = ("flaky", "failed")
# Upper bound on how many times a history window is recursively halved.
MAX_HISTORY_SPLIT_DEPTH = 12
# History windows this narrow (in milliseconds) or narrower are not split.
MIN_HISTORY_WINDOW_MS = 60 * 1000
36+
37+
38+
def http_get_json(url, *, timeout=60):
    """Issue a GET request to ``url`` and return the parsed JSON body.

    The request advertises ``application/json`` and a fixed User-Agent.
    ``timeout`` is handed to ``urlopen`` unchanged. The response body is
    decoded as UTF-8 before parsing; errors from ``urlopen`` or
    ``json.loads`` propagate to the caller.
    """
    request = Request(
        url,
        headers={
            "Accept": "application/json",
            "User-Agent": "otel-flaky-test-remediation/1.0",
        },
    )
    with urlopen(request, timeout=timeout) as response:
        raw_body = response.read()
    return json.loads(raw_body.decode("utf-8"))
45+
46+
47+
def common_query(*, since_ms, until_ms):
    """Build the query parameters shared by every dashboard request.

    Returns a new dict holding the project name, the time window (both
    bounds converted to strings) and a fixed UTC time zone id. Callers add
    their endpoint-specific parameters to the returned dict.
    """
    window = {
        "startTimeMin": str(since_ms),
        "startTimeMax": str(until_ms),
    }
    return {"rootProjectNames": PROJECT_NAME, **window, "timeZoneId": "UTC"}
54+
55+
56+
def total_flaky(test):
    """Return the total ``flaky`` count across an entry's outcome trend.

    ``test`` is a dict that may carry ``outcomeTrend.dataPoints``; each
    data point may carry ``outcomeDistribution.flaky``. Missing or falsy
    levels count as zero.
    """
    trend = test.get("outcomeTrend") or {}
    flaky_total = 0
    for point in trend.get("dataPoints") or []:
        distribution = point.get("outcomeDistribution") or {}
        flaky_total += int(distribution.get("flaky") or 0)
    return flaky_total
61+
62+
63+
def fetch_top_tests(base, *, since_ms, until_ms):
    """Fetch the ``/tests-data/top`` listing sorted by the FLAKY field.

    Returns the list found at ``data.topTests.tests`` in the response, or
    an empty list when any level of that path is missing or falsy.
    """
    query = common_query(since_ms=since_ms, until_ms=until_ms)
    query["sortField"] = "FLAKY"
    url = f"{base}/tests-data/top?{urlencode(query)}"
    payload = http_get_json(url)
    data = payload.get("data") or {}
    top_tests = data.get("topTests") or {}
    return top_tests.get("tests") or []
68+
69+
70+
def fetch_container_methods(base, *, container, since_ms, until_ms):
    """Fetch the per-test entries of one container, sorted by FLAKY.

    Queries ``/tests-data/test-container-history`` and returns the list at
    ``data.singleContainerDetails.tests.tests``, or an empty list when any
    level of that path is missing or falsy.
    """
    query = common_query(since_ms=since_ms, until_ms=until_ms)
    query.update(container=container, sortField="FLAKY")
    url = f"{base}/tests-data/test-container-history?{urlencode(query)}"
    payload = http_get_json(url)
    data = payload.get("data") or {}
    details = data.get("singleContainerDetails") or {}
    tests = details.get("tests") or {}
    return tests.get("tests") or []
78+
79+
80+
def fetch_test_history(base, *, container, test_name, since_ms, until_ms):
    """Fetch the history of one test within a time window.

    Queries ``/tests-data/test-history`` for ``test_name`` inside
    ``container`` and returns the response's ``data`` object, or an empty
    dict when it is missing or falsy.
    """
    query = common_query(since_ms=since_ms, until_ms=until_ms)
    query.update(container=container, test=test_name)
    url = f"{base}/tests-data/test-history?{urlencode(query)}"
    response = http_get_json(url)
    return response.get("data") or {}
87+
88+
89+
def flaky_day_buckets(history):
    """List the trend buckets that saw a flaky or failed execution.

    Returns ``(start_ms, end_ms)`` tuples taken from the data points of
    ``history["outcomeTrend"]``, ordered by start timestamp descending.
    Buckets whose start or end timestamp is not an ``int`` are left out.
    """
    trend = history.get("outcomeTrend") or {}

    def had_trouble(point):
        counts = point.get("outcomeDistribution") or {}
        return (counts.get("flaky") or 0) > 0 or (counts.get("failed") or 0) > 0

    spans = [
        (point.get("startTimestamp"), point.get("endTimestamp"))
        for point in trend.get("dataPoints") or []
        if had_trouble(point)
    ]
    usable = [
        span for span in spans
        if isinstance(span[0], int) and isinstance(span[1], int)
    ]
    # sorted() is stable, so buckets sharing a start keep their input order.
    return sorted(usable, key=lambda span: span[0], reverse=True)
102+
103+
104+
def flaky_result_count(history):
    """Count trend executions whose outcome is listed in ``FLAKY_OUTCOMES``.

    Sums, over every data point of ``history["outcomeTrend"]``, the
    distribution counts for each outcome in ``FLAKY_OUTCOMES``. Missing or
    falsy values count as zero.
    """
    trend = history.get("outcomeTrend") or {}
    return sum(
        int((point.get("outcomeDistribution") or {}).get(outcome) or 0)
        for point in trend.get("dataPoints") or []
        for outcome in FLAKY_OUTCOMES
    )
112+
113+
114+
def _all_source_files():
    """Return the paths printed by ``git ls-files`` for the source extensions.

    Runs git against ``WORKSPACE_ROOT`` with one ``*<ext>`` pattern per
    entry in ``SOURCE_EXTS`` and returns the output split into lines.
    """
    patterns = [f"*{ext}" for ext in SOURCE_EXTS]
    command = ["git", "-C", str(WORKSPACE_ROOT), "ls-files"] + patterns
    listing = subprocess.check_output(command, text=True)
    return listing.splitlines()
122+
123+
124+
def find_test_source(class_fqcn, *, all_files):
    """Locate the source file for a fully-qualified class name.

    Anything from the first ``$`` onward (a nested-class suffix) is dropped
    before the lookup. Candidates are the entries of ``all_files`` whose
    file name is the simple class name plus one of ``SOURCE_EXTS``.

    Simple-name collisions are resolved in this order:

    1. a candidate whose directory ends with the package path;
    2. a candidate whose path merely contains the package path;
    3. the first candidate.

    Args:
        class_fqcn: Class name such as ``a.b.FooTest`` or ``a.b.Foo$Inner``.
        all_files: Iterable of ``/``-separated paths relative to
            ``WORKSPACE_ROOT``.

    Returns:
        ``WORKSPACE_ROOT / <relative path>`` for the chosen file, or ``None``
        when no file name matches.
    """
    outer = class_fqcn.split("$", 1)[0]
    package, _, simple = outer.rpartition(".")
    targets = {f"{simple}{ext}" for ext in SOURCE_EXTS}
    candidates = [f for f in all_files if f.rsplit("/", 1)[-1] in targets]
    if not candidates:
        return None
    if package:
        package_path = "/" + package.replace(".", "/") + "/"
        # Prefer an exact directory match: a plain substring test would let
        # ".../a/b/c/Foo.java" win for class "a.b.Foo" if it is listed first.
        for hit in candidates:
            directory = f"/{hit}".rsplit("/", 1)[0] + "/"
            if directory.endswith(package_path):
                return WORKSPACE_ROOT / hit
        # Fall back to the looser containment check for layouts where the
        # directory does not end with the package path.
        for hit in candidates:
            if package_path in f"/{hit}":
                return WORKSPACE_ROOT / hit
    return WORKSPACE_ROOT / candidates[0]
141+
142+
143+
def best_failure_sample(history):
    """Pick the first test result that carries a usable failure message.

    Walks ``history["testResults"]`` in order and returns
    ``(build_id, failure_message)`` for the first entry whose
    ``firstFailureMessage`` is a non-blank string. The build id is ``""``
    when missing. Returns ``("", "")`` when no entry qualifies.
    """
    results = history.get("testResults") or []
    samples = (
        (entry.get("buildId") or "", entry.get("firstFailureMessage"))
        for entry in results
    )
    usable = (
        (build_id, message)
        for build_id, message in samples
        if isinstance(message, str) and message.strip()
    )
    return next(usable, ("", ""))
151+
152+
153+
def result_scan(result, *, base):
    """Convert one raw test result into a compact scan summary dict.

    The summary holds the build id, a ``{base}/s/{build_id}`` scan URL, the
    outcome, start timestamp, tags, work unit name and a failure excerpt.
    The excerpt is the first 600 characters of ``firstFailureMessage``,
    followed by an ellipsis marker when the message was longer. Missing
    fields fall back to ``""``, ``0`` or ``[]``.
    """
    build_id = result.get("buildId") or ""
    message = result.get("firstFailureMessage") or ""
    if message:
        suffix = " \u2026" if len(message) > 600 else ""
        excerpt = message[:600] + suffix
    else:
        excerpt = ""
    return {
        "build_id": build_id,
        "scan_url": f"{base}/s/{build_id}",
        "outcome": result.get("outcome") or "",
        "timestamp_ms": result.get("startTimestamp") or 0,
        "tags": result.get("tags") or [],
        "work_unit": result.get("workUnitName") or "",
        "failure_excerpt": excerpt,
    }
166+
167+
168+
def collect_flaky_scans(history, *, base, limit, seen=None):
    """Return up to ``limit`` scans where the test failed or flaked.

    Walks ``history["testResults"]`` in order and keeps results whose
    outcome is in ``FLAKY_OUTCOMES``, that have a build id, and whose build
    id is not already in ``seen``.

    Args:
        history: Test-history dict; only ``testResults`` is read.
        base: Develocity base URL used to build each scan URL.
        limit: Maximum number of scans to return. A value of zero or less
            yields an empty list and leaves ``seen`` untouched.
        seen: Optional set of build ids to skip. It is updated in place
            with every build id returned, so callers can de-duplicate
            across several calls. A fresh set is used when omitted.

    Returns:
        A list of summary dicts as produced by ``result_scan``.
    """
    out = []
    # Without this guard the loop would append one scan before checking
    # the limit, returning an entry the caller asked not to receive.
    if limit <= 0:
        return out
    if seen is None:
        seen = set()
    for r in (history.get("testResults") or []):
        if r.get("outcome") not in FLAKY_OUTCOMES:
            continue
        bid = r.get("buildId") or ""
        if not bid or bid in seen:
            continue
        seen.add(bid)
        out.append(result_scan(r, base=base))
        if len(out) >= limit:
            break
    return out
184+
185+
186+
def collect_flaky_scans_by_time(fetch_history, *, base, since_ms, until_ms,
                                limit, seen, depth=0):
    """Find flaky/failed scans in busy history windows.

    The dashboard history endpoint caps ``testResults`` at 50 recent
    entries, so a high-volume test can report flaky executions in its trend
    counts while the returned rows are all passes. When that happens the
    window is halved and each half is queried again, newer half first,
    until the flaky rows show up, the window gets too narrow, or the
    recursion depth limit is reached.

    ``fetch_history`` is called as ``fetch_history(since_ms, until_ms)``.
    Returns ``(sample_build_id, sample_failure_message, scans)``.
    """
    if limit <= 0:
        return "", "", []

    window = fetch_history(since_ms, until_ms)
    build_id, failure = best_failure_sample(window)
    found = collect_flaky_scans(window, base=base, limit=limit, seen=seen)

    # Stop when rows were found, when the trend shows nothing to find, or
    # when splitting further is not allowed.
    nothing_to_find = bool(found) or flaky_result_count(window) == 0
    cannot_split = (depth >= MAX_HISTORY_SPLIT_DEPTH
                    or until_ms - since_ms <= MIN_HISTORY_WINDOW_MS)
    if nothing_to_find or cannot_split:
        return build_id, failure, found

    midpoint = (since_ms + until_ms) // 2
    # Newer half first so the most recent scans are preferred.
    halves = ((midpoint + 1, until_ms), (since_ms, midpoint))
    for half_since, half_until in halves:
        remaining = limit - len(found)
        if remaining <= 0:
            break
        half_build, half_failure, half_scans = collect_flaky_scans_by_time(
            fetch_history, base=base, since_ms=half_since,
            until_ms=half_until, limit=remaining, seen=seen,
            depth=depth + 1,
        )
        found.extend(half_scans)
        if not failure:
            build_id, failure = half_build, half_failure

    return build_id, failure, found
227+
228+
229+
def per_day_flake_breakdown(history):
    """Summarise each trend bucket that recorded at least one execution.

    Returns one dict per data point of ``history["outcomeTrend"]`` whose
    flaky, failed or passed count is non-zero. Each dict holds the bucket's
    start and end timestamps (``0`` when missing) and those three counts.
    """
    trend = history.get("outcomeTrend") or {}
    breakdown = []
    for point in trend.get("dataPoints") or []:
        counts = point.get("outcomeDistribution") or {}
        tallies = {
            outcome: counts.get(outcome) or 0
            for outcome in ("flaky", "failed", "passed")
        }
        if not any(tallies.values()):
            continue
        breakdown.append({
            "start_ms": point.get("startTimestamp") or 0,
            "end_ms": point.get("endTimestamp") or 0,
            **tallies,
        })
    return breakdown
246+
247+
248+
def _load_skip_list():
    """Read the skip file and return its entries as a set.

    Blank lines and lines whose first non-whitespace character is ``#``
    are ignored. Returns an empty set when the file does not exist.
    """
    if not SKIP.exists():
        return set()
    entries = set()
    for raw_line in SKIP.read_text(encoding="utf-8").splitlines():
        entry = raw_line.strip()
        # Test the stripped text so an indented "# comment" is not stored
        # as a skip entry.
        if entry and not entry.startswith("#"):
            entries.add(entry)
    return entries


def _choose_method(base, *, cname, skip, since_ms, until_ms):
    """Pick the flakiest method of a container that is not on the skip list.

    Returns a ``{"name": ..., "flaky": ...}`` dict, or ``None`` when no
    method reaches ``MIN_FLAKY`` or every qualifying method is skipped.
    """
    methods = fetch_container_methods(
        base, container=cname,
        since_ms=since_ms, until_ms=until_ms,
    )
    ranked = sorted(
        ({"name": m.get("name") or "", "flaky": total_flaky(m)}
         for m in methods),
        key=lambda m: m["flaky"], reverse=True,
    )
    ranked = [m for m in ranked if m["flaky"] >= MIN_FLAKY and m["name"]]
    if not ranked:
        print(f"info: no per-method flake data for {cname}")
        return None

    for m in ranked:
        fq = f"{cname}.{m['name']}"
        if fq in skip or cname in skip:
            print(f"info: skipping {fq} (on skip list)")
            continue
        return m
    return None


def _gather_failure_evidence(base, *, cname, method, since_ms, until_ms,
                             max_scans=5):
    """Collect a failure sample and recent flaky scans for one test.

    Returns ``(history, sample_build, sample_failure, recent_scans)`` where
    ``history`` is the window-wide test history.
    """
    history = fetch_test_history(
        base, container=cname, test_name=method,
        since_ms=since_ms, until_ms=until_ms,
    )
    sample_build, sample_failure = best_failure_sample(history)
    seen_scans = set()
    recent_scans = collect_flaky_scans(
        history, base=base, limit=max_scans, seen=seen_scans)
    if sample_failure and recent_scans:
        return history, sample_build, sample_failure, recent_scans

    # The window-wide /tests-data/test-history response truncates
    # results and tends to omit the flaky/failed entries. Re-query
    # narrowed to each day-bucket that had a flaky execution. Busy
    # tests can still return only passed rows for an entire day, so
    # recursively split those windows until the flaky rows are visible.
    def fetch_history_window(window_since_ms, window_until_ms):
        return fetch_test_history(
            base, container=cname, test_name=method,
            since_ms=window_since_ms, until_ms=window_until_ms,
        )

    for s_ms, e_ms in flaky_day_buckets(history):
        # NOTE(review): once recent_scans is full the limit below is 0 and
        # collect_flaky_scans_by_time returns without fetching, so a
        # missing failure sample is not searched for any further.
        day_build, day_failure, day_scans = collect_flaky_scans_by_time(
            fetch_history_window, base=base, since_ms=s_ms,
            until_ms=e_ms, limit=max_scans - len(recent_scans),
            seen=seen_scans,
        )
        if not sample_failure:
            sample_build, sample_failure = day_build, day_failure
        recent_scans.extend(day_scans)
        if sample_failure and len(recent_scans) >= max_scans:
            break
    return history, sample_build, sample_failure, recent_scans


def main():
    """Select the most-flaky test and write it to ``SELECTED``.

    Walks the top containers in the order the dashboard returns them and
    takes the first one that reaches ``MIN_FLAKY``, has a source file in
    the workspace, and has a qualifying method not on the skip list.
    Any previous ``SELECTED`` file is removed first, so a run that finds
    no candidate leaves no file behind.

    Returns:
        ``0`` in all non-exceptional cases, including "nothing found".
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    SELECTED.unlink(missing_ok=True)

    base = os.environ.get("DEVELOCITY_URL", DEFAULT_DEVELOCITY_URL).rstrip("/")

    skip = _load_skip_list()

    until_ms = int(time.time() * 1000)
    since_ms = until_ms - WINDOW_DAYS * 86400 * 1000
    print(f"Querying Develocity {base} for window "
          f"[{since_ms}..{until_ms}] ({WINDOW_DAYS}d).")

    top = fetch_top_tests(base, since_ms=since_ms, until_ms=until_ms)
    print(f"Top containers returned: {len(top)}")
    all_files = _all_source_files()

    selected = None
    for container in top:
        cname = container.get("name") or ""
        if not cname:
            continue
        c_flaky = total_flaky(container)
        if c_flaky < MIN_FLAKY:
            print(f"info: container {cname} below threshold "
                  f"({c_flaky} < {MIN_FLAKY})")
            break  # list is sorted; nothing past this will qualify

        # Nested-class containers live in the outer class's source file.
        outer = cname.split("$", 1)[0]
        source = find_test_source(outer, all_files=all_files)
        if source is None:
            print(f"info: skipping {cname}: source not found")
            continue

        chosen = _choose_method(
            base, cname=cname, skip=skip,
            since_ms=since_ms, until_ms=until_ms,
        )
        if chosen is None:
            continue

        method = chosen["name"]
        history, sample_build, sample_failure, recent_scans = (
            _gather_failure_evidence(
                base, cname=cname, method=method,
                since_ms=since_ms, until_ms=until_ms,
            )
        )

        selected = {
            "class": cname,
            "method": method,
            "fully_qualified": f"{cname}.{method}",
            "flaky_count": chosen["flaky"],
            "container_flaky_count": c_flaky,
            "source_file": source.relative_to(WORKSPACE_ROOT).as_posix(),
            "sample_build_id": sample_build,
            "sample_scan_url": f"{base}/s/{sample_build}" if sample_build else "",
            "sample_failure": (sample_failure or "")[:8000],
            "recent_flaky_scans": recent_scans,
            "per_day_breakdown": per_day_flake_breakdown(history),
            "window_days": WINDOW_DAYS,
            "develocity_url": base,
        }
        break

    if selected is None:
        print("No flaky test candidate found that satisfies all constraints.")
        return 0

    SELECTED.write_text(json.dumps(selected, indent=2), encoding="utf-8")
    print(f"Selected: {selected['fully_qualified']} "
          f"(flaky={selected['flaky_count']}, "
          f"source={selected['source_file']})")
    return 0


if __name__ == "__main__":
    sys.exit(main())

0 commit comments

Comments
 (0)