Skip to content

Commit b66c81c

Browse files
committed
[ECO-5375] Refactored LiveObjects plugin to handle channelSerial
1 parent ac0ece7 commit b66c81c

6 files changed

Lines changed: 85 additions & 33 deletions

File tree

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.ably.lib.objects;
2+
3+
import io.ably.lib.plugins.PluginConnectionAdapter;
4+
import io.ably.lib.realtime.AblyRealtime;
5+
import io.ably.lib.realtime.CompletionListener;
6+
import io.ably.lib.types.AblyException;
7+
import io.ably.lib.types.ProtocolMessage;
8+
import io.ably.lib.util.Log;
9+
import org.jetbrains.annotations.NotNull;
10+
11+
public interface LiveObjectsAdapter extends PluginConnectionAdapter {
12+
void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial);
13+
14+
class Adapter implements LiveObjectsAdapter {
15+
private final AblyRealtime ably;
16+
private static final String TAG = LiveObjectsAdapter.class.getName();
17+
18+
public Adapter(@NotNull AblyRealtime ably) {
19+
this.ably = ably;
20+
}
21+
22+
@Override
23+
public void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial) {
24+
if (ably.channels.containsKey(channelName)) {
25+
ably.channels.get(channelName).properties.channelSerial = channelSerial;
26+
} else {
27+
Log.e(TAG, "setChannelSerial(): channel not found: " + channelName);
28+
}
29+
}
30+
31+
@Override
32+
public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException {
33+
ably.connection.connectionManager.send(msg, true, listener);
34+
}
35+
}
36+
}

lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
import java.util.List;
77
import java.util.Map;
88

9+
import io.ably.lib.objects.LiveObjectsAdapter;
910
import io.ably.lib.objects.LiveObjectsPlugin;
10-
import io.ably.lib.plugins.PluginConnectionAdapter;
1111
import io.ably.lib.rest.AblyRest;
1212
import io.ably.lib.rest.Auth;
1313
import io.ably.lib.transport.ConnectionManager;
@@ -187,9 +187,10 @@ public interface Channels extends ReadOnlyMap<String, Channel> {
187187
private LiveObjectsPlugin tryInitializeLiveObjectsPlugin() {
188188
try {
189189
Class<?> liveObjectsImplementation = Class.forName("io.ably.lib.objects.DefaultLiveObjectsPlugin");
190+
LiveObjectsAdapter adapter = new LiveObjectsAdapter.Adapter(this);
190191
return (LiveObjectsPlugin) liveObjectsImplementation
191-
.getDeclaredConstructor(PluginConnectionAdapter.class)
192-
.newInstance(this.connection.connectionManager);
192+
.getDeclaredConstructor(LiveObjectsAdapter.class)
193+
.newInstance(adapter);
193194
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
194195
InvocationTargetException e) {
195196
Log.i(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e);

lib/src/main/java/io/ably/lib/transport/ConnectionManager.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import io.ably.lib.debug.DebugOptions.RawProtocolListener;
1616
import io.ably.lib.http.HttpHelpers;
1717
import io.ably.lib.objects.LiveObjectsPlugin;
18-
import io.ably.lib.plugins.PluginConnectionAdapter;
1918
import io.ably.lib.realtime.AblyRealtime;
2019
import io.ably.lib.realtime.Channel;
2120
import io.ably.lib.realtime.CompletionListener;
@@ -37,7 +36,7 @@
3736
import io.ably.lib.util.PlatformAgentProvider;
3837
import io.ably.lib.util.ReconnectionStrategy;
3938

40-
public class ConnectionManager implements ConnectListener, PluginConnectionAdapter {
39+
public class ConnectionManager implements ConnectListener {
4140
final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
4241

4342
/**************************************************************
@@ -1687,11 +1686,6 @@ public QueuedMessage(ProtocolMessage msg, CompletionListener listener) {
16871686
}
16881687
}
16891688

1690-
@Override
1691-
public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException {
1692-
this.send(msg, true, listener);
1693-
}
1694-
16951689
public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener listener) throws AblyException {
16961690
State state;
16971691
synchronized(this) {

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package io.ably.lib.objects
22

33
import io.ably.lib.types.Callback
4+
import io.ably.lib.types.ProtocolMessage
5+
import io.ably.lib.util.Log
6+
7+
internal class DefaultLiveObjects(private val adapter: LiveObjectsAdapter, private val channelName: String): LiveObjects {
8+
private val tag = DefaultLiveObjects::class.simpleName
49

5-
internal class DefaultLiveObjects(private val channelName: String): LiveObjects {
610
override fun getRoot(): LiveMap {
711
TODO("Not yet implemented")
812
}
@@ -43,6 +47,16 @@ internal class DefaultLiveObjects(private val channelName: String): LiveObjects
4347
TODO("Not yet implemented")
4448
}
4549

50+
fun handle(msg: ProtocolMessage) {
51+
// RTL15b
52+
msg.channelSerial?.let {
53+
if (msg.action === ProtocolMessage.Action.`object`) {
54+
Log.v(tag, "Setting channel serial for channelName: $channelName, value: ${msg.channelSerial}")
55+
adapter.setChannelSerial(channelName, msg.channelSerial)
56+
}
57+
}
58+
}
59+
4660
fun dispose() {
4761
// Dispose of any resources associated with this LiveObjects instance
4862
// For example, close any open connections or clean up references

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,19 @@
11
package io.ably.lib.objects
22

3-
import io.ably.lib.plugins.PluginConnectionAdapter
4-
import io.ably.lib.realtime.CompletionListener
5-
import io.ably.lib.types.ErrorInfo
63
import io.ably.lib.types.ProtocolMessage
7-
import kotlinx.coroutines.CompletableDeferred
84
import java.util.concurrent.ConcurrentHashMap
95

10-
public class DefaultLiveObjectsPlugin(private val pluginConnectionAdapter: PluginConnectionAdapter) : LiveObjectsPlugin {
6+
public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) : LiveObjectsPlugin {
117

128
private val liveObjects = ConcurrentHashMap<String, DefaultLiveObjects>()
139

1410
override fun getInstance(channelName: String): LiveObjects {
15-
return liveObjects.getOrPut(channelName) { DefaultLiveObjects(channelName) }
11+
return liveObjects.getOrPut(channelName) { DefaultLiveObjects(adapter, channelName) }
1612
}
1713

18-
public suspend fun send(message: ProtocolMessage) {
19-
val deferred = CompletableDeferred<Unit>()
20-
pluginConnectionAdapter.send(message, object : CompletionListener {
21-
override fun onSuccess() {
22-
deferred.complete(Unit)
23-
}
24-
25-
override fun onError(reason: ErrorInfo) {
26-
deferred.completeExceptionally(Exception(reason.message))
27-
}
28-
})
29-
deferred.await()
30-
}
31-
32-
override fun handle(message: ProtocolMessage) {
33-
TODO("Not yet implemented")
14+
override fun handle(msg: ProtocolMessage) {
15+
val channelName = msg.channel
16+
liveObjects[channelName]?.handle(msg)
3417
}
3518

3619
override fun dispose(channelName: String) {
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.ably.lib.objects
2+
3+
import io.ably.lib.realtime.CompletionListener
4+
import io.ably.lib.types.ErrorInfo
5+
import io.ably.lib.types.ProtocolMessage
6+
import kotlinx.coroutines.CompletableDeferred
7+
8+
internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) {
9+
val deferred = CompletableDeferred<Unit>()
10+
try {
11+
this.send(message, object : CompletionListener {
12+
override fun onSuccess() {
13+
deferred.complete(Unit)
14+
}
15+
16+
override fun onError(reason: ErrorInfo) {
17+
deferred.completeExceptionally(Exception(reason.message))
18+
}
19+
})
20+
} catch (e: Exception) {
21+
deferred.completeExceptionally(e)
22+
}
23+
deferred.await()
24+
}

0 commit comments

Comments
 (0)