|
| 1 | +# Copyright © 2026 Apple Inc. All rights reserved. |
| 2 | +# |
| 3 | +# Please refer to the license found in the LICENSE file in the root directory of the source tree. |
| 4 | + |
| 5 | +"""Report which CoreML operations would dispatch to ANE / GPU / CPU. |
| 6 | +
|
| 7 | +The CoreML runtime decides at compile/load time which compute device each |
| 8 | +MIL operation will run on; that decision is exposed by ``MLComputePlan`` |
| 9 | +in coremltools 9.0+. This script wraps that API so users can answer |
| 10 | +"why isn't my model running on the ANE?" without writing Swift. |
| 11 | +
|
| 12 | +Usage:: |
| 13 | +
|
| 14 | + # Analyze a CoreML model directly (mlpackage or compiled mlmodelc). |
| 15 | + python coreml_compute_plan.py --model_path path/to/model.mlpackage |
| 16 | +
|
| 17 | + # Analyze every Core ML partition embedded in an ExecuTorch .pte. |
| 18 | + python coreml_compute_plan.py --model_path path/to/program.pte |
| 19 | +
|
| 20 | + # Show ops that fell off the ANE, grouped by op type. |
| 21 | + python coreml_compute_plan.py --model_path model.mlpackage --show_non_ane |
| 22 | +
|
| 23 | + # Pick which devices the runtime is allowed to consider. |
| 24 | + python coreml_compute_plan.py --model_path model.mlpackage \\ |
| 25 | + --compute_units cpu_and_ne |
| 26 | +""" |
| 27 | + |
| 28 | +import argparse |
| 29 | +import os |
| 30 | +import shutil |
| 31 | +import sys |
| 32 | +import tempfile |
| 33 | +from collections import Counter |
| 34 | +from pathlib import Path |
| 35 | +from typing import Iterable, List, Optional, Tuple |
| 36 | + |
| 37 | +import coremltools as ct |
| 38 | +from coremltools.models.compute_device import ( |
| 39 | + MLCPUComputeDevice, |
| 40 | + MLGPUComputeDevice, |
| 41 | + MLNeuralEngineComputeDevice, |
| 42 | +) |
| 43 | +from coremltools.models.compute_plan import MLComputePlan |
| 44 | + |
| 45 | + |
# Maps coremltools compute-device classes to short display labels.
# NOTE: order matters — _device_name checks isinstance() in this order and
# returns the first match, so the most specific/preferred device comes first.
_DEVICE_NAMES: List[Tuple[type, str]] = [
    (MLNeuralEngineComputeDevice, "ANE"),
    (MLGPUComputeDevice, "GPU"),
    (MLCPUComputeDevice, "CPU"),
]

# CLI spellings for --compute_units, mapped to the coremltools enum the
# runtime uses when planning dispatch.
_COMPUTE_UNIT_CHOICES = {
    "all": ct.ComputeUnit.ALL,
    "cpu_and_ne": ct.ComputeUnit.CPU_AND_NE,
    "cpu_and_gpu": ct.ComputeUnit.CPU_AND_GPU,
    "cpu_only": ct.ComputeUnit.CPU_ONLY,
}
| 58 | + |
| 59 | + |
| 60 | +def _device_name(device) -> str: |
| 61 | + if device is None: |
| 62 | + return "unknown" |
| 63 | + for cls, name in _DEVICE_NAMES: |
| 64 | + if isinstance(device, cls): |
| 65 | + return name |
| 66 | + return type(device).__name__ |
| 67 | + |
| 68 | + |
| 69 | +def _iter_operations(block) -> Iterable: |
| 70 | + for op in block.operations: |
| 71 | + yield op |
| 72 | + for nested in getattr(op, "blocks", None) or []: |
| 73 | + yield from _iter_operations(nested) |
| 74 | + |
| 75 | + |
| 76 | +def _ensure_compiled(model_path: str, tmpdir: str) -> str: |
| 77 | + """Return a `.mlmodelc` path; compile from `.mlpackage` if needed.""" |
| 78 | + if model_path.endswith(".mlmodelc"): |
| 79 | + return model_path |
| 80 | + if model_path.endswith(".mlpackage"): |
| 81 | + dest = os.path.join( |
| 82 | + tmpdir, os.path.basename(model_path).replace(".mlpackage", ".mlmodelc") |
| 83 | + ) |
| 84 | + return str(ct.models.utils.compile_model(model_path, destination_path=dest)) |
| 85 | + raise ValueError( |
| 86 | + f"Expected a .mlpackage or .mlmodelc path, got: {model_path}" |
| 87 | + ) |
| 88 | + |
| 89 | + |
def _extract_models_from_pte(pte_path: str, out_dir: str) -> List[str]:
    """Pull every CoreML partition out of a .pte into `out_dir`.

    Returns a list of paths to the extracted model directories (which
    `MLComputePlan.load_from_path` accepts directly).
    """
    # Imported lazily so the script still runs against a plain .mlpackage
    # without requiring the executorch package.
    from executorch.backends.apple.coreml import executorchcoreml
    from executorch.exir._serialize._program import deserialize_pte_binary
    from executorch.exir.schema import (
        BackendDelegateDataReference,
        DataLocation,
    )
    import json

    # Delegate id the CoreML backend registers its partitions under.
    COREML_BACKEND_ID = "CoreMLBackend"
    # Prefix marking a delegate payload that is a JSON *reference* into the
    # .pte's named-data segment rather than an inline model blob.
    MAGIC_NUMBER = b"CMJR"

    with open(pte_path, "rb") as f:
        pte_data = f.read()
    pte_file = deserialize_pte_binary(pte_data)
    program = pte_file.program

    # Index the named-data buffers by key so CMJR references can be resolved.
    named_data = {}
    if pte_file.named_data is not None:
        for key, entry in pte_file.named_data.pte_data.items():
            named_data[key] = pte_file.named_data.buffers[entry.buffer_index]

    # Flatten delegates across all execution plans, then keep CoreML ones.
    delegates = sum((p.delegates for p in program.execution_plan), [])
    coreml_delegates = [d for d in delegates if d.id == COREML_BACKEND_ID]
    if not coreml_delegates:
        return []

    extracted: List[str] = []
    seen_keys: set = set()
    for i, delegate in enumerate(coreml_delegates):
        ref: BackendDelegateDataReference = delegate.processed
        # Only inline payloads are handled; externally-located delegate data
        # (other DataLocation values) is skipped, not an error.
        if ref.location != DataLocation.INLINE:
            continue
        raw = program.backend_delegate_data[ref.index].data
        model_bytes: Optional[bytes] = None
        name: Optional[str] = None
        if raw.startswith(MAGIC_NUMBER):
            # Payload is a JSON reference; resolve its "key" against the
            # named-data index. seen_keys dedupes partitions that share one
            # underlying model.
            reference = json.loads(raw[len(MAGIC_NUMBER) :].decode("utf-8"))
            key = reference.get("key")
            if key in seen_keys or key not in named_data:
                continue
            seen_keys.add(key)
            model_bytes = named_data[key]
            name = key
        else:
            # Payload is the flattened model itself; give it a stable name.
            model_bytes = raw
            name = f"model_{i + 1}"
        if model_bytes is None:
            continue
        out_path = os.path.join(out_dir, name)
        os.makedirs(out_path, exist_ok=True)
        # unflatten_directory_contents rebuilds the compiled-model directory
        # tree from the flattened bytes; it returns falsy on failure, in
        # which case the partition is silently skipped (best-effort).
        if executorchcoreml.unflatten_directory_contents(model_bytes, out_path):
            extracted.append(out_path)
    return extracted
| 151 | + |
| 152 | + |
def analyze_one(model_path: str, compute_units: ct.ComputeUnit) -> List[Tuple[str, str, str]]:
    """Compute the dispatch plan for a single model.

    Returns a list of ``(function_name, operator_name, device_label)``
    tuples — one entry per operation the runtime actually plans to
    dispatch (constants and similar ops are omitted).
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        compiled_path = _ensure_compiled(model_path, tmpdir)
        plan = MLComputePlan.load_from_path(compiled_path, compute_units=compute_units)
        program = plan.model_structure.program
        if program is None:
            raise RuntimeError(
                f"{model_path} is not an MLProgram model; this tool only supports "
                "the MLProgram backend (the CoreML backend executorch produces today)."
            )

        results: List[Tuple[str, str, str]] = []
        for function_name, function in program.functions.items():
            for operation in _iter_operations(function.block):
                usage = plan.get_compute_device_usage_for_mlprogram_operation(operation)
                # Constants and similar non-dispatched ops don't have a plan.
                if usage is not None:
                    device = _device_name(usage.preferred_compute_device)
                    results.append((function_name, operation.operator_name, device))
        return results
| 176 | + |
| 177 | + |
| 178 | +def _print_report(label: str, rows: List[Tuple[str, str, str]], show_non_ane: bool) -> None: |
| 179 | + print(f"\n=== {label} ===") |
| 180 | + if not rows: |
| 181 | + print(" (no dispatched operations found)") |
| 182 | + return |
| 183 | + by_device = Counter(device for _, _, device in rows) |
| 184 | + total = sum(by_device.values()) |
| 185 | + for device in ("ANE", "GPU", "CPU", "unknown"): |
| 186 | + count = by_device.get(device, 0) |
| 187 | + if count == 0: |
| 188 | + continue |
| 189 | + pct = 100.0 * count / total |
| 190 | + print(f" {device}: {count:5d} / {total} ({pct:5.1f}%)") |
| 191 | + |
| 192 | + if show_non_ane: |
| 193 | + non_ane = [(fn, op_name) for fn, op_name, dev in rows if dev != "ANE"] |
| 194 | + if non_ane: |
| 195 | + print("\n Non-ANE op types:") |
| 196 | + for op_name, count in Counter(op for _, op in non_ane).most_common(): |
| 197 | + print(f" {count:5d} {op_name}") |
| 198 | + |
| 199 | + |
def main() -> int:
    """CLI entry point: parse args, analyze the model(s), print reports.

    Returns a process exit code: 0 on success, 1 when a .pte contains no
    CoreML delegate partitions.
    """
    parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
    parser.add_argument(
        "--model_path",
        required=True,
        help="Path to a .pte, .mlpackage, or .mlmodelc.",
    )
    parser.add_argument(
        "--compute_units",
        default="cpu_and_ne",
        choices=sorted(_COMPUTE_UNIT_CHOICES),
        help="Which devices the runtime may use when planning dispatch.",
    )
    parser.add_argument(
        "--show_non_ane",
        action="store_true",
        help="List op types that did not get assigned to the ANE.",
    )
    cli = parser.parse_args()

    units = _COMPUTE_UNIT_CHOICES[cli.compute_units]
    path = cli.model_path

    # Plain CoreML model: analyze it directly.
    if not path.endswith(".pte"):
        report_rows = analyze_one(path, units)
        _print_report(os.path.basename(path.rstrip("/")), report_rows, cli.show_non_ane)
        return 0

    # ExecuTorch program: extract each CoreML partition, then analyze each.
    with tempfile.TemporaryDirectory() as scratch:
        partitions = _extract_models_from_pte(path, scratch)
        if not partitions:
            print(
                f"{path} does not contain any CoreML delegate partitions.",
                file=sys.stderr,
            )
            return 1
        for partition in partitions:
            report_rows = analyze_one(partition, units)
            _print_report(os.path.basename(partition), report_rows, cli.show_non_ane)
    return 0
| 239 | + |
| 240 | + |
# Script entry point; exit status propagates main()'s return code.
if __name__ == "__main__":
    sys.exit(main())
0 commit comments