Skip to content

Commit 65c1c86

Browse files
committed
feat(velocity): evaluate delivery conditions in dispatch paths
1 parent 7b8d877 commit 65c1c86

4 files changed

Lines changed: 272 additions & 3 deletions

File tree

backends/src/main/java/dev/objz/commandbridge/backends/api/BackendCommandBridgeImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,11 +231,16 @@ private PluginMessage readPluginMessage(Envelope env) {
231231
}
232232

233233
private PluginMessage requirePluginMessage(Envelope env) {
234+
PluginMessage message;
234235
try {
235-
return Envelope.MAPPER.treeToValue(env.payload(), PluginMessage.class);
236+
message = Envelope.MAPPER.treeToValue(env.payload(), PluginMessage.class);
236237
} catch (Exception e) {
237238
throw new CompletionException(e);
238239
}
240+
if (message.error() != null) {
241+
throw new CompletionException(new IllegalStateException(message.error()));
242+
}
243+
return message;
239244
}
240245

241246
private String localServerId() {

velocity/src/main/java/dev/objz/commandbridge/velocity/Main.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import dev.objz.commandbridge.velocity.dispatch.CommandEntry;
3333
import dev.objz.commandbridge.velocity.api.VelocityCommandBridgeImpl;
3434
import dev.objz.commandbridge.velocity.api.VelocityPluginMessageHandler;
35+
import dev.objz.commandbridge.velocity.api.PluginMessageQueue;
3536
import dev.objz.commandbridge.velocity.cmd.bridge.framework.ArgumentBridge;
3637
import dev.objz.commandbridge.velocity.cmd.bridge.packetevents.PacketEventsArgumentBridge;
3738
import dev.objz.commandbridge.velocity.cmd.bridge.types.OfflinePlayerArgumentType;
@@ -85,6 +86,7 @@ public final class Main {
8586
private VelocityConfig cfg;
8687
private SessionHub sessions;
8788
private PlayerTracker playerTracker;
89+
private PluginMessageQueue pluginMessageQueue;
8890
private UserCache userCache;
8991
private AuthHandler authHandler;
9092
private CBCommand command;
@@ -155,6 +157,7 @@ public void onProxyInitialization(ProxyInitializeEvent e) {
155157

156158
sessions = new SessionHub();
157159
playerTracker = new PlayerTracker();
160+
pluginMessageQueue = new PluginMessageQueue();
158161
sessions.onRemove(session -> {
159162
playerTracker.remove(session.id());
160163
if (api != null) {
@@ -206,7 +209,7 @@ public void onProxyInitialization(ProxyInitializeEvent e) {
206209

207210
registrations.load(scriptManager.enabled());
208211

209-
api = new VelocityCommandBridgeImpl(sessions, playerTracker, cfg.serverId(), endpointServer);
212+
api = new VelocityCommandBridgeImpl(sessions, playerTracker, cfg.serverId(), endpointServer, pluginMessageQueue);
210213

211214
installRoutes();
212215

velocity/src/main/java/dev/objz/commandbridge/velocity/api/VelocityCommandBridgeImpl.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public final class VelocityCommandBridgeImpl implements CommandBridgeAPI {
4343
private final PlayerTracker playerTracker;
4444
private final String serverId;
4545
private final EndpointServer endpointServer;
46+
private final PluginMessageQueue pluginMessageQueue;
4647

4748
private final ConcurrentHashMap<Class<?>, MessageChannel<?>> channels = new ConcurrentHashMap<>();
4849
private final ConcurrentHashMap<String, Class<?>> payloadTypesByName = new ConcurrentHashMap<>();
@@ -53,11 +54,13 @@ public final class VelocityCommandBridgeImpl implements CommandBridgeAPI {
5354
public VelocityCommandBridgeImpl(SessionHub sessions,
5455
PlayerTracker playerTracker,
5556
String serverId,
56-
EndpointServer endpointServer) {
57+
EndpointServer endpointServer,
58+
PluginMessageQueue pluginMessageQueue) {
5759
this.sessions = Objects.requireNonNull(sessions);
5860
this.playerTracker = Objects.requireNonNull(playerTracker);
5961
this.serverId = Objects.requireNonNull(serverId);
6062
this.endpointServer = Objects.requireNonNull(endpointServer);
63+
this.pluginMessageQueue = Objects.requireNonNull(pluginMessageQueue);
6164
}
6265

6366
@Override
@@ -154,6 +157,10 @@ public void onServerDisconnected(ClientSession session) {
154157
public void handlePluginMessageRequest(Endpoint endpoint, Envelope env) {
155158
String to = env.to();
156159
if (to != null && !to.equals(serverId) && !"*".equals(to)) {
160+
PluginMessage relayMsg = readPluginMessage(env);
161+
if (relayMsg != null && !evaluateRelayConditions(env, relayMsg)) {
162+
return;
163+
}
157164
relayDirect(env);
158165
return;
159166
}
@@ -216,6 +223,18 @@ private CompletableFuture<Void> sendPluginMessage(Platform.ServerTarget target,
216223
return CompletableFuture.completedFuture(null);
217224
}
218225

226+
if (payload.requirePlayer() != null) {
227+
if (!playerTracker.isPlayerOn(payload.requirePlayer(), target.id())) {
228+
return CompletableFuture.completedFuture(null);
229+
}
230+
}
231+
if (payload.whenOnline() != null) {
232+
if (!playerTracker.isPlayerOn(payload.whenOnline(), target.id())) {
233+
pluginMessageQueue.queue(payload.whenOnline(), target, payload, null);
234+
return CompletableFuture.completedFuture(null);
235+
}
236+
}
237+
219238
Optional<ClientSession> session = sessions.findSession(target.id(), toLocation(target.type()));
220239
if (session.isEmpty()) {
221240
return CompletableFuture.failedFuture(
@@ -251,6 +270,13 @@ private CompletableFuture<PluginMessage> requestPluginMessage(Platform.ServerTar
251270
return CompletableFuture.completedFuture(payload);
252271
}
253272

273+
if (payload.requirePlayer() != null) {
274+
if (!playerTracker.isPlayerOn(payload.requirePlayer(), target.id())) {
275+
return CompletableFuture.failedFuture(
276+
new IllegalStateException("Player not on target server: " + target.id()));
277+
}
278+
}
279+
254280
Optional<ClientSession> session = sessions.findSession(target.id(), toLocation(target.type()));
255281
if (session.isEmpty()) {
256282
return CompletableFuture.failedFuture(
@@ -331,6 +357,43 @@ private void relayBroadcast(Envelope env) {
331357
}
332358
}
333359

360+
private boolean evaluateRelayConditions(Envelope env, PluginMessage message) {
361+
String targetId = env.to();
362+
363+
if (message.requirePlayer() != null) {
364+
if (!playerTracker.isPlayerOn(message.requirePlayer(), targetId)) {
365+
if (message.expectsResponse()) {
366+
sendErrorResponse(env, "Player not on target server: " + targetId);
367+
}
368+
return false;
369+
}
370+
}
371+
372+
if (message.whenOnline() != null) {
373+
if (!playerTracker.isPlayerOn(message.whenOnline(), targetId)) {
374+
pluginMessageQueue.queue(message.whenOnline(),
375+
Platform.BACKEND.target(targetId), message, env.from());
376+
return false;
377+
}
378+
}
379+
380+
return true;
381+
}
382+
383+
private void sendErrorResponse(Envelope originalEnv, String error) {
384+
Optional<ClientSession> senderSession = sessions.get(originalEnv.from());
385+
if (senderSession.isEmpty() || !isRoutable(senderSession.get())) {
386+
return;
387+
}
388+
PluginMessage errorMsg = new PluginMessage("error", null, false, null, null, error);
389+
Envelope response = Envelope.reply(originalEnv, MessageType.PLUGIN_MESSAGE_RESPONSE, serverId,
390+
Envelope.MAPPER.valueToTree(errorMsg));
391+
endpointServer.send(senderSession.get().endpoint(), response).dispatch().exceptionally(ex -> {
392+
Log.warn("Failed to send error response: {}", ex.getMessage());
393+
return null;
394+
});
395+
}
396+
334397
private PluginMessage readPluginMessage(Envelope env) {
335398
try {
336399
return Envelope.MAPPER.treeToValue(env.payload(), PluginMessage.class);
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
package dev.objz.commandbridge.velocity.api;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
5+
import static org.junit.jupiter.api.Assertions.assertThrows;
6+
7+
import dev.objz.commandbridge.api.platform.Platform;
8+
import dev.objz.commandbridge.net.Endpoint;
9+
import dev.objz.commandbridge.net.ResponseAwaiter;
10+
import dev.objz.commandbridge.net.SendOperation;
11+
import dev.objz.commandbridge.net.payloads.PluginMessage;
12+
import dev.objz.commandbridge.net.proto.Envelope;
13+
import dev.objz.commandbridge.security.AuthStatus;
14+
import dev.objz.commandbridge.scripting.model.enums.Location;
15+
import dev.objz.commandbridge.velocity.net.EndpointServer;
16+
import dev.objz.commandbridge.velocity.net.session.ClientSession;
17+
import dev.objz.commandbridge.velocity.net.session.SessionHub;
18+
import dev.objz.commandbridge.velocity.util.PlayerTracker;
19+
import org.junit.jupiter.api.Test;
20+
21+
import java.lang.reflect.Method;
22+
import java.time.Duration;
23+
import java.util.List;
24+
import java.util.UUID;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.CompletionException;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
29+
class VelocityCommandBridgeImplTest {
30+
31+
@Test
32+
void requirePlayerDropsSendWhenAbsent() {
33+
SessionHub sessions = new SessionHub();
34+
PlayerTracker tracker = new PlayerTracker();
35+
PluginMessageQueue queue = new PluginMessageQueue();
36+
CountingEndpointServer endpointServer = new CountingEndpointServer();
37+
VelocityCommandBridgeImpl bridge = new VelocityCommandBridgeImpl(
38+
sessions,
39+
tracker,
40+
"velocity",
41+
endpointServer,
42+
queue);
43+
44+
PluginMessage payload = new PluginMessage("test", null, false, UUID.randomUUID(), null, null);
45+
46+
invokeSend(bridge, Platform.BACKEND.target("backend-1"), payload).join();
47+
48+
assertEquals(0, endpointServer.sendCount());
49+
}
50+
51+
@Test
52+
void requirePlayerDeliversSendWhenPresent() {
53+
SessionHub sessions = new SessionHub();
54+
PlayerTracker tracker = new PlayerTracker();
55+
PluginMessageQueue queue = new PluginMessageQueue();
56+
CountingEndpointServer endpointServer = new CountingEndpointServer();
57+
VelocityCommandBridgeImpl bridge = new VelocityCommandBridgeImpl(
58+
sessions,
59+
tracker,
60+
"velocity",
61+
endpointServer,
62+
queue);
63+
64+
UUID player = UUID.randomUUID();
65+
tracker.addPlayer("backend-1", player);
66+
ClientSession session = sessions.add("backend-1", new TestEndpoint());
67+
session.location(Location.BACKEND);
68+
session.status(AuthStatus.AUTH_OK);
69+
70+
PluginMessage payload = new PluginMessage("test", null, false, player, null, null);
71+
72+
invokeSend(bridge, Platform.BACKEND.target("backend-1"), payload).join();
73+
74+
assertEquals(1, endpointServer.sendCount());
75+
}
76+
77+
@Test
78+
void requirePlayerFailsRequestWhenAbsent() {
79+
SessionHub sessions = new SessionHub();
80+
PlayerTracker tracker = new PlayerTracker();
81+
PluginMessageQueue queue = new PluginMessageQueue();
82+
CountingEndpointServer endpointServer = new CountingEndpointServer();
83+
VelocityCommandBridgeImpl bridge = new VelocityCommandBridgeImpl(
84+
sessions,
85+
tracker,
86+
"velocity",
87+
endpointServer,
88+
queue);
89+
90+
PluginMessage payload = new PluginMessage("test", null, true, UUID.randomUUID(), null, null);
91+
CompletableFuture<PluginMessage> future = invokeRequest(
92+
bridge,
93+
Platform.BACKEND.target("backend-1"),
94+
payload,
95+
Duration.ofSeconds(1));
96+
97+
CompletionException ex = assertThrows(CompletionException.class, future::join);
98+
Throwable cause = assertInstanceOf(IllegalStateException.class, ex.getCause());
99+
assertEquals("Player not on target server: backend-1", cause.getMessage());
100+
}
101+
102+
@Test
103+
void whenOnlineQueuesSendWhenAbsent() {
104+
SessionHub sessions = new SessionHub();
105+
PlayerTracker tracker = new PlayerTracker();
106+
PluginMessageQueue queue = new PluginMessageQueue();
107+
CountingEndpointServer endpointServer = new CountingEndpointServer();
108+
VelocityCommandBridgeImpl bridge = new VelocityCommandBridgeImpl(
109+
sessions,
110+
tracker,
111+
"velocity",
112+
endpointServer,
113+
queue);
114+
115+
UUID player = UUID.randomUUID();
116+
PluginMessage payload = new PluginMessage("test", null, false, null, player, null);
117+
118+
invokeSend(bridge, Platform.BACKEND.target("backend-1"), payload).join();
119+
120+
List<PluginMessageQueue.QueuedMessage> drained = queue.drain("backend-1", player);
121+
assertEquals(1, drained.size());
122+
assertEquals(0, endpointServer.sendCount());
123+
}
124+
125+
@SuppressWarnings("unchecked")
126+
private static CompletableFuture<Void> invokeSend(
127+
VelocityCommandBridgeImpl bridge,
128+
Platform.ServerTarget target,
129+
PluginMessage payload) {
130+
try {
131+
Method method = VelocityCommandBridgeImpl.class.getDeclaredMethod(
132+
"sendPluginMessage",
133+
Platform.ServerTarget.class,
134+
PluginMessage.class);
135+
method.setAccessible(true);
136+
return (CompletableFuture<Void>) method.invoke(bridge, target, payload);
137+
} catch (Exception e) {
138+
throw new RuntimeException(e);
139+
}
140+
}
141+
142+
@SuppressWarnings("unchecked")
143+
private static CompletableFuture<PluginMessage> invokeRequest(
144+
VelocityCommandBridgeImpl bridge,
145+
Platform.ServerTarget target,
146+
PluginMessage payload,
147+
Duration timeout) {
148+
try {
149+
Method method = VelocityCommandBridgeImpl.class.getDeclaredMethod(
150+
"requestPluginMessage",
151+
Platform.ServerTarget.class,
152+
PluginMessage.class,
153+
Duration.class);
154+
method.setAccessible(true);
155+
return (CompletableFuture<PluginMessage>) method.invoke(bridge, target, payload, timeout);
156+
} catch (Exception e) {
157+
throw new RuntimeException(e);
158+
}
159+
}
160+
161+
private static final class CountingEndpointServer implements EndpointServer {
162+
private final AtomicInteger sendCount = new AtomicInteger();
163+
164+
@Override
165+
public void start() {
166+
}
167+
168+
@Override
169+
public void stop() {
170+
}
171+
172+
@Override
173+
public SendOperation send(Endpoint endpoint, Envelope request) {
174+
sendCount.incrementAndGet();
175+
return new SendOperation(endpoint, request, new ResponseAwaiter());
176+
}
177+
178+
@Override
179+
public void close(Endpoint endpoint) {
180+
}
181+
182+
private int sendCount() {
183+
return sendCount.get();
184+
}
185+
}
186+
187+
private static final class TestEndpoint implements Endpoint {
188+
@Override
189+
public CompletableFuture<Void> send(Envelope env) {
190+
return CompletableFuture.completedFuture(null);
191+
}
192+
193+
@Override
194+
public boolean isOpen() {
195+
return true;
196+
}
197+
}
198+
}

0 commit comments

Comments
 (0)