Skip to content

Commit 5d1fc81

Browse files
committed
Merge branch 'feature/objects-getroot' into feature/object-subscriptions
2 parents 60ee93c + 8b5ea81 commit 5d1fc81

10 files changed

Lines changed: 71 additions & 94 deletions

File tree

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import io.ably.lib.objects.state.ObjectsStateEvent
55
import io.ably.lib.objects.type.counter.LiveCounter
66
import io.ably.lib.objects.type.map.LiveMap
77
import io.ably.lib.realtime.ChannelState
8+
import io.ably.lib.types.AblyException
89
import io.ably.lib.types.Callback
910
import io.ably.lib.types.ProtocolMessage
1011
import io.ably.lib.util.Log
@@ -24,7 +25,7 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val
2425
*/
2526
internal val objectsPool = ObjectsPool(this)
2627

27-
internal var state = ObjectsState.INITIALIZED
28+
internal var state = ObjectsState.Initialized
2829

2930
/**
3031
* @spec RTO4 - Used for handling object messages and object sync messages
@@ -37,6 +38,11 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val
3738
private val sequentialScope =
3839
CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName(channelName) + SupervisorJob())
3940

41+
/**
42+
* Provides a channel-specific scope for safely executing asynchronous operations with callbacks.
43+
*/
44+
private val callbackScope = ObjectsCallbackScope(channelName)
45+
4046
/**
4147
* Event bus for handling incoming object messages sequentially.
4248
*/
@@ -49,10 +55,6 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val
4955

5056
override fun getRoot(): LiveMap = runBlocking { getRootAsync() }
5157

52-
override fun getRootAsync(callback: Callback<LiveMap>) {
53-
GlobalCallbackScope.launchWithCallback(callback) { getRootAsync() }
54-
}
55-
5658
override fun createMap(liveMap: LiveMap): LiveMap {
5759
TODO("Not yet implemented")
5860
}
@@ -65,6 +67,10 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val
6567
TODO("Not yet implemented")
6668
}
6769

70+
override fun getRootAsync(callback: Callback<LiveMap>) {
71+
callbackScope.launchWithCallback(callback) { getRootAsync() }
72+
}
73+
6874
override fun createMapAsync(liveMap: LiveMap, callback: Callback<LiveMap>) {
6975
TODO("Not yet implemented")
7076
}
@@ -157,7 +163,7 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val
157163
Log.v(tag, "Objects.onAttached() channel=$channelName, hasObjects=$hasObjects")
158164

159165
// RTO4a
160-
val fromInitializedState = this@DefaultLiveObjects.state == ObjectsState.INITIALIZED
166+
val fromInitializedState = this@DefaultLiveObjects.state == ObjectsState.Initialized
161167
if (hasObjects || fromInitializedState) {
162168
// should always start a new sync sequence if we're in the initialized state, no matter the HAS_OBJECTS flag value.
163169
// this guarantees we emit both "syncing" -> "synced" events in that order.
@@ -191,12 +197,13 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val
191197
}
192198

193199
// Dispose of any resources associated with this LiveObjects instance
194-
fun dispose(reason: String) {
195-
val cancellationError = CancellationException("Objects disposed for channel $channelName, reason: $reason")
196-
incomingObjectsHandler.cancel(cancellationError) // objectsEventBus automatically garbage collected when collector is cancelled
200+
fun dispose(cause: AblyException) {
201+
val disposeReason = CancellationException().apply { initCause(cause) }
202+
incomingObjectsHandler.cancel(disposeReason) // objectsEventBus automatically garbage collected when collector is cancelled
197203
objectsPool.dispose()
198204
objectsManager.dispose()
199-
// Don't cancel sequentialScope (needed in public methods), just cancel ongoing coroutines
200-
sequentialScope.coroutineContext.cancelChildren(cancellationError)
205+
// Don't cancel sequentialScope (needed in getRoot method), just cancel ongoing coroutines
206+
sequentialScope.coroutineContext.cancelChildren(disposeReason)
207+
callbackScope.cancel(disposeReason) // cancel all ongoing callbacks
201208
}
202209
}

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) :
2222
}
2323

2424
override fun dispose(channelName: String) {
25-
liveObjects[channelName]?.dispose("Channel has ben released using channels.release()")
25+
liveObjects[channelName]?.dispose(clientError("Channel has ben released using channels.release()"))
2626
liveObjects.remove(channelName)
2727
}
2828

2929
override fun dispose() {
3030
liveObjects.values.forEach {
31-
it.dispose("AblyClient has been closed using client.close()")
31+
it.dispose(clientError("AblyClient has been closed using client.close()"))
3232
}
3333
liveObjects.clear()
3434
}

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
2828
* @spec RTO8 - Buffers messages if not synced, applies immediately if synced
2929
*/
3030
internal fun handleObjectMessages(objectMessages: List<ObjectMessage>) {
31-
if (liveObjects.state != ObjectsState.SYNCED) {
31+
if (liveObjects.state != ObjectsState.Synced) {
3232
// RTO7 - The client receives object messages in realtime over the channel concurrently with the sync sequence.
3333
// Some of the incoming object messages may have already been applied to the objects described in
3434
// the sync sequence, but others may not; therefore we must buffer these messages so that we can apply
@@ -78,7 +78,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
7878
bufferedObjectOperations.clear() // RTO5a2b
7979
syncObjectsDataPool.clear() // RTO5a2a
8080
currentSyncId = syncId
81-
stateChange(ObjectsState.SYNCING, false)
81+
stateChange(ObjectsState.Syncing, false)
8282
}
8383

8484
/**
@@ -96,7 +96,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
9696
bufferedObjectOperations.clear() // RTO5c5
9797
syncObjectsDataPool.clear() // RTO5c4
9898
currentSyncId = null // RTO5c3
99-
stateChange(ObjectsState.SYNCED, deferStateEvent)
99+
stateChange(ObjectsState.Synced, deferStateEvent)
100100
}
101101

102102
/**

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsState.kt

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ import kotlinx.coroutines.*
1010
* @spec RTO2 - enum representing objects state
1111
*/
1212
internal enum class ObjectsState {
13-
INITIALIZED,
14-
SYNCING,
15-
SYNCED
13+
Initialized,
14+
Syncing,
15+
Synced
1616
}
1717

1818
/**
@@ -21,9 +21,9 @@ internal enum class ObjectsState {
2121
* INITIALIZED maps to null (no event), while SYNCING and SYNCED map to their respective events.
2222
*/
2323
private val objectsStateToEventMap = mapOf(
24-
ObjectsState.INITIALIZED to null,
25-
ObjectsState.SYNCING to ObjectsStateEvent.SYNCING,
26-
ObjectsState.SYNCED to ObjectsStateEvent.SYNCED
24+
ObjectsState.Initialized to null,
25+
ObjectsState.Syncing to ObjectsStateEvent.SYNCING,
26+
ObjectsState.Synced to ObjectsStateEvent.SYNCED
2727
)
2828

2929
/**
@@ -63,8 +63,6 @@ internal abstract class ObjectsStateCoordinator : ObjectsStateChange, HandlesObj
6363
// related to RTC10, should have a separate EventEmitter for users of the library
6464
private val externalObjectStateEmitter = ObjectsStateEmitter()
6565

66-
private val emitterScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + SupervisorJob())
67-
6866
override fun on(event: ObjectsStateEvent, listener: ObjectsStateChange.Listener): ObjectsSubscription {
6967
externalObjectStateEmitter.on(event, listener)
7068
return ObjectsSubscription {
@@ -78,16 +76,13 @@ internal abstract class ObjectsStateCoordinator : ObjectsStateChange, HandlesObj
7876

7977
override fun objectsStateChanged(newState: ObjectsState) {
8078
objectsStateToEventMap[newState]?.let { objectsStateEvent ->
81-
// emitterScope makes sure next launch can only start when previous launch finishes
82-
emitterScope.launch {
83-
internalObjectStateEmitter.emit(objectsStateEvent)
84-
externalObjectStateEmitter.emit(objectsStateEvent)
85-
}
79+
internalObjectStateEmitter.emit(objectsStateEvent)
80+
externalObjectStateEmitter.emit(objectsStateEvent)
8681
}
8782
}
8883

8984
override suspend fun ensureSynced(currentState: ObjectsState) {
90-
if (currentState != ObjectsState.SYNCED) {
85+
if (currentState != ObjectsState.Synced) {
9186
val deferred = CompletableDeferred<Unit>()
9287
internalObjectStateEmitter.once(ObjectsStateEvent.SYNCED) {
9388
Log.v(tag, "Objects state changed to SYNCED, resuming ensureSynced")
@@ -97,10 +92,7 @@ internal abstract class ObjectsStateCoordinator : ObjectsStateChange, HandlesObj
9792
}
9893
}
9994

100-
override fun disposeObjectsStateListeners() {
101-
offAll()
102-
emitterScope.cancel("ObjectsManager disposed")
103-
}
95+
override fun disposeObjectsStateListeners() = offAll()
10496
}
10597

10698
private class ObjectsStateEmitter : EventEmitter<ObjectsStateEvent, ObjectsStateChange.Listener>() {

live-objects/src/main/kotlin/io/ably/lib/objects/Utils.kt

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import io.ably.lib.types.Callback
55
import io.ably.lib.types.ErrorInfo
66
import io.ably.lib.util.Log
77
import kotlinx.coroutines.*
8+
import java.util.concurrent.CancellationException
89

910
internal fun ablyException(
1011
errorMessage: String,
@@ -49,26 +50,31 @@ internal val String.byteSize: Int
4950
get() = this.toByteArray(Charsets.UTF_8).size
5051

5152
/**
52-
* A global coroutine scope for executing callbacks asynchronously.
53-
* Provides safe execution of suspend functions with results delivered via callbacks,
54-
* with proper error handling for both the execution and callback invocation.
53+
* A channel-specific coroutine scope for executing callbacks asynchronously in the LiveObjects system.
54+
* Provides safe execution of suspend functions with results delivered via callbacks.
55+
* Supports proper error handling and cancellation during LiveObjects disposal.
5556
*/
56-
internal object GlobalCallbackScope {
57-
private const val TAG = "GlobalCallbackScope"
57+
internal class ObjectsCallbackScope(channelName: String) {
58+
private val tag = "ObjectsCallbackScope-$channelName"
59+
5860
private val scope =
59-
CoroutineScope(Dispatchers.Default + CoroutineName("LiveObjects-GlobalCallbackScope") + SupervisorJob())
61+
CoroutineScope(Dispatchers.Default + CoroutineName(tag) + SupervisorJob())
6062

6163
internal fun <T> launchWithCallback(callback: Callback<T>, block: suspend () -> T) {
6264
scope.launch {
6365
try {
6466
val result = block()
6567
try { callback.onSuccess(result) } catch (t: Throwable) {
66-
Log.e(TAG, "Error occurred while executing callback's onSuccess handler", t)
68+
Log.e(tag, "Error occurred while executing callback's onSuccess handler", t)
6769
} // catch and don't rethrow error from callback
6870
} catch (throwable: Throwable) {
6971
val exception = throwable as? AblyException
7072
callback.onError(exception?.errorInfo)
7173
}
7274
}
7375
}
76+
77+
internal fun cancel(cause: CancellationException) {
78+
scope.coroutineContext.cancelChildren(cause)
79+
}
7480
}

live-objects/src/main/kotlin/io/ably/lib/objects/serialization/JsonSerialization.kt

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,7 @@ internal class ObjectDataJsonSerializer : JsonSerializer<ObjectData>, JsonDeseri
5252
is String -> obj.addProperty("string", v)
5353
is Number -> obj.addProperty("number", v.toDouble())
5454
is Binary -> obj.addProperty("bytes", Base64.getEncoder().encodeToString(v.data))
55-
// Spec: OD4c5
56-
is JsonObject, is JsonArray -> {
57-
obj.addProperty("string", v.toString())
58-
obj.addProperty("encoding", "json")
59-
}
55+
is JsonObject, is JsonArray -> obj.addProperty("json", v.toString()) // Spec: OD4c5
6056
}
6157
}
6258
return obj
@@ -65,24 +61,12 @@ internal class ObjectDataJsonSerializer : JsonSerializer<ObjectData>, JsonDeseri
6561
override fun deserialize(json: JsonElement, typeOfT: Type?, context: JsonDeserializationContext?): ObjectData {
6662
val obj = if (json.isJsonObject) json.asJsonObject else throw JsonParseException("Expected JsonObject")
6763
val objectId = if (obj.has("objectId")) obj.get("objectId").asString else null
68-
val encoding = if (obj.has("encoding")) obj.get("encoding").asString else null
6964
val value = when {
7065
obj.has("boolean") -> ObjectValue(obj.get("boolean").asBoolean)
71-
// Spec: OD5b3
72-
obj.has("string") && encoding == "json" -> {
73-
val jsonStr = obj.get("string").asString
74-
val parsed = JsonParser.parseString(jsonStr)
75-
ObjectValue(
76-
when {
77-
parsed.isJsonObject -> parsed.asJsonObject
78-
parsed.isJsonArray -> parsed.asJsonArray
79-
else -> throw JsonParseException("Invalid JSON string for encoding=json")
80-
}
81-
)
82-
}
8366
obj.has("string") -> ObjectValue(obj.get("string").asString)
8467
obj.has("number") -> ObjectValue(obj.get("number").asDouble)
8568
obj.has("bytes") -> ObjectValue(Binary(Base64.getDecoder().decode(obj.get("bytes").asString)))
69+
obj.has("json") -> ObjectValue(JsonParser.parseString(obj.get("json").asString))
8670
else -> {
8771
if (objectId != null)
8872
null

live-objects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.ably.lib.objects.serialization
22

33
import com.google.gson.JsonArray
4-
import com.google.gson.JsonElement
54
import com.google.gson.JsonObject
65
import com.google.gson.JsonParser
76
import io.ably.lib.objects.*
@@ -617,9 +616,6 @@ private fun ObjectData.writeMsgpack(packer: MessagePacker) {
617616
if (objectId != null) fieldCount++
618617
value?.let {
619618
fieldCount++
620-
if (it.value is JsonElement) {
621-
fieldCount += 1 // For extra "encoding" field
622-
}
623619
}
624620

625621
packer.packMapHeader(fieldCount)
@@ -649,10 +645,8 @@ private fun ObjectData.writeMsgpack(packer: MessagePacker) {
649645
packer.writePayload(v.data)
650646
}
651647
is JsonObject, is JsonArray -> {
652-
packer.packString("string")
653-
packer.packString(v.toString())
654-
packer.packString("encoding")
655648
packer.packString("json")
649+
packer.packString(v.toString())
656650
}
657651
}
658652
}
@@ -665,8 +659,6 @@ private fun readObjectData(unpacker: MessageUnpacker): ObjectData {
665659
val fieldCount = unpacker.unpackMapHeader()
666660
var objectId: String? = null
667661
var value: ObjectValue? = null
668-
var encoding: String? = null
669-
var stringValue: String? = null
670662

671663
for (i in 0 until fieldCount) {
672664
val fieldName = unpacker.unpackString().intern()
@@ -680,33 +672,29 @@ private fun readObjectData(unpacker: MessageUnpacker): ObjectData {
680672
when (fieldName) {
681673
"objectId" -> objectId = unpacker.unpackString()
682674
"boolean" -> value = ObjectValue(unpacker.unpackBoolean())
683-
"string" -> stringValue = unpacker.unpackString()
675+
"string" -> value = ObjectValue(unpacker.unpackString())
684676
"number" -> value = ObjectValue(unpacker.unpackDouble())
685677
"bytes" -> {
686678
val size = unpacker.unpackBinaryHeader()
687679
val bytes = ByteArray(size)
688680
unpacker.readPayload(bytes)
689681
value = ObjectValue(Binary(bytes))
690682
}
691-
"encoding" -> encoding = unpacker.unpackString()
683+
"json" -> {
684+
val jsonString = unpacker.unpackString()
685+
val parsed = JsonParser.parseString(jsonString)
686+
value = ObjectValue(
687+
when {
688+
parsed.isJsonObject -> parsed.asJsonObject
689+
parsed.isJsonArray -> parsed.asJsonArray
690+
else ->
691+
throw ablyException("Invalid JSON string for json field", ErrorCode.MapValueDataTypeUnsupported, HttpStatusCode.InternalServerError)
692+
}
693+
)
694+
}
692695
else -> unpacker.skipValue()
693696
}
694697
}
695698

696-
// Handle string with encoding if needed
697-
if (stringValue != null && encoding == "json") {
698-
val parsed = JsonParser.parseString(stringValue)
699-
value = ObjectValue(
700-
when {
701-
parsed.isJsonObject -> parsed.asJsonObject
702-
parsed.isJsonArray -> parsed.asJsonArray
703-
else ->
704-
throw ablyException("Invalid JSON string for encoding=json", ErrorCode.MapValueDataTypeUnsupported, HttpStatusCode.InternalServerError)
705-
}
706-
)
707-
} else if (stringValue != null) {
708-
value = ObjectValue(stringValue)
709-
}
710-
711699
return ObjectData(objectId = objectId, value = value)
712700
}

live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class DefaultLiveObjectsTest : IntegrationTest() {
3838
val objects = channel.objects
3939
assertNotNull(objects)
4040

41-
assertEquals(ObjectsState.INITIALIZED, objects.State, "Initial state should be INITIALIZED")
41+
assertEquals(ObjectsState.Initialized, objects.State, "Initial state should be INITIALIZED")
4242

4343
val syncStates = mutableListOf<ObjectsStateEvent>()
4444
objects.on(ObjectsStateEvent.SYNCING) {

live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultLiveObjectsTest.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class DefaultLiveObjectsTest {
3434
// RTO4a - If the HAS_OBJECTS flag is 1, the server will shortly perform an OBJECT_SYNC sequence
3535
defaultLiveObjects.handleStateChange(ChannelState.attached, true)
3636

37-
assertWaiter { defaultLiveObjects.state == ObjectsState.SYNCING }
37+
assertWaiter { defaultLiveObjects.state == ObjectsState.Syncing }
3838

3939
// It is expected that the client will start a new sync sequence
4040
verify(exactly = 1) {
@@ -59,7 +59,7 @@ class DefaultLiveObjectsTest {
5959
defaultLiveObjects.handleStateChange(ChannelState.attached, false)
6060

6161
// Verify expected outcomes
62-
assertWaiter { defaultLiveObjects.state == ObjectsState.SYNCED } // RTO4b4
62+
assertWaiter { defaultLiveObjects.state == ObjectsState.Synced } // RTO4b4
6363

6464
verify(exactly = 1) {
6565
defaultLiveObjects.objectsPool.resetToInitialPool(true)
@@ -80,7 +80,7 @@ class DefaultLiveObjectsTest {
8080
val defaultLiveObjects = getDefaultLiveObjectsWithMockedDeps()
8181

8282
// Ensure we're in INITIALIZED state
83-
defaultLiveObjects.state = ObjectsState.INITIALIZED
83+
defaultLiveObjects.state = ObjectsState.Initialized
8484

8585
// RTO4a - Should start sync even with HAS_OBJECTS flag false when in INITIALIZED state
8686
defaultLiveObjects.handleStateChange(ChannelState.attached, false)

0 commit comments

Comments
 (0)