diff --git a/compute-feasibility-advisor-proposal.md b/compute-feasibility-advisor-proposal.md new file mode 100644 index 000000000..7ebf70bd9 --- /dev/null +++ b/compute-feasibility-advisor-proposal.md @@ -0,0 +1,281 @@ +# Compute Feasibility Advisor for AutoIntent + +- **Date:** 2026-05-23 +- **Status:** Proposal (pre-implementation) +- **Audience:** AutoIntent maintainers / contributor picking up the task +- **Scope of this document:** technical specification — *what* the advisor estimates and the formulas it uses. Architectural and system-design choices (where the advisor lives in the codebase, how it integrates with the optimizer, the public API surface, file/module layout) are deliberately left to the implementer. + +## Problem + +AutoIntent's main strength is letting a user kick off a full search-space optimization with one call: + +```python +pipeline = Pipeline.from_preset("transformers-heavy") +pipeline.fit(dataset) +``` + +The cost of that convenience is that users — especially those running on a laptop, a single consumer GPU, or a free cloud instance — cannot tell ahead of time whether their hardware can carry the configuration they have just selected. + +Concrete failure cases we see today: + +- `transformers-heavy` fine-tunes `microsoft/deberta-v3-large` for up to 30 epochs across 40 HPO trials. That needs ~12–18 GB VRAM (full fine-tune, fp32) and many hours of wall time on a single GPU. A user with an 8 GB card finds out by OOM, often several minutes into a run. +- Swapping `intfloat/multilingual-e5-large-instruct` (2 GB) for `sentence-transformers/all-MiniLM-L6-v2` (90 MB) changes the resource bill by an order of magnitude — but nothing surfaces this difference up front. +- Disk is a silent failure mode: a search space referencing several large checkpoints can pull >10 GB into the HF cache before any training starts. + +The target audience for this feature is users with limited resources who pick a preset, hit `fit()`, and want to know within a second whether they should change something. + +## Proposed solution: pre-flight resource advisor + +Add a **pre-flight advisor** that, given a parsed search space and a dataset, estimates worst-case disk, RAM, VRAM, and wall-time requirements from public Hugging Face Hub metadata and a small set of formulas, then prints a clear summary with red/yellow/green warnings. By default it is **report-only and never blocks the run**; an opt-in **reduce-to-fit** mode additionally prunes the search space to fit detected hardware. + +### Scope + +The advisor analyses only the **local, model-bearing** modules whose footprint can be derived from HF Hub metadata. Everything else is either trivial or out of band. + + +| Module category | In scope? | Reason | +| -------------------------------------------------------------------------------- | --------- | -------------------------------------------------- | +| `SentenceTransformerEmbeddingConfig` | yes | local transformer, dominant cost on small machines | +| `VllmEmbeddingConfig` | yes | local transformer with extra engine overhead | +| `HFModelConfig`-based scorers (`bert`, `lora`, `ptuning`, `dnnc`, cross-encoder) | yes | the actual heavyweights | +| GCN scorer when configured with a transformer backbone | yes | inherits the backbone cost | +| `LinearScorer` (sklearn `LogisticRegression` / `LogisticRegressionCV`) | yes | dominant cost on presets with no transformer fine-tune; the CV path multiplies a single fit by ~30 | +| `CatBoostScorer` | yes | dominant cost on presets with no transformer fine-tune; high default `iterations` | +| `OpenaiEmbeddingConfig` | no | no local resources to estimate | +| `HashingVectorizerEmbeddingConfig` | no | trivial cost | +| `knn`, `mlknn`, generic `sklearn` classifiers via `SklearnScorer`, `description` | no | bounded so far below any in-scope module that they cannot plausibly be the bottleneck | +| `decision` and `regex` nodes | no | negligible | + + +Rationale: the user's real risk is whichever module is the actual bottleneck. On heavy presets that is a transformer fine-tune; on light presets it shifts to `linear` (CV-multiplied) or `catboost` (1000 default iterations × dataset shape). Modules left out of scope are ones whose cost is bounded so far below any in-scope module that they cannot plausibly be the reason a run fails. + +### Phases + +The advisor is one entry point, but internally splits work into three phases that share a single `PreflightReport` object. The split is internal organization — all three run at the same hook point (after `validate_modules`, before `_fit(context)`) and the user sees one summary. Separating them keeps each phase's inputs, formulas, and failure modes scoped: + +- **Resource phase.** Disk / RAM / VRAM / wall-time estimates and comparisons against detected hardware. Most of the formulas in this document live here. This is the only phase consumed by the reduce-to-fit pruner. +- **Data quality phase.** Findings derived from the dataset jointly with the active search space — token-length truncation, split readiness (auto-invokes the existing `check_split_readiness` utility rather than re-implementing it), partial intent descriptions paired with the `description` scorer, embedder/scorer dimension consistency. Reports red/yellow lines but never prunes the search space; the user fixes the dataset or the config. +- **Configuration sanity phase.** Joint checks across dataset + search-space + hardware that don't slot cleanly into the other two — e.g., `hpo_config.n_jobs > 1` × per-trial VRAM contention, CatBoost `task_type="GPU"` with no CUDA. Pydantic schema validation already runs upstream on `OptimizationConfig`; this phase only adds checks that need joint inspection. + +The advisor consumes `validate_modules`'s *post-filter* view of `self.nodes` — it does not duplicate that mutating filter. + +### Inputs + +- The parsed `OptimizationConfig` (search space, HPO config, embedder/transformer configs). +- The training `Dataset` (for `dataset_size` and an approximate token-length distribution). +- Detected local hardware: + - Total / available RAM via `psutil`. + - Free disk on the AutoIntent / HF cache directory via `shutil.disk_usage`. + - Accelerator detection, in priority order: + - **CUDA:** per-GPU VRAM and device name via `torch.cuda`. + - **MPS (Apple Silicon):** detected via `torch.backends.mps.is_available()`. Apple chips use unified memory, so there is no separate VRAM pool — the "VRAM budget" is a fraction of total system RAM. Default budget = 70 % of total RAM (matching the macOS `PYTORCH_MPS_HIGH_WATERMARK_RATIO` default) with the remainder reserved for the OS and other apps. The fraction is exposed as a knob. + - **CPU only:** when neither is available. + +### Output + +A structured estimate plus a human-readable summary printed to the logger. Example: + +``` +Compute feasibility check +───────────────────────── +Resource: + Available : 8 GB VRAM (NVIDIA RTX 3060), 32 GB RAM, 120 GB free disk + Disk : 5.2 GB to download, 1.1 GB already cached (3 unique checkpoints) + RAM : ~4 GB + VRAM : ~14 GB × 2 parallel trials (n_jobs=2) ⚠ exceeds available + Time : ~6 h (+~12 min for refit_after) (single-GPU, fp32, rough) + +Data: + Train tokens p95 : 612 (exceeds bert.max_length=512) ⚠ ~7% truncated + Split readiness : 2 classes have <3 samples — LogisticRegressionCV cv=3 will fail ✗ + +Config: + CatBoost task_type=GPU but no CUDA detected — will fall back to CPU ⚠ + +Drivers of cost: + scoring.bert microsoft/deberta-v3-large full fine-tune × 40 trials × 30 epochs → ~14 GB VRAM, ~5 h + embedder intfloat/multilingual-e5-large-instruct → ~2.2 GB VRAM + +Suggestions: + • Enable mixed precision (fp16/bf16) on the bert scorer + • Reduce batch_size from 64 to 16 or 32 + • Set hpo_config.n_jobs=1 — parallel trials are doubling VRAM demand + • Try preset `transformers-light` or `classic-medium` + +These numbers are heuristic upper bounds, not measurements. +``` + +Numbers are reported with honest precision (one significant figure for time, two for memory) and an explicit "estimate, not measurement" disclaimer. + +### Algorithm (proposal, allowed to adjust) + +1. **Collect candidates.** Walk the search space; collect every unique in-scope module. For transformer-bearing modules the identity is `(module_type, model_name, mode)` with `mode ∈ {inference, lora, full-finetune}`. For `linear` and `catboost` the identity is `(module_type, embedder_name, task_kind)` with `task_kind ∈ {multiclass, multilabel}` — the routing through `LogisticRegressionCV` vs `MultiOutputClassifier`, and CatBoost's per-class trees, both depend on it. Also collect the HPO knobs that drive cost: `n_trials` plus per-module knobs — transformer (`epochs`, `batch_size`, `max_length`, `dtype` ∈ {fp16, bf16, fp32}), `linear` (`cv`, `max_iter`), `catboost` (`iterations`, `depth`, `task_type`, `features_type`). +2. **Resolve checkpoints.** For each unique `model_name`, query HF Hub for safetensors metadata to read parameter count and weight dtype. Fall back to file-size aggregation if safetensors metadata is missing. Fall back to a "unknown — heuristic only" tag with low-confidence labelling if HF Hub is offline or the repo is private. `LinearScorer` and `CatBoostScorer` have no checkpoint of their own; they reuse the embedder resolved by this step in their formulas (their cost is parameterised by `embedder_dim`, not parameter count). +3. **Apply formulas.** All values are honest upper bounds; convergence and early stopping often terminate well below them. + - **Disk** = sum over unique downloadable checkpoints of total file size, plus a small fixed overhead per checkpoint for tokenizers and config. `LinearScorer` and `CatBoostScorer` contribute zero (they consume embedder output that is already accounted for upstream). + - **RAM per module:** + - Transformer modules (any mode): `params × dtype_bytes + dataset_tokens × 4 bytes`, treated as a loose upper bound for tokenized buffers. + - `LinearScorer`: `8 × n_samples × embedder_dim` (float64 data matrix — the dominant term) `+ 8 × n_classes × embedder_dim` (coefficients) `+ ~10 × 8 × embedder_dim` (L-BFGS history). + - `CatBoostScorer`: `4 × n_samples × n_features` (data, float32 internally) `+ 4 × n_features × n_bins` (histograms; default `n_bins = 254`) `+ iterations × 2^depth × ~32 bytes` (tree storage). For `features_type ∈ {embedding, both}`, `n_features = embedder_dim`. For `features_type = text`, `n_features` is the BoW vocab discovered at fit; bound with a coarse default (e.g. 50 000) and tag the estimate low-confidence. + - For `linear` and `catboost`, `embedder_dim` is taken from the largest embedder in the same node group — same worst-case stance as the rest of the estimate. + - **VRAM per module:** + - Inference embedder: `params × dtype_bytes × ~1.3` (small constant for activations). + - Full fine-tune (`bert`, GCN backbone, soft-prompt `ptuning`): `params × dtype_bytes × (1 + 1 + 2)` for weights + grads + Adam state, halved when fp16/bf16 mixed precision is configured. + - LoRA: inference VRAM + a small adapter constant. + - Reranker (cross-encoder, `dnnc`): inference VRAM × small factor for the reranking pass. + - `LinearScorer`: N/A (sklearn is CPU-only). + - `CatBoostScorer`: 0 by default; if `task_type="GPU"` is configured, the RAM formula above lives on device instead. + - **Time per module:** + - Transformer modules: `n_trials × epochs × (dataset_size / batch_size) × per_step_seconds(params, max_length, device_class)`, where `per_step_seconds` is a small static lookup keyed on coarse device class (`cpu`, `low-gpu`, `mid-gpu`, `high-gpu`, `apple-silicon`) auto-detected from `torch.cuda.get_device_name` or `platform`/`torch.backends.mps`. + - `LinearScorer`: `n_trials × C_cpu × n_samples × embedder_dim × max_iter × cv_multiplier × class_multiplier`, where: + - `C_cpu ≈ 1e-8 s` per `(sample × feature × iteration)` on a single modern CPU core. + - `cv_multiplier = Cs × cv + 1 ≈ 31` for the multiclass path (`LogisticRegressionCV` with default `Cs = 10`, repo default `cv = 3`, plus one final refit). `cv_multiplier = 1` for the multilabel path (no inner CV). + - `class_multiplier = n_classes` for the multilabel path (`MultiOutputClassifier` fits one binary LogReg per class); `class_multiplier = 1` otherwise. + - `CatBoostScorer`: `n_trials × iterations × C_device × n_samples × n_features × depth × class_multiplier`, where: + - `C_device ≈ 1e-9 s` on CPU, ~5–20× faster on GPU. Resolve `C_device` via the same `device_class` lookup as the transformer time formula. + - `class_multiplier = n_classes` for both the multiclass `MultiClass` loss (per-class trees per iteration) and the multilabel routing (one CatBoost per class). + - Early stopping is not modelled; `iterations` is treated as the upper bound. + - Total time = sum across modules. MPS time numbers are coarser than CUDA's (one tier for now); we accept that. +4. **Compare to detected hardware.** Per-dimension status is green / yellow / red against a configurable headroom (defaults: **red** if estimate > 100 % of available, **yellow** if > 70 %). On MPS, "VRAM" and "RAM" estimates draw from the same physical pool; we compare *the larger of the two* against the unified-memory budget rather than each independently. +5. **Render summary.** Log at INFO. If any dimension is red, emit at WARNING so it shows in non-logging contexts. + +#### Resource-phase refinements + +These adjust the formulas above for situations that look fine in single-trial isolation but blow up in practice: + +- **Cold-vs-warm HF cache (Tier 1).** Before reporting disk, probe each unique `model_name` against the local HF cache via `huggingface_hub.try_to_load_from_cache` / `scan_cache_dir`, keyed off `HF_HOME`. Split the disk line into `to_download` vs `already_cached`. Treat a repo as cached only if the weight shard (`model.safetensors` or equivalent) is present — not just config/tokenizer files. Without this, a repeated run on the same machine alarms the user about gigabytes they already have. +- **Concurrent-trial × per-trial VRAM (Tier 1).** Multiply the per-trial VRAM estimate by `hpo_config.n_jobs` when `n_jobs > 1` and the active accelerator is GPU. Same for the `dump_modules=True` path on disk: each trial writes module weights to the dump dir, so multiply per-module dump-disk by `n_trials`. vLLM is process-isolated and its contention model differs; note this in the disclaimer. +- **`refit_after=True` time delta (Tier 2).** When `Pipeline.fit(refit_after=True)`, add one full-data training pass per node to the time estimate. Small term but easy to forget; users running close to their time budget care about it. +- **HF Hub reachability probe (Tier 2).** One up-front `HfApi().whoami()` (or unauthenticated `HEAD` to `huggingface.co`) at the start of the phase. On failure, consistently downgrade *all* model entries to the "unknown — heuristic only" path instead of timing out per-model 10× on a 10-model search space. +- **CatBoost `task_type="GPU"` sanity (Tier 2).** When CatBoost is in the search space with `task_type="GPU"` but `torch.cuda.is_available()` is false, tag yellow — CatBoost silently falls back to CPU and the user otherwise sees CPU speeds with no warning. + +### Data quality phase + +The resource phase predicts whether the run *fits*. The data quality phase predicts whether the run *produces a meaningful result*. Both are caught at the same hook point because both have the same failure mode from the user's perspective: hours of compute followed by a cryptic error or a silently degraded model. + +- **Token-length truncation (Tier 1).** Sample ~1000 utterances from the train split, tokenize against each unique transformer's tokenizer, compute `p95_tokens` and `% truncated` against the module's `max_length`. Yellow when >1% truncated; red when >10%. Reuse the tokenizer the resource phase already loaded for parameter-count resolution — don't double-fetch. The existing pipeline silently truncates (sentence-transformers and the HF Trainer both default to `truncation=True`); there is no warning anywhere today. +- **Auto-invoke `check_split_readiness` (Tier 1).** Call the existing utility at `context/data_handler/_readiness_util.py:44–109` with the active `data_config` and surface its `SplitReadinessResult` — it already returns `underpopulated_classes`, `ready`, and a `reason` string, but is not called anywhere from `Pipeline.fit()` today. When `LinearScorer` with CV is in the search space and any class has `n < cv`, name the module by name in the red line ("`LogisticRegressionCV` cv=3 will fail: classes [X, Y] have <3 samples") rather than emitting a generic split-readiness message. +- **Partial intent descriptions × `description` scorer (Tier 1).** The dataset constructor already warns once at import when *some* but not all intents have descriptions (`_dataset/_dataset.py:199–207`). The advisor escalates this to red when the `description` scorer is also present in the active search space — otherwise the run will produce NaN embeddings for the missing intents. Action message: "fill in N missing descriptions", not "drop the scorer". +- **Embedder ↔ scorer dimension consistency (Tier 2).** For `LinearScorer` / `CatBoostScorer` with `features_type="both"`, verify the embedder reachable from the same node group exposes a stable, expected dimension. Cross-node walk; surface as yellow when the resolved dimension cannot be confirmed pre-flight. + +### Configuration sanity phase + +Pydantic schema validation on `OptimizationConfig` runs upstream at config-load time; this phase only adds checks that require *joint* inspection of dataset + search-space + hardware. With Tier 1 + Tier 2 in scope today, this phase holds two items: + +- The `n_jobs × VRAM` callout, surfaced jointly with the resource phase (single line in the rendered output). +- The CatBoost `task_type="GPU"` without CUDA check, same. + +Both could live entirely in the resource phase; they get their own phase because future additions — joint scorer↔decision shape checks, OOS-support mismatches detected up front rather than at module instantiation, embedder-dimension mismatches — slot here naturally. Keep the phase scaffold even if it is currently thin. + +### Failure modes + +- **HF Hub offline or private repo:** fall back to "unknown model — name-pattern heuristic only", explicit low-confidence label, never raise. +- **No accelerator (no CUDA and no MPS):** report VRAM as N/A and mark GPU-only modules as "requires GPU" without estimating a (misleading) CPU wall time. +- **MPS configured but a module is incompatible:** vLLM in particular does not run on MPS. Flag the module as "unsupported on MPS" rather than estimating; do not raise. +- **MPS with CPU fallback ops:** some PyTorch ops fall back to CPU on MPS, inflating system-RAM usage and wall time beyond the heuristic. Note this in the disclaimer; we don't try to model it. +- **vLLM configured but not installed:** still estimate (the VRAM accounting is similar), note that the engine itself has additional overhead not captured. +- **Estimate wildly wrong vs. reality:** always-on disclaimer in the printed summary that these are heuristic upper bounds. + +### Reduce-to-fit mode + +The feasibility check has two modes sharing the same estimation pipeline: + +- **Report mode (default).** Print the summary, return the structured estimate, let the run proceed regardless of severity. +- **Reduce-to-fit mode (opt-in).** Additionally prune the search space to fit detected hardware before the run starts. Same estimates, same comparisons — just one extra step that produces a reduced search space. + +Reduce-to-fit consumes only the **resource phase** output. Data-quality and config-sanity findings are reported but never trigger pruning — they require user action (fix the dataset, change a config flag), not search-space narrowing. + +Using the same per-module estimates, the pruner applies three least-destructive steps in order: + +1. **Filter discrete-choice hyperparameters.** For lists of cost-driving values (model name, batch size, training epochs, CatBoost `iterations` / `depth`, sklearn `cv`), keep only entries whose worst-case estimate fits. +2. **Cap continuous ranges.** For `{low, high}` ranges of cost-driving parameters, lower the upper bound to the largest fitting value. Ranges of non-cost parameters (learning rate, decision thresholds) are not touched. +3. **Drop module variants.** If a module entry has any required hyperparameter with no satisfiable value left, drop that module entry from its node's search space. + +Guard rails: + +- If pruning would leave any node's search space empty, the pruner **raises**. We don't silently produce a non-runnable pipeline, and we don't quietly fall back to report-only — failing loudly is the right contract for a mode whose whole purpose is to make the run feasible. The error message points the user toward a lighter preset. +- Time is not used as a filter — only memory and disk are. Time is still reported. +- Headroom thresholds are intentionally generous to avoid over-pruning and are configurable. + +Alongside the standard estimate, the caller receives a structured description of what was filtered, capped, and dropped, plus the resulting search space and its recomputed (now green) estimate. + +**Drawbacks worth surfacing.** + +- **Silent narrowing of intent.** A search space deliberately written to include heavy/light variants for comparison gets halved. The mode is opt-in for this reason. +- **Over-pruning when our formulas overestimate.** A 30 %-high estimate on a borderline configuration throws away a run that would have succeeded. Generous headroom defaults mitigate; the knob is exposed. +- **Hard failure when nothing fits.** Raising is intentional — silent degradation to report-only would defeat the mode's purpose — but it is a sharper edge than report mode has. +- **Pre-trial only.** The rewrite happens before any HPO trial starts. This is fine because the search space is treated as immutable across a study, but worth calling out so nobody tries to make this dynamic later. + +### CLI surface + +The advisor is also exposed as a console script (`autointent-advisor`) so users can answer "what will this cost?" and "what should I run?" without writing Python. Two subcommands: + +- **`autointent-advisor inspect `.** Resolves the preset (or a user-supplied `OptimizationConfig`), detects local hardware, runs the same three-phase advisor that `Pipeline.fit()` runs, and prints the same report. Accepts `--dataset` for a real dataset, or `--n-samples / --n-classes / --avg-tokens` placeholders when the dataset is not yet built — so the script is useful before any training data exists. `--json` emits the structured `PreflightReport` for scripting. +- **`autointent-advisor recommend [--n-samples ... | --dataset ...] [--budget-time 12h] [--budget-vram-gb 8]`.** Detects local hardware (with manual overrides applied), iterates over the bundled presets in `_presets/`, and tags each as `feasible` / `feasible-with-reduce` / `infeasible`. Ranks feasible presets by quality tier (`heavy > medium > light`) then estimated wall-time; picks the top one as the recommendation. For the heaviest infeasible preset, surfaces the single most-impactful knob change that would make it fit (e.g., "`transformers-heavy` would fit if `batch_size` ≤ 16 and `dtype=fp16`"), reusing the reduce-to-fit pruner's per-knob delta info. + +**Constraints (both subcommands).** No model downloads — only HF Hub metadata endpoints (`HfApi().model_info`); never `from_pretrained`. Offline-safe — on Hub unreachability, fall back to the same "heuristic only" path and mark the report low-confidence; do not raise. Hardware-detection failures (broken CUDA install where `torch.cuda.mem_get_info()` raises) fall back to CPU detection and tag the report rather than crashing. + +## Alternatives considered and rejected + +### B. Smoke-test calibration + +Run each unique module for one mini-batch / one step before the real fit, measure peak RAM and VRAM with `psutil`, `tracemalloc`, and `torch.cuda.max_memory_allocated`, time the step, and extrapolate to the full search space. + +Rejected because: + +- It **downloads weights just to estimate** — the disk-headroom check we wanted to provide is defeated by the act of performing it. +- It can **OOM while predicting OOM**, exactly on the constrained hardware that is the target audience. +- It adds **seconds to minutes** of wall time before `fit()` does anything, surprising users. +- It needs per-module "tiny run" hooks; not every scorer has a clean "stop after one step" path. +- For OpenAI- or vLLM-served embedders, a smoke test costs real money or starts the engine. +- Still not accurate due to CUDA and CPU cache, memory heating and so on. + +### C. Curated benchmark table + +Ship a JSON in the package with measured VRAM and per-step time for the bundled-preset checkpoints, broken out by hardware class (cpu / mid-gpu / high-gpu) and mode (inference / lora / full-finetune). Fall back to heuristics for unknown checkpoints. + +Rejected because: + +- **Maintenance burden:** every new model added to a preset would need entries across the hardware × precision × mode matrix. +- Numbers **go stale** when `transformers` updates change defaults (attention impl, dtype, gradient checkpointing). +- It still needs the chosen-solution heuristics as a long-tail fallback — so it adds work on top of Option A without replacing it. +- **Confident-but-wrong is worse than honest-but-fuzzy.** A table that says "4 GB on 4090" when the user OOMs at 4.5 GB damages trust more than a clearly-labelled range would. + +### D. Layered (A by default, opt-in B, embedded table from C, local actuals cache) + +Combine all three: ship A as the fast path, allow `calibrate=True` to trigger B for heavy modules only, embed a small table from C for the bundled-preset checkpoints, and write actuals from every real run to a local cache that feeds back into future estimates. + +Rejected because: + +- **Implementation surface multiplies:** two estimation code paths to keep consistent, a cache schema with versioning and eviction, two failure modes to document. +- **Discoverability:** users may not learn about `calibrate=True` and the realized value compresses back to roughly Option A anyway. +- The team's bandwidth doesn't justify the marginal accuracy gain over A for the target audience. + +## Comparison + + +| Dimension | A (chosen) | B (smoke-test) | C (benchmark table) | D (layered) | +| -------------------------------- | ------------------------------ | ---------------------- | ---------------------------------- | ------------------------------------- | +| Wall time at pre-flight | < 1 s | seconds–minutes | < 1 s | < 1 s default, s–min when calibrating | +| Accuracy on common checkpoints | medium | high | high | high | +| Accuracy on custom checkpoints | medium | high | medium (fallback) | medium–high | +| Time-estimate quality | low–medium | high | high | high | +| Disk pre-download required | no | yes | no | only when calibrating | +| Risk of OOM during the check | none | real | none | only when calibrating | +| Network usage | 1 cached call per unique model | none beyond normal fit | none | combination | +| Implementation effort | small | large | medium + ongoing benchmark refresh | large + cache infra | +| Ongoing maintenance | low (formulas only) | low | high | high | +| Friendly to offline / air-gapped | with fallback | yes | yes | partial | + + +The chosen solution accepts a real accuracy gap on time and a moderate accuracy gap on VRAM in exchange for the only profile that fits the target audience's constraints: zero added wall time, zero added downloads, zero added failure modes, and a small one-time implementation cost. + +## Out of scope (possible follow-ups) + +- Live resource observability during `fit()` (peak RAM / VRAM per trial, abort on overrun). +- A learned calibration cache from real runs to refine estimates over time. +- **Determinism / `cudnn.deterministic` check.** Belongs in seed-setting code (`set_seed` utility, `Pipeline.__init__`), not in a feasibility advisor — reproducibility is not a hardware-budget question. +- **OpenAI / Generator token-cost ($) estimation.** Real value, but pricing tables age badly, the `StructuredOutputCache` hit rate is unknowable upfront, and the API-paying audience overlaps poorly with this advisor's stated audience (resource-constrained local users). Push to a separate `cost_estimator` tool. +- **Predictive CO₂ / emissions.** `_callbacks/emissions_tracker.py` already does this retrospectively, accurately. A predictive version multiplies our (loose) time estimate by a regional kWh/CO₂ factor — two sources of imprecision compounded. The retrospective number is the trustworthy one. +- **vLLM startup compile time.** Minutes of overhead before any work, but vLLM is unsupported on MPS, isn't the dominant cost on CUDA once running, and modelling it needs a startup-time lookup table. Note once in the disclaimer; do not model. + diff --git a/pyproject.toml b/pyproject.toml index 5fbabaf86..3bc17dc00 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ dependencies = [ "aiofiles (>=24.1.0,<25.0.0)", "threadpoolctl (>=3.0.0,<4.0.0)", "packaging (>=23.2)", + "psutil (>=5.9.0,<8.0.0)", ] [project.optional-dependencies] @@ -121,6 +122,7 @@ typing = [ "joblib-stubs (>=1.4.2.5.20240918,<2.0.0)", "pandas-stubs (>= 2.2.3.250527, <3.0.0)", "types-aiofiles (>=24.1.0.20250606)", + "types-psutil>=7.2.2.20260518", ] docs = [ "sphinx (>=8.1.3,<9.0.0)", @@ -145,6 +147,7 @@ Documentation = "https://deeppavlov.github.io/AutoIntent/" [project.scripts] "basic-aug" = "autointent.generation.utterances._basic.cli:main" "evolution-aug" = "autointent.generation.utterances._evolution.cli:main" +"advisor" = "autointent._advisor._cli:main" [build-system] requires = ["uv_build>=0.8.7,<0.9.0"] diff --git a/src/autointent/_advisor/__init__.py b/src/autointent/_advisor/__init__.py new file mode 100644 index 000000000..04c71007f --- /dev/null +++ b/src/autointent/_advisor/__init__.py @@ -0,0 +1,29 @@ +"""Pre-flight compute feasibility advisor. + +Exposes a small surface used by both ``Pipeline.fit()`` (future integration) and +the ``autointent-advisor`` CLI script. See ``compute-feasibility-advisor-proposal.md`` +at the repo root for the design document. +""" + +from __future__ import annotations + +from ._hardware import HardwareProfile, detect_hardware +from ._report import DatasetStats, Finding, PreflightReport, RecommendationResult, ResourceEstimate, Severity +from .runner import run_preflight +from .workflows import inspect, load_config, recommend, stats_from_dataset + +__all__ = [ + "DatasetStats", + "Finding", + "HardwareProfile", + "PreflightReport", + "RecommendationResult", + "ResourceEstimate", + "Severity", + "detect_hardware", + "inspect", + "load_config", + "recommend", + "run_preflight", + "stats_from_dataset", +] diff --git a/src/autointent/_advisor/_cli.py b/src/autointent/_advisor/_cli.py new file mode 100644 index 000000000..6c1b2bc2e --- /dev/null +++ b/src/autointent/_advisor/_cli.py @@ -0,0 +1,134 @@ +"""Console-script entry point for the pre-flight advisor. + +Two subcommands: + +* ``inspect`` — show what a given preset / config will cost on this machine. +* ``recommend`` — pick the best-fitting bundled preset for this machine. + +Both subcommands accept either a real ``--dataset`` (Hub id or local +csv/json/jsonl/parquet path loaded via ``datasets.load_dataset``) or +``--n-samples / --n-classes / --avg-tokens`` placeholders so the script is +useful before the user has built a dataset. + +The CLI is a thin wrapper around :func:`autointent._advisor.inspect` and +:func:`autointent._advisor.recommend`; callers that don't need argparse can +import those helpers directly. +""" + +from __future__ import annotations + +import argparse +import json +import logging +import sys + +from autointent._advisor import inspect, recommend, stats_from_dataset + +from ._render import render_json, render_recommendation, render_text +from ._report import DatasetStats + +logger = logging.getLogger("autointent.advisor") + + +def _stats_from_args(args: argparse.Namespace) -> DatasetStats: + multilabel = args.task == "multilabel" + if args.dataset: + return stats_from_dataset(args.dataset, multilabel=multilabel) + return DatasetStats.placeholder( + n_samples=args.n_samples, + n_classes=args.n_classes, + avg_tokens=args.avg_tokens, + multilabel=multilabel, + ) + + +def _add_common_dataset_args(p: argparse.ArgumentParser) -> None: + p.add_argument("--dataset", help="Path or hub id of a dataset; overrides placeholders.") + p.add_argument("--n-samples", type=int, default=1_000, help="Placeholder training set size.") + p.add_argument("--n-classes", type=int, default=10, help="Placeholder class count.") + p.add_argument("--avg-tokens", type=int, default=32, help="Placeholder average token length.") + p.add_argument( + "--task", + choices=("multiclass", "multilabel"), + default="multiclass", + help="Placeholder task type when --dataset isn't given.", + ) + + +def cmd_inspect(args: argparse.Namespace) -> int: + report = inspect( + args.target, + stats=_stats_from_args(args), + budget_vram_gb=args.budget_vram_gb, + ) + if args.json: + sys.stdout.write(render_json(report)) + else: + sys.stdout.write(render_text(report)) + sys.stdout.write("\n") + return 0 if report.is_feasible else 1 + + +def cmd_recommend(args: argparse.Namespace) -> int: + result = recommend( + stats=_stats_from_args(args), + budget_vram_gb=args.budget_vram_gb, + budget_time_h=args.budget_time_h, + ) + if args.json: + sys.stdout.write(json.dumps(result.to_dict(), indent=2, default=str)) + sys.stdout.write("\n") + else: + sys.stdout.write(render_recommendation(result.results, result.chosen)) + sys.stdout.write("\n") + if result.chosen: + sys.stdout.write("\n") + sys.stdout.write(render_text(dict(result.results)[result.chosen])) + sys.stdout.write("\n") + return 0 if result.chosen else 1 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="autointent-advisor", + description="Pre-flight feasibility advisor for AutoIntent search-space optimization.", + ) + parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging.") + + sub = parser.add_subparsers(dest="cmd", required=True) + + p_inspect = sub.add_parser( + "inspect", + help="Inspect a preset or OptimizationConfig and print a feasibility report.", + ) + p_inspect.add_argument("target", help="Preset name (e.g. transformers-light) or path to a YAML config.") + p_inspect.add_argument("--json", action="store_true", help="Emit a structured JSON report.") + p_inspect.add_argument("--budget-vram-gb", type=float, default=None, help="Override detected VRAM budget.") + _add_common_dataset_args(p_inspect) + p_inspect.set_defaults(func=cmd_inspect) + + p_rec = sub.add_parser( + "recommend", + help="Detect hardware and recommend the best-fitting bundled preset.", + ) + p_rec.add_argument("--json", action="store_true", help="Emit a structured JSON report.") + p_rec.add_argument("--budget-vram-gb", type=float, default=None, help="Override detected VRAM budget.") + p_rec.add_argument("--budget-time-h", type=float, default=None, help="Optional wall-time ceiling in hours.") + _add_common_dataset_args(p_rec) + p_rec.set_defaults(func=cmd_recommend) + + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.WARNING, + format="%(levelname)s %(name)s: %(message)s", + ) + return int(args.func(args)) + + +if __name__ == "__main__": + main() diff --git a/src/autointent/_advisor/_estimates/__init__.py b/src/autointent/_advisor/_estimates/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/autointent/_advisor/_estimates/_formulas.py b/src/autointent/_advisor/_estimates/_formulas.py new file mode 100644 index 000000000..8bbbdcebf --- /dev/null +++ b/src/autointent/_advisor/_estimates/_formulas.py @@ -0,0 +1,293 @@ +"""Pure cost-estimate formulas — VRAM, RAM, time, severity, model shape. + +No I/O, no logging, no orchestration. Each formula docstring links to the +reference it was calibrated against so a reviewer can follow each coefficient +back to its source. + +Conventions: + * All ``*_gb`` results use the binary GiB convention (1024**3 bytes per GB) — + matches the rest of the advisor's byte->GB conversions. + * All ``*_hours`` results assume the GPU baseline of ~1 second per step; + CPU runs pay a flat slowdown factor (see ``_time_for_transformer``). + * "fp32 worst case" — we deliberately ignore lower-precision / FlashAttention / + quantization optimizations, per the advisor's "pessimistic upper bound" contract. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from autointent._advisor._report import Severity + +if TYPE_CHECKING: + from autointent._advisor._hub import ModelMeta + from autointent._advisor._report import DatasetStats + + +_BYTES_PER_GB = 1024**3 +_DEFAULT_SEQ_LEN = 128 + +# Fallback architecture shape (BERT-base) used only when the model's actual +# config.json couldn't be fetched from HF Hub — see _hub._shape_from_config. +_DEFAULT_HIDDEN = 768 +_DEFAULT_LAYERS = 12 + +_TIGHT_RATIO = 0.9 +_MULTICLASS_THRESHOLD = 2 + + +def _classify_severity(estimate: float, budget: float) -> Severity: + """Map a ``(estimate, budget)`` pair onto a Severity bucket. + + * AMPLE: ``estimate <= 0`` OR ``ratio < _TIGHT_RATIO`` + * TIGHT: ``budget <= 0`` OR ``_TIGHT_RATIO <= ratio < 1`` + * OVER: ``ratio >= 1`` + """ + if estimate <= 0: + return Severity.AMPLE + if budget <= 0: + return Severity.TIGHT + ratio = estimate / budget + if ratio >= 1: + return Severity.OVER + if ratio >= _TIGHT_RATIO: + return Severity.TIGHT + return Severity.AMPLE + + +def _weights_vram_for_transformer(meta: ModelMeta, mode: str) -> float: + """Weight-side VRAM in GB — weights + grads + optimizer state. Excludes activations. + + Returns a deliberately pessimistic upper bound, matching the advisor's + "heuristic upper bound, not measurement" contract. + + Modes: + * ``inference``: forward only — weights + ~30% intermediate-tensor overhead. + * ``lora``: frozen base + small trainable adapters + their grads/optimizer (~0.5 GB). + * ``full-finetune`` (default): the textbook 4W (weights + grads + Adam m + Adam v). + We use 4.5W to leave headroom for loss-scale buffers, allocator fragmentation, + cuDNN workspaces, and gradient-accumulation buffers — none of which the textbook + 4W accounting captures. + """ + weights_gb = meta.weights_gb + if mode == "inference": + return weights_gb * 1.3 + if mode == "lora": + return weights_gb * 1.3 + 0.5 + return weights_gb * 4.5 + + +def _activations_gb_per_sample( + meta: ModelMeta | None, + seq_len: int, + *, + is_training: bool, +) -> float: + """Heuristic activation memory per sample, assuming a fp32 worst case. + + Training: ``seq_len x hidden x layers x const`` — per-layer outputs are kept + for backward. + Inference: ``seq_len x hidden x const`` — only one or two layers' outputs in + flight at once. + """ + hidden = _embedder_dim(meta) + # Training keeps every layer's outputs for backward -> scales x n_layers. + # 16 bytes/token/layer ~ fp32 activation (4B) x ~4x backward overhead (Korthikanti et al.). + # Inference only holds ~1-2 layers' outputs in flight at once. + bytes_per_sample = seq_len * hidden * _n_layers(meta) * 16 if is_training else seq_len * hidden * 8 + return bytes_per_sample / _BYTES_PER_GB + + +def _vram_for_transformer( + meta: ModelMeta, + mode: str, + *, + batch_size: int = 0, + seq_len: int = _DEFAULT_SEQ_LEN, +) -> float: + """Total VRAM in GB: weights + grads + optimizer state + activations x batch. + + Activation accounting differs by mode — training keeps per-layer outputs for + backward; inference only needs one or two layers in flight. + """ + base = _weights_vram_for_transformer(meta, mode) + if batch_size <= 0: + return base + per_sample = _activations_gb_per_sample(meta, seq_len, is_training=mode != "inference") + return base + per_sample * batch_size + + +def _max_fitting_batch_size( + *, + weight_vram_gb: float, + vram_budget_gb: float, + per_sample_gb: float, +) -> int: + """Largest batch that keeps total VRAM under the AMPLE/TIGHT threshold. + + Returns 0 when even the weights blow the budget. Result is rounded down to + the nearest power of two + """ + if per_sample_gb <= 0: + return 0 + target_vram = vram_budget_gb * _TIGHT_RATIO + available_for_activations = target_vram - weight_vram_gb + if available_for_activations <= 0: + return 0 + return _floor_to_power_of_two(int(available_for_activations / per_sample_gb)) + + +_CPU_SLOWDOWN_FACTOR = 50.0 +"""Rough multiplier for transformer training on CPU vs. a modern GPU. + +Real benchmarks vary widely (30x for small BERTs on AVX-512 boxes, 100x+ for +billion-scale models on a stock laptop). A single 50x constant is a pessimistic +upper bound that's good enough to make the CPU/GPU distinction visible without +re-introducing the per-device tier table.""" + + +def _time_for_transformer( + *, + n_trials: int, + epochs: int, + batch_size: int, + n_samples: int, + accelerator: str, +) -> float: + """Transformer training time in hours. + + Baseline is "1 second per step" on a GPU (CUDA / MPS) — a step-count proxy, + not a real wall-time calibration. CPU training pays a flat ``_CPU_SLOWDOWN_FACTOR`` + so the report doesn't hide the fact that the same workload is dramatically + slower without a GPU. Users should treat absolute numbers as ordering / + ballpark information, not a budget. + """ + steps = max(1, (n_samples // max(1, batch_size))) * epochs + h = (n_trials * steps) / 3600.0 + if accelerator == "cpu": + h *= _CPU_SLOWDOWN_FACTOR + return h + + +def _n_layers(meta: ModelMeta | None) -> int: + """Layer count from the model's ``config.json``; falls back to BERT-base when absent.""" + if meta is not None and meta.n_layers is not None: + return meta.n_layers + return _DEFAULT_LAYERS + + +def _embedder_dim(meta: ModelMeta | None) -> int: + """Hidden size from the model's ``config.json``; falls back to BERT-base when absent.""" + if meta is not None and meta.hidden_size is not None: + return meta.hidden_size + return _DEFAULT_HIDDEN + + +def _largest_embedder(seen_models: dict[str, ModelMeta]) -> ModelMeta | None: + """Return the largest model in ``seen_models`` by parameter count, or None if empty.""" + if not seen_models: + return None + return max(seen_models.values(), key=lambda m: m.total_params) + + +def _ram_for_module(meta: ModelMeta, stats: DatasetStats) -> float: + """RAM in GB. Loose upper bound: weights + tokenized text in memory. + + Tokenized text is approximated as ``n_samples x avg_tokens x 4 bytes`` + (BPE/WordPiece token ids fit in int32). The 4 bytes/token bound is tight + enough for the report's purposes and intentionally ignores any preprocessing + artefacts (attention masks, position ids, etc.) since they're bounded by the + same factor. + """ + return meta.weights_gb + (stats.n_samples * stats.avg_tokens * 4) / _BYTES_PER_GB + + +# Coefficients are dimensional (per-sample-per-feature-per-iteration seconds) +# rather than empirically tuned constants — they give relative-cost ordering +# across configurations and absolute ballpark wall-times. +_LINEAR_CPU_S_PER_SAMPLE_FEATURE_ITER = 1e-8 +_CATBOOST_CPU_S_PER_SAMPLE_FEATURE_ITER = 1e-9 +_CATBOOST_GPU_SPEEDUP = 10.0 +# LogisticRegressionCV defaults: Cs=10, cv=3 -> 10x3 inner fits + 1 final refit = 31. +_LOGREG_CV_MULTIPLIER = 31 +# Default value of `border_count` in CatBoost (number of histogram buckets per feature). +_CATBOOST_DEFAULT_BINS = 254 +# Bytes per histogram bucket / tree node — order-of-magnitude constant. +_CATBOOST_BYTES_PER_TREE_NODE = 32 + + +def _ram_for_linear(*, stats: DatasetStats, embedder_dim: int) -> float: + """Float64 design matrix dominates; coefficients and L-BFGS history are small.""" + data_bytes = 8.0 * stats.n_samples * embedder_dim + coef_bytes = 8.0 * max(1, stats.n_classes) * embedder_dim + lbfgs_bytes = 10.0 * 8.0 * embedder_dim + return (data_bytes + coef_bytes + lbfgs_bytes) / _BYTES_PER_GB + + +def _time_for_linear( + *, + n_trials: int, + n_samples: int, + embedder_dim: int, + max_iter: int, + cv_multiplier: int, + class_multiplier: int, +) -> float: + """LogisticRegression wall time, in hours. + + Cost is ``O(n_samples x n_features x max_iter x n_classes)`` per fit + (sklearn's L-BFGS solver), multiplied by the CV inner-fit count (31 for the + default LogisticRegressionCV). + """ + seconds = ( + n_trials + * _LINEAR_CPU_S_PER_SAMPLE_FEATURE_ITER + * n_samples + * embedder_dim + * max_iter + * cv_multiplier + * class_multiplier + ) + return seconds / 3600.0 + + +def _ram_for_catboost(*, stats: DatasetStats, n_features: int, iterations: int, depth: int) -> float: + """CatBoost RAM = quantized data matrix + histograms + tree storage.""" + data_bytes = 4.0 * stats.n_samples * n_features + histograms_bytes = 4.0 * n_features * _CATBOOST_DEFAULT_BINS + trees_bytes = iterations * (2**depth) * _CATBOOST_BYTES_PER_TREE_NODE + return float((data_bytes + histograms_bytes + trees_bytes) / _BYTES_PER_GB) + + +def _time_for_catboost( + *, + n_trials: int, + n_samples: int, + n_features: int, + iterations: int, + depth: int, + class_multiplier: int, + on_gpu: bool, +) -> float: + """CatBoost wall time, in hours. + + Cost is ``O(iterations x n_samples x n_features x depth x n_classes)`` per + fit. GPU training is ~10x faster than CPU for typical workloads per + CatBoost's published benchmarks. + https://catboost.ai/en/docs/concepts/speed-up-training + """ + coeff = _CATBOOST_CPU_S_PER_SAMPLE_FEATURE_ITER + if on_gpu: + coeff /= _CATBOOST_GPU_SPEEDUP + seconds = n_trials * iterations * coeff * n_samples * n_features * depth * class_multiplier + return seconds / 3600.0 + + +def _floor_to_power_of_two(n: int) -> int: + """Largest power of two <= ``n``; returns 0 when ``n < 1``.""" + if n < 1: + return 0 + power = 1 + while power * 2 <= n: + power *= 2 + return power diff --git a/src/autointent/_advisor/_estimates/_resource.py b/src/autointent/_advisor/_estimates/_resource.py new file mode 100644 index 000000000..6ed93578c --- /dev/null +++ b/src/autointent/_advisor/_estimates/_resource.py @@ -0,0 +1,406 @@ +"""Resource-phase orchestration. + +Walks the validated search space, asks ``_formulas`` for per-module costs, +aggregates them into a ``ResourceEstimate``, and emits VRAM/RAM/disk/time +findings on the report. + +The public entry is ``_resource_phase`` at the bottom; everything above it is +private machinery. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from autointent._advisor import _hub +from autointent._advisor._report import ResourceEstimate, Severity +from autointent.configs._embedder import ( + EmbedderConfig, + OpenaiEmbeddingConfig, + SentenceTransformerEmbeddingConfig, + VllmEmbeddingConfig, +) + +from ._formulas import ( + _DEFAULT_SEQ_LEN, + _LOGREG_CV_MULTIPLIER, + _MULTICLASS_THRESHOLD, + _activations_gb_per_sample, + _classify_severity, + _embedder_dim, + _largest_embedder, + _max_fitting_batch_size, + _ram_for_catboost, + _ram_for_linear, + _ram_for_module, + _time_for_catboost, + _time_for_linear, + _time_for_transformer, + _vram_for_transformer, + _weights_vram_for_transformer, +) +from ._search_space import _extract_model_names, _max_int, _walk_modules_indexed + +if TYPE_CHECKING: + from autointent._advisor._hardware import HardwareProfile + from autointent._advisor._hub import ModelMeta + from autointent._advisor._report import DatasetStats, PreflightReport + + +# Union variants of EmbedderConfig that carry a model_name attribute. +# HashingVectorizerEmbeddingConfig and the bare BaseEmbedderConfig don't have +# one (sklearn vectorizer / abstract base), so we filter them out below. +_MODEL_BACKED_EMBEDDERS = ( + SentenceTransformerEmbeddingConfig, + OpenaiEmbeddingConfig, + VllmEmbeddingConfig, +) + + +def _embedder_model_name(embedder: EmbedderConfig) -> str | None: + """Return the embedder's model_name when the config variant carries one.""" + if isinstance(embedder, _MODEL_BACKED_EMBEDDERS): + return embedder.model_name + return None + + +# Maps each fine-tunable transformer module to its training-mode label. +# Modules not listed (or listed as "inference") run the encoder forward-only. +# Note: dnnc keeps the cross-encoder frozen and trains an sklearn LogisticRegressionCV +# head on top of its features (see autointent._wrappers.ranker.Ranker._fit), so the +# encoder's VRAM profile matches inference rather than fine-tuning. +_TRANSFORMER_TRAINING_MODE = { + "bert": "full-finetune", + "ptuning": "lora", + "lora": "lora", +} + + +@dataclass +class _ModuleEstimate: + """Per-module cost contribution + the dict that gets rendered in the report.""" + + driver: dict[str, Any] + vram_gb: float + ram_gb: float + time_hours: float + model_weights_gb: float = 0.0 + + +def _refit_factor(*, refit_after: bool, n_trials: int) -> float: + """Wall-time multiplier for ``refit_after=True`` (amortized 1/n_trials extra).""" + return 1 + 1.0 / max(1, n_trials) if refit_after else 1.0 + + +def _split_entries( + search_space: list[dict[str, Any]], +) -> tuple[list[tuple[int, str, dict[str, Any]]], list[tuple[int, str, dict[str, Any]]]]: + """Partition search-space entries into (transformer-bearing, classic).""" + transformer: list[tuple[int, str, dict[str, Any]]] = [] + classic: list[tuple[int, str, dict[str, Any]]] = [] + for node_idx, node_type, entry in _walk_modules_indexed(search_space): + bucket = classic if entry.get("module_name") in {"linear", "catboost"} else transformer + bucket.append((node_idx, node_type, entry)) + return transformer, classic + + +def _estimate_transformer_model( + *, + meta: ModelMeta, + entry: dict[str, Any], + node_type: str, + module: str, + name: str, + stats: DatasetStats, + hardware: HardwareProfile, + n_trials: int, + refit_after: bool, +) -> _ModuleEstimate: + """One row of cost for a transformer module + a specific model checkpoint.""" + mode = _TRANSFORMER_TRAINING_MODE.get(module, "inference") + batch_size = _max_int(entry.get("batch_size"), 32) + epochs = _max_int(entry.get("num_train_epochs"), 1 if mode == "inference" else 10) + seq_len = _max_int(entry.get("max_length"), _DEFAULT_SEQ_LEN) + + vram = _vram_for_transformer(meta, mode, batch_size=batch_size, seq_len=seq_len) + ram = _ram_for_module(meta, stats) + + driver_max_batch: int | None = None + if hardware.vram_gb > 0: + driver_max_batch = _max_fitting_batch_size( + weight_vram_gb=_weights_vram_for_transformer(meta, mode), + vram_budget_gb=hardware.vram_gb, + per_sample_gb=_activations_gb_per_sample(meta, seq_len, is_training=mode != "inference"), + ) + + time_h = _time_for_transformer( + n_trials=n_trials, + epochs=epochs, + batch_size=batch_size, + n_samples=stats.n_samples, + accelerator=hardware.accelerator, + ) + if mode != "inference": + time_h *= _refit_factor(refit_after=refit_after, n_trials=n_trials) + + return _ModuleEstimate( + driver={ + "node_type": node_type, + "module": module, + "model": name, + "mode": mode, + "vram_gb": round(vram, 2), + "ram_gb": round(ram, 2), + "time_hours": round(time_h, 2), + "batch_size": batch_size, + "max_batch_size": driver_max_batch, + "confidence": meta.confidence, + }, + vram_gb=vram, + ram_gb=ram, + time_hours=time_h, + model_weights_gb=meta.weights_gb, + ) + + +def _estimate_classic_entry( + *, + entry: dict[str, Any], + node_type: str, + embedder_meta: ModelMeta | None, + embedder_dim: int, + stats: DatasetStats, + hardware: HardwareProfile, + n_trials: int, + refit_after: bool, +) -> _ModuleEstimate | None: + """Cost row for a linear or catboost scorer (returns ``None`` for any other module).""" + module = entry.get("module_name", "?") + refit = _refit_factor(refit_after=refit_after, n_trials=n_trials) + # Both multinomial (multiclass) and one-vs-rest (multilabel) LR scale linearly in n_classes. + class_multiplier = max(1, stats.n_classes) + + if module == "linear": + cv_multiplier = 1 if stats.multilabel else _LOGREG_CV_MULTIPLIER + ram = _ram_for_linear(stats=stats, embedder_dim=embedder_dim) + time_h = ( + _time_for_linear( + n_trials=n_trials, + n_samples=stats.n_samples, + embedder_dim=embedder_dim, + max_iter=_max_int(entry.get("max_iter"), 100), + cv_multiplier=cv_multiplier, + class_multiplier=class_multiplier, + ) + * refit + ) + vram = 0.0 + mode = "linear-cv" if cv_multiplier > 1 else "linear" + elif module == "catboost": + on_gpu = entry.get("task_type") == "GPU" and hardware.accelerator == "cuda" + # CatBoost MultiClass loss grows per-class trees only above binary; binary uses + # Logloss with one tree per iteration. + cb_class_mult = class_multiplier if stats.n_classes > _MULTICLASS_THRESHOLD or stats.multilabel else 1 + iterations = _max_int(entry.get("iterations"), 1000) + depth = _max_int(entry.get("depth"), 6) + ram_total = _ram_for_catboost(stats=stats, n_features=embedder_dim, iterations=iterations, depth=depth) + time_h = ( + _time_for_catboost( + n_trials=n_trials, + n_samples=stats.n_samples, + n_features=embedder_dim, + iterations=iterations, + depth=depth, + class_multiplier=cb_class_mult, + on_gpu=on_gpu, + ) + * refit + ) + vram, ram = (ram_total, 0.0) if on_gpu else (0.0, ram_total) + mode = "catboost-gpu" if on_gpu else "catboost" + else: + return None + + return _ModuleEstimate( + driver={ + "node_type": node_type, + "module": module, + "model": embedder_meta.name if embedder_meta else "(no embedder)", + "mode": mode, + "vram_gb": round(vram, 2), + "ram_gb": round(ram, 2), + "time_hours": round(time_h, 2), + "batch_size": None, + "max_batch_size": None, + "confidence": embedder_meta.confidence if embedder_meta else "heuristic", + }, + vram_gb=vram, + ram_gb=ram, + time_hours=time_h, + ) + + +def _aggregate_disk( + estimate: ResourceEstimate, + seen_models: dict[str, ModelMeta], + node_max_weights: dict[int, float], + *, + dump_modules: bool, + n_trials: int, +) -> None: + """Fold per-model download/cached sizes into ``estimate`` and apply dump-modules accounting.""" + for meta in seen_models.values(): + if meta.cached_locally: + estimate.disk_cached_gb += meta.disk_gb + else: + estimate.disk_download_gb += meta.disk_gb + if dump_modules: + # Each trial selects one variant per node, so per-trial dumped weights + # are bounded by the heaviest module in each node, summed across nodes. + estimate.disk_dump_gb = sum(node_max_weights.values()) * n_trials + + +def _emit_resource_findings( + report: PreflightReport, + estimate: ResourceEstimate, + hardware: HardwareProfile, + *, + n_jobs: int, +) -> None: + """Translate aggregated estimates into VRAM/RAM/disk/time findings on the report.""" + parallel_gpu = n_jobs > 1 and hardware.accelerator in {"cuda", "mps"} + effective_vram = estimate.vram_gb * n_jobs if parallel_gpu else estimate.vram_gb + # MPS shares one unified pool: parallel workers each allocate weights+activations + # in RAM, so peak RAM also scales with n_jobs on Apple Silicon. + effective_ram = estimate.ram_gb * n_jobs if n_jobs > 1 and hardware.accelerator == "mps" else estimate.ram_gb + + if hardware.accelerator == "cpu" and effective_vram > 0: + report.add( + "resource", + Severity.TIGHT, + f"No GPU detected; transformer modules will be very slow (worst case ~{estimate.time_hours:.1f} h).", + metric="vram", + ) + else: + msg = f"VRAM ~{effective_vram:.1f} GB" + if n_jobs > 1: + msg += f" (= per-trial {estimate.vram_gb:.1f} GB x {n_jobs} parallel trials)" + msg += f" vs available {hardware.vram_gb:.1f} GB" + report.add("resource", _classify_severity(effective_vram, hardware.vram_gb), msg, metric="vram") + + report.add( + "resource", + _classify_severity(effective_ram, hardware.ram_gb), + f"RAM ~{effective_ram:.1f} GB vs available {hardware.ram_gb:.1f} GB", + metric="ram", + ) + + disk_total = estimate.disk_download_gb + estimate.disk_dump_gb + disk_msg = f"Disk ~{estimate.disk_download_gb:.1f} GB to download" + if estimate.disk_cached_gb > 0: + disk_msg += f", {estimate.disk_cached_gb:.1f} GB already cached" + if estimate.disk_dump_gb > 0: + disk_msg += f", +{estimate.disk_dump_gb:.1f} GB during training (dump_modules=True)" + disk_msg += f" vs {hardware.free_disk_gb:.0f} GB free" + report.add("resource", _classify_severity(disk_total, hardware.free_disk_gb), disk_msg, metric="disk") + + if estimate.time_hours > 0: + report.add( + "resource", + Severity.AMPLE, + f"Time ~{estimate.time_hours:.1f} h (worst case, no HPO pruning)", + metric="time", + ) + + +def _resource_phase( + *, + embedder_config: EmbedderConfig, + search_space: list[dict[str, Any]], + n_trials: int, + n_jobs: int, + dump_modules: bool, + stats: DatasetStats, + hardware: HardwareProfile, + report: PreflightReport, + refit_after: bool = False, +) -> None: + """Walk the validated search space, fold per-module costs into the report. + + Two passes: transformer-bearing modules first (collects ``seen_models`` so + the largest model can drive ``embedder_dim`` for the classic pass), then + linear / catboost. Disk, VRAM/RAM peak, time sum, and final findings are + folded onto the report. + """ + seen_models: dict[str, ModelMeta] = {} + global_embedder = _embedder_model_name(embedder_config) + if global_embedder: + seen_models[global_embedder] = _hub.resolve_model(global_embedder) + + transformer_entries, classic_entries = _split_entries(search_space) + + # First pass: transformer modules (also populates seen_models for the classic pass). + module_estimates: list[_ModuleEstimate] = [] + node_max_weights: dict[int, float] = {} + for node_idx, node_type, entry in transformer_entries: + module = entry.get("module_name", "?") + model_names = _extract_model_names(entry) + if not model_names and global_embedder and module in {"knn", "mlknn"}: + model_names = [global_embedder] + for name in model_names: + meta = seen_models.setdefault(name, _hub.resolve_model(name)) + me = _estimate_transformer_model( + meta=meta, + entry=entry, + node_type=node_type, + module=module, + name=name, + stats=stats, + hardware=hardware, + n_trials=n_trials, + refit_after=refit_after, + ) + module_estimates.append(me) + # Track heaviest weight per node so dump_modules is bounded by one + # selected variant per node x n_trials, not the sum of all candidates. + node_max_weights[node_idx] = max(node_max_weights.get(node_idx, 0.0), me.model_weights_gb) + + # Second pass: linear / catboost — cost depends on embedder_dim, not a checkpoint. + embedder_meta = _largest_embedder(seen_models) + embedder_dim_val = _embedder_dim(embedder_meta) + for _, node_type, entry in classic_entries: + classic_estimate = _estimate_classic_entry( + entry=entry, + node_type=node_type, + embedder_meta=embedder_meta, + embedder_dim=embedder_dim_val, + stats=stats, + hardware=hardware, + n_trials=n_trials, + refit_after=refit_after, + ) + if classic_estimate is not None: + module_estimates.append(classic_estimate) + + estimate = ResourceEstimate(parallel_factor=n_jobs) + for me in module_estimates: + estimate.vram_gb = max(estimate.vram_gb, me.vram_gb) + estimate.ram_gb = max(estimate.ram_gb, me.ram_gb) + estimate.time_hours += me.time_hours + estimate.drivers.append(me.driver) + + _aggregate_disk(estimate, seen_models, node_max_weights, dump_modules=dump_modules, n_trials=n_trials) + + # Flip low_confidence if any model fell back to the heuristic path (Hub + # unreachable, repo missing safetensors metadata, local-path checkpoint). + heuristic_models = [m.name for m in seen_models.values() if m.confidence == "heuristic"] + if heuristic_models: + report.low_confidence = True + report.notes.append( + f"Heuristic fallback used for {len(heuristic_models)} model(s) - sizes are BERT-base " + f"defaults: {', '.join(heuristic_models[:3])}{'...' if len(heuristic_models) > 3 else ''}", # noqa: PLR2004 + ) + + report.resource = estimate + _emit_resource_findings(report, estimate, hardware, n_jobs=n_jobs) diff --git a/src/autointent/_advisor/_estimates/_search_space.py b/src/autointent/_advisor/_estimates/_search_space.py new file mode 100644 index 000000000..577f500e6 --- /dev/null +++ b/src/autointent/_advisor/_estimates/_search_space.py @@ -0,0 +1,75 @@ +"""Walk preset / OptimizationConfig search-space dicts and extract module info. + +This module is the only place that knows the nested shape of the preset YAML: +``search_space -> list of nodes -> each node has its own search_space -> list of +module entries``. All other modules in the package consume the flattened +``(node_idx, node_type, entry)`` triples this file yields. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from collections.abc import Iterable + + +def _extract_model_names(module_entry: dict[str, Any]) -> list[str]: + """Pull model name(s) from a search-space module entry. + + Each module entry can declare zero or more model candidates under + ``classification_model_config`` and/or ``embedder_config``; both keys may be + a single dict or a list of dicts, and only entries with ``model_name`` are + kept. + """ + candidates: list[str] = [] + cfg = module_entry.get("classification_model_config") + if isinstance(cfg, list): + candidates.extend(c["model_name"] for c in cfg if isinstance(c, dict) and c.get("model_name")) + elif isinstance(cfg, dict) and cfg.get("model_name"): + candidates.append(cfg["model_name"]) + embedder_cfg = module_entry.get("embedder_config") + if isinstance(embedder_cfg, list): + candidates.extend(c["model_name"] for c in embedder_cfg if isinstance(c, dict) and c.get("model_name")) + elif isinstance(embedder_cfg, dict) and embedder_cfg.get("model_name"): + candidates.append(embedder_cfg["model_name"]) + return candidates + + +def _max_int(value: Any, default: int) -> int: # noqa: ANN401 + """Coerce a search-space distribution descriptor into an int upper bound. + + Accepts a plain int, a list of candidate values (returns the max), or an + Optuna-style ``{"low": ..., "high": ...}`` range dict (returns the high end). + Anything unparseable falls back to ``default``. + """ + if value is None: + return default + if isinstance(value, list) and value: + return max(int(x) for x in value) + if isinstance(value, dict): + return int(value.get("high", default)) + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _walk_modules_indexed( + search_space: list[dict[str, Any]], +) -> Iterable[tuple[int, str, dict[str, Any]]]: + """Yield ``(node_index, node_type, module_entry)`` triples. + + The index lets the resource phase bound per-node max cost — see + ``dump_modules`` accounting in ``_resource.py``. + """ + for node_idx, node in enumerate(search_space or []): + node_type = node.get("node_type", "?") + for entry in node.get("search_space", []) or []: + yield node_idx, node_type, entry + + +def _walk_modules(search_space: list[dict[str, Any]]) -> Iterable[tuple[str, dict[str, Any]]]: + """Yield ``(node_type, module_entry)`` pairs — index-agnostic view.""" + for _, node_type, entry in _walk_modules_indexed(search_space): + yield node_type, entry diff --git a/src/autointent/_advisor/_hardware.py b/src/autointent/_advisor/_hardware.py new file mode 100644 index 000000000..6aa9741ee --- /dev/null +++ b/src/autointent/_advisor/_hardware.py @@ -0,0 +1,142 @@ +"""Local hardware detection. + +Probes CPU / RAM / disk and the highest-priority accelerator available +(CUDA -> MPS -> CPU). All probes are wrapped to fall back safely on a +broken install (e.g. CUDA driver mismatch) rather than crash the advisor. +""" + +from __future__ import annotations + +import logging +import os +import platform +import shutil +from dataclasses import dataclass, field +from pathlib import Path +from typing import Literal + +import psutil +import torch + +logger = logging.getLogger(__name__) + +Accelerator = Literal["cuda", "mps", "cpu"] + +# matches macOS PYTORCH_MPS_HIGH_WATERMARK_RATIO default +MPS_DEFAULT_BUDGET_RATIO = 0.7 + +_HIGH_GPU_VRAM_GB = 24 +_MID_GPU_VRAM_GB = 12 +_BYTES_PER_GB = 1024**3 # binary GiB convention; matches all advisor byte->GB conversions + + +@dataclass +class HardwareProfile: + accelerator: Accelerator + device_name: str + vram_gb: float + ram_gb: float + free_disk_gb: float + cpu_count: int + notes: list[str] = field(default_factory=list) + + @property + def device_class(self) -> str: + if self.accelerator == "cpu": + return "cpu" + if self.accelerator == "mps": + return "apple-silicon" + if self.vram_gb >= _HIGH_GPU_VRAM_GB: + return "high-gpu" + if self.vram_gb >= _MID_GPU_VRAM_GB: + return "mid-gpu" + return "low-gpu" + + +def _detect_ram_gb() -> float: + return float(psutil.virtual_memory().total) / _BYTES_PER_GB + + +def _detect_free_disk_gb(path: str | None = None) -> float: + cache = Path(path or os.environ.get("HF_HOME") or Path("~/.cache/huggingface").expanduser()) + probe_path = cache if cache.exists() else Path("~").expanduser() + try: + usage = shutil.disk_usage(probe_path) + return usage.free / _BYTES_PER_GB + except OSError as e: + logger.debug("disk usage probe failed at %s: %s", probe_path, e) + return 0.0 + + +def _detect_cuda() -> tuple[float, str] | None: + if not torch.cuda.is_available(): + return None + idx = 0 + try: + _free, total = torch.cuda.mem_get_info(idx) + vram_gb = total / _BYTES_PER_GB + except (RuntimeError, AttributeError) as e: + logger.debug("torch.cuda.mem_get_info failed: %s", e) + return None + name = torch.cuda.get_device_name(idx) + return vram_gb, name + + +def _detect_mps(ram_gb: float, budget_ratio: float = MPS_DEFAULT_BUDGET_RATIO) -> tuple[float, str] | None: + if not (hasattr(torch.backends, "mps") and torch.backends.mps.is_available()): + return None + # apple silicon: unified memory; budget is fraction of total RAM + return ram_gb * budget_ratio, f"Apple Silicon ({platform.machine()})" + + +def detect_hardware( + *, + vram_budget_gb: float | None = None, + mps_budget_ratio: float = MPS_DEFAULT_BUDGET_RATIO, +) -> HardwareProfile: + """Detect the local hardware, with optional manual overrides. + + Args: + vram_budget_gb: when set, overrides the detected VRAM (use for + shared-GPU machines where part of the device is taken). + mps_budget_ratio: fraction of total RAM treated as the MPS + "VRAM" budget on Apple Silicon. + + Returns: + HardwareProfile reflecting current machine state. + """ + notes: list[str] = [] + ram_gb = _detect_ram_gb() + free_disk_gb = _detect_free_disk_gb() + cpu_count = os.cpu_count() or 1 + + cuda = _detect_cuda() + if cuda is not None: + vram_gb, device_name = cuda + accel: Accelerator = "cuda" + else: + mps = _detect_mps(ram_gb, mps_budget_ratio) + if mps is not None: + vram_gb, device_name = mps + accel = "mps" + notes.append(f"MPS unified memory: VRAM budget = {mps_budget_ratio:.0%} of RAM.") + else: + vram_gb = 0.0 + device_name = platform.processor() or "cpu" + accel = "cpu" + + if vram_budget_gb is not None: + if vram_gb and vram_budget_gb > vram_gb: + notes.append(f"Manual --budget-vram-gb={vram_budget_gb} exceeds detected {vram_gb:.1f} GB; using override.") + notes.append(f"Using manual VRAM budget: {vram_budget_gb} GB.") + vram_gb = vram_budget_gb + + return HardwareProfile( + accelerator=accel, + device_name=device_name, + vram_gb=vram_gb, + ram_gb=ram_gb, + free_disk_gb=free_disk_gb, + cpu_count=cpu_count, + notes=notes, + ) diff --git a/src/autointent/_advisor/_hub.py b/src/autointent/_advisor/_hub.py new file mode 100644 index 000000000..677e6045c --- /dev/null +++ b/src/autointent/_advisor/_hub.py @@ -0,0 +1,213 @@ +"""HF Hub metadata lookups + warm-cache probe. + +Memoized per-process. Offline-safe: every probe falls back to a +heuristic value rather than raising. The advisor flips the report's +``low_confidence`` flag when a fallback is taken. +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass +from functools import lru_cache +from pathlib import Path +from typing import Literal + +from huggingface_hub import HfApi, hf_hub_download, scan_cache_dir, try_to_load_from_cache + +Confidence = Literal["hub", "heuristic"] + +logger = logging.getLogger(__name__) + +_DEFAULT_HEURISTIC_PARAMS = 110_000_000 +_DEFAULT_BYTES_PER_PARAM = 4 +_BYTES_PER_GB = 1024**3 # using the binary GiB convention everywhere in the advisor + + +@dataclass +class ModelMeta: + name: str + total_params: int + weight_bytes_per_param: float + total_file_bytes: int + cached_locally: bool + confidence: Confidence + hidden_size: int | None = None + n_layers: int | None = None + + @property + def disk_gb(self) -> float: + return self.total_file_bytes / _BYTES_PER_GB + + @property + def weights_gb(self) -> float: + return (self.total_params * self.weight_bytes_per_param) / _BYTES_PER_GB + + +def _shape_from_config(model_name: str) -> tuple[int | None, int | None]: + """Return ``(hidden_size, num_hidden_layers)`` straight from the model's config.json. + + ``hf_hub_download`` caches the file after the first call, so repeated lookups + in the same process (or across CLI invocations) hit local disk. Returns + ``(None, None)`` on any failure — the advisor stays best-effort. + """ + try: + path = hf_hub_download(model_name, "config.json") + except Exception as e: # noqa: BLE001 + logger.debug("config.json download(%s) failed: %s", model_name, e) + return None, None + try: + cfg = json.loads(Path(path).read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as e: + logger.debug("config.json parse(%s) failed: %s", model_name, e) + return None, None + # Cover the common HF naming variants: BERT/Llama/Gemma use hidden_size + + # num_hidden_layers; T5/MT5 use d_model + num_layers; GPT-2/Neo use n_embd + n_layer. + hidden = cfg.get("hidden_size") or cfg.get("d_model") or cfg.get("n_embd") + layers = cfg.get("num_hidden_layers") or cfg.get("num_layers") or cfg.get("n_layer") + return int(hidden) if hidden else None, int(layers) if layers else None + + +def _is_warm_cached(model_name: str) -> bool: + """True when the weight shard is present in the local HF cache.""" + weight_files = ["model.safetensors", "pytorch_model.bin", "model.safetensors.index.json"] + for fname in weight_files: + path = try_to_load_from_cache(model_name, fname) + if isinstance(path, str): + return True + + # sharded models won't match the single-file probe; fall back to a scan + try: + cache = scan_cache_dir() + except Exception as e: # noqa: BLE001 + logger.debug("scan_cache_dir failed: %s", e) + return False + return any(repo.repo_id == model_name for repo in cache.repos) + + +def _hub_metadata(model_name: str) -> ModelMeta | None: + try: + info = HfApi().model_info(model_name, files_metadata=True) + except Exception as e: # noqa: BLE001 + logger.debug("model_info(%s) failed: %s", model_name, e) + return None + # Bytes-per-element for safetensors dtype strings. Used to convert the per-dtype + # parameter counts (info.safetensors.parameters) into a weighted average + # bytes-per-param when a checkpoint stores tensors in multiple dtypes. + _dtype_bytes: dict[str, int] = { + "F64": 8, + "F32": 4, + "F16": 2, + "BF16": 2, + "I64": 8, + "I32": 4, + "I16": 2, + "I8": 1, + "U8": 1, + "BOOL": 1, + } + + total_params = 0 + weight_bytes_per_param: float = _DEFAULT_BYTES_PER_PARAM + if info.safetensors is not None: + params_by_dtype = info.safetensors.parameters or {} + total_params = info.safetensors.total or sum(params_by_dtype.values()) + if total_params: + total_weight_bytes = sum( + _dtype_bytes.get(dtype, _DEFAULT_BYTES_PER_PARAM) * count for dtype, count in params_by_dtype.items() + ) + if total_weight_bytes: + weight_bytes_per_param = total_weight_bytes / total_params + + total_file_bytes = sum(s.size for s in (info.siblings or []) if s.size) + + # Track whether either size came from the Hub or from the name-pattern fallback; + # if any field was filled by heuristic, downgrade confidence so the report flips + # low_confidence rather than misreporting hub-grade accuracy. + confidence: Confidence = "hub" + if total_params == 0: + total_params = _DEFAULT_HEURISTIC_PARAMS + confidence = "heuristic" + + if total_file_bytes == 0: + total_file_bytes = int(total_params * weight_bytes_per_param) + confidence = "heuristic" + + hidden_size, n_layers = _shape_from_config(model_name) + if hidden_size is None or n_layers is None: + logger.warning( + "Could not read hidden_size / num_hidden_layers from config.json for %s; " + "activation-memory estimates will fall back to BERT-base defaults (768 / 12).", + model_name, + ) + + return ModelMeta( + name=model_name, + total_params=total_params, + weight_bytes_per_param=weight_bytes_per_param, + total_file_bytes=total_file_bytes, + cached_locally=_is_warm_cached(model_name), + confidence=confidence, + hidden_size=hidden_size, + n_layers=n_layers, + ) + + +def _heuristic_metadata(model_name: str) -> ModelMeta: + logger.warning( + "Falling back to name-pattern heuristic for %s; " + "activation-memory estimates will use BERT-base defaults (hidden=768, layers=12).", + model_name, + ) + total_file_bytes = _DEFAULT_HEURISTIC_PARAMS * _DEFAULT_BYTES_PER_PARAM + return ModelMeta( + name=model_name, + total_params=_DEFAULT_HEURISTIC_PARAMS, + weight_bytes_per_param=_DEFAULT_BYTES_PER_PARAM, + total_file_bytes=total_file_bytes, + cached_locally=_is_warm_cached(model_name), + confidence="heuristic", + ) + + +def _looks_like_local_path(model_name: str) -> bool: + """True when ``model_name`` is a filesystem path rather than an HF Hub repo id. + + Hub repo ids match ``org/repo``; anything that starts with a path separator, + ``~``, a relative-path prefix, or a Windows drive letter, or contains a + backslash, is treated as a local path. We can't rely on ``Path.is_absolute()`` + alone because POSIX-style absolute paths (``/tmp/...``) are *not* absolute + on Windows. + """ + if model_name.startswith(("local:", "/", "~", "./", "../", "\\\\")): + return True + if "\\" in model_name: + return True + return len(model_name) >= 2 and model_name[1] == ":" and model_name[0].isalpha() # noqa: PLR2004 + + +@lru_cache(maxsize=64) +def resolve_model(model_name: str) -> ModelMeta: + """Resolve metadata for a single model name. Memoized per process. + + Always returns a value — never raises — so the advisor can keep going + on offline machines or for unknown checkpoints. + """ + if _looks_like_local_path(model_name): + return ModelMeta( + name=model_name, + total_params=_DEFAULT_HEURISTIC_PARAMS, + weight_bytes_per_param=_DEFAULT_BYTES_PER_PARAM, + total_file_bytes=0, + cached_locally=True, + confidence="heuristic", + ) + + # _hub_metadata returns None on any failure (network outage, missing repo, + # SDK exception) so we don't need a separate up-front probe. + meta = _hub_metadata(model_name) + if meta is not None: + return meta + + return _heuristic_metadata(model_name) diff --git a/src/autointent/_advisor/_render.py b/src/autointent/_advisor/_render.py new file mode 100644 index 000000000..afd541e2b --- /dev/null +++ b/src/autointent/_advisor/_render.py @@ -0,0 +1,154 @@ +"""Rendering for the pre-flight report. + +Text output is grouped by phase (Resource / Data / Config) plus a Drivers +section and the always-on disclaimer. JSON output dumps the structured +report straight through. +""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from ._report import PreflightReport + +_SEVERITY_TAG = {"ample": "✓", "tight": "⚠", "over": "x"} +_PHASE_ORDER = ("resource", "data", "config") +_PHASE_LABEL = {"resource": "Resource", "data": "Data", "config": "Config"} + + +def _batch_hint(driver: dict[str, Any]) -> str: + """Per-driver batch annotation: '64 -> 32', '64', '64 (no fit)', or ''.""" + bs = driver.get("batch_size") + if bs is None: + return "" + mx = driver.get("max_batch_size") + if mx is None: + return str(bs) + if mx == 0: + return f"{bs} (no fit)" + if mx == bs: + return str(bs) + return f"{bs} -> {mx}" + + +_DRIVERS_LIMIT = 8 +_DRIVERS_HEADERS = ("Node", "Model", "Mode", "VRAM", "Time", "Batch", "Source") + + +def _render_drivers_table(drivers: list[dict[str, Any]]) -> list[str]: + """Format the Drivers of cost section as an aligned table.""" + visible = drivers[:_DRIVERS_LIMIT] + rows: list[tuple[str, ...]] = [ + ( + f"{d['node_type']}.{d['module']}", + str(d["model"]), + str(d["mode"]), + f"{d['vram_gb']:.2f} GB", + f"{d['time_hours']:.2f} h", + _batch_hint(d), + f"[{d['confidence']}]", + ) + for d in visible + ] + + widths = [len(h) for h in _DRIVERS_HEADERS] + for row in rows: + for i, cell in enumerate(row): + widths[i] = max(widths[i], len(cell)) + + # Right-align numeric columns (VRAM @ idx 3, Time @ idx 4); left-align the rest. + right_align = {3, 4} + + def fmt(row: tuple[str, ...]) -> str: + cells = [] + for i, cell in enumerate(row): + if i in right_align: + cells.append(cell.rjust(widths[i])) + else: + cells.append(cell.ljust(widths[i])) + return " " + " ".join(cells).rstrip() + + lines = ["Drivers of cost:", fmt(_DRIVERS_HEADERS), " " + " ".join("─" * w for w in widths)] + lines.extend(fmt(r) for r in rows) + if len(drivers) > _DRIVERS_LIMIT: + lines.append(f" … and {len(drivers) - _DRIVERS_LIMIT} more") + return lines + + +def render_text(report: PreflightReport) -> str: + lines: list[str] = [] + title = "Compute feasibility check" + if report.preset_name: + title += f" — {report.preset_name}" + lines.append(title) + lines.append("─" * len(title)) + + hw = report.hardware + lines.append( + f"Hardware: {hw.get('accelerator', '?')} ({hw.get('device_name', '?')})," + f" {hw.get('vram_gb', 0):.1f} GB VRAM, {hw.get('ram_gb', 0):.0f} GB RAM," + f" {hw.get('free_disk_gb', 0):.0f} GB free disk" + ) + ds = report.dataset + lines.append( + f"Dataset: n_samples={ds.get('n_samples')}, n_classes={ds.get('n_classes')}," + f" avg_tokens={ds.get('avg_tokens')} ({ds.get('source')})" + ) + lines.append("") + + for phase in _PHASE_ORDER: + bucket = [f for f in report.findings if f.phase == phase] + if not bucket: + continue + lines.append(f"{_PHASE_LABEL[phase]}:") + for f in bucket: + tag = _SEVERITY_TAG.get(f.severity.value, "·") + lines.append(f" {tag} {f.message}") + lines.append("") + + if report.resource.drivers: + lines.extend(_render_drivers_table(report.resource.drivers)) + lines.append("") + + if report.notes: + lines.append("Notes:") + lines.extend(f" • {note}" for note in report.notes) + lines.append("") + + summary = f"Verdict: {'feasible' if report.is_feasible else 'INFEASIBLE'} " + summary += f"(headroom: {report.headroom.value})" + if report.low_confidence: + summary += " — low-confidence (heuristic fallback in use)" + lines.append(summary) + lines.append("Note: estimates are heuristic upper bounds, not measurements.") + return "\n".join(lines) + + +def render_json(report: PreflightReport) -> str: + return json.dumps(report.to_dict(), indent=2, default=str) + + +def render_recommendation( + results: list[tuple[str, PreflightReport]], + chosen: str | None, +) -> str: + """Compact table for the ``recommend`` subcommand.""" + lines = ["", "Recommendation:"] + if chosen: + lines.append(f" -> {chosen}") + else: + lines.append(" -> none of the bundled presets fit your hardware as-is.") + lines.append("") + lines.append(f"{'Preset':<24} {'Status':<14} {'VRAM':<10} {'Time':<10} {'Headroom':<10}") + lines.append("-" * 68) + for name, report in results: + verdict = "feasible" if report.is_feasible else "infeasible" + lines.append( + f"{name:<24} {verdict:<14} " + f"{report.resource.vram_gb:>4.1f} GB " + f"{report.resource.time_hours:>4.1f} h " + f"{report.headroom.value:<8}" + ) + return "\n".join(lines) diff --git a/src/autointent/_advisor/_report.py b/src/autointent/_advisor/_report.py new file mode 100644 index 000000000..c9fd920f4 --- /dev/null +++ b/src/autointent/_advisor/_report.py @@ -0,0 +1,130 @@ +"""Dataclasses for the pre-flight advisor's structured report.""" + +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from enum import Enum +from typing import Any, Literal + + +class Severity(str, Enum): + AMPLE = "ample" + TIGHT = "tight" + OVER = "over" + + +Phase = Literal["resource", "data", "config"] + + +@dataclass(frozen=True) +class Finding: + """A single advisor finding rendered as one line in the summary.""" + + phase: Phase + severity: Severity + message: str + metric: str | None = None + + +@dataclass +class ResourceEstimate: + """Aggregated resource numbers across the search space.""" + + disk_download_gb: float = 0.0 + disk_cached_gb: float = 0.0 + disk_dump_gb: float = 0.0 + ram_gb: float = 0.0 + vram_gb: float = 0.0 + time_hours: float = 0.0 + parallel_factor: int = 1 + drivers: list[dict[str, Any]] = field(default_factory=list) + + @property + def total_disk_gb(self) -> float: + return self.disk_download_gb + self.disk_dump_gb + + +@dataclass +class DatasetStats: + """Minimal stats the advisor needs about the user's dataset. + + Built either from a real ``Dataset`` or from CLI placeholder flags. + """ + + n_samples: int + n_classes: int + avg_tokens: int + p95_tokens: int | None = None + multilabel: bool = False + has_descriptions: bool | None = None + rare_classes: list[str] = field(default_factory=list) + source: str = "placeholder" + + @classmethod + def placeholder( + cls, + n_samples: int = 1_000, + n_classes: int = 10, + avg_tokens: int = 32, + multilabel: bool = False, + ) -> DatasetStats: + return cls( + n_samples=n_samples, + n_classes=n_classes, + avg_tokens=avg_tokens, + p95_tokens=int(avg_tokens * 2.5), + multilabel=multilabel, + ) + + +@dataclass +class PreflightReport: + """One report covering all three phases.""" + + findings: list[Finding] = field(default_factory=list) + resource: ResourceEstimate = field(default_factory=ResourceEstimate) + hardware: dict[str, Any] = field(default_factory=dict) + dataset: dict[str, Any] = field(default_factory=dict) + preset_name: str | None = None + low_confidence: bool = False + notes: list[str] = field(default_factory=list) + + def add(self, phase: Phase, severity: Severity, message: str, metric: str | None = None) -> None: + self.findings.append(Finding(phase=phase, severity=severity, message=message, metric=metric)) + + @property + def headroom(self) -> Severity: + """Worst headroom level across all findings — the column shown in CLI reports.""" + order = {Severity.AMPLE: 0, Severity.TIGHT: 1, Severity.OVER: 2} + if not self.findings: + return Severity.AMPLE + return max((f.severity for f in self.findings), key=lambda s: order[s]) + + @property + def is_feasible(self) -> bool: + return self.headroom != Severity.OVER + + def to_dict(self) -> dict[str, Any]: + d = asdict(self) + d["findings"] = [{**asdict(f), "severity": f.severity.value} for f in self.findings] + d["headroom"] = self.headroom.value + d["is_feasible"] = self.is_feasible + return d + + +@dataclass +class RecommendationResult: + """Output of the recommend workflow: ranked per-preset reports plus the pick. + + ``chosen`` is the best feasible preset name, or ``None`` if none fit. + ``results`` is the full per-preset report list in evaluation order. + """ + + chosen: str | None + results: list[tuple[str, PreflightReport]] + + def to_dict(self) -> dict[str, Any]: + return { + "chosen": self.chosen, + "results": [{"preset": name, "report": r.to_dict()} for name, r in self.results], + } diff --git a/src/autointent/_advisor/runner.py b/src/autointent/_advisor/runner.py new file mode 100644 index 000000000..36fb4fa94 --- /dev/null +++ b/src/autointent/_advisor/runner.py @@ -0,0 +1,170 @@ +"""Public entry point + config validation + data/config phases. + +This file contains the central public function ``run_preflight`` at the top. +Everything below it is supporting machinery for the three phases. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +from pydantic import ValidationError + +from autointent._advisor._estimates._resource import _resource_phase +from autointent._advisor._estimates._search_space import _max_int, _walk_modules +from autointent._advisor._report import PreflightReport, Severity +from autointent._optimization_config import OptimizationConfig + +if TYPE_CHECKING: + from autointent._advisor._hardware import HardwareProfile + from autointent._advisor._report import DatasetStats + + +logger = logging.getLogger(__name__) + + +def run_preflight( + config: dict[str, Any], + stats: DatasetStats, + hardware: HardwareProfile, + *, + preset_name: str | None = None, + refit_after: bool = False, +) -> PreflightReport: + """Run all three preflight phases and return one report. + + Args: + config: parsed preset / ``OptimizationConfig`` dict (top-level keys: + ``search_space``, ``hpo_config``, optional ``embedder_config``, + optional ``logging_config.dump_modules``). + stats: dataset statistics (real or placeholder). + hardware: detected hardware profile. + preset_name: optional friendly name for the report header. + refit_after: matches the ``Pipeline.fit(refit_after=...)`` argument. + When True, time estimates include the extra refit-on-full-data pass. + + Returns: + ``PreflightReport`` with findings across resource / data / config phases. + """ + cfg = _validated_config(config) + report = PreflightReport( + preset_name=preset_name, + hardware={ + "accelerator": hardware.accelerator, + "device_name": hardware.device_name, + "vram_gb": round(hardware.vram_gb, 2), + "ram_gb": round(hardware.ram_gb, 2), + "free_disk_gb": round(hardware.free_disk_gb, 2), + "device_class": hardware.device_class, + }, + dataset={ + "n_samples": stats.n_samples, + "n_classes": stats.n_classes, + "avg_tokens": stats.avg_tokens, + "p95_tokens": stats.p95_tokens, + "multilabel": stats.multilabel, + "source": stats.source, + }, + ) + report.notes.extend(hardware.notes) + + _resource_phase( + embedder_config=cfg.embedder_config, + search_space=cfg.search_space, + n_trials=cfg.hpo_config.n_trials, + n_jobs=cfg.hpo_config.n_jobs, + dump_modules=cfg.logging_config.dump_modules, + stats=stats, + hardware=hardware, + report=report, + refit_after=refit_after, + ) + _data_phase(cfg.search_space, stats, report) + _config_phase(cfg.search_space, cfg.hpo_config.n_jobs, hardware, report) + + return report + + +def _validated_config(config: dict[str, Any]) -> OptimizationConfig: + """Validate ``config`` against the project's canonical ``OptimizationConfig``. + + The advisor is best-effort: a malformed user config should still produce a + report (with placeholder costs) rather than crashing, so any validation + error falls back to the model defaults. + """ + try: + return OptimizationConfig.model_validate(config) + except ValidationError as e: + logger.warning("Advisor config failed validation; falling back to defaults: %s", e) + # OptimizationConfig requires `search_space`; build a minimal valid default. + return OptimizationConfig.model_validate({"search_space": []}) + + +def _config_phase( + search_space: list[dict[str, Any]], + n_jobs: int, + hardware: HardwareProfile, + report: PreflightReport, +) -> None: + """Config-phase checks: parallelism vs. hardware mismatches.""" + if n_jobs > 1 and hardware.accelerator in {"cuda", "mps"}: + report.add( + "config", + Severity.TIGHT, + f"hpo_config.n_jobs={n_jobs} on a single GPU multiplies VRAM demand by {n_jobs}x.", + ) + + uses_catboost_gpu = any( + entry.get("module_name") == "catboost" and entry.get("task_type") == "GPU" + for _, entry in _walk_modules(search_space) + ) + if uses_catboost_gpu and hardware.accelerator != "cuda": + report.add( + "config", + Severity.TIGHT, + "CatBoost task_type=GPU configured but no CUDA detected - will fall back to CPU.", + ) + + +def _data_phase( + search_space: list[dict[str, Any]], + stats: DatasetStats, + report: PreflightReport, +) -> None: + """Data-phase checks: token truncation, rare classes, missing intent descriptions.""" + # token-length truncation (heuristic — we use stats.p95_tokens vs configured max_length) + p95 = stats.p95_tokens or int(stats.avg_tokens * 2.5) + for _, entry in _walk_modules(search_space): + max_len_value = entry.get("max_length") + if max_len_value is None: + continue + max_len = _max_int(max_len_value, 512) + if p95 > max_len: + severity = Severity.OVER if p95 > max_len * 1.5 else Severity.TIGHT + module_name = entry.get("module_name", "?") + report.add( + "data", + severity, + f"Train tokens p95~{p95} exceeds {module_name}.max_length={max_len}; expect silent truncation.", + ) + + # rare class x linear-CV (LogisticRegressionCV cv=3 needs >=3 samples/class; + # multilabel path uses one-vs-rest without CV so the failure can't occur there) + has_linear = any(e.get("module_name") == "linear" for _, e in _walk_modules(search_space)) + if has_linear and stats.rare_classes and not stats.multilabel: + report.add( + "data", + Severity.OVER, + f"LogisticRegressionCV (cv=3) will fail: classes {stats.rare_classes[:5]} have <3 samples.", + ) + + # partial descriptions x description scorer + description_modules = {"description_bi", "description_cross", "description_llm"} + has_description = any(e.get("module_name") in description_modules for _, e in _walk_modules(search_space)) + if has_description and stats.has_descriptions is False: + report.add( + "data", + Severity.OVER, + "description scorer present but intent descriptions are missing - fill them in or drop the scorer.", + ) diff --git a/src/autointent/_advisor/workflows.py b/src/autointent/_advisor/workflows.py new file mode 100644 index 000000000..2331e225a --- /dev/null +++ b/src/autointent/_advisor/workflows.py @@ -0,0 +1,230 @@ +"""High-level advisor workflows: ``inspect`` and ``recommend``. + +Each workflow orchestrates the lower-level pieces (``load_config``, +``detect_hardware``, ``stats_from_dataset``, ``run_preflight``) into a single +typed call. They expose the same logic the CLI uses but accept Python +arguments instead of an ``argparse.Namespace`` — useful from notebooks, +integration tests, or any caller that wants a ``PreflightReport`` / +``RecommendationResult`` directly. +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import TYPE_CHECKING, Any, get_args + +import yaml +from datasets import ClassLabel, Sequence, load_dataset + +from autointent.custom_types import SearchSpacePreset +from autointent.utils import load_preset + +from ._hardware import detect_hardware +from ._report import DatasetStats, RecommendationResult, Severity +from .runner import run_preflight + +if TYPE_CHECKING: + from collections.abc import Iterable + + from ._report import PreflightReport + + +logger = logging.getLogger("autointent.advisor") + +_SAMPLE_LIMIT = 1000 +_P95_PERCENTILE = 0.95 +BUNDLED_PRESETS: tuple[str, ...] = get_args(SearchSpacePreset) + + +def load_config(target: str) -> tuple[dict[str, Any], str]: + """Return ``(config_dict, friendly_name)`` for either a preset name or a YAML path.""" + path = Path(target) + if path.is_file(): + with path.open(encoding="utf-8") as f: + return yaml.safe_load(f), path.stem + return load_preset(target), target # type: ignore[arg-type] + + +def stats_from_dataset(path: str, *, multilabel: bool = False) -> DatasetStats: + """Best-effort: load a dataset via HF ``datasets.load_dataset`` and derive advisor stats. + + Accepts a Hub repo id (``DeepPavlov/clinc150``) or a local file path + (``.csv`` / ``.json`` / ``.jsonl`` / ``.parquet``) / dataset directory. Falls + back to a placeholder on any loader error so callers stay best-effort. + """ + # Anything not in this map (no suffix, unknown suffix) is treated as a Hub + # repo id or a dataset directory and passed to load_dataset directly. + file_builders = {".csv": "csv", ".tsv": "csv", ".json": "json", ".jsonl": "json", ".parquet": "parquet"} + builder = file_builders.get(Path(path).suffix.lower()) + try: + ds = load_dataset(builder, data_files=path) if builder else load_dataset(path) + except (OSError, ValueError, FileNotFoundError) as e: + logger.warning("Failed to load dataset %s: %s", path, e) + return DatasetStats.placeholder(multilabel=multilabel) + + train = ds["train"] if "train" in ds else next(iter(ds.values()), None) + if train is None: + return DatasetStats.placeholder(multilabel=multilabel) + + cols = train.column_names + utt_col = next( + (c for c in ("utterance", "text", "sentence", "query", "input") if c in cols), cols[0] if cols else None + ) + label_col = next((c for c in ("label", "labels", "intent", "target") if c in cols), None) + + detected_multilabel, n_classes = _label_shape(train, label_col, fallback_multilabel=multilabel) + + sample = train[:_SAMPLE_LIMIT] if len(train) > _SAMPLE_LIMIT else train[:] + lengths = [len(str(s).split()) for s in (sample.get(utt_col, []) if utt_col else [])] + avg_tokens = int(sum(lengths) / max(1, len(lengths))) if lengths else 32 + if lengths: + sorted_lengths = sorted(lengths) + idx = max(0, min(len(sorted_lengths) - 1, round((len(sorted_lengths) - 1) * _P95_PERCENTILE))) + p95 = sorted_lengths[idx] + else: + p95 = avg_tokens * 2 + + return DatasetStats( + n_samples=len(train), + n_classes=n_classes, + avg_tokens=avg_tokens, + p95_tokens=p95, + multilabel=detected_multilabel, + has_descriptions=None, + rare_classes=_rare_classes(train, label_col, detected_multilabel, n_classes) if label_col else [], + source=f"dataset:{path}", + ) + + +def _label_shape(train: Any, label_col: str | None, *, fallback_multilabel: bool) -> tuple[bool, int]: # noqa: ANN401 + """Derive ``(multilabel, n_classes)`` from the HF feature schema with a value-based fallback.""" + if label_col is None: + return fallback_multilabel, 0 + feature = train.features.get(label_col) + if isinstance(feature, Sequence): + inner = feature.feature + if isinstance(inner, ClassLabel): + return True, inner.num_classes + # Sequence of plain ints — n_classes = max label index + 1. + max_idx = max((max(row) for row in train[label_col] if row), default=-1) + return True, max_idx + 1 + if isinstance(feature, ClassLabel): + return False, feature.num_classes + # Plain int/string column. Detect multilabel from the first non-empty row, then count uniques. + is_multi = len(train) > 0 and isinstance(train[0][label_col], (list, tuple)) + if is_multi: + max_idx = max((max(row) for row in train[label_col] if row), default=-1) + return True, max_idx + 1 + return False, len({label for label in train[label_col] if label is not None}) + + +def _rare_classes( + train: Any, # noqa: ANN401 + label_col: str, + multilabel: bool, + n_classes: int, + min_count: int = 3, +) -> list[str]: + """Return labels with fewer than ``min_count`` samples in the train split. + + Used to surface the LogisticRegressionCV(cv=3) failure case before fit. + Returns an empty list on any error so the advisor stays best-effort. + """ + try: + labels = train[label_col] + except (KeyError, AttributeError, TypeError): + return [] + counts: dict[str, int] = {} + if multilabel: + for row in labels: + if not row: + continue + for i, v in enumerate(row): + if v: + counts[str(i)] = counts.get(str(i), 0) + 1 + for i in range(n_classes): + counts.setdefault(str(i), 0) + else: + for label in labels: + counts[str(label)] = counts.get(str(label), 0) + 1 + return sorted(name for name, c in counts.items() if c < min_count) + + +def inspect( + target: str, + *, + stats: DatasetStats | None = None, + budget_vram_gb: float | None = None, +) -> PreflightReport: + """Inspect a preset (or YAML config path) against the local hardware. + + Args: + target: Bundled preset name (e.g. ``'transformers-light'``) or a YAML + config path. The friendly name surfaced in the report is the file + stem for paths and the preset name otherwise. + stats: Dataset stats to score against. Defaults to a placeholder if + ``None``. + budget_vram_gb: Optional VRAM-budget override for the hardware probe. + + Returns: + ``PreflightReport`` covering resource / data / config phases. + """ + config, name = load_config(target) + hardware = detect_hardware(vram_budget_gb=budget_vram_gb) + return run_preflight(config, stats or DatasetStats.placeholder(), hardware, preset_name=name) + + +def recommend( + *, + stats: DatasetStats | None = None, + presets: Iterable[str] | None = None, + budget_vram_gb: float | None = None, + budget_time_h: float | None = None, +) -> RecommendationResult: + """Walk bundled presets and return the best feasible fit plus all per-preset reports. + + Args: + stats: Dataset stats to score against. Defaults to a placeholder if ``None``. + presets: Override of the preset list (defaults to ``BUNDLED_PRESETS``). + budget_vram_gb: Optional VRAM-budget override for the hardware probe. + budget_time_h: Optional wall-time ceiling in hours; presets exceeding it + get an extra ``Severity.OVER`` finding so they drop out of the + feasible ranking. + + Returns: + ``RecommendationResult`` with the chosen preset name and full results list. + + Note: + Among feasible presets we pick the heaviest one that still fits the + hardware budget — "use what you have" semantics. This is a *cost* + ranking, not a quality ranking: a heavier preset is not strictly better + and may overfit on small datasets where a classic-* preset would win on + accuracy. Override ``presets=`` if you want a different ranking. + """ + hardware = detect_hardware(vram_budget_gb=budget_vram_gb) + stats = stats or DatasetStats.placeholder() + preset_iter = list(presets) if presets is not None else BUNDLED_PRESETS + + results: list[tuple[str, PreflightReport]] = [] + for preset in preset_iter: + try: + cfg = load_preset(preset) # type: ignore[arg-type] + except (OSError, ValueError, KeyError) as e: + logger.debug("Skipping preset %s: %s", preset, e) + continue + report = run_preflight(cfg, stats, hardware, preset_name=preset) + if budget_time_h is not None and report.resource.time_hours > budget_time_h: + report.add( + "resource", + Severity.OVER, + f"Estimated time {report.resource.time_hours:.1f} h exceeds budget {budget_time_h} h.", + ) + results.append((preset, report)) + + cost_rank = {name: i for i, name in enumerate(BUNDLED_PRESETS)} + feasible = [(name, r) for name, r in results if r.is_feasible] + feasible.sort(key=lambda pair: (cost_rank.get(pair[0], len(BUNDLED_PRESETS)), pair[0])) + chosen = feasible[0][0] if feasible else None + + return RecommendationResult(chosen=chosen, results=results) diff --git a/src/autointent/_utils.py b/src/autointent/_utils.py index c8a8614b7..92c81431b 100644 --- a/src/autointent/_utils.py +++ b/src/autointent/_utils.py @@ -25,5 +25,3 @@ def detect_device() -> str: if torch.mps.is_available(): return "mps" return "cpu" - - diff --git a/src/autointent/custom_types/_types.py b/src/autointent/custom_types/_types.py index cbfa82576..59e6b87a3 100644 --- a/src/autointent/custom_types/_types.py +++ b/src/autointent/custom_types/_types.py @@ -117,18 +117,25 @@ class Split: """ SearchSpacePreset = Literal[ - "classic-heavy", - "classic-light", - "classic-medium", - "nn-heavy", - "nn-medium", "transformers-heavy", "transformers-light", - "transformers-no-hpo", + "nn-heavy", "zero-shot-llm", + "nn-medium", + "classic-heavy", + "transformers-no-hpo", + "classic-medium", "zero-shot-encoders", + "classic-light", ] -"""Some presets that our library supports.""" +"""Bundled search-space presets, listed in descending resource-cost order. + +Heavier presets explore more / larger models and take longer to run. The order +is a cost ranking, **not** a quality ranking: a heavier preset is not strictly +better — e.g. ``transformers-heavy`` will overfit on tiny datasets where a +classic-* preset wins on accuracy. ``autointent._advisor.recommend`` uses this +ordering to pick the heaviest preset that still fits the hardware budget, +which is a reasonable default but not always the right choice for the data.""" class Document(BaseModel): diff --git a/tests/advisor/__init__.py b/tests/advisor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/advisor/test_estimates_and_cli.py b/tests/advisor/test_estimates_and_cli.py new file mode 100644 index 000000000..cbaee2f81 --- /dev/null +++ b/tests/advisor/test_estimates_and_cli.py @@ -0,0 +1,226 @@ +"""End-to-end smoke tests for the advisor. + +These run offline — HF Hub probes are monkeypatched to fail so the +advisor falls back to its name-pattern heuristics. Verifies that: + +* every bundled preset can be inspected without raising; +* the recommend subcommand picks something on a generous budget and + nothing on a hostile one; +* ``--json`` emits parseable JSON. +""" + +from __future__ import annotations + +import json +import sys + +import pytest + +from autointent._advisor import DatasetStats, HardwareProfile, run_preflight +from autointent._advisor._cli import main +from autointent._advisor.workflows import BUNDLED_PRESETS +from autointent.utils import load_preset + + +@pytest.fixture(autouse=True) +def _force_offline(monkeypatch: pytest.MonkeyPatch) -> None: + """Force HF Hub lookups to fail so tests don't hit the network.""" + from autointent._advisor import _hub + + _hub.resolve_model.cache_clear() + monkeypatch.setattr(_hub, "_hub_metadata", lambda _name: None) + + +def _profile(vram_gb: float = 16.0) -> HardwareProfile: + return HardwareProfile( + accelerator="cuda" if vram_gb > 0 else "cpu", + device_name="test-gpu" if vram_gb > 0 else "test-cpu", + vram_gb=vram_gb, + ram_gb=32.0, + free_disk_gb=200.0, + cpu_count=8, + ) + + +@pytest.mark.parametrize("preset", BUNDLED_PRESETS) +def test_every_preset_inspects_without_raising(preset: str) -> None: + cfg = load_preset(preset) # type: ignore[arg-type] + stats = DatasetStats.placeholder(n_samples=500, n_classes=10, avg_tokens=24) + report = run_preflight(cfg, stats, _profile(vram_gb=16.0), preset_name=preset) + assert report.preset_name == preset + # always at least one resource-phase finding + assert any(f.phase == "resource" for f in report.findings) + + +def test_heavy_preset_is_infeasible_on_2gb_budget() -> None: + cfg = load_preset("transformers-heavy") + stats = DatasetStats.placeholder(n_samples=5000, n_classes=20, avg_tokens=40) + report = run_preflight(cfg, stats, _profile(vram_gb=2.0), preset_name="transformers-heavy") + assert not report.is_feasible, "deberta-v3-large should not fit in 2 GB" + + +def test_light_preset_is_feasible_on_8gb_budget() -> None: + cfg = load_preset("transformers-light") + stats = DatasetStats.placeholder(n_samples=1000, n_classes=10, avg_tokens=24) + report = run_preflight(cfg, stats, _profile(vram_gb=8.0), preset_name="transformers-light") + assert report.is_feasible + + +def test_n_jobs_doubles_vram_findings() -> None: + cfg = load_preset("transformers-light") + cfg = {**cfg, "hpo_config": {**(cfg.get("hpo_config") or {}), "n_jobs": 4}} + stats = DatasetStats.placeholder() + report = run_preflight(cfg, stats, _profile(vram_gb=4.0)) + assert any("parallel trials" in f.message for f in report.findings) + assert any(f.phase == "config" and "n_jobs" in f.message for f in report.findings) + + +def test_cli_inspect_json_is_parseable(capsys: pytest.CaptureFixture[str]) -> None: + rc = main( + [ + "inspect", + "transformers-light", + "--n-samples", + "500", + "--n-classes", + "5", + "--avg-tokens", + "20", + "--json", + "--budget-vram-gb", + "16", + ] + ) + captured = capsys.readouterr() + payload = json.loads(captured.out) + assert payload["preset_name"] == "transformers-light" + assert "findings" in payload + assert payload["headroom"] in {"ample", "tight", "over"} + # rc is 0 on feasible, 1 otherwise + assert rc in (0, 1) + + +def test_cli_inspect_text_runs(capsys: pytest.CaptureFixture[str]) -> None: + main( + [ + "inspect", + "transformers-light", + "--n-samples", + "200", + "--n-classes", + "5", + "--avg-tokens", + "15", + "--budget-vram-gb", + "16", + ] + ) + out = capsys.readouterr().out + assert "Compute feasibility check" in out + assert "Verdict:" in out + + +def test_cli_recommend_picks_a_preset_on_generous_hardware( + capsys: pytest.CaptureFixture[str], +) -> None: + rc = main( + [ + "recommend", + "--n-samples", + "1000", + "--n-classes", + "10", + "--avg-tokens", + "20", + "--budget-vram-gb", + "24", + ] + ) + out = capsys.readouterr().out + assert "Recommendation:" in out + assert rc == 0 + + +def test_partial_descriptions_with_description_scorer_flags_red() -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + {"module_name": "description_bi"}, + ], + } + ], + } + stats = DatasetStats( + n_samples=500, + n_classes=10, + avg_tokens=24, + has_descriptions=False, + ) + report = run_preflight(cfg, stats, _profile(vram_gb=16.0)) + assert any(f.phase == "data" and "description" in f.message.lower() for f in report.findings) + + +def test_long_dataset_triggers_truncation_warning() -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": "microsoft/deberta-v3-small"}], + "max_length": [128], + } + ], + } + ], + } + stats = DatasetStats( + n_samples=500, + n_classes=10, + avg_tokens=80, + p95_tokens=512, # well over 128 + ) + report = run_preflight(cfg, stats, _profile(vram_gb=16.0)) + assert any("truncation" in f.message.lower() for f in report.findings) + + +def test_cli_recommend_budget_time_flags_red_for_overbudget_presets( + capsys: pytest.CaptureFixture[str], +) -> None: + """Tight time budget must flag every preset that exceeds it with RED severity. + + Previously the budget path used a tautological severity expression and the + breach never escalated the finding — covers the regression.""" + main( + [ + "recommend", + "--n-samples", + "1000", + "--n-classes", + "10", + "--avg-tokens", + "20", + "--budget-vram-gb", + "48", + "--budget-time-h", + "0.0001", + "--json", + ] + ) + payload = json.loads(capsys.readouterr().out) + flagged = [ + r + for r in payload["results"] + if any(f["severity"] == "over" and "exceeds budget" in f["message"] for f in r["report"]["findings"]) + ] + assert flagged, "budget-time-h breach should produce OVER severity findings" + # Any preset above the budget must be marked infeasible. + for r in flagged: + assert r["report"]["is_feasible"] is False + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-v"])) diff --git a/tests/advisor/test_estimates_internals.py b/tests/advisor/test_estimates_internals.py new file mode 100644 index 000000000..02c254f87 --- /dev/null +++ b/tests/advisor/test_estimates_internals.py @@ -0,0 +1,601 @@ +"""Targeted tests for `_estimates` helpers + edge cases of `run_preflight`.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from autointent._advisor import _hub, run_preflight +from autointent._advisor._estimates._formulas import _classify_severity, _ram_for_module, _vram_for_transformer +from autointent._advisor._estimates._search_space import _extract_model_names, _max_int +from autointent._advisor._hardware import HardwareProfile +from autointent._advisor._hub import ModelMeta +from autointent._advisor._report import DatasetStats, Severity + +# Per-name ModelMeta fixtures used by the offline tests. Production resolution +# (HF Hub config.json + safetensors metadata) is mocked away so the batch-fit +# math doesn't depend on whatever fallback the heuristic path returns. +_FAKE_SHAPES: dict[str, tuple[int, int, int]] = { + # (total_params, hidden_size, n_layers) + "microsoft/deberta-v3-large": (350_000_000, 1024, 24), + "microsoft/deberta-v3-small": (140_000_000, 768, 6), + "sentence-transformers/all-MiniLM-L6-v2": (33_000_000, 384, 6), + "intfloat/multilingual-e5-large-instruct": (560_000_000, 1024, 24), +} + + +def _fake_resolve(model_name: str) -> ModelMeta: + known = _FAKE_SHAPES.get(model_name) + params, hidden, layers = known or (110_000_000, 768, 12) + return ModelMeta( + name=model_name, + total_params=params, + weight_bytes_per_param=4, + total_file_bytes=params * 4, + cached_locally=False, + confidence="hub" if known else "heuristic", + hidden_size=hidden, + n_layers=layers, + ) + + +@pytest.fixture(autouse=True) +def _offline(monkeypatch: pytest.MonkeyPatch) -> None: + _hub.resolve_model.cache_clear() + monkeypatch.setattr(_hub, "_is_warm_cached", lambda _name: False) + # Resource phase calls `_hub.resolve_model(...)` via module reference, so + # patching the symbol on `_hub` is enough. + monkeypatch.setattr(_hub, "resolve_model", _fake_resolve) + + +def _profile(vram_gb: float = 16.0, accelerator: str = "cuda") -> HardwareProfile: + return HardwareProfile( + accelerator=accelerator, # type: ignore[arg-type] + device_name=f"test-{accelerator}", + vram_gb=vram_gb, + ram_gb=32.0, + free_disk_gb=200.0, + cpu_count=8, + ) + + +class TestMaxInt: + def test_none_returns_default(self) -> None: + assert _max_int(None, 7) == 7 + + def test_list_picks_max(self) -> None: + assert _max_int([1, 5, 3], 0) == 5 + + def test_range_dict_uses_high(self) -> None: + assert _max_int({"low": 1, "high": 9}, 0) == 9 + + def test_scalar_int_passes_through(self) -> None: + assert _max_int(42, 0) == 42 + + def test_garbage_returns_default(self) -> None: + assert _max_int("not-a-number", 11) == 11 + + +class TestExtractModelNames: + def test_classification_model_config_as_list(self) -> None: + entry = {"classification_model_config": [{"model_name": "foo/bar"}]} + assert _extract_model_names(entry) == ["foo/bar"] + + def test_classification_model_config_as_dict(self) -> None: + entry = {"classification_model_config": {"model_name": "foo/bar"}} + assert _extract_model_names(entry) == ["foo/bar"] + + def test_embedder_config_picked_up(self) -> None: + entry = {"embedder_config": [{"model_name": "e/b"}]} + assert _extract_model_names(entry) == ["e/b"] + + def test_multiple_choices_all_returned(self) -> None: + entry = { + "classification_model_config": [ + {"model_name": "a/x"}, + {"model_name": "b/y"}, + ] + } + assert _extract_model_names(entry) == ["a/x", "b/y"] + + def test_empty_entry(self) -> None: + assert _extract_model_names({}) == [] + + +class TestClassifySeverity: + def test_below_yellow_is_green(self) -> None: + assert _classify_severity(estimate=1.0, budget=10.0) == Severity.AMPLE + + def test_above_yellow_threshold(self) -> None: + assert _classify_severity(estimate=9.5, budget=10.0) == Severity.TIGHT + + def test_at_or_above_red_threshold(self) -> None: + assert _classify_severity(estimate=10.0, budget=10.0) == Severity.OVER + assert _classify_severity(estimate=12.0, budget=10.0) == Severity.OVER + + def test_zero_budget_returns_yellow(self) -> None: + assert _classify_severity(estimate=1.0, budget=0.0) == Severity.TIGHT + + +class TestVramForTransformer: + @pytest.fixture + def meta(self) -> ModelMeta: + return ModelMeta( + name="x", + total_params=100_000_000, + weight_bytes_per_param=4, + total_file_bytes=0, + cached_locally=False, + confidence="hub", + ) + + def test_full_finetune_is_larger_than_lora_is_larger_than_inference(self, meta: ModelMeta) -> None: + inference = _vram_for_transformer(meta, "inference") + lora = _vram_for_transformer(meta, "lora") + full = _vram_for_transformer(meta, "full-finetune") + assert inference < lora < full + + def test_inference_activations_are_smaller_than_training(self, meta: ModelMeta) -> None: + """Inference doesn't store per-layer outputs for backward — activation memory + should be many times smaller than training at the same batch_size.""" + train_total = _vram_for_transformer(meta, "full-finetune", batch_size=64, seq_len=128) + train_weights = _vram_for_transformer(meta, "full-finetune", batch_size=0) + inf_total = _vram_for_transformer(meta, "inference", batch_size=64, seq_len=128) + inf_weights = _vram_for_transformer(meta, "inference", batch_size=0) + train_acts = train_total - train_weights + inf_acts = inf_total - inf_weights + assert inf_acts > 0 + assert train_acts > inf_acts + # 12-layer model: training activations should be at least ~5x inference. + assert train_acts / inf_acts > 5 + + +def test_ram_scales_with_dataset_size() -> None: + meta = ModelMeta( + name="x", + total_params=100_000_000, + weight_bytes_per_param=4, + total_file_bytes=0, + cached_locally=False, + confidence="hub", + ) + small = _ram_for_module(meta, DatasetStats.placeholder(n_samples=100)) + big = _ram_for_module(meta, DatasetStats.placeholder(n_samples=10_000_000, avg_tokens=128)) + assert big > small + + +class TestRunPreflightFeatures: + def test_dump_modules_adds_disk_during_training(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": "microsoft/deberta-v3-small"}], + "num_train_epochs": [3], + "batch_size": [16], + } + ], + } + ], + "hpo_config": {"n_trials": 5}, + "logging_config": {"dump_modules": True}, + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + assert report.resource.disk_dump_gb > 0 + assert any("during training" in f.message for f in report.findings) + + def test_refit_after_increases_time(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": "microsoft/deberta-v3-small"}], + "num_train_epochs": [3], + "batch_size": [16], + } + ], + } + ], + "hpo_config": {"n_trials": 10}, + } + baseline = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + bumped = run_preflight(cfg, DatasetStats.placeholder(), _profile(), refit_after=True) + assert bumped.resource.time_hours > baseline.resource.time_hours + + def test_catboost_gpu_without_cuda_flags_config(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + {"module_name": "catboost", "task_type": "GPU"}, + ], + } + ], + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile(accelerator="cpu")) + assert any(f.phase == "config" and "CatBoost" in f.message for f in report.findings) + + def test_catboost_gpu_with_cuda_is_silent(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + {"module_name": "catboost", "task_type": "GPU"}, + ], + } + ], + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile(accelerator="cuda")) + assert not any(f.phase == "config" and "CatBoost" in f.message for f in report.findings) + + def test_offline_flips_low_confidence(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": "any/model"}], + } + ], + } + ] + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + assert report.low_confidence is True + assert any("Heuristic fallback" in n for n in report.notes) + + def test_rare_classes_with_linear_scorer_flag_red(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + {"module_name": "linear"}, + ], + } + ] + } + stats = DatasetStats( + n_samples=20, + n_classes=5, + avg_tokens=10, + rare_classes=["intent_a", "intent_b"], + ) + report = run_preflight(cfg, stats, _profile()) + assert any( + f.phase == "data" and "LogisticRegressionCV" in f.message and f.severity == Severity.OVER + for f in report.findings + ) + + def test_truncation_red_when_p95_dominates_max_length(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "max_length": [128], + "classification_model_config": [{"model_name": "some/model"}], + } + ], + } + ] + } + stats = DatasetStats(n_samples=500, n_classes=5, avg_tokens=50, p95_tokens=400) + report = run_preflight(cfg, stats, _profile()) + red = [f for f in report.findings if f.phase == "data" and f.severity == Severity.OVER] + assert red, "p95=400 > 1.5 * max_length=128 should be red" + + def test_truncation_yellow_when_p95_only_slightly_exceeds(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "max_length": [128], + "classification_model_config": [{"model_name": "some/model"}], + } + ], + } + ] + } + stats = DatasetStats(n_samples=500, n_classes=5, avg_tokens=50, p95_tokens=140) + report = run_preflight(cfg, stats, _profile()) + yellows = [ + f + for f in report.findings + if f.phase == "data" and f.severity == Severity.TIGHT and "truncation" in f.message.lower() + ] + assert yellows + + +class TestLinearCatboostFormulas: + """Cost surfaces for the classic (sklearn / catboost) scorers.""" + + def _embedder_node(self) -> dict[str, Any]: + return { + "node_type": "embedder", + "search_space": [ + { + "module_name": "sentence_transformer", + "embedder_config": [{"model_name": "sentence-transformers/all-MiniLM-L6-v2"}], + } + ], + } + + def test_linear_contributes_ram_and_time(self) -> None: + cfg = { + "search_space": [ + self._embedder_node(), + { + "node_type": "scoring", + "search_space": [{"module_name": "linear", "max_iter": [200]}], + }, + ], + "hpo_config": {"n_trials": 5}, + } + stats = DatasetStats.placeholder(n_samples=100_000, n_classes=10, avg_tokens=24) + report = run_preflight(cfg, stats, _profile()) + linear_drivers = [d for d in report.resource.drivers if d["module"] == "linear"] + assert len(linear_drivers) == 1 + assert report.resource.ram_gb > 0 + assert report.resource.time_hours > 0 + assert linear_drivers[0]["vram_gb"] == 0 # sklearn is CPU-only + + def test_logreg_cv_multiplier_dominates_multiclass_time(self) -> None: + """Multiclass linear uses LogisticRegressionCV (Cs*cv+1 ≈ 31 inner fits); + multilabel uses one LogReg per class (cv_multiplier=1). At equal n_classes, + multiclass must be much slower than the per-class multilabel path.""" + base = { + "search_space": [ + self._embedder_node(), + { + "node_type": "scoring", + "search_space": [{"module_name": "linear", "max_iter": [1000]}], + }, + ], + "hpo_config": {"n_trials": 1}, + } + multiclass = run_preflight( + base, + DatasetStats.placeholder(n_samples=100_000, n_classes=10, multilabel=False), + _profile(), + ) + multilabel = run_preflight( + base, + DatasetStats.placeholder(n_samples=100_000, n_classes=10, multilabel=True), + _profile(), + ) + # multiclass: 31 inner fits x 1 model; multilabel: 1 fit x n_classes=10 models. + # 31 > 10 => multiclass is the slower path. + assert multiclass.resource.time_hours > multilabel.resource.time_hours + + def test_catboost_contributes_ram_and_time_on_cpu(self) -> None: + cfg = { + "search_space": [ + self._embedder_node(), + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "catboost", + "iterations": [1000], + "depth": [6], + } + ], + }, + ], + "hpo_config": {"n_trials": 3}, + } + stats = DatasetStats.placeholder(n_samples=100_000, n_classes=8, avg_tokens=24) + report = run_preflight(cfg, stats, _profile(accelerator="cpu")) + cb = next(d for d in report.resource.drivers if d["module"] == "catboost") + assert report.resource.ram_gb > 0 + assert report.resource.time_hours > 0 + assert cb["vram_gb"] == 0 + assert cb["mode"] == "catboost" + + def test_catboost_gpu_moves_cost_to_vram(self) -> None: + cfg = { + "search_space": [ + self._embedder_node(), + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "catboost", + "iterations": [1000], + "depth": [6], + "task_type": "GPU", + } + ], + }, + ], + "hpo_config": {"n_trials": 2}, + } + stats = DatasetStats.placeholder(n_samples=100_000, n_classes=8, avg_tokens=24) + report = run_preflight(cfg, stats, _profile(accelerator="cuda")) + cb = next(d for d in report.resource.drivers if d["module"] == "catboost") + assert report.resource.vram_gb > 0 + assert cb["ram_gb"] == 0 + assert cb["mode"] == "catboost-gpu" + + def test_linear_scales_with_n_samples(self) -> None: + cfg = { + "search_space": [ + self._embedder_node(), + { + "node_type": "scoring", + "search_space": [{"module_name": "linear"}], + }, + ], + } + small = run_preflight(cfg, DatasetStats.placeholder(n_samples=500), _profile()) + big = run_preflight(cfg, DatasetStats.placeholder(n_samples=500_000), _profile()) + assert big.resource.time_hours > small.resource.time_hours + assert big.resource.ram_gb > small.resource.ram_gb + + +class TestPerDriverBatchHint: + """Each transformer driver carries its own (batch_size, max_batch_size) for rendering.""" + + def _bert_cfg(self, model_name: str, batch_size: int) -> dict[str, Any]: + return { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": model_name}], + "num_train_epochs": [3], + "batch_size": [batch_size], + } + ], + } + ], + "hpo_config": {"n_trials": 1}, + } + + def test_driver_records_current_and_max_batch(self) -> None: + report = run_preflight( + self._bert_cfg("microsoft/deberta-v3-large", batch_size=64), + DatasetStats.placeholder(), + _profile(vram_gb=7.5), + ) + drivers = [d for d in report.resource.drivers if d["module"] == "bert"] + assert drivers + d = drivers[0] + assert d["batch_size"] == 64 + # vram_gb=7.5 against ~5.9 GB weights x 0.9 tight ratio -> little activation room, max < 64. + assert d["max_batch_size"] is not None + assert 0 < d["max_batch_size"] < 64 + + def test_max_batch_zero_when_weights_alone_overflow(self) -> None: + report = run_preflight( + self._bert_cfg("microsoft/deberta-v3-large", batch_size=64), + DatasetStats.placeholder(), + _profile(vram_gb=2.0), + ) + d = next(d for d in report.resource.drivers if d["module"] == "bert") + assert d["max_batch_size"] == 0 + + def test_max_batch_can_be_larger_than_current(self) -> None: + report = run_preflight( + self._bert_cfg("microsoft/deberta-v3-large", batch_size=32), + DatasetStats.placeholder(), + _profile(vram_gb=64.0), + ) + d = next(d for d in report.resource.drivers if d["module"] == "bert") + assert d["max_batch_size"] is not None + assert d["max_batch_size"] > 32 + + def test_multiple_drivers_carry_independent_max_batch(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [ + {"model_name": "microsoft/deberta-v3-small"}, + {"model_name": "microsoft/deberta-v3-large"}, + ], + "num_train_epochs": [3], + "batch_size": [64], + } + ], + } + ], + "hpo_config": {"n_trials": 1}, + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile(vram_gb=10.0)) + small = next(d for d in report.resource.drivers if "small" in d["model"]) + large = next(d for d in report.resource.drivers if "large" in d["model"]) + # The smaller model has more headroom -> larger max batch (or equal-cap when both saturate). + assert small["max_batch_size"] >= large["max_batch_size"] + + +class TestDumpModulesBounding: + """`dump_modules=True` writes one selected variant per node per trial — not + every candidate. The estimate must be bounded by sum-of-max-per-node x n_trials.""" + + def test_dump_disk_is_bounded_by_per_node_max_not_sum_of_all_variants(self) -> None: + # Two BERT candidates in the same node: only one is selected per trial. + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [ + {"model_name": "microsoft/deberta-v3-small"}, + {"model_name": "microsoft/deberta-v3-large"}, + ], + "num_train_epochs": [3], + "batch_size": [16], + } + ], + } + ], + "hpo_config": {"n_trials": 4}, + "logging_config": {"dump_modules": True}, + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + # Per-node max ~ deberta-v3-large weights (~350M x 4 ~ 1.3 GB). Two-candidate + # sum would be roughly doubled. Verify we used the per-node-max bound. + small_meta = _hub.resolve_model("microsoft/deberta-v3-small") + large_meta = _hub.resolve_model("microsoft/deberta-v3-large") + expected = large_meta.weights_gb * 4 + naive_sum = (small_meta.weights_gb + large_meta.weights_gb) * 4 + assert report.resource.disk_dump_gb == pytest.approx(expected, rel=0.01) + assert report.resource.disk_dump_gb < naive_sum + + def test_dump_disk_sums_across_nodes(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "embedder", + "search_space": [ + { + "module_name": "sentence_transformer", + "embedder_config": [{"model_name": "sentence-transformers/all-MiniLM-L6-v2"}], + } + ], + }, + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": "microsoft/deberta-v3-small"}], + "num_train_epochs": [3], + "batch_size": [16], + } + ], + }, + ], + "hpo_config": {"n_trials": 2}, + "logging_config": {"dump_modules": True}, + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + embedder = _hub.resolve_model("sentence-transformers/all-MiniLM-L6-v2") + bert = _hub.resolve_model("microsoft/deberta-v3-small") + expected = (embedder.weights_gb + bert.weights_gb) * 2 + assert report.resource.disk_dump_gb == pytest.approx(expected, rel=0.01) diff --git a/tests/advisor/test_hardware_detection.py b/tests/advisor/test_hardware_detection.py new file mode 100644 index 000000000..d8131fb19 --- /dev/null +++ b/tests/advisor/test_hardware_detection.py @@ -0,0 +1,72 @@ +"""Hardware detection has to be safe on every machine — broken CUDA, no GPU, +no psutil. Verify the fallbacks work without raising. +""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from autointent._advisor._hardware import detect_hardware + + +def test_cpu_fallback_when_no_accelerator() -> None: + with ( + patch("autointent._advisor._hardware._detect_cuda", return_value=None), + patch("autointent._advisor._hardware._detect_mps", return_value=None), + ): + hw = detect_hardware() + assert hw.accelerator == "cpu" + assert hw.vram_gb == 0.0 + assert hw.device_class == "cpu" + + +def test_cuda_branch_classifies_low_gpu() -> None: + with ( + patch( + "autointent._advisor._hardware._detect_cuda", + return_value=(8.0, "NVIDIA RTX 3060"), + ), + ): + hw = detect_hardware() + assert hw.accelerator == "cuda" + assert hw.vram_gb == pytest.approx(8.0) + assert hw.device_class == "low-gpu" + + +def test_mps_budget_uses_ram_fraction() -> None: + with ( + patch("autointent._advisor._hardware._detect_cuda", return_value=None), + patch("autointent._advisor._hardware._detect_ram_gb", return_value=32.0), + patch( + "autointent._advisor._hardware._detect_mps", + side_effect=lambda ram, ratio: (ram * ratio, "Apple Silicon (arm64)"), + ), + ): + hw = detect_hardware() + assert hw.accelerator == "mps" + assert hw.vram_gb == pytest.approx(32.0 * 0.7) + assert any("MPS unified memory" in n for n in hw.notes) + + +def test_vram_budget_override_applies() -> None: + with ( + patch( + "autointent._advisor._hardware._detect_cuda", + return_value=(24.0, "NVIDIA RTX 4090"), + ), + ): + hw = detect_hardware(vram_budget_gb=8.0) + assert hw.vram_gb == pytest.approx(8.0) + assert any("manual VRAM budget" in n for n in hw.notes) + + +def test_broken_cuda_returns_none_does_not_crash() -> None: + # _detect_cuda swallows torch quirks already; verify the wrapper holds. + with ( + patch("autointent._advisor._hardware._detect_cuda", return_value=None), + patch("autointent._advisor._hardware._detect_mps", return_value=None), + ): + hw = detect_hardware() + assert hw.accelerator == "cpu" diff --git a/tests/advisor/test_hub_heuristics.py b/tests/advisor/test_hub_heuristics.py new file mode 100644 index 000000000..c19018235 --- /dev/null +++ b/tests/advisor/test_hub_heuristics.py @@ -0,0 +1,66 @@ +"""Tests for the offline heuristic fallback in `_hub`. + +The advisor must produce a sensible estimate even when HF Hub is unreachable. +Without a per-name heuristic, every offline lookup collapses to a single +BERT-base-sized default — these tests pin that contract. +""" + +from __future__ import annotations + +import pytest + +from autointent._advisor import _hub + + +@pytest.fixture(autouse=True) +def _offline(monkeypatch: pytest.MonkeyPatch) -> None: + _hub.resolve_model.cache_clear() + # Force `_hub_metadata` to behave as if the live Hub were unreachable so + # resolve_model falls through to `_heuristic_metadata`. + monkeypatch.setattr(_hub, "_hub_metadata", lambda _name: None) + monkeypatch.setattr(_hub, "_is_warm_cached", lambda _name: False) + + +def test_offline_lookup_uses_bert_base_default() -> None: + """Every offline lookup returns the same BERT-base-sized fallback.""" + for name in ( + "microsoft/deberta-v3-large", + "sentence-transformers/all-MiniLM-L6-v2", + "totally-made-up/no-such-model", + ): + meta = _hub.resolve_model(name) + assert meta.confidence == "heuristic" + assert meta.total_params == _hub._DEFAULT_HEURISTIC_PARAMS + + +def test_weights_gb_matches_params_times_bytes() -> None: + meta = _hub.resolve_model("microsoft/deberta-v3-large") + expected_gb = meta.total_params * meta.weight_bytes_per_param / (1024**3) + assert meta.weights_gb == pytest.approx(expected_gb) + + +def test_local_path_returns_zero_disk() -> None: + meta = _hub.resolve_model("/tmp/local/path/to/model") + assert meta.total_file_bytes == 0 + assert meta.cached_locally is True + + +def test_disk_gb_falls_back_to_param_size_when_siblings_unknown() -> None: + meta = _hub.resolve_model("intfloat/multilingual-e5-large-instruct") + assert meta.disk_gb > 0 + assert meta.disk_gb == pytest.approx(meta.weights_gb, rel=0.01) + + +def test_resolve_is_memoized() -> None: + a = _hub.resolve_model("microsoft/deberta-v3-large") + b = _hub.resolve_model("microsoft/deberta-v3-large") + assert a is b + + +def test_metadata_fallback_uses_heuristic_when_hub_unreachable() -> None: + """End-to-end: resolve_model must return a usable ModelMeta even when + the live Hub is unreachable (autouse fixture forces offline).""" + meta = _hub.resolve_model("microsoft/deberta-v3-large") + assert meta.confidence == "heuristic" + assert meta.total_params > 0 + assert meta.disk_gb > 0 diff --git a/tests/advisor/test_render.py b/tests/advisor/test_render.py new file mode 100644 index 000000000..7a806c7f2 --- /dev/null +++ b/tests/advisor/test_render.py @@ -0,0 +1,170 @@ +"""Output rendering: text formatting and JSON serialization.""" + +from __future__ import annotations + +import json + +from autointent._advisor._render import _batch_hint, render_json, render_recommendation, render_text +from autointent._advisor._report import ( + DatasetStats, + PreflightReport, + ResourceEstimate, + Severity, +) + + +def _populated_report() -> PreflightReport: + r = PreflightReport( + preset_name="example", + hardware={ + "accelerator": "cuda", + "device_name": "RTX 3060", + "vram_gb": 8.0, + "ram_gb": 32.0, + "free_disk_gb": 100.0, + "device_class": "low-gpu", + }, + dataset={"n_samples": 500, "n_classes": 10, "avg_tokens": 30, "source": "placeholder"}, + resource=ResourceEstimate( + disk_download_gb=2.5, + disk_cached_gb=0.5, + ram_gb=1.0, + vram_gb=4.0, + time_hours=1.2, + drivers=[ + { + "node_type": "scoring", + "module": "bert", + "model": "x/y", + "mode": "full-finetune", + "vram_gb": 4.0, + "ram_gb": 1.0, + "time_hours": 1.2, + "confidence": "hub", + } + ], + ), + notes=["MPS unified memory note"], + ) + r.add("resource", Severity.TIGHT, "VRAM ~6 GB vs available 8 GB") + r.add("data", Severity.OVER, "rare classes blocked") + return r + + +class TestRenderText: + def test_contains_phase_blocks(self) -> None: + out = render_text(_populated_report()) + assert "Resource:" in out + assert "Data:" in out + # Config phase has no findings -> block omitted + assert "Config:" not in out + + def test_includes_drivers_block(self) -> None: + out = render_text(_populated_report()) + assert "Drivers of cost:" in out + assert "x/y" in out + + def test_verdict_reflects_headroom(self) -> None: + out = render_text(_populated_report()) + assert "Verdict: INFEASIBLE" in out + assert "headroom: over" in out + + def test_disclaimer_always_present(self) -> None: + out = render_text(_populated_report()) + assert "heuristic upper bounds" in out + + def test_low_confidence_tag_when_offline(self) -> None: + r = _populated_report() + r.low_confidence = True + out = render_text(r) + assert "low-confidence" in out + + def test_preset_name_in_title(self) -> None: + out = render_text(_populated_report()) + assert "Compute feasibility check — example" in out + + def test_empty_report_still_renders(self) -> None: + out = render_text(PreflightReport()) + assert "Compute feasibility check" in out + assert "Verdict: feasible" in out + + +class TestRenderJson: + def test_is_valid_json(self) -> None: + json.loads(render_json(_populated_report())) + + def test_findings_have_string_severity(self) -> None: + d = json.loads(render_json(_populated_report())) + for f in d["findings"]: + assert f["severity"] in {"ample", "tight", "over"} + + def test_headroom_and_feasibility_serialized(self) -> None: + d = json.loads(render_json(_populated_report())) + assert d["headroom"] == "over" + assert d["is_feasible"] is False + + def test_empty_report_serializes(self) -> None: + d = json.loads(render_json(PreflightReport())) + assert d["headroom"] == "ample" + assert d["is_feasible"] is True + + +class TestRenderRecommendation: + def _two_reports(self) -> list[tuple[str, PreflightReport]]: + a = PreflightReport(preset_name="a", resource=ResourceEstimate(vram_gb=2.0, time_hours=0.5)) + a.add("resource", Severity.AMPLE, "ok") + b = PreflightReport(preset_name="b", resource=ResourceEstimate(vram_gb=8.0, time_hours=4.0)) + b.add("resource", Severity.OVER, "too big") + return [("a", a), ("b", b)] + + def test_lists_chosen_preset_when_present(self) -> None: + out = render_recommendation(self._two_reports(), chosen="a") + assert "-> a" in out + + def test_handles_no_chosen(self) -> None: + out = render_recommendation(self._two_reports(), chosen=None) + assert "none of the bundled presets" in out + + def test_includes_all_presets_in_table(self) -> None: + out = render_recommendation(self._two_reports(), chosen="a") + assert "a " in out # preset name + assert "b " in out + + def test_shows_status_per_preset(self) -> None: + out = render_recommendation(self._two_reports(), chosen="a") + assert "feasible" in out + assert "infeasible" in out + + +class TestBatchHint: + """Per-driver batch cell rendered in the Drivers-of-cost table.""" + + def test_arrow_when_max_differs(self) -> None: + assert _batch_hint({"batch_size": 64, "max_batch_size": 32}) == "64 -> 32" + + def test_plain_when_max_equals_current(self) -> None: + assert _batch_hint({"batch_size": 64, "max_batch_size": 64}) == "64" + + def test_no_fit_label_when_max_zero(self) -> None: + assert _batch_hint({"batch_size": 64, "max_batch_size": 0}) == "64 (no fit)" + + def test_empty_when_no_batch(self) -> None: + assert _batch_hint({"batch_size": None, "max_batch_size": None}) == "" + + def test_increase_arrow(self) -> None: + assert _batch_hint({"batch_size": 32, "max_batch_size": 128}) == "32 -> 128" + + +def test_dataset_stats_in_text_block() -> None: + stats = DatasetStats.placeholder(n_samples=777, n_classes=4) + r = PreflightReport( + dataset={ + "n_samples": stats.n_samples, + "n_classes": stats.n_classes, + "avg_tokens": stats.avg_tokens, + "source": stats.source, + } + ) + out = render_text(r) + assert "777" in out + assert "n_classes=4" in out diff --git a/tests/advisor/test_report.py b/tests/advisor/test_report.py new file mode 100644 index 000000000..acb2b5bf8 --- /dev/null +++ b/tests/advisor/test_report.py @@ -0,0 +1,87 @@ +"""Unit tests for the report dataclasses.""" + +from __future__ import annotations + +import dataclasses + +import pytest + +from autointent._advisor._report import ( + DatasetStats, + Finding, + PreflightReport, + ResourceEstimate, + Severity, +) + + +class TestSeverityOrdering: + def test_headroom_on_empty_report_is_green(self) -> None: + assert PreflightReport().headroom == Severity.AMPLE + + def test_red_beats_yellow_beats_green(self) -> None: + r = PreflightReport() + r.add("resource", Severity.AMPLE, "ok") + r.add("data", Severity.TIGHT, "warn") + assert r.headroom == Severity.TIGHT + r.add("config", Severity.OVER, "fail") + assert r.headroom == Severity.OVER # type: ignore[comparison-overlap] + + def test_is_feasible_flips_on_any_red(self) -> None: + r = PreflightReport() + r.add("resource", Severity.TIGHT, "warn") + assert r.is_feasible is True + r.add("data", Severity.OVER, "fail") + assert r.is_feasible is False + + +class TestDatasetStatsPlaceholder: + def test_defaults_populate_p95_above_avg(self) -> None: + stats = DatasetStats.placeholder() + assert stats.n_samples == 1_000 + assert stats.p95_tokens is not None + assert stats.p95_tokens > stats.avg_tokens + assert stats.source == "placeholder" + + def test_overrides_propagate(self) -> None: + stats = DatasetStats.placeholder(n_samples=42, n_classes=3, avg_tokens=80, multilabel=True) + assert stats.n_samples == 42 + assert stats.n_classes == 3 + assert stats.avg_tokens == 80 + assert stats.multilabel is True + + +class TestResourceEstimate: + def test_total_disk_sums_download_and_dump(self) -> None: + e = ResourceEstimate(disk_download_gb=2.5, disk_dump_gb=4.0) + assert e.total_disk_gb == pytest.approx(6.5) + + def test_total_disk_ignores_cached(self) -> None: + e = ResourceEstimate(disk_download_gb=1.0, disk_cached_gb=100.0, disk_dump_gb=0.5) + assert e.total_disk_gb == pytest.approx(1.5) + + +class TestToDictSerialization: + def test_findings_round_trip_severity_as_string(self) -> None: + r = PreflightReport() + r.add("resource", Severity.OVER, "boom") + d = r.to_dict() + assert d["headroom"] == "over" + assert d["is_feasible"] is False + assert d["findings"] == [ + {"phase": "resource", "severity": "over", "message": "boom", "metric": None}, + ] + + def test_hardware_and_dataset_pass_through(self) -> None: + r = PreflightReport( + hardware={"accelerator": "cuda", "vram_gb": 8.0}, + dataset={"n_samples": 100, "n_classes": 5}, + ) + d = r.to_dict() + assert d["hardware"]["accelerator"] == "cuda" + assert d["dataset"]["n_samples"] == 100 + + def test_finding_is_frozen(self) -> None: + f = Finding(phase="resource", severity=Severity.AMPLE, message="ok") + with pytest.raises(dataclasses.FrozenInstanceError): + f.message = "changed" # type: ignore[misc] diff --git a/tests/ci/test_compute_matrix.py b/tests/ci/test_compute_matrix.py index c03049815..f8f6899b8 100644 --- a/tests/ci/test_compute_matrix.py +++ b/tests/ci/test_compute_matrix.py @@ -88,9 +88,7 @@ def test_push_writes_full_matrix(self, monkeypatch: pytest.MonkeyPatch, tmp_path assert json.loads(outputs["matrix"]) == cm.FULL_MATRIX assert json.loads(outputs["warm_os"]) == ["ubuntu-latest", "windows-latest"] - def test_pr_without_label_writes_minimal_matrix( - self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path - ) -> None: + def test_pr_without_label_writes_minimal_matrix(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: out = tmp_path / "out.txt" monkeypatch.setenv("EVENT_NAME", "pull_request") monkeypatch.setenv("LABELS_JSON", '["bug"]') @@ -103,9 +101,7 @@ def test_pr_without_label_writes_minimal_matrix( assert json.loads(outputs["matrix"]) == cm.MINIMAL_MATRIX assert json.loads(outputs["warm_os"]) == ["ubuntu-latest"] - def test_pr_with_full_ci_label_writes_full_matrix( - self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path - ) -> None: + def test_pr_with_full_ci_label_writes_full_matrix(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: out = tmp_path / "out.txt" monkeypatch.setenv("EVENT_NAME", "pull_request") monkeypatch.setenv("LABELS_JSON", '["full-ci"]') diff --git a/tests/test_deps.py b/tests/test_deps.py index a5c74a716..1be235e62 100644 --- a/tests/test_deps.py +++ b/tests/test_deps.py @@ -35,6 +35,7 @@ def _patch_metadata( requires_map: {dist_name: [PEP 508 requirement string, ...]} versions: {dist_name: installed_version_string} (absent key => not installed) """ + def fake_requires(dist: str) -> list[str]: # Mirror the real importlib.metadata.requires: a dist with no metadata # (i.e. not installed) raises PackageNotFoundError rather than returning []. @@ -81,12 +82,14 @@ def test_check_reports_outdated(monkeypatch: pytest.MonkeyPatch) -> None: def test_iter_extra_reqs_selects_only_extra_members(monkeypatch: pytest.MonkeyPatch) -> None: _patch_metadata( monkeypatch, - {"autointent": [ - "numpy>=1.0 ; python_version >= '3.0'", # base dep w/ env marker -> excluded - "torch>=2.0", # base dep, no marker -> excluded - "catboost>=1.2.8,<2.0.0 ; extra == 'catboost'", # extra member -> included - "peft>=0.10.0 ; extra == 'peft'", # different extra -> excluded - ]}, + { + "autointent": [ + "numpy>=1.0 ; python_version >= '3.0'", # base dep w/ env marker -> excluded + "torch>=2.0", # base dep, no marker -> excluded + "catboost>=1.2.8,<2.0.0 ; extra == 'catboost'", # extra member -> included + "peft>=0.10.0 ; extra == 'peft'", # different extra -> excluded + ] + }, {}, ) reqs = deps._iter_extra_reqs("autointent", "catboost") @@ -119,10 +122,12 @@ def test_resolve_recurses_into_nested_extra(monkeypatch: pytest.MonkeyPatch) -> def test_resolve_terminates_on_cycle(monkeypatch: pytest.MonkeyPatch) -> None: _patch_metadata( monkeypatch, - {"pkg": [ - "pkg[b]>=1.0 ; extra == 'a'", - "pkg[a]>=1.0 ; extra == 'b'", - ]}, + { + "pkg": [ + "pkg[b]>=1.0 ; extra == 'a'", + "pkg[a]>=1.0 ; extra == 'b'", + ] + }, {}, ) reqs = deps._resolve("pkg", "a", set())