Skip to content

Commit 716a61e

Browse files
authored
Mitigate Discord 429s (#6461)
1 parent 637ac8f commit 716a61e

File tree

6 files changed

+325
-123
lines changed

6 files changed

+325
-123
lines changed

EssentialsDiscord/src/main/java/net/essentialsx/discord/JDADiscordService.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import net.essentialsx.discord.util.ConsoleInjector;
5151
import net.essentialsx.discord.util.DiscordUtil;
5252
import net.essentialsx.discord.util.MessageUtil;
53-
import net.essentialsx.discord.util.WrappedWebhookClient;
53+
import net.essentialsx.discord.util.WebhookDispatcher;
5454
import org.bukkit.Bukkit;
5555
import org.bukkit.entity.Player;
5656
import org.bukkit.event.HandlerList;
@@ -84,11 +84,11 @@ public class JDADiscordService implements DiscordService, IEssentialsModule {
8484
private JDA jda;
8585
private Guild guild;
8686
private TextChannel primaryChannel;
87-
private WrappedWebhookClient consoleWebhook;
87+
private WebhookDispatcher consoleWebhook;
8888
private String lastConsoleId;
8989
private final Map<String, MessageType> registeredTypes = new HashMap<>();
9090
private final Map<MessageType, String> typeToChannelId = new HashMap<>();
91-
private final Map<String, WrappedWebhookClient> channelIdToWebhook = new HashMap<>();
91+
private final Map<String, WebhookDispatcher> channelIdToWebhook = new HashMap<>();
9292
private ConsoleInjector injector;
9393
private DiscordCommandDispatcher commandDispatcher;
9494
private InteractionControllerImpl interactionController;
@@ -145,11 +145,11 @@ public void sendMessage(DiscordMessageEvent event, String message, boolean group
145145

146146
final String webhookChannelId = typeToChannelId.get(event.getType());
147147
if (webhookChannelId != null) {
148-
final WrappedWebhookClient client = channelIdToWebhook.get(webhookChannelId);
149-
if (client != null) {
148+
final WebhookDispatcher dispatcher = channelIdToWebhook.get(webhookChannelId);
149+
if (dispatcher != null) {
150150
final String avatarUrl = event.getAvatarUrl() != null ? event.getAvatarUrl() : jda.getSelfUser().getAvatarUrl();
151151
final String name = event.getName() != null ? event.getName() : guild.getSelfMember().getEffectiveName();
152-
client.send(getWebhookMessage(strippedContent, avatarUrl, name, groupMentions));
152+
dispatcher.send(getWebhookMessage(strippedContent, avatarUrl, name, groupMentions));
153153
return;
154154
}
155155
}
@@ -160,7 +160,7 @@ public void sendMessage(DiscordMessageEvent event, String message, boolean group
160160
}
161161
channel.sendMessage(strippedContent)
162162
.setAllowedMentions(groupMentions ? null : DiscordUtil.NO_GROUP_MENTIONS)
163-
.queue();
163+
.queue(null, error -> logger.log(Level.WARNING, "Failed to send message to channel " + channel.getName(), error));
164164
}
165165

166166
public void startup() throws LoginException, InterruptedException {
@@ -229,7 +229,7 @@ public void startup() throws LoginException, InterruptedException {
229229
}
230230

231231
// Load emotes into cache, JDA will handle updates from here on out.
232-
guild.retrieveEmojis().queue();
232+
guild.retrieveEmojis().queue(null, error -> logger.log(Level.WARNING, "Failed to retrieve emojis from guild", error));
233233

234234
updatePrimaryChannel();
235235

@@ -390,8 +390,8 @@ public void updatePresence() {
390390

391391
public void updateTypesRelay() {
392392
if (!getSettings().isShowAvatar() && !getSettings().isCustomBotName()) {
393-
for (WrappedWebhookClient webhook : channelIdToWebhook.values()) {
394-
webhook.close();
393+
for (WebhookDispatcher dispatcher : channelIdToWebhook.values()) {
394+
dispatcher.close();
395395
}
396396
typeToChannelId.clear();
397397
channelIdToWebhook.clear();
@@ -410,14 +410,14 @@ public void updateTypesRelay() {
410410

411411
final Webhook webhook = DiscordUtil.getOrCreateWebhook(channel, DiscordUtil.ADVANCED_RELAY_NAME).join();
412412
if (webhook == null) {
413-
final WrappedWebhookClient current = channelIdToWebhook.remove(channel.getId());
413+
final WebhookDispatcher current = channelIdToWebhook.remove(channel.getId());
414414
if (current != null) {
415415
current.close();
416416
}
417417
continue;
418418
}
419419
typeToChannelId.put(type, channel.getId());
420-
channelIdToWebhook.put(channel.getId(), DiscordUtil.getWebhookClient(webhook.getIdLong(), webhook.getToken(), jda.getHttpClient()));
420+
channelIdToWebhook.put(channel.getId(), new WebhookDispatcher(DiscordUtil.getWebhookClient(webhook.getIdLong(), webhook.getToken(), jda.getHttpClient())));
421421
}
422422
}
423423

@@ -471,7 +471,7 @@ public void updateConsoleRelay() {
471471
}
472472

473473
shutdownConsoleRelay(false);
474-
consoleWebhook = DiscordUtil.getWebhookClient(webhookId, webhookToken, jda.getHttpClient());
474+
consoleWebhook = new WebhookDispatcher(DiscordUtil.getWebhookClient(webhookId, webhookToken, jda.getHttpClient()), 50);
475475
if (injector == null || injector.isRemoved()) {
476476
injector = new ConsoleInjector(this);
477477
injector.start();
@@ -510,8 +510,8 @@ public void shutdown() {
510510

511511
shutdownConsoleRelay(true);
512512

513-
for (WrappedWebhookClient webhook : channelIdToWebhook.values()) {
514-
webhook.close();
513+
for (WebhookDispatcher dispatcher : channelIdToWebhook.values()) {
514+
dispatcher.close();
515515
}
516516

517517
// Unregister leftover jda listeners
@@ -583,7 +583,10 @@ public CompletableFuture<Void> modifyMemberRoles(InteractionMember member, Colle
583583
}
584584

585585
final CompletableFuture<Void> future = new CompletableFuture<>();
586-
guild.modifyMemberRoles(((InteractionMemberImpl) member).getJdaObject(), add, remove).queue(future::complete);
586+
guild.modifyMemberRoles(((InteractionMemberImpl) member).getJdaObject(), add, remove).queue(future::complete, error -> {
587+
logger.log(Level.WARNING, "Failed to modify member roles", error);
588+
future.complete(null);
589+
});
587590
return future;
588591
}
589592

@@ -613,7 +616,7 @@ public DiscordSettings getSettings() {
613616
return plugin.getSettings();
614617
}
615618

616-
public WrappedWebhookClient getConsoleWebhook() {
619+
public WebhookDispatcher getConsoleWebhook() {
617620
return consoleWebhook;
618621
}
619622

EssentialsDiscord/src/main/java/net/essentialsx/discord/util/ConsoleInjector.java

Lines changed: 18 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414
import org.bukkit.Bukkit;
1515

1616
import java.time.Instant;
17+
import java.util.concurrent.ArrayBlockingQueue;
1718
import java.util.concurrent.BlockingQueue;
18-
import java.util.concurrent.LinkedBlockingQueue;
19-
import java.util.concurrent.atomic.AtomicInteger;
20-
import java.util.concurrent.atomic.AtomicLong;
2119
import java.util.regex.Pattern;
2220

2321
import static com.earth2me.essentials.I18n.tlLiteral;
@@ -27,40 +25,18 @@ public class ConsoleInjector extends AbstractAppender {
2725
private final static java.util.logging.Logger logger = EssentialsDiscord.getWrappedLogger();
2826

2927
private final static long QUEUE_PROCESS_PERIOD_SECONDS = 2;
28+
private final static int QUEUE_CAPACITY = 500;
3029

3130
private final JDADiscordService jda;
32-
private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
31+
private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
3332
private final int taskId;
3433
private boolean removed = false;
3534

36-
private final AtomicLong lastRateLimitTime = new AtomicLong(0);
37-
private final AtomicInteger recentRateLimit = new AtomicInteger(0);
38-
private final AtomicInteger totalBackoffEvents = new AtomicInteger();
39-
4035
public ConsoleInjector(JDADiscordService jda) {
4136
super("EssentialsX-ConsoleInjector", null, null, false);
4237
this.jda = jda;
4338
((Logger) LogManager.getRootLogger()).addAppender(this);
4439
taskId = Bukkit.getScheduler().runTaskTimerAsynchronously(jda.getPlugin(), () -> {
45-
// Check to see if we're supposed to be backing off, preform backoff if the case.
46-
if (recentRateLimit.get() < 0) {
47-
if (totalBackoffEvents.get() * 20 >= jda.getSettings().getConsoleSkipDelay() * 60) {
48-
logger.warning("EssXBackoff: Reached console skip delay, attempt to skip");
49-
jda.getConsoleWebhook().abandonRequests();
50-
messageQueue.clear();
51-
totalBackoffEvents.set(0);
52-
recentRateLimit.set(0);
53-
lastRateLimitTime.set(0);
54-
return;
55-
}
56-
57-
final int backoff = recentRateLimit.incrementAndGet();
58-
if (jda.isDebug()) {
59-
logger.warning("EssXBackoff: Webhook backoff in progress, skipping queue processing. Resuming in " + Math.abs(backoff) + " cycles.");
60-
}
61-
return;
62-
}
63-
6440
final StringBuilder buffer = new StringBuilder();
6541
String curLine;
6642
while ((curLine = messageQueue.peek()) != null) {
@@ -78,7 +54,11 @@ public ConsoleInjector(JDADiscordService jda) {
7854
}
7955

8056
private void sendMessage(String content) {
81-
jda.getConsoleWebhook().send(jda.getWebhookMessage(content)).exceptionally(e -> {
57+
final WebhookDispatcher webhook = jda.getConsoleWebhook();
58+
if (webhook == null || webhook.isShutdown()) {
59+
return;
60+
}
61+
webhook.send(jda.getWebhookMessage(content)).exceptionally(e -> {
8262
logger.severe(tlLiteral("discordErrorWebhook"));
8363
remove();
8464
return null;
@@ -97,40 +77,13 @@ public void append(LogEvent event) {
9777
return;
9878
}
9979

100-
if (entry.startsWith("EssXBackoff: ")) {
101-
return;
102-
}
103-
104-
if (event.getLoggerName().contains("club.minnced.discord.webhook.WebhookClient") && entry.startsWith("Encountered 429, retrying after ")) {
105-
if (recentRateLimit.get() >= 0) {
106-
recentRateLimit.incrementAndGet();
107-
}
108-
109-
if (lastRateLimitTime.get() == 0 || System.currentTimeMillis() - lastRateLimitTime.get() > 5000) {
110-
lastRateLimitTime.set(System.currentTimeMillis());
111-
112-
// A negative value would mean the timer is current preforming a backoff, don't stop it.
113-
if (recentRateLimit.get() >= 0) {
114-
recentRateLimit.set(0);
115-
}
116-
} else if (recentRateLimit.get() >= 2) {
117-
// Start the webhook backoff, defaulting to 20s, which should reset our bucket.
118-
if (jda.isDebug()) {
119-
totalBackoffEvents.getAndIncrement();
120-
logger.warning("EssXBackoff: Beginning Webhook Backoff");
121-
}
122-
recentRateLimit.set(-20);
123-
}
124-
return;
125-
}
126-
12780
final String[] loggerNameSplit = event.getLoggerName().split("\\.");
12881
final String loggerName = loggerNameSplit[loggerNameSplit.length - 1].trim();
12982

13083
if (!loggerName.isEmpty()) {
13184
entry = "[" + loggerName + "] " + entry;
13285
}
133-
86+
13487
if (!jda.getSettings().getConsoleFilters().isEmpty()) {
13588
for (final Pattern pattern : jda.getSettings().getConsoleFilters()) {
13689
if (pattern.matcher(entry).find()) {
@@ -139,11 +92,18 @@ public void append(LogEvent event) {
13992
}
14093
}
14194

142-
messageQueue.addAll(Splitter.fixedLength(Message.MAX_CONTENT_LENGTH - 50).splitToList(
95+
for (final String line : Splitter.fixedLength(Message.MAX_CONTENT_LENGTH - 50).splitToList(
14396
MessageUtil.formatMessage(jda.getSettings().getConsoleFormat(),
14497
TimeFormat.TIME_LONG.format(Instant.now()),
14598
event.getLevel().name(),
146-
MessageUtil.sanitizeDiscordMarkdown(entry))));
99+
MessageUtil.sanitizeDiscordMarkdown(entry)))) {
100+
101+
if (!messageQueue.offer(line)) {
102+
if (jda.isDebug()) {
103+
logger.fine("Console relay queue full, dropping message.");
104+
}
105+
}
106+
}
147107
}
148108

149109
public void remove() {

EssentialsDiscord/src/main/java/net/essentialsx/discord/util/DiscordUtil.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import net.dv8tion.jda.api.entities.channel.concrete.TextChannel;
1616
import net.essentialsx.api.v2.events.discord.DiscordMessageEvent;
1717
import net.essentialsx.api.v2.services.discord.MessageType;
18+
import net.essentialsx.discord.EssentialsDiscord;
1819
import net.essentialsx.discord.JDADiscordService;
1920
import okhttp3.OkHttpClient;
2021
import org.bukkit.Bukkit;
@@ -26,8 +27,11 @@
2627
import java.util.concurrent.CompletableFuture;
2728
import java.util.concurrent.CopyOnWriteArrayList;
2829
import java.util.function.Predicate;
30+
import java.util.logging.Level;
31+
import java.util.logging.Logger;
2932

3033
public final class DiscordUtil {
34+
private static final Logger logger = EssentialsDiscord.getWrappedLogger();
3135
public final static String ADVANCED_RELAY_NAME = "EssX Advanced Relay";
3236
public final static String CONSOLE_RELAY_NAME = "EssX Console Relay";
3337
public final static List<Message.MentionType> NO_GROUP_MENTIONS;
@@ -63,7 +67,6 @@ public static WrappedWebhookClient getWebhookClient(long id, String token, OkHtt
6367
*
6468
* @param channel The channel to search for/create webhooks in.
6569
* @param webhookName The name of the webhook to search for/create.
66-
*
6770
* @return A future which completes with the webhook by the given name in the given channel, or null
6871
* if the bot lacks the proper permissions.
6972
*/
@@ -82,7 +85,7 @@ public static CompletableFuture<Webhook> getOrCreateWebhook(final TextChannel ch
8285
}
8386
}
8487
createWebhook(channel, webhookName).thenAccept(future::complete);
85-
});
88+
}, error -> logger.log(Level.WARNING, "Failed to retrieve webhooks from channel " + channel.getName(), error));
8689
return future;
8790
}
8891

@@ -101,17 +104,17 @@ private static void cleanWebhooks(final Guild guild, String webhookName) {
101104
guild.retrieveWebhooks().queue(webhooks -> {
102105
for (final Webhook webhook : webhooks) {
103106
if (webhook.getName().equalsIgnoreCase(webhookName) && !ACTIVE_WEBHOOKS.contains(webhook.getId())) {
104-
webhook.delete().reason("EssentialsX Discord: webhook cleanup").queue();
107+
webhook.delete().reason("EssentialsX Discord: webhook cleanup").queue(null, error -> logger.log(Level.WARNING, "Failed to delete webhook " + webhook.getName(), error));
105108
}
106109
}
107-
});
110+
}, error -> logger.log(Level.WARNING, "Failed to retrieve webhooks from guild " + guild.getName(), error));
108111
}
109112

110113
/**
111114
* Creates a webhook with the given name in the given channel.
112115
*
113-
* @param channel The channel to search for webhooks in.
114-
* @param webhookName The name of the webhook to look for.
116+
* @param channel The channel to search for webhooks in.
117+
* @param webhookName The name of the webhook to look for.
115118
* @return A future which completes with the webhook by the given name in the given channel or null if no permissions.
116119
*/
117120
public static CompletableFuture<Webhook> createWebhook(TextChannel channel, String webhookName) {
@@ -123,7 +126,7 @@ public static CompletableFuture<Webhook> createWebhook(TextChannel channel, Stri
123126
channel.createWebhook(webhookName).queue(webhook -> {
124127
future.complete(webhook);
125128
ACTIVE_WEBHOOKS.addIfAbsent(webhook.getId());
126-
});
129+
}, error -> logger.log(Level.WARNING, "Failed to create webhook " + webhookName + " in channel " + channel.getName(), error));
127130
return future;
128131
}
129132

0 commit comments

Comments
 (0)