Skip to content

Commit 6446864

Browse files
committed
1. Added coroutinex as a runtime and test dependency to liveobjects
2. Added bridging interfaces to send and receive protocol messages
1 parent d81cb24 commit 6446864

9 files changed

Lines changed: 126 additions & 15 deletions

File tree

lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
package io.ably.lib.objects;
22

3+
import io.ably.lib.plugins.PluginInstance;
34
import org.jetbrains.annotations.NotNull;
45

56
/**
6-
* The LiveObjectsPlugin interface provides a mechanism to retrieve instances of LiveObjects
7-
* associated with specific channel names. This allows for interaction with live data objects
8-
* in a real-time environment.
7+
* The LiveObjectsPlugin interface provides a mechanism for managing and interacting with
8+
* live data objects in a real-time environment. It allows for the retrieval, disposal, and
9+
* management of LiveObjects instances associated with specific channel names.
910
*/
10-
public interface LiveObjectsPlugin {
11+
public interface LiveObjectsPlugin extends PluginInstance {
1112

1213
/**
1314
* Retrieves an instance of LiveObjects associated with the specified channel name.
15+
* This method ensures that a LiveObjects instance is available for the given channel,
16+
* creating one if it does not already exist.
1417
*
1518
* @param channelName the name of the channel for which the LiveObjects instance is to be retrieved.
1619
* @return the LiveObjects instance associated with the specified channel name.
@@ -19,6 +22,8 @@ public interface LiveObjectsPlugin {
1922

2023
/**
2124
* Disposes of the LiveObjects instance associated with the specified channel name.
25+
* This method removes the LiveObjects instance for the given channel, releasing any
26+
* resources associated with it.
2227
*
2328
* @param channelName the name of the channel whose LiveObjects instance is to be removed.
2429
*/
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.ably.lib.plugins;
2+
3+
import io.ably.lib.realtime.CompletionListener;
4+
import io.ably.lib.types.AblyException;
5+
import io.ably.lib.types.ProtocolMessage;
6+
7+
/**
8+
* The PluginConnectionAdapter interface defines a contract for managing real-time communication
9+
* between plugins and the Ably Realtime system. Implementations of this interface are responsible
10+
* for sending protocol messages to their intended recipients, optionally queuing events, and
11+
* notifying listeners of the operation's outcome.
12+
*/
13+
public interface PluginConnectionAdapter {
14+
15+
/**
16+
* Sends a protocol message to its intended recipient.
17+
* This method transmits a protocol message, allowing for queuing events if necessary,
18+
* and notifies the provided listener upon the success or failure of the send operation.
19+
*
20+
* @param msg the protocol message to send.
21+
* @param listener a listener to be notified of the success or failure of the send operation.
22+
* @throws AblyException if an error occurs during the send operation.
23+
*/
24+
void send(ProtocolMessage msg, CompletionListener listener) throws AblyException;
25+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.ably.lib.plugins;
2+
3+
import io.ably.lib.types.ProtocolMessage;
4+
import org.jetbrains.annotations.NotNull;
5+
6+
/**
7+
* The ProtocolMessageHandler interface defines a contract for handling protocol messages.
8+
* Implementations of this interface are responsible for processing incoming protocol messages
9+
* and performing the necessary actions based on the message content.
10+
*/
11+
public interface PluginInstance {
12+
/**
13+
* Handles a protocol message.
14+
* This method is invoked whenever a protocol message is received, allowing the implementation
15+
* to process the message and take appropriate actions.
16+
*
17+
* @param message the protocol message to handle.
18+
*/
19+
void handle(@NotNull ProtocolMessage message);
20+
21+
/**
22+
* Disposes of the plugin instance and all underlying resources.
23+
*/
24+
void dispose();
25+
}

lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Map;
88

99
import io.ably.lib.objects.LiveObjectsPlugin;
10+
import io.ably.lib.plugins.PluginConnectionAdapter;
1011
import io.ably.lib.rest.AblyRest;
1112
import io.ably.lib.rest.Auth;
1213
import io.ably.lib.transport.ConnectionManager;
@@ -71,7 +72,10 @@ public AblyRealtime(ClientOptions options) throws AblyException {
7172
super(options);
7273
final InternalChannels channels = new InternalChannels();
7374
this.channels = channels;
74-
connection = new Connection(this, channels, platformAgentProvider);
75+
76+
liveObjectsPlugin = tryInitializeLiveObjectsPlugin();
77+
78+
connection = new Connection(this, channels, platformAgentProvider, liveObjectsPlugin);
7579

7680
if (!StringUtils.isNullOrEmpty(options.recover)) {
7781
RecoveryKeyContext recoveryKeyContext = RecoveryKeyContext.decode(options.recover);
@@ -81,8 +85,6 @@ public AblyRealtime(ClientOptions options) throws AblyException {
8185
}
8286
}
8387

84-
liveObjectsPlugin = tryInitializeLiveObjectsPlugin();
85-
8688
if(options.autoConnect) connection.connect();
8789
}
8890

@@ -182,7 +184,9 @@ public interface Channels extends ReadOnlyMap<String, Channel> {
182184
private LiveObjectsPlugin tryInitializeLiveObjectsPlugin() {
183185
try {
184186
Class<?> liveObjectsImplementation = Class.forName("io.ably.lib.objects.DefaultLiveObjectsPlugin");
185-
return (LiveObjectsPlugin) liveObjectsImplementation.getDeclaredConstructor().newInstance();
187+
return (LiveObjectsPlugin) liveObjectsImplementation
188+
.getDeclaredConstructor(PluginConnectionAdapter.class)
189+
.newInstance(this.connection.connectionManager);
186190
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
187191
InvocationTargetException e) {
188192
Log.w(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e);

lib/src/main/java/io/ably/lib/realtime/Connection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.ably.lib.realtime;
22

3+
import io.ably.lib.objects.LiveObjectsPlugin;
34
import io.ably.lib.realtime.ConnectionStateListener.ConnectionStateChange;
45
import io.ably.lib.transport.ConnectionManager;
56
import io.ably.lib.types.AblyException;
@@ -122,10 +123,10 @@ public void close() {
122123
* internal
123124
*****************/
124125

125-
Connection(AblyRealtime ably, ConnectionManager.Channels channels, PlatformAgentProvider platformAgentProvider) throws AblyException {
126+
Connection(AblyRealtime ably, ConnectionManager.Channels channels, PlatformAgentProvider platformAgentProvider, LiveObjectsPlugin liveObjectsPlugin) throws AblyException {
126127
this.ably = ably;
127128
this.state = ConnectionState.initialized;
128-
this.connectionManager = new ConnectionManager(ably, this, channels, platformAgentProvider);
129+
this.connectionManager = new ConnectionManager(ably, this, channels, platformAgentProvider, liveObjectsPlugin);
129130
}
130131

131132
public void onConnectionStateChange(ConnectionStateChange stateChange) {

lib/src/main/java/io/ably/lib/transport/ConnectionManager.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import io.ably.lib.debug.DebugOptions;
1515
import io.ably.lib.debug.DebugOptions.RawProtocolListener;
1616
import io.ably.lib.http.HttpHelpers;
17+
import io.ably.lib.objects.LiveObjectsPlugin;
18+
import io.ably.lib.plugins.PluginConnectionAdapter;
1719
import io.ably.lib.realtime.AblyRealtime;
1820
import io.ably.lib.realtime.Channel;
1921
import io.ably.lib.realtime.CompletionListener;
@@ -35,7 +37,7 @@
3537
import io.ably.lib.util.PlatformAgentProvider;
3638
import io.ably.lib.util.ReconnectionStrategy;
3739

38-
public class ConnectionManager implements ConnectListener {
40+
public class ConnectionManager implements ConnectListener, PluginConnectionAdapter {
3941
final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
4042

4143
/**************************************************************
@@ -79,6 +81,13 @@ public class ConnectionManager implements ConnectListener {
7981
*/
8082
private boolean cleaningUpAfterEnteringTerminalState = false;
8183

84+
/**
85+
* A nullable reference to the LiveObjects plugin.
86+
* <p>
87+
* This field is initialized only if the LiveObjects plugin is present in the classpath.
88+
*/
89+
private final LiveObjectsPlugin liveObjectsPlugin;
90+
8291
/**
8392
* Methods on the channels map owned by the {@link AblyRealtime} instance
8493
* which the {@link ConnectionManager} needs access to.
@@ -764,11 +773,12 @@ public void run() {
764773
* ConnectionManager
765774
***********************/
766775

767-
public ConnectionManager(final AblyRealtime ably, final Connection connection, final Channels channels, final PlatformAgentProvider platformAgentProvider) throws AblyException {
776+
public ConnectionManager(final AblyRealtime ably, final Connection connection, final Channels channels, final PlatformAgentProvider platformAgentProvider, LiveObjectsPlugin liveObjectsPlugin) throws AblyException {
768777
this.ably = ably;
769778
this.connection = connection;
770779
this.channels = channels;
771780
this.platformAgentProvider = platformAgentProvider;
781+
this.liveObjectsPlugin = liveObjectsPlugin;
772782

773783
ClientOptions options = ably.options;
774784
this.hosts = new Hosts(options.realtimeHost, Defaults.HOST_REALTIME, options);
@@ -1220,6 +1230,12 @@ public void onMessage(ITransport transport, ProtocolMessage message) throws Ably
12201230
case auth:
12211231
addAction(new ReauthAction());
12221232
break;
1233+
case object:
1234+
case object_sync:
1235+
if (liveObjectsPlugin != null) {
1236+
liveObjectsPlugin.handle(message);
1237+
}
1238+
break;
12231239
default:
12241240
onChannelMessage(message);
12251241
}
@@ -1667,6 +1683,11 @@ public QueuedMessage(ProtocolMessage msg, CompletionListener listener) {
16671683
}
16681684
}
16691685

1686+
@Override
1687+
public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException {
1688+
this.send(msg, true, listener);
1689+
}
1690+
16701691
public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener listener) throws AblyException {
16711692
State state;
16721693
synchronized(this) {

lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void connectionmanager_fallback_none_withoutconnection() throws AblyExcep
137137
Connection connection = Mockito.mock(Connection.class);
138138
final ConnectionManager.Channels channels = Mockito.mock(ConnectionManager.Channels.class);
139139

140-
ConnectionManager connectionManager = new ConnectionManager(ably, connection, channels, new EmptyPlatformAgentProvider()) {
140+
ConnectionManager connectionManager = new ConnectionManager(ably, connection, channels, new EmptyPlatformAgentProvider(), null) {
141141
@Override
142142
protected boolean checkConnectivity() {
143143
return false;

live-objects/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ repositories {
1010
dependencies {
1111
implementation(project(":java"))
1212
testImplementation(kotlin("test"))
13+
implementation(libs.coroutine.core)
14+
15+
testImplementation(libs.coroutine.test)
1316
}
1417

1518
tasks.test {
Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package io.ably.lib.objects
22

3+
import io.ably.lib.plugins.PluginConnectionAdapter
4+
import io.ably.lib.realtime.CompletionListener
5+
import io.ably.lib.types.ErrorInfo
6+
import io.ably.lib.types.ProtocolMessage
7+
import kotlinx.coroutines.CompletableDeferred
38
import java.util.concurrent.ConcurrentHashMap
49

5-
public class DefaultLiveObjectsPlugin : LiveObjectsPlugin {
10+
public class DefaultLiveObjectsPlugin(private val pluginConnectionAdapter: PluginConnectionAdapter) : LiveObjectsPlugin {
611

7-
private val liveObjects = ConcurrentHashMap<String, LiveObjects>()
12+
private val liveObjects = ConcurrentHashMap<String, DefaultLiveObjects>()
813

914
override fun getInstance(channelName: String): LiveObjects {
1015
return liveObjects.getOrPut(channelName) { DefaultLiveObjects(channelName) }
@@ -13,4 +18,26 @@ public class DefaultLiveObjectsPlugin : LiveObjectsPlugin {
1318
override fun dispose(channelName: String) {
1419
liveObjects.remove(channelName)
1520
}
21+
22+
public suspend fun send(message: ProtocolMessage) {
23+
val deferred = CompletableDeferred<Unit>()
24+
pluginConnectionAdapter.send(message, object : CompletionListener {
25+
override fun onSuccess() {
26+
deferred.complete(Unit)
27+
}
28+
29+
override fun onError(reason: ErrorInfo) {
30+
deferred.completeExceptionally(Exception(reason.message))
31+
}
32+
})
33+
deferred.await()
34+
}
35+
36+
override fun handle(message: ProtocolMessage) {
37+
TODO("Not yet implemented")
38+
}
39+
40+
override fun dispose() {
41+
TODO("Not yet implemented")
42+
}
1643
}

0 commit comments

Comments
 (0)