1- import subprocess , json , os , sys
1+ import subprocess , json , os , sys , time , logging
22from pathlib import Path
33from typing import List , Dict , Any , Optional
4- from .config_parser import XrayConfigBuilder , ConfigParams
4+
5+ # ! Import the CORRECT classes from your parser
6+ from .config_parser import ConfigParams , XrayConfigBuilder
7+
8+ logging .basicConfig (level = logging .INFO , format = '%(asctime)s - [%(levelname)s] - %(message)s' )
59
610class ConnectionTester :
711 def __init__ (self , vendor_path : str , core_engine_path : str ):
812 self .vendor_path = Path (vendor_path )
913 self .core_engine_path = Path (core_engine_path )
10-
1114 if sys .platform == "win32" :
12- self .tester_exe = "core_engine.exe" ; self .xray_exe = "xray .exe" ; self . hysteria_exe = "hysteria.exe"
15+ self .tester_exe , self . xray_exe , self .hysteria_exe = "core_engine .exe" , "xray.exe" , "hysteria.exe"
1316 elif sys .platform == "darwin" :
14- self .tester_exe = "core_engine_macos" ; self .xray_exe = "xray_macos" ; self .hysteria_exe = "hysteria_macos"
17+ self .tester_exe , self .xray_exe , self .hysteria_exe = "core_engine_macos" , "xray_macos" , "hysteria_macos"
1518 else :
16- self .tester_exe = "core_engine_linux" ; self .xray_exe = "xray_linux" ; self .hysteria_exe = "hysteria_linux"
19+ self .tester_exe , self .xray_exe , self .hysteria_exe = "core_engine_linux" , "xray_linux" , "hysteria_linux"
20+ if not (self .core_engine_path / self .tester_exe ).is_file (): raise FileNotFoundError ("Tester executable not found" )
1721
18- if not (self .core_engine_path / self .tester_exe ).is_file (): raise FileNotFoundError (f"Tester executable not found" )
19-
20- def test_outbounds (self , parsed_params : List [ConfigParams ], fragment_config : Optional [Dict [str , Any ]] = None , timeout : int = 60 ) -> List [Dict [str , Any ]]:
22+ def test_uris (self , parsed_params : List [ConfigParams ], fragment_config : Optional [Dict [str , Any ]] = None , timeout : int = 90 ) -> List [Dict [str , Any ]]:
23+ """
24+ * Takes a list of PRE-PARSED ConfigParams objects and tests them using the correct client.
25+ """
2126 if not parsed_params : return []
2227
23- test_configs = []
24- base_port = 20800
25- builder = XrayConfigBuilder ()
26-
27- for i , params in enumerate (parsed_params ):
28- config_dict = {}
29- client_path = ""
30- protocol = params .protocol
31-
32- if protocol in ["hysteria" , "hysteria2" ]:
33- protocol = "hysteria2"
34- client_path = str (self .vendor_path / self .hysteria_exe )
35- config_dict = {
36- "server" : f"{ params .address } :{ params .port } " ,
37- "auth" : params .hy2_password ,
38- "socks5" : {"listen" : f"127.0.0.1:{ base_port + i } " },
39- "tls" : {"sni" : params .sni , "insecure" : True }
40- }
28+ hysteria_params = []
29+ xray_params = []
30+
31+ # ! FIX: Iterate over the list of OBJECTS, don't re-parse them.
32+ for params in parsed_params :
33+ if params .protocol in ["hysteria" , "hysteria2" , "hy2" ]:
34+ hysteria_params .append (params )
4135 else :
42- client_path = str (self .vendor_path / self .xray_exe )
36+ xray_params .append (params )
37+
38+ all_results = []
39+
40+ if hysteria_params :
41+ logging .info (f"Testing { len (hysteria_params )} Hysteria configuration(s) individually..." )
42+ hysteria_results = self ._test_individual_clients (hysteria_params , self .hysteria_exe , "hysteria2" , timeout )
43+ all_results .extend (hysteria_results )
44+
45+ if xray_params :
46+ logging .info (f"Testing { len (xray_params )} Xray configuration(s) with one merged instance..." )
47+
48+ base_port = 20800
49+ builder = XrayConfigBuilder ()
50+ tests_to_run = []
51+
52+ for i , params in enumerate (xray_params ):
53+ inbound_port = base_port + i
54+ inbound_tag = f"inbound_{ i } "
55+
4356 outbound = builder .build_outbound_from_params (params , fragment_config = fragment_config )
44- if fragment_config :
45- if "streamSettings" not in outbound : outbound ["streamSettings" ] = {}
46- outbound ["streamSettings" ]["sockopt" ] = {"dialerProxy" : "fragment" }
47- config_dict = outbound
48-
49- test_configs .append ({
50- "tag" : params .tag ,
51- "protocol" : protocol ,
52- "config" : config_dict ,
53- "test_port" : base_port + i ,
54- "client_path" : client_path ,
55- "fragment_config" : fragment_config ,
57+ builder .add_outbound (outbound )
58+
59+ builder .add_inbound ({"tag" : inbound_tag , "port" : inbound_port , "listen" : "127.0.0.1" , "protocol" : "socks" , "settings" : {"auth" : "noauth" , "udp" : True , "userLevel" : 0 }})
60+ builder .config ["routing" ]["rules" ].append ({"type" : "field" , "inboundTag" : [inbound_tag ], "outboundTag" : outbound ["tag" ]})
61+ tests_to_run .append ({"tag" : outbound ["tag" ], "test_port" : inbound_port , "listen_ip" : "127.0.0.1" })
62+
63+ builder .add_outbound ({"protocol" : "freedom" , "tag" : "direct" })
64+ builder .add_outbound ({"protocol" : "blackhole" , "tag" : "block" })
65+
66+ if fragment_config :
67+ builder .add_fragment_outbound (fragment_config )
68+
69+ temp_config_path = self .core_engine_path / "merged_xray_config.json"
70+ with open (temp_config_path , "w" , encoding = 'utf-8' ) as f : f .write (builder .to_json ())
71+
72+ xray_process = None
73+ try :
74+ xray_process = subprocess .Popen ([str (self .vendor_path / self .xray_exe ), "-c" , str (temp_config_path )])
75+ logging .info (f"Merged Xray instance started (PID: { xray_process .pid } ). Waiting for initialization..." )
76+ time .sleep (1.5 )
77+ logging .info (f"Sending { len (tests_to_run )} Xray test jobs to Go engine..." )
78+ xray_results = self ._run_go_tester (tests_to_run , timeout )
79+ all_results .extend (xray_results )
80+ finally :
81+ if xray_process : xray_process .terminate (); xray_process .wait ()
82+ if temp_config_path .exists (): temp_config_path .unlink ()
83+
84+ return all_results
85+
86+ def _test_individual_clients (self , params_list : List [ConfigParams ], client_exe : str , protocol_name : str , timeout : int ) -> List [Dict [str , Any ]]:
87+ test_jobs = []
88+ base_port = 30800
89+ ip_counter = 2
90+ for i , params in enumerate (params_list ):
91+ test_jobs .append ({
92+ "tag" : params .tag , "protocol" : protocol_name ,
93+ "config_uri" : f"{ params .protocol } ://{ params .hy2_password } @{ params .address } :{ params .port } ?sni={ params .sni } " ,
94+ "listen_ip" : f"127.0.0.{ ip_counter } " , "test_port" : base_port + i ,
95+ "client_path" : str (self .vendor_path / client_exe )
5696 })
97+ ip_counter += 1
5798
58- input_json = json . dumps ( test_configs , default = lambda o : o . __dict__ )
99+ return self . _run_go_tester ( test_jobs , timeout )
59100
101+ def _run_go_tester (self , payload : List [Dict [str , Any ]], timeout : int ) -> List [Dict [str , Any ]]:
102+ input_json = json .dumps (payload , default = lambda o : o .__dict__ )
60103 try :
61104 with subprocess .Popen ([str (self .core_engine_path / self .tester_exe )], stdin = subprocess .PIPE , stdout = subprocess .PIPE , stderr = subprocess .PIPE , text = True , encoding = 'utf-8' ) as process :
62105 stdout , stderr = process .communicate (input = input_json , timeout = timeout )
63- if process .returncode != 0 : print (f"! Go engine error:\n { stderr } " ); return []
106+ if process .returncode != 0 : logging . error (f"Go engine error:\n { stderr } " ); return []
64107 return json .loads (stdout ) if stdout else []
65108 except Exception as e :
66- print (f"! Tester execution error: { e } " ); return []
109+ logging . error (f"Tester execution error: { e } " ); return []
0 commit comments