-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Expand file tree
/
Copy pathtest_processes.py
More file actions
161 lines (122 loc) · 5.26 KB
/
test_processes.py
File metadata and controls
161 lines (122 loc) · 5.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""Test process utilities."""
import socket
import threading
import time
from contextlib import closing
from unittest import mock
import pytest
from reflex.utils.processes import is_process_on_port
def test_is_process_on_port_free_port():
    """Check that a port with no socket bound to it is reported as free."""
    # Let the OS hand out an ephemeral port, remember its number, and release
    # it again by leaving the ``with`` block.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.bind(("127.0.0.1", 0))
        _, released_port = probe.getsockname()

    # The probe socket is closed at this point, so the port should be free.
    assert not is_process_on_port(released_port)
def test_is_process_on_port_occupied_port():
    """Test is_process_on_port returns True when port is occupied.

    A listening IPv4 socket is bound to an OS-assigned port on 127.0.0.1 and
    kept open while the port is checked.
    """
    # The context manager closes the socket on every exit path, including a
    # failure in setsockopt()/bind()/listen(), so setup errors cannot leak it.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(("127.0.0.1", 0))  # port 0: let the OS pick one
        server_socket.listen(1)
        occupied_port = server_socket.getsockname()[1]

        # Port should be occupied while the server socket is still open
        assert is_process_on_port(occupied_port)
def test_is_process_on_port_ipv6():
    """Test is_process_on_port works with IPv6.

    A listening socket is bound to an OS-assigned port on the IPv6 loopback
    address (::1). The test is skipped when such a socket cannot be created
    or bound on this machine.
    """
    try:
        server_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    except OSError:
        # IPv6 might not be available on some systems
        pytest.skip("IPv6 not available on this system")

    # The context manager closes the socket on every exit path, including the
    # skip below, so a failed bind() does not leak it.
    with server_socket:
        # Only the socket setup is guarded: an OSError raised by
        # is_process_on_port itself must fail the test, not skip it.
        try:
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(("::1", 0))
            server_socket.listen(1)
        except OSError:
            pytest.skip("IPv6 not available on this system")

        occupied_port = server_socket.getsockname()[1]

        # Port should be occupied on IPv6
        assert is_process_on_port(occupied_port)
def test_is_process_on_port_both_protocols():
    """Test is_process_on_port reports occupation when only IPv4 is bound.

    Only an IPv4 listener on 127.0.0.1 is created here; no IPv6 socket is
    opened, so this checks that occupation on the IPv4 side alone is enough
    for the port to be reported as in use.
    """
    # The context manager closes the socket on every exit path, including a
    # failure in setsockopt()/bind()/listen(), so setup errors cannot leak it.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as ipv4_socket:
        ipv4_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        ipv4_socket.bind(("127.0.0.1", 0))  # port 0: let the OS pick one
        ipv4_socket.listen(1)
        port = ipv4_socket.getsockname()[1]

        # Should detect IPv4 occupation
        assert is_process_on_port(port)
@pytest.mark.parametrize("port", [0, 1, 80, 443, 8000, 3000, 65535])
def test_is_process_on_port_various_ports(port):
    """Smoke-test is_process_on_port across a spread of port numbers.

    Args:
        port: The port number to test.
    """
    # Whether a given port is in use depends on the host machine, so only the
    # return type is checked; the call simply must not raise.
    assert isinstance(is_process_on_port(port), bool)
def test_is_process_on_port_mock_socket_error():
    """Verify that a bind() raising OSError is reported as an occupied port."""
    fake_socket = mock.MagicMock()
    # Entering the fake socket as a context manager yields the same object.
    fake_socket.__enter__.return_value = fake_socket
    fake_socket.bind.side_effect = OSError("Mock socket error")

    # Every socket.socket(...) call made while patched returns the fake.
    with mock.patch("socket.socket", return_value=fake_socket):
        outcome = is_process_on_port(8080)

        # A socket operation that fails is expected to yield True
        assert outcome is True
def test_is_process_on_port_permission_error():
    """Verify that a bind() raising PermissionError is reported as occupied."""
    fake_socket = mock.MagicMock()
    # Entering the fake socket as a context manager yields the same object.
    fake_socket.__enter__.return_value = fake_socket
    fake_socket.bind.side_effect = PermissionError("Permission denied")

    # Every socket.socket(...) call made while patched returns the fake.
    with mock.patch("socket.socket", return_value=fake_socket):
        outcome = is_process_on_port(80)

        # Being denied permission to bind is expected to count as "occupied"
        assert outcome is True
@pytest.mark.parametrize("should_listen", [True, False])
def test_is_process_on_port_concurrent_access(should_listen):
    """Test is_process_on_port works correctly with concurrent access.

    A background thread binds a socket to an OS-assigned port on 127.0.0.1
    and keeps it open until the main thread tells it to release. The port is
    checked once while the socket is held and once after it has been closed.

    Args:
        should_listen: Whether the server socket should call listen() or just bind().
    """
    bound = threading.Event()  # set once the server has published its port
    release = threading.Event()  # set to tell the server to close its socket
    port_holder = [None]

    def hold_port(listen):
        # The context manager closes the socket even if setup raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind(("127.0.0.1", 0))
            if listen:
                server.listen(1)
            port_holder[0] = server.getsockname()[1]
            bound.set()
            # Bounded wait so the thread cannot hang forever if the main
            # thread never signals.
            release.wait(timeout=5)

    thread = threading.Thread(target=hold_port, args=(should_listen,))
    thread.start()
    try:
        # Explicit handshake instead of a sleep: fail loudly if the server
        # never came up rather than silently skipping the assertions.
        assert bound.wait(timeout=5), "server thread did not bind a port in time"
        port = port_holder[0]

        # Port should be occupied while server is running (both bound-only and listening)
        assert is_process_on_port(port)
    finally:
        # Always let the server shut down and reap the thread, even when an
        # assertion above failed.
        release.set()
        thread.join()

    # Brief grace period for the OS to fully release the closed socket.
    time.sleep(0.1)
    # After thread ends and server closes, port should be free
    assert not is_process_on_port(port)