Skip to content

Commit 2a0910e

Browse files
Add analyze_manifest tool with file size histogram and rclone flag suggestions
Adds an analyze_manifest() MCP tool that streams manifest.jsonl through a compact embedded Python script on the cluster (no extra deps required) to compute object count, total bytes, size percentiles, and a pow2-binned file size histogram. The results are formatted locally using est.py helpers and paired with rclone flag recommendations derived from the size distribution: - Small files (median < 1 MiB): --transfers 128, --checkers 256, --buffer-size 8M - Medium files (1–64 MiB): --transfers 32, --checkers 64, --buffer-size 32M - Large files (64–512 MiB): --transfers 16, --checkers 32, --buffer-size 64M - Very large (> 512 MiB): --transfers 8, --checkers 16, --buffer-size 128M - P90 > 200 MiB: adds --multi-thread-streams, --s3-upload-concurrency, --s3-chunk-size - P90 > 1 GiB: upgrades chunk/concurrency settings further Output includes the histogram table, human-readable reasoning, multi-line flags for copy-paste, and a ready-made --rclone-flags string for xfer. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent b04027c commit 2a0910e

1 file changed

Lines changed: 257 additions & 0 deletions

File tree

src/xfer/mcp/server.py

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""xfer MCP server – gives Claude tools to orchestrate xfer data transfers."""
22
from __future__ import annotations
33

4+
import json
45
import os
56
import shlex
67
import subprocess
@@ -9,6 +10,7 @@
910
from mcp.server.fastmcp import FastMCP
1011

1112
from .config import ClusterConfig, XferMcpConfig, load_config
13+
from xfer.est import ascii_bar, human_bytes
1214

1315
# ---------------------------------------------------------------------------
1416
# Config loading (once at import time)
@@ -87,6 +89,224 @@ def _result(rc: int, stdout: str, stderr: str) -> str:
8789
return "\n\n".join(parts)
8890

8991

92+
# ---------------------------------------------------------------------------
93+
# Manifest analysis helpers
94+
# ---------------------------------------------------------------------------
95+
96+
_KiB = 1024
97+
_MiB = _KiB * 1024
98+
_GiB = _MiB * 1024
99+
100+
# Compact Python 3 script (no external deps, no single quotes) run on the
101+
# cluster via `cat manifest.jsonl | python3 -c '<script>'`.
102+
# Outputs a single JSON line with size stats and a pow2 histogram.
103+
_ANALYZE_SCRIPT = """\
104+
import sys, json, math
105+
sizes = []
106+
for line in sys.stdin:
107+
line = line.strip()
108+
if not line:
109+
continue
110+
try:
111+
obj = json.loads(line)
112+
except Exception:
113+
continue
114+
if obj.get("IsDir") or obj.get("isdir"):
115+
continue
116+
sz = obj.get("size", obj.get("Size"))
117+
if sz is None:
118+
continue
119+
try:
120+
sizes.append(int(sz))
121+
except Exception:
122+
pass
123+
sizes.sort()
124+
n = len(sizes)
125+
total = sum(sizes)
126+
def pct(p):
127+
if not n:
128+
return 0
129+
idx = min(n - 1, max(0, int(math.ceil(n * p / 100.0)) - 1))
130+
return sizes[idx]
131+
def pow2edges(lo, hi):
132+
lo = max(lo, 1)
133+
hi = max(hi, lo + 1)
134+
lp = 2 ** max(0, int(math.floor(math.log2(lo))))
135+
hp = 2 ** int(math.ceil(math.log2(hi)))
136+
if hp <= lp:
137+
hp = lp * 2
138+
edges, x = [], lp
139+
while x < hp:
140+
edges.append(x)
141+
x *= 2
142+
edges.append(hp)
143+
return edges
144+
if sizes:
145+
edges = pow2edges(min(sizes), max(sizes))
146+
bins = [{"lo": edges[i], "hi": edges[i+1], "count": 0, "bytes": 0} for i in range(len(edges)-1)]
147+
for sz in sizes:
148+
for i, b in enumerate(bins):
149+
if i < len(bins) - 1:
150+
if b["lo"] <= sz < b["hi"]:
151+
b["count"] += 1; b["bytes"] += sz; break
152+
else:
153+
b["count"] += 1; b["bytes"] += sz; break
154+
bins = [b for b in bins if b["count"] > 0]
155+
else:
156+
bins = []
157+
print(json.dumps({"objects": n, "bytes_total": total, "mean_bytes": int(total/n) if n else 0, "percentiles": {"p10": pct(10), "p25": pct(25), "p50": pct(50), "p75": pct(75), "p90": pct(90), "p95": pct(95), "p99": pct(99)}, "histogram": bins}))
158+
"""
159+
160+
161+
def _suggest_rclone_flags(stats: dict) -> tuple[list[tuple[str, str | None]], list[str]]:
162+
"""Derive rclone flag recommendations from manifest size statistics.
163+
164+
Returns ([(flag, value_or_None), ...], [reasoning lines]).
165+
"""
166+
p50 = stats["percentiles"]["p50"]
167+
p90 = stats["percentiles"]["p90"]
168+
169+
flags: list[tuple[str, str | None]] = []
170+
notes: list[str] = []
171+
172+
# --- Base parallelism driven by median file size ---
173+
if p50 < _MiB:
174+
transfers, checkers, buf = 128, 256, "8M"
175+
notes.append(
176+
f"Median {human_bytes(p50)} < 1 MiB: maximise --transfers for small-file throughput"
177+
)
178+
elif p50 < 64 * _MiB:
179+
transfers, checkers, buf = 32, 64, "32M"
180+
notes.append(
181+
f"Median {human_bytes(p50)} (1–64 MiB): balanced parallelism"
182+
)
183+
elif p50 < 512 * _MiB:
184+
transfers, checkers, buf = 16, 32, "64M"
185+
notes.append(
186+
f"Median {human_bytes(p50)} (64–512 MiB): fewer concurrent transfers, larger buffer"
187+
)
188+
else:
189+
transfers, checkers, buf = 8, 16, "128M"
190+
notes.append(
191+
f"Median {human_bytes(p50)} > 512 MiB: minimal parallelism, large per-transfer buffer"
192+
)
193+
194+
flags += [
195+
("--transfers", str(transfers)),
196+
("--checkers", str(checkers)),
197+
("--buffer-size", buf),
198+
]
199+
200+
# --- Multipart / multi-thread tuning driven by P90 ---
201+
if p90 > _GiB:
202+
flags += [
203+
("--multi-thread-streams", "8"),
204+
("--s3-upload-concurrency", "16"),
205+
("--s3-chunk-size", "128M"),
206+
]
207+
notes.append(
208+
f"P90 {human_bytes(p90)} > 1 GiB: large S3 chunk size + high multi-thread concurrency"
209+
)
210+
elif p90 > 200 * _MiB:
211+
flags += [
212+
("--multi-thread-streams", "4"),
213+
("--s3-upload-concurrency", "8"),
214+
("--s3-chunk-size", "64M"),
215+
]
216+
notes.append(
217+
f"P90 {human_bytes(p90)} > 200 MiB: enable multi-thread streams + S3 multipart tuning"
218+
)
219+
else:
220+
notes.append(
221+
f"P90 {human_bytes(p90)} ≤ 200 MiB: multipart flags not needed"
222+
)
223+
224+
# Always-on reliability flags
225+
flags += [
226+
("--fast-list", None),
227+
("--retries", "10"),
228+
("--low-level-retries", "20"),
229+
]
230+
231+
return flags, notes
232+
233+
234+
def _format_analysis(stats: dict, run_dir: str) -> str:
235+
"""Format remote stats JSON into a human-readable histogram + flag suggestions."""
236+
n = stats["objects"]
237+
total = stats["bytes_total"]
238+
mean = stats["mean_bytes"]
239+
pct = stats["percentiles"]
240+
bins = stats["histogram"]
241+
242+
lines = [
243+
f"## Manifest analysis: {run_dir}/manifest.jsonl",
244+
"",
245+
f" Objects : {n:,}",
246+
f" Total : {human_bytes(total)} ({total:,} bytes)",
247+
f" Mean : {human_bytes(mean)}",
248+
"",
249+
"### File size percentiles",
250+
f" P10={human_bytes(pct['p10'])} "
251+
f"P25={human_bytes(pct['p25'])} "
252+
f"P50={human_bytes(pct['p50'])} "
253+
f"P75={human_bytes(pct['p75'])}",
254+
f" P90={human_bytes(pct['p90'])} "
255+
f"P95={human_bytes(pct['p95'])} "
256+
f"P99={human_bytes(pct['p99'])}",
257+
"",
258+
"### File size histogram (pow2 bins)",
259+
]
260+
261+
if bins:
262+
max_count = max(b["count"] for b in bins)
263+
lines.append("| Size range | Files | %files | Bytes | %bytes | Bar |")
264+
lines.append("|---|---:|---:|---:|---:|---|")
265+
for b in bins:
266+
lo, hi, count, bts = b["lo"], b["hi"], b["count"], b["bytes"]
267+
pf = 100.0 * count / n if n else 0.0
268+
pb = 100.0 * bts / total if total else 0.0
269+
bar = ascii_bar(count, max_count, width=30)
270+
lines.append(
271+
f"| [{human_bytes(lo)}, {human_bytes(hi)}) "
272+
f"| {count:,} | {pf:.1f}% "
273+
f"| {human_bytes(bts)} | {pb:.1f}% | {bar} |"
274+
)
275+
else:
276+
lines.append("(no files found in manifest)")
277+
278+
flags, notes = _suggest_rclone_flags(stats)
279+
280+
lines += [
281+
"",
282+
"### Suggested rclone flags",
283+
"",
284+
"**Reasoning:**",
285+
]
286+
for note in notes:
287+
lines.append(f"- {note}")
288+
289+
# Build the flag string two ways: multi-line for readability, single-line for xfer
290+
flag_parts = [f"{f} {v}" if v is not None else f for f, v in flags]
291+
multiline = " \\\n ".join(flag_parts)
292+
singleline = " ".join(flag_parts)
293+
294+
lines += [
295+
"",
296+
"**Flags** (formatted for copy-paste):",
297+
"```",
298+
multiline,
299+
"```",
300+
"",
301+
"**Pass to xfer** via `--rclone-flags`:",
302+
"```",
303+
f'--rclone-flags "{singleline}"',
304+
"```",
305+
]
306+
307+
return "\n".join(lines)
308+
309+
90310
# ---------------------------------------------------------------------------
91311
# MCP server
92312
# ---------------------------------------------------------------------------
@@ -602,6 +822,43 @@ def get_run_config(cluster: str, run_dir: str) -> str:
602822
return _result(rc, stdout, stderr)
603823

604824

825+
# ---------------------------------------------------------------------------
826+
# Manifest analysis
827+
# ---------------------------------------------------------------------------
828+
829+
830+
@mcp.tool()
831+
def analyze_manifest(cluster: str, run_dir: str) -> str:
832+
"""Analyse the file size distribution in a manifest and suggest rclone flags.
833+
834+
Streams {run_dir}/manifest.jsonl through a lightweight Python script on the
835+
cluster to compute object count, total bytes, size percentiles, and a
836+
pow2-binned file size histogram. Returns the histogram and recommends
837+
--transfers, --checkers, --buffer-size, --multi-thread-streams, and S3
838+
multipart flags tuned to the observed distribution.
839+
840+
Run after build_manifest() and before render_slurm_scripts() so the
841+
suggested flags can be fed into --rclone-flags.
842+
843+
Args:
844+
cluster: Cluster name from list_clusters()
845+
run_dir: Run directory containing manifest.jsonl
846+
"""
847+
c = _cluster(cluster)
848+
command = (
849+
"cat " + _rq(run_dir) + "/manifest.jsonl"
850+
" | python3 -c " + shlex.quote(_ANALYZE_SCRIPT)
851+
)
852+
rc, stdout, stderr = _ssh(c, command, timeout=300)
853+
if rc != 0:
854+
return _result(rc, stdout, stderr)
855+
try:
856+
stats = json.loads(stdout.strip())
857+
except json.JSONDecodeError as exc:
858+
return f"Failed to parse analysis output: {exc}\n\nRaw output:\n{stdout[:2000]}"
859+
return _format_analysis(stats, run_dir)
860+
861+
605862
# ---------------------------------------------------------------------------
606863
# General SSH escape hatch
607864
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)