Skip to content

Commit 88add6b

Browse files
committed
DPL MCP: local file support for sidecar and alikes
1 parent 25f4dfe commit 88add6b

2 files changed

Lines changed: 196 additions & 0 deletions

File tree

Framework/Core/scripts/hyperloop-perf-server/hl_common.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ async def fetch_bytes(url: str, proxy_token: str = "", token: str = "") -> bytes
3232
then HYPERLOOP_TOKEN, then ``token``.
3333
token: Hyperloop auth token fallback.
3434
"""
35+
# Local file (a path or a file:// URL) — read directly, no HTTP. Lets a
36+
# locally-generated side-car (igprof-demangle-symbols output) be attached
37+
# via load_igprof(sidecar_url=/path/to/...syms.gz) without a web server.
38+
if url.startswith("file://") or os.path.isfile(url):
39+
path = url[len("file://"):] if url.startswith("file://") else url
40+
with open(path, "rb") as f:
41+
return f.read()
42+
3543
proxy_token = (
3644
proxy_token
3745
or os.environ.get("PROXY_TOKEN", "")

Framework/Core/scripts/hyperloop-server/hyperloop_server.py

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,194 @@ async def fetch_one(wid: str) -> dict | None:
399399
return "\n".join(lines)
400400

401401

402+
async def _train_composition(train_id: int) -> tuple[str | None, list[dict]]:
403+
"""(dataset_name, [wagon dicts]) for a train. Shared composition fetch."""
404+
t = await _get("trains/train.jsp", {"train_id": train_id})
405+
ds = t.get("dataset_name")
406+
wagons_ts = t.get("wagons_timestamp") or t.get("dataset_timestamp")
407+
if not wagons_ts:
408+
return ds, []
409+
wd = await _get("trains/wagons_derived_data.jsp",
410+
{"train_id": train_id, "wagons_timestamp": wagons_ts})
411+
wagon_ids = list(wd.keys()) if isinstance(wd, dict) else []
412+
413+
async def fetch_one(wid: str) -> dict | None:
414+
try:
415+
w = await _get("analysis/wagon/wagon.jsp",
416+
{"wagon_id": int(wid), "referenceTime": 0})
417+
if isinstance(w, dict) and w.get("id") is not None:
418+
return w
419+
except Exception:
420+
pass
421+
return None
422+
423+
wagons = [w for w in await asyncio.gather(*(fetch_one(w) for w in wagon_ids)) if w]
424+
return ds, wagons
425+
426+
427+
def _summarize_sig(sig) -> str:
428+
"""Human-readable 'Nx workflow [analysis_id]' summary of a composition signature."""
429+
if not sig or not sig[1]:
430+
return "(no wagons / unresolved)"
431+
c = collections.Counter(f"{wf} [{aid}]" for wf, aid in sig[1])
432+
return ", ".join(f"{n}x {k}" for k, n in sorted(c.items()))
433+
434+
435+
async def _match_compositions(train_ids: list[int]):
436+
"""Group trains by (dataset, multiset of (workflow, analysis_id)).
437+
438+
Returns (groups, ref_sig, matched_ids, failed_ids) where groups maps each
439+
signature to its train ids, ref_sig is the largest group's signature (None
440+
if nothing resolved), and matched_ids are the trains sharing it. Shared by
441+
validate_train_composition and grid_job_bands so both apply the same guard.
442+
"""
443+
async def one(tid: int):
444+
try:
445+
ds, wagons = await _train_composition(tid)
446+
sig = (ds, tuple(sorted((w.get("work_flow_name") or "?",
447+
w.get("analysis_id")) for w in wagons)))
448+
return tid, sig
449+
except Exception:
450+
return tid, None
451+
452+
res = await asyncio.gather(*(one(t) for t in train_ids))
453+
groups: dict = collections.defaultdict(list)
454+
failed = []
455+
for tid, sig in res:
456+
(failed.append(tid) if sig is None else groups[sig].append(tid))
457+
if not groups:
458+
return groups, None, [], failed
459+
ref = max(groups, key=lambda s: len(groups[s]))
460+
return groups, ref, sorted(groups[ref]), failed
461+
462+
463+
@mcp.tool()
464+
async def validate_train_composition(train_ids: list[int]) -> str:
465+
"""Check whether a set of trains share the same dataset + wagon composition.
466+
467+
For each train builds a signature = its dataset plus the multiset of
468+
(workflow, analysis_id) over its wagons, then groups the trains. Run this
469+
before comparing trains over time (throughput / CPU trends, distribution
470+
heatmaps) so confounders — a different analysis, an extra or missing wagon,
471+
a different dataset — are dropped rather than silently skewing the result.
472+
473+
Returns the reference composition (the largest matching group), the matched
474+
train list (feed it straight into the comparison), and each outlier with how
475+
it differs.
476+
"""
477+
groups, ref, matched, failed = await _match_compositions(train_ids)
478+
if ref is None:
479+
return "Could not resolve composition for: " + ", ".join(map(str, failed))
480+
ref_ds = ref[0]
481+
482+
out = [f"Composition check for {len(train_ids)} trains:\n",
483+
f"Reference ({len(matched)}/{len(train_ids)} match): dataset={ref_ds}",
484+
f" {_summarize_sig(ref)}",
485+
f" matched: {', '.join(map(str, matched))}\n"]
486+
487+
outliers = sorted([(s, ts) for s, ts in groups.items() if s != ref],
488+
key=lambda x: sorted(x[1])[0])
489+
if outliers or failed:
490+
out.append("Outliers (exclude from the comparison):")
491+
for s, ts in outliers:
492+
tag = f"dataset={s[0]}; " if s[0] != ref_ds else ""
493+
out.append(f" {', '.join(map(str, sorted(ts)))}: {tag}{_summarize_sig(s)}")
494+
for tid in failed:
495+
out.append(f" {tid}: composition could not be resolved")
496+
out.append("")
497+
else:
498+
out.append("All trains share the same composition. ✓\n")
499+
500+
out.append(f"matched_train_ids = {matched}")
501+
return "\n".join(out)
502+
503+
504+
def _percentiles(vals: list[float], ps=(0, 5, 10, 25, 50, 75, 90, 95, 100)) -> dict:
505+
"""Nearest-rank percentiles of a value list (no numpy in the server env)."""
506+
s = sorted(vals)
507+
n = len(s)
508+
out = {}
509+
for p in ps:
510+
if n == 1:
511+
out[p] = s[0]
512+
continue
513+
k = (n - 1) * (p / 100.0)
514+
lo, hi = int(k), min(int(k) + 1, n - 1)
515+
out[p] = s[lo] + (s[hi] - s[lo]) * (k - lo)
516+
return out
517+
518+
519+
@mcp.tool()
520+
async def grid_job_bands(train_ids: list[int], check_composition: bool = True) -> str:
521+
"""Per-JOB grid throughput distribution (percentile bands) across trains over time.
522+
523+
For each train, fetches its per-run grid results (train.jsp jobResults) and
524+
builds percentile bands over the *individual jobs'* throughput_per_core — the
525+
distribution behind the grid-statistics "jobs per CPU time" histogram — NOT
526+
the single train-average throughput, which collapses that spread to one
527+
number. Use this to watch a job-performance distribution shift over time
528+
(e.g. an optimization landing) rather than chasing a noisy mean.
529+
530+
By default runs validate_train_composition first and keeps only the trains
531+
that share the reference composition (set check_composition=False to skip the
532+
guard and band every train as given). Returns a per-train percentile table
533+
(p0/p10/p50/p90/p100 KB/s/core, job count) ordered by date, plus a fenced
534+
```jsonl block (one {date,train,n,tpc:[...]} per train) ready to feed a
535+
band/fan-chart plotting script.
536+
"""
537+
if check_composition and len(train_ids) > 1:
538+
groups, ref, matched, failed = await _match_compositions(train_ids)
539+
if ref is None:
540+
return "Could not resolve composition for any train: " + \
541+
", ".join(map(str, train_ids))
542+
dropped = [t for t in train_ids if t not in matched]
543+
keep = matched
544+
else:
545+
keep, dropped = list(train_ids), []
546+
547+
async def fetch(tid: int):
548+
try:
549+
t = await _get("trains/train.jsp", {"train_id": tid})
550+
t = t[0] if isinstance(t, list) else t
551+
jr = t.get("jobResults") or []
552+
tpc = [j["throughput_per_core"] for j in jr
553+
if (j.get("throughput_per_core") or 0) > 0]
554+
created = t.get("created")
555+
date = (datetime.datetime.fromtimestamp(
556+
created / 1000, datetime.timezone.utc).strftime("%Y-%m-%d")
557+
if created else "?")
558+
return tid, date, tpc
559+
except Exception as e:
560+
return tid, None, str(e)
561+
562+
rows = await asyncio.gather(*(fetch(t) for t in keep))
563+
good = [(tid, d, tpc) for tid, d, tpc in rows if d is not None and tpc]
564+
good.sort(key=lambda r: (r[1], r[0]))
565+
if not good:
566+
return "No usable per-job throughput for: " + ", ".join(map(str, keep))
567+
568+
out = ["Per-job grid throughput bands (KB/s/core), over individual jobs "
569+
"(not train average):\n"]
570+
if dropped:
571+
out.append(f"Dropped (composition mismatch): {', '.join(map(str, dropped))}\n")
572+
out.append(f"{'date':<11}{'train':>8}{'jobs':>6}"
573+
f"{'p0':>8}{'p10':>8}{'p50':>8}{'p90':>8}{'p100':>8}")
574+
out.append("-" * 65)
575+
jsonl = []
576+
for tid, date, tpc in good:
577+
pc = _percentiles(tpc)
578+
k = {p: pc[p] / 1e3 for p in pc} # KB/s/core
579+
out.append(f"{date:<11}{tid:>8}{len(tpc):>6}"
580+
f"{k[0]:>8.0f}{k[10]:>8.0f}{k[50]:>8.0f}{k[90]:>8.0f}{k[100]:>8.0f}")
581+
jsonl.append(json.dumps({"date": date, "train": tid,
582+
"n": len(tpc), "tpc": tpc}))
583+
out.append("\nData (write to a .jsonl and feed the band plot):")
584+
out.append("```jsonl")
585+
out.extend(jsonl)
586+
out.append("```")
587+
return "\n".join(out)
588+
589+
402590
# ---------------------------------------------------------------------------
403591
# Analysis / wagon browsing
404592
#

0 commit comments

Comments
 (0)