diff --git a/src/defib/cli/app.py b/src/defib/cli/app.py index 07d0bd4..a12ea52 100644 --- a/src/defib/cli/app.py +++ b/src/defib/cli/app.py @@ -2501,6 +2501,14 @@ def restore( nic: str = typer.Option("", "--nic", help="Network interface for TFTP"), power_cycle: bool = typer.Option(False, "--power-cycle", help="Auto power-cycle via PoE"), poe_port_override: str = typer.Option("", "--poe-port", help="Explicit MikroTik ether port (e.g. ether3) — overrides comment-based auto-discovery. Requires --power-cycle."), + tftp_via: str = typer.Option( + "auto", "--tftp-via", + help="Where the camera fetches partitions from: " + "'host' (defib starts a host-side TFTP server — needs sudo/port-69), " + "'pod' (stage in the rack pod's PSRAM and serve from 192.168.1.1 — " + "no host setup, requires DEFIB_POWER_TYPE=rack), " + "'auto' (pod when power=rack, else host).", + ), output: str = typer.Option("human", "--output", help="Output mode: human, json"), debug: bool = typer.Option(False, "-d", "--debug", help="Enable debug logging"), ) -> None: @@ -2515,18 +2523,19 @@ def restore( Examples: defib restore -c hi3516av200 -i /path/to/dump/ -p /dev/ttyUSB0 --power-cycle defib restore -c hi3516ev300 -i flash.bin -p /dev/ttyUSB0 --flash-type nor + defib restore -c hi3516ev300 -i dump/ -p rack://10.216.128.69 --power-cycle """ import asyncio asyncio.run(_restore_async( chip, dump, port, uboot, flash_type, mtdparts, host_ip, device_ip, nic, - power_cycle, poe_port_override, output, debug, + power_cycle, poe_port_override, tftp_via, output, debug, )) async def _restore_async( chip: str, dump: str, port: str, uboot_path: str, flash_type: str, mtdparts_arg: str, host_ip: str, device_ip: str, nic: str, power_cycle: bool, - poe_port_override: str, output: str, debug: bool, + poe_port_override: str, tftp_via: str, output: str, debug: bool, ) -> None: import json as json_mod import logging @@ -2664,6 +2673,7 @@ async def _restore_async( poe_port = None if power_cycle: from defib.power.factory import power_controller_from_env + from defib.power.rack import RackController from defib.power.routeros import RouterOSController try: power_controller = power_controller_from_env() @@ -2671,64 +2681,56 @@ async def _restore_async( console.print(f"[red]Power controller error:[/red] {e}") raise typer.Exit(1) - # restore needs independent power_off/power_on (frame-blast flow); - # Vectis only emits a fixed pulse and cannot satisfy that. - if not isinstance(power_controller, RouterOSController): + # restore on local UART needs independent power_off/power_on so + # the host can start the boot-ROM frame-blast on a quiet line + # and then trigger power-on. RouterOSController exposes that. + # RackController is fine too — its `fastboot` endpoint does the + # whole power-cycle + handshake on the pod locally, so we don't + # need the host-side frame-blast race at all. + if not isinstance(power_controller, (RouterOSController, RackController)): console.print( f"[red]restore requires a controller that supports independent " - f"power_off/power_on; {power_controller.name()!r} only supports " - f"power_cycle. Use --power-cycle with DEFIB_POWER_TYPE=routeros, " + f"power_off/power_on (RouterOS) or pod-side fastboot (RackController); " + f"{power_controller.name()!r} doesn't. " + f"Use --power-cycle with DEFIB_POWER_TYPE=routeros|rack, " f"or run restore without --power-cycle and cycle power manually." f"[/red]" ) await power_controller.close() raise typer.Exit(1) - if poe_port_override: - poe_port = poe_port_override - if output == "human": - console.print(f" PoE: [cyan]{poe_port}[/cyan] (explicit)") - else: - port_basename = Path(port).name - device_label = port_basename.removeprefix("uart-") if port_basename.startswith("uart-") else port_basename - try: - poe_port = await power_controller.find_port_by_comment(device_label) - except Exception as e: - console.print(f"[red]PoE port discovery failed:[/red] {e}") - if power_controller: - await power_controller.close() - raise typer.Exit(1) + if isinstance(power_controller, RouterOSController): + if poe_port_override: + poe_port = poe_port_override + if output == "human": + console.print(f" PoE: [cyan]{poe_port}[/cyan] (explicit)") + else: + port_basename = Path(port).name + device_label = port_basename.removeprefix("uart-") if port_basename.startswith("uart-") else port_basename + try: + poe_port = await power_controller.find_port_by_comment(device_label) + except Exception as e: + console.print(f"[red]PoE port discovery failed:[/red] {e}") + if power_controller: + await power_controller.close() + raise typer.Exit(1) + if output == "human": + console.print(f" PoE: [cyan]{poe_port}[/cyan]") + else: + # Rack: pod-side fastboot handles its own power-cycle. Empty + # string here signals "automated power available" to downstream + # code paths (same convention as Vectis in install/burn). + poe_port = "" if output == "human": - console.print(f" PoE: [cyan]{poe_port}[/cyan]") + console.print(f" Power: [cyan]{power_controller.name()}[/cyan]") # --- Phase 1: Burn U-Boot to RAM --- if output == "human": console.print("\n[bold yellow]Phase 1: Loading U-Boot to RAM[/bold yellow]") - # For restore with power-cycle: power off first (kill running OS), - # open serial on a quiet line, then let session handle power-on. - if power_controller and poe_port: - import asyncio as _aio - from defib.power.routeros import RouterOSController - # We rejected non-RouterOS controllers above — narrow for mypy. - assert isinstance(power_controller, RouterOSController) - if output == "human": - console.print(" Powering off...") - power_controller._saved_poe_out[poe_port] = "forced-on" - await power_controller.power_off(poe_port) - await _aio.sleep(3) - if output == "human": - console.print(" Device off, opening serial...") - - transport = await create_transport(normalize_port_name(port)) - await transport.flush_input() - - # Don't pass power controller to session — it would call power_cycle - # which doesn't work on an already-off port. We handle power-on below. - session = RecoverySession( - chip=chip, firmware_path=uboot_path, - ) + from defib.power.rack import RackController + use_rack_fastboot = isinstance(power_controller, RackController) def on_progress(e: ProgressEvent) -> None: if output == "human" and e.message: @@ -2738,27 +2740,67 @@ def on_log(e: LogEvent) -> None: if output == "human" and e.level != "debug": console.print(f" {e.message}") - # Start session (frame-blast begins immediately for PRESTEP0 chips), - # then power on so the bootrom sees 0xAA+HEAD when it starts. - if power_controller and poe_port: - import asyncio as _aio - session_task = _aio.create_task( - session.run(transport, on_progress=on_progress, on_log=on_log) - ) - await _aio.sleep(0.5) # let frame-blast start + if use_rack_fastboot: + # Pod-side fastboot: the rack pod owns UART exclusively for the + # upload, so we open `transport` only after it returns. Handles + # its own power-cycle internally. + from defib.recovery.rack_fastboot import run_rack_fastboot + assert isinstance(power_controller, RackController) if output == "human": - console.print(" Powering on...") - await power_controller.power_on(poe_port) - result = await session_task + console.print(" Pod-side fastboot in progress…") + uboot_bytes = Path(uboot_path).read_bytes() + result = await run_rack_fastboot(power_controller, chip, uboot_bytes) + if not result.success: + console.print(f"[red]Burn failed:[/red] {result.error}") + if power_controller: + await power_controller.close() + raise typer.Exit(1) + transport = await create_transport(normalize_port_name(port)) else: - result = await session.run(transport, on_progress=on_progress, on_log=on_log) + # RouterOS frame-blast: power off → open serial on a quiet line → + # start session.run (frame-blast loop starts) → power on. + if power_controller and poe_port: + import asyncio as _aio + from defib.power.routeros import RouterOSController + assert isinstance(power_controller, RouterOSController) + if output == "human": + console.print(" Powering off...") + power_controller._saved_poe_out[poe_port] = "forced-on" + await power_controller.power_off(poe_port) + await _aio.sleep(3) + if output == "human": + console.print(" Device off, opening serial...") - if not result.success: - console.print(f"[red]Burn failed:[/red] {result.error}") - await transport.close() - if power_controller: - await power_controller.close() - raise typer.Exit(1) + transport = await create_transport(normalize_port_name(port)) + await transport.flush_input() + + # Don't pass power controller to session — it would call power_cycle + # which doesn't work on an already-off port. We handle power-on below. + session = RecoverySession( + chip=chip, firmware_path=uboot_path, + ) + + if power_controller and poe_port: + import asyncio as _aio + session_task = _aio.create_task( + session.run(transport, on_progress=on_progress, on_log=on_log) + ) + await _aio.sleep(0.5) # let frame-blast start + if output == "human": + console.print(" Powering on...") + from defib.power.routeros import RouterOSController + assert isinstance(power_controller, RouterOSController) + await power_controller.power_on(poe_port) + result = await session_task + else: + result = await session.run(transport, on_progress=on_progress, on_log=on_log) + + if not result.success: + console.print(f"[red]Burn failed:[/red] {result.error}") + await transport.close() + if power_controller: + await power_controller.close() + raise typer.Exit(1) if output == "human": console.print(f" [green]U-Boot loaded ({result.elapsed_ms:.0f}ms)[/green]") @@ -2835,14 +2877,40 @@ async def _send(cmd: str, timeout: float = 60.0) -> str: if output == "human": console.print("\n[bold yellow]Phase 4: Network setup[/bold yellow]") + # Pick TFTP backend — same logic as install. When --tftp-via=auto + # (default), pod when power=rack, host otherwise. Pod path means + # the camera reaches the pod over its own LAN at 192.168.1.1 and + # we use a camera-side IP from that subnet. + use_pod_tftp = ( + tftp_via == "pod" + or (tftp_via == "auto" and isinstance(power_controller, RackController)) + ) + if tftp_via == "pod" and not isinstance(power_controller, RackController): + console.print( + "[red]--tftp-via pod requires DEFIB_POWER_TYPE=rack[/red] " + "(no rack pod to host TFTP)." + ) + await transport.close() + raise typer.Exit(1) + + if use_pod_tftp: + # Override host_ip + device_ip so they live on the pod's camera- + # side subnet — the host_ip auto-detect picks something on the + # WiFi-uplink subnet (e.g. 10.216.128.x) which the camera can't + # reach. + serverip = "192.168.1.1" + device_ip = "192.168.1.20" + else: + serverip = host_ip + await _send(f"setenv ipaddr {device_ip}") - await _send(f"setenv serverip {host_ip}") + await _send(f"setenv serverip {serverip}") # Retry ping — PHY may need time to establish link import asyncio as _asyncio ping_ok = False for attempt in range(5): - resp = await _send(f"ping {host_ip}", timeout=15) + resp = await _send(f"ping {serverip}", timeout=15) if "is alive" in resp: ping_ok = True if output == "human": @@ -2861,168 +2929,241 @@ async def _send(cmd: str, timeout: float = 60.0) -> str: if output == "human": console.print("\n[bold yellow]Phase 5: Writing flash[/bold yellow]") - from defib.network.tftp_server import start_tftp_server - tftp_files = {name: data for name, data in partitions} - tftp_transport, tftp_proto = await start_tftp_server( - files=tftp_files, bind_addr=host_ip, port=69, done_count=len(partitions), - ) - ram_addr = 0x82000000 - offset = 0 - ubi_partmap: dict[int, tuple[str, int, int]] | None = None - - # Write boot partition (offset 0) LAST — setenv/network commands during - # restore cause U-Boot to save env to NAND within the boot area, so - # writing boot last overwrites any env corruption. - write_order = list(enumerate(partitions)) - boot_parts = [(i, p) for i, p in write_order if i == 0 or p[0] in ("mtd0", "boot")] - other_parts = [(i, p) for i, p in write_order if (i, p) not in boot_parts] - write_order = other_parts + boot_parts - - # Pre-compute partition offsets from mtdparts if available, else from data sizes - _part_offsets: dict[int, int] = {} - if mtdparts_arg and detected_flash == "nand": - import re as _re_off - _pstr = mtdparts_arg.split(":", 1)[1] if ":" in mtdparts_arg else mtdparts_arg - _cur = 0 - for _i, _pd in enumerate(_pstr.split(",")): - _part_offsets[_i] = _cur - _fm = _re_off.match(r"-\(", _pd.strip()) - if _fm: - break # fill remainder — no more offsets to compute - _pm = _re_off.match(r"([\d]+)([kKmM]?)", _pd.strip()) - if _pm: - _sz = int(_pm.group(1)) - _u = _pm.group(2).upper() - if _u == "M": - _sz *= 1024 * 1024 - elif _u == "K": - _sz *= 1024 - _cur += _sz - # Fill remaining indices from cumulative - for _i in range(len(partitions)): - if _i not in _part_offsets: - _part_offsets[_i] = _cur - else: - _off = 0 - for i, (_, pdata) in enumerate(partitions): - _part_offsets[i] = _off - _off += len(pdata) - - for part_idx, (name, data) in write_order: - # Pad to page alignment for NAND (2KB pages) - page = 2048 if detected_flash == "nand" else 1 - write_size = ((len(data) + page - 1) // page) * page - - # Use real partition offset from mtdparts if available, else from sequential order - if ubi_partmap is not None and part_idx in ubi_partmap: - _, real_offset, _ = ubi_partmap[part_idx] - offset = real_offset - else: - offset = _part_offsets[part_idx] + from contextlib import AsyncExitStack + _tftp_stack = AsyncExitStack() + await _tftp_stack.__aenter__() + tftp_proto = None # only host path uses this for UBI replacement + + if use_pod_tftp: + assert isinstance(power_controller, RackController) + _rack_for_close = power_controller + + # Wipe anything still staged from a prior aborted run BEFORE we + # try to allocate — PSRAM is bounded and a leftover 4 MB rootfs + # will OOM the next stage at 256 KB headroom. + await power_controller.tftp_clear() + + async def _aclose_pod_tftp() -> None: + try: + await _rack_for_close.tftp_clear() + except Exception: + pass + + # Register cleanup BEFORE staging — so even a mid-stage OOM + # still triggers cleanup on AsyncExitStack exit. + _tftp_stack.push_async_callback(_aclose_pod_tftp) if output == "human": console.print( - f"\n [bold]{name}[/bold]: {len(data) // 1024}KB → " - f"0x{offset:X}" + f" [cyan]Staging {sum(len(d) for d in tftp_files.values()) // 1024} KB " + f"in pod PSRAM via POST /tftp/...[/cyan]" ) + for _name, _data in tftp_files.items(): + await power_controller.tftp_put(_name, _data, timeout=180.0) - t0 = _time.monotonic() - - # Detect partition type before TFTP - # Raw UBI images (UBI# magic) must be converted to UBIFS first — - # nand write of raw UBI corrupts UBIFS due to bad block shifting. - from defib.ubi import extract_ubifs as _extract_ubifs, is_ubi_image as _is_ubi + async def _replace_in_tftp(name: str, data: bytes) -> None: + await _rack_for_close.tftp_put(name, data, timeout=180.0) - is_ubifs = ( - detected_flash == "nand" - and len(data) >= 4 - and data[:4] == b"\x31\x18\x10\x06" # UBIFS superblock + if output == "human": + console.print(" [green]Pod TFTP ready on 192.168.1.1:69[/green]") + else: + from defib.network.tftp_server import start_tftp_server + tftp_transport, tftp_proto = await start_tftp_server( + files=tftp_files, bind_addr=host_ip, port=69, done_count=len(partitions), ) - if detected_flash == "nand" and len(data) >= 4 and _is_ubi(data): - if output == "human": - console.print(" Raw UBI image → extracting UBIFS volume data") - data = _extract_ubifs(data) - is_ubifs = True - if output == "human": - console.print(f" Extracted {len(data)} bytes of UBIFS") - # Update TFTP file and write_size for the extracted data + _tftp_stack.callback(tftp_transport.close) + + async def _replace_in_tftp(name: str, data: bytes) -> None: tftp_proto._files[name] = data - write_size = ((len(data) + page - 1) // page) * page + try: - if detected_flash == "nand" and is_ubifs: - # UBI-aware write: let UBI handle bad block mapping + ram_addr = 0x82000000 + offset = 0 + ubi_partmap: dict[int, tuple[str, int, int]] | None = None + + # Write boot partition (offset 0) LAST — setenv/network commands during + # restore cause U-Boot to save env to NAND within the boot area, so + # writing boot last overwrites any env corruption. + write_order = list(enumerate(partitions)) + boot_parts = [(i, p) for i, p in write_order if i == 0 or p[0] in ("mtd0", "boot")] + other_parts = [(i, p) for i, p in write_order if (i, p) not in boot_parts] + write_order = other_parts + boot_parts + + # Pre-compute partition offsets from mtdparts if available, else from data sizes + _part_offsets: dict[int, int] = {} + if mtdparts_arg and detected_flash == "nand": + import re as _re_off + _pstr = mtdparts_arg.split(":", 1)[1] if ":" in mtdparts_arg else mtdparts_arg + _cur = 0 + for _i, _pd in enumerate(_pstr.split(",")): + _part_offsets[_i] = _cur + _fm = _re_off.match(r"-\(", _pd.strip()) + if _fm: + break # fill remainder — no more offsets to compute + _pm = _re_off.match(r"([\d]+)([kKmM]?)", _pd.strip()) + if _pm: + _sz = int(_pm.group(1)) + _u = _pm.group(2).upper() + if _u == "M": + _sz *= 1024 * 1024 + elif _u == "K": + _sz *= 1024 + _cur += _sz + # Fill remaining indices from cumulative + for _i in range(len(partitions)): + if _i not in _part_offsets: + _part_offsets[_i] = _cur + else: + _off = 0 + for i, (_, pdata) in enumerate(partitions): + _part_offsets[i] = _off + _off += len(pdata) + + for part_idx, (name, data) in write_order: + # Pad to page alignment for NAND (2KB pages) + page = 2048 if detected_flash == "nand" else 1 + write_size = ((len(data) + page - 1) // page) * page + + # Use real partition offset from mtdparts if available, else from sequential order + if ubi_partmap is not None and part_idx in ubi_partmap: + _, real_offset, _ = ubi_partmap[part_idx] + offset = real_offset + else: + offset = _part_offsets[part_idx] + if output == "human": - console.print(" UBI partition detected") - - # Need mtdids + mtdparts for ubi commands. - # Parse NAND partition layout from bootargs or use default. - if ubi_partmap is None: - import re as _re - mtdparts_val = None - - # Use --mtdparts if provided - if mtdparts_arg: - mtdparts_val = mtdparts_arg - else: - # Try bootargs (look for NAND device like hinand) - resp = await _send("printenv bootargs", timeout=5) - m = _re.search(r"mtdparts=(hinand:[\S]+)", resp) - if not m: - resp = await _send("printenv mtdparts", timeout=5) - m = _re.search(r"mtdparts=(hinand:[\S]+)", resp) - if m: - mtdparts_val = m.group(1) - - if not mtdparts_val: - if output == "human": - console.print(" [red]Cannot determine NAND partition layout for UBI.[/red]") - console.print(" [red]Use --mtdparts to specify it.[/red]") - continue - - nand_name = mtdparts_val.split(":")[0] - await _send(f"setenv mtdids nand0={nand_name}") - await _send(f"setenv mtdparts mtdparts={mtdparts_val}") - await _send("mtdparts") + console.print( + f"\n [bold]{name}[/bold]: {len(data) // 1024}KB → " + f"0x{offset:X}" + ) + + t0 = _time.monotonic() + + # Detect partition type before TFTP + # Raw UBI images (UBI# magic) must be converted to UBIFS first — + # nand write of raw UBI corrupts UBIFS due to bad block shifting. + from defib.ubi import extract_ubifs as _extract_ubifs, is_ubi_image as _is_ubi + + is_ubifs = ( + detected_flash == "nand" + and len(data) >= 4 + and data[:4] == b"\x31\x18\x10\x06" # UBIFS superblock + ) + if detected_flash == "nand" and len(data) >= 4 and _is_ubi(data): if output == "human": - console.print(f" mtdparts: {mtdparts_val[:70]}") - - # Parse partition offsets and sizes - ubi_partmap = {} - part_str = mtdparts_val.split(":", 1)[1] - cur_off = 0 - for i, pdef in enumerate(part_str.split(",")): - pdef_s = pdef.strip() - # Handle "-(name)" fill-remainder syntax - fm = _re.match(r"-\((\w+)\)", pdef_s) - if fm: - ubi_partmap[i] = (fm.group(1), cur_off, 0) + console.print(" Raw UBI image → extracting UBIFS volume data") + data = _extract_ubifs(data) + is_ubifs = True + if output == "human": + console.print(f" Extracted {len(data)} bytes of UBIFS") + # Update TFTP file — works for both host (dict reassignment) + # and pod (HTTP repost) via the unified _replace_in_tftp hook. + await _replace_in_tftp(name, data) + write_size = ((len(data) + page - 1) // page) * page + + if detected_flash == "nand" and is_ubifs: + # UBI-aware write: let UBI handle bad block mapping + if output == "human": + console.print(" UBI partition detected") + + # Need mtdids + mtdparts for ubi commands. + # Parse NAND partition layout from bootargs or use default. + if ubi_partmap is None: + import re as _re + mtdparts_val = None + + # Use --mtdparts if provided + if mtdparts_arg: + mtdparts_val = mtdparts_arg + else: + # Try bootargs (look for NAND device like hinand) + resp = await _send("printenv bootargs", timeout=5) + m = _re.search(r"mtdparts=(hinand:[\S]+)", resp) + if not m: + resp = await _send("printenv mtdparts", timeout=5) + m = _re.search(r"mtdparts=(hinand:[\S]+)", resp) + if m: + mtdparts_val = m.group(1) + + if not mtdparts_val: + if output == "human": + console.print(" [red]Cannot determine NAND partition layout for UBI.[/red]") + console.print(" [red]Use --mtdparts to specify it.[/red]") continue - pm = _re.match(r"([\d]+[kKmM]?)\((\w+)\)", pdef_s) - if not pm: + + nand_name = mtdparts_val.split(":")[0] + await _send(f"setenv mtdids nand0={nand_name}") + await _send(f"setenv mtdparts mtdparts={mtdparts_val}") + await _send("mtdparts") + if output == "human": + console.print(f" mtdparts: {mtdparts_val[:70]}") + + # Parse partition offsets and sizes + ubi_partmap = {} + part_str = mtdparts_val.split(":", 1)[1] + cur_off = 0 + for i, pdef in enumerate(part_str.split(",")): + pdef_s = pdef.strip() + # Handle "-(name)" fill-remainder syntax + fm = _re.match(r"-\((\w+)\)", pdef_s) + if fm: + ubi_partmap[i] = (fm.group(1), cur_off, 0) + continue + pm = _re.match(r"([\d]+[kKmM]?)\((\w+)\)", pdef_s) + if not pm: + continue + sz_str, pname = pm.group(1), pm.group(2) + if sz_str.upper().endswith("M"): + sz = int(sz_str[:-1]) * 1024 * 1024 + elif sz_str.upper().endswith("K"): + sz = int(sz_str[:-1]) * 1024 + else: + sz = int(sz_str) + ubi_partmap[i] = (pname, cur_off, sz) + cur_off += sz + + # Find the real NAND offset for this partition + part_idx = [i for i, (n, d) in enumerate(partitions) if n == name][0] + if part_idx in ubi_partmap: + real_name, real_off, real_sz = ubi_partmap[part_idx] + vol_name = real_name # kernel mounts by name (e.g. "ubi0:rootfs") + if real_sz == 0: + real_sz = 0x8000000 - real_off # fill to end of 128MB + + # TFTP → erase → ubi part → ubi create → ubi write + # TFTP must be FIRST — ubi operations allocate memory that + # conflicts with TFTP's network stack on constrained U-Boot. + resp = await _send(f"tftpboot 0x{ram_addr:x} {name}", timeout=120) + if "unknown command" in resp.lower(): + resp = await _send(f"tftp 0x{ram_addr:x} {name}", timeout=120) + if "done" not in resp.lower() and "bytes transferred" not in resp.lower(): + if output == "human": + console.print(" [red]TFTP failed[/red]") continue - sz_str, pname = pm.group(1), pm.group(2) - if sz_str.upper().endswith("M"): - sz = int(sz_str[:-1]) * 1024 * 1024 - elif sz_str.upper().endswith("K"): - sz = int(sz_str[:-1]) * 1024 - else: - sz = int(sz_str) - ubi_partmap[i] = (pname, cur_off, sz) - cur_off += sz - - # Find the real NAND offset for this partition - part_idx = [i for i, (n, d) in enumerate(partitions) if n == name][0] - if part_idx in ubi_partmap: - real_name, real_off, real_sz = ubi_partmap[part_idx] - vol_name = real_name # kernel mounts by name (e.g. "ubi0:rootfs") - if real_sz == 0: - real_sz = 0x8000000 - real_off # fill to end of 128MB - - # TFTP → erase → ubi part → ubi create → ubi write - # TFTP must be FIRST — ubi operations allocate memory that - # conflicts with TFTP's network stack on constrained U-Boot. + if output == "human": + console.print(" TFTP OK") + + if output == "human": + console.print(f" Erasing 0x{real_off:X}+0x{real_sz:X}...") + await _send(f"nand erase 0x{real_off:x} 0x{real_sz:x}", timeout=120) + if output == "human": + console.print(f" UBI format {real_name}...") + await _send(f"ubi part {real_name}", timeout=120) + # Use all available space so UBIFS has room for runtime writes + await _send(f"ubi create {vol_name}", timeout=60) + resp = await _send(f"ubi write 0x{ram_addr:x} {vol_name} 0x{len(data):x}", timeout=300) + if "error" in resp.lower() or "cannot" in resp.lower(): + if output == "human": + console.print(f" [red]ubi write failed: {resp.strip()[-80:]}[/red]") + else: + if output == "human": + console.print(f" [red]Partition {name} not in mtdparts[/red]") + continue + + else: + # Non-UBI: TFTP first, then raw write resp = await _send(f"tftpboot 0x{ram_addr:x} {name}", timeout=120) if "unknown command" in resp.lower(): resp = await _send(f"tftp 0x{ram_addr:x} {name}", timeout=120) @@ -3032,60 +3173,33 @@ async def _send(cmd: str, timeout: float = 60.0) -> str: continue if output == "human": console.print(" TFTP OK") - - if output == "human": - console.print(f" Erasing 0x{real_off:X}+0x{real_sz:X}...") - await _send(f"nand erase 0x{real_off:x} 0x{real_sz:x}", timeout=120) - if output == "human": - console.print(f" UBI format {real_name}...") - await _send(f"ubi part {real_name}", timeout=120) - # Use all available space so UBIFS has room for runtime writes - await _send(f"ubi create {vol_name}", timeout=60) - resp = await _send(f"ubi write 0x{ram_addr:x} {vol_name} 0x{len(data):x}", timeout=300) - if "error" in resp.lower() or "cannot" in resp.lower(): - if output == "human": - console.print(f" [red]ubi write failed: {resp.strip()[-80:]}[/red]") - else: - if output == "human": - console.print(f" [red]Partition {name} not in mtdparts[/red]") - continue - - else: - # Non-UBI: TFTP first, then raw write - resp = await _send(f"tftpboot 0x{ram_addr:x} {name}", timeout=120) - if "unknown command" in resp.lower(): - resp = await _send(f"tftp 0x{ram_addr:x} {name}", timeout=120) - if "done" not in resp.lower() and "bytes transferred" not in resp.lower(): - if output == "human": - console.print(" [red]TFTP failed[/red]") - continue + + if detected_flash == "nand": + erase_size = ((len(data) + 0x1FFFF) // 0x20000) * 0x20000 + await _send(f"nand erase 0x{offset:x} 0x{erase_size:x}", timeout=120) + await _send(f"nand write 0x{ram_addr:x} 0x{offset:x} 0x{write_size:x}", timeout=120) + elif detected_flash == "nor": + erase_size = ((len(data) + 0xFFFF) // 0x10000) * 0x10000 + await _send(f"sf erase 0x{offset:x} 0x{erase_size:x}", timeout=120) + await _send(f"sf write 0x{ram_addr:x} 0x{offset:x} 0x{len(data):x}", timeout=120) + elif detected_flash == "emmc": + block_off = offset // 512 + block_cnt = (len(data) + 511) // 512 + await _send(f"mmc erase 0x{block_off:x} 0x{block_cnt:x}", timeout=120) + await _send(f"mmc write 0x{ram_addr:x} 0x{block_off:x} 0x{block_cnt:x}", timeout=120) + + elapsed = _time.monotonic() - t0 if output == "human": - console.print(" TFTP OK") - - if detected_flash == "nand": - erase_size = ((len(data) + 0x1FFFF) // 0x20000) * 0x20000 - await _send(f"nand erase 0x{offset:x} 0x{erase_size:x}", timeout=120) - await _send(f"nand write 0x{ram_addr:x} 0x{offset:x} 0x{write_size:x}", timeout=120) - elif detected_flash == "nor": - erase_size = ((len(data) + 0xFFFF) // 0x10000) * 0x10000 - await _send(f"sf erase 0x{offset:x} 0x{erase_size:x}", timeout=120) - await _send(f"sf write 0x{ram_addr:x} 0x{offset:x} 0x{len(data):x}", timeout=120) - elif detected_flash == "emmc": - block_off = offset // 512 - block_cnt = (len(data) + 511) // 512 - await _send(f"mmc erase 0x{block_off:x} 0x{block_cnt:x}", timeout=120) - await _send(f"mmc write 0x{ram_addr:x} 0x{block_off:x} 0x{block_cnt:x}", timeout=120) - - elapsed = _time.monotonic() - t0 + console.print(f" Written ({elapsed:.1f}s)") + + # Phase 6: Reset if output == "human": - console.print(f" Written ({elapsed:.1f}s)") + console.print("\n Resetting device...") + await _send("reset", timeout=5) - tftp_transport.close() + finally: + await _tftp_stack.__aexit__(None, None, None) - # --- Phase 6: Reset --- - if output == "human": - console.print("\n Resetting device...") - await _send("reset", timeout=5) await transport.close() if power_controller: