Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 38 additions & 26 deletions src/defib/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
def burn(
chip: str = typer.Option(..., "-c", "--chip", help="Chip model name"),
file: str = typer.Option("", "-f", "--file", help="Firmware file (auto-downloads from OpenIPC if omitted)"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial device (/dev/ttyUSB0), tcp://host:port, rfc2217://host:port, or socket:///path"),
send_break: bool = typer.Option(False, "-b", "--break", help="Send Ctrl-C after upload"),
terminal: bool = typer.Option(False, "-t", "--terminal", help="Open serial terminal after upload"),
power_cycle: bool = typer.Option(False, "--power-cycle", help="Auto power-cycle via PoE (needs DEFIB_POE_* env vars)"),
Expand Down Expand Up @@ -441,7 +441,7 @@ def ports(

@app.command()
def detect(
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial device (/dev/ttyUSB0), tcp://host:port, rfc2217://host:port, or socket:///path"),
output: str = typer.Option("human", "--output", help="Output mode: human, json"),
timeout: float = typer.Option(25.0, "--timeout", help="Detection timeout in seconds"),
) -> None:
Expand Down Expand Up @@ -508,7 +508,7 @@ async def _detect_async(port: str, output: str, timeout: float) -> None:

@app.command()
def capture(
port: str = typer.Option(..., "-p", "--port", help="Serial port"),
port: str = typer.Option(..., "-p", "--port", help="Serial device (/dev/ttyUSB0), tcp://host:port, rfc2217://host:port, or socket:///path"),
output_file: str = typer.Option(..., "-o", "--output", help="Output .dcap file"),
chip: str = typer.Option("", "-c", "--chip", help="Chip name (metadata only)"),
duration: float = typer.Option(60.0, "--duration", help="Max capture duration in seconds"),
Expand Down Expand Up @@ -629,7 +629,7 @@ def replay(

@app.command("dump-flash")
def dump_flash_cmd(
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial device (/dev/ttyUSB0), tcp://host:port, rfc2217://host:port, or socket:///path"),
output_file: str = typer.Option("flash_dump.bin", "-o", "--output", help="Output binary file"),
size: str = typer.Option("", "--size", help="Flash size (e.g., 8MB, 16MB) — auto-detect if empty"),
output: str = typer.Option("human", "--output-mode", help="Output mode: human, json"),
Expand Down Expand Up @@ -903,7 +903,7 @@ def list_interfaces_cmd(
@agent_app.command("upload")
def agent_upload(
chip: str = typer.Option(..., "-c", "--chip", help="Chip model name"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial device (/dev/ttyUSB0), tcp://host:port, rfc2217://host:port, or socket:///path"),
output: str = typer.Option("human", "--output", help="Output mode: human, json"),
) -> None:
"""Upload flash agent to device via boot protocol (requires power-cycle)."""
Expand All @@ -921,7 +921,9 @@ async def _agent_upload_async(chip: str, port: str, output: str) -> None:
from defib.profiles.loader import load_profile
from defib.protocol.hisilicon_standard import HiSiliconStandard
from defib.recovery.events import ProgressEvent
from defib.transport.serial import SerialTransport
from defib.transport.serial_platform import (
create_transport, normalize_port_name,
)

console = Console()

Expand Down Expand Up @@ -957,7 +959,7 @@ async def _agent_upload_async(chip: str, port: str, output: str) -> None:
console.print(f"SPL: full U-Boot ({len(spl_data)} bytes — boundary auto-detected)")
console.print("\n[yellow]Power-cycle the camera now![/yellow]\n")

transport = await SerialTransport.create(port)
transport = await create_transport(normalize_port_name(port))
protocol = HiSiliconStandard()
protocol.set_profile(profile)

Expand Down Expand Up @@ -996,7 +998,7 @@ def on_progress(e: ProgressEvent) -> None:
import asyncio as aio
await transport.close()
await aio.sleep(2)
transport = await SerialTransport.create(port)
transport = await create_transport(normalize_port_name(port))

client = FlashAgentClient(transport, chip)
if await client.connect(timeout=10.0):
Expand All @@ -1021,7 +1023,7 @@ def on_progress(e: ProgressEvent) -> None:
def agent_flash(
chip: str = typer.Option(..., "-c", "--chip", help="Chip model name"),
input_file: str = typer.Option(..., "-i", "--input", help="Firmware binary file"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial device (/dev/ttyUSB0), tcp://host:port, rfc2217://host:port, or socket:///path"),
verify: bool = typer.Option(True, "--verify/--no-verify", help="CRC32 verify after write"),
reboot: bool = typer.Option(True, "--reboot/--no-reboot", help="Reboot after flash"),
output: str = typer.Option("human", "--output", help="Output mode: human, json"),
Expand Down Expand Up @@ -1052,7 +1054,9 @@ async def _agent_flash_async(
from defib.profiles.loader import load_profile
from defib.protocol.hisilicon_standard import HiSiliconStandard
from defib.recovery.events import ProgressEvent
from defib.transport.serial import SerialTransport
from defib.transport.serial_platform import (
create_transport, normalize_port_name,
)

console = Console()
FLASH_MEM = 0x14000000
Expand Down Expand Up @@ -1101,7 +1105,7 @@ async def _agent_flash_async(
console.print("\n[yellow]Power-cycle the camera now![/yellow]\n")

# --- Phase 1: Upload agent via boot protocol ---
transport = await SerialTransport.create(port)
transport = await create_transport(normalize_port_name(port))
protocol = HiSiliconStandard()
protocol.set_profile(profile)

Expand Down Expand Up @@ -1139,7 +1143,7 @@ def on_boot_progress(e: ProgressEvent) -> None:
import asyncio as aio
await transport.close()
await aio.sleep(2)
transport = await SerialTransport.create(port)
transport = await create_transport(normalize_port_name(port))

client = FlashAgentClient(transport, chip)
if not await client.connect(timeout=10.0):
Expand Down Expand Up @@ -1242,7 +1246,7 @@ def on_flash_progress(done: int, total: int) -> None:

@agent_app.command("info")
def agent_info(
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial device (/dev/ttyUSB0), tcp://host:port, rfc2217://host:port, or socket:///path"),
output: str = typer.Option("human", "--output", help="Output mode: human, json"),
) -> None:
"""Query info from a running flash agent."""
Expand All @@ -1256,10 +1260,12 @@ async def _agent_info_async(port: str, output: str) -> None:
from rich.console import Console

from defib.agent.client import FlashAgentClient
from defib.transport.serial import SerialTransport
from defib.transport.serial_platform import (
create_transport, normalize_port_name,
)

console = Console()
transport = await SerialTransport.create(port)
transport = await create_transport(normalize_port_name(port))
client = FlashAgentClient(transport)

if not await client.connect(timeout=5.0):
Expand Down Expand Up @@ -1299,7 +1305,7 @@ async def _agent_info_async(port: str, output: str) -> None:

@agent_app.command("read")
def agent_read(
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial device (/dev/ttyUSB0), tcp://host:port, rfc2217://host:port, or socket:///path"),
addr: str = typer.Option(None, "-a", "--addr", help="Start address (hex, default: flash base 0x14000000)"),
size: str = typer.Option(None, "-s", "--size", help="Size in bytes (or 1KB, 16MB, etc; default: auto-detect)"),
output_file: str = typer.Option("flash_dump.bin", "-o", "--output", help="Output binary file"),
Expand All @@ -1321,11 +1327,13 @@ async def _agent_read_async(
from rich.console import Console

from defib.agent.client import FlashAgentClient
from defib.transport.serial import SerialTransport
from defib.transport.serial_platform import (
create_transport, normalize_port_name,
)

console = Console()

transport = await SerialTransport.create(port)
transport = await create_transport(normalize_port_name(port))
client = FlashAgentClient(transport)
if not await client.connect(timeout=5.0):
console.print("[red]Agent not responding[/red]")
Expand Down Expand Up @@ -1379,7 +1387,7 @@ async def _agent_read_async(

@agent_app.command("write")
def agent_write(
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial device (/dev/ttyUSB0), tcp://host:port, rfc2217://host:port, or socket:///path"),
addr: str = typer.Option("0x14000000", "-a", "--addr", help="Start address (hex, default: flash base)"),
input_file: str = typer.Option(..., "-i", "--input", help="Input binary file"),
verify: bool = typer.Option(True, "--verify/--no-verify", help="CRC32 verify after write"),
Expand All @@ -1399,13 +1407,15 @@ async def _agent_write_async(
from rich.console import Console

from defib.agent.client import FlashAgentClient
from defib.transport.serial import SerialTransport
from defib.transport.serial_platform import (
create_transport, normalize_port_name,
)

console = Console()
address = int(addr_str, 0)
data = open(input_file, "rb").read()

transport = await SerialTransport.create(port)
transport = await create_transport(normalize_port_name(port))
client = FlashAgentClient(transport)
if not await client.connect(timeout=5.0):
console.print("[red]Agent not responding[/red]")
Expand Down Expand Up @@ -1449,7 +1459,7 @@ async def _agent_write_async(

@agent_app.command("scan")
def agent_scan(
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial device (/dev/ttyUSB0), tcp://host:port, rfc2217://host:port, or socket:///path"),
output_file: str = typer.Option("", "-o", "--output", help="Save recoverable data to file (bad sectors filled with 0xFF)"),
output: str = typer.Option("human", "--output-mode", help="Output mode: human, json"),
) -> None:
Expand All @@ -1472,10 +1482,12 @@ async def _agent_scan_async(port: str, output_file: str, output: str) -> None:
SectorResult,
SectorStatus,
)
from defib.transport.serial import SerialTransport
from defib.transport.serial_platform import (
create_transport, normalize_port_name,
)

console = Console()
transport = await SerialTransport.create(port)
transport = await create_transport(normalize_port_name(port))
client = FlashAgentClient(transport)

if not await client.connect(timeout=5.0):
Expand Down Expand Up @@ -1658,7 +1670,7 @@ def _parse_size(s: str) -> int:
def install(
chip: str = typer.Option(..., "-c", "--chip", help="Chip model name"),
firmware: str = typer.Option(..., "--firmware", help="OpenIPC firmware tarball (.tgz)"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial device (/dev/ttyUSB0), tcp://host:port, rfc2217://host:port, or socket:///path"),
power_cycle: bool = typer.Option(False, "--power-cycle", help="Auto power-cycle via PoE"),
poe_port_override: str = typer.Option("", "--poe-port", help="Explicit MikroTik ether port (e.g. ether3) — overrides comment-based auto-discovery. Requires --power-cycle."),
nic: str = typer.Option("", "--nic", help="Network interface for TFTP (auto-detect if empty)"),
Expand Down Expand Up @@ -2339,7 +2351,7 @@ async def tftp_and_flash(
def restore(
chip: str = typer.Option(..., "-c", "--chip", help="Chip model name"),
dump: str = typer.Option(..., "-i", "--input", help="Flash dump file or directory of mtdN files"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial device (/dev/ttyUSB0), tcp://host:port, rfc2217://host:port, or socket:///path"),
uboot: str = typer.Option("", "--uboot", help="U-Boot binary to load (auto-downloads if omitted)"),
flash_type: str = typer.Option("auto", "--flash-type", help="Flash type: auto, nor, nand, emmc"),
mtdparts: str = typer.Option("", "--mtdparts", help="NAND partition layout (e.g. hinand:1M(boot),4M(kernel),8M(rootfs),...)"),
Expand Down
Loading