Skip to content

Commit 7525ba7

Browse files
feat: decouple PubSub from DataHandler, add MongoDB and KeyDB support
1 parent ac336d3 commit 7525ba7

13 files changed

Lines changed: 228 additions & 128 deletions

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ repositories {
1919
dependencies {
2020
implementation 'com.google.code.gson:gson:2.11.0'
2121
compileOnly 'redis.clients:jedis:5.2.0'
22+
compileOnly 'org.mongodb:mongodb-driver-sync:5.3.1'
2223

2324
testImplementation platform('org.junit:junit-bom:5.10.0')
2425
testImplementation 'org.junit.jupiter:junit-jupiter'
2526
testImplementation 'redis.clients:jedis:5.2.0'
27+
testImplementation 'org.mongodb:mongodb-driver-sync:5.3.1'
2628
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
2729
}
2830

src/main/java/net/swofty/api/DataAPIImpl.java

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import net.swofty.data.format.JsonFormat;
66
import net.swofty.event.*;
77
import net.swofty.storage.DataStorage;
8-
import net.swofty.storage.RedisDataStorage;
98
import net.swofty.transaction.TransactionConsumer;
109
import net.swofty.transaction.TransactionFunction;
1110

@@ -24,9 +23,9 @@ public class DataAPIImpl implements DataAPI {
2423
private final EventBus eventBus;
2524
private final BulkOperationExecutor bulkOperations;
2625

27-
public DataAPIImpl(DataStorage storage, DataFormat format) {
26+
public DataAPIImpl(DataStorage storage, DataFormat format, PubSubHandler pubSub) {
2827
this.storage = storage;
29-
this.eventBus = createEventBus(storage);
28+
this.eventBus = (pubSub != null) ? new DistributedEventBus(pubSub) : new EventBus();
3029
this.linkRegistry = new LinkRegistryImpl();
3130
this.playerData = new PlayerDataManager(storage, format, eventBus);
3231
this.linkedData = new LinkedDataManager(storage, format, eventBus, linkRegistry);
@@ -35,23 +34,16 @@ public DataAPIImpl(DataStorage storage, DataFormat format) {
3534
this.bulkOperations = new BulkOperationExecutor(playerData, linkedData, storage, eventBus);
3635
}
3736

38-
public DataAPIImpl(DataStorage storage) {
39-
this(storage, new JsonFormat());
37+
public DataAPIImpl(DataStorage storage, DataFormat format) {
38+
this(storage, format, null);
4039
}
4140

42-
private static EventBus createEventBus(DataStorage storage) {
43-
if (storage instanceof RedisDataStorage redisStorage) {
44-
return new DistributedEventBus(redisStorage.getPool());
45-
}
46-
return new EventBus();
41+
public DataAPIImpl(DataStorage storage) {
42+
this(storage, new JsonFormat(), null);
4743
}
4844

49-
private void requireListenerSupport() {
50-
if (!storage.supportsListeners()) {
51-
throw new UnsupportedOperationException(
52-
"Event listeners are not supported with " + storage.getClass().getSimpleName()
53-
+ ". Use a storage backend that supports listeners (e.g. RedisDataStorage).");
54-
}
45+
public DataAPIImpl(DataStorage storage, PubSubHandler pubSub) {
46+
this(storage, new JsonFormat(), pubSub);
5547
}
5648

5749
// ==================== Player Fields ====================
@@ -205,46 +197,26 @@ public <K, R> R transactionDirect(K key, LinkType<K> type, TransactionFunction<R
205197

206198
@Override
207199
public <T> void subscribe(PlayerField<T> field, PlayerDataListener<T> listener) {
208-
requireListenerSupport();
209-
if (eventBus instanceof DistributedEventBus deb) {
210-
deb.registerField(field);
211-
}
212200
eventBus.subscribe(field, listener);
213201
}
214202

215203
@Override
216204
public <K, T> void subscribe(LinkedField<K, T> field, LinkedDataListener<K, T> listener) {
217-
requireListenerSupport();
218-
if (eventBus instanceof DistributedEventBus deb) {
219-
deb.registerField(field);
220-
}
221205
eventBus.subscribeLinked(field, listener);
222206
}
223207

224208
@Override
225209
public <K> void subscribe(LinkType<K> type, LinkChangeListener<K> listener) {
226-
requireListenerSupport();
227-
if (eventBus instanceof DistributedEventBus deb) {
228-
deb.registerLinkType(type);
229-
}
230210
eventBus.subscribeLinkChange(type, listener);
231211
}
232212

233213
@Override
234214
public <T> void subscribeExpiration(ExpiringField<T> field, ExpirationListener<T> listener) {
235-
requireListenerSupport();
236-
if (eventBus instanceof DistributedEventBus deb) {
237-
deb.registerField(field);
238-
}
239215
eventBus.subscribeExpiration(field, listener);
240216
}
241217

242218
@Override
243219
public <K, T> void subscribeExpiration(ExpiringLinkedField<K, T> field, LinkedExpirationListener<K, T> listener) {
244-
requireListenerSupport();
245-
if (eventBus instanceof DistributedEventBus deb) {
246-
deb.registerField(field);
247-
}
248220
eventBus.subscribeLinkedExpiration(field, listener);
249221
}
250222

src/main/java/net/swofty/event/DistributedEventBus.java

Lines changed: 34 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -11,48 +11,62 @@
1111
import net.swofty.data.DataReader;
1212
import net.swofty.data.DataWriter;
1313
import net.swofty.data.format.JsonFormat;
14-
import redis.clients.jedis.Jedis;
15-
import redis.clients.jedis.JedisPool;
16-
import redis.clients.jedis.JedisPubSub;
1714

1815
import java.lang.reflect.Type;
19-
import java.nio.charset.StandardCharsets;
2016
import java.util.*;
2117
import java.util.concurrent.ConcurrentHashMap;
2218

2319
public class DistributedEventBus extends EventBus {
2420
private static final Gson GSON = new GsonBuilder().create();
2521
private static final Type MAP_TYPE = new TypeToken<Map<String, Object>>() {}.getType();
2622

27-
private final JedisPool pool;
28-
private final String channel;
23+
private final PubSubHandler pubSubHandler;
2924
private final String nodeId;
3025
private final JsonFormat serializationFormat = new JsonFormat();
3126

3227
private final ConcurrentHashMap<String, DataField<?>> fieldRegistry = new ConcurrentHashMap<>();
3328
private final ConcurrentHashMap<String, LinkType<?>> linkTypeRegistry = new ConcurrentHashMap<>();
3429

35-
private Thread subscriberThread;
36-
private volatile JedisPubSub activeSub;
37-
private volatile boolean running = true;
38-
39-
public DistributedEventBus(JedisPool pool) {
40-
this(pool, "swofty:events", UUID.randomUUID().toString());
30+
public DistributedEventBus(PubSubHandler pubSubHandler) {
31+
this(pubSubHandler, UUID.randomUUID().toString());
4132
}
4233

43-
public DistributedEventBus(JedisPool pool, String channel, String nodeId) {
44-
this.pool = pool;
45-
this.channel = channel;
34+
public DistributedEventBus(PubSubHandler pubSubHandler, String nodeId) {
35+
this.pubSubHandler = pubSubHandler;
4636
this.nodeId = nodeId;
47-
startSubscriber();
37+
pubSubHandler.subscribe(this::handleMessage);
4838
}
4939

50-
public void registerField(DataField<?> field) {
40+
// ==================== Auto-register fields on subscribe ====================
41+
42+
@Override
43+
public <T> void subscribe(DataField<T> field, PlayerDataListener<T> listener) {
5144
fieldRegistry.put(field.fullKey(), field);
45+
super.subscribe(field, listener);
5246
}
5347

54-
public void registerLinkType(LinkType<?> type) {
48+
@Override
49+
public <K, T> void subscribeLinked(DataField<T> field, LinkedDataListener<K, T> listener) {
50+
fieldRegistry.put(field.fullKey(), field);
51+
super.subscribeLinked(field, listener);
52+
}
53+
54+
@Override
55+
public <K> void subscribeLinkChange(LinkType<K> type, LinkChangeListener<K> listener) {
5556
linkTypeRegistry.put(type.name(), type);
57+
super.subscribeLinkChange(type, listener);
58+
}
59+
60+
@Override
61+
public <T> void subscribeExpiration(ExpiringField<T> field, ExpirationListener<T> listener) {
62+
fieldRegistry.put(field.fullKey(), field);
63+
super.subscribeExpiration(field, listener);
64+
}
65+
66+
@Override
67+
public <K, T> void subscribeLinkedExpiration(ExpiringLinkedField<K, T> field, LinkedExpirationListener<K, T> listener) {
68+
fieldRegistry.put(field.fullKey(), field);
69+
super.subscribeLinkedExpiration(field, listener);
5670
}
5771

5872
// ==================== Override fire* to publish ====================
@@ -118,31 +132,7 @@ public <K, T> void fireLinkedExpired(ExpiringLinkedField<K, T> field, K linkKey,
118132
// ==================== Pub/Sub ====================
119133

120134
private void publish(EventMessage message) {
121-
try (Jedis jedis = pool.getResource()) {
122-
jedis.publish(channel, GSON.toJson(message));
123-
}
124-
}
125-
126-
private void startSubscriber() {
127-
subscriberThread = new Thread(() -> {
128-
while (running) {
129-
try (Jedis jedis = pool.getResource()) {
130-
activeSub = new JedisPubSub() {
131-
@Override
132-
public void onMessage(String ch, String msg) {
133-
handleMessage(msg);
134-
}
135-
};
136-
jedis.subscribe(activeSub, channel);
137-
} catch (Exception e) {
138-
if (running) {
139-
try { Thread.sleep(1000); } catch (InterruptedException ie) { break; }
140-
}
141-
}
142-
}
143-
}, "swofty-pubsub-subscriber");
144-
subscriberThread.setDaemon(true);
145-
subscriberThread.start();
135+
pubSubHandler.publish(GSON.toJson(message));
146136
}
147137

148138
@SuppressWarnings("unchecked")
@@ -263,12 +253,6 @@ private Set<UUID> listToUuidSet(Object raw) {
263253
// ==================== Lifecycle ====================
264254

265255
public void shutdown() {
266-
running = false;
267-
if (activeSub != null) {
268-
try { activeSub.unsubscribe(); } catch (Exception ignored) {}
269-
}
270-
if (subscriberThread != null) {
271-
subscriberThread.interrupt();
272-
}
256+
pubSubHandler.shutdown();
273257
}
274258
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package net.swofty.event;
2+
3+
import redis.clients.jedis.JedisPool;
4+
5+
/**
6+
* PubSubHandler implementation for KeyDB.
7+
* KeyDB is wire-compatible with Redis, so this uses Jedis under the hood.
8+
*/
9+
public class KeyDBPubSubHandler extends RedisPubSubHandler {
10+
public KeyDBPubSubHandler(JedisPool pool, String channel) {
11+
super(pool, channel);
12+
}
13+
14+
public KeyDBPubSubHandler(JedisPool pool) {
15+
super(pool, "swofty:events");
16+
}
17+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package net.swofty.event;
2+
3+
public interface PubSubHandler {
4+
void publish(String message);
5+
void subscribe(MessageHandler handler);
6+
void shutdown();
7+
8+
@FunctionalInterface
9+
interface MessageHandler {
10+
void onMessage(String message);
11+
}
12+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package net.swofty.event;
2+
3+
import redis.clients.jedis.Jedis;
4+
import redis.clients.jedis.JedisPool;
5+
import redis.clients.jedis.JedisPubSub;
6+
7+
public class RedisPubSubHandler implements PubSubHandler {
8+
private final JedisPool pool;
9+
private final String channel;
10+
11+
private Thread subscriberThread;
12+
private volatile JedisPubSub activeSub;
13+
private volatile boolean running = true;
14+
15+
public RedisPubSubHandler(JedisPool pool, String channel) {
16+
this.pool = pool;
17+
this.channel = channel;
18+
}
19+
20+
public RedisPubSubHandler(JedisPool pool) {
21+
this(pool, "swofty:events");
22+
}
23+
24+
@Override
25+
public void publish(String message) {
26+
try (Jedis jedis = pool.getResource()) {
27+
jedis.publish(channel, message);
28+
}
29+
}
30+
31+
@Override
32+
public void subscribe(MessageHandler handler) {
33+
subscriberThread = new Thread(() -> {
34+
while (running) {
35+
try (Jedis jedis = pool.getResource()) {
36+
activeSub = new JedisPubSub() {
37+
@Override
38+
public void onMessage(String ch, String msg) {
39+
handler.onMessage(msg);
40+
}
41+
};
42+
jedis.subscribe(activeSub, channel);
43+
} catch (Exception e) {
44+
if (running) {
45+
try { Thread.sleep(1000); } catch (InterruptedException ie) { break; }
46+
}
47+
}
48+
}
49+
}, "swofty-pubsub-subscriber");
50+
subscriberThread.setDaemon(true);
51+
subscriberThread.start();
52+
}
53+
54+
@Override
55+
public void shutdown() {
56+
running = false;
57+
if (activeSub != null) {
58+
try { activeSub.unsubscribe(); } catch (Exception ignored) {}
59+
}
60+
if (subscriberThread != null) {
61+
subscriberThread.interrupt();
62+
}
63+
}
64+
}

src/main/java/net/swofty/storage/DataStorage.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,4 @@ public interface DataStorage {
99
void delete(String type, String id);
1010
boolean exists(String type, String id);
1111

12-
/**
13-
* Whether this storage backend supports event listeners.
14-
* Distributed backends (e.g. Redis) return true; local-only backends (e.g. file) return false.
15-
*/
16-
default boolean supportsListeners() {
17-
return false;
18-
}
1912
}

src/main/java/net/swofty/storage/InMemoryDataStorage.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,4 @@ public boolean exists(String type, String id) {
3737
return bucket != null && bucket.containsKey(id);
3838
}
3939

40-
@Override
41-
public boolean supportsListeners() {
42-
return true;
43-
}
4440
}

0 commit comments

Comments
 (0)