Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 48 additions & 18 deletions packages/prime-tunnel/src/prime_tunnel/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import stat
import tarfile
import tempfile
import zipfile
from pathlib import Path

import httpx
Expand All @@ -19,6 +20,8 @@
("Darwin", "x86_64"): "9558d55a9d8bc40e22018379ea645251f803f9e2d69e7a7a2fd1588f98f8ef43",
("Linux", "x86_64"): "317a17a7adac2e6bed2d7a83dc077da91ced0d110e1636373ece8ae5ac8b578b",
("Linux", "aarch64"): "196ddaa51b716c2e99aeb2916b0a2bf55bb317494c4acdcefab36c383de950ba",
("Windows", "x86_64"): "3e2925b65a85938b936ea85072657c6c8e62b095c233e739da3eb5615b25ca55",
("Windows", "arm64"): "dfd112469c91e6fa05274dc4929725b062b176b103463196908d24c7888e54b8",
}

FRPC_URLS = {
Expand All @@ -38,6 +41,14 @@
"Linux",
"aarch64",
): f"https://github.com/fatedier/frp/releases/download/v{FRPC_VERSION}/frp_{FRPC_VERSION}_linux_arm64.tar.gz",
(
"Windows",
"x86_64",
): f"https://github.com/fatedier/frp/releases/download/v{FRPC_VERSION}/frp_{FRPC_VERSION}_windows_amd64.zip",
(
"Windows",
"arm64",
): f"https://github.com/fatedier/frp/releases/download/v{FRPC_VERSION}/frp_{FRPC_VERSION}_windows_arm64.zip",
}


Expand All @@ -47,12 +58,16 @@ def _get_platform_key() -> tuple[str, str]:

if machine in ("AMD64", "x86_64"):
machine = "x86_64"
elif machine in ("arm64", "aarch64"):
machine = "arm64" if system == "Darwin" else "aarch64"
elif machine in ("ARM64", "arm64", "aarch64"):
machine = "arm64" if system in ("Darwin", "Windows") else "aarch64"

return (system, machine)


def _frpc_binary_name(platform_key: tuple[str, str]) -> str:
return "frpc.exe" if platform_key[0] == "Windows" else "frpc"


def _verify_checksum(file_path: Path, expected_checksum: str) -> None:
"""Verify SHA256 checksum of downloaded file."""
sha256 = hashlib.sha256()
Expand Down Expand Up @@ -80,7 +95,7 @@ def _download_frpc(dest: Path) -> None:

with tempfile.TemporaryDirectory() as tmpdir:
tmpdir_path = Path(tmpdir)
archive_path = tmpdir_path / "frp.tar.gz"
archive_path = tmpdir_path / ("frp.zip" if url.endswith(".zip") else "frp.tar.gz")

try:
with httpx.stream("GET", url, follow_redirects=True, timeout=120.0) as response:
Expand All @@ -94,20 +109,35 @@ def _download_frpc(dest: Path) -> None:

_verify_checksum(archive_path, expected_checksum)

try:
with tarfile.open(archive_path, "r:gz") as tar:
for member in tar.getmembers():
if member.name.endswith("/frpc") or member.name == "frpc":
member.name = "frpc"
tar.extract(member, tmpdir_path)
break
else:
raise BinaryDownloadError("frpc binary not found in archive")

except tarfile.TarError as e:
raise BinaryDownloadError(f"Failed to extract frpc: {e}") from e

extracted_path = tmpdir_path / "frpc"
binary_name = _frpc_binary_name(platform_key)
extracted_path = tmpdir_path / binary_name
if archive_path.suffix == ".zip":
try:
with zipfile.ZipFile(archive_path) as archive:
for member_name in archive.namelist():
if Path(member_name).name == binary_name:
with archive.open(member_name) as source:
with open(extracted_path, "wb") as target:
shutil.copyfileobj(source, target)
break
else:
raise BinaryDownloadError("frpc binary not found in archive")
except zipfile.BadZipFile as e:
raise BinaryDownloadError(f"Failed to extract frpc: {e}") from e
else:
try:
with tarfile.open(archive_path, "r:gz") as tar:
for member in tar.getmembers():
if Path(member.name).name == binary_name:
member.name = binary_name
tar.extract(member, tmpdir_path)
break
else:
raise BinaryDownloadError("frpc binary not found in archive")

except tarfile.TarError as e:
raise BinaryDownloadError(f"Failed to extract frpc: {e}") from e

if not extracted_path.exists():
raise BinaryDownloadError("frpc binary not found after extraction")

Expand All @@ -132,7 +162,7 @@ def _download_frpc(dest: Path) -> None:

def get_frpc_path() -> Path:
config = Config()
frpc_path = config.bin_dir / "frpc"
frpc_path = config.bin_dir / _frpc_binary_name(_get_platform_key())
version_file = config.bin_dir / ".frpc_version"

if frpc_path.exists():
Expand Down
113 changes: 50 additions & 63 deletions packages/prime-tunnel/src/prime_tunnel/tunnel.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import fcntl
import os
import re
import subprocess
Expand Down Expand Up @@ -95,6 +94,7 @@ def __init__(
self._config_file: Optional[Path] = None
self._started = False
self._output_lines: list[str] = []
self._capture_startup_output = False

@property
def tunnel_id(self) -> Optional[str]:
Expand Down Expand Up @@ -187,22 +187,27 @@ async def start(self) -> str:
raise
raise TunnelConnectionError(message=f"Failed to start frpc: {e}") from e

# 5. Wait for connection
try:
await self._wait_for_connection()
except BaseException:
await self._cleanup()
raise

# 6. Start background thread to drain pipes (prevents buffer exhaustion)
# 5. Start background threads to drain pipes (prevents buffer exhaustion)
self._output_lines = []
self._capture_startup_output = True
try:
self._start_pipe_drain()
except BaseException as e:
self._capture_startup_output = False
await self._cleanup()
if isinstance(e, asyncio.CancelledError):
raise
raise TunnelConnectionError(message=f"Failed to start pipe drain: {e}") from e

# 6. Wait for connection
try:
await self._wait_for_connection()
except BaseException:
self._capture_startup_output = False
await self._cleanup()
raise

self._capture_startup_output = False
self._started = True

return self.url
Expand All @@ -220,6 +225,8 @@ def sync_stop(self) -> None:
if not self._started:
return

self._capture_startup_output = False

if self._process is not None:
try:
self._process.terminate()
Expand Down Expand Up @@ -258,6 +265,8 @@ def sync_stop(self) -> None:

async def _cleanup(self) -> None:
"""Clean up tunnel resources."""
self._capture_startup_output = False

# Stop frpc process (this will cause drain threads to exit via EOF)
if self._process is not None:
try:
Expand Down Expand Up @@ -334,6 +343,8 @@ def drain_pipe(pipe):
self._recent_output.append(line)
if len(self._recent_output) > max_lines:
self._recent_output.pop(0)
if self._capture_startup_output:
self._output_lines.append(line)
except (OSError, ValueError):
pass # Pipe closed

Expand Down Expand Up @@ -403,71 +414,47 @@ def _write_frpc_config(self) -> Path:
async def _wait_for_connection(self) -> None:
"""Wait for frpc to establish connection."""
start_time = time.time()
self._output_lines = []
checked_lines = 0

if not hasattr(self, "_output_lock"):
self._output_lines = []
self._capture_startup_output = True
self._start_pipe_drain()

def startup_output() -> list[str]:
if not hasattr(self, "_output_lock"):
return list(self._output_lines)
with self._output_lock:
return list(self._output_lines)

while time.time() - start_time < self.connection_timeout:
if self._process is None:
raise TunnelConnectionError(message="frpc process not running")

return_code = self._process.poll()
if return_code is not None:
remaining_output = []
if self._process.stdout:
remaining_output.extend(self._process.stdout.readlines())
if self._process.stderr:
remaining_output.extend(self._process.stderr.readlines())
self._output_lines.extend(line.strip() for line in remaining_output if line.strip())

raise _parse_frpc_error(self._output_lines, self.tunnel_id, return_code)

if os.name == "posix":
# Set both pipes to non-blocking mode to drain them without deadlock
pipes_to_drain = []
original_flags = {}

for pipe in (self._process.stdout, self._process.stderr):
if pipe:
fd = pipe.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
original_flags[fd] = fl
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
pipes_to_drain.append(pipe)

try:
# Drain both stdout and stderr to prevent buffer exhaustion
for pipe in pipes_to_drain:
try:
while True:
line = pipe.readline()
if not line:
break
line = line.strip()
if line:
self._output_lines.append(line)
# Check for success/failure indicators
if "start proxy success" in line.lower():
return
if (
"login to the server failed" in line.lower()
or "connect to server error" in line.lower()
):
raise _parse_frpc_error(self._output_lines, self.tunnel_id)
except (BlockingIOError, IOError):
pass # No more data available on this pipe
finally:
# Restore original flags
for fd, fl in original_flags.items():
try:
fcntl.fcntl(fd, fcntl.F_SETFL, fl)
except (OSError, ValueError):
pass # Pipe may have closed
if hasattr(self, "_drain_threads"):
for thread in self._drain_threads:
thread.join(timeout=0.5)
raise _parse_frpc_error(startup_output(), self.tunnel_id, return_code)

lines = startup_output()
for line in lines[checked_lines:]:
line_lower = line.lower()
if "start proxy success" in line_lower:
return
if (
"login to the server failed" in line_lower
or "connect to server error" in line_lower
):
raise _parse_frpc_error(lines, self.tunnel_id)
checked_lines = len(lines)

await asyncio.sleep(0.1)

# Timeout - include any captured output
output_text = (
"\n".join(self._output_lines) if self._output_lines else "(no output captured)"
)
lines = startup_output()
output_text = "\n".join(lines) if lines else "(no output captured)"
raise TunnelTimeoutError(
f"Tunnel connection timed out after {self.connection_timeout}s\n"
f"--- frpc output ---\n{output_text}\n-------------------"
Expand Down
74 changes: 74 additions & 0 deletions packages/prime-tunnel/tests/test_binary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import hashlib
import io
import zipfile

from prime_tunnel import binary


class _FakeStream:
def __init__(self, data: bytes):
self._data = data

def __enter__(self):
return self

def __exit__(self, *_args):
return None

def raise_for_status(self):
return None

def iter_bytes(self, chunk_size: int):
for offset in range(0, len(self._data), chunk_size):
yield self._data[offset : offset + chunk_size]


def _windows_zip() -> bytes:
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
archive.writestr("frp_0.66.0_windows_amd64/frpc.exe", b"fake frpc exe")
return buffer.getvalue()


def test_get_platform_key_normalizes_windows_arch(monkeypatch):
monkeypatch.setattr(binary.platform, "system", lambda: "Windows")
monkeypatch.setattr(binary.platform, "machine", lambda: "AMD64")

assert binary._get_platform_key() == ("Windows", "x86_64")

monkeypatch.setattr(binary.platform, "machine", lambda: "ARM64")

assert binary._get_platform_key() == ("Windows", "arm64")


def test_get_frpc_path_uses_exe_on_windows(monkeypatch, tmp_path):
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.setattr(binary.platform, "system", lambda: "Windows")
monkeypatch.setattr(binary.platform, "machine", lambda: "AMD64")

def fake_download(path):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(b"fake frpc exe")

monkeypatch.setattr(binary, "_download_frpc", fake_download)

assert binary.get_frpc_path().name == "frpc.exe"


def test_download_frpc_extracts_windows_zip(monkeypatch, tmp_path):
archive_data = _windows_zip()
platform_key = ("Windows", "x86_64")
monkeypatch.setattr(binary, "_get_platform_key", lambda: platform_key)
monkeypatch.setitem(binary.FRPC_URLS, platform_key, "https://example.invalid/frp.zip")
monkeypatch.setitem(
binary.FRPC_CHECKSUMS,
platform_key,
hashlib.sha256(archive_data).hexdigest(),
)
monkeypatch.setattr(binary.httpx, "stream", lambda *_args, **_kwargs: _FakeStream(archive_data))

dest = tmp_path / "frpc.exe"

binary._download_frpc(dest)

assert dest.read_bytes() == b"fake frpc exe"
16 changes: 16 additions & 0 deletions packages/prime-tunnel/tests/test_tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,22 @@ def test_sync_stop_survives_delete_failure():
assert tunnel._tunnel_info is None


@pytest.mark.asyncio
async def test_wait_for_connection_reads_success_from_pipe_drain():
tunnel = _make_started_tunnel()
tunnel.connection_timeout = 1.0
tunnel._process.poll.return_value = None
tunnel._process.stdout = iter(["2026-01-01 00:00:00.000 [I] [proxy] start proxy success\n"])
tunnel._process.stderr = iter([])
tunnel._output_lines = []
tunnel._capture_startup_output = True

tunnel._start_pipe_drain()
await tunnel._wait_for_connection()

assert "start proxy success" in tunnel.recent_output[0]


# -- check_registered tests --


Expand Down
Loading