Skip to content
This repository was archived by the owner on Apr 5, 2026. It is now read-only.

Commit 974dc81

Browse files
committed
fix: single client per transport, relative throughput assertion
- Use ONE Telethon client per transport mode (get_me + all downloads) to avoid AuthKeyDuplicatedError from multiple concurrent auth keys - Add relative throughput check: fake-TLS must be >= 50% of obfs2 (catches DRS regression independent of CI network speed) - Keep absolute floor (0.5 MB/s default, configurable via env var) - Increase timeouts for slow CI networks (20MB: 300s, 100MB: 900s)
1 parent f0625a6 commit 974dc81

1 file changed

Lines changed: 65 additions & 130 deletions

File tree

tests/test_direct_e2e.py

Lines changed: 65 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
to verify sustained data transfer works correctly — catches DRS delay
1010
regressions that throttle large downloads.
1111
12+
Uses ONE client per transport mode to avoid AuthKeyDuplicatedError from
13+
Telegram seeing multiple connections with the same auth key.
14+
1215
Requires the TG_STRING_SESSION environment variable (Telethon StringSession).
1316
Skips gracefully when the secret is absent (fork PRs, external contributors).
1417
@@ -44,12 +47,12 @@
4447
}
4548

4649
# Per-file download timeouts (seconds).
47-
DOWNLOAD_TIMEOUTS = {"1mb": 60, "20mb": 180, "100mb": 600}
50+
DOWNLOAD_TIMEOUTS = {"1mb": 60, "20mb": 300, "100mb": 900}
4851

4952
# Minimum acceptable throughput (MB/s) for files >= 20 MB.
5053
# The DRS delay bug (v3.5.0) throttled downloads to ~2 MB/s.
51-
# This floor catches the regression while tolerating CI network variability.
52-
MIN_THROUGHPUT_MBPS = 3.0
54+
# Set conservatively — if even this floor is missed, something is broken.
55+
MIN_THROUGHPUT_MBPS = float(os.environ.get("MIN_THROUGHPUT_MBPS", "0.5"))
5356

5457

5558
def _patch_telethon_faketls():
@@ -95,9 +98,10 @@ def _writer_write_with_ccs(self, data, extra={}):
9598
async def _download_test_files(client, transport_label):
9699
"""Download pre-saved test files and verify integrity.
97100
98-
Returns True if all files downloaded and verified successfully.
101+
Returns dict of {label: throughput_mbps} for successful downloads,
102+
or None for the label if that download failed.
99103
"""
100-
all_ok = True
104+
throughputs = {}
101105
for label, info in TEST_FILES.items():
102106
timeout = DOWNLOAD_TIMEOUTS[label]
103107
print(f" [{transport_label}] downloading {label} "
@@ -129,16 +133,17 @@ async def _download_test_files(client, transport_label):
129133
except asyncio.TimeoutError:
130134
elapsed = time.monotonic() - start
131135
got = len(buf.getvalue())
136+
throughput = got / elapsed / 1048576 if elapsed > 0 else 0
132137
print(f" [{transport_label}] {label}: FAIL — "
133138
f"timed out after {elapsed:.0f}s "
134139
f"({got}/{info['size']} bytes, "
135-
f"{got / elapsed / 1048576:.2f} MB/s)")
136-
all_ok = False
140+
f"{throughput:.2f} MB/s)")
141+
throughputs[label] = None
137142
continue
138143
except Exception as e:
139144
print(f" [{transport_label}] {label}: FAIL — "
140145
f"{type(e).__name__}: {e}")
141-
all_ok = False
146+
throughputs[label] = None
142147
continue
143148

144149
elapsed = time.monotonic() - start
@@ -148,34 +153,34 @@ async def _download_test_files(client, transport_label):
148153
if len(data) != info["size"]:
149154
print(f" [{transport_label}] {label}: FAIL — "
150155
f"size mismatch: got {len(data)}, expected {info['size']}")
151-
all_ok = False
156+
throughputs[label] = None
152157
continue
153158

154159
if actual_hash != info["sha256"]:
155160
print(f" [{transport_label}] {label}: FAIL — "
156161
f"SHA256 mismatch")
157-
all_ok = False
162+
throughputs[label] = None
158163
continue
159164

160165
throughput_mb = len(data) / elapsed / 1048576
166+
throughputs[label] = throughput_mb
161167

162-
# Assert minimum throughput for large files to catch DRS throttling.
168+
# Assert minimum throughput for large files.
163169
if info["size"] >= 20 * 1048576 and throughput_mb < MIN_THROUGHPUT_MBPS:
164170
print(f" [{transport_label}] {label}: FAIL — "
165171
f"throughput {throughput_mb:.2f} MB/s < "
166-
f"{MIN_THROUGHPUT_MBPS} MB/s minimum "
167-
f"(DRS delay regression?)")
168-
all_ok = False
172+
f"{MIN_THROUGHPUT_MBPS} MB/s minimum")
173+
throughputs[label] = None
169174
continue
170175

171176
print(f" [{transport_label}] {label}: OK — "
172-
f"{elapsed:.1f}s, {throughput_mb:.1f} MB/s")
177+
f"{elapsed:.1f}s, {throughput_mb:.2f} MB/s")
173178

174-
return all_ok
179+
return throughputs
175180

176181

177-
async def test_obfs2_get_me(host, port, secret, session_str):
178-
"""Connect via obfuscated2 (dd-prefix) and call get_me()."""
182+
async def test_obfs2_all(host, port, secret, session_str):
183+
"""Run all obfs2 tests with a single client connection."""
179184
from telethon import TelegramClient
180185
from telethon.network.connection import (
181186
ConnectionTcpMTProxyRandomizedIntermediate,
@@ -196,67 +201,35 @@ async def test_obfs2_get_me(host, port, secret, session_str):
196201
await asyncio.wait_for(client.connect(), timeout=30)
197202
if not client.is_connected():
198203
print("[obfs2] FAIL: client did not connect")
199-
return False
204+
return False, {}
200205

201206
me = await asyncio.wait_for(client.get_me(), timeout=15)
202207
if me is None:
203208
print("[obfs2] FAIL: get_me() returned None")
204-
return False
205-
206-
print(f"[obfs2] OK: get_me() returned user_id={me.id}")
207-
return True
208-
except Exception as e:
209-
print(f"[obfs2] FAIL: {type(e).__name__}: {e}")
210-
return False
211-
finally:
212-
try:
213-
await client.disconnect()
214-
except Exception:
215-
pass
209+
return False, {}
216210

211+
print(f"[obfs2] get_me OK: user_id={me.id}")
217212

218-
async def test_obfs2_media_download(host, port, secret, session_str):
219-
"""Download pre-saved files through obfuscated2 proxy."""
220-
from telethon import TelegramClient
221-
from telethon.network.connection import (
222-
ConnectionTcpMTProxyRandomizedIntermediate,
223-
)
224-
from telethon.sessions import StringSession
225-
226-
print(f"[obfs2-media] Connecting to {host}:{port} ...", flush=True)
227-
228-
client = TelegramClient(
229-
StringSession(session_str),
230-
api_id=API_ID,
231-
api_hash=API_HASH,
232-
connection=ConnectionTcpMTProxyRandomizedIntermediate,
233-
proxy=(host, port, "dd" + secret),
234-
)
235-
236-
try:
237-
await asyncio.wait_for(client.connect(), timeout=30)
238-
if not client.is_connected():
239-
print("[obfs2-media] FAIL: client did not connect")
240-
return False
241-
242-
return await _download_test_files(client, "obfs2-media")
213+
throughputs = await _download_test_files(client, "obfs2")
214+
has_failure = any(v is None for v in throughputs.values())
215+
return not has_failure, throughputs
243216
except Exception as e:
244-
print(f"[obfs2-media] FAIL: {type(e).__name__}: {e}")
245-
return False
217+
print(f"[obfs2] FAIL: {type(e).__name__}: {e}")
218+
return False, {}
246219
finally:
247220
try:
248221
await client.disconnect()
249222
except Exception:
250223
pass
251224

252225

253-
async def test_faketls_get_me(host, port, secret, domain, session_str):
254-
"""Connect via fake-TLS (ee-prefix) and call get_me()."""
226+
async def test_faketls_all(host, port, secret, domain, session_str):
227+
"""Run all fake-TLS tests with a single client connection."""
255228
try:
256229
from TelethonFakeTLS import ConnectionTcpMTProxyFakeTLS
257230
except ImportError:
258231
print("[fake-tls] SKIP: TelethonFakeTLS not installed")
259-
return True
232+
return True, {}
260233

261234
from telethon import TelegramClient
262235
from telethon.sessions import StringSession
@@ -279,60 +252,21 @@ async def test_faketls_get_me(host, port, secret, domain, session_str):
279252
await asyncio.wait_for(client.connect(), timeout=30)
280253
if not client.is_connected():
281254
print("[fake-tls] FAIL: client did not connect")
282-
return False
255+
return False, {}
283256

284257
me = await asyncio.wait_for(client.get_me(), timeout=15)
285258
if me is None:
286259
print("[fake-tls] FAIL: get_me() returned None")
287-
return False
288-
289-
print(f"[fake-tls] OK: get_me() returned user_id={me.id}")
290-
return True
291-
except Exception as e:
292-
print(f"[fake-tls] FAIL: {type(e).__name__}: {e}")
293-
return False
294-
finally:
295-
try:
296-
await client.disconnect()
297-
except Exception:
298-
pass
299-
260+
return False, {}
300261

301-
async def test_faketls_media_download(host, port, secret, domain, session_str):
302-
"""Download pre-saved files through fake-TLS proxy (exercises DRS)."""
303-
try:
304-
from TelethonFakeTLS import ConnectionTcpMTProxyFakeTLS
305-
except ImportError:
306-
print("[fake-tls-media] SKIP: TelethonFakeTLS not installed")
307-
return True
262+
print(f"[fake-tls] get_me OK: user_id={me.id}")
308263

309-
from telethon import TelegramClient
310-
from telethon.sessions import StringSession
311-
312-
_patch_telethon_faketls()
313-
314-
proxy_secret = secret + domain.encode().hex()
315-
print(f"[fake-tls-media] Connecting to {host}:{port} (domain={domain}) ...",
316-
flush=True)
317-
318-
client = TelegramClient(
319-
StringSession(session_str),
320-
api_id=API_ID,
321-
api_hash=API_HASH,
322-
connection=ConnectionTcpMTProxyFakeTLS,
323-
proxy=(host, port, proxy_secret),
324-
)
325-
326-
try:
327-
await asyncio.wait_for(client.connect(), timeout=30)
328-
if not client.is_connected():
329-
print("[fake-tls-media] FAIL: client did not connect")
330-
return False
331-
332-
return await _download_test_files(client, "fake-tls-media")
264+
throughputs = await _download_test_files(client, "fake-tls")
265+
has_failure = any(v is None for v in throughputs.values())
266+
return not has_failure, throughputs
333267
except Exception as e:
334-
print(f"[fake-tls-media] FAIL: {type(e).__name__}: {e}")
335-
return False
268+
print(f"[fake-tls] FAIL: {type(e).__name__}: {e}")
269+
return False, {}
336270
finally:
337271
try:
338272
await client.disconnect()
@@ -356,38 +290,39 @@ def main():
356290
tls_port = int(os.environ.get("DIRECT_TLS_PORT", "9443"))
357291
domain = os.environ.get("EE_DOMAIN", "ya.ru")
358292

359-
results = []
360-
361293
print("=== Direct Mode E2E Tests ===\n")
362294

363-
# Test 1: obfuscated2 get_me
364-
ok = asyncio.run(test_obfs2_get_me(host, obfs2_port, secret, session_str))
365-
results.append(("obfs2-get_me", ok))
366-
print()
367-
368-
# Test 2: fake-TLS get_me
369-
ok = asyncio.run(
370-
test_faketls_get_me(host, tls_port, secret, domain, session_str)
295+
# Test 1: obfuscated2 (control — no DRS delays)
296+
obfs2_ok, obfs2_tp = asyncio.run(
297+
test_obfs2_all(host, obfs2_port, secret, session_str)
371298
)
372-
results.append(("fake-tls-get_me", ok))
373299
print()
374300

375-
# Test 3: obfuscated2 media download (control — no DRS delays)
376-
ok = asyncio.run(
377-
test_obfs2_media_download(host, obfs2_port, secret, session_str)
301+
# Test 2: fake-TLS (exercises DRS delays)
302+
tls_ok, tls_tp = asyncio.run(
303+
test_faketls_all(host, tls_port, secret, domain, session_str)
378304
)
379-
results.append(("obfs2-media", ok))
380-
print()
381305

382-
# Test 4: fake-TLS media download (exercises DRS delays)
383-
ok = asyncio.run(
384-
test_faketls_media_download(host, tls_port, secret, domain, session_str)
385-
)
386-
results.append(("fake-tls-media", ok))
306+
# Compare throughputs: fake-TLS must not be dramatically slower than obfs2.
307+
# If fake-TLS is < 50% of obfs2 for the same file, DRS delays are the cause.
308+
print("\n=== Throughput Comparison ===")
309+
comparison_ok = True
310+
for label in TEST_FILES:
311+
o = obfs2_tp.get(label)
312+
t = tls_tp.get(label)
313+
if o is None or t is None:
314+
continue
315+
ratio = t / o if o > 0 else 0
316+
status = "OK" if ratio >= 0.5 else "FAIL"
317+
if status == "FAIL":
318+
comparison_ok = False
319+
print(f" {label}: obfs2={o:.2f} MB/s, fake-tls={t:.2f} MB/s, "
320+
f"ratio={ratio:.0%} {status}")
387321

388322
print("\n=== Results ===")
389323
all_ok = True
390-
for name, ok in results:
324+
for name, ok in [("obfs2", obfs2_ok), ("fake-tls", tls_ok),
325+
("tls-vs-obfs2-ratio", comparison_ok)]:
391326
status = "PASS" if ok else "FAIL"
392327
print(f" {name}: {status}")
393328
if not ok:

0 commit comments

Comments
 (0)