|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""Flash a coreboot binary to a free SnipeIT-managed DUT via osfv_cli. |
| 3 | +
|
| 4 | +Looks up a Ready-to-Deploy, unassigned asset for the given model in SnipeIT, |
| 5 | +reads its RTE IP from a custom field, checks the asset out to the calling |
| 6 | +user, runs `osfv_cli rte --rte_ip <ip> flash write --rom <rom>`, then checks |
| 7 | +the asset back in (always, even if the flash fails). |
| 8 | +
|
| 9 | +Env vars: |
| 10 | + SNIPEIT_IP SnipeIT host IP/hostname (default: 192.168.4.202) |
| 11 | + SNIPEIT_API_KEY Personal API token (required) |
| 12 | +""" |
| 13 | + |
| 14 | +import argparse |
| 15 | +import os |
| 16 | +import subprocess |
| 17 | +import sys |
| 18 | + |
| 19 | +import requests |
| 20 | + |
| 21 | + |
| 22 | +SNIPEIT_IP = os.environ.get("SNIPEIT_IP", "192.168.4.202").strip().rstrip("/") |
| 23 | +SNIPEIT_URL = f"http://{SNIPEIT_IP}" |
| 24 | +SNIPEIT_API_KEY = os.environ.get("SNIPEIT_API_KEY") |
| 25 | +READY_TO_DEPLOY = "Ready to Deploy" |
| 26 | + |
| 27 | + |
| 28 | +def api(method, path, **kwargs): |
| 29 | + if not SNIPEIT_API_KEY: |
| 30 | + sys.exit("SNIPEIT_API_KEY env var is required") |
| 31 | + headers = kwargs.pop("headers", {}) |
| 32 | + headers.update({ |
| 33 | + "Authorization": f"Bearer {SNIPEIT_API_KEY}", |
| 34 | + "Accept": "application/json", |
| 35 | + "Content-Type": "application/json", |
| 36 | + }) |
| 37 | + r = requests.request( |
| 38 | + method, f"{SNIPEIT_URL}/api/v1{path}", |
| 39 | + headers=headers, timeout=30, **kwargs, |
| 40 | + ) |
| 41 | + r.raise_for_status() |
| 42 | + data = r.json() |
| 43 | + # SnipeIT returns HTTP 200 with {"status":"error",...} on logical errors. |
| 44 | + if isinstance(data, dict) and data.get("status") == "error": |
| 45 | + sys.exit(f"SnipeIT error on {method} {path}: {data.get('messages')}") |
| 46 | + return data |
| 47 | + |
| 48 | + |
| 49 | +def find_model_id(name): |
| 50 | + data = api("GET", "/models", params={"search": name, "limit": 50}) |
| 51 | + rows = data.get("rows", []) |
| 52 | + exact = [m for m in rows if m["name"].lower() == name.lower()] |
| 53 | + matches = exact or rows |
| 54 | + if not matches: |
| 55 | + sys.exit(f"No SnipeIT model matches '{name}'") |
| 56 | + if len(matches) > 1: |
| 57 | + names = ", ".join(sorted(m["name"] for m in matches)) |
| 58 | + sys.exit(f"Ambiguous model name '{name}': {names}") |
| 59 | + return matches[0]["id"] |
| 60 | + |
| 61 | + |
| 62 | +def find_free_asset(model_id): |
| 63 | + data = api("GET", "/hardware", params={ |
| 64 | + "model_id": model_id, |
| 65 | + "status": "RTD", |
| 66 | + "limit": 100, |
| 67 | + }) |
| 68 | + for asset in data.get("rows", []): |
| 69 | + status = (asset.get("status_label") or {}).get("name") |
| 70 | + if status != READY_TO_DEPLOY: |
| 71 | + continue |
| 72 | + if asset.get("assigned_to"): |
| 73 | + continue |
| 74 | + return asset |
| 75 | + sys.exit( |
| 76 | + f"No '{READY_TO_DEPLOY}' unassigned asset found for model_id={model_id}" |
| 77 | + ) |
| 78 | + |
| 79 | + |
| 80 | +def get_rte_ip(asset): |
| 81 | + fields = asset.get("custom_fields") or {} |
| 82 | + for key, payload in fields.items(): |
| 83 | + k = key.lower() |
| 84 | + if "rte" in k and "ip" in k: |
| 85 | + value = (payload or {}).get("value") |
| 86 | + if value: |
| 87 | + return value |
| 88 | + sys.exit(f"Asset {asset.get('asset_tag')} has no RTE IP custom field") |
| 89 | + |
| 90 | + |
| 91 | +def get_my_user_id(): |
| 92 | + return api("GET", "/users/me")["id"] |
| 93 | + |
| 94 | + |
| 95 | +def checkout(asset_id, user_id): |
| 96 | + api("POST", f"/hardware/{asset_id}/checkout", json={ |
| 97 | + "checkout_to_type": "user", |
| 98 | + "assigned_user": user_id, |
| 99 | + "note": "flash_via_snipeit.py auto-checkout", |
| 100 | + }) |
| 101 | + |
| 102 | + |
| 103 | +def checkin(asset_id): |
| 104 | + api("POST", f"/hardware/{asset_id}/checkin", json={ |
| 105 | + "note": "flash_via_snipeit.py auto-checkin", |
| 106 | + }) |
| 107 | + |
| 108 | + |
| 109 | +def flash(rte_ip, rom_path): |
| 110 | + cmd = ["osfv_cli", "rte", "--rte_ip", rte_ip, |
| 111 | + "flash", "write", "--rom", rom_path] |
| 112 | + print("+ " + " ".join(cmd), flush=True) |
| 113 | + return subprocess.run(cmd, check=False).returncode |
| 114 | + |
| 115 | + |
| 116 | +def main(): |
| 117 | + ap = argparse.ArgumentParser( |
| 118 | + description="Flash a coreboot binary to a SnipeIT-managed DUT." |
| 119 | + ) |
| 120 | + ap.add_argument("rom", help="Path to coreboot binary to flash") |
| 121 | + ap.add_argument("model", help="Platform/model name as it appears in SnipeIT") |
| 122 | + args = ap.parse_args() |
| 123 | + |
| 124 | + if not os.path.isfile(args.rom): |
| 125 | + sys.exit(f"ROM file not found: {args.rom}") |
| 126 | + |
| 127 | + user_id = get_my_user_id() |
| 128 | + model_id = find_model_id(args.model) |
| 129 | + asset = find_free_asset(model_id) |
| 130 | + rte_ip = get_rte_ip(asset) |
| 131 | + asset_id = asset["id"] |
| 132 | + asset_tag = asset["asset_tag"] |
| 133 | + print(f"Selected asset {asset_tag} (id={asset_id}), RTE IP {rte_ip}") |
| 134 | + |
| 135 | + print(f"Checking out {asset_tag} to user_id={user_id}...") |
| 136 | + checkout(asset_id, user_id) |
| 137 | + try: |
| 138 | + rc = flash(rte_ip, args.rom) |
| 139 | + finally: |
| 140 | + print(f"Checking in {asset_tag}...") |
| 141 | + try: |
| 142 | + checkin(asset_id) |
| 143 | + except Exception as e: |
| 144 | + print(f"WARNING: check-in failed: {e}", file=sys.stderr) |
| 145 | + sys.exit(rc) |
| 146 | + |
| 147 | + |
| 148 | +if __name__ == "__main__": |
| 149 | + main() |
0 commit comments