Skip to content

Commit 0105f8c

Browse files
committed
refactor: enable calling from Jupyter
Signed-off-by: Jos Verlinde <Jos_Verlinde@hotmail.com>
1 parent 5d05b0a commit 0105f8c

4 files changed

Lines changed: 170 additions & 113 deletions

File tree

mpflash/cli_list.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ def cli_list_mcus(serial: List[str], ignore: List[str], bluetooth: bool, as_json
8282

8383
if progress:
8484
show_mcus(conn_mcus, refresh=False)
85+
reset_connected_mcus(conn_mcus)
86+
return 0 if conn_mcus else 1
87+
88+
def reset_connected_mcus(conn_mcus):
8589
for mcu in conn_mcus:
8690
# reset the board so it can continue to whatever it was running before
8791
if mcu.family == "circuitpython":
@@ -91,4 +95,3 @@ def cli_list_mcus(serial: List[str], ignore: List[str], bluetooth: bool, as_json
9195
continue
9296
else:
9397
mcu.run_command("reset")
94-
return 0 if conn_mcus else 1

mpflash/connected.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def list_mcus(*, ignore: List[str], include: List[str], bluetooth: bool = False)
6868
rp_bar,
6969
TimeElapsedColumn(),
7070
refresh_per_second=1,
71+
transient=True,
7172
) as progress:
7273
tsk_scan = progress.add_task("[green]Scanning", visible=False, total=None)
7374
progress.tasks[tsk_scan].fields["device"] = "..."

mpflash/mpremoteboard/__init__.py

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@
1010
from typing import List, Optional, Union
1111

1212
import serial.tools.list_ports
13+
from rich.progress import track
14+
from tenacity import retry, stop_after_attempt, wait_fixed
15+
1316
from mpflash.errors import MPFlashError
1417
from mpflash.logger import log
1518
from mpflash.mpboard_id.board_id import find_board_id_by_description
1619
from mpflash.mpremoteboard.runner import run
17-
from rich.progress import track
18-
from tenacity import retry, stop_after_attempt, wait_fixed
1920

2021
if sys.version_info >= (3, 11):
2122
import tomllib # type: ignore
@@ -63,7 +64,7 @@ def __init__(
6364
self.build = ""
6465
self.location = location # USB location
6566
self.toml = {}
66-
portinfo = list(serial.tools.list_ports.grep(serialport))
67+
portinfo = list(serial.tools.list_ports.grep(serialport))
6768
if not portinfo or len(portinfo) != 1:
6869
self.vid = 0x00
6970
self.pid = 0x00
@@ -90,8 +91,8 @@ def board_id(self, value: str) -> None:
9091
@property
9192
def board(self) -> str:
9293
_board = self._board_id.split("-")[0]
93-
# Workaround for Pimoroni boards
94-
if not "-" in self._board_id:
94+
# Workaround for Pimoroni boards
95+
if "-" not in self._board_id:
9596
# match with the regex : (.*)(_\d+MB)$
9697
match = re.match(r"(.*)_(\d+MB)$", self._board_id)
9798
if match:
@@ -106,12 +107,12 @@ def board(self, value: str) -> None:
106107
def variant(self) -> str:
107108
_variant = self._board_id.split("-")[1] if "-" in self._board_id else ""
108109
if not _variant:
109-
# Workaround for Pimoroni boards
110+
# Workaround for Pimoroni boards
110111
# match with the regex : (.*)(_\d+MB)$
111112
match = re.match(r"(.*)_(\d+MB)$", self._board_id)
112113
if match:
113114
_variant = match.group(2)
114-
return _variant
115+
return _variant
115116

116117
@variant.setter
117118
def variant(self, value: str) -> None:
@@ -131,7 +132,6 @@ def __str__(self):
131132
def connected_comports(
132133
bluetooth: bool = False, description: bool = False
133134
) -> List[str]:
134-
# TODO: rename to connected_comports
135135
"""
136136
Get a list of connected comports.
137137
@@ -167,7 +167,7 @@ def connected_comports(
167167
return sorted(output)
168168

169169
@retry(stop=stop_after_attempt(RETRIES), wait=wait_fixed(1), reraise=True) # type: ignore ## retry_error_cls=ConnectionError,
170-
def get_mcu_info(self, timeout: int = 2):
170+
def get_mcu_info(self, timeout: Union[int, float] = 2):
171171
"""
172172
Get MCU information from the connected board.
173173
@@ -207,7 +207,9 @@ def get_mcu_info(self, timeout: int = 2):
207207
self.cpu = info["cpu"]
208208
self.arch = info["arch"]
209209
self.mpy = info["mpy"]
210-
self.description = descr = info["description"] if 'description' in info else info["board"]
210+
self.description = descr = (
211+
info["description"] if "description" in info else info["board"]
212+
)
211213
pos = descr.rfind(" with")
212214
short_descr = descr[:pos].strip() if pos != -1 else ""
213215
if info.get("board_id", None):
@@ -297,7 +299,9 @@ def set_board_info_toml(self, timeout: int = 1):
297299
log_errors=False,
298300
)
299301
except Exception as e:
300-
raise MPFlashError(f"Failed to write board_info.toml for {self.serialport}: {e}") from e
302+
raise MPFlashError(
303+
f"Failed to write board_info.toml for {self.serialport}: {e}"
304+
) from e
301305
finally:
302306
# remove the temp file
303307
if toml_path.exists():
@@ -328,7 +332,7 @@ def run_command(
328332
*,
329333
log_errors: bool = True,
330334
no_info: bool = False,
331-
timeout: int = 60,
335+
timeout: Union[int, float] = 60,
332336
resume: Optional[bool] = None,
333337
**kwargs,
334338
):
@@ -344,19 +348,34 @@ def run_command(
344348
Returns:
345349
- bool: True if the command succeeded, False otherwise.
346350
"""
351+
full_cmd = self.build_cmd(cmd, resume=resume)
352+
log.debug(" ".join(full_cmd))
353+
result = run(full_cmd, timeout, log_errors, no_info)
354+
self.connected = result[0] == OK
355+
return result
356+
357+
def build_cmd(
358+
self,
359+
cmd: Union[str, List[str]],
360+
*,
361+
resume: Optional[bool] = None,
362+
auto_connect: bool = True,
363+
) -> List[str]:
364+
"""Construct a full mpremote command with optional connect/resume prefix."""
347365
if isinstance(cmd, str):
348366
cmd = cmd.split(" ")
367+
368+
prefix = self.cmd_prefix(resume) if auto_connect else []
369+
return prefix + cmd
370+
371+
def cmd_prefix(self, resume: bool = True) -> List[str]:
349372
prefix = [sys.executable, "-m", "mpremote"]
350373
if self.serialport:
351374
prefix += ["connect", self.serialport]
352375
# if connected add resume to keep state between commands
353376
if (resume != False) and self.connected or resume:
354377
prefix += ["resume"]
355-
cmd = prefix + cmd
356-
log.debug(" ".join(cmd))
357-
result = run(cmd, timeout, log_errors, no_info, **kwargs)
358-
self.connected = result[0] == OK
359-
return result
378+
return prefix
360379

361380
@retry(stop=stop_after_attempt(RETRIES), wait=wait_fixed(1))
362381
def mip_install(self, name: str) -> bool:
@@ -391,28 +410,18 @@ def wait_for_restart(self, timeout: int = 10):
391410
break
392411

393412
def to_dict(self) -> dict:
394-
"""
395-
Serialize the MPRemoteBoard object to JSON, including all attributes and readable properties.
396-
397-
Returns:
398-
- str: A JSON string representation of the object.
399-
"""
400-
401-
def get_properties(obj):
402-
"""Helper function to get all readable properties."""
403-
return {
404-
name: getattr(obj, name)
405-
for name in dir(obj.__class__)
406-
if isinstance(getattr(obj.__class__, name, None), property)
407-
}
408-
409-
# Combine instance attributes, readable properties, and private attributes
410-
data = {**self.__dict__, **get_properties(self)}
411-
412-
# remove the path and firmware attibutes from the json output as they are always empty
413-
del data["_board_id"] # dup of board_id
414-
del data["connected"]
415-
del data["path"]
416-
del data["firmware"]
417-
418-
return data
413+
"""Return a minimal MCU description with only populated board info fields."""
414+
return {
415+
"serialport": self.serialport,
416+
"version": self.version,
417+
"family": self.family,
418+
"port": self.port,
419+
"board": self.board,
420+
"variant": self.variant,
421+
"board_id": self.board_id,
422+
"build": self.build,
423+
"description": self.description,
424+
"cpu": self.cpu,
425+
"arch": self.arch,
426+
"mpy": self.mpy,
427+
}

0 commit comments

Comments
 (0)