Skip to content

Commit e00af51

Browse files
authored
Merge pull request #1087 from ably/feature/declare-data-models
[ECO-5375] Define internal liveobject data models
2 parents ac0ece7 + 4dfb1a0 commit e00af51

11 files changed

Lines changed: 448 additions & 73 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.ably.lib.objects;
2+
3+
import io.ably.lib.realtime.AblyRealtime;
4+
import io.ably.lib.realtime.CompletionListener;
5+
import io.ably.lib.types.AblyException;
6+
import io.ably.lib.types.ProtocolMessage;
7+
import io.ably.lib.util.Log;
8+
import org.jetbrains.annotations.NotNull;
9+
10+
public class Adapter implements LiveObjectsAdapter {
11+
private final AblyRealtime ably;
12+
private static final String TAG = LiveObjectsAdapter.class.getName();
13+
14+
public Adapter(@NotNull AblyRealtime ably) {
15+
this.ably = ably;
16+
}
17+
18+
@Override
19+
public void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial) {
20+
if (ably.channels.containsKey(channelName)) {
21+
ably.channels.get(channelName).properties.channelSerial = channelSerial;
22+
} else {
23+
Log.e(TAG, "setChannelSerial(): channel not found: " + channelName);
24+
}
25+
}
26+
27+
@Override
28+
public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException {
29+
// Always queue LiveObjects messages to ensure reliable state synchronization and proper acknowledgment
30+
ably.connection.connectionManager.send(msg, true, listener);
31+
}
32+
}

lib/src/main/java/io/ably/lib/objects/LiveMap.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ public interface LiveMap {
1818

1919
/**
2020
* Retrieves the value associated with the specified key.
21-
* If this map object is tombstoned (deleted), `undefined` is returned.
22-
* If no entry is associated with the specified key, `undefined` is returned.
23-
* If map entry is tombstoned (deleted), `undefined` is returned.
21+
* If this map object is tombstoned (deleted), null is returned.
22+
* If no entry is associated with the specified key, null is returned.
23+
* If map entry is tombstoned (deleted), null is returned.
2424
* If the value associated with the provided key is an objectId string of another LiveObject, a reference to that LiveObject
25-
* is returned, provided it exists in the local pool and is not tombstoned. Otherwise, `undefined` is returned.
25+
* is returned, provided it exists in the local pool and is not tombstoned. Otherwise, null is returned.
2626
* If the value is not an objectId, then that value is returned.
2727
*
2828
* @param keyName the key whose associated value is to be returned.

lib/src/main/java/io/ably/lib/plugins/PluginConnectionAdapter.java renamed to lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,11 @@
1-
package io.ably.lib.plugins;
1+
package io.ably.lib.objects;
22

33
import io.ably.lib.realtime.CompletionListener;
44
import io.ably.lib.types.AblyException;
55
import io.ably.lib.types.ProtocolMessage;
6+
import org.jetbrains.annotations.NotNull;
67

7-
/**
8-
* The PluginConnectionAdapter interface defines a contract for managing real-time communication
9-
* between plugins and the Ably Realtime system. Implementations of this interface are responsible
10-
* for sending protocol messages to their intended recipients, optionally queuing events, and
11-
* notifying listeners of the operation's outcome.
12-
*/
13-
public interface PluginConnectionAdapter {
14-
8+
public interface LiveObjectsAdapter {
159
/**
1610
* Sends a protocol message to its intended recipient.
1711
* This method transmits a protocol message, allowing for queuing events if necessary,
@@ -22,4 +16,12 @@ public interface PluginConnectionAdapter {
2216
* @throws AblyException if an error occurs during the send operation.
2317
*/
2418
void send(ProtocolMessage msg, CompletionListener listener) throws AblyException;
19+
20+
/**
21+
* Sets the channel serial for a specific channel.
22+
* @param channelName the name of the channel for which to set the serial
23+
* @param channelSerial the serial to set for the channel
24+
*/
25+
void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial);
2526
}
27+

lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package io.ably.lib.objects;
22

3-
import io.ably.lib.plugins.PluginInstance;
3+
import io.ably.lib.types.ProtocolMessage;
44
import org.jetbrains.annotations.NotNull;
55

66
/**
77
* The LiveObjectsPlugin interface provides a mechanism for managing and interacting with
88
* live data objects in a real-time environment. It allows for the retrieval, disposal, and
99
* management of LiveObjects instances associated with specific channel names.
1010
*/
11-
public interface LiveObjectsPlugin extends PluginInstance {
11+
public interface LiveObjectsPlugin {
1212

1313
/**
1414
* Retrieves an instance of LiveObjects associated with the specified channel name.
@@ -21,6 +21,15 @@ public interface LiveObjectsPlugin extends PluginInstance {
2121
@NotNull
2222
LiveObjects getInstance(@NotNull String channelName);
2323

24+
/**
25+
* Handles a protocol message.
26+
* This method is invoked whenever a protocol message is received, allowing the implementation
27+
* to process the message and take appropriate actions.
28+
*
29+
* @param message the protocol message to handle.
30+
*/
31+
void handle(@NotNull ProtocolMessage message);
32+
2433
/**
2534
* Disposes of the LiveObjects instance associated with the specified channel name.
2635
* This method removes the LiveObjects instance for the given channel, releasing any
@@ -29,4 +38,9 @@ public interface LiveObjectsPlugin extends PluginInstance {
2938
* @param channelName the name of the channel whose LiveObjects instance is to be removed.
3039
*/
3140
void dispose(@NotNull String channelName);
41+
42+
/**
43+
* Disposes of the plugin instance and all underlying resources.
44+
*/
45+
void dispose();
3246
}

lib/src/main/java/io/ably/lib/plugins/PluginInstance.java

Lines changed: 0 additions & 25 deletions
This file was deleted.

lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
import java.util.List;
77
import java.util.Map;
88

9+
import io.ably.lib.objects.Adapter;
10+
import io.ably.lib.objects.LiveObjectsAdapter;
911
import io.ably.lib.objects.LiveObjectsPlugin;
10-
import io.ably.lib.plugins.PluginConnectionAdapter;
1112
import io.ably.lib.rest.AblyRest;
1213
import io.ably.lib.rest.Auth;
1314
import io.ably.lib.transport.ConnectionManager;
@@ -187,9 +188,10 @@ public interface Channels extends ReadOnlyMap<String, Channel> {
187188
private LiveObjectsPlugin tryInitializeLiveObjectsPlugin() {
188189
try {
189190
Class<?> liveObjectsImplementation = Class.forName("io.ably.lib.objects.DefaultLiveObjectsPlugin");
191+
LiveObjectsAdapter adapter = new Adapter(this);
190192
return (LiveObjectsPlugin) liveObjectsImplementation
191-
.getDeclaredConstructor(PluginConnectionAdapter.class)
192-
.newInstance(this.connection.connectionManager);
193+
.getDeclaredConstructor(LiveObjectsAdapter.class)
194+
.newInstance(adapter);
193195
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
194196
InvocationTargetException e) {
195197
Log.i(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e);

lib/src/main/java/io/ably/lib/transport/ConnectionManager.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import io.ably.lib.debug.DebugOptions.RawProtocolListener;
1616
import io.ably.lib.http.HttpHelpers;
1717
import io.ably.lib.objects.LiveObjectsPlugin;
18-
import io.ably.lib.plugins.PluginConnectionAdapter;
1918
import io.ably.lib.realtime.AblyRealtime;
2019
import io.ably.lib.realtime.Channel;
2120
import io.ably.lib.realtime.CompletionListener;
@@ -37,7 +36,7 @@
3736
import io.ably.lib.util.PlatformAgentProvider;
3837
import io.ably.lib.util.ReconnectionStrategy;
3938

40-
public class ConnectionManager implements ConnectListener, PluginConnectionAdapter {
39+
public class ConnectionManager implements ConnectListener {
4140
final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
4241

4342
/**************************************************************
@@ -1687,11 +1686,6 @@ public QueuedMessage(ProtocolMessage msg, CompletionListener listener) {
16871686
}
16881687
}
16891688

1690-
@Override
1691-
public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException {
1692-
this.send(msg, true, listener);
1693-
}
1694-
16951689
public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener listener) throws AblyException {
16961690
State state;
16971691
synchronized(this) {

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package io.ably.lib.objects
22

33
import io.ably.lib.types.Callback
4+
import io.ably.lib.types.ProtocolMessage
5+
import io.ably.lib.util.Log
6+
7+
internal class DefaultLiveObjects(private val channelName: String, private val adapter: LiveObjectsAdapter): LiveObjects {
8+
private val tag = DefaultLiveObjects::class.simpleName
49

5-
internal class DefaultLiveObjects(private val channelName: String): LiveObjects {
610
override fun getRoot(): LiveMap {
711
TODO("Not yet implemented")
812
}
@@ -43,6 +47,16 @@ internal class DefaultLiveObjects(private val channelName: String): LiveObjects
4347
TODO("Not yet implemented")
4448
}
4549

50+
fun handle(msg: ProtocolMessage) {
51+
// RTL15b
52+
msg.channelSerial?.let {
53+
if (msg.action === ProtocolMessage.Action.`object`) {
54+
Log.v(tag, "Setting channel serial for channelName: $channelName, value: ${msg.channelSerial}")
55+
adapter.setChannelSerial(channelName, msg.channelSerial)
56+
}
57+
}
58+
}
59+
4660
fun dispose() {
4761
// Dispose of any resources associated with this LiveObjects instance
4862
// For example, close any open connections or clean up references

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,19 @@
11
package io.ably.lib.objects
22

3-
import io.ably.lib.plugins.PluginConnectionAdapter
4-
import io.ably.lib.realtime.CompletionListener
5-
import io.ably.lib.types.ErrorInfo
63
import io.ably.lib.types.ProtocolMessage
7-
import kotlinx.coroutines.CompletableDeferred
84
import java.util.concurrent.ConcurrentHashMap
95

10-
public class DefaultLiveObjectsPlugin(private val pluginConnectionAdapter: PluginConnectionAdapter) : LiveObjectsPlugin {
6+
public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) : LiveObjectsPlugin {
117

128
private val liveObjects = ConcurrentHashMap<String, DefaultLiveObjects>()
139

1410
override fun getInstance(channelName: String): LiveObjects {
15-
return liveObjects.getOrPut(channelName) { DefaultLiveObjects(channelName) }
11+
return liveObjects.getOrPut(channelName) { DefaultLiveObjects(channelName, adapter) }
1612
}
1713

18-
public suspend fun send(message: ProtocolMessage) {
19-
val deferred = CompletableDeferred<Unit>()
20-
pluginConnectionAdapter.send(message, object : CompletionListener {
21-
override fun onSuccess() {
22-
deferred.complete(Unit)
23-
}
24-
25-
override fun onError(reason: ErrorInfo) {
26-
deferred.completeExceptionally(Exception(reason.message))
27-
}
28-
})
29-
deferred.await()
30-
}
31-
32-
override fun handle(message: ProtocolMessage) {
33-
TODO("Not yet implemented")
14+
override fun handle(msg: ProtocolMessage) {
15+
val channelName = msg.channel
16+
liveObjects[channelName]?.handle(msg)
3417
}
3518

3619
override fun dispose(channelName: String) {
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package io.ably.lib.objects
2+
3+
import io.ably.lib.realtime.CompletionListener
4+
import io.ably.lib.types.AblyException
5+
import io.ably.lib.types.ErrorInfo
6+
import io.ably.lib.types.ProtocolMessage
7+
import kotlinx.coroutines.suspendCancellableCoroutine
8+
import kotlin.coroutines.resume
9+
import kotlin.coroutines.resumeWithException
10+
11+
internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) = suspendCancellableCoroutine { continuation ->
12+
try {
13+
this.send(message, object : CompletionListener {
14+
override fun onSuccess() {
15+
continuation.resume(Unit)
16+
}
17+
18+
override fun onError(reason: ErrorInfo) {
19+
continuation.resumeWithException(AblyException.fromErrorInfo(reason))
20+
}
21+
})
22+
} catch (e: Exception) {
23+
continuation.resumeWithException(e)
24+
}
25+
}
26+
27+
internal enum class ProtocolMessageFormat(private val value: String) {
28+
Msgpack("msgpack"),
29+
Json("json");
30+
31+
override fun toString(): String = value
32+
}
33+
34+
internal class Binary(val data: ByteArray?) {
35+
override fun equals(other: Any?): Boolean {
36+
if (this === other) return true
37+
if (other !is Binary) return false
38+
return data?.contentEquals(other.data) == true
39+
}
40+
41+
override fun hashCode(): Int {
42+
return data?.contentHashCode() ?: 0
43+
}
44+
}

0 commit comments

Comments
 (0)