-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathutilities.py
More file actions
352 lines (301 loc) · 14.4 KB
/
utilities.py
File metadata and controls
352 lines (301 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
#################################################
#
# Note to Developer
#
# The functions in this file prefixed with an underscore
# are intended for internal use only.
#
#################################################
import base64
import binascii
import datetime
import hashlib
import hmac
import json
import os
import secrets
import sys
import time
import requests
from .pgpy import PGPMessage, PGPKey
from .pgpy.constants import HashAlgorithm, SymmetricKeyAlgorithm, CompressionAlgorithm, KeyFlags
def _encrypt_file_part(file, server_secret, client_secret, path=True):
    """
    Symmetrically encrypts one file part so it can be uploaded to SendSafely.
    :param file: The path of the file (as a String) or the file content as bytes.
    :param server_secret: The server secret, may be obtained through using
    SendSafely.get_package_information(package_id)
    :param client_secret: The client_secret (a.k.a. keycode) used to ensure only the holders of the link
    are able to decrypt.
    :param path: Forwarded to PGPMessage.new as its ``file`` argument. Leave True when ``file`` is a path,
    pass False when ``file`` already holds the bytes.
    :return: The encrypted file part (as bytes)
    """
    # The symmetric passphrase is the two secrets joined, server part first.
    passphrase = server_secret + client_secret
    plain_part = PGPMessage.new(
        message=file,
        file=path,
        compression=CompressionAlgorithm.Uncompressed,
    )
    encrypted_part = plain_part.encrypt(
        passphrase=passphrase,
        cipher=SymmetricKeyAlgorithm.AES256,
        hash=HashAlgorithm.SHA256,
    )
    return encrypted_part.__bytes__()
def _generate_keycode():
    """
    Generates the client_secret that is used for encrypting files
    as well as the keycode for finalize package
    :returns: 32 random bytes (256 bits) as URL-safe base64 text with the "=" padding removed
    """
    random_bytes = secrets.token_bytes(32)
    encoded = base64.urlsafe_b64encode(random_bytes).decode('utf-8')
    return _make_safe_for_urlsafebase64(encoded)
def make_headers(api_secret, api_key, endpoint, request_body=None):
    """
    Makes headers used for secure requests against the SendSafely API
    :param api_secret: Your API secret, from the SendSafely handler.
    :param api_key: Your API_KEY, from the SendSafely handler.
    :param endpoint: Everything after the Fully Qualified Domain Name. "/api/v2.0" is prepended here
    before signing, so pass the path without that prefix.
    :param request_body: The request body. If you're passing in a JSON object, make sure you wrap it in json.dumps()
    :return: The headers appropriate for the HTTP Request you're about to make.
    """
    if request_body is None:
        request_body = ""
    # datetime.utcnow() is deprecated since Python 3.12. An aware UTC "now" formatted to whole
    # seconds yields the same "YYYY-MM-DDTHH:MM:SS+0000" stamp as before.
    timestamp = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S') + '+0000'
    endpoint = "/api/v2.0" + endpoint
    # The signed message is key + versioned endpoint + timestamp + body, in exactly this order.
    message_string = api_key + endpoint + timestamp + str(request_body)
    signature = _sign_message(api_secret, message_string)
    headers = {
        'ss-api-key': api_key,
        'ss-request-timestamp': timestamp,
        'ss-request-signature': signature,
        'ss-request-api': "PYTHON_API"
    }
    return headers
def _sign_message(api_secret, message_string):
"""
Signs a message to ensure the server knows it's us that made the request.
:param api_secret: Your API secret, obtained from creating a new API Key+secret in the edit profile page.
:param message_string: The message we're signing.
:return:
"""
secret = bytes(api_secret, 'utf-8')
signature = hmac.new(secret, bytes(message_string, 'utf-8'), digestmod=hashlib.sha256).hexdigest()
return signature
def _encrypt_message(message_to_encrypt, server_secret, client_secret):
    """
    Symmetrically encrypts a message (from a String)
    :param message_to_encrypt: The message we're encrypting
    :param server_secret: The server secret, obtained by inspecting a package
    :param client_secret: The client secret, obtained by inspecting a package
    :return: The encrypted PGP message, base64 encoded, as a String
    """
    # The symmetric passphrase is the two secrets joined, server part first.
    passphrase = server_secret + client_secret
    plain_message = PGPMessage.new(message_to_encrypt, compression=CompressionAlgorithm.Uncompressed)
    encrypted_message = plain_message.encrypt(
        passphrase=passphrase,
        cipher=SymmetricKeyAlgorithm.AES256,
        hash=HashAlgorithm.SHA256,
    )
    encoded = base64.b64encode(bytes(encrypted_message))
    return encoded.decode('utf-8')
def _inject_encryption_flags(user, key_flags=True, hash=True, symmetric=True, compression=True):
    """
    Adds hashed subpackets to the self-signature of ``user`` for every preference reported as missing.
    :param user: The user id whose self-signature subpackets are extended.
    :param key_flags: False to add a KeyFlags subpacket (EncryptCommunications + EncryptStorage).
    :param hash: False to add a PreferredHashAlgorithms subpacket (SHA256).
    :param symmetric: False to add a PreferredSymmetricAlgorithms subpacket (AES256).
    :param compression: False to add a PreferredCompressionAlgorithms subpacket (Uncompressed).
    """
    # (already present?, subpacket name, flags to inject), processed in this order.
    additions = (
        (key_flags, 'KeyFlags', {KeyFlags.EncryptCommunications, KeyFlags.EncryptStorage}),
        (hash, 'PreferredHashAlgorithms', [HashAlgorithm.SHA256]),
        (symmetric, 'PreferredSymmetricAlgorithms', [SymmetricKeyAlgorithm.AES256]),
        (compression, 'PreferredCompressionAlgorithms', [CompressionAlgorithm.Uncompressed]),
    )
    for already_present, packet_name, packet_flags in additions:
        if already_present:
            continue
        subpackets = user.selfsig._signature.subpackets
        subpackets.addnew(packet_name, hashed=True, flags=packet_flags)
        if packet_name == 'KeyFlags':
            # Point the hashed KeyFlags entry at the first KeyFlags subpacket now present.
            subpackets['h_KeyFlags'] = subpackets['KeyFlags'][0]
def _enforce_encryption_flags(user):
    """
    Makes sure the self-signature of ``user`` carries the encryption preferences PGPY checks for.
    :param user: The user id of the public key about to be used for encryption.
    :return: ``user`` when all four hashed subpackets were already present; None when at least one was
    absent and all four were injected.
    """
    # https://github.com/SecurityInnovation/PGPy/issues/257
    # PGPY requires KeyFlags.EncryptCommunications and KeyFlags.EncryptStorage on a public key before
    # it will encrypt with it, and our current APIs do not set them. The code below injects the
    # required attributes into the public key signature to bypass that PGPY check.
    subpackets = user.selfsig._signature.subpackets
    key_flag_packets = subpackets['h_KeyFlags']
    hash_packets = subpackets['h_PreferredHashAlgorithms']
    symmetric_packets = subpackets['h_PreferredSymmetricAlgorithms']
    compression_packets = subpackets['h_PreferredCompressionAlgorithms']

    found = (key_flag_packets, hash_packets, symmetric_packets, compression_packets)
    if not all(len(packets) > 0 for packets in found):
        # At least one hashed subpacket is absent: inject all four and stop (returns None).
        _inject_encryption_flags(user, False, False, False, False)
        return

    key_flag_packet = key_flag_packets[0]
    hash_packet = hash_packets[0]
    symmetric_packet = symmetric_packets[0]
    compression_packet = compression_packets[0]

    # All subpackets exist; work out which ones lack the value we rely on.
    has_key_flags = (KeyFlags.EncryptStorage in key_flag_packet.__flags__
                     and KeyFlags.EncryptCommunications in key_flag_packet.__flags__)
    has_hash = HashAlgorithm.SHA256 in hash_packet.__flags__
    has_symmetric = SymmetricKeyAlgorithm.AES256 in symmetric_packet.__flags__
    has_compression = CompressionAlgorithm.Uncompressed in compression_packet.__flags__

    _inject_encryption_flags(user, has_key_flags, has_hash, has_symmetric, has_compression)
    return user
def _encrypt_keycode(keycode, public_key):
    """
    Encrypts a keycode with a public key
    :param keycode: The keycode to encrypt.
    :param public_key: The public key blob, loaded with PGPKey.from_blob.
    :return: The encrypted keycode (the encrypted PGP message converted with str())
    """
    key_pair = PGPKey.from_blob(public_key)[0]
    # Only a primary key has its first user id patched. The previous "if user is not None" lookup
    # was unreachable because user was always None at that point.
    user = None
    if key_pair.is_primary:
        # Default to None so a key without user ids reaches the guard below rather than raising
        # StopIteration here.
        user = next(iter(key_pair.userids), None)
    if user is not None:
        _enforce_encryption_flags(user)
    message = PGPMessage.new(keycode, compression=CompressionAlgorithm.Uncompressed,
                             cipher=SymmetricKeyAlgorithm.AES256,
                             hash=HashAlgorithm.SHA256)
    cipher_message = key_pair.encrypt(message)
    return str(cipher_message)
def _decrypt_message(message_to_decrypt, server_secret, client_secret):
    """
    Decrypts a base64 encoded, symmetrically encrypted PGP message
    :param message_to_decrypt: The string you'd like decrypted.
    :param server_secret: The server secret, obtained by inspecting a package
    :param client_secret: The client_secret (a.k.a. keycode) used to ensure only the holders of the link
    are able to decrypt.
    :return: The decrypted message.
    """
    # The symmetric passphrase is the two secrets joined, server part first.
    passphrase = server_secret + client_secret
    raw_message = base64.b64decode(bytes(message_to_decrypt, 'utf-8'))
    encrypted = PGPMessage.from_blob(raw_message)
    return encrypted.decrypt(passphrase=passphrase).message
def _upload_file_part_to_s3(encrypted_file_part, url):
    """
    Upload a file/part of a file to the Amazon S3 Bucket used by SendSafely
    Body must ONLY include file in binary format
    Content-Type must NOT be specified
    Connection errors, timeouts and chunked-encoding errors are retried with exponential backoff,
    up to 5 attempts in total. The error from the last attempt is re-raised.
    :param encrypted_file_part: Part of a file to upload to S3. Must not exceed 2621440 Bytes.
    :param url: The S3 URL we're uploading to
    :return: The response object returned by requests.put.
    """
    max_attempts = 5
    max_backoff_seconds = 180
    retryable_errors = (
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
        requests.exceptions.ChunkedEncodingError,
    )
    attempt = 0
    while True:
        try:
            return requests.put(url=url, data=encrypted_file_part)
        except retryable_errors as error:
            if attempt >= max_attempts - 1:
                # Out of attempts: let the caller see the last failure.
                raise
            # Backoff doubles per failure (1s, 2s, 4s, ...) and is capped.
            sleep_seconds = min(2 ** attempt, max_backoff_seconds)
            # attempt is zero-based, so the upcoming try is number attempt + 2.
            sys.stdout.write(
                '\nRetrying S3 part upload (attempt {0}/{1}) after {2}: sleeping {3}s\n'.format(
                    attempt + 2, max_attempts, type(error).__name__, sleep_seconds))
            sys.stdout.flush()
            time.sleep(sleep_seconds)
            attempt += 1
def _calculate_package_checksum(package_code, keycode):
"""
Calculates the checksum of a package using keycode (Client Secret) and Package Code
Checksum is generated using PBKDF2-HMAC-SHA256 with keycode as the password, and Package Code as salt.
:param keycode: Use the keycode as password
:param package_code: Use the package code as salt
:returns: The calculated checksum
"""
password = bytes(keycode, 'utf-8')
salt = bytes(package_code, 'utf-8')
checksum = binascii.hexlify(hashlib.pbkdf2_hmac('sha256', password, salt, 1024))
return {'checksum': checksum.decode('utf-8')}
def _get_upload_urls(package, file_id, part=1):
    """
    Retrieves the S3 upload URLs from SendSafely
    :param package: The package the file belongs to. Its ``sendsafely`` handler and ``package_id`` are used.
    :param file_id: The file_id (string) we're querying for, as there may be many files in a single package.
    May retrieve file_id from SendSafely.get_package function
    :param part: The part index (int) to start from.
    :return: The value of "uploadUrls" from the JSON response.
    """
    sendsafely = package.sendsafely
    endpoint = "".join(["/package/", package.package_id, "/file/", file_id, "/upload-urls/"])
    url = sendsafely.BASE_URL + endpoint
    payload = {'part': part}
    # The signature covers the serialized payload, so sign json.dumps of what is sent.
    signed_headers = make_headers(sendsafely.API_SECRET, sendsafely.API_KEY, endpoint,
                                  request_body=json.dumps(payload))
    reply = requests.post(url=url, json=payload, headers=signed_headers).json()
    return reply["uploadUrls"]
def _update_file_completion_status(package, file_id, directory_id=None, complete=False):
    """
    Sets the file upload status as complete, the server will verify if all segments have been uploaded
    :param package: The package the file belongs to. Its ``sendsafely`` handler and ``package_id`` are used.
    :param file_id: The ID (string) of the file we're updating (must be associated with the package_id from previously)
    :param directory_id: Optional directory ID, sent as 'directoryId' when it is not None.
    :param complete: Whether the file is complete or not (boolean).
    :return: The response from SendSafely (JSON)
    """
    endpoint = "".join(['/package/', package.package_id, '/file/', file_id, '/upload-complete'])
    url = package.sendsafely.BASE_URL + endpoint
    payload = {'complete': complete}
    if directory_id is not None:
        payload.update({'directoryId': directory_id})
    # The signature covers the serialized payload, so sign json.dumps of what is sent.
    signed_headers = make_headers(package.sendsafely.API_SECRET, package.sendsafely.API_KEY, endpoint,
                                  request_body=json.dumps(payload))
    reply = requests.post(url=url, json=payload, headers=signed_headers)
    return reply.json()
def _get_download_urls(package, file_id, directory_id=None, start=1, end=25):
    """
    Retrieves the download URLs for a range of segments of a file from SendSafely
    :param package: The package the file belongs to. Its ``package_code``, ``client_secret``,
    ``package_id`` and ``sendsafely`` handler are used.
    :param file_id: The ID (string) of the file to download.
    :param directory_id: Optional directory ID. When truthy, the directory-scoped endpoint is used.
    :param start: Sent as "startSegment".
    :param end: Sent as "endSegment".
    :return: The value of "downloadUrls" from the JSON response.
    """
    checksum = _calculate_package_checksum(package_code=package.package_code, keycode=package.client_secret)
    sendsafely = package.sendsafely
    if directory_id:
        endpoint = "".join(["/package/", package.package_id, "/directory/", directory_id,
                            "/file/", file_id, "/download-urls"])
    else:
        endpoint = "".join(["/package/", package.package_id, "/file/", file_id, "/download-urls"])
    url = sendsafely.BASE_URL + endpoint
    payload = {
        "checksum": checksum["checksum"],
        "startSegment": start,
        "endSegment": end,
    }
    # The signature covers the serialized payload, so sign json.dumps of what is sent.
    signed_headers = make_headers(sendsafely.API_SECRET, sendsafely.API_KEY, endpoint,
                                  request_body=json.dumps(payload))
    reply = requests.post(url=url, json=payload, headers=signed_headers).json()
    return reply["downloadUrls"]
def get_request(sendsafely, endpoint):
    """
    Sends a signed GET request to the SendSafely API
    :param sendsafely: The SendSafely handler providing BASE_URL, API_SECRET and API_KEY.
    :param endpoint: The endpoint path, appended to BASE_URL and used for signing.
    :return: The decoded JSON response.
    """
    target = sendsafely.BASE_URL + endpoint
    signed_headers = make_headers(sendsafely.API_SECRET, sendsafely.API_KEY, endpoint)
    return requests.get(target, headers=signed_headers).json()
def delete_request(sendsafely, endpoint):
    """
    Sends a signed DELETE request to the SendSafely API
    :param sendsafely: The SendSafely handler providing BASE_URL, API_SECRET and API_KEY.
    :param endpoint: The endpoint path, appended to BASE_URL and used for signing.
    :return: The decoded JSON response.
    """
    target = sendsafely.BASE_URL + endpoint
    signed_headers = make_headers(sendsafely.API_SECRET, sendsafely.API_KEY, endpoint)
    return requests.delete(target, headers=signed_headers).json()
def post_request(sendsafely, endpoint, body):
    """
    Sends a signed POST request with a JSON body to the SendSafely API
    :param sendsafely: The SendSafely handler providing BASE_URL, API_SECRET and API_KEY.
    :param endpoint: The endpoint path, appended to BASE_URL and used for signing.
    :param body: The JSON-serializable request body. Its json.dumps form is what gets signed.
    :return: The decoded JSON response.
    """
    target = sendsafely.BASE_URL + endpoint
    signed_headers = make_headers(sendsafely.API_SECRET, sendsafely.API_KEY, endpoint,
                                  request_body=json.dumps(body))
    return requests.post(target, json=body, headers=signed_headers).json()
def put_request(sendsafely, endpoint, body):
    """
    Sends a signed PUT request with a JSON body to the SendSafely API
    :param sendsafely: The SendSafely handler providing BASE_URL, API_SECRET and API_KEY.
    :param endpoint: The endpoint path, appended to BASE_URL and used for signing.
    :param body: The JSON-serializable request body. Its json.dumps form is what gets signed.
    :return: The decoded JSON response.
    """
    target = sendsafely.BASE_URL + endpoint
    signed_headers = make_headers(sendsafely.API_SECRET, sendsafely.API_KEY, endpoint,
                                  request_body=json.dumps(body))
    return requests.put(target, json=body, headers=signed_headers).json()
def patch_request(sendsafely, endpoint, body):
    """
    Sends a signed PATCH request with a JSON body to the SendSafely API
    :param sendsafely: The SendSafely handler providing BASE_URL, API_SECRET and API_KEY.
    :param endpoint: The endpoint path, appended to BASE_URL and used for signing.
    :param body: The JSON-serializable request body. Its json.dumps form is what gets signed.
    :return: The decoded JSON response.
    """
    target = sendsafely.BASE_URL + endpoint
    signed_headers = make_headers(sendsafely.API_SECRET, sendsafely.API_KEY, endpoint,
                                  request_body=json.dumps(body))
    return requests.patch(target, json=body, headers=signed_headers).json()
def _get_string_from_file(filename):
if os.path.exists(os.path.dirname(filename)):
with open(filename) as f:
result = f.readline()
return result
return filename
def _pretty_print(json_response):
json.dumps(json_response, indent=2)
json_object = json.loads(json_response)
pretty = json.dumps(json_object, indent=2)
print(pretty)
def save_key_pair(key_id, key_pair, path_to_save):
    """
    Saves a key pair and its public key ID to a file as JSON
    :param key_id: The public key ID, stored under "publicKeyId".
    :param key_pair: The key pair; its str() form is stored under "privateKey".
    :param path_to_save: The path of the file to write. An existing file is overwritten.
    """
    information = {
        "publicKeyId": key_id,
        "privateKey": str(key_pair)
    }
    # The context manager closes the file even when the write fails.
    with open(path_to_save, "w") as key_file:
        key_file.write(json.dumps(information))
def read_key_pair(path):
    """
    Reads a JSON key pair file, such as one written by save_key_pair
    :param path: The path of the JSON file to read.
    :return: The decoded JSON content of the file.
    """
    # Open read-only: "r+" needed write permission on the file just to read it. The context
    # manager also closes the file when the content is not valid JSON.
    with open(path, "r") as key_file:
        return json.load(key_file)
def _make_safe_for_urlsafebase64(client_secret):
client_secret = client_secret.replace("=", "")
client_secret = client_secret.replace("+", "-")
client_secret = client_secret.replace("/", "_")
return client_secret