Skip to content

Commit 625596c

Browse files
committed
DPL MCP: support for validating homogeneous train composition
1 parent 2c76d3a commit 625596c

1 file changed

Lines changed: 102 additions & 0 deletions

File tree

Framework/Core/scripts/hyperloop-server/hyperloop_server.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,108 @@ async def fetch_one(wid: str) -> dict | None:
399399
return "\n".join(lines)
400400

401401

402+
async def _train_composition(train_id: int) -> tuple[str | None, list[dict]]:
403+
"""(dataset_name, [wagon dicts]) for a train. Shared composition fetch."""
404+
t = await _get("trains/train.jsp", {"train_id": train_id})
405+
ds = t.get("dataset_name")
406+
wagons_ts = t.get("wagons_timestamp") or t.get("dataset_timestamp")
407+
if not wagons_ts:
408+
return ds, []
409+
wd = await _get("trains/wagons_derived_data.jsp",
410+
{"train_id": train_id, "wagons_timestamp": wagons_ts})
411+
wagon_ids = list(wd.keys()) if isinstance(wd, dict) else []
412+
413+
async def fetch_one(wid: str) -> dict | None:
414+
try:
415+
w = await _get("analysis/wagon/wagon.jsp",
416+
{"wagon_id": int(wid), "referenceTime": 0})
417+
if isinstance(w, dict) and w.get("id") is not None:
418+
return w
419+
except Exception:
420+
pass
421+
return None
422+
423+
wagons = [w for w in await asyncio.gather(*(fetch_one(w) for w in wagon_ids)) if w]
424+
return ds, wagons
425+
426+
427+
def _summarize_sig(sig) -> str:
428+
"""Human-readable 'Nx workflow [analysis_id]' summary of a composition signature."""
429+
if not sig or not sig[1]:
430+
return "(no wagons / unresolved)"
431+
c = collections.Counter(f"{wf} [{aid}]" for wf, aid in sig[1])
432+
return ", ".join(f"{n}x {k}" for k, n in sorted(c.items()))
433+
434+
435+
async def _match_compositions(train_ids: list[int]):
436+
"""Group trains by (dataset, multiset of (workflow, analysis_id)).
437+
438+
Returns (groups, ref_sig, matched_ids, failed_ids) where groups maps each
439+
signature to its train ids, ref_sig is the largest group's signature (None
440+
if nothing resolved), and matched_ids are the trains sharing it. Shared by
441+
validate_train_composition and grid_job_bands so both apply the same guard.
442+
"""
443+
async def one(tid: int):
444+
try:
445+
ds, wagons = await _train_composition(tid)
446+
sig = (ds, tuple(sorted((w.get("work_flow_name") or "?",
447+
w.get("analysis_id")) for w in wagons)))
448+
return tid, sig
449+
except Exception:
450+
return tid, None
451+
452+
res = await asyncio.gather(*(one(t) for t in train_ids))
453+
groups: dict = collections.defaultdict(list)
454+
failed = []
455+
for tid, sig in res:
456+
(failed.append(tid) if sig is None else groups[sig].append(tid))
457+
if not groups:
458+
return groups, None, [], failed
459+
ref = max(groups, key=lambda s: len(groups[s]))
460+
return groups, ref, sorted(groups[ref]), failed
461+
462+
463+
@mcp.tool()
464+
async def validate_train_composition(train_ids: list[int]) -> str:
465+
"""Check whether a set of trains share the same dataset + wagon composition.
466+
467+
For each train builds a signature = its dataset plus the multiset of
468+
(workflow, analysis_id) over its wagons, then groups the trains. Run this
469+
before comparing trains over time (throughput / CPU trends, distribution
470+
heatmaps) so confounders — a different analysis, an extra or missing wagon,
471+
a different dataset — are dropped rather than silently skewing the result.
472+
473+
Returns the reference composition (the largest matching group), the matched
474+
train list (feed it straight into the comparison), and each outlier with how
475+
it differs.
476+
"""
477+
groups, ref, matched, failed = await _match_compositions(train_ids)
478+
if ref is None:
479+
return "Could not resolve composition for: " + ", ".join(map(str, failed))
480+
ref_ds = ref[0]
481+
482+
out = [f"Composition check for {len(train_ids)} trains:\n",
483+
f"Reference ({len(matched)}/{len(train_ids)} match): dataset={ref_ds}",
484+
f" {_summarize_sig(ref)}",
485+
f" matched: {', '.join(map(str, matched))}\n"]
486+
487+
outliers = sorted([(s, ts) for s, ts in groups.items() if s != ref],
488+
key=lambda x: sorted(x[1])[0])
489+
if outliers or failed:
490+
out.append("Outliers (exclude from the comparison):")
491+
for s, ts in outliers:
492+
tag = f"dataset={s[0]}; " if s[0] != ref_ds else ""
493+
out.append(f" {', '.join(map(str, sorted(ts)))}: {tag}{_summarize_sig(s)}")
494+
for tid in failed:
495+
out.append(f" {tid}: composition could not be resolved")
496+
out.append("")
497+
else:
498+
out.append("All trains share the same composition. ✓\n")
499+
500+
out.append(f"matched_train_ids = {matched}")
501+
return "\n".join(out)
502+
503+
402504
# ---------------------------------------------------------------------------
403505
# Analysis / wagon browsing
404506
#

0 commit comments

Comments
 (0)