|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""E2E tests for MTProxy direct mode. |
| 3 | +
|
| 4 | +Starts a real Telethon session through a direct-mode MTProxy and calls |
| 5 | +get_me() to verify the full data path works. Tests both obfuscated2 |
| 6 | +(dd-prefix) and fake-TLS (ee-prefix) transport modes. |
| 7 | +
|
| 8 | +Requires the TG_STRING_SESSION environment variable (Telethon StringSession). |
| 9 | +Skips gracefully when the secret is absent (fork PRs, external contributors). |
| 10 | +
|
| 11 | +Usage: |
| 12 | + TG_STRING_SESSION=... MTPROXY_SECRET=... python3 tests/test_direct_e2e.py |
| 13 | +
|
| 14 | +Environment variables: |
| 15 | + TG_STRING_SESSION Telethon StringSession string (required) |
| 16 | + MTPROXY_SECRET 32-char hex proxy secret (required) |
| 17 | + DIRECT_HOST Proxy hostname (default: localhost) |
| 18 | + DIRECT_OBFS2_PORT Obfuscated2 proxy port (default: 8443) |
| 19 | + DIRECT_TLS_PORT Fake-TLS proxy port (default: 9443) |
| 20 | + EE_DOMAIN Domain for fake-TLS mode (default: ya.ru) |
| 21 | +""" |
| 22 | + |
| 23 | +import asyncio |
| 24 | +import os |
| 25 | +import sys |
| 26 | + |
| 27 | +# Official Telegram macOS client credentials (public, well-known). |
| 28 | +API_ID = 2834 |
| 29 | +API_HASH = "68875f756c9b437a8b916ca3de215815" |
| 30 | + |
| 31 | + |
| 32 | +def _patch_telethon_faketls(): |
| 33 | + """Patch TelethonFakeTLS bugs. |
| 34 | +
|
| 35 | + 1. read_server_hello: upstream only reads the first encrypted record, |
| 36 | + but the proxy computes HMAC over all records. |
| 37 | + 2. FakeTLSStreamWriter: upstream never sends CCS (ChangeCipherSpec) |
| 38 | + before the first data record, but the proxy requires it. |
| 39 | + """ |
| 40 | + import TelethonFakeTLS.FakeTLS.TLSInOut as tls_io |
| 41 | + |
| 42 | + async def _read_server_hello(self): |
| 43 | + buf = bytearray(await self.upstream.readexactly(133)) |
| 44 | + while True: |
| 45 | + try: |
| 46 | + header = await asyncio.wait_for( |
| 47 | + self.upstream.readexactly(5), timeout=0.5 |
| 48 | + ) |
| 49 | + except (asyncio.TimeoutError, EOFError): |
| 50 | + break |
| 51 | + buf += header |
| 52 | + if header[:3] != b"\x17\x03\x03": |
| 53 | + break |
| 54 | + rec_len = int.from_bytes(header[3:5], "big") |
| 55 | + buf += await self.upstream.readexactly(rec_len) |
| 56 | + return bytes(buf) |
| 57 | + |
| 58 | + tls_io.FakeTLSStreamReader.read_server_hello = _read_server_hello |
| 59 | + |
| 60 | + _orig_write = tls_io.FakeTLSStreamWriter.write |
| 61 | + _ccs_sent_writers = set() |
| 62 | + |
| 63 | + def _writer_write_with_ccs(self, data, extra={}): |
| 64 | + if id(self) not in _ccs_sent_writers: |
| 65 | + _ccs_sent_writers.add(id(self)) |
| 66 | + self.upstream.write(b"\x14\x03\x03\x00\x01\x01") |
| 67 | + return _orig_write(self, data, extra) |
| 68 | + |
| 69 | + tls_io.FakeTLSStreamWriter.write = _writer_write_with_ccs |
| 70 | + |
| 71 | + |
| 72 | +async def test_obfs2_get_me(host, port, secret, session_str): |
| 73 | + """Connect via obfuscated2 (dd-prefix) and call get_me().""" |
| 74 | + from telethon import TelegramClient |
| 75 | + from telethon.network.connection import ( |
| 76 | + ConnectionTcpMTProxyRandomizedIntermediate, |
| 77 | + ) |
| 78 | + from telethon.sessions import StringSession |
| 79 | + |
| 80 | + print(f"[obfs2] Connecting to {host}:{port} ...", flush=True) |
| 81 | + |
| 82 | + client = TelegramClient( |
| 83 | + StringSession(session_str), |
| 84 | + api_id=API_ID, |
| 85 | + api_hash=API_HASH, |
| 86 | + connection=ConnectionTcpMTProxyRandomizedIntermediate, |
| 87 | + proxy=(host, port, "dd" + secret), |
| 88 | + ) |
| 89 | + |
| 90 | + try: |
| 91 | + await asyncio.wait_for(client.connect(), timeout=30) |
| 92 | + if not client.is_connected(): |
| 93 | + print("[obfs2] FAIL: client did not connect") |
| 94 | + return False |
| 95 | + |
| 96 | + me = await asyncio.wait_for(client.get_me(), timeout=15) |
| 97 | + if me is None: |
| 98 | + print("[obfs2] FAIL: get_me() returned None") |
| 99 | + return False |
| 100 | + |
| 101 | + print(f"[obfs2] OK: get_me() returned user_id={me.id}") |
| 102 | + return True |
| 103 | + except Exception as e: |
| 104 | + print(f"[obfs2] FAIL: {type(e).__name__}: {e}") |
| 105 | + return False |
| 106 | + finally: |
| 107 | + try: |
| 108 | + await client.disconnect() |
| 109 | + except Exception: |
| 110 | + pass |
| 111 | + |
| 112 | + |
| 113 | +async def test_faketls_get_me(host, port, secret, domain, session_str): |
| 114 | + """Connect via fake-TLS (ee-prefix) and call get_me().""" |
| 115 | + try: |
| 116 | + from TelethonFakeTLS import ConnectionTcpMTProxyFakeTLS |
| 117 | + except ImportError: |
| 118 | + print("[fake-tls] SKIP: TelethonFakeTLS not installed") |
| 119 | + return True |
| 120 | + |
| 121 | + from telethon import TelegramClient |
| 122 | + from telethon.sessions import StringSession |
| 123 | + |
| 124 | + _patch_telethon_faketls() |
| 125 | + |
| 126 | + proxy_secret = secret + domain.encode().hex() |
| 127 | + print(f"[fake-tls] Connecting to {host}:{port} (domain={domain}) ...", |
| 128 | + flush=True) |
| 129 | + |
| 130 | + client = TelegramClient( |
| 131 | + StringSession(session_str), |
| 132 | + api_id=API_ID, |
| 133 | + api_hash=API_HASH, |
| 134 | + connection=ConnectionTcpMTProxyFakeTLS, |
| 135 | + proxy=(host, port, proxy_secret), |
| 136 | + ) |
| 137 | + |
| 138 | + try: |
| 139 | + await asyncio.wait_for(client.connect(), timeout=30) |
| 140 | + if not client.is_connected(): |
| 141 | + print("[fake-tls] FAIL: client did not connect") |
| 142 | + return False |
| 143 | + |
| 144 | + me = await asyncio.wait_for(client.get_me(), timeout=15) |
| 145 | + if me is None: |
| 146 | + print("[fake-tls] FAIL: get_me() returned None") |
| 147 | + return False |
| 148 | + |
| 149 | + print(f"[fake-tls] OK: get_me() returned user_id={me.id}") |
| 150 | + return True |
| 151 | + except Exception as e: |
| 152 | + print(f"[fake-tls] FAIL: {type(e).__name__}: {e}") |
| 153 | + return False |
| 154 | + finally: |
| 155 | + try: |
| 156 | + await client.disconnect() |
| 157 | + except Exception: |
| 158 | + pass |
| 159 | + |
| 160 | + |
| 161 | +def main(): |
| 162 | + session_str = os.environ.get("TG_STRING_SESSION", "") |
| 163 | + if not session_str: |
| 164 | + print("SKIP: TG_STRING_SESSION not set (secrets not available)") |
| 165 | + sys.exit(0) |
| 166 | + |
| 167 | + secret = os.environ.get("MTPROXY_SECRET", "") |
| 168 | + if not secret: |
| 169 | + print("ERROR: MTPROXY_SECRET required") |
| 170 | + sys.exit(1) |
| 171 | + |
| 172 | + host = os.environ.get("DIRECT_HOST", "localhost") |
| 173 | + obfs2_port = int(os.environ.get("DIRECT_OBFS2_PORT", "8443")) |
| 174 | + tls_port = int(os.environ.get("DIRECT_TLS_PORT", "9443")) |
| 175 | + domain = os.environ.get("EE_DOMAIN", "ya.ru") |
| 176 | + |
| 177 | + results = [] |
| 178 | + |
| 179 | + print("=== Direct Mode E2E Tests ===\n") |
| 180 | + |
| 181 | + # Test 1: obfuscated2 |
| 182 | + ok = asyncio.run(test_obfs2_get_me(host, obfs2_port, secret, session_str)) |
| 183 | + results.append(("obfs2", ok)) |
| 184 | + print() |
| 185 | + |
| 186 | + # Test 2: fake-TLS |
| 187 | + ok = asyncio.run( |
| 188 | + test_faketls_get_me(host, tls_port, secret, domain, session_str) |
| 189 | + ) |
| 190 | + results.append(("fake-tls", ok)) |
| 191 | + |
| 192 | + print("\n=== Results ===") |
| 193 | + all_ok = True |
| 194 | + for name, ok in results: |
| 195 | + status = "PASS" if ok else "FAIL" |
| 196 | + print(f" {name}: {status}") |
| 197 | + if not ok: |
| 198 | + all_ok = False |
| 199 | + |
| 200 | + sys.exit(0 if all_ok else 1) |
| 201 | + |
| 202 | + |
| 203 | +if __name__ == "__main__": |
| 204 | + main() |
0 commit comments