Skip to content

Extract connect_esp command for scripting purposes (ESPTOOL-1318)#1163

Open
nebkat wants to merge 2 commits into
espressif:masterfrom
nebkat:feat/connect-esp
Open

Extract connect_esp command for scripting purposes (ESPTOOL-1318)#1163
nebkat wants to merge 2 commits into
espressif:masterfrom
nebkat:feat/connect-esp

Conversation

@nebkat
Copy link
Copy Markdown

@nebkat nebkat commented May 23, 2026

This change extracts the connection logic in the esptool (CLI/Click) specific prepare_esp_object(ctx) function into a reusable connect_esp command.

This greatly simplifies the creation of user facing scripts vs. detect_chip(PORT).


Before
To achieve "auto" connections:

def get_esp(port: str | None, baud: int) -> ESPLoader:
    esp: ESPLoader | None = None
    if port:
        esp = detect_chip(port, baud=baud)
    else:
        ports = get_port_list()
        for port in ports:
            try:
                print(f"Serial port {port} (baud={baud})")
                esp = detect_chip(port, baud=baud)
                break
            except RuntimeError:
                pass
    if not esp:
        raise RuntimeError("No ESP found")
    return run_stub(esp)

... and no support for open_port_attempts and others

...and missing dependencies
# These are not exported from `esptool` so have to be copied
def get_port_list() -> list[str]:
    """Get the list of serial ports names with optional filters.

    For backwards compatibility, this function returns a list of port names.
    """
    return [port.device for port in _get_port_list()]


def _get_port_list() -> list[ListPortInfo]:
    ports = []
    for port in list_ports.comports():
        if sys.platform == "darwin" and port.device.endswith(
            ("Bluetooth-Incoming-Port", "wlan-debug", "cu.debug-console")
        ):
            continue
        ports.append(port)

    # Constants for sorting optimization
    ESPRESSIF_VID = 0x303A
    LINUX_DEVICE_PATTERNS = ("ttyUSB", "ttyACM")
    MACOS_DEVICE_PATTERNS = ("usbserial", "usbmodem")

    def _port_sort_key_linux(port_info: ListPortInfo) -> tuple[int, str]:
        if port_info.vid == ESPRESSIF_VID:
            return (3, port_info.device)

        if any(pattern in port_info.device for pattern in LINUX_DEVICE_PATTERNS):
            return (2, port_info.device)

        return (1, port_info.device)

    def _port_sort_key_macos(port_info: ListPortInfo) -> tuple[int, str]:
        if port_info.vid == ESPRESSIF_VID:
            return (3, port_info.device)

        if any(pattern in port_info.device for pattern in MACOS_DEVICE_PATTERNS):
            return (2, port_info.device)

        return (1, port_info.device)

    def _port_sort_key_windows(port_info: ListPortInfo) -> tuple[int, str]:
        if port_info.vid == ESPRESSIF_VID:
            return (2, port_info.device)

        return (1, port_info.device)

    if sys.platform == "win32":
        key_func = _port_sort_key_windows
    elif sys.platform == "darwin":
        key_func = _port_sort_key_macos
    else:
        key_func = _port_sort_key_linux

    sorted_port_info = sorted(ports, key=key_func)
    return sorted_port_info

After

def get_esp(port: str | None, baud: int) -> ESPLoader:
    esp = connect_esp(port)
    esp.change_baud(baud)
    return run_stub(esp)

I have tested this change with the following hardware & software combinations:

ESP32-S3, custom board

Tests added for command.

I have run the esptool automated integration tests with this change and the above hardware:

pytest test_esptool.py --port /dev/cu.usbmodem101 --chip esp32s3 --baud 115200
============================================================================================================================ test session starts =============================================================================================================================
platform darwin -- Python 3.13.13, pytest-9.0.3, pluggy-1.6.0
rootdir: /REDACTED/esptool
configfile: pyproject.toml
plugins: rerunfailures-16.3
collected 121 items

test_esptool.py sssss.s......sss...................s.s..ss.s............s.s.....ss..............s...sssssss.....FF...s...ssss............ [100%]

======================================================= FAILURES =======================================================
____________________________________ TestVirtualPort.test_auto_detect_virtual_port _____________________________________

self = <test_esptool.TestVirtualPort object at 0x106e84410>

    def test_auto_detect_virtual_port(self):
        with ESPRFC2217Server() as server:
>           output = self.run_esptool(
                "chip-id",
                chip="auto",
                port=f"rfc2217://127.0.0.1:{str(server.port)}?ign_set_control",
            )

test_esptool.py:1952:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
test_esptool.py:262: in run_esptool
    output = run_esptool_process(full_cmd)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
test_esptool.py:208: in run_esptool_process
    raise e
test_esptool.py:200: in run_esptool_process
    output = subprocess.check_output(
/opt/homebrew/Cellar/python@3.13/3.13.13_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/subprocess.py:472: in check_output
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

input = None, capture_output = False, timeout = None, check = True
popenargs = (['/REDACTED/esptool/.venv/bin/python3.13', '-m', 'esptool', '--port', 'rfc2217://127.0.0.1:62352?ign_set_control', '--baud', ...],)
kwargs = {'cwd': '/REDACTED/esptool/test', 'stderr': -2, 'stdout': -1}
process = <Popen: returncode: 1 args: ['/REDACTED/esptool/.venv/bin/pyth...>
stdout = b'esptool v5.2.0\nSerial port rfc2217://127.0.0.1:62352?ign_set_control:\nConnecting...\nDevice VID/PID identification.../ serial ports.\n\nDevice VID/PID identification is only supported on COM and /dev/ serial ports.\n.................\n'
stderr = None, retcode = 1

    def run(*popenargs,
            input=None, capture_output=False, timeout=None, check=False, **kwargs):
        """Run command with arguments and return a CompletedProcess instance.

        The returned instance will have attributes args, returncode, stdout and
        stderr. By default, stdout and stderr are not captured, and those attributes
        will be None. Pass stdout=PIPE and/or stderr=PIPE in order to capture them,
        or pass capture_output=True to capture both.

        If check is True and the exit code was non-zero, it raises a
        CalledProcessError. The CalledProcessError object will have the return code
        in the returncode attribute, and output & stderr attributes if those streams
        were captured.

        If timeout (seconds) is given and the process takes too long,
         a TimeoutExpired exception will be raised.

        There is an optional argument "input", allowing you to
        pass bytes or a string to the subprocess's stdin.  If you use this argument
        you may not also use the Popen constructor's "stdin" argument, as
        it will be used internally.

        By default, all communication is in bytes, and therefore any "input" should
        be bytes, and the stdout and stderr will be bytes. If in text mode, any
        "input" should be a string, and stdout and stderr will be strings decoded
        according to locale encoding, or by "encoding" if set. Text mode is
        triggered by setting any of text, encoding, errors or universal_newlines.

        The other arguments are the same as for the Popen constructor.
        """
        if input is not None:
            if kwargs.get('stdin') is not None:
                raise ValueError('stdin and input arguments may not both be used.')
            kwargs['stdin'] = PIPE

        if capture_output:
            if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None:
                raise ValueError('stdout and stderr arguments may not be used '
                                 'with capture_output.')
            kwargs['stdout'] = PIPE
            kwargs['stderr'] = PIPE

        with Popen(*popenargs, **kwargs) as process:
            try:
                stdout, stderr = process.communicate(input, timeout=timeout)
            except TimeoutExpired as exc:
                process.kill()
                if _mswindows:
                    # Windows accumulates the output in a single blocking
                    # read() call run on child threads, with the timeout
                    # being done in a join() on those threads.  communicate()
                    # _after_ kill() is required to collect that and add it
                    # to the exception.
                    exc.stdout, exc.stderr = process.communicate()
                else:
                    # POSIX _communicate already populated the output so
                    # far into the TimeoutExpired exception.
                    process.wait()
                raise
            except:  # Including KeyboardInterrupt, communicate handled that.
                process.kill()
                # We don't call process.wait() as .__exit__ does that for us.
                raise
            retcode = process.poll()
            if check and retcode:
>               raise CalledProcessError(retcode, process.args,
                                         output=stdout, stderr=stderr)
E               subprocess.CalledProcessError: Command '['/REDACTED/esptool/.venv/bin/python3.13', '-m', 'esptool', '--port', 'rfc2217://127.0.0.1:62352?ign_set_control', '--baud', '115200', 'chip-id']' returned non-zero exit status 1.

/opt/homebrew/Cellar/python@3.13/3.13.13_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/subprocess.py:577: CalledProcessError
------------------------------------------------- Captured stdout call -------------------------------------------------
Server started successfully.

Running the "chip-id" command...
Executing /REDACTED/esptool/.venv/bin/python3.13 -m esptool --port rfc2217://127.0.0.1:62352?ign_set_control --baud 115200 chip-id...
esptool v5.2.0
Serial port rfc2217://127.0.0.1:62352?ign_set_control:
Connecting...
Device VID/PID identification is only supported on COM and /dev/ serial ports.

Device VID/PID identification is only supported on COM and /dev/ serial ports.
.................

__________________________________ TestVirtualPort.test_highspeed_flash_virtual_port ___________________________________

self = <test_esptool.TestVirtualPort object at 0x106e84550>

    def test_highspeed_flash_virtual_port(self):
        with ESPRFC2217Server() as server:
            rfc2217_port = f"rfc2217://127.0.0.1:{str(server.port)}?ign_set_control"
>           self.run_esptool(
                "write-flash 0x0 images/fifty_kb.bin",
                baud=921600,
                port=rfc2217_port,
            )

test_esptool.py:1962:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
test_esptool.py:262: in run_esptool
    output = run_esptool_process(full_cmd)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
test_esptool.py:208: in run_esptool_process
    raise e
test_esptool.py:200: in run_esptool_process
    output = subprocess.check_output(
/opt/homebrew/Cellar/python@3.13/3.13.13_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/subprocess.py:472: in check_output
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

input = None, capture_output = False, timeout = None, check = True
popenargs = (['/REDACTED/esptool/.venv/bin/python3.13', '-m', 'esptool', '--chip', 'esp32s3', '--port', ...],)
kwargs = {'cwd': '/REDACTED/esptool/test', 'stderr': -2, 'stdout': -1}
process = <Popen: returncode: 1 args: ['/REDACTED/esptool/.venv/bin/pyth...>
stdout = b'esptool v5.2.0\nSerial port rfc2217://127.0.0.1:62474?ign_set_control:\nConnecting...\nDevice VID/PID identification...elf._socket.sendall(data)\n    ~~~~~~~~~~~~~~~~~~~~^^^^^^\nConnectionResetError: [Errno 54] Connection reset by peer\n'
stderr = None, retcode = 1

    def run(*popenargs,
            input=None, capture_output=False, timeout=None, check=False, **kwargs):
        """Run command with arguments and return a CompletedProcess instance.

        The returned instance will have attributes args, returncode, stdout and
        stderr. By default, stdout and stderr are not captured, and those attributes
        will be None. Pass stdout=PIPE and/or stderr=PIPE in order to capture them,
        or pass capture_output=True to capture both.

        If check is True and the exit code was non-zero, it raises a
        CalledProcessError. The CalledProcessError object will have the return code
        in the returncode attribute, and output & stderr attributes if those streams
        were captured.

        If timeout (seconds) is given and the process takes too long,
         a TimeoutExpired exception will be raised.

        There is an optional argument "input", allowing you to
        pass bytes or a string to the subprocess's stdin.  If you use this argument
        you may not also use the Popen constructor's "stdin" argument, as
        it will be used internally.

        By default, all communication is in bytes, and therefore any "input" should
        be bytes, and the stdout and stderr will be bytes. If in text mode, any
        "input" should be a string, and stdout and stderr will be strings decoded
        according to locale encoding, or by "encoding" if set. Text mode is
        triggered by setting any of text, encoding, errors or universal_newlines.

        The other arguments are the same as for the Popen constructor.
        """
        if input is not None:
            if kwargs.get('stdin') is not None:
                raise ValueError('stdin and input arguments may not both be used.')
            kwargs['stdin'] = PIPE

        if capture_output:
            if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None:
                raise ValueError('stdout and stderr arguments may not be used '
                                 'with capture_output.')
            kwargs['stdout'] = PIPE
            kwargs['stderr'] = PIPE

        with Popen(*popenargs, **kwargs) as process:
            try:
                stdout, stderr = process.communicate(input, timeout=timeout)
            except TimeoutExpired as exc:
                process.kill()
                if _mswindows:
                    # Windows accumulates the output in a single blocking
                    # read() call run on child threads, with the timeout
                    # being done in a join() on those threads.  communicate()
                    # _after_ kill() is required to collect that and add it
                    # to the exception.
                    exc.stdout, exc.stderr = process.communicate()
                else:
                    # POSIX _communicate already populated the output so
                    # far into the TimeoutExpired exception.
                    process.wait()
                raise
            except:  # Including KeyboardInterrupt, communicate handled that.
                process.kill()
                # We don't call process.wait() as .__exit__ does that for us.
                raise
            retcode = process.poll()
            if check and retcode:
>               raise CalledProcessError(retcode, process.args,
                                         output=stdout, stderr=stderr)
E               subprocess.CalledProcessError: Command '['/REDACTED/esptool/.venv/bin/python3.13', '-m', 'esptool', '--chip', 'esp32s3', '--port', 'rfc2217://127.0.0.1:62474?ign_set_control', '--baud', '921600', 'write-flash', '0x0', 'images/fifty_kb.bin']' returned non-zero exit status 1.

/opt/homebrew/Cellar/python@3.13/3.13.13_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/subprocess.py:577: CalledProcessError
------------------------------------------------- Captured stdout call -------------------------------------------------
Server started successfully.

Running the "write-flash 0x0 images/fifty_kb.bin" command...
Executing /REDACTED/esptool/.venv/bin/python3.13 -m esptool --chip esp32s3 --port rfc2217://127.0.0.1:62474?ign_set_control --baud 921600 write-flash 0x0 images/fifty_kb.bin...
esptool v5.2.0
Serial port rfc2217://127.0.0.1:62474?ign_set_control:
Connecting...
Device VID/PID identification is only supported on COM and /dev/ serial ports.

Device VID/PID identification is only supported on COM and /dev/ serial ports.
.............................
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/REDACTED/esptool/esptool/__main__.py", line 9, in <module>
    esptool._main()
    ~~~~~~~~~~~~~^^
  File "/REDACTED/esptool/esptool/__init__.py", line 1265, in _main
    main()
    ~~~~^^
  File "/REDACTED/esptool/esptool/__init__.py", line 1233, in main
    cli(args=args, esp=esp)
    ~~~^^^^^^^^^^^^^^^^^^^^
  File "/REDACTED/esptool/esptool/cli_util.py", line 346, in __call__
    return super().__call__(*args, **kwargs)
           ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^
  File "/REDACTED/esptool/.venv/lib/python3.13/site-packages/rich_click/rich_command.py", line 402, in __call__
    return super().__call__(*args, **kwargs)
           ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^
  File "/REDACTED/esptool/.venv/lib/python3.13/site-packages/click/core.py", line 1485, in __call__
    return self.main(*args, **kwargs)
           ~~~~~~~~~^^^^^^^^^^^^^^^^^
  File "/REDACTED/esptool/.venv/lib/python3.13/site-packages/rich_click/rich_command.py", line 216, in main
    rv = self.invoke(ctx)
  File "/REDACTED/esptool/.venv/lib/python3.13/site-packages/click/core.py", line 1873, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
                           ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^
  File "/REDACTED/esptool/.venv/lib/python3.13/site-packages/click/core.py", line 1269, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/REDACTED/esptool/.venv/lib/python3.13/site-packages/click/core.py", line 824, in invoke
    return callback(*args, **kwargs)
  File "/REDACTED/esptool/.venv/lib/python3.13/site-packages/click/decorators.py", line 34, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/REDACTED/esptool/esptool/__init__.py", line 799, in write_flash_cli
    prepare_esp_object(ctx)
    ~~~~~~~~~~~~~~~~~~^^^^^
  File "/REDACTED/esptool/esptool/__init__.py", line 523, in prepare_esp_object
    esp = connect_esp(
        port=ctx.obj["port"],
    ...<6 lines>...
        open_port_attempts=open_port_attempts,
    )
  File "/REDACTED/esptool/esptool/cmds.py", line 326, in connect_esp
    esp = esp or _connect_first_available(
                 ~~~~~~~~~~~~~~~~~~~~~~~~^
        ser_list,
        ^^^^^^^^^
    ...<5 lines>...
        before=before,
        ^^^^^^^^^^^^^^
    )
    ^
  File "/REDACTED/esptool/esptool/cmds.py", line 287, in _connect_first_available
    esp.connect(before, connect_attempts)
    ~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/REDACTED/esptool/esptool/loader.py", line 870, in connect
    last_error = self._connect_attempt(reset_strategy, mode)
  File "/REDACTED/esptool/esptool/loader.py", line 763, in _connect_attempt
    self.sync()
    ~~~~~~~~~^^
  File "/REDACTED/esptool/esptool/loader.py", line 675, in sync
    val, _ = self.command(
             ~~~~~~~~~~~~^
        self.ESP_CMDS["SYNC"],
        ^^^^^^^^^^^^^^^^^^^^^^
        b"\x07\x07\x12\x20" + 32 * b"\x55",
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        timeout=SYNC_TIMEOUT,
        ^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/REDACTED/esptool/esptool/loader.py", line 550, in command
    self._port.timeout = new_timeout
    ^^^^^^^^^^^^^^^^^^
  File "/REDACTED/esptool/.venv/lib/python3.13/site-packages/serial/serialutil.py", line 372, in timeout
    self._reconfigure_port()
    ~~~~~~~~~~~~~~~~~~~~~~^^
  File "/REDACTED/esptool/.venv/lib/python3.13/site-packages/serial/rfc2217.py", line 516, in _reconfigure_port
    self._rfc2217_port_settings['datasize'].set(struct.pack(b'!B', self._bytesize))
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/REDACTED/esptool/.venv/lib/python3.13/site-packages/serial/rfc2217.py", line 335, in set
    self.connection.rfc2217_send_subnegotiation(self.option, self.value)
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/REDACTED/esptool/.venv/lib/python3.13/site-packages/serial/rfc2217.py", line 867, in rfc2217_send_subnegotiation
    self._internal_raw_write(IAC + SB + COM_PORT_OPTION + option + value + IAC + SE)
    ~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/REDACTED/esptool/.venv/lib/python3.13/site-packages/serial/rfc2217.py", line 858, in _internal_raw_write
    self._socket.sendall(data)
    ~~~~~~~~~~~~~~~~~~~~^^^^^^
ConnectionResetError: [Errno 54] Connection reset by peer

=================================================== warnings summary ===================================================
../espefuse/cli_util.py:8
  /REDACTED/esptool/espefuse/cli_util.py:8: DeprecationWarning: 'parser.OptionParser' is deprecated and will be removed in Click 9.0. The old parser is available in 'optparse'.
    from click.parser import OptionParser, ParsingState, _unpack_args

../espefuse/cli_util.py:8
  /REDACTED/esptool/espefuse/cli_util.py:8: DeprecationWarning: 'parser.ParsingState' is deprecated and will be removed in Click 9.0. The old parser is available in 'optparse'.
    from click.parser import OptionParser, ParsingState, _unpack_args

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
=============================================== short test summary info ================================================
FAILED test_esptool.py::TestVirtualPort::test_auto_detect_virtual_port - subprocess.CalledProcessError: Command '['/REDACTED/esptool/.venv/bin/python3.13', '-m', 'esptool', '...
FAILED test_esptool.py::TestVirtualPort::test_highspeed_flash_virtual_port - subprocess.CalledProcessError: Command '['/REDACTED/esptool/.venv/bin/python3.13', '-m', 'esptool', '...
=========================== 2 failed, 88 passed, 31 skipped, 2 warnings in 234.13s (0:03:54) ===========================

@nebkat nebkat force-pushed the feat/connect-esp branch 2 times, most recently from f01a554 to b6d0fdc Compare May 23, 2026 16:19
@github-actions github-actions Bot changed the title Extract connect_esp command for scripting purposes Extract connect_esp command for scripting purposes (ESPTOOL-1318) May 23, 2026
nebkat added 2 commits May 23, 2026 21:49
Move get_port_list, _get_port_list, and parse_port_filters out of
cli_util.py (which is the Click-types layer) into a new port_util.py.
Creates a new exported connect_esp command that provides the same
functionality as the main CLI - detecting ports, retry logic, etc.
@nebkat nebkat force-pushed the feat/connect-esp branch from b6d0fdc to 77624ef Compare May 23, 2026 19:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant