Skip to content

Commit d71bdda

Browse files
authored
[Cherry-Pick][CI] Optimize clean_ports logic by removing redundant code (#7809) (#7830)
1 parent 04e4ae8 commit d71bdda

5 files changed

Lines changed: 47 additions & 213 deletions

File tree

.github/workflows/_unit_test_coverage.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ jobs:
394394
wget -O ${filename} ${diff_cov_result_json_url} || echo "Download cov json file failed, but continuing..."
395395
fi
396396
if [ -f "${filename}" ];then
397-
echo "Failed test cases:"
397+
echo "GPU Patch Coverage Details:"
398398
if command -v jq >/dev/null 2>&1; then
399399
jq . "${filename}"
400400
else

tests/engine/test_common_engine.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
import numpy as np
3030
import paddle
31-
from e2e.utils.serving_utils import clean_ports
31+
from e2e.utils.serving_utils import PORTS_TO_CLEAN, clean_ports
3232

3333
if not hasattr(paddle, "enable_compat"):
3434
paddle.enable_compat = lambda scope=None: None
@@ -512,6 +512,21 @@ def _make_cfg(self, **kwargs):
512512
engine_worker_queue_port = [engine_worker_queue_port + 21 + i for i in range(dp // nnode)]
513513
cache_queue_port = [cache_queue_port + 21 + i for i in range(dp // nnode)]
514514

515+
# Add ports to cleanup list
516+
ports_to_add = []
517+
if isinstance(engine_worker_queue_port, list):
518+
ports_to_add.extend(engine_worker_queue_port)
519+
else:
520+
ports_to_add.append(engine_worker_queue_port)
521+
if isinstance(cache_queue_port, list):
522+
ports_to_add.extend(cache_queue_port)
523+
else:
524+
ports_to_add.append(cache_queue_port)
525+
526+
for port in ports_to_add:
527+
if port not in PORTS_TO_CLEAN:
528+
PORTS_TO_CLEAN.append(port)
529+
515530
if kwargs.get("num_gpu_blocks_override") is not None and "kv_cache_ratio" not in kwargs:
516531
kwargs["kv_cache_ratio"] = 1
517532

tests/entrypoints/openai/test_run_batch.py

Lines changed: 9 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import os
2020
import shutil
2121
import signal
22-
import socket
2322
import subprocess
2423
import sys
2524
import tempfile
@@ -63,124 +62,16 @@
6362
write_local_file,
6463
)
6564

66-
# Read ports from environment variables; use default values if not set
67-
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
68-
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
69-
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
70-
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))
71-
72-
# List of ports to clean before and after tests
73-
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT]
74-
75-
76-
def is_port_open(host: str, port: int, timeout=1.0):
77-
"""
78-
Check if a TCP port is open on the given host.
79-
Returns True if connection succeeds, False otherwise.
80-
"""
81-
try:
82-
with socket.create_connection((host, port), timeout):
83-
return True
84-
except Exception:
85-
return False
86-
87-
88-
def _clean_cuda_process():
89-
"""
90-
Kill processes that are using CUDA devices.
91-
NOTE: Do not call this function directly, use the `clean` function instead.
92-
"""
93-
try:
94-
subprocess.run("fuser -k /dev/nvidia*", shell=True, timeout=5)
95-
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
96-
pass
97-
98-
99-
def kill_process_on_port(port: int):
100-
"""
101-
Kill processes that are listening on the given port.
102-
Uses multiple methods to ensure thorough cleanup.
103-
"""
104-
current_pid = os.getpid()
105-
parent_pid = os.getppid()
106-
107-
# Method 1: Use lsof to find processes
108-
try:
109-
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
110-
for pid in output.splitlines():
111-
pid = int(pid)
112-
if pid in (current_pid, parent_pid):
113-
print(f"Skip killing current process (pid={pid}) on port {port}")
114-
continue
115-
try:
116-
# First try SIGTERM for graceful shutdown
117-
os.kill(pid, signal.SIGTERM)
118-
time.sleep(1)
119-
# Then SIGKILL if still running
120-
os.kill(pid, signal.SIGKILL)
121-
print(f"Killed process on port {port}, pid={pid}")
122-
except ProcessLookupError:
123-
pass # Process already terminated
124-
except subprocess.CalledProcessError:
125-
pass
126-
127-
# Method 2: Use netstat and fuser as backup
128-
try:
129-
# Find processes using netstat and awk
130-
cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
131-
output = subprocess.check_output(cmd, shell=True).decode().strip()
132-
for pid in output.splitlines():
133-
if pid and pid.isdigit():
134-
pid = int(pid)
135-
if pid in (current_pid, parent_pid):
136-
continue
137-
try:
138-
os.kill(pid, signal.SIGKILL)
139-
print(f"Killed process (netstat) on port {port}, pid={pid}")
140-
except ProcessLookupError:
141-
pass
142-
except (subprocess.CalledProcessError, FileNotFoundError):
143-
pass
144-
145-
# Method 3: Use fuser if available
146-
try:
147-
subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
148-
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
149-
pass
150-
151-
152-
def clean_ports(ports=None):
153-
"""
154-
Kill all processes occupying the ports
155-
"""
156-
if ports is None:
157-
ports = PORTS_TO_CLEAN
158-
159-
print(f"Cleaning ports: {ports}")
160-
for port in ports:
161-
kill_process_on_port(port)
162-
163-
# Double check and retry if ports are still in use
164-
time.sleep(2)
165-
for port in ports:
166-
if is_port_open("127.0.0.1", port, timeout=0.1):
167-
print(f"Port {port} still in use, retrying cleanup...")
168-
kill_process_on_port(port)
169-
time.sleep(1)
170-
171-
172-
def clean(ports=None):
173-
"""
174-
Clean up resources used during testing.
175-
"""
176-
clean_ports(ports)
177-
178-
# Clean CUDA devices before and after tests.
179-
# NOTE: It is dangerous to use this flag on development machines, as it may kill other processes
180-
clean_cuda = int(os.getenv("CLEAN_CUDA", "0")) == 1
181-
if clean_cuda:
182-
_clean_cuda_process()
65+
current_dir = os.path.dirname(os.path.abspath(__file__))
66+
project_root = os.path.abspath(os.path.join(current_dir, ".."))
67+
if project_root not in sys.path:
68+
sys.path.insert(0, project_root)
18369

70+
from e2e.utils.serving_utils import (
71+
FD_CACHE_QUEUE_PORT,
72+
FD_ENGINE_QUEUE_PORT,
73+
clean_ports,
74+
)
18475

18576
INPUT_BATCH = """
18677
{"custom_id": "req-00001", "method": "POST", "url": "/v1/chat/completions", "body": {"messages": [{"role": "user", "content": "Can you write a short poem? (id=1)"}], "temperature": 0.7, "max_tokens": 200}}

tests/layers/test_plas_attention.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,16 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import os
16+
import sys
17+
1518
import paddle
1619

20+
tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
21+
sys.path.insert(0, tests_dir)
22+
23+
from e2e.utils.serving_utils import clean_ports
24+
1725
try:
1826
from fastdeploy.model_executor.ops.gpu import (
1927
fused_block_mean_and_rope,
@@ -338,6 +346,9 @@ def test_plas_attention(self):
338346
self.compare_attn(attn_out, qk_gate_topk_idx)
339347

340348
def test_server(self):
349+
# Clean ports before starting the test
350+
clean_ports()
351+
341352
if get_cur_cu_seq_len_k is None:
342353
return
343354
os.environ["FD_ATTENTION_BACKEND"] = "PLAS_ATTN"

tests/model_loader/test_load_ernie_vl.py

Lines changed: 10 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import json
1616
import os
1717
import signal
18-
import socket
1918
import subprocess
2019
import sys
2120
import time
@@ -28,96 +27,14 @@
2827
if project_root not in sys.path:
2928
sys.path.insert(0, project_root)
3029

31-
# Read ports from environment variables; use default values if not set
32-
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
33-
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
34-
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
35-
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))
36-
37-
# List of ports to clean before and after tests
38-
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT]
39-
40-
41-
def is_port_open(host: str, port: int, timeout=1.0):
42-
"""
43-
Check if a TCP port is open on the given host.
44-
Returns True if connection succeeds, False otherwise.
45-
"""
46-
try:
47-
with socket.create_connection((host, port), timeout):
48-
return True
49-
except Exception:
50-
return False
51-
52-
53-
def kill_process_on_port(port: int):
54-
"""
55-
Kill processes that are listening on the given port.
56-
Uses multiple methods to ensure thorough cleanup.
57-
"""
58-
current_pid = os.getpid()
59-
parent_pid = os.getppid()
60-
61-
# Method 1: Use lsof to find processes
62-
try:
63-
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
64-
for pid in output.splitlines():
65-
pid = int(pid)
66-
if pid in (current_pid, parent_pid):
67-
print(f"Skip killing current process (pid={pid}) on port {port}")
68-
continue
69-
try:
70-
# First try SIGTERM for graceful shutdown
71-
os.kill(pid, signal.SIGTERM)
72-
time.sleep(1)
73-
# Then SIGKILL if still running
74-
os.kill(pid, signal.SIGKILL)
75-
print(f"Killed process on port {port}, pid={pid}")
76-
except ProcessLookupError:
77-
pass # Process already terminated
78-
except subprocess.CalledProcessError:
79-
pass
80-
81-
# Method 2: Use netstat and fuser as backup
82-
try:
83-
# Find processes using netstat and awk
84-
cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
85-
output = subprocess.check_output(cmd, shell=True).decode().strip()
86-
for pid in output.splitlines():
87-
if pid and pid.isdigit():
88-
pid = int(pid)
89-
if pid in (current_pid, parent_pid):
90-
continue
91-
try:
92-
os.kill(pid, signal.SIGKILL)
93-
print(f"Killed process (netstat) on port {port}, pid={pid}")
94-
except ProcessLookupError:
95-
pass
96-
except (subprocess.CalledProcessError, FileNotFoundError):
97-
pass
98-
99-
# Method 3: Use fuser if available
100-
try:
101-
subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
102-
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
103-
pass
104-
105-
106-
def clean_ports():
107-
"""
108-
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
109-
"""
110-
print(f"Cleaning ports: {PORTS_TO_CLEAN}")
111-
for port in PORTS_TO_CLEAN:
112-
kill_process_on_port(port)
113-
114-
# Double check and retry if ports are still in use
115-
time.sleep(2)
116-
for port in PORTS_TO_CLEAN:
117-
if is_port_open("127.0.0.1", port, timeout=0.1):
118-
print(f"Port {port} still in use, retrying cleanup...")
119-
kill_process_on_port(port)
120-
time.sleep(1)
30+
from e2e.utils.serving_utils import (
31+
FD_API_PORT,
32+
FD_CACHE_QUEUE_PORT,
33+
FD_ENGINE_QUEUE_PORT,
34+
FD_METRICS_PORT,
35+
clean_ports,
36+
is_port_open,
37+
)
12138

12239

12340
@pytest.fixture(scope="session", autouse=True)
@@ -184,8 +101,8 @@ def setup_and_run_server():
184101
start_new_session=True, # Enables killing full group via os.killpg
185102
)
186103

187-
# Wait up to 10 minutes for API server to be ready
188-
for _ in range(10 * 60):
104+
# Wait up to 5 minutes for API server to be ready
105+
for _ in range(300):
189106
if is_port_open("127.0.0.1", FD_API_PORT):
190107
print(f"API server is up on port {FD_API_PORT}")
191108
break

0 commit comments

Comments
 (0)