Skip to content

Commit 7b8d877

Browse files
committed
feat(velocity): add PluginMessageQueue for whenOnline delivery
1 parent e947ee1 commit 7b8d877

3 files changed

Lines changed: 198 additions & 0 deletions

File tree

velocity/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ dependencies {
3939
compileOnly("com.github.retrooper:packetevents-velocity:2.11.2")
4040

4141
testImplementation(libs.junit.jupiter)
42+
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
4243
}
4344

4445
tasks.test {
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package dev.objz.commandbridge.velocity.api;
2+
3+
import dev.objz.commandbridge.api.platform.Platform;
4+
import dev.objz.commandbridge.logging.Log;
5+
import dev.objz.commandbridge.net.payloads.PluginMessage;
6+
7+
import java.util.ArrayList;
8+
import java.util.Iterator;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.Objects;
12+
import java.util.Queue;
13+
import java.util.UUID;
14+
import java.util.concurrent.ConcurrentHashMap;
15+
import java.util.concurrent.ConcurrentLinkedQueue;
16+
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.atomic.AtomicInteger;
18+
19+
public final class PluginMessageQueue {
20+
21+
private static final long TTL_MS = TimeUnit.MINUTES.toMillis(5);
22+
private static final int MAX_TOTAL = 1000;
23+
24+
record QueuedMessage(Platform.ServerTarget target, PluginMessage message, String from, long queuedAt) {
25+
}
26+
27+
private final ConcurrentHashMap<UUID, Queue<QueuedMessage>> queues = new ConcurrentHashMap<>();
28+
private final AtomicInteger totalSize = new AtomicInteger(0);
29+
30+
public void queue(UUID playerUuid, Platform.ServerTarget target, PluginMessage message, String from) {
31+
Objects.requireNonNull(playerUuid);
32+
Objects.requireNonNull(target);
33+
Objects.requireNonNull(message);
34+
if (totalSize.get() >= MAX_TOTAL) {
35+
Log.warn("PluginMessageQueue is full ({} entries). Dropping message for player {}", MAX_TOTAL, playerUuid);
36+
return;
37+
}
38+
// Strip whenOnline from stored message to prevent re-queue on replay
39+
PluginMessage stored = new PluginMessage(message.channelType(), message.data(),
40+
message.expectsResponse(), message.requirePlayer(), null, message.error());
41+
queues.computeIfAbsent(playerUuid, k -> new ConcurrentLinkedQueue<>())
42+
.add(new QueuedMessage(target, stored, from, System.currentTimeMillis()));
43+
totalSize.incrementAndGet();
44+
}
45+
46+
public List<QueuedMessage> drain(String clientId, UUID playerUuid) {
47+
Objects.requireNonNull(clientId);
48+
Objects.requireNonNull(playerUuid);
49+
Queue<QueuedMessage> queue = queues.get(playerUuid);
50+
if (queue == null || queue.isEmpty()) {
51+
return List.of();
52+
}
53+
long now = System.currentTimeMillis();
54+
List<QueuedMessage> result = new ArrayList<>();
55+
Iterator<QueuedMessage> it = queue.iterator();
56+
while (it.hasNext()) {
57+
QueuedMessage entry = it.next();
58+
if (now - entry.queuedAt() > TTL_MS) {
59+
it.remove();
60+
totalSize.decrementAndGet();
61+
continue;
62+
}
63+
if (entry.target().id().equalsIgnoreCase(clientId)) {
64+
it.remove();
65+
totalSize.decrementAndGet();
66+
result.add(entry);
67+
}
68+
}
69+
if (queue.isEmpty()) {
70+
queues.remove(playerUuid, queue);
71+
}
72+
return List.copyOf(result);
73+
}
74+
75+
public void removeByServer(String serverId) {
76+
Objects.requireNonNull(serverId);
77+
for (Map.Entry<UUID, Queue<QueuedMessage>> entry : queues.entrySet()) {
78+
Queue<QueuedMessage> queue = entry.getValue();
79+
Iterator<QueuedMessage> it = queue.iterator();
80+
while (it.hasNext()) {
81+
if (it.next().target().id().equalsIgnoreCase(serverId)) {
82+
it.remove();
83+
totalSize.decrementAndGet();
84+
}
85+
}
86+
if (queue.isEmpty()) {
87+
queues.remove(entry.getKey(), queue);
88+
}
89+
}
90+
}
91+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package dev.objz.commandbridge.velocity.api;
2+
3+
import static dev.objz.commandbridge.api.platform.Platform.backend;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
5+
import static org.junit.jupiter.api.Assertions.assertNull;
6+
import static org.junit.jupiter.api.Assertions.assertTrue;
7+
8+
import java.util.List;
9+
import java.util.UUID;
10+
11+
import org.junit.jupiter.api.BeforeAll;
12+
import org.junit.jupiter.api.BeforeEach;
13+
import org.junit.jupiter.api.Test;
14+
15+
import dev.objz.commandbridge.api.platform.Platform;
16+
import dev.objz.commandbridge.logging.Log;
17+
import dev.objz.commandbridge.net.payloads.PluginMessage;
18+
19+
class PluginMessageQueueTest {
20+
21+
private PluginMessageQueue queue;
22+
23+
@BeforeAll
24+
static void installLog() {
25+
try {
26+
Log.install(java.util.logging.Logger.getLogger("test"));
27+
} catch (IllegalStateException ignored) { }
28+
}
29+
30+
@BeforeEach
31+
void setUp() {
32+
queue = new PluginMessageQueue();
33+
}
34+
35+
@Test
36+
void queueAndDrain() {
37+
UUID player = UUID.randomUUID();
38+
Platform.ServerTarget target = backend("survival-1");
39+
PluginMessage msg = new PluginMessage("test-channel", null, false);
40+
41+
queue.queue(player, target, msg, "velocity");
42+
43+
List<PluginMessageQueue.QueuedMessage> drained = queue.drain("survival-1", player);
44+
assertEquals(1, drained.size());
45+
assertEquals("survival-1", drained.get(0).target().id());
46+
47+
List<PluginMessageQueue.QueuedMessage> second = queue.drain("survival-1", player);
48+
assertTrue(second.isEmpty());
49+
}
50+
51+
@Test
52+
void fifoOrdering() {
53+
UUID player = UUID.randomUUID();
54+
Platform.ServerTarget target = backend("s1");
55+
56+
queue.queue(player, target, new PluginMessage("msgA", null, false), "velocity");
57+
queue.queue(player, target, new PluginMessage("msgB", null, false), "velocity");
58+
queue.queue(player, target, new PluginMessage("msgC", null, false), "velocity");
59+
60+
List<PluginMessageQueue.QueuedMessage> drained = queue.drain("s1", player);
61+
assertEquals(3, drained.size());
62+
assertEquals("msgA", drained.get(0).message().channelType());
63+
assertEquals("msgB", drained.get(1).message().channelType());
64+
assertEquals("msgC", drained.get(2).message().channelType());
65+
}
66+
67+
@Test
68+
void serverDisconnectCleanup() {
69+
UUID player = UUID.randomUUID();
70+
Platform.ServerTarget target = backend("survival-1");
71+
PluginMessage msg = new PluginMessage("test-channel", null, false);
72+
73+
queue.queue(player, target, msg, "velocity");
74+
queue.removeByServer("survival-1");
75+
76+
List<PluginMessageQueue.QueuedMessage> drained = queue.drain("survival-1", player);
77+
assertTrue(drained.isEmpty());
78+
}
79+
80+
@Test
81+
void whenOnlineFieldStripped() {
82+
UUID player = UUID.randomUUID();
83+
UUID onlineTarget = UUID.randomUUID();
84+
Platform.ServerTarget target = backend("s1");
85+
PluginMessage msg = new PluginMessage("test-channel", null, false, null, onlineTarget, null);
86+
87+
queue.queue(player, target, msg, "velocity");
88+
89+
List<PluginMessageQueue.QueuedMessage> drained = queue.drain("s1", player);
90+
assertEquals(1, drained.size());
91+
assertNull(drained.get(0).message().whenOnline());
92+
}
93+
94+
@Test
95+
void queueCapEnforced() {
96+
UUID player = UUID.randomUUID();
97+
Platform.ServerTarget target = backend("s1");
98+
99+
for (int i = 0; i < 1001; i++) {
100+
queue.queue(player, target, new PluginMessage("msg-" + i, null, false), "velocity");
101+
}
102+
103+
List<PluginMessageQueue.QueuedMessage> drained = queue.drain("s1", player);
104+
assertEquals(1000, drained.size());
105+
}
106+
}

0 commit comments

Comments
 (0)