|
16 | 16 |
|
17 | 17 | from mpflash.common import BootloaderMethod, FlashMethod |
18 | 18 | from mpflash.config import config |
| 19 | +from mpflash.downloaded import find_downloaded_firmware |
19 | 20 | from mpflash.errors import MPFlashError |
20 | 21 | from mpflash.logger import log |
21 | 22 |
|
@@ -52,10 +53,76 @@ def flash_tasks( |
52 | 53 | **kwargs, |
53 | 54 | ): |
54 | 55 | """Flash every entry in ``tasks`` and return the updated boards.""" |
| 56 | + |
| 57 | + def _pick_backend_compatible_firmware(task, fw_info): |
| 58 | + """Pick a firmware image matching the explicit backend's supported formats.""" |
| 59 | + if fw_info is None: |
| 60 | + return None |
| 61 | + |
| 62 | + requested_name = _resolve_backend_name(method) |
| 63 | + if not requested_name: |
| 64 | + return fw_info |
| 65 | + |
| 66 | + backend = get_backend(requested_name) |
| 67 | + if backend is None or not backend.supported_formats: |
| 68 | + return fw_info |
| 69 | + |
| 70 | + current_suffix = Path(fw_info.firmware_file).suffix.lower() |
| 71 | + if current_suffix in backend.supported_formats: |
| 72 | + return fw_info |
| 73 | + |
| 74 | + # Fast path: if a same-stem file with a backend-supported extension |
| 75 | + # exists next to the selected firmware, use it directly. |
| 76 | + selected_path = config.firmware_folder / fw_info.firmware_file |
| 77 | + for suffix in backend.supported_formats: |
| 78 | + sibling = selected_path.with_suffix(suffix) |
| 79 | + if sibling.exists(): |
| 80 | + rel = sibling.relative_to(config.firmware_folder).as_posix() |
| 81 | + log.info( |
| 82 | + f"Using {requested_name} compatible sibling firmware {rel} " |
| 83 | + f"instead of {fw_info.firmware_file} for {task.board.board} on {task.board.serialport}" |
| 84 | + ) |
| 85 | + fw_info.firmware_file = rel |
| 86 | + return fw_info |
| 87 | + |
| 88 | + board = task.board |
| 89 | + detected_board_id = f"{board.board}-{board.variant}" if board.variant else board.board |
| 90 | + board_ids = [getattr(fw_info, "board_id", ""), detected_board_id] |
| 91 | + |
| 92 | + candidates = [] |
| 93 | + seen_files = set() |
| 94 | + for bid in board_ids: |
| 95 | + if not bid: |
| 96 | + continue |
| 97 | + # First prefer exact port match, then broaden to any port. |
| 98 | + for cand in find_downloaded_firmware( |
| 99 | + board_id=bid, |
| 100 | + version=fw_info.version, |
| 101 | + port=board.port, |
| 102 | + custom=bool(fw_info.custom), |
| 103 | + ) + find_downloaded_firmware( |
| 104 | + board_id=bid, |
| 105 | + version=fw_info.version, |
| 106 | + port="", |
| 107 | + custom=bool(fw_info.custom), |
| 108 | + ): |
| 109 | + if cand.firmware_file not in seen_files: |
| 110 | + seen_files.add(cand.firmware_file) |
| 111 | + candidates.append(cand) |
| 112 | + |
| 113 | + for cand in reversed(candidates): |
| 114 | + if Path(cand.firmware_file).suffix.lower() in backend.supported_formats: |
| 115 | + log.info( |
| 116 | + f"Using {requested_name} compatible firmware {cand.firmware_file} " |
| 117 | + f"instead of {fw_info.firmware_file} for {board.board} on {board.serialport}" |
| 118 | + ) |
| 119 | + return cand |
| 120 | + return fw_info |
| 121 | + |
55 | 122 | flashed = [] |
56 | 123 | for task in tasks: |
57 | 124 | mcu = task.board |
58 | | - fw_info = task.firmware |
| 125 | + fw_info = _pick_backend_compatible_firmware(task, task.firmware) |
59 | 126 | if not fw_info: |
60 | 127 | log.error(f"Firmware not found for {mcu.board} on {mcu.serialport}, skipping") |
61 | 128 | continue |
|
0 commit comments