Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions interop/perf/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
FROM python:3.13-slim

ARG UV_VERSION=0.5.31

WORKDIR /app

# Install system dependencies (contributing.rst + transport Dockerfile)
# Install only required build/runtime dependencies for local libp2p install.
RUN apt-get update && apt-get install -y \
redis-tools \
build-essential \
cmake \
pkg-config \
libgmp-dev \
git \
curl \
libssl-dev \
libffi-dev \
&& rm -rf /var/lib/apt/lists/*

# Install uv (docs/contributing.rst Option 2: same as CI)
RUN pip install --no-cache-dir uv
RUN pip install --no-cache-dir "uv==${UV_VERSION}"
RUN uv --version

# Build context = repo root (same as interop/transport/Dockerfile for GitHub + local)
Expand All @@ -24,13 +25,16 @@ RUN uv venv /app/venv

# Interop/perf project: depends on libp2p from copied clone
COPY interop/perf/pyproject.toml .
COPY interop/perf/perf_test.py .
RUN uv pip install --python /app/venv/bin/python --upgrade pip && \
uv pip install --python /app/venv/bin/python --no-cache-dir -e .

COPY interop/perf/perf_test.py .

ENV PYTHONUNBUFFERED=1
ENV CI=true
ENV HTTP_PROXY=
ENV HTTPS_PROXY=
ENV ALL_PROXY=
ENV NO_PROXY=*
ENV PATH="/app/venv/bin:${PATH}"

ENTRYPOINT ["/app/venv/bin/python", "perf_test.py"]
149 changes: 139 additions & 10 deletions interop/perf/perf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from libp2p.crypto.ed25519 import create_new_key_pair
from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair
from libp2p.custom_types import TProtocol
from libp2p.network.config import ConnectionConfig
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.perf import PROTOCOL_NAME, PerfService
from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
Expand All @@ -47,12 +48,41 @@
PROTOCOL_ID as TLS_PROTOCOL_ID,
TLSTransport,
)
from libp2p.transport.quic.config import QUICTransportConfig
from libp2p.utils.address_validation import get_available_interfaces

MAX_TEST_TIMEOUT = 300
logger = logging.getLogger("libp2p.perf_test")


def _env_int(name: str, default: int, minimum: int | None = None) -> int:
raw = os.getenv(name)
if not raw:
value = default
else:
try:
value = int(raw)
except ValueError:
value = default
if minimum is not None:
value = max(value, minimum)
return value


def _env_float(name: str, default: float, minimum: float | None = None) -> float:
raw = os.getenv(name)
if not raw:
value = default
else:
try:
value = float(raw)
except ValueError:
value = default
if minimum is not None:
value = max(value, minimum)
return value


def configure_logging() -> None:
"""Configure logging based on DEBUG environment variable."""
debug_value = os.getenv("DEBUG") or "false"
Expand Down Expand Up @@ -94,7 +124,18 @@ def _percentile(sorted_values: list[float], p: float) -> float:
def _is_connection_closed_error(exc: BaseException) -> bool:
"""True if this is the expected 'Connection closed' from swarm/mplex on shutdown."""
msg = str(exc).lower()
if "connection closed" in msg:
if any(
token in msg
for token in (
"connection closed",
"connection is closed",
"cannot read: tls connection is closed",
"closed resource",
"broken resource",
"end of file",
"eof",
)
):
return True
if isinstance(exc, ExceptionGroup):
return all(_is_connection_closed_error(e) for e in exc.exceptions)
Expand Down Expand Up @@ -185,6 +226,26 @@ def __init__(self) -> None:

timeout_val = os.getenv("TEST_TIMEOUT_SECS") or "180"
self.test_timeout_seconds = min(int(timeout_val), MAX_TEST_TIMEOUT)
# Keep write blocks below Noise's 65535-byte message limit.
self.write_block_size = _env_int("PERF_WRITE_BLOCK_SIZE", 65500, minimum=1024)

# Tunable connection/upgrade timeouts for slower interop permutations.
self.dial_timeout_seconds = _env_float("DIAL_TIMEOUT_SECS", 30.0, minimum=1.0)
self.upgrade_timeout_seconds = _env_float(
"UPGRADE_TIMEOUT_SECS", 30.0, minimum=1.0
)
self.stream_negotiate_timeout_seconds = _env_float(
"STREAM_NEGOTIATE_TIMEOUT_SECS", 30.0, minimum=1.0
)
self.negotiate_timeout_seconds = _env_int(
"NEGOTIATE_TIMEOUT_SECS", 30, minimum=1
)
self.quic_connection_timeout_seconds = _env_float(
"QUIC_CONNECTION_TIMEOUT_SECS", 30.0, minimum=1.0
)
self.quic_idle_timeout_seconds = _env_float(
"QUIC_IDLE_TIMEOUT_SECS", 60.0, minimum=1.0
)

self.host: Any = None
self.redis_client: redis.Redis[str] | None = None
Expand Down Expand Up @@ -304,7 +365,22 @@ def create_tls_server_config(self) -> ssl.SSLContext | None:
return None

def _get_ip_value(self, addr: multiaddr.Multiaddr) -> str | None:
return addr.value_for_protocol("ip4") or addr.value_for_protocol("ip6")
for protocol in ("ip4", "ip6"):
try:
value = addr.value_for_protocol(protocol)
except Exception:
value = None
if value:
return value
return None

def _safe_value_for_protocol(
self, addr: multiaddr.Multiaddr, protocol: str
) -> str | None:
try:
return addr.value_for_protocol(protocol)
except Exception:
return None

def _get_protocol_names(self, addr: multiaddr.Multiaddr) -> list[str]:
return [p.name for p in addr.protocols()]
Expand Down Expand Up @@ -338,13 +414,19 @@ def create_listen_addresses(self, port: int = 0) -> list[multiaddr.Multiaddr]:
if self.transport == "quic-v1":
out = []
for addr in base_addrs:
ip_value = self._get_ip_value(addr)
tcp_port = addr.value_for_protocol("tcp") or port
if ip_value:
qa = self._build_quic_addr(ip_value, tcp_port)
try:
ip_value = self._get_ip_value(addr)
tcp_port = self._safe_value_for_protocol(addr, "tcp")
if not ip_value:
continue
qa = self._build_quic_addr(
ip_value, int(tcp_port) if tcp_port else port
)
_, p2p = self._extract_and_preserve_p2p(addr)
qa = self._encapsulate_with_p2p(qa, p2p)
out.append(qa)
except Exception:
continue
return out if out else [self._build_quic_addr("0.0.0.0", port)]
if self.transport == "ws":
out = []
Expand Down Expand Up @@ -422,8 +504,7 @@ def _replace_loopback_ip(self, addr: multiaddr.Multiaddr) -> str:
if ip_value not in ["127.0.0.1", "0.0.0.0", "::1", "::"]:
return str(addr)
actual = self.get_container_ip()
names = self._get_protocol_names(addr)
is_ipv6 = "ip6" in names
is_ipv6 = ":" in actual
parts = [f"/ip6/{actual}" if is_ipv6 else f"/ip4/{actual}"]
found = False
for proto, value in addr.items():
Expand Down Expand Up @@ -473,17 +554,41 @@ async def run_listener(self) -> None:
listen_addrs = self.create_listen_addresses(0)
tls_client = self.create_tls_client_config()
tls_server = self.create_tls_server_config()
connection_config = ConnectionConfig(
dial_timeout=self.dial_timeout_seconds,
inbound_upgrade_timeout=self.upgrade_timeout_seconds,
outbound_upgrade_timeout=self.upgrade_timeout_seconds,
outbound_stream_protocol_negotiation_timeout=self.stream_negotiate_timeout_seconds,
inbound_stream_protocol_negotiation_timeout=self.stream_negotiate_timeout_seconds,
)
quic_transport_opt = QUICTransportConfig(
connection_timeout=self.quic_connection_timeout_seconds,
idle_timeout=self.quic_idle_timeout_seconds,
dial_timeout=self.dial_timeout_seconds,
inbound_upgrade_timeout=self.upgrade_timeout_seconds,
outbound_upgrade_timeout=self.upgrade_timeout_seconds,
outbound_stream_protocol_negotiation_timeout=self.stream_negotiate_timeout_seconds,
inbound_stream_protocol_negotiation_timeout=self.stream_negotiate_timeout_seconds,
NEGOTIATE_TIMEOUT=self.stream_negotiate_timeout_seconds,
)

self.host = new_host(
key_pair=key_pair,
sec_opt=sec_opt,
muxer_opt=muxer_opt,
listen_addrs=listen_addrs,
negotiate_timeout=self.negotiate_timeout_seconds,
enable_quic=(self.transport == "quic-v1"),
quic_transport_opt=quic_transport_opt
if self.transport == "quic-v1"
else None,
tls_client_config=tls_client,
tls_server_config=tls_server,
connection_config=connection_config,
)
self.perf_service = PerfService(
self.host, {"write_block_size": self.write_block_size}
)
self.perf_service = PerfService(self.host)
await self.perf_service.start()
print(f"Perf service started (protocol {PROTOCOL_NAME})", file=sys.stderr)

Expand Down Expand Up @@ -545,19 +650,43 @@ async def run_dialer(self) -> None:
)
tls_client = self.create_tls_client_config()
tls_server = None
connection_config = ConnectionConfig(
dial_timeout=self.dial_timeout_seconds,
inbound_upgrade_timeout=self.upgrade_timeout_seconds,
outbound_upgrade_timeout=self.upgrade_timeout_seconds,
outbound_stream_protocol_negotiation_timeout=self.stream_negotiate_timeout_seconds,
inbound_stream_protocol_negotiation_timeout=self.stream_negotiate_timeout_seconds,
)
quic_transport_opt = QUICTransportConfig(
connection_timeout=self.quic_connection_timeout_seconds,
idle_timeout=self.quic_idle_timeout_seconds,
dial_timeout=self.dial_timeout_seconds,
inbound_upgrade_timeout=self.upgrade_timeout_seconds,
outbound_upgrade_timeout=self.upgrade_timeout_seconds,
outbound_stream_protocol_negotiation_timeout=self.stream_negotiate_timeout_seconds,
inbound_stream_protocol_negotiation_timeout=self.stream_negotiate_timeout_seconds,
NEGOTIATE_TIMEOUT=self.stream_negotiate_timeout_seconds,
)

kw: dict[str, Any] = {
"key_pair": key_pair,
"sec_opt": sec_opt,
"muxer_opt": muxer_opt,
"negotiate_timeout": self.negotiate_timeout_seconds,
"enable_quic": (self.transport == "quic-v1"),
"quic_transport_opt": quic_transport_opt
if self.transport == "quic-v1"
else None,
"tls_client_config": tls_client,
"tls_server_config": tls_server,
"connection_config": connection_config,
}
if dialer_listen_addrs:
kw["listen_addrs"] = dialer_listen_addrs
self.host = new_host(**kw)
self.perf_service = PerfService(self.host)
self.perf_service = PerfService(
self.host, {"write_block_size": self.write_block_size}
)
await self.perf_service.start()

# Must run host inside host.run() so swarm/nursery are active
Expand Down
3 changes: 2 additions & 1 deletion libp2p/perf/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
# https://github.com/libp2p/specs/blob/master/perf/perf.md

PROTOCOL_NAME = "/perf/1.0.0"
WRITE_BLOCK_SIZE = 65536
# Keep default write size below Noise max payload limit.
WRITE_BLOCK_SIZE = 65500
4 changes: 3 additions & 1 deletion libp2p/perf/perf_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ def __init__(self, host: IHost, init: PerfInit | None = None) -> None:
self._host = host
self._started = False
self._protocol = TProtocol(init_opts.get("protocol_name", PROTOCOL_NAME))
self._write_block_size = init_opts.get("write_block_size", WRITE_BLOCK_SIZE)
configured_block_size = init_opts.get("write_block_size", WRITE_BLOCK_SIZE)
# Keep block size under 65535 to avoid Noise frame-size violations.
self._write_block_size = min(int(configured_block_size), WRITE_BLOCK_SIZE)

# Pre-allocate buffer for sending data
self._buf = bytes(self._write_block_size)
Expand Down
Loading
Loading