From 93256a93900e09ff1246806871bf7144fe1e3a06 Mon Sep 17 00:00:00 2001 From: d42me Date: Mon, 1 Jun 2026 18:08:54 -0700 Subject: [PATCH] fix: support tunnel runtime on Windows --- .../prime-tunnel/src/prime_tunnel/binary.py | 66 +++++++--- .../prime-tunnel/src/prime_tunnel/tunnel.py | 113 ++++++++---------- packages/prime-tunnel/tests/test_binary.py | 74 ++++++++++++ packages/prime-tunnel/tests/test_tunnel.py | 16 +++ 4 files changed, 188 insertions(+), 81 deletions(-) create mode 100644 packages/prime-tunnel/tests/test_binary.py diff --git a/packages/prime-tunnel/src/prime_tunnel/binary.py b/packages/prime-tunnel/src/prime_tunnel/binary.py index 6910734bc..0e19ada5a 100644 --- a/packages/prime-tunnel/src/prime_tunnel/binary.py +++ b/packages/prime-tunnel/src/prime_tunnel/binary.py @@ -5,6 +5,7 @@ import stat import tarfile import tempfile +import zipfile from pathlib import Path import httpx @@ -19,6 +20,8 @@ ("Darwin", "x86_64"): "9558d55a9d8bc40e22018379ea645251f803f9e2d69e7a7a2fd1588f98f8ef43", ("Linux", "x86_64"): "317a17a7adac2e6bed2d7a83dc077da91ced0d110e1636373ece8ae5ac8b578b", ("Linux", "aarch64"): "196ddaa51b716c2e99aeb2916b0a2bf55bb317494c4acdcefab36c383de950ba", + ("Windows", "x86_64"): "3e2925b65a85938b936ea85072657c6c8e62b095c233e739da3eb5615b25ca55", + ("Windows", "arm64"): "dfd112469c91e6fa05274dc4929725b062b176b103463196908d24c7888e54b8", } FRPC_URLS = { @@ -38,6 +41,14 @@ "Linux", "aarch64", ): f"https://github.com/fatedier/frp/releases/download/v{FRPC_VERSION}/frp_{FRPC_VERSION}_linux_arm64.tar.gz", + ( + "Windows", + "x86_64", + ): f"https://github.com/fatedier/frp/releases/download/v{FRPC_VERSION}/frp_{FRPC_VERSION}_windows_amd64.zip", + ( + "Windows", + "arm64", + ): f"https://github.com/fatedier/frp/releases/download/v{FRPC_VERSION}/frp_{FRPC_VERSION}_windows_arm64.zip", } @@ -47,12 +58,16 @@ def _get_platform_key() -> tuple[str, str]: if machine in ("AMD64", "x86_64"): machine = "x86_64" - elif machine in ("arm64", "aarch64"): - machine = "arm64" if system == "Darwin" else "aarch64" + elif machine in ("ARM64", "arm64", "aarch64"): + machine = "arm64" if system in ("Darwin", "Windows") else "aarch64" return (system, machine) +def _frpc_binary_name(platform_key: tuple[str, str]) -> str: + return "frpc.exe" if platform_key[0] == "Windows" else "frpc" + + def _verify_checksum(file_path: Path, expected_checksum: str) -> None: """Verify SHA256 checksum of downloaded file.""" sha256 = hashlib.sha256() @@ -80,7 +95,7 @@ def _download_frpc(dest: Path) -> None: with tempfile.TemporaryDirectory() as tmpdir: tmpdir_path = Path(tmpdir) - archive_path = tmpdir_path / "frp.tar.gz" + archive_path = tmpdir_path / ("frp.zip" if url.endswith(".zip") else "frp.tar.gz") try: with httpx.stream("GET", url, follow_redirects=True, timeout=120.0) as response: @@ -94,20 +109,35 @@ def _download_frpc(dest: Path) -> None: _verify_checksum(archive_path, expected_checksum) - try: - with tarfile.open(archive_path, "r:gz") as tar: - for member in tar.getmembers(): - if member.name.endswith("/frpc") or member.name == "frpc": - member.name = "frpc" - tar.extract(member, tmpdir_path) - break - else: - raise BinaryDownloadError("frpc binary not found in archive") - - except tarfile.TarError as e: - raise BinaryDownloadError(f"Failed to extract frpc: {e}") from e - - extracted_path = tmpdir_path / "frpc" + binary_name = _frpc_binary_name(platform_key) + extracted_path = tmpdir_path / binary_name + if archive_path.suffix == ".zip": + try: + with zipfile.ZipFile(archive_path) as archive: + for member_name in archive.namelist(): + if Path(member_name).name == binary_name: + with archive.open(member_name) as source: + with open(extracted_path, "wb") as target: + shutil.copyfileobj(source, target) + break + else: + raise BinaryDownloadError("frpc binary not found in archive") + except zipfile.BadZipFile as e: + raise BinaryDownloadError(f"Failed to extract frpc: {e}") from e + else: + try: + with tarfile.open(archive_path, "r:gz") as tar: + for member in tar.getmembers(): + if Path(member.name).name == binary_name: + member.name = binary_name + tar.extract(member, tmpdir_path) + break + else: + raise BinaryDownloadError("frpc binary not found in archive") + + except tarfile.TarError as e: + raise BinaryDownloadError(f"Failed to extract frpc: {e}") from e + if not extracted_path.exists(): raise BinaryDownloadError("frpc binary not found after extraction") @@ -132,7 +162,7 @@ def _download_frpc(dest: Path) -> None: def get_frpc_path() -> Path: config = Config() - frpc_path = config.bin_dir / "frpc" + frpc_path = config.bin_dir / _frpc_binary_name(_get_platform_key()) version_file = config.bin_dir / ".frpc_version" if frpc_path.exists(): diff --git a/packages/prime-tunnel/src/prime_tunnel/tunnel.py b/packages/prime-tunnel/src/prime_tunnel/tunnel.py index 1535800f2..526f3f6a3 100644 --- a/packages/prime-tunnel/src/prime_tunnel/tunnel.py +++ b/packages/prime-tunnel/src/prime_tunnel/tunnel.py @@ -1,5 +1,4 @@ import asyncio -import fcntl import os import re import subprocess @@ -95,6 +94,7 @@ def __init__( self._config_file: Optional[Path] = None self._started = False self._output_lines: list[str] = [] + self._capture_startup_output = False @property def tunnel_id(self) -> Optional[str]: @@ -187,22 +187,27 @@ async def start(self) -> str: raise raise TunnelConnectionError(message=f"Failed to start frpc: {e}") from e - # 5. Wait for connection - try: - await self._wait_for_connection() - except BaseException: - await self._cleanup() - raise - - # 6. Start background thread to drain pipes (prevents buffer exhaustion) + # 5. Start background threads to drain pipes (prevents buffer exhaustion) + self._output_lines = [] + self._capture_startup_output = True try: self._start_pipe_drain() except BaseException as e: + self._capture_startup_output = False await self._cleanup() if isinstance(e, asyncio.CancelledError): raise raise TunnelConnectionError(message=f"Failed to start pipe drain: {e}") from e + # 6. Wait for connection + try: + await self._wait_for_connection() + except BaseException: + self._capture_startup_output = False + await self._cleanup() + raise + + self._capture_startup_output = False self._started = True return self.url @@ -220,6 +225,8 @@ def sync_stop(self) -> None: if not self._started: return + self._capture_startup_output = False + if self._process is not None: try: self._process.terminate() @@ -258,6 +265,8 @@ def sync_stop(self) -> None: async def _cleanup(self) -> None: """Clean up tunnel resources.""" + self._capture_startup_output = False + # Stop frpc process (this will cause drain threads to exit via EOF) if self._process is not None: try: @@ -334,6 +343,8 @@ def drain_pipe(pipe): self._recent_output.append(line) if len(self._recent_output) > max_lines: self._recent_output.pop(0) + if self._capture_startup_output: + self._output_lines.append(line) except (OSError, ValueError): pass # Pipe closed @@ -403,7 +414,18 @@ def _write_frpc_config(self) -> Path: async def _wait_for_connection(self) -> None: """Wait for frpc to establish connection.""" start_time = time.time() - self._output_lines = [] + checked_lines = 0 + + if not hasattr(self, "_output_lock"): + self._output_lines = [] + self._capture_startup_output = True + self._start_pipe_drain() + + def startup_output() -> list[str]: + if not hasattr(self, "_output_lock"): + return list(self._output_lines) + with self._output_lock: + return list(self._output_lines) while time.time() - start_time < self.connection_timeout: if self._process is None: @@ -411,63 +433,28 @@ async def _wait_for_connection(self) -> None: return_code = self._process.poll() if return_code is not None: - remaining_output = [] - if self._process.stdout: - remaining_output.extend(self._process.stdout.readlines()) - if self._process.stderr: - remaining_output.extend(self._process.stderr.readlines()) - self._output_lines.extend(line.strip() for line in remaining_output if line.strip()) - - raise _parse_frpc_error(self._output_lines, self.tunnel_id, return_code) - - if os.name == "posix": - # Set both pipes to non-blocking mode to drain them without deadlock - pipes_to_drain = [] - original_flags = {} - - for pipe in (self._process.stdout, self._process.stderr): - if pipe: - fd = pipe.fileno() - fl = fcntl.fcntl(fd, fcntl.F_GETFL) - original_flags[fd] = fl - fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) - pipes_to_drain.append(pipe) - - try: - # Drain both stdout and stderr to prevent buffer exhaustion - for pipe in pipes_to_drain: - try: - while True: - line = pipe.readline() - if not line: - break - line = line.strip() - if line: - self._output_lines.append(line) - # Check for success/failure indicators - if "start proxy success" in line.lower(): - return - if ( - "login to the server failed" in line.lower() - or "connect to server error" in line.lower() - ): - raise _parse_frpc_error(self._output_lines, self.tunnel_id) - except (BlockingIOError, IOError): - pass # No more data available on this pipe - finally: - # Restore original flags - for fd, fl in original_flags.items(): - try: - fcntl.fcntl(fd, fcntl.F_SETFL, fl) - except (OSError, ValueError): - pass # Pipe may have closed + if hasattr(self, "_drain_threads"): + for thread in self._drain_threads: + thread.join(timeout=0.5) + raise _parse_frpc_error(startup_output(), self.tunnel_id, return_code) + + lines = startup_output() + for line in lines[checked_lines:]: + line_lower = line.lower() + if "start proxy success" in line_lower: + return + if ( + "login to the server failed" in line_lower + or "connect to server error" in line_lower + ): + raise _parse_frpc_error(lines, self.tunnel_id) + checked_lines = len(lines) await asyncio.sleep(0.1) # Timeout - include any captured output - output_text = ( - "\n".join(self._output_lines) if self._output_lines else "(no output captured)" - ) + lines = startup_output() + output_text = "\n".join(lines) if lines else "(no output captured)" raise TunnelTimeoutError( f"Tunnel connection timed out after {self.connection_timeout}s\n" f"--- frpc output ---\n{output_text}\n-------------------" diff --git a/packages/prime-tunnel/tests/test_binary.py b/packages/prime-tunnel/tests/test_binary.py new file mode 100644 index 000000000..ee533d53a --- /dev/null +++ b/packages/prime-tunnel/tests/test_binary.py @@ -0,0 +1,74 @@ +import hashlib +import io +import zipfile + +from prime_tunnel import binary + + +class _FakeStream: + def __init__(self, data: bytes): + self._data = data + + def __enter__(self): + return self + + def __exit__(self, *_args): + return None + + def raise_for_status(self): + return None + + def iter_bytes(self, chunk_size: int): + for offset in range(0, len(self._data), chunk_size): + yield self._data[offset : offset + chunk_size] + + +def _windows_zip() -> bytes: + buffer = io.BytesIO() + with zipfile.ZipFile(buffer, "w") as archive: + archive.writestr("frp_0.66.0_windows_amd64/frpc.exe", b"fake frpc exe") + return buffer.getvalue() + + +def test_get_platform_key_normalizes_windows_arch(monkeypatch): + monkeypatch.setattr(binary.platform, "system", lambda: "Windows") + monkeypatch.setattr(binary.platform, "machine", lambda: "AMD64") + + assert binary._get_platform_key() == ("Windows", "x86_64") + + monkeypatch.setattr(binary.platform, "machine", lambda: "ARM64") + + assert binary._get_platform_key() == ("Windows", "arm64") + + +def test_get_frpc_path_uses_exe_on_windows(monkeypatch, tmp_path): + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setattr(binary.platform, "system", lambda: "Windows") + monkeypatch.setattr(binary.platform, "machine", lambda: "AMD64") + + def fake_download(path): + path.parent.mkdir(parents=True, exist_ok=True) + path.write_bytes(b"fake frpc exe") + + monkeypatch.setattr(binary, "_download_frpc", fake_download) + + assert binary.get_frpc_path().name == "frpc.exe" + + +def test_download_frpc_extracts_windows_zip(monkeypatch, tmp_path): + archive_data = _windows_zip() + platform_key = ("Windows", "x86_64") + monkeypatch.setattr(binary, "_get_platform_key", lambda: platform_key) + monkeypatch.setitem(binary.FRPC_URLS, platform_key, "https://example.invalid/frp.zip") + monkeypatch.setitem( + binary.FRPC_CHECKSUMS, + platform_key, + hashlib.sha256(archive_data).hexdigest(), + ) + monkeypatch.setattr(binary.httpx, "stream", lambda *_args, **_kwargs: _FakeStream(archive_data)) + + dest = tmp_path / "frpc.exe" + + binary._download_frpc(dest) + + assert dest.read_bytes() == b"fake frpc exe" diff --git a/packages/prime-tunnel/tests/test_tunnel.py b/packages/prime-tunnel/tests/test_tunnel.py index 7f25b87f2..0a4c19ec7 100644 --- a/packages/prime-tunnel/tests/test_tunnel.py +++ b/packages/prime-tunnel/tests/test_tunnel.py @@ -155,6 +155,22 @@ def test_sync_stop_survives_delete_failure(): assert tunnel._tunnel_info is None +@pytest.mark.asyncio +async def test_wait_for_connection_reads_success_from_pipe_drain(): + tunnel = _make_started_tunnel() + tunnel.connection_timeout = 1.0 + tunnel._process.poll.return_value = None + tunnel._process.stdout = iter(["2026-01-01 00:00:00.000 [I] [proxy] start proxy success\n"]) + tunnel._process.stderr = iter([]) + tunnel._output_lines = [] + tunnel._capture_startup_output = True + + tunnel._start_pipe_drain() + await tunnel._wait_for_connection() + + assert "start proxy success" in tunnel.recent_output[0] + + # -- check_registered tests --