-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Expand file tree
/
Copy pathcommon.py
More file actions
87 lines (71 loc) · 2.92 KB
/
common.py
File metadata and controls
87 lines (71 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: common.py
@date:2025/6/6 19:55
@desc:
"""
import hashlib
import json
import threading
from django.core import signing, cache
from common.constants.cache_version import Cache_Version
from common.utils.rsa_util import encrypt, decrypt
# Cache handle (django.core.cache.cache) holding decrypted authentication payloads,
# keyed by the SHA-256 hex digest of the encrypted string.
authentication_cache = cache.cache
# Module-wide lock taken on the cache-miss path of _decrypt before decrypting and storing.
lock = threading.Lock()
def _decrypt(authentication: str):
    """
    Decrypt an encrypted authentication string, caching the result.

    The cache key is the SHA-256 hex digest of the encrypted string. Entries
    are looked up and stored under ``Cache_Version.CHAT.get_version()``, the
    same version ``ChatAuthentication.to_string`` writes with, so a payload
    pre-populated there can be returned without calling ``decrypt``.

    :param authentication: encrypted authentication string
    :return: the cached value if present, otherwise ``decrypt(authentication)``
    """
    cache_key = hashlib.sha256(authentication.encode()).hexdigest()
    # Must match the version used by the writer in ChatAuthentication.to_string;
    # a different version yields a different cache key and the entry is never found.
    version = Cache_Version.CHAT.get_version()

    result = authentication_cache.get(cache_key, version=version)
    if result is not None:
        return result

    with lock:
        # Look again while holding the lock: another thread may have stored
        # the value between the first lookup and acquiring the lock.
        result = authentication_cache.get(cache_key, version=version)
        if result is None:
            result = decrypt(authentication)
            # Keep the entry for two hours.
            authentication_cache.set(cache_key, result, version=version, timeout=60 * 60 * 2)
    return result
class ChatAuthentication:
def __init__(self, auth_type: str | None, **kwargs):
self.auth_type = auth_type
for k, v in kwargs.items():
self.__setattr__(k, v)
def to_dict(self):
return self.__dict__
def to_string(self):
value = json.dumps(self.to_dict())
authentication = encrypt(value)
cache_key = hashlib.sha256(authentication.encode()).hexdigest()
authentication_cache.set(cache_key, value, version=Cache_Version.CHAT.get_version(), timeout=60 * 60 * 2)
return authentication
@staticmethod
def new_instance(authentication: str):
auth = json.loads(_decrypt(authentication))
return ChatAuthentication(**auth)
class ChatUserToken:
    """
    Token payload for a chat user: application, user, access token, token
    type, chat user identity and the attached authentication object.
    """

    def __init__(self, application_id, user_id, access_token, _type, chat_user_type, chat_user_id,
                 authentication: 'ChatAuthentication'):
        """
        :param application_id: application identifier
        :param user_id: user identifier
        :param access_token: access token string
        :param _type: token type; either an object exposing ``.value`` or the
                      already-serialized value (as passed by ``new_instance``)
        :param chat_user_type: chat user type
        :param chat_user_id: chat user identifier
        :param authentication: object providing ``to_string()``
        """
        self.application_id = application_id
        self.user_id = user_id
        self.access_token = access_token
        self.type = _type
        self.chat_user_type = chat_user_type
        self.chat_user_id = chat_user_id
        self.authentication = authentication

    def to_dict(self):
        """
        :return: dict form of the token; identifiers and types are converted
                 to strings and the authentication to its encrypted string
        """
        # new_instance stores the serialized type string, which has no
        # ``.value``; fall back to the raw value so a restored token can be
        # serialized again instead of raising AttributeError.
        type_value = getattr(self.type, 'value', self.type)
        return {
            'application_id': str(self.application_id),
            'user_id': str(self.user_id),
            'access_token': self.access_token,
            'type': str(type_value),
            'chat_user_type': str(self.chat_user_type),
            'chat_user_id': str(self.chat_user_id),
            'authentication': self.authentication.to_string()
        }

    def to_token(self):
        """
        :return: the dict form passed through ``signing.dumps``
        """
        return signing.dumps(self.to_dict())

    @staticmethod
    def new_instance(token_dict):
        """
        Rebuild a token from its dict form.

        :param token_dict: mapping with the keys produced by ``to_dict``
        :return: a ChatUserToken whose ``type`` holds the serialized value
        """
        return ChatUserToken(
            token_dict.get('application_id'),
            token_dict.get('user_id'),
            token_dict.get('access_token'),
            token_dict.get('type'),
            token_dict.get('chat_user_type'),
            token_dict.get('chat_user_id'),
            ChatAuthentication.new_instance(token_dict.get('authentication')),
        )