-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Expand file tree
/
Copy pathrsa_util.py
More file actions
160 lines (129 loc) · 4.68 KB
/
rsa_util.py
File metadata and controls
160 lines (129 loc) · 4.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# coding=utf-8
"""
@project: maxkb
@Author:虎
@file: rsa_util.py
@date:2023/11/3 11:13
@desc:
"""
import base64
import threading
from functools import lru_cache
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
from Crypto.PublicKey import RSA
from django.core import cache
from django.db.models import QuerySet
from common.cache.mem_cache import MemCache
from common.constants.cache_version import Cache_Version
from system_manage.models import SystemSetting, SettingType
rsa_message_cache = MemCache('rsa', {})
lock = threading.Lock()
rsa_cache = cache.cache
cache_key = "rsa_key"
# 对密钥加密的密码
secret_code = "mac_kb_password"
def generate():
"""
生成 私钥秘钥对
:return:{key:'公钥',value:'私钥'}
"""
# 生成一个 2048 位的密钥
key = RSA.generate(2048)
# 获取私钥
encrypted_key = key.export_key(passphrase=secret_code, pkcs=8,
protection="scryptAndAES128-CBC")
return {'key': key.publickey().export_key(), 'value': encrypted_key}
version, get_key = Cache_Version.SYSTEM.value
def get_key_pair():
rsa_value = rsa_cache.get(get_key(key='rsa_key'), version=version)
if rsa_value is None:
with lock:
rsa_value = rsa_cache.get(get_key(key='rsa_key'), version=version)
if rsa_value is not None:
return rsa_value
rsa_value = get_key_pair_by_sql()
rsa_cache.set(get_key(key='rsa_key'), rsa_value, timeout=None, version=version)
return rsa_value
def get_key_pair_by_sql():
system_setting = QuerySet(SystemSetting).filter(type=SettingType.RSA.value).first()
if system_setting is None:
kv = generate()
system_setting = SystemSetting(type=SettingType.RSA.value,
meta={'key': kv.get('key').decode(), 'value': kv.get('value').decode()})
system_setting.save()
return system_setting.meta
def encrypt(msg, public_key: str | None = None):
"""
加密
:param msg: 加密数据
:param public_key: 公钥
:return: 加密后的数据
"""
if public_key is None:
public_key = get_key_pair().get('key')
cipher = _get_encrypt_cipher(public_key)
encrypt_msg = cipher.encrypt(msg.encode("utf-8"))
return base64.b64encode(encrypt_msg).decode()
def decrypt(msg, pri_key: str | None = None):
"""
解密
:param msg: 需要解密的数据
:param pri_key: 私钥
:return: 解密后数据
"""
if pri_key is None:
pri_key = get_key_pair().get('value')
cipher = _get_cipher(pri_key)
decrypt_data = cipher.decrypt(base64.b64decode(msg), 0)
return decrypt_data.decode("utf-8")
@lru_cache(maxsize=2)
def _get_encrypt_cipher(public_key: str):
"""缓存加密 cipher 对象"""
return PKCS1_cipher.new(RSA.importKey(extern_key=public_key, passphrase=secret_code))
def rsa_long_encrypt(message, public_key: str | None = None, length=200):
"""
超长文本加密
:param message: 需要加密的字符串
:param public_key 公钥
:param length: 1024bit的证书用100, 2048bit的证书用 200
:return: 加密后的数据
"""
if public_key is None:
public_key = get_key_pair().get('key')
cipher = _get_encrypt_cipher(public_key)
if len(message) <= length:
result = base64.b64encode(cipher.encrypt(message.encode('utf-8')))
else:
rsa_text = []
for i in range(0, len(message), length):
cont = message[i:i + length]
rsa_text.append(cipher.encrypt(cont.encode('utf-8')))
cipher_text = b''.join(rsa_text)
result = base64.b64encode(cipher_text)
return result.decode()
@lru_cache(maxsize=2)
def _get_cipher(pri_key: str):
"""缓存 cipher 对象,避免重复创建"""
return PKCS1_cipher.new(RSA.importKey(pri_key, passphrase=secret_code))
def rsa_long_decrypt(message, pri_key: str | None = None, length=256):
"""
超长文本解密,优化内存使用
:param message: 需要解密的数据
:param pri_key: 秘钥
:param length : 1024bit的证书用128,2048bit证书用256位
:return: 解密后的数据
"""
result = rsa_message_cache.get(message)
if result is not None:
return result
if pri_key is None:
pri_key = get_key_pair().get('value')
cipher = _get_cipher(pri_key)
base64_de = base64.b64decode(message)
# 使用 bytearray 减少内存分配
result = bytearray()
for i in range(0, len(base64_de), length):
result.extend(cipher.decrypt(base64_de[i:i + length], 0))
r = result.decode()
rsa_message_cache.set(message, r, timeout=60 * 60 * 8)
return r