-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcrypto.py
More file actions
133 lines (91 loc) · 4.89 KB
/
crypto.py
File metadata and controls
133 lines (91 loc) · 4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import dh
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
import os
import sys
class crypto:
    """Convenience wrapper around the ``cryptography`` hazmat primitives.

    Bundles the RSA (OAEP encryption, PSS signatures), Diffie-Hellman + HKDF
    and AES-GCM helpers behind one object that holds the backend handle.
    """

    def __init__(self):
        self.backend = default_backend()

    # For creating RSA key pair
    def rsa_key_pair(self):
        """Generate a 2048-bit RSA key pair with public exponent 65537.

        Returns:
            Tuple ``(public_key, private_key)``.
        """
        private_key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048, backend=self.backend)
        return private_key.public_key(), private_key

    # For loading RSA keys for usage
    def public_key_load(self, public_key_pem):
        """Load a PEM public key from a readable file-like object.

        Args:
            public_key_pem: Object whose ``read()`` returns the PEM bytes.

        Returns:
            The loaded public key object.
        """
        return serialization.load_pem_public_key(
            public_key_pem.read(), backend=self.backend)

    def private_key_load(self, private_key_pem):
        """Load an unencrypted PEM private key from a file-like object.

        Args:
            private_key_pem: Object whose ``read()`` returns the PEM bytes.

        Returns:
            The loaded private key object.
        """
        # A private key must go through the private-key loader; the public
        # loader rejects private-key PEM data.
        return serialization.load_pem_private_key(
            private_key_pem.read(), password=None, backend=self.backend)

    # Need to pass peer public key for key generation
    def diffie_hellman(self, peer_public_key, private_key=None):
        """Derive a 32-byte key from a Diffie-Hellman exchange via HKDF-SHA256.

        Args:
            peer_public_key: The peer's DH public key.
            private_key: Optional local DH private key. When omitted, a fresh
                key is generated from the peer's parameters and discarded
                after the exchange, so the caller has no public half to send
                back; pass an explicit key when the peer must derive the
                same secret.

        Returns:
            The 32-byte derived key.
        """
        if private_key is None:
            # Both sides must share the same group, so reuse the peer's
            # parameters instead of generating new (and incompatible) ones.
            private_key = peer_public_key.parameters().generate_private_key()
        shared_secret = private_key.exchange(peer_public_key)
        kdf = HKDF(algorithm=hashes.SHA256(), length=32, salt=None,
                   info=b'handshake', backend=self.backend)
        return kdf.derive(shared_secret)

    # Encrypt the message using the public key of the destination
    def rsa_encryption(self, public_key, message):
        """Encrypt ``message`` with RSA-OAEP (MGF1/SHA-256, no label).

        Returns:
            The ciphertext bytes.
        """
        oaep = padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()),
                            algorithm=hashes.SHA256(), label=None)
        return public_key.encrypt(message, oaep)

    # Decrypt the message using the private key of the receiver
    def rsa_decryption(self, private_key, ciphertext):
        """Decrypt ``ciphertext`` with RSA-OAEP (MGF1/SHA-256, no label).

        Returns:
            The plaintext bytes.
        """
        oaep = padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()),
                            algorithm=hashes.SHA256(), label=None)
        return private_key.decrypt(ciphertext, oaep)

    # AES Symmetric encryption
    def symmetric_encryption(self, sym_key, iv, payload, ad):
        """Encrypt ``payload`` with AES-GCM, authenticating ``ad`` as well.

        Args:
            sym_key: AES key bytes.
            iv: GCM initialization vector.
            payload: Plaintext bytes.
            ad: Additional data that is authenticated but not encrypted.

        Returns:
            Tuple ``(tag, ciphertext)``.
        """
        # .encryptor() already returns the encryption context.
        encryptor = Cipher(algorithms.AES(sym_key), mode=modes.GCM(iv),
                           backend=self.backend).encryptor()
        encryptor.authenticate_additional_data(ad)
        ciphertext = encryptor.update(payload) + encryptor.finalize()
        # The tag is only available once finalize() has run.
        return encryptor.tag, ciphertext

    # AES Symmetric decryption
    def symmetric_decryption(self, sym_key, iv, payload, tag, ad):
        """Decrypt AES-GCM ``payload`` and check it against ``tag`` and ``ad``.

        Args:
            sym_key: AES key bytes.
            iv: GCM initialization vector used for encryption.
            payload: Ciphertext bytes.
            tag: Authentication tag produced by encryption.
            ad: Additional authenticated data supplied at encryption time.

        Returns:
            The plaintext bytes.
        """
        # .decryptor() already returns the decryption context.
        decryptor = Cipher(algorithms.AES(sym_key), mode=modes.GCM(iv, tag),
                           backend=self.backend).decryptor()
        decryptor.authenticate_additional_data(ad)
        return decryptor.update(payload) + decryptor.finalize()

    # function to load (deserialize) a private key
    def private_key(self, key_file):
        """Load an unencrypted PEM private key, exiting the process on failure.

        Args:
            key_file: Object whose ``read()`` returns the PEM bytes.

        Returns:
            The loaded private key object. On any load error a message is
            printed and the process exits with status 1.
        """
        try:
            private_key_serial = serialization.load_pem_private_key(
                key_file.read(), password=None, backend=self.backend)
        except Exception:
            # Exception (not a bare except) so Ctrl-C is not reported as a
            # key-format problem.
            print("Key format not supported,(My developer is lazy)")
            print("Supported key format: PEM")
            sys.exit(1)
        return private_key_serial

    # function to load (deserialize) a public key
    def public_key(self, key_file):
        """Load a PEM public key, exiting the process on failure.

        Args:
            key_file: Object whose ``read()`` returns the PEM bytes.

        Returns:
            The loaded public key object. On any load error a message is
            printed and the process exits with status 1.
        """
        try:
            public_key_serial = serialization.load_pem_public_key(
                key_file.read(), backend=self.backend)
        except Exception:
            print("Key format not supported, (My developer is lazy)")
            print("Supported key formats: PEM")
            sys.exit(1)
        return public_key_serial

    def sign(self, private_key_sender, message):  # pass the private key of the sender
        """Sign ``message`` with RSA-PSS (MGF1/SHA-256, maximum salt length).

        Returns:
            The signature bytes.
        """
        pss = padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
                          salt_length=padding.PSS.MAX_LENGTH)
        return private_key_sender.sign(message, pss, hashes.SHA256())

    # Pass the public key and signature of the sender
    # Check if the signature tag is required
    def verify(self, public_key_receiver, signature, message=None):
        """Verify an RSA-PSS signature produced by :meth:`sign`.

        Args:
            public_key_receiver: Public key matching the signing private key.
            signature: Signature bytes to check.
            message: The signed bytes. Required; it defaults to ``None`` only
                so the original two-argument signature still parses.

        Raises:
            TypeError: If ``message`` is not supplied.

        Any exception raised by the key's ``verify`` call (for example on a
        signature mismatch) propagates to the caller; nothing is returned on
        success.
        """
        if message is None:
            raise TypeError("verify() requires the signed message")
        pss = padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
                          salt_length=padding.PSS.MAX_LENGTH)
        public_key_receiver.verify(signature, message, pss, hashes.SHA256())