-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
279 lines (231 loc) · 8.76 KB
/
database.py
File metadata and controls
279 lines (231 loc) · 8.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
import copy
from pymongo import MongoClient
import config
from logger_config import get_logger
logger = get_logger(__name__)
# MongoDB configuration
# The client and database handle are created once, at import time, and the
# `db` handle is what every helper in this module reads from and writes to.
mongo_client = MongoClient(config.MONGO_URI)
db = mongo_client[config.DB_NAME]
# `_id` values of the singleton state documents (one document per state
# collection).
ROLES_STATE_DOC_ID = "roles_state"
REGISTERED_CHANNELS_STATE_DOC_ID = "registered_channels_state"
LINKED_CHANNEL_GROUPS_STATE_DOC_ID = "linked_channel_groups_state"
# Default contents for each state document. They are deep-copied before use,
# so these module-level dicts are never handed out directly.
DEFAULT_ROLES_STATE = {
"superadmins": [],
"admins": [],
"registrators": [],
}
DEFAULT_REGISTERED_CHANNELS_STATE = {
"register": [],
}
DEFAULT_LINKED_CHANNEL_GROUPS_STATE = {
"groups": [],
}
def _get_state_document(collection_name: str, doc_id: str, default_state: dict):
    """Fetch a singleton state document, falling back to a copy of the default.

    The ``_id`` field is projected out, so the returned dict holds only the
    state fields. When no document with ``doc_id`` exists in
    ``collection_name``, a deep copy of ``default_state`` is returned instead,
    leaving the caller free to mutate it without touching the default.
    """
    stored = db[collection_name].find_one({"_id": doc_id}, {"_id": 0})
    return copy.deepcopy(default_state) if stored is None else stored
def _save_state_document(collection_name: str, doc_id: str, state: dict):
    """Replace (or create) the singleton state document identified by ``doc_id``.

    Args:
        collection_name: Name of the collection holding the state document.
        doc_id: ``_id`` of the singleton document to overwrite.
        state: Full state to store; it replaces the previous document
            contents entirely (``replace_one`` with ``upsert=True``).
    """
    # ``_id`` is written last so that a stray ``_id`` key inside ``state``
    # can never override the singleton id used in the filter below.
    document = {**state, "_id": doc_id}
    db[collection_name].replace_one({"_id": doc_id}, document, upsert=True)
def ensure_state_documents():
    """Ensure singleton state documents exist for legacy JSON-backed data.

    Each state collection receives an upsert that uses ``$setOnInsert`` only,
    so a document that already exists is left untouched and a missing one is
    created from a deep copy of its defaults.
    """
    singletons = (
        (
            config.ROLES_COLLECTION_NAME,
            ROLES_STATE_DOC_ID,
            DEFAULT_ROLES_STATE,
        ),
        (
            config.REGISTERED_CHANNELS_COLLECTION_NAME,
            REGISTERED_CHANNELS_STATE_DOC_ID,
            DEFAULT_REGISTERED_CHANNELS_STATE,
        ),
        (
            config.LINKED_CHANNEL_GROUPS_COLLECTION_NAME,
            LINKED_CHANNEL_GROUPS_STATE_DOC_ID,
            DEFAULT_LINKED_CHANNEL_GROUPS_STATE,
        ),
    )
    for collection_name, doc_id, defaults in singletons:
        db[collection_name].update_one(
            {"_id": doc_id},
            {"$setOnInsert": {"_id": doc_id, **copy.deepcopy(defaults)}},
            upsert=True,
        )
def load_roles_state():
    """Return the stored roles state, or a copy of ``DEFAULT_ROLES_STATE``."""
    return _get_state_document(
        collection_name=config.ROLES_COLLECTION_NAME,
        doc_id=ROLES_STATE_DOC_ID,
        default_state=DEFAULT_ROLES_STATE,
    )
def save_roles_state(state: dict):
    """Overwrite the stored roles state document with ``state``."""
    _save_state_document(
        collection_name=config.ROLES_COLLECTION_NAME,
        doc_id=ROLES_STATE_DOC_ID,
        state=state,
    )
def load_registered_channels_state():
    """Return the stored registered-channels state, or a copy of its default."""
    return _get_state_document(
        collection_name=config.REGISTERED_CHANNELS_COLLECTION_NAME,
        doc_id=REGISTERED_CHANNELS_STATE_DOC_ID,
        default_state=DEFAULT_REGISTERED_CHANNELS_STATE,
    )
def save_registered_channels_state(state: dict):
    """Overwrite the stored registered-channels state document with ``state``."""
    _save_state_document(
        collection_name=config.REGISTERED_CHANNELS_COLLECTION_NAME,
        doc_id=REGISTERED_CHANNELS_STATE_DOC_ID,
        state=state,
    )
def load_linked_channel_groups_state():
    """Return the stored linked-channel-groups state, or a copy of its default."""
    return _get_state_document(
        collection_name=config.LINKED_CHANNEL_GROUPS_COLLECTION_NAME,
        doc_id=LINKED_CHANNEL_GROUPS_STATE_DOC_ID,
        default_state=DEFAULT_LINKED_CHANNEL_GROUPS_STATE,
    )
def save_linked_channel_groups_state(state: dict):
    """Overwrite the stored linked-channel-groups state document with ``state``."""
    _save_state_document(
        collection_name=config.LINKED_CHANNEL_GROUPS_COLLECTION_NAME,
        doc_id=LINKED_CHANNEL_GROUPS_STATE_DOC_ID,
        state=state,
    )
def check_and_create_group_collection(group_name: str):
    """Create the ``group_name`` collection unless the database already lists it.

    Either outcome is logged at INFO level.
    """
    if group_name in db.list_collection_names():
        logger.info(f"Collection {group_name} already exists.")
        return
    db.create_collection(group_name)
    logger.info(f"Created collection: {group_name}")
def save_message_group_entry(group_name: str, message_group_entry: list):
    """Insert ``message_group_entry`` as one document under the ``messages`` key.

    The ``group_name`` collection is created first if it does not exist.
    """
    check_and_create_group_collection(group_name)
    db[group_name].insert_one({"messages": message_group_entry})
    logger.info("Saved message group entry to database.")
def get_message_group_entry_by_message_id(message_id: str, group_name: str):
    """Return the ``messages`` list of the entry that contains ``message_id``.

    A non-``str`` ``message_id`` is converted with ``str()`` before querying.
    The ``group_name`` collection is created first if it does not exist.
    Returns ``None`` (and logs at INFO level) when no entry matches.
    """
    if type(message_id) is not str:
        message_id = str(message_id)
    check_and_create_group_collection(group_name)
    query = {"messages": {"$elemMatch": {"message_id": message_id}}}
    entry = db[group_name].find_one(query)
    if not entry:
        logger.info(f"No entry found for message ID: {message_id} in group: {group_name}")
        return None
    return entry["messages"]
def get_thread_message_group_entry(thread_id: str, group_name: str):
    """Get message group entry for a specific thread.

    A non-``str`` ``thread_id`` is converted with ``str()`` before querying.
    The ``group_name`` collection is created first if it does not exist.
    Returns the matching document's ``messages`` list, or ``None`` (logged at
    INFO level) when no element carries that ``thread_id``.
    """
    if type(thread_id) is not str:
        thread_id = str(thread_id)
    check_and_create_group_collection(group_name)
    query = {"messages": {"$elemMatch": {"thread_id": thread_id}}}
    entry = db[group_name].find_one(query)
    if not entry:
        logger.info(f"No entry found for thread ID: {thread_id} in group: {group_name}")
        return None
    return entry["messages"]
def delete_message_group_entry_by_message_id(message_id: str, group_name: str):
    """Delete message group entry by message ID.

    A non-``str`` ``message_id`` is converted with ``str()`` before querying.
    The ``group_name`` collection is created first if it does not exist.
    At most one document is removed. Returns ``True`` if a document was
    deleted and ``False`` otherwise; both outcomes are logged at INFO level.
    """
    if type(message_id) is not str:
        message_id = str(message_id)
    check_and_create_group_collection(group_name)
    query = {"messages": {"$elemMatch": {"message_id": message_id}}}
    outcome = db[group_name].delete_one(query)
    if not outcome.deleted_count > 0:
        logger.info(f"No entry found to delete for message ID: {message_id} in group: {group_name}")
        return False
    logger.info(f"Deleted message group entry for message ID: {message_id} in group: {group_name}")
    return True
def set_user_avatar(user_id: str, emoji_avatar: str):
    """Set emoji avatar for a user.

    A non-``str`` ``user_id`` is converted with ``str()`` first. The avatar
    document is upserted, keyed by ``user_id``. Always returns ``True``.
    """
    if type(user_id) is not str:
        user_id = str(user_id)
    avatars = db[config.AVATAR_COLLECTION_NAME]
    # Upsert: update the existing document, or insert one if none matches.
    outcome = avatars.update_one(
        {"user_id": user_id},
        {"$set": {"user_id": user_id, "emoji_avatar": emoji_avatar}},
        upsert=True,
    )
    if not outcome.upserted_id:
        logger.info(f"Updated avatar for user {user_id}: {emoji_avatar}")
    else:
        logger.info(f"Created new avatar entry for user {user_id}: {emoji_avatar}")
    return True
def get_user_avatar(user_id: str):
    """Get emoji avatar for a user.

    A non-``str`` ``user_id`` is converted with ``str()`` first. Returns the
    stored ``emoji_avatar`` value, or ``None`` when the user has no avatar
    document.
    """
    if type(user_id) is not str:
        user_id = str(user_id)
    record = db[config.AVATAR_COLLECTION_NAME].find_one({"user_id": user_id})
    if not record:
        logger.debug(f"No avatar found for user {user_id}")
        return None
    logger.debug(f"Found avatar for user {user_id}: {record['emoji_avatar']}")
    return record["emoji_avatar"]
def delete_user_avatar(user_id: str):
    """Delete emoji avatar for a user.

    A non-``str`` ``user_id`` is converted with ``str()`` first. Returns
    ``True`` if an avatar document was deleted and ``False`` otherwise.
    """
    if type(user_id) is not str:
        user_id = str(user_id)
    outcome = db[config.AVATAR_COLLECTION_NAME].delete_one({"user_id": user_id})
    if not outcome.deleted_count > 0:
        logger.info(f"No avatar found to delete for user {user_id}")
        return False
    logger.info(f"Deleted avatar for user {user_id}")
    return True
def _forum_thread_collection_name(group_name: str) -> str:
return f"{group_name}_forum_threads"
def save_forum_thread_group_entry(group_name: str, thread_group_entry: list):
    """Insert ``thread_group_entry`` as one document under the ``threads`` key.

    The document goes into the group's forum-thread collection, which is
    created first if it does not exist.
    """
    target = _forum_thread_collection_name(group_name)
    check_and_create_group_collection(target)
    db[target].insert_one({"threads": thread_group_entry})
    logger.info("Saved forum thread group entry to database.")
def get_forum_thread_group_entry_by_thread_id(thread_id: str, group_name: str):
    """Return the ``threads`` list of the entry that contains ``thread_id``.

    A non-``str`` ``thread_id`` is converted with ``str()`` before querying.
    The group's forum-thread collection is created first if it does not
    exist. Returns ``None`` (and logs at INFO level) when no entry matches.
    """
    if type(thread_id) is not str:
        thread_id = str(thread_id)
    target = _forum_thread_collection_name(group_name)
    check_and_create_group_collection(target)
    query = {"threads": {"$elemMatch": {"thread_id": thread_id}}}
    entry = db[target].find_one(query)
    if not entry:
        logger.info(f"No forum thread entry found for thread ID: {thread_id} in group: {group_name}")
        return None
    return entry["threads"]
def delete_forum_thread_group_entry_by_thread_id(thread_id: str, group_name: str):
    """Delete the forum thread group entry that contains ``thread_id``.

    A non-``str`` ``thread_id`` is converted with ``str()`` before querying.
    The group's forum-thread collection is created first if it does not
    exist. At most one document is removed. Returns ``True`` if a document
    was deleted and ``False`` otherwise; both outcomes are logged at INFO
    level.
    """
    if type(thread_id) is not str:
        thread_id = str(thread_id)
    target = _forum_thread_collection_name(group_name)
    check_and_create_group_collection(target)
    query = {"threads": {"$elemMatch": {"thread_id": thread_id}}}
    outcome = db[target].delete_one(query)
    if not outcome.deleted_count > 0:
        logger.info(f"No forum thread entry found to delete for thread ID: {thread_id} in group: {group_name}")
        return False
    logger.info(f"Deleted forum thread group entry for thread ID: {thread_id} in group: {group_name}")
    return True