-
-
Notifications
You must be signed in to change notification settings - Fork 747
Expand file tree
/
Copy path_scheduler.py
More file actions
625 lines (526 loc) · 25.1 KB
/
_scheduler.py
File metadata and controls
625 lines (526 loc) · 25.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
import textwrap
import typing as t
from abc import abstractmethod
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime, timedelta
from gettext import ngettext
import arrow
import dateutil.parser
import discord
from async_rediscache import RedisCache
from discord.ext.commands import Context
from pydis_core.site_api import ResponseCodeError
from pydis_core.utils import scheduling
from pydis_core.utils.channel import get_or_fetch_channel
from bot import constants
from bot.bot import Bot
from bot.constants import Colours, Roles
from bot.converters import MemberOrUser
from bot.exts.moderation.infraction import _utils
from bot.exts.moderation.modlog import ModLog
from bot.log import get_logger
from bot.utils import messages, time
from bot.utils.channel import is_mod_channel
from bot.utils.modlog import send_log_message
# Logger for this module.
log = get_logger(__name__)
# Hours to wait before the confirmation message of a bot-issued infraction is deleted.
AUTOMATED_TIDY_UP_HOURS = 8
class InfractionScheduler:
    """
    Handles the application, pardoning, and expiration of infractions.

    Subclasses must implement `_pardon_action`, which performs the type-specific steps
    needed to lift an infraction.
    """

    # Confirmation messages awaiting automatic deletion, persisted so they survive restarts.
    # Keys are "<channel_id>:<message_id>"; values are ISO 8601 deletion timestamps.
    messages_to_tidy: RedisCache = RedisCache()

    def __init__(self, bot: Bot, supported_infractions: t.Container[str]):
        """
        Set up the schedulers used for infraction expiry and message tidy-up.

        `supported_infractions` holds the infraction type names this scheduler is responsible
        for; it is joined into the `types` query parameter when rescheduling in `cog_load`.
        """
        self.bot = bot
        # Tasks that deactivate infractions once they expire, keyed by infraction ID.
        self.scheduler = scheduling.Scheduler(self.__class__.__name__)
        # Tasks that delete confirmation messages, keyed by message ID.
        self.tidy_up_scheduler = scheduling.Scheduler(
            f"{self.__class__.__name__}TidyUp"
        )
        self.supported_infractions = supported_infractions

    async def cog_unload(self) -> None:
        """Cancel all scheduled expiration and tidy-up tasks."""
        self.scheduler.cancel_all()
        self.tidy_up_scheduler.cancel_all()

    @property
    def mod_log(self) -> ModLog:
        """Get the currently loaded ModLog cog instance."""
        return self.bot.get_cog("ModLog")

    async def cog_load(self) -> None:
        """
        Schedule expiration for active infractions and deletion of pending tidy-up messages.

        Active, non-permanent infractions of the supported types are fetched from the API and
        every one that is not already in the scheduler gets an expiration task. Afterwards, each
        entry persisted in `messages_to_tidy` gets a deletion task.
        """
        await self.bot.wait_until_guild_available()
        supported_infractions = self.supported_infractions

        log.trace(f"Rescheduling infractions for {self.__class__.__name__}.")

        infractions = await self.bot.api_client.get(
            "bot/infractions",
            params={
                "active": "true",
                "ordering": "expires_at",
                "permanent": "false",
                "types": ",".join(supported_infractions),
            },
        )

        to_schedule = [i for i in infractions if i["id"] not in self.scheduler]

        for infraction in to_schedule:
            log.trace("Scheduling %r", infraction)
            self.schedule_expiration(infraction)

        # Run this method again at the latest expiry among the infractions scheduled just now.
        # Infractions created in the meantime are scheduled when they are applied.
        # The task ID -1 is presumably chosen so it cannot clash with a real infraction ID - TODO confirm.
        if to_schedule:
            next_reschedule_point = max(
                dateutil.parser.isoparse(infr["expires_at"]) for infr in to_schedule
            )
            log.trace("Will reschedule remaining infractions at %s", next_reschedule_point)

            self.scheduler.schedule_at(next_reschedule_point, -1, self.cog_load())

        log.trace("Done rescheduling expirations, scheduling tidy up tasks.")
        for key, expire_at in await self.messages_to_tidy.items():
            # Keys are written by `apply_infraction` as "<channel_id>:<message_id>".
            channel_id, message_id = map(int, key.split(":"))
            expire_at = dateutil.parser.isoparse(expire_at)
            log.trace(
                "Scheduling tidy up for message %s in channel %s at %s",
                message_id, channel_id, expire_at
            )
            self.tidy_up_scheduler.schedule_at(
                expire_at,
                message_id,
                self._delete_infraction_message(channel_id, message_id)
            )

    async def _delete_infraction_message(
        self,
        channel_id: int,
        message_id: int
    ) -> None:
        """
        Delete a message in the given channel.

        This is used to delete infraction messages after a certain period of time.

        The matching `messages_to_tidy` entry is removed afterwards, except when the deletion
        failed with a generic HTTP error; in that case the entry is kept.
        """
        try:
            channel = await get_or_fetch_channel(self.bot, channel_id)
            message = await channel.fetch_message(message_id)
            await message.delete()
            log.trace(f"Deleted infraction message {message_id} in channel {channel_id}.")
        except discord.NotFound:
            log.warning(f"Channel or message {message_id} not found in channel {channel_id}.")
        except discord.Forbidden:
            log.warning(f"Bot lacks permissions to delete message {message_id} in channel {channel_id}.")
        except discord.HTTPException:
            log.exception(f"Issue during scheduled deletion of message {message_id} in channel {channel_id}.")
            return  # Keep the task in Redis on HTTP errors

        await self.messages_to_tidy.delete(f"{channel_id}:{message_id}")

    @staticmethod
    def _user_left_guild(error: discord.HTTPException) -> bool:
        """
        Return True if `error` is one this class treats as "the user is no longer in the guild".

        That is the case for Discord error code 10007 or any HTTP 404 response.
        """
        return error.code == 10007 or error.status == 404

    @staticmethod
    async def _notify_user(
        infraction: _utils.Infraction,
        user: MemberOrUser,
        user_reason: str | None,
    ) -> tuple[str, str, bool]:
        """
        DM the user about the infraction.

        Return the prefix for the confirmation message, the line for the mod log,
        and whether the DM was delivered.
        """
        if await _utils.notify_infraction(infraction, user, user_reason):
            return ":incoming_envelope: ", "\nDM: Sent", True
        return f"{constants.Emojis.failmail} ", "\nDM: **Failed**", False

    async def reapply_infraction(
        self,
        infraction: _utils.Infraction,
        action: Callable[[], Awaitable[None]] | None
    ) -> None:
        """
        Reapply an infraction if it's still active or deactivate it if less than 60 sec left.

        If `action` is None there is nothing to run, so a warning is logged and nothing is
        re-applied. HTTP errors raised by `action` are logged rather than propagated.

        Note: The `action` provided is an async function rather than a coroutine
        to prevent getting a RuntimeWarning if it is not used (e.g. in mocked tests).
        """
        if infraction["expires_at"] is not None:
            # Calculate the time remaining, in seconds, for the infraction.
            expiry = dateutil.parser.isoparse(infraction["expires_at"])
            delta = (expiry - arrow.utcnow()).total_seconds()
        else:
            # If the infraction is permanent, it is not possible to get the time remaining.
            delta = None

        # Mark as inactive if the infraction is not permanent and less than a minute remains.
        if delta is not None and delta < 60:
            log.info(
                "Infraction will be deactivated instead of re-applied "
                "because less than 1 minute remains."
            )
            await self.deactivate_infraction(infraction)
            return

        # The signature allows `action` to be None; calling it would raise a TypeError.
        if action is None:
            log.warning(
                f"No action provided to reapply {infraction['type']} to user {infraction['user']}; skipping."
            )
            return

        # Allowing mod log since this is a passive action that should be logged.
        try:
            await action()
        except discord.HTTPException as e:
            # When user joined and then right after this left again before action completed, this can't apply roles
            if self._user_left_guild(e):
                log.info(
                    f"Can't reapply {infraction['type']} to user {infraction['user']} because user left the guild."
                )
            else:
                log.exception(
                    f"Got unexpected HTTPException (HTTP {e.status}, Discord code {e.code}) "
                    f"when running {infraction['type']} action for {infraction['user']}."
                )
        else:
            log.info(f"Re-applied {infraction['type']} to user {infraction['user']} upon rejoining.")

    async def apply_infraction(
        self,
        ctx: Context,
        infraction: _utils.Infraction,
        user: MemberOrUser,
        action: Callable[[], Awaitable[None]] | None = None,
        user_reason: str | None = None,
        additional_info: str = "",
    ) -> bool:
        """
        Apply an infraction to the user, log the infraction, and optionally notify the user.

        `action`, if not provided, will result in the infraction not getting scheduled for deletion.
        `user_reason`, if provided, will be sent to the user in place of the infraction reason.
        `additional_info` will be attached to the text field in the mod-log embed.

        If applying fails, an attempt is made to delete the infraction from the database.
        When the infraction's actor is the bot itself, the confirmation message is scheduled
        for deletion after `AUTOMATED_TIDY_UP_HOURS` hours.

        Note: The `action` provided is an async function rather than just a coroutine
        to prevent getting a RuntimeWarning if it is not used (e.g. in mocked tests).

        Returns whether or not the infraction succeeded.
        """
        infr_type = infraction["type"]
        # Human-readable type name, e.g. "voice_mute" -> "voice mute".
        readable_type = " ".join(infr_type.split("_"))
        icon = _utils.INFRACTION_ICONS[infr_type][0]
        reason = infraction["reason"]
        id_ = infraction["id"]
        jump_url = infraction["jump_url"]
        expiry = time.format_with_duration(
            infraction["expires_at"],
            infraction["last_applied"]
        )

        if user_reason is None:
            user_reason = reason

        log.trace(f"Applying {infr_type} infraction #{id_} to {user}.")

        # Default values for the confirmation message and mod log.
        confirm_msg = ":ok_hand: applied"

        # Specifying an expiry for a note or warning makes no sense.
        if infr_type in ("note", "warning"):
            expiry_msg = ""
        else:
            expiry_msg = f" until {expiry}" if expiry else " permanently"

        dm_result = ""
        dm_log_text = ""
        expiry_log_text = f"\nExpires: {expiry}" if expiry else ""
        log_title = "applied"
        log_content = None
        failed = False

        # DM the user about the infraction if it's not a shadow/hidden infraction.
        # This needs to happen before we apply the infraction, as the bot cannot
        # send DMs to user that it doesn't share a guild with. If we were to
        # apply kick/ban infractions first, this would mean that we'd make it
        # impossible for us to deliver a DM. See python-discord/bot#982.
        if not infraction["hidden"] and infr_type in {"ban", "kick"}:
            dm_result, dm_log_text, _ = await self._notify_user(infraction, user, user_reason)

        end_msg = ""
        if is_mod_channel(ctx.channel):
            log.trace(f"Fetching total infraction count for {user}.")

            infractions = await self.bot.api_client.get(
                "bot/infractions",
                params={"user__id": str(user.id)}
            )
            total = len(infractions)
            end_msg = f" (#{id_} ; {total} infraction{ngettext('', 's', total)} total)"
        elif infraction["actor"] == self.bot.user.id:
            log.trace(
                f"Infraction #{id_} actor is bot; including the reason in the confirmation message."
            )
            if reason:
                end_msg = (
                    f" (reason: {textwrap.shorten(reason, width=1500, placeholder='...')})."
                    f"\n\nThe <@&{Roles.moderators}> have been alerted for review"
                )

        purge = infraction.get("purge", "")

        # Execute the necessary actions to apply the infraction on Discord.
        if action:
            log.trace(f"Running the infraction #{id_} application action.")
            try:
                await action()
                if expiry:
                    # Schedule the expiration of the infraction.
                    self.schedule_expiration(infraction)
            except discord.HTTPException as e:
                # Accordingly display that applying the infraction failed.
                # Don't use ctx.message.author; antispam only patches ctx.author.
                confirm_msg = ":x: failed to apply"
                expiry_msg = ""
                log_content = ctx.author.mention
                log_title = "failed to apply"

                log_msg = f"Failed to apply {readable_type} infraction #{id_} to {user}"
                if isinstance(e, discord.Forbidden):
                    log.warning(f"{log_msg}: bot lacks permissions.")
                elif self._user_left_guild(e):
                    log.info(
                        f"Can't apply {infraction['type']} to user {infraction['user']} because user left from guild."
                    )
                else:
                    log.exception(log_msg)
                failed = True

        if not failed:
            infr_message = f" **{purge}{readable_type}** to {user.mention}{expiry_msg}{end_msg}"

            # If we need to DM and haven't already tried to
            if not infraction["hidden"] and infr_type not in {"ban", "kick"}:
                dm_result, dm_log_text, dm_sent = await self._notify_user(infraction, user, user_reason)

                # A warning the user can neither receive by DM nor see in the channel is useless.
                # NOTE(review): this check is taken to belong to the DM-failed path, in line with
                # the wording of the failure message below - confirm against the original layout.
                if (
                    not dm_sent
                    and infr_type == "warning"
                    and not ctx.channel.permissions_for(user).view_channel
                ):
                    failed = True
                    log_title = "failed to apply"
                    additional_info += "\n*Failed to show the warning to the user*"
                    confirm_msg = (f":x: Failed to apply **warning** to {user.mention} "
                                   "because DMing the user was unsuccessful")

        if failed:
            log.trace(f"Trying to delete infraction {id_} from database because applying infraction failed.")
            try:
                await self.bot.api_client.delete(f"bot/infractions/{id_}")
            except ResponseCodeError as e:
                confirm_msg += " and failed to delete"
                log_title += " and failed to delete"
                log.error(f"Deletion of {infr_type} infraction #{id_} failed with error code {e.status}.")
            infr_message = ""

        # Send a confirmation message to the invoking context.
        log.trace(f"Sending infraction #{id_} confirmation message.")
        mentions = discord.AllowedMentions(users=[user], roles=False)
        sent_msg = await ctx.send(f"{dm_result}{confirm_msg}{infr_message}.", allowed_mentions=mentions)

        if infraction["actor"] == self.bot.user.id:
            expire_message_time = datetime.now(UTC) + timedelta(hours=AUTOMATED_TIDY_UP_HOURS)

            log.trace(f"Scheduling message tidy for infraction #{id_} in {AUTOMATED_TIDY_UP_HOURS} hours.")

            # Schedule the message to be deleted after a certain period of time.
            self.tidy_up_scheduler.schedule_at(
                expire_message_time,
                sent_msg.id,
                self._delete_infraction_message(ctx.channel.id, sent_msg.id)
            )

            # Persist to Redis to handle for bot restarts.
            await self.messages_to_tidy.set(
                f"{ctx.channel.id}:{sent_msg.id}",
                expire_message_time.isoformat(),
            )

        if jump_url is None:
            jump_url = "(Infraction issued in a ModMail channel.)"
        else:
            jump_url = f"[Click here.]({jump_url})"

        # Built line by line rather than by dedenting an interpolated template: values such as
        # `dm_log_text` begin with a newline followed by unindented text, which would leave
        # `textwrap.dedent` without a common margin to strip.
        mod_log_text = "\n".join((
            "",
            f"Member: {messages.format_user(user)}",
            f"Actor: {ctx.author.mention}{dm_log_text}{expiry_log_text}",
            f"Reason: {reason}",
            f"Jump URL: {jump_url}",
            f"{additional_info}",
            "",
        ))

        # Send a log message to the mod log.
        # Don't use ctx.message.author for the actor; antispam only patches ctx.author.
        log.trace(f"Sending apply mod log for infraction #{id_}.")
        await send_log_message(
            self.bot,
            icon_url=icon,
            colour=Colours.soft_red,
            title=f"Infraction {log_title}: {readable_type}",
            thumbnail=user.display_avatar.url,
            text=mod_log_text,
            content=log_content,
            footer=f"ID: {id_}"
        )

        log.info(f"{'Failed to apply' if failed else 'Applied'} {purge}{infr_type} infraction #{id_} to {user}.")
        return not failed

    async def pardon_infraction(
        self,
        ctx: Context,
        infr_type: str,
        user: MemberOrUser,
        pardon_reason: str | None = None,
        *,
        send_msg: bool = True,
        notify: bool = True
    ) -> None:
        """
        Prematurely end an infraction for a user and log the action in the mod log.

        If `pardon_reason` is None, then the database will not receive
        appended text explaining why the infraction was pardoned.

        If `send_msg` is True, then a pardoning confirmation message will be sent to
        the context channel. Otherwise, no such message will be sent.

        If `notify` is True, notify the user of the pardon via DM where applicable.

        If the user has no active infraction of the given type, an error message is sent to
        the context channel and nothing else happens.
        """
        log.trace(f"Pardoning {infr_type} infraction for {user}.")

        # Check the current active infraction
        log.trace(f"Fetching active {infr_type} infractions for {user}.")
        response = await self.bot.api_client.get(
            "bot/infractions",
            params={
                "active": "true",
                "type": infr_type,
                "user__id": user.id
            }
        )

        if not response:
            log.debug(f"No active {infr_type} infraction found for {user}.")
            await ctx.send(f":x: There's no active {infr_type} infraction for user {user.mention}.")
            return

        # Deactivate the infraction and cancel its scheduled expiration task.
        # The mod log is sent from here instead, so the pardoning actor can be shown.
        log_text = await self.deactivate_infraction(response[0], pardon_reason, send_log=False, notify=notify)

        log_text["Member"] = messages.format_user(user)
        log_text["Actor"] = ctx.author.mention
        log_content = None
        id_ = response[0]["id"]
        footer = f"ID: {id_}"
        # Human-readable type name, e.g. "voice_mute" -> "voice mute".
        readable_type = " ".join(infr_type.split("_"))

        # Accordingly display whether the user was successfully notified via DM.
        dm_emoji = ""
        if log_text.get("DM") == "Sent":
            dm_emoji = ":incoming_envelope: "
        elif "DM" in log_text:
            dm_emoji = f"{constants.Emojis.failmail} "

        # Accordingly display whether the pardon failed.
        if "Failure" in log_text:
            confirm_msg = ":x: failed to pardon"
            log_title = "pardon failed"
            log_content = ctx.author.mention

            log.warning(f"Failed to pardon {infr_type} infraction #{id_} for {user}.")
        else:
            confirm_msg = ":ok_hand: pardoned"
            log_title = "pardoned"

            log.info(f"Pardoned {infr_type} infraction #{id_} for {user}.")

        # Send a confirmation message to the invoking context.
        if send_msg:
            log.trace(f"Sending infraction #{id_} pardon confirmation message.")
            await ctx.send(
                f"{dm_emoji}{confirm_msg} infraction **{readable_type}** for {user.mention}. "
                f"{log_text.get('Failure', '')}"
            )

        # Move reason to end of entry to avoid cutting out some keys
        log_text["Reason"] = log_text.pop("Reason")

        # Send a log message to the mod log.
        await send_log_message(
            self.bot,
            icon_url=_utils.INFRACTION_ICONS[infr_type][1],
            colour=Colours.soft_green,
            title=f"Infraction {log_title}: {readable_type}",
            thumbnail=user.display_avatar.url,
            text="\n".join(f"{k}: {v}" for k, v in log_text.items()),
            footer=footer,
            content=log_content,
        )

    async def deactivate_infraction(
        self,
        infraction: _utils.Infraction,
        pardon_reason: str | None = None,
        *,
        send_log: bool = True,
        notify: bool = True
    ) -> dict[str, str]:
        """
        Deactivate an active infraction and return a dictionary of lines to send in a mod log.

        The infraction is removed from Discord, marked as inactive in the database, and has its
        expiration task cancelled.

        If `pardon_reason` is None, then the database will not receive
        appended text explaining why the infraction was pardoned.

        If `send_log` is True, a mod log is sent for the deactivation of the infraction.

        If `notify` is True, notify the user of the pardon via DM where applicable.

        Failures of the Discord action or of the database update do not raise; they are reported
        under the "Failure" key of the returned dictionary.

        Infractions of unsupported types will raise a ValueError.
        """
        guild = self.bot.get_guild(constants.Guild.id)
        mod_role = guild.get_role(constants.Roles.moderators)
        user_id = infraction["user"]
        actor = infraction["actor"]
        type_ = infraction["type"]
        id_ = infraction["id"]

        log.info(f"Marking infraction #{id_} as inactive (expired).")

        log_content = None
        log_text = {
            "Member": f"<@{user_id}>",
            "Actor": f"<@{actor}>",
            "Reason": infraction["reason"],
            "Created": time.format_with_duration(infraction["inserted_at"], infraction["expires_at"]),
        }

        try:
            log.trace("Awaiting the pardon action coroutine.")
            returned_log = await self._pardon_action(infraction, notify)

            if returned_log is not None:
                log_text = {**log_text, **returned_log}  # Merge the logs together
            else:
                raise ValueError(
                    f"Attempted to deactivate an unsupported infraction #{id_} ({type_})!"
                )
        except discord.Forbidden:
            log.warning(f"Failed to deactivate infraction #{id_} ({type_}): bot lacks permissions.")
            log_text["Failure"] = "The bot lacks permissions to do this (role hierarchy?)"
            log_content = mod_role.mention
        except discord.HTTPException as e:
            if self._user_left_guild(e):
                log.info(
                    f"Can't pardon {infraction['type']} for user {infraction['user']} because user left the guild."
                )
                log_text["Failure"] = "User left the guild."
            else:
                log.exception(f"Failed to deactivate infraction #{id_} ({type_})")
                log_text["Failure"] = f"HTTPException with status {e.status} and code {e.code}."
            # Either way, the moderators are pinged about the failure.
            log_content = mod_role.mention

        # Check if the user is currently being watched by Big Brother.
        try:
            log.trace(f"Determining if user {user_id} is currently being watched by Big Brother.")

            active_watch = await self.bot.api_client.get(
                "bot/infractions",
                params={
                    "active": "true",
                    "type": "watch",
                    "user__id": user_id
                }
            )

            log_text["Watching"] = "Yes" if active_watch else "No"
        except ResponseCodeError:
            log.exception(f"Failed to fetch watch status for user {user_id}")
            log_text["Watching"] = "Unknown - failed to fetch watch status."

        try:
            # Mark infraction as inactive in the database.
            log.trace(f"Marking infraction #{id_} as inactive in the database.")

            data = {"active": False}

            if pardon_reason is not None:
                data["reason"] = ""
                # Append pardon reason to infraction in database.
                if (punish_reason := infraction["reason"]) is not None:
                    data["reason"] = punish_reason + " | "

                data["reason"] += f"Pardoned: {pardon_reason}"

            await self.bot.api_client.patch(
                f"bot/infractions/{id_}",
                json=data
            )
        except ResponseCodeError as e:
            log.exception(f"Failed to deactivate infraction #{id_} ({type_})")
            log_line = f"API request failed with code {e.status}."
            log_content = mod_role.mention

            # Append to an existing failure message if possible
            if "Failure" in log_text:
                log_text["Failure"] += f" {log_line}"
            else:
                log_text["Failure"] = log_line

        # Cancel the expiration task.
        if infraction["expires_at"] is not None:
            self.scheduler.cancel(infraction["id"])

        # Send a log message to the mod log.
        if send_log:
            log_title = "expiration failed" if "Failure" in log_text else "expired"

            user = self.bot.get_user(user_id)
            avatar = user.display_avatar.url if user else None

            # Move reason to end so when reason is too long, this is not gonna cut out required items.
            log_text["Reason"] = log_text.pop("Reason")

            log.trace(f"Sending deactivation mod log for infraction #{id_}.")
            await send_log_message(
                self.bot,
                icon_url=_utils.INFRACTION_ICONS[type_][1],
                colour=Colours.soft_green,
                title=f"Infraction {log_title}: {type_}",
                thumbnail=avatar,
                text="\n".join(f"{k}: {v}" for k, v in log_text.items()),
                footer=f"ID: {id_}",
                content=log_content,
            )

        return log_text

    @abstractmethod
    async def _pardon_action(
        self,
        infraction: _utils.Infraction,
        notify: bool
    ) -> dict[str, str] | None:
        """
        Execute deactivation steps specific to the infraction's type and return a log dict.

        If `notify` is True, notify the user of the pardon via DM where applicable.

        If an infraction type is unsupported, return None instead.
        """
        raise NotImplementedError

    def schedule_expiration(self, infraction: _utils.Infraction) -> None:
        """
        Marks an infraction expired after the delay from time of scheduling to time of expiration.

        At the time of expiration, the infraction is marked as inactive on the website and the
        expiration task is cancelled.

        The task is registered under the infraction's ID, and `infraction["expires_at"]` must be
        an ISO 8601 timestamp.
        """
        expiry = dateutil.parser.isoparse(infraction["expires_at"])
        self.scheduler.schedule_at(expiry, infraction["id"], self.deactivate_infraction(infraction))