Skip to content
Open
83 changes: 54 additions & 29 deletions parsers/clash2base64.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import base64,json,re
from urllib.parse import quote, unquote

def clash2v2ray(share_link):
def clash2v2ray(original_share_link):
share_link = original_share_link.copy()
link = ''
# 2. 协议兼容补丁:针对 Hysteria2/TUIC 只有 ports (端口跳跃) 无 port 的情况
if 'port' not in share_link and 'ports' in share_link:
first_port = str(share_link['ports']).replace(',', '-').split('-')[0]
share_link['port'] = int(first_port) if first_port.isdigit() else 443
if share_link['type'] == 'vmess':
try:
vmess_info = {
Expand Down Expand Up @@ -228,19 +233,18 @@ def clash2v2ray(share_link):
return link
# TODO
elif share_link['type'] == 'tuic':
link = "tuic://{uuid}:{password}@{server}:{port}?alpn={alpn}&allow_insecure={allowInsecure}&disable_sni={disable_sni}&sni={sni}&udp_relay_mode={udp_relay_mode}&congestion_control={control}#{name}".format(
uuid = share_link['uuid'],
password = share_link['password'],
server = share_link['server'],
port = share_link['port'],
alpn = quote(','.join(share_link.get('alpn', '')), 'utf-8'),
allowInsecure = share_link.get('allowInsecure', '1'),
disable_sni = '0' if share_link.get('disable-sni', '') == False else '1',
sni = share_link.get('sni', ''),
udp_relay_mode = share_link.get('udp-relay-mode', 'native'),
control = share_link.get('congestion-controller', 'bbr'),
name = share_link['name'].encode('utf-8', 'surrogatepass').decode('utf-8')
)
# 提取原始多端口
mport = share_link.get('ports', '')
params = {
"sni": share_link.get('sni', ''),
"alpn": ','.join(share_link.get('alpn', [])),
"allow_insecure": '1' if share_link.get('skip-cert-verify') else '0',
"congestion_control": share_link.get('congestion-controller', 'bbr'),
"udp_relay_mode": share_link.get('udp-relay-mode', 'native'),
"mport": mport # 将多端口注入 URI 查询参数
}
query_string = '&'.join([f"{k}={v}" for k, v in params.items() if v])
link = f"tuic://{share_link['uuid']}:{share_link['password']}@{share_link['server']}:{share_link.get('port', 443)}?{query_string}#{share_link.get('name', '')}"
return link
# TODO
elif share_link['type'] == 'hysteria':
Expand All @@ -260,21 +264,42 @@ def clash2v2ray(share_link):
return link
# TODO
elif share_link['type'] == 'hysteria2':
link = "hysteria2://{auth}@{server}:{port}{ports}?insecure={allowInsecure}&obfs={obfs}&obfs-password={obfspassword}&pinSHA256={fingerprint}&sni={sni}&alpn={alpn}&upmbps={upmbps}&downmbps={downmbps}#{name}".format(
auth = share_link.get('password', share_link.get('auth', '')),
server = share_link['server'],
port = share_link['port'],
ports=",{}".format(share_link['ports']) if share_link.get('ports') else '',
allowInsecure = '0' if share_link.get('skip-cert-verify', '') == False else '1',
obfs = share_link.get('obfs', 'none'),
obfspassword = share_link.get('obfs-password', ''),
fingerprint = share_link.get('fingerprint', ''),
sni = share_link.get('sni', ''),
alpn = quote(','.join(share_link.get('alpn', '')), 'utf-8'),
upmbps = share_link.get('up', ''),
downmbps = share_link.get('down', ''),
name = share_link['name'].encode('utf-8', 'surrogatepass').decode('utf-8')
)
# 1. 端口与多端口 (mport) 规范化处理
base_port = share_link.get('port')
mport = share_link.get('ports', '')
if not base_port and mport:
first_port = str(mport).replace(',', '-').split('-')[0]
base_port = int(first_port) if first_port.isdigit() else 443

# 2. ALPN 强健性处理 (兼容 List 和 String)
alpn_raw = share_link.get('alpn', '')
if isinstance(alpn_raw, list):
alpn_str = ','.join(alpn_raw)
else:
alpn_str = str(alpn_raw)

# 3. 动态构建标准查询参数
params = {
"insecure": '1' if share_link.get('skip-cert-verify') else '0',
"obfs": share_link.get('obfs', ''),
"obfs-password": share_link.get('obfs-password', ''),
"pinSHA256": share_link.get('fingerprint', ''),
"sni": share_link.get('sni', ''),
"alpn": quote(alpn_str, 'utf-8'),
"mport": mport,
"upmbps": share_link.get('up', ''),
"downmbps": share_link.get('down', '')
}

# 过滤掉空值和默认值(none),保持 URI 纯净
query_string = '&'.join([f"{k}={v}" for k, v in params.items() if v and v != 'none'])

auth = share_link.get('password', share_link.get('auth', ''))
server = share_link['server']
name = quote(share_link.get('name', 'Hysteria2_Node'), 'utf-8')

# 4. 生成标准 Hysteria2 URI
link = f"hysteria2://{auth}@{server}:{base_port}?{query_string}#{name}"
return link
# TODO
elif share_link['type'] == 'wireguard':
Expand Down
33 changes: 29 additions & 4 deletions parsers/hysteria2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,60 @@ def parse(data):
for k, v in parse_qs(server_info.query).items()
)
if server_info.path:
server_info = server_info._replace(netloc=server_info.netloc + server_info.path, path="")
server_info = server_info._replace(netloc=server_info.netloc + server_info.path, path="")

# 尝试匹配官方 URI 规范中的逗号分隔端口 (如 server:21581,21400-21599)
ports_match = re.search(r',(\d+-\d+)', server_info.netloc)

node = {
'tag': unquote(server_info.fragment) or tool.genName()+'_hysteria2',
'type': 'hysteria2',
'server': re.sub(r"\[|\]", "", server_info.netloc.split("@")[-1].rsplit(":", 1)[0]),
'server_port': int(re.search(r'\d+', server_info.netloc.rsplit(":", 1)[-1].split(",")[0]).group()),
"password": netquery['auth'] if netquery.get('auth') else server_info.netloc.split("@")[0].rsplit(":", 1)[-1],
'up_mbps': int(re.search(r'\d+', netquery.get('upmbps', '10')).group()),
'down_mbps': int(re.search(r'\d+', netquery.get('downmbps', '100')).group()),
'tls': {
'enabled': True,
'server_name': netquery.get('sni', netquery.get('peer', '')),
'insecure': False
}
}

# 【核心修复区】:双链路多端口兼容与数据清洗
# 优先读取官方规范的逗号分隔端口
if netquery.get('upmbps'):
up_match = re.search(r'\d+', str(netquery['upmbps']))
if up_match:
node['up_mbps'] = int(up_match.group())

if netquery.get('downmbps'):
down_match = re.search(r'\d+', str(netquery['downmbps']))
if down_match:
node['down_mbps'] = int(down_match.group())

if ports_match:
node['server_ports'] = [ports_match.group(1).replace('-', ':')]
if 'server_port' in node:
del node['server_port']
# 兜底读取社区泛用的 mport 查询参数 (承接 clash2base64 的输出)
elif netquery.get('mport'):
node['server_ports'] = [str(netquery['mport']).replace('-', ':')]
if 'server_port' in node:
del node['server_port']

if netquery.get('insecure') in ['1', 'true'] or netquery.get('allowInsecure') == '1':
node['tls']['insecure'] = True
if not node['tls'].get('server_name'):
del node['tls']['server_name']
node['tls']['insecure'] = True
elif node['tls']['server_name'] == 'None':
del node['tls']['server_name']

node['tls']['alpn'] = (netquery.get('alpn') or "h3").strip('{}').split(',')

if netquery.get('obfs', '') not in ['none', '']:
node['obfs'] = {
'type': netquery['obfs'],
'password': netquery['obfs-password'],
}
return (node)

return node
123 changes: 72 additions & 51 deletions parsers/ss.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import tool,json,re,urllib
import tool, json, re, urllib
from urllib.parse import parse_qs

def parse(data):
param = data[5:]
if not param or param.isspace():
return None
node = {
'tag':tool.genName()+'_shadowsocks',
'type':'shadowsocks',
'server':None,
'server_port':0,
'method':None,
'password':None
'tag': tool.genName() + '_shadowsocks',
'type': 'shadowsocks',
'server': None,
'server_port': 0,
'method': None,
'password': None
}
flag = 0
if param.find('uot') > -1:
# 替换原有的 if param.find('uot') > -1: 逻辑
query_part = param.split('?')[1].lower() if '?' in param else ''
if 'uot=1' in query_part or 'uot=true' in query_part:
node["udp_over_tcp"] = {
'enabled': True,
'version': 2
Expand All @@ -28,6 +31,7 @@ def parse(data):
remark = urllib.parse.unquote(param[param.find('?remarks=') + 9:])
node['tag'] = remark
param = param[:param.find('?remarks=')]

if param.find('plugin=obfs-local') > -1 or param.find('plugin=simple-obfs') > -1:
if param.find('&', param.find('plugin')) > -1:
plugin = urllib.parse.unquote(param[param.find('plugin'):param.find('&', param.find('plugin'))])
Expand All @@ -42,36 +46,41 @@ def parse(data):
'obfs-host={};'.format(plugin_dict["obfs-host"]) if plugin_dict.get("obfs-host") else ''
)
node['plugin_opts'] = result_str

elif param.find('v2ray-plugin') > -1:
if param.find('&', param.find('v2ray-plugin')) > -1:
try:
plugin = tool.b64Decode(param[param.find('v2ray-plugin')+13:param.find('&', param.find('v2ray-plugin'))]).decode('utf-8')
except:
plugin = urllib.parse.unquote(param[param.find('v2ray-plugin')+15:param.find('&', param.find('v2ray-plugin'))])
pairs = [pair.split('=') for pair in plugin.split(';') if '=' in pair and pair.count('=') == 1]
plugin = str({key: value for key, value in pairs})
plugin_raw = param[param.find('v2ray-plugin')+13:param.find('&', param.find('v2ray-plugin'))]
else:
try:
plugin = tool.b64Decode(param[param.find('v2ray-plugin')+13:]).decode('utf-8')
except:
plugin = urllib.parse.unquote(param[param.find('v2ray-plugin')+15:])
pairs = [pair.split('=') for pair in plugin.split(';') if '=' in pair and pair.count('=') == 1]
plugin = str({key: value for key, value in pairs})
plugin_raw = param[param.find('v2ray-plugin')+13:]

try:
plugin_str = tool.b64Decode(plugin_raw).decode('utf-8')
# 如果是 JSON 格式,直接解析
if plugin_str.startswith('{'):
plugin_dict = json.loads(plugin_str)
else:
pairs = [pair.split('=') for pair in plugin_str.split(';') if '=' in pair and pair.count('=') == 1]
plugin_dict = {key: value for key, value in pairs}
except:
plugin_str = urllib.parse.unquote(plugin_raw.lstrip('='))
pairs = [pair.split('=') for pair in plugin_str.split(';') if '=' in pair and pair.count('=') == 1]
plugin_dict = {key: value for key, value in pairs}

param = param[:param.find('?')]
node['plugin'] = 'v2ray-plugin'
plugin = plugin.replace('true', '1').replace('false', '0')
plugin = eval(plugin)

result_str = "mode={};{}{}{}{}{}{}{}".format(
plugin.get("mode", ''),
'host={};'.format(plugin["host"]) if plugin.get("host") else '',
'path={};'.format(plugin["path"]) if plugin.get("path") else '',
'mux={};'.format(plugin["mux"]) if plugin.get("mux") == 1 else '',
'headers={};'.format(json.dumps(plugin["headers"])) if plugin.get("headers") else '',
'fingerprint={};'.format(plugin["fingerprint"]) if plugin.get("fingerprint") else '',
'skip-cert-verify={};'.format('true') if plugin.get("skip-cert-verify") == 1 else '',
'{};'.format('tls') if plugin.get("tls") == 1 else '',
plugin_dict.get("mode", ''),
'host={};'.format(plugin_dict["host"]) if plugin_dict.get("host") else '',
'path={};'.format(plugin_dict["path"]) if plugin_dict.get("path") else '',
'mux={};'.format(plugin_dict["mux"]) if str(plugin_dict.get("mux")).lower() in ['1', 'true'] else '',
'headers={};'.format(json.dumps(plugin_dict["headers"])) if plugin_dict.get("headers") else '',
'fingerprint={};'.format(plugin_dict["fingerprint"]) if plugin_dict.get("fingerprint") else '',
'skip-cert-verify={};'.format('true') if str(plugin_dict.get("skip-cert-verify")).lower() in ['1', 'true'] else '',
'{};'.format('tls') if str(plugin_dict.get("tls")).lower() in ['1', 'true'] else '',
)
node['plugin_opts'] = result_str

if data[5:].find('protocol') > -1:
smux = data[5:][data[5:].find('protocol'):]
smux_dict = parse_qs(smux.split('#')[0])
Expand All @@ -87,11 +96,13 @@ def parse(data):
node['multiplex']['min_streams'] = int(smux_dict['min-streams'])
if smux_dict.get('padding') == 'True':
node['multiplex']['padding'] = True
try: #fuck

try:
param = param.split('?')[0]
matcher = tool.b64Decode(param) #保留'/'测试能不能解码
matcher = tool.b64Decode(param)
except:
param = param.split('/')[0].split('?')[0] #不能解码说明'/'不是base64内容
param = param.split('/')[0].split('?')[0]

if param.find('@') > -1:
matcher = re.match(r'(.*?)@(.*):(.*)', param)
if matcher:
Expand Down Expand Up @@ -123,44 +134,54 @@ def parse(data):
node['server_port'] = matcher.group(4).split('&')[0]
else:
return None

node['server_port'] = int(re.search(r'\d+', node['server_port']).group())
param2 = data[5:]

if param2.find('shadow-tls') > -1:
flag = 1
if param2.find('&', param2.find('shadow-tls')) > -1:
plugin = tool.b64Decode(param2[param2.find('shadow-tls')+11:param2.find('&', param2.find('shadow-tls'))].split('#')[0]).decode('utf-8')
plugin_str = tool.b64Decode(param2[param2.find('shadow-tls')+11:param2.find('&', param2.find('shadow-tls'))].split('#')[0]).decode('utf-8')
else:
plugin = tool.b64Decode(param2[param2.find('shadow-tls')+11:].split('#')[0]).decode('utf-8')
plugin = eval(plugin.replace('true','True'))
plugin_str = tool.b64Decode(param2[param2.find('shadow-tls')+11:].split('#')[0]).decode('utf-8')

# 安全解析 JSON,弃用 eval
try:
plugin_dict = json.loads(plugin_str)
except:
return None

node['detour'] = node['tag']+'_shadowtls'
node_tls = {
'tag':node['detour'],
'type':'shadowtls',
'server':node['server'],
'server_port':node['server_port'],
'version':int(plugin.get('version', '1')),
'password':plugin.get('password', ''),
'tls':{
'tag': node['detour'],
'type': 'shadowtls',
'server': node['server'],
'server_port': node['server_port'],
'version': int(plugin_dict.get('version', '1')),
'password': plugin_dict.get('password', ''),
'tls': {
'enabled': True,
'server_name': plugin.get('host', '')
'server_name': plugin_dict.get('host', '')
}
}
if plugin.get('address'):
node_tls['server'] = plugin['address']
if plugin.get('port'):
node_tls['server_port'] = int(plugin['port'])
if plugin.get('fp'):
if plugin_dict.get('address'):
node_tls['server'] = plugin_dict['address']
if plugin_dict.get('port'):
node_tls['server_port'] = int(plugin_dict['port'])
if plugin_dict.get('fp'):
node_tls['tls']['utls']={
'enabled': True,
'fingerprint': plugin.get('fp')
'fingerprint': plugin_dict.get('fp')
}
del node['server']
del node['server_port']

if node['method'] == 'chacha20-poly1305':
node['method'] = 'chacha20-ietf-poly1305'
elif node['method'] == 'xchacha20-poly1305':
node['method'] = 'xchacha20-ietf-poly1305'

if flag:
return node,node_tls
else:
return node
return node
Loading