Skip to content

Commit 3e0ca95

Browse files
authored
Merge pull request #1137 from ably/feature/object-sync-refactored
[ECO-5426][LiveObjects] Implement object sync / operation
2 parents 86e2dac + 34449c7 commit 3e0ca95

37 files changed

Lines changed: 4042 additions & 139 deletions

lib/src/main/java/io/ably/lib/objects/LiveCounter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,5 @@ public interface LiveCounter {
5858
*/
5959
@NotNull
6060
@Contract(pure = true) // Indicates this method does not modify the state of the object.
61-
Long value();
61+
Double value();
6262
}

lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.ably.lib.objects;
22

3+
import io.ably.lib.realtime.ChannelState;
34
import io.ably.lib.types.ProtocolMessage;
45
import org.jetbrains.annotations.NotNull;
56

@@ -30,6 +31,17 @@ public interface LiveObjectsPlugin {
3031
*/
3132
void handle(@NotNull ProtocolMessage message);
3233

34+
/**
35+
* Handles state changes for a specific channel.
36+
* This method is invoked whenever a channel's state changes, allowing the implementation
37+
* to update the LiveObjects instances accordingly based on the new state and presence of objects.
38+
*
39+
* @param channelName the name of the channel whose state has changed.
40+
* @param state the new state of the channel.
41+
* @param hasObjects flag indicates whether the channel has any associated live objects.
42+
*/
43+
void handleStateChange(@NotNull String channelName, @NotNull ChannelState state, boolean hasObjects);
44+
3345
/**
3446
* Disposes of the LiveObjects instance associated with the specified channel name.
3547
* This method removes the LiveObjects instance for the given channel, releasing any

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,15 @@ private void setState(ChannelState newState, ErrorInfo reason, boolean resumed,
145145
this.reason = stateChange.reason;
146146
}
147147

148+
// cover states other than attached, ChannelState.attached already covered in setAttached
149+
if (liveObjectsPlugin != null && newState!= ChannelState.attached) {
150+
try {
151+
liveObjectsPlugin.handleStateChange(name, newState, false);
152+
} catch (Throwable t) {
153+
Log.e(TAG, "Unexpected exception in LiveObjectsPlugin.handle", t);
154+
}
155+
}
156+
148157
if (newState != ChannelState.attaching && newState != ChannelState.suspended) {
149158
this.retryAttempt = 0;
150159
}
@@ -439,6 +448,13 @@ private void setAttached(ProtocolMessage message) {
439448
}
440449
return;
441450
}
451+
if (liveObjectsPlugin != null) {
452+
try {
453+
liveObjectsPlugin.handleStateChange(name, ChannelState.attached, message.hasFlag(Flag.has_objects));
454+
} catch (Throwable t) {
455+
Log.e(TAG, "Unexpected exception in LiveObjectsPlugin.handle", t);
456+
}
457+
}
442458
if(state == ChannelState.attached) {
443459
Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name));
444460
if (!message.hasFlag(Flag.resumed)) { // RTL12

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 161 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,59 @@
11
package io.ably.lib.objects
22

3+
import io.ably.lib.realtime.ChannelState
34
import io.ably.lib.types.Callback
45
import io.ably.lib.types.ProtocolMessage
56
import io.ably.lib.util.Log
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
9+
import kotlinx.coroutines.flow.MutableSharedFlow
610

7-
internal class DefaultLiveObjects(private val channelName: String, private val adapter: LiveObjectsAdapter): LiveObjects {
8-
private val tag = DefaultLiveObjects::class.simpleName
11+
/**
12+
* @spec RTO2 - enum representing objects state
13+
*/
14+
internal enum class ObjectsState {
15+
INITIALIZED,
16+
SYNCING,
17+
SYNCED
18+
}
19+
20+
/**
21+
* Default implementation of LiveObjects interface.
22+
* Provides the core functionality for managing live objects on a channel.
23+
*/
24+
internal class DefaultLiveObjects(internal val channelName: String, internal val adapter: LiveObjectsAdapter): LiveObjects {
25+
private val tag = "DefaultLiveObjects"
26+
/**
27+
* @spec RTO3 - Objects pool storing all live objects by object ID
28+
*/
29+
internal val objectsPool = ObjectsPool(this)
30+
31+
internal var state = ObjectsState.INITIALIZED
32+
33+
/**
34+
* @spec RTO4 - Used for handling object messages and object sync messages
35+
*/
36+
private val objectsManager = ObjectsManager(this)
937

38+
/**
39+
* Coroutine scope for running sequential operations on a single thread, used to avoid concurrency issues.
40+
*/
41+
private val sequentialScope =
42+
CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName(channelName) + SupervisorJob())
43+
44+
/**
45+
* Event bus for handling incoming object messages sequentially.
46+
*/
47+
private val objectsEventBus = MutableSharedFlow<ProtocolMessage>(extraBufferCapacity = UNLIMITED)
48+
private val incomingObjectsHandler: Job
49+
50+
init {
51+
incomingObjectsHandler = initializeHandlerForIncomingObjectMessages()
52+
}
53+
54+
/**
55+
* @spec RTO1 - Returns the root LiveMap object with proper validation and sync waiting
56+
*/
1057
override fun getRoot(): LiveMap {
1158
TODO("Not yet implemented")
1259
}
@@ -47,18 +94,121 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
4794
TODO("Not yet implemented")
4895
}
4996

50-
fun handle(msg: ProtocolMessage) {
51-
// RTL15b
52-
msg.channelSerial?.let {
53-
if (msg.action === ProtocolMessage.Action.`object`) {
54-
Log.v(tag, "Setting channel serial for channelName: $channelName, value: ${msg.channelSerial}")
55-
adapter.setChannelSerial(channelName, msg.channelSerial)
97+
/**
98+
* Handles a ProtocolMessage containing proto action as `object` or `object_sync`.
99+
* @spec RTL1 - Processes incoming object messages and object sync messages
100+
*/
101+
internal fun handle(protocolMessage: ProtocolMessage) {
102+
// RTL15b - Set channel serial for OBJECT messages
103+
adapter.setChannelSerial(channelName, protocolMessage)
104+
105+
if (protocolMessage.state == null || protocolMessage.state.isEmpty()) {
106+
Log.w(tag, "Received ProtocolMessage with null or empty objects, ignoring")
107+
return
108+
}
109+
110+
objectsEventBus.tryEmit(protocolMessage)
111+
}
112+
113+
/**
114+
* Initializes the handler for incoming object messages and object sync messages.
115+
* Processes the messages sequentially to ensure thread safety and correct order of operations.
116+
*
117+
* @spec OM2 - Populates missing fields from parent protocol message
118+
*/
119+
private fun initializeHandlerForIncomingObjectMessages(): Job {
120+
return sequentialScope.launch {
121+
objectsEventBus.collect { protocolMessage ->
122+
// OM2 - Populate missing fields from parent
123+
val objects = protocolMessage.state.filterIsInstance<ObjectMessage>()
124+
.mapIndexed { index, objMsg ->
125+
objMsg.copy(
126+
connectionId = objMsg.connectionId ?: protocolMessage.connectionId, // OM2c
127+
timestamp = objMsg.timestamp ?: protocolMessage.timestamp, // OM2e
128+
id = objMsg.id ?: (protocolMessage.id + ':' + index) // OM2a
129+
)
130+
}
131+
132+
try {
133+
when (protocolMessage.action) {
134+
ProtocolMessage.Action.`object` -> objectsManager.handleObjectMessages(objects)
135+
ProtocolMessage.Action.object_sync -> objectsManager.handleObjectSyncMessages(
136+
objects,
137+
protocolMessage.channelSerial
138+
)
139+
else -> Log.w(tag, "Ignoring protocol message with unhandled action: ${protocolMessage.action}")
140+
}
141+
} catch (exception: Exception) {
142+
// Skip current message if an error occurs, don't rethrow to avoid crashing the collector
143+
Log.e(tag, "Error handling objects message with protocolMsg id ${protocolMessage.id}", exception)
144+
}
56145
}
57146
}
58147
}
59148

60-
fun dispose() {
61-
// Dispose of any resources associated with this LiveObjects instance
62-
// For example, close any open connections or clean up references
149+
internal fun handleStateChange(state: ChannelState, hasObjects: Boolean) {
150+
sequentialScope.launch {
151+
when (state) {
152+
ChannelState.attached -> {
153+
Log.v(tag, "Objects.onAttached() channel=$channelName, hasObjects=$hasObjects")
154+
155+
// RTO4a
156+
val fromInitializedState = this@DefaultLiveObjects.state == ObjectsState.INITIALIZED
157+
if (hasObjects || fromInitializedState) {
158+
// should always start a new sync sequence if we're in the initialized state, no matter the HAS_OBJECTS flag value.
159+
// this guarantees we emit both "syncing" -> "synced" events in that order.
160+
objectsManager.startNewSync(null)
161+
}
162+
163+
// RTO4b
164+
if (!hasObjects) {
165+
// if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel.
166+
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
167+
objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2
168+
objectsManager.clearSyncObjectsDataPool() // RTO4b3
169+
objectsManager.clearBufferedObjectOperations() // RTO4b5
170+
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
171+
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
172+
objectsManager.endSync(fromInitializedState) // RTO4b4
173+
}
174+
}
175+
ChannelState.detached,
176+
ChannelState.failed -> {
177+
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
178+
objectsPool.clearObjectsData(false)
179+
objectsManager.clearSyncObjectsDataPool()
180+
}
181+
182+
else -> {
183+
// No action needed for other states
184+
}
185+
}
186+
}
187+
}
188+
189+
/**
190+
* Changes the state and emits events.
191+
*
192+
* @spec RTO2 - Emits state change events for syncing and synced states
193+
*/
194+
internal fun stateChange(newState: ObjectsState, deferEvent: Boolean) {
195+
if (state == newState) {
196+
return
197+
}
198+
199+
state = newState
200+
Log.v(tag, "Objects state changed to: $newState")
201+
202+
// TODO: Emit state change events
203+
}
204+
205+
// Dispose of any resources associated with this LiveObjects instance
206+
fun dispose(reason: String) {
207+
val cancellationError = CancellationException("Objects disposed for channel $channelName, reason: $reason")
208+
incomingObjectsHandler.cancel(cancellationError) // objectsEventBus automatically garbage collected when collector is cancelled
209+
objectsPool.dispose()
210+
objectsManager.dispose()
211+
// Don't cancel sequentialScope (needed in public methods), just cancel ongoing coroutines
212+
sequentialScope.coroutineContext.cancelChildren(cancellationError)
63213
}
64214
}

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.ably.lib.objects
22

3+
import io.ably.lib.realtime.ChannelState
34
import io.ably.lib.types.ProtocolMessage
45
import java.util.concurrent.ConcurrentHashMap
56

@@ -16,14 +17,18 @@ public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) :
1617
liveObjects[channelName]?.handle(msg)
1718
}
1819

20+
override fun handleStateChange(channelName: String, state: ChannelState, hasObjects: Boolean) {
21+
liveObjects[channelName]?.handleStateChange(state, hasObjects)
22+
}
23+
1924
override fun dispose(channelName: String) {
20-
liveObjects[channelName]?.dispose()
25+
liveObjects[channelName]?.dispose("Channel has ben released using channels.release()")
2126
liveObjects.remove(channelName)
2227
}
2328

2429
override fun dispose() {
2530
liveObjects.values.forEach {
26-
it.dispose()
31+
it.dispose("AblyClient has been closed using client.close()")
2732
}
2833
liveObjects.clear()
2934
}

live-objects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ internal enum class ErrorCode(public val code: Int) {
44
BadRequest(40_000),
55
InternalError(50_000),
66
MaxMessageSizeExceeded(40_009),
7+
InvalidObject(92_000),
8+
// LiveMap specific error codes
9+
MapKeyShouldBeString(40_003),
10+
MapValueDataTypeUnsupported(40_013),
711
}
812

913
internal enum class HttpStatusCode(public val code: Int) {

live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ internal fun LiveObjectsAdapter.ensureMessageSizeWithinLimit(objectMessages: Arr
3232
}
3333
}
3434

35+
internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMessage: ProtocolMessage) {
36+
if (protocolMessage.action != ProtocolMessage.Action.`object`) return
37+
val channelSerial = protocolMessage.channelSerial
38+
if (channelSerial.isNullOrEmpty()) return
39+
setChannelSerial(channelName, channelSerial)
40+
}
41+
3542
internal class Binary(val data: ByteArray) {
3643
override fun equals(other: Any?): Boolean {
3744
if (this === other) return true
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package io.ably.lib.objects
2+
3+
import io.ably.lib.objects.type.ObjectType
4+
5+
internal class ObjectId private constructor(
6+
internal val type: ObjectType,
7+
private val hash: String,
8+
private val timestampMs: Long
9+
) {
10+
/**
11+
* Converts ObjectId to string representation.
12+
*/
13+
override fun toString(): String {
14+
return "${type.value}:$hash@$timestampMs"
15+
}
16+
17+
companion object {
18+
/**
19+
* Creates ObjectId instance from hashed object id string.
20+
*/
21+
fun fromString(objectId: String): ObjectId {
22+
if (objectId.isEmpty()) {
23+
throw objectError("Invalid object id: $objectId")
24+
}
25+
26+
// Parse format: type:hash@msTimestamp
27+
val parts = objectId.split(':')
28+
if (parts.size != 2) {
29+
throw objectError("Invalid object id: $objectId")
30+
}
31+
32+
val (typeStr, rest) = parts
33+
34+
val type = when (typeStr) {
35+
"map" -> ObjectType.Map
36+
"counter" -> ObjectType.Counter
37+
else -> throw objectError("Invalid object type in object id: $objectId")
38+
}
39+
40+
val hashAndTimestamp = rest.split('@')
41+
if (hashAndTimestamp.size != 2) {
42+
throw objectError("Invalid object id: $objectId")
43+
}
44+
45+
val hash = hashAndTimestamp[0]
46+
47+
if (hash.isEmpty()) {
48+
throw objectError("Invalid object id: $objectId")
49+
}
50+
51+
val msTimestampStr = hashAndTimestamp[1]
52+
53+
val msTimestamp = try {
54+
msTimestampStr.toLong()
55+
} catch (e: NumberFormatException) {
56+
throw objectError("Invalid object id: $objectId", e)
57+
}
58+
59+
return ObjectId(type, hash, msTimestamp)
60+
}
61+
}
62+
}

0 commit comments

Comments
 (0)