-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathScamGuard.py
More file actions
346 lines (289 loc) · 14.6 KB
/
Copy pathScamGuard.py
File metadata and controls
346 lines (289 loc) · 14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
# The core main instance of ScamGuard, used as the primary instance. This should have code that only needs to be ran by a single instance
# Such as the host system that shares commands/messages to sub-instances and things like backup.
# It should not handle any recv instructions from ServerHandler except for requests by sub-instances.
from Logger import Logger, LogLevel
from BotEnums import BanResult, BanAction, ModerationAction
from Config import Config
from BotBase import DiscordBot
from BotConnections import RelayServer
from datetime import datetime, timedelta
from discord import Embed, User, Member, HTTPException, Message, Thread
from discord.ext import tasks
from multiprocessing import Process
from BotSubprocess import CreateBotProcess
import asyncio
__all__ = ["ScamGuard"]
ConfigData:Config=Config()
class ScamGuard(DiscordBot):
ServerHandler:RelayServer = None # pyright: ignore[reportAssignmentType]
HasStartedInstances:bool = False
SubProcess={}
### Initialization ###
def __init__(self, AssignedBotID:int):
self.ServerHandler = RelayServer(AssignedBotID, self)
super().__init__(self.ServerHandler.GetFileLocation(), AssignedBotID)
async def setup_hook(self):
# TODO: Make a fancy table for this in the future
if (ConfigData["RunBackupEveryXHours"] > 0):
self.PeriodicBackup.start()
if (ConfigData["RunIdleCleanupEveryXHours"] > 0):
self.PeriodicLeave.start()
self.HandleListenRelay.start()
self.HandleBanExceed.start()
await super().setup_hook()
### Discord Tasks Handling ###
@tasks.loop(seconds=0.5)
async def HandleListenRelay(self):
await self.ServerHandler.TickRelay()
### Task Interval Handling ###
def ConfigBackupInterval(self):
self.PeriodicBackup.change_interval(seconds=0.0, minutes=0.0, hours=float(ConfigData["RunBackupEveryXHours"])) # pyright: ignore[reportAttributeAccessIssue]
def ConfigLeaveInterval(self):
self.PeriodicLeave.change_interval(seconds=0.0, minutes=0.0, hours=float(ConfigData["RunIdleCleanupEveryXHours"])) # pyright: ignore[reportAttributeAccessIssue]
def RetryTaskInterval(self, task):
task.change_interval(seconds=0.0, minutes=5.0, hours=0.0)
### Backup handling ###
# By default, this runs every 5 minutes, however upon loading configurations, this will update the
# backup interval to the proper settings
@tasks.loop(minutes=2)
async def PeriodicBackup(self):
# Prevent the first time this task runs from trying to backup at start.
if (self.PeriodicBackup.minutes == 2.0):
self.ConfigBackupInterval()
return
# These are safe async tasks that we can run backup during. If the task queue only has tasks of these names, then it is safe.
SafeAsyncTaskNames = ["CreateBanAnnouncement", "PostFirstTimeMessage", "PostScamReport"]
# If we have active async tasks in progress, check to see if we can continue without them
# i.e. we are currently waiting to do something.
if (len(self.AsyncTasks) > 0):
for itm in self.AsyncTasks:
if not itm.get_name() in SafeAsyncTaskNames:
Logger.Log(LogLevel.Warn, "There are currently async tasks in progress, will try backup again in 5 minutes...")
self.RetryTaskInterval(self.PeriodicBackup)
return
# If we currently have the minutes value set, then we need to make sure we get back onto the right track
if (self.PeriodicBackup.minutes != 0.0):
self.ConfigBackupInterval()
Logger.Log(LogLevel.Log, "Periodic Bot DB Backup Started...")
self.Database.Backup()
self.Database.CleanupBackups()
### Instance Cleanup ###
@tasks.loop(minutes=2)
async def PeriodicLeave(self):
# Prevent the first time this code ever runs from running directly at startup.
if (self.PeriodicLeave.minutes == 2.0):
self.ConfigLeaveInterval()
return
# If we are processing any async tasks, do not clean up the deactivated table
if (len(self.AsyncTasks) > 0):
self.RetryTaskInterval(self.PeriodicLeave)
return
# If the instances code hasn't been able to start, wait for when it's ready again.
if (not self.HasStartedInstances):
self.RetryTaskInterval(self.PeriodicLeave)
return
await self.RunPeriodicLeave(False)
async def RunPeriodicLeave(self, DryRun:bool):
# If this config is less than or equal to 0, then we don't execute the task
InactiveInstanceWindow:int = ConfigData["InactiveServerDayWindow"]
if (InactiveInstanceWindow <= 0):
return
# If we were in a retry state, reset the config loop again so that it's in the proper cadance
if (self.PeriodicLeave.minutes != 0.0):
self.ConfigLeaveInterval()
CurrentTime:datetime = datetime.now() - timedelta(days=float(InactiveInstanceWindow))
AllDisabledServers = self.Database.GetAllDeactivatedServers()
OldServerCount:int = len(AllDisabledServers)
Logger.CLog(OldServerCount > 0, LogLevel.Notice, f"Non-activated server ({OldServerCount}) purge. Dry run? {DryRun}")
ServersLeft:int = 0
for ServerData in AllDisabledServers:
if (CurrentTime > ServerData.created_at):
ServerID:int = int(ServerData.discord_server_id)
if (DryRun or self.LeaveServer(ServerID)):
ServersLeft += 1
Logger.Log(LogLevel.Verbose, f"Attempting to leave server {ServerID}.")
Logger.CLog(DryRun, LogLevel.Log, f"Attempting to leave server {ServerID}.")
else:
Logger.Log(LogLevel.Warn, f"Could not send leave request for server {ServerID}")
# Attempt to sleep the big scary rate limits away
if (not DryRun):
await asyncio.sleep(1.0)
Logger.CLog(ServersLeft > 0, LogLevel.Notice, f"Server Instance Cleanup Completed, left {ServersLeft} out of {len(AllDisabledServers)}")
### Handling Ban Exceeds ###
@tasks.loop(hours=1)
async def HandleBanExceed(self):
ExhaustedList = self.Database.GetExhaustedServers()
ExhaustedListCount = len(ExhaustedList)
if (ExhaustedListCount <= 0):
return
# If the instances are not started, then we should wait for them to start
if (not self.HasStartedInstances):
return
NumBans:int = self.Database.GetNumBans()
Logger.Log(LogLevel.Notice, f"Attempting to process {ExhaustedListCount} cooldown servers now")
for Server in ExhaustedList:
NumCount:int = NumBans - int(Server.current_pos)
ServerId:int = int(Server.discord_server_id)
self.Database.SetProcessingServerCooldown(ServerId, True)
self.AddAsyncTask(self.ReprocessBansForServer(ServerId, NumCount, True))
Logger.Log(LogLevel.Log, f"Enqueueing reprocessing of {NumCount} bans for server {ServerId}")
# Handling async tasks step flow
@PeriodicBackup.before_loop
@HandleBanExceed.before_loop
@PeriodicLeave.before_loop
@HandleListenRelay.before_loop
async def BeforeScheduledAsyncTasks(self):
# Wait until the bot is all set up before attempting periodic leaves
await self.wait_until_ready()
### Config Handling ###
def ProcessConfig(self, ShouldReload:bool):
super().ProcessConfig(ShouldReload)
### Discord Eventing ###
async def InitializeBotRuntime(self):
await super().InitializeBotRuntime()
await self.StartAllInstances()
### Thread handling (for automated checks) ###
async def LeaveThread(self, thread: Thread) -> bool:
try:
await thread.leave()
return True
except Exception as ex:
# Only print out exceptions if we're the control server, silently fail elsewhere.
if (thread.guild.id == ConfigData["ControlServer"]):
Logger.Log(LogLevel.Warn, f"Unable to leave the thread {thread.id}, encountered exception {str(ex)}")
return False
async def on_thread_join(self, thread: Thread):
# Only handle in the control server
if (thread.guild.id == ConfigData["ControlServer"]):
# and in the external reports channel
if (thread.parent_id == ConfigData["ExternalReportChannel"]):
async for message in thread.history(limit=2, oldest_first=True):
# leave the thread if we were invited by someone else.
if (message.author.id != ConfigData["ThreadInviteUser"]):
continue
# Check to see if we have content, which means it's our mentionable
if (message.content == ""):
continue
Logger.Log(LogLevel.Debug, f"Got post content of {message.content}")
IDGrabList = message.content.split()
if (len(IDGrabList) >= 2):
userID:int = int(IDGrabList[1])
try:
await message.delete()
except:
Logger.Log(LogLevel.Log, f"Could not delete mention message {message.id}")
ResponseEmbed:Embed = await self.CreateBanEmbed(userID)
await thread.send(embed = ResponseEmbed)
return
Logger.Log(LogLevel.Debug, f"Could not find any mentionable message, leaving thread {thread.id}")
await self.LeaveThread(thread)
### Subprocess instances ###
async def StartAllInstances(self, BypassCheck:bool=False, RestartMainClient:bool=False):
# Prevent us from restarting instances when on_ready may run again.
if (self.HasStartedInstances and not BypassCheck):
return
if (RestartMainClient):
Logger.Log(LogLevel.Log, "Restarting client instance for control bot instance")
await self.StartInstance(0)
# Spin up all the subinstances of the other bot clients
AllInstances = Config.GetAllSubTokens()
for InstanceID in AllInstances:
await self.StartInstance(int(InstanceID))
self.HasStartedInstances = True
async def StartInstance(self, InstanceID:int):
RelayFileHandleLocation = self.ServerHandler.GetFileLocation()
if (InstanceID == 0):
self.ClientHandler = None # pyright: ignore[reportAttributeAccessIssue]
self.SetupClientConnection(RelayFileHandleLocation)
self.ClientHandler.SendHello()
return
# Make sure to exit out of any instances if they're already running for this index
await self.StopInstanceIfExists(InstanceID)
Logger.Log(LogLevel.Log, f"Spinning up instance #{InstanceID}")
self.SubProcess[InstanceID] = Process(target=CreateBotProcess, args=(RelayFileHandleLocation, InstanceID), name=f'Bot-{InstanceID}')
self.SubProcess[InstanceID].start()
async def StopInstanceIfExists(self, InstanceID:int):
if (InstanceID in self.SubProcess and self.SubProcess[InstanceID] is not None):
ExistingProcess:Process = self.SubProcess[InstanceID]
ExistingProcess.terminate()
try:
# wait for the process to terminate
while ExistingProcess.is_alive():
await asyncio.sleep(2.0)
# The process can be closed properly now.
ExistingProcess.close()
except Exception as ex:
Logger.Log(LogLevel.Notice, f"Instance {InstanceID} has a hung instance, removing the subprocess {str(ex)}")
finally:
self.SubProcess[InstanceID] = None
### Command Processing & Utils ###
async def PublishAnnouncement(self, InMessage:str|Embed):
if (ConfigData.IsDevelopment()):
Logger.Log(LogLevel.Notice, "Announcement message was dropped because this instance is in development mode")
return
try:
NewMessage:Message|None = None
if (self.AnnouncementChannel is None):
return
if (type(InMessage) == Embed):
NewMessage = await self.AnnouncementChannel.send(embed=InMessage)
else:
NewMessage = await self.AnnouncementChannel.send(str(InMessage))
if (NewMessage is not None):
await NewMessage.publish()
elif (type(InMessage) == str):
Logger.Log(LogLevel.Error, f"Could not publish message {str(InMessage)}! Did not send!")
else:
Logger.Log(LogLevel.Error, f"Could not publish message, as it did not send!")
except HTTPException as ex:
Logger.Log(LogLevel.Log, f"WARN: Unable to publish message to announcement channel {str(ex)}")
### Ban Handling ###
async def HandleBanAction(self, TargetId:int, Sender:Member|User, Action:ModerationAction, *, ThreadId:int|None, Reason:str|None) -> BanAction:
DatabaseAction:BanAction = BanAction.DBError
NewAnnouncement:Embed|None = None
if (Action == ModerationAction.Ban):
DatabaseAction = self.Database.AddBan(TargetId, Sender.name, Sender.id, ThreadId)
NewAnnouncement = await self.CreateBanEmbed(TargetId, True, Reason)
elif (Action == ModerationAction.Unban):
NewAnnouncement = await self.CreateBanEmbed(TargetId, False, Reason)
DatabaseAction = self.Database.RemoveBan(TargetId)
else:
Logger.Log(LogLevel.Error, f"An invalid moderation action was passed to HandleBanAction, {Action}")
return BanAction.DBError
# If we encountered an error, return said error, don't do anything else.
if (DatabaseAction not in [BanAction.Banned, BanAction.Unbanned]):
return DatabaseAction
# Queue up the announcement
if (NewAnnouncement is not None):
self.AddAsyncTask(self.CreateBanAnnouncement(NewAnnouncement, Action))
# Queue up the action
self.AddAsyncTask(self.PropagateActionToServers(TargetId, Sender, Action, Reason))
return DatabaseAction
async def CreateBanAnnouncement(self, Announcement:Embed, ActionTaken:ModerationAction):
if (ActionTaken is ModerationAction.Ban or ActionTaken is ModerationAction.Unban):
self.UpdateEmbedForPublish(Announcement, ActionTaken)
await self.PublishAnnouncement(Announcement)
async def ReprocessBansForInstance(self, InstanceID:int, LastActions:int):
if (InstanceID == self.BotID):
await self.ReprocessInstance(LastActions)
else:
self.ClientHandler.SendReprocessInstanceBans(InstanceId=InstanceID, InNumToRetry=LastActions)
async def ReprocessBansForServer(self, ServerId:int, LastActions:int=0, HandlingCooldown:bool=False) -> BanResult:
TargetBotId:int|None = self.Database.GetBotIdForServer(ServerId)
if (TargetBotId == self.BotID):
return await self.ReprocessBans(ServerId, LastActions, HandlingCooldown)
elif (TargetBotId is None):
return BanResult.Error
else:
self.ClientHandler.SendReprocessBans(ServerId, InstanceId=TargetBotId,
InNumToRetry=LastActions, InHandlingCooldown=HandlingCooldown)
return BanResult.Processed
async def PropagateActionToServers(self, TargetId:int, Sender:Member|User, Action:ModerationAction, Reason:str|None=None):
SenderName:str = Sender.name
if (Action == ModerationAction.Ban):
self.ClientHandler.SendBan(TargetId, SenderName, Reason)
elif (Action == ModerationAction.Unban):
self.ClientHandler.SendUnban(TargetId, SenderName, Reason)
elif (Action == ModerationAction.Kick):
self.ClientHandler.SendKick(TargetId, SenderName, Reason)
await self.ProcessActionOnUser(TargetId, AuthorizerName=SenderName, Action=Action, Reason=Reason)