Skip to content

Commit c8e9058

Browse files
committed
add tests and overflow handling
1 parent 157651d commit c8e9058

2 files changed

Lines changed: 190 additions & 0 deletions

File tree

reflex/utils/processes.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ def _can_bind_at_port(
6969
try:
7070
with closing(socket.socket(address_family, socket.SOCK_STREAM)) as sock:
7171
sock.bind((address, port))
72+
except OverflowError:
73+
return False
7274
except PermissionError:
7375
return False
7476
except OSError:
@@ -102,8 +104,15 @@ def change_port(port: int, _type: str) -> int:
102104
Returns:
103105
The new port.
104106
107+
Raises:
108+
Exit: If the port is invalid or if the new port is occupied.
105109
"""
106110
new_port = port + 1
111+
if new_port < 0 or new_port > 65535:
112+
console.error(
113+
f"The {_type} port: {port} is invalid. It must be between 0 and 65535."
114+
)
115+
raise click.exceptions.Exit(1)
107116
if is_process_on_port(new_port):
108117
return change_port(new_port, _type)
109118
console.info(
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
"""Test process utilities."""
2+
3+
import socket
4+
import threading
5+
import time
6+
from contextlib import closing
7+
from unittest import mock
8+
9+
import pytest
10+
11+
from reflex.utils.processes import is_process_on_port
12+
13+
14+
def test_is_process_on_port_free_port():
15+
"""Test is_process_on_port returns False when port is free."""
16+
# Find a free port
17+
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
18+
sock.bind(("127.0.0.1", 0))
19+
free_port = sock.getsockname()[1]
20+
21+
# Port should be free after socket is closed
22+
assert not is_process_on_port(free_port)
23+
24+
25+
def test_is_process_on_port_occupied_port():
26+
"""Test is_process_on_port returns True when port is occupied."""
27+
# Create a server socket to occupy a port
28+
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
29+
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
30+
server_socket.bind(("127.0.0.1", 0))
31+
server_socket.listen(1)
32+
33+
occupied_port = server_socket.getsockname()[1]
34+
35+
try:
36+
# Port should be occupied
37+
assert is_process_on_port(occupied_port)
38+
finally:
39+
server_socket.close()
40+
41+
42+
def test_is_process_on_port_ipv6():
43+
"""Test is_process_on_port works with IPv6."""
44+
# Test with IPv6 socket
45+
try:
46+
server_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
47+
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
48+
server_socket.bind(("::1", 0))
49+
server_socket.listen(1)
50+
51+
occupied_port = server_socket.getsockname()[1]
52+
53+
try:
54+
# Port should be occupied on IPv6
55+
assert is_process_on_port(occupied_port)
56+
finally:
57+
server_socket.close()
58+
except OSError:
59+
# IPv6 might not be available on some systems
60+
pytest.skip("IPv6 not available on this system")
61+
62+
63+
def test_is_process_on_port_both_protocols():
64+
"""Test is_process_on_port detects occupation on either IPv4 or IPv6."""
65+
# Create IPv4 server
66+
ipv4_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
67+
ipv4_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
68+
ipv4_socket.bind(("127.0.0.1", 0))
69+
ipv4_socket.listen(1)
70+
71+
port = ipv4_socket.getsockname()[1]
72+
73+
try:
74+
# Should detect IPv4 occupation
75+
assert is_process_on_port(port)
76+
finally:
77+
ipv4_socket.close()
78+
79+
80+
@pytest.mark.parametrize("port", [1, 80, 443, 8000, 3000, 65535])
81+
def test_is_process_on_port_various_ports(port):
82+
"""Test is_process_on_port with various port numbers.
83+
84+
Args:
85+
port: The port number to test.
86+
"""
87+
# This test just ensures the function doesn't crash with different port numbers
88+
# The actual result depends on what's running on the system
89+
result = is_process_on_port(port)
90+
assert isinstance(result, bool)
91+
92+
93+
def test_is_process_on_port_privileged_port():
94+
"""Test is_process_on_port handles privileged ports gracefully."""
95+
# Port 1 is typically privileged and should return True if we can't bind
96+
# (either because something is running or we don't have permission)
97+
result = is_process_on_port(1)
98+
assert isinstance(result, bool)
99+
100+
101+
def test_is_process_on_port_invalid_port():
102+
"""Test is_process_on_port with invalid port numbers."""
103+
# Test with port 0 (should be handled gracefully)
104+
result = is_process_on_port(0)
105+
assert isinstance(result, bool)
106+
107+
# Test with port out of range (should handle OSError gracefully)
108+
result = is_process_on_port(65536)
109+
assert isinstance(result, bool)
110+
111+
112+
def test_is_process_on_port_mock_socket_error():
113+
"""Test is_process_on_port handles socket errors gracefully."""
114+
with mock.patch("socket.socket") as mock_socket:
115+
mock_socket_instance = mock.MagicMock()
116+
mock_socket.return_value = mock_socket_instance
117+
mock_socket_instance.__enter__.return_value = mock_socket_instance
118+
mock_socket_instance.bind.side_effect = OSError("Mock socket error")
119+
120+
# Should return True when socket operations fail
121+
result = is_process_on_port(8080)
122+
assert result is True
123+
124+
125+
def test_is_process_on_port_permission_error():
126+
"""Test is_process_on_port handles permission errors."""
127+
with mock.patch("socket.socket") as mock_socket:
128+
mock_socket_instance = mock.MagicMock()
129+
mock_socket.return_value = mock_socket_instance
130+
mock_socket_instance.__enter__.return_value = mock_socket_instance
131+
mock_socket_instance.bind.side_effect = PermissionError("Permission denied")
132+
133+
# Should return True when permission is denied (can't bind = port is "occupied")
134+
result = is_process_on_port(80)
135+
assert result is True
136+
137+
138+
@pytest.mark.parametrize("should_listen", [True, False])
139+
def test_is_process_on_port_concurrent_access(should_listen):
140+
"""Test is_process_on_port works correctly with concurrent access.
141+
142+
Args:
143+
should_listen: Whether the server socket should call listen() or just bind().
144+
"""
145+
146+
def create_server_and_test(port_holder, listen):
147+
"""Create a server socket and test port detection."""
148+
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
149+
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
150+
server.bind(("127.0.0.1", 0))
151+
152+
if listen:
153+
server.listen(1)
154+
155+
port = server.getsockname()[1]
156+
port_holder[0] = port
157+
158+
# Small delay to ensure the test runs while server is active
159+
time.sleep(0.1)
160+
server.close()
161+
162+
port_holder = [None]
163+
thread = threading.Thread(
164+
target=create_server_and_test, args=(port_holder, should_listen)
165+
)
166+
thread.start()
167+
168+
# Wait a bit for the server to start
169+
time.sleep(0.05)
170+
171+
if port_holder[0] is not None:
172+
# Port should be occupied while server is running (both bound-only and listening)
173+
assert is_process_on_port(port_holder[0])
174+
175+
thread.join()
176+
177+
# After thread ends and server closes, port should be free
178+
if port_holder[0] is not None:
179+
# Give it a moment for the socket to be fully released
180+
time.sleep(0.1)
181+
assert not is_process_on_port(port_holder[0])

0 commit comments

Comments
 (0)