Skip to content

Commit 1355584

Browse files
committed
[ECO-5482] Implemented spec RTO15 for objects realtime publish
1 parent 0e7e377 commit 1355584

4 files changed

Lines changed: 49 additions & 2 deletions

File tree

lib/src/main/java/io/ably/lib/objects/Adapter.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@
33
import io.ably.lib.realtime.AblyRealtime;
44
import io.ably.lib.realtime.ChannelState;
55
import io.ably.lib.realtime.CompletionListener;
6-
import io.ably.lib.types.*;
6+
import io.ably.lib.transport.ConnectionManager;
7+
import io.ably.lib.types.AblyException;
8+
import io.ably.lib.types.ChannelMode;
9+
import io.ably.lib.types.ChannelOptions;
10+
import io.ably.lib.types.ClientOptions;
11+
import io.ably.lib.types.ProtocolMessage;
712
import io.ably.lib.util.Log;
813
import org.jetbrains.annotations.NotNull;
914

@@ -67,4 +72,9 @@ public ChannelState getChannelState(@NotNull String channelName) {
6772
public @NotNull ClientOptions getClientOptions() {
6873
return ably.options;
6974
}
75+
76+
@Override
77+
public @NotNull ConnectionManager getConnectionManager() {
78+
return ably.connection.connectionManager;
79+
}
7080
}

lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.ably.lib.realtime.ChannelState;
44
import io.ably.lib.realtime.CompletionListener;
5+
import io.ably.lib.transport.ConnectionManager;
56
import io.ably.lib.types.AblyException;
67
import io.ably.lib.types.ChannelMode;
78
import io.ably.lib.types.ClientOptions;
@@ -63,5 +64,14 @@ public interface LiveObjectsAdapter {
6364
* @return the client options containing configuration parameters
6465
*/
6566
@NotNull ClientOptions getClientOptions();
67+
68+
/**
69+
* Retrieves the connection manager for handling connection state and operations.
70+
* Used to check connection status, obtain error information, and manage
71+
* message transmission across the Ably connection.
72+
*
73+
* @return the connection manager instance
74+
*/
75+
@NotNull ConnectionManager getConnectionManager();
6676
}
6777

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,20 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val
106106
objectsPool.get(ROOT_OBJECT_ID) as LiveMap
107107
}
108108

109+
/**
110+
* Spec: RTO15
111+
*/
112+
internal suspend fun publish(objectMessages: Array<ObjectMessage>) {
113+
// RTO15b, RTL6c - Ensure that the channel is in a valid state for publishing
114+
adapter.throwIfUnpublishableState(channelName)
115+
adapter.ensureMessageSizeWithinLimit(objectMessages)
116+
// RTO15e - Must construct the ProtocolMessage as per RTO15e1, RTO15e2, RTO15e3
117+
val protocolMessage = ProtocolMessage(ProtocolMessage.Action.`object`, channelName)
118+
protocolMessage.state = objectMessages
119+
// RTO15f, RTO15g - Send the ProtocolMessage using the adapter and capture success/failure
120+
adapter.sendAsync(protocolMessage)
121+
}
122+
109123
/**
110124
* Handles a ProtocolMessage containing proto action as `object` or `object_sync`.
111125
* @spec RTL1 - Processes incoming object messages and object sync messages

live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import kotlinx.coroutines.suspendCancellableCoroutine
99
import kotlin.coroutines.resume
1010
import kotlin.coroutines.resumeWithException
1111

12+
/**
13+
* Spec: RTO15g
14+
*/
1215
internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) = suspendCancellableCoroutine { continuation ->
1316
try {
1417
this.send(message, object : CompletionListener {
@@ -25,11 +28,14 @@ internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) = su
2528
}
2629
}
2730

31+
/**
32+
* Spec: RTO15d
33+
*/
2834
internal fun LiveObjectsAdapter.ensureMessageSizeWithinLimit(objectMessages: Array<ObjectMessage>) {
2935
val maximumAllowedSize = maxMessageSizeLimit()
3036
val objectsTotalMessageSize = objectMessages.sumOf { it.size() }
3137
if (objectsTotalMessageSize > maximumAllowedSize) {
32-
throw ablyException("ObjectMessage size $objectsTotalMessageSize exceeds maximum allowed size of $maximumAllowedSize bytes",
38+
throw ablyException("ObjectMessages size $objectsTotalMessageSize exceeds maximum allowed size of $maximumAllowedSize bytes",
3339
ErrorCode.MaxMessageSizeExceeded)
3440
}
3541
}
@@ -53,6 +59,13 @@ internal fun LiveObjectsAdapter.throwIfInvalidWriteApiConfiguration(channelName:
5359
throwIfInChannelState(channelName, arrayOf(ChannelState.detached, ChannelState.failed, ChannelState.suspended))
5460
}
5561

62+
internal fun LiveObjectsAdapter.throwIfUnpublishableState(channelName: String) {
63+
if (!connectionManager.isActive) {
64+
throw ablyException(connectionManager.stateErrorInfo)
65+
}
66+
throwIfInChannelState(channelName, arrayOf(ChannelState.failed, ChannelState.suspended))
67+
}
68+
5669
// Spec: RTO2
5770
internal fun LiveObjectsAdapter.throwIfMissingChannelMode(channelName: String, channelMode: ChannelMode) {
5871
val channelModes = getChannelModes(channelName)

0 commit comments

Comments
 (0)