Skip to content

Commit ac5da9f

Browse files
committed
feat: api implementation
1 parent b153684 commit ac5da9f

15 files changed

Lines changed: 943 additions & 15 deletions

File tree

api/src/main/java/dev/objz/commandbridge/api/CommandBridgeAPI.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@
1111

1212
import java.util.Optional;
1313
import java.util.Set;
14+
import java.util.concurrent.CompletableFuture;
1415
import java.util.function.Consumer;
1516

1617
public interface CommandBridgeAPI {
1718

1819
<T extends ChannelPayload, C extends MessageChannel<T>> C channel(ChannelType<T, C> type);
1920

20-
<P extends ChannelPayload> void broadcast(MessageChannel<P> channel, P payload);
21+
<P extends ChannelPayload> CompletableFuture<Void> broadcast(MessageChannel<P> channel, P payload);
2122

2223
Platform.ServerTarget server();
2324

api/src/main/java/dev/objz/commandbridge/api/CommandBridgeProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ public static <T extends CommandBridgeAPI> T get(Class<T> type) {
2525
return type.cast(api);
2626
}
2727

28-
static void register(CommandBridgeAPI impl) {
28+
public static void register(CommandBridgeAPI impl) {
2929
Objects.requireNonNull(impl);
3030
if (instance != null) {
3131
throw new IllegalStateException("CommandBridge already registered");
3232
}
3333
instance = impl;
3434
}
3535

36-
static void unregister() {
36+
public static void unregister() {
3737
instance = null;
3838
}
3939
}

api/src/main/java/dev/objz/commandbridge/api/channel/MessageChannel.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99

1010
public interface MessageChannel<P extends ChannelPayload> {
1111

12-
void send(Platform.ServerTarget target, P payload);
13-
14-
CompletableFuture<Void> sendAsync(Platform.ServerTarget target, P payload);
12+
CompletableFuture<Void> send(Platform.ServerTarget target, P payload);
1513

1614
CompletableFuture<P> request(Platform.ServerTarget target, P payload);
1715

api/src/main/java/dev/objz/commandbridge/api/channel/command/CommandChannel.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,19 @@
44
import dev.objz.commandbridge.api.platform.Platform;
55

66
import java.util.UUID;
7+
import java.util.concurrent.CompletableFuture;
78

89
public interface CommandChannel extends MessageChannel<CommandPayload> {
910

10-
default void console(Platform.ServerTarget target, String command) {
11-
send(target, CommandPayload.console(command));
11+
default CompletableFuture<Void> console(Platform.ServerTarget target, String command) {
12+
return send(target, CommandPayload.console(command));
1213
}
1314

14-
default void player(Platform.ServerTarget target, String command, UUID player) {
15-
send(target, CommandPayload.player(command, player));
15+
default CompletableFuture<Void> player(Platform.ServerTarget target, String command, UUID player) {
16+
return send(target, CommandPayload.player(command, player));
1617
}
1718

18-
default void operator(Platform.ServerTarget target, String command, UUID player) {
19-
send(target, CommandPayload.operator(command, player));
19+
default CompletableFuture<Void> operator(Platform.ServerTarget target, String command, UUID player) {
20+
return send(target, CommandPayload.operator(command, player));
2021
}
2122
}
Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
package dev.objz.commandbridge.backends.api;
2+
3+
import dev.objz.commandbridge.api.CommandBridgeAPI;
4+
import dev.objz.commandbridge.api.CommandBridgeProvider;
5+
import dev.objz.commandbridge.api.channel.ChannelPayload;
6+
import dev.objz.commandbridge.api.channel.ChannelType;
7+
import dev.objz.commandbridge.api.channel.MessageChannel;
8+
import dev.objz.commandbridge.api.channel.command.CommandChannelType;
9+
import dev.objz.commandbridge.api.message.MessageContext;
10+
import dev.objz.commandbridge.api.message.MessageListener;
11+
import dev.objz.commandbridge.api.message.ServerEventListener;
12+
import dev.objz.commandbridge.api.message.Subscription;
13+
import dev.objz.commandbridge.api.platform.ConnectionState;
14+
import dev.objz.commandbridge.api.platform.Platform;
15+
import dev.objz.commandbridge.api.platform.PlayerLocator;
16+
import dev.objz.commandbridge.backends.net.client.BackendClient;
17+
import dev.objz.commandbridge.logging.Log;
18+
import dev.objz.commandbridge.net.channel.CommandMessageChannel;
19+
import dev.objz.commandbridge.net.channel.PluginMessageChannel;
20+
import dev.objz.commandbridge.net.payloads.PluginMessage;
21+
import dev.objz.commandbridge.net.proto.Envelope;
22+
import dev.objz.commandbridge.net.proto.MessageType;
23+
24+
import java.time.Duration;
25+
import java.util.Objects;
26+
import java.util.Optional;
27+
import java.util.Set;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.CompletionException;
30+
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.CopyOnWriteArrayList;
32+
import java.util.concurrent.CopyOnWriteArraySet;
33+
import java.util.concurrent.Executors;
34+
import java.util.concurrent.ScheduledExecutorService;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicBoolean;
37+
import java.util.concurrent.atomic.AtomicReference;
38+
import java.util.function.Consumer;
39+
40+
public final class BackendCommandBridgeImpl implements CommandBridgeAPI {
41+
42+
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(15);
43+
44+
private final BackendClient client;
45+
private final ConcurrentHashMap<Class<?>, MessageChannel<?>> channels = new ConcurrentHashMap<>();
46+
private final ConcurrentHashMap<String, ChannelType<?, ?>> channelTypesByName = new ConcurrentHashMap<>();
47+
private final ConcurrentHashMap<String, CopyOnWriteArrayList<MessageListener<?>>> listenersByType = new ConcurrentHashMap<>();
48+
private final CopyOnWriteArraySet<Consumer<ConnectionState>> stateListeners = new CopyOnWriteArraySet<>();
49+
private volatile ScheduledExecutorService statePoller;
50+
private final AtomicReference<ConnectionState> lastState = new AtomicReference<>(ConnectionState.DISCONNECTED);
51+
private final AtomicBoolean polling = new AtomicBoolean(false);
52+
private final AtomicBoolean providerRegistered = new AtomicBoolean(false);
53+
54+
public BackendCommandBridgeImpl(BackendClient client) {
55+
this.client = Objects.requireNonNull(client);
56+
}
57+
58+
public void bootstrap() {
59+
client.inboundRouter().register(MessageType.PLUGIN_MESSAGE, new BackendPluginMessageHandler(this, false));
60+
client.inboundRouter().register(MessageType.PLUGIN_MESSAGE_RESPONSE,
61+
new BackendPluginMessageHandler(this, true));
62+
if (providerRegistered.compareAndSet(false, true)) {
63+
CommandBridgeProvider.register(this);
64+
}
65+
}
66+
67+
public void shutdown() {
68+
if (providerRegistered.compareAndSet(true, false)) {
69+
CommandBridgeProvider.unregister();
70+
}
71+
if (statePoller != null) {
72+
statePoller.shutdownNow();
73+
}
74+
}
75+
76+
@Override
77+
public <T extends ChannelPayload, C extends MessageChannel<T>> C channel(ChannelType<T, C> type) {
78+
Objects.requireNonNull(type);
79+
channelTypesByName.putIfAbsent(type.getClass().getName(), type);
80+
MessageChannel<?> channel = channels.computeIfAbsent(type.getClass(), ignored -> createChannel(type));
81+
return typeCast(channel);
82+
}
83+
84+
@Override
85+
public <P extends ChannelPayload> CompletableFuture<Void> broadcast(MessageChannel<P> channel, P payload) {
86+
Objects.requireNonNull(channel);
87+
Objects.requireNonNull(payload);
88+
return channel.send(Platform.BACKEND.target("*"), payload);
89+
}
90+
91+
@Override
92+
public Platform.ServerTarget server() {
93+
return Platform.BACKEND.target(localServerId());
94+
}
95+
96+
@Override
97+
public ConnectionState connectionState() {
98+
return client.status().toConnectionState();
99+
}
100+
101+
@Override
102+
public Optional<Set<String>> connectedServers() {
103+
return Optional.empty();
104+
}
105+
106+
@Override
107+
public Optional<PlayerLocator> playerLocator() {
108+
return Optional.empty();
109+
}
110+
111+
@Override
112+
public Subscription onServerConnected(ServerEventListener listener) {
113+
throw new UnsupportedOperationException("Server events are only available on the proxy");
114+
}
115+
116+
@Override
117+
public Subscription onServerDisconnected(ServerEventListener listener) {
118+
throw new UnsupportedOperationException("Server events are only available on the proxy");
119+
}
120+
121+
@Override
122+
public Subscription onConnectionStateChanged(Consumer<ConnectionState> listener) {
123+
Objects.requireNonNull(listener);
124+
stateListeners.add(listener);
125+
listener.accept(connectionState());
126+
startStatePolling();
127+
return () -> stateListeners.remove(listener);
128+
}
129+
130+
public PluginMessage handlePluginMessageRequest(Envelope env) {
131+
String to = env.to();
132+
String localId = localServerId();
133+
if (to != null && !to.equals(localId) && !"*".equals(to)) {
134+
return null;
135+
}
136+
137+
PluginMessage message = readPluginMessage(env);
138+
if (message == null) {
139+
return null;
140+
}
141+
142+
dispatchLocal(env, message);
143+
if ("*".equals(to)) {
144+
return null;
145+
}
146+
return message;
147+
}
148+
149+
public void handlePluginMessageResponse(Envelope env) {
150+
String to = env.to();
151+
if (to != null && !to.equals(localServerId()) && !"*".equals(to)) {
152+
Log.debug("Dropping plugin response for mismatched target {}", to);
153+
}
154+
}
155+
156+
private <T extends ChannelPayload, C extends MessageChannel<T>> MessageChannel<?> createChannel(
157+
ChannelType<T, C> type) {
158+
if (type instanceof CommandChannelType commandType) {
159+
return new CommandMessageChannel(commandType,
160+
this::sendPluginMessage,
161+
this::requestPluginMessage,
162+
listener -> registerListener(commandType, listener));
163+
}
164+
165+
return new PluginMessageChannel<>(type,
166+
this::sendPluginMessage,
167+
this::requestPluginMessage,
168+
listener -> registerListener(type, listener));
169+
}
170+
171+
private <P extends ChannelPayload> Subscription registerListener(ChannelType<P, ? extends MessageChannel<P>> type,
172+
MessageListener<P> listener) {
173+
String key = type.getClass().getName();
174+
channelTypesByName.putIfAbsent(key, type);
175+
CopyOnWriteArrayList<MessageListener<?>> listeners = listenersByType.computeIfAbsent(key,
176+
ignored -> new CopyOnWriteArrayList<>());
177+
listeners.add(listener);
178+
return () -> listeners.remove(listener);
179+
}
180+
181+
private CompletableFuture<Void> sendPluginMessage(Platform.ServerTarget target, PluginMessage payload) {
182+
Envelope env = Envelope.make(MessageType.PLUGIN_MESSAGE, localServerId(), target.id(),
183+
Envelope.MAPPER.valueToTree(payload));
184+
return client.send(env).dispatch();
185+
}
186+
187+
private CompletableFuture<PluginMessage> requestPluginMessage(Platform.ServerTarget target,
188+
PluginMessage payload,
189+
Duration timeout) {
190+
Duration effectiveTimeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
191+
Envelope env = Envelope.make(MessageType.PLUGIN_MESSAGE, localServerId(), target.id(),
192+
Envelope.MAPPER.valueToTree(payload));
193+
return client.send(env)
194+
.expect(MessageType.PLUGIN_MESSAGE_RESPONSE)
195+
.timeout(effectiveTimeout)
196+
.await()
197+
.thenApply(this::requirePluginMessage);
198+
}
199+
200+
private void dispatchLocal(Envelope env, PluginMessage message) {
201+
ChannelType<?, ?> channelType = channelTypesByName.get(message.channelType());
202+
if (channelType == null) {
203+
return;
204+
}
205+
206+
Object payload;
207+
try {
208+
payload = Envelope.MAPPER.treeToValue(message.data(), channelType.type());
209+
} catch (Exception e) {
210+
Log.warn("Failed to decode plugin payload for {}: {}", message.channelType(), e.getMessage());
211+
return;
212+
}
213+
214+
CopyOnWriteArrayList<MessageListener<?>> listeners = listenersByType.get(message.channelType());
215+
if (listeners == null || listeners.isEmpty()) {
216+
return;
217+
}
218+
219+
if (env.from() == null || env.from().isBlank()) {
220+
Log.warn("Received plugin message with missing source server ID");
221+
return;
222+
}
223+
Platform.ServerTarget source = Platform.BACKEND.target(env.from());
224+
MessageContext<ChannelPayload> context = contextCast(new MessageContext<>(
225+
typeCast(channelType),
226+
source,
227+
env.ts()));
228+
229+
for (MessageListener<?> raw : listeners) {
230+
try {
231+
listenerCast(raw).accept(context, payloadCast(payload));
232+
} catch (Exception e) {
233+
Log.warn("Plugin message listener failed for {}: {}", message.channelType(), e.getMessage());
234+
}
235+
}
236+
}
237+
238+
private PluginMessage readPluginMessage(Envelope env) {
239+
try {
240+
return Envelope.MAPPER.treeToValue(env.payload(), PluginMessage.class);
241+
} catch (Exception e) {
242+
Log.warn("Failed to decode plugin message: {}", e.getMessage());
243+
return null;
244+
}
245+
}
246+
247+
private PluginMessage requirePluginMessage(Envelope env) {
248+
try {
249+
return Envelope.MAPPER.treeToValue(env.payload(), PluginMessage.class);
250+
} catch (Exception e) {
251+
throw new CompletionException(e);
252+
}
253+
}
254+
255+
private String localServerId() {
256+
String id = client.serverId();
257+
return (id == null || id.isBlank()) ? "backend" : id;
258+
}
259+
260+
private void startStatePolling() {
261+
if (!polling.compareAndSet(false, true)) {
262+
return;
263+
}
264+
265+
statePoller = Executors.newSingleThreadScheduledExecutor(runnable -> {
266+
Thread thread = new Thread(runnable, "commandbridge-api-state");
267+
thread.setDaemon(true);
268+
return thread;
269+
});
270+
271+
lastState.set(connectionState());
272+
statePoller.scheduleAtFixedRate(() -> {
273+
try {
274+
ConnectionState current = connectionState();
275+
ConnectionState previous = lastState.getAndSet(current);
276+
if (current != previous) {
277+
for (Consumer<ConnectionState> listener : stateListeners) {
278+
try {
279+
listener.accept(current);
280+
} catch (Exception e) {
281+
Log.warn("Failed to run state-change listener: {}", e.getMessage());
282+
}
283+
}
284+
}
285+
} catch (Exception e) {
286+
Log.warn("State polling failed: {}", e.getMessage());
287+
}
288+
}, 500L, 500L, TimeUnit.MILLISECONDS);
289+
}
290+
291+
@SuppressWarnings("unchecked")
292+
private static <T extends ChannelPayload, C extends MessageChannel<T>> C typeCast(Object value) {
293+
return (C) value;
294+
}
295+
296+
@SuppressWarnings("unchecked")
297+
private static MessageContext<ChannelPayload> contextCast(MessageContext<?> context) {
298+
return (MessageContext<ChannelPayload>) context;
299+
}
300+
301+
@SuppressWarnings("unchecked")
302+
private static MessageListener<ChannelPayload> listenerCast(MessageListener<?> listener) {
303+
return (MessageListener<ChannelPayload>) listener;
304+
}
305+
306+
private static ChannelPayload payloadCast(Object value) {
307+
return (ChannelPayload) value;
308+
}
309+
}

0 commit comments

Comments
 (0)