Skip to content

Commit 529efd8

Browse files
committed
[ECO-5426] Updated code, marked data types in objectsPool, LiveCounter and LiveMap as thread safe
1 parent 45fde3f commit 529efd8

15 files changed

Lines changed: 160 additions & 110 deletions

File tree

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ internal enum class ObjectsState {
2121
* Default implementation of LiveObjects interface.
2222
* Provides the core functionality for managing live objects on a channel.
2323
*/
24-
internal class DefaultLiveObjects(private val channelName: String, internal val adapter: LiveObjectsAdapter): LiveObjects {
24+
internal class DefaultLiveObjects(internal val channelName: String, internal val adapter: LiveObjectsAdapter): LiveObjects {
2525
private val tag = "DefaultLiveObjects"
2626
/**
2727
* @spec RTO3 - Objects pool storing all live objects by object ID
2828
*/
29-
internal val objectsPool = ObjectsPool(adapter)
29+
internal val objectsPool = ObjectsPool(this)
3030

3131
internal var state = ObjectsState.INITIALIZED
3232

@@ -203,9 +203,12 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
203203
}
204204

205205
// Dispose of any resources associated with this LiveObjects instance
206-
fun dispose() {
207-
incomingObjectsHandler.cancel() // objectsEventBus automatically garbage collected when collector is cancelled
206+
fun dispose(reason: String) {
207+
val cancellationError = CancellationException("Objects disposed for channel $channelName, reason: $reason")
208+
incomingObjectsHandler.cancel(cancellationError) // objectsEventBus automatically garbage collected when collector is cancelled
208209
objectsPool.dispose()
209210
objectsManager.dispose()
211+
// Don't cancel sequentialScope (needed in public methods), just cancel ongoing coroutines
212+
sequentialScope.coroutineContext.cancelChildren(cancellationError)
210213
}
211214
}

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) :
2222
}
2323

2424
override fun dispose(channelName: String) {
25-
liveObjects[channelName]?.dispose()
25+
liveObjects[channelName]?.dispose("Channel has ben released using channels.release()")
2626
liveObjects.remove(channelName)
2727
}
2828

2929
override fun dispose() {
3030
liveObjects.values.forEach {
31-
it.dispose()
31+
it.dispose("AblyClient has been closed using client.close()")
3232
}
3333
liveObjects.clear()
3434
}

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,8 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
214214
*/
215215
private fun createObjectFromState(objectState: ObjectState): BaseLiveObject {
216216
return when {
217-
objectState.counter != null -> DefaultLiveCounter.zeroValue(objectState.objectId, liveObjects.adapter) // RTO5c1b1a
218-
objectState.map != null -> DefaultLiveMap.zeroValue(objectState.objectId, liveObjects.adapter, liveObjects.objectsPool) // RTO5c1b1b
217+
objectState.counter != null -> DefaultLiveCounter.zeroValue(objectState.objectId, liveObjects) // RTO5c1b1a
218+
objectState.map != null -> DefaultLiveMap.zeroValue(objectState.objectId, liveObjects) // RTO5c1b1b
219219
else -> throw clientError("Object state must contain either counter or map data") // RTO5c1b1c
220220
}
221221
}

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
66
import io.ably.lib.objects.type.livemap.DefaultLiveMap
77
import io.ably.lib.util.Log
88
import kotlinx.coroutines.*
9+
import java.util.concurrent.ConcurrentHashMap
910

1011
/**
1112
* Constants for ObjectsPool configuration
@@ -32,14 +33,15 @@ internal const val ROOT_OBJECT_ID = "root"
3233
* @spec RTO3 - Maintains an objects pool for all live objects on the channel
3334
*/
3435
internal class ObjectsPool(
35-
private val adapter: LiveObjectsAdapter
36+
private val liveObjects: DefaultLiveObjects
3637
) {
3738
private val tag = "ObjectsPool"
3839

3940
/**
41+
* ConcurrentHashMap for thread-safe access from public APIs in LiveMap and LiveCounter.
4042
* @spec RTO3a - Pool storing all live objects by object ID
4143
*/
42-
private val pool = mutableMapOf<String, BaseLiveObject>()
44+
private val pool = ConcurrentHashMap<String, BaseLiveObject>()
4345

4446
/**
4547
* Coroutine scope for garbage collection
@@ -48,22 +50,12 @@ internal class ObjectsPool(
4850
private var gcJob: Job // Job for the garbage collection coroutine
4951

5052
init {
51-
// Initialize pool with root object
52-
createInitialPool()
53+
// RTO3b - Initialize pool with root object
54+
pool[ROOT_OBJECT_ID] = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, liveObjects)
5355
// Start garbage collection coroutine
5456
gcJob = startGCJob()
5557
}
5658

57-
/**
58-
* Creates the initial pool with root object.
59-
*
60-
* @spec RTO3b - Creates root LiveMap object
61-
*/
62-
private fun createInitialPool() {
63-
val root = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, adapter, this)
64-
pool[ROOT_OBJECT_ID] = root
65-
}
66-
6759
/**
6860
* Gets a live object from the pool by object ID.
6961
*/
@@ -119,8 +111,8 @@ internal class ObjectsPool(
119111

120112
val parsedObjectId = ObjectId.fromString(objectId) // RTO6b
121113
return when (parsedObjectId.type) {
122-
ObjectType.Map -> DefaultLiveMap.zeroValue(objectId, adapter, this) // RTO6b2
123-
ObjectType.Counter -> DefaultLiveCounter.zeroValue(objectId, adapter) // RTO6b3
114+
ObjectType.Map -> DefaultLiveMap.zeroValue(objectId, liveObjects) // RTO6b2
115+
ObjectType.Counter -> DefaultLiveCounter.zeroValue(objectId, liveObjects) // RTO6b3
124116
}.apply {
125117
set(objectId, this) // RTO6b4 - Add the zero-value object to the pool
126118
}

live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import io.ably.lib.objects.ObjectState
66
import io.ably.lib.objects.type.BaseLiveObject
77
import io.ably.lib.objects.type.ObjectType
88
import io.ably.lib.types.Callback
9+
import java.util.concurrent.atomic.AtomicReference
910

1011
/**
1112
* Implementation of LiveObject for LiveCounter.
@@ -14,21 +15,25 @@ import io.ably.lib.types.Callback
1415
*/
1516
internal class DefaultLiveCounter private constructor(
1617
objectId: String,
17-
adapter: LiveObjectsAdapter,
18+
private val liveObjects: DefaultLiveObjects,
1819
) : LiveCounter, BaseLiveObject(objectId, ObjectType.Counter) {
1920

2021
override val tag = "LiveCounter"
2122

2223
/**
23-
* Counter data value
24+
* Thread-safe reference to hold the counter data value.
25+
* Accessed from public API for LiveCounter and updated by LiveCounterManager.
2426
*/
25-
internal var data: Double = 0.0 // RTLC3
27+
internal val data = AtomicReference<Double>(0.0) // RTLC3
2628

2729
/**
2830
* liveCounterManager instance for managing LiveMap operations
2931
*/
3032
private val liveCounterManager = LiveCounterManager(this)
3133

34+
private val channelName = liveObjects.channelName
35+
private val adapter: LiveObjectsAdapter get() = liveObjects.adapter
36+
3237
override fun increment() {
3338
TODO("Not yet implemented")
3439
}
@@ -60,7 +65,7 @@ internal class DefaultLiveCounter private constructor(
6065
}
6166

6267
override fun clearData(): Map<String, Double> {
63-
return mapOf("amount" to data).apply { data = 0.0 }
68+
return mapOf("amount" to data.get()).apply { data.set(0.0) }
6469
}
6570

6671
override fun onGCInterval() {
@@ -73,8 +78,8 @@ internal class DefaultLiveCounter private constructor(
7378
* Creates a zero-value counter object.
7479
* @spec RTLC4 - Returns LiveCounter with 0 value
7580
*/
76-
internal fun zeroValue(objectId: String, adapter: LiveObjectsAdapter): DefaultLiveCounter {
77-
return DefaultLiveCounter(objectId, adapter)
81+
internal fun zeroValue(objectId: String, liveObjects: DefaultLiveObjects): DefaultLiveCounter {
82+
return DefaultLiveCounter(objectId, liveObjects)
7883
}
7984
}
8085
}

live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,22 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
1616
* @spec RTLC6 - Overrides counter data with state from sync
1717
*/
1818
internal fun applyState(objectState: ObjectState): Map<String, Double> {
19-
val previousData = liveCounter.data
19+
val previousData = liveCounter.data.get()
2020

2121
if (objectState.tombstone) {
2222
liveCounter.tombstone()
2323
} else {
2424
// override data for this object with data from the object state
2525
liveCounter.createOperationIsMerged = false // RTLC6b
26-
liveCounter.data = objectState.counter?.count ?: 0.0 // RTLC6c
26+
liveCounter.data.set(objectState.counter?.count ?: 0.0) // RTLC6c
2727

2828
// RTLC6d
2929
objectState.createOp?.let { createOp ->
3030
mergeInitialDataFromCreateOperation(createOp)
3131
}
3232
}
3333

34-
return mapOf("amount" to (liveCounter.data - previousData))
34+
return mapOf("amount" to (liveCounter.data.get() - previousData))
3535
}
3636

3737
/**
@@ -78,7 +78,8 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
7878
*/
7979
private fun applyCounterInc(counterOp: ObjectCounterOp): Map<String, Double> {
8080
val amount = counterOp.amount ?: 0.0
81-
liveCounter.data += amount // RTLC9b
81+
val previousValue = liveCounter.data.get()
82+
liveCounter.data.set(previousValue + amount) // RTLC9b
8283
return mapOf("amount" to amount)
8384
}
8485

@@ -91,7 +92,8 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
9192
// if we got here, it means that current counter instance is missing the initial value in its data reference,
9293
// which we're going to add now.
9394
val count = operation.counter?.count ?: 0.0
94-
liveCounter.data += count // RTLC10a
95+
val previousValue = liveCounter.data.get()
96+
liveCounter.data.set(previousValue + count) // RTLC10a
9597
liveCounter.createOperationIsMerged = true // RTLC10b
9698
return mapOf("amount" to count)
9799
}

live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,14 @@
11
package io.ably.lib.objects.type.livemap
22

33
import io.ably.lib.objects.*
4-
import io.ably.lib.objects.ObjectsPool
5-
import io.ably.lib.objects.ObjectsPoolDefaults
64
import io.ably.lib.objects.MapSemantics
7-
import io.ably.lib.objects.ObjectData
85
import io.ably.lib.objects.ObjectMessage
96
import io.ably.lib.objects.ObjectOperation
107
import io.ably.lib.objects.ObjectState
118
import io.ably.lib.objects.type.BaseLiveObject
129
import io.ably.lib.objects.type.ObjectType
1310
import io.ably.lib.types.Callback
14-
15-
/**
16-
* @spec RTLM3 - Map data structure storing entries
17-
*/
18-
internal data class LiveMapEntry(
19-
var isTombstoned: Boolean = false,
20-
var tombstonedAt: Long? = null,
21-
var timeserial: String? = null,
22-
var data: ObjectData? = null
23-
)
24-
25-
/**
26-
* Extension function to check if a LiveMapEntry is expired and ready for garbage collection
27-
*/
28-
private fun LiveMapEntry.isEligibleForGc(): Boolean {
29-
val currentTime = System.currentTimeMillis()
30-
return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true
31-
}
11+
import java.util.concurrent.ConcurrentHashMap
3212

3313
/**
3414
* Implementation of LiveObject for LiveMap.
@@ -37,22 +17,25 @@ private fun LiveMapEntry.isEligibleForGc(): Boolean {
3717
*/
3818
internal class DefaultLiveMap private constructor(
3919
objectId: String,
40-
adapter: LiveObjectsAdapter,
41-
internal val objectsPool: ObjectsPool,
20+
private val liveObjects: DefaultLiveObjects,
4221
internal val semantics: MapSemantics = MapSemantics.LWW
4322
) : LiveMap, BaseLiveObject(objectId, ObjectType.Map) {
4423

4524
override val tag = "LiveMap"
25+
4626
/**
47-
* Map of key to LiveMapEntry
27+
* ConcurrentHashMap for thread-safe access from public APIs in LiveMap and LiveMapManager.
4828
*/
49-
internal val data = mutableMapOf<String, LiveMapEntry>()
29+
internal val data = ConcurrentHashMap<String, LiveMapEntry>()
5030

5131
/**
5232
* LiveMapManager instance for managing LiveMap operations
5333
*/
5434
private val liveMapManager = LiveMapManager(this)
5535

36+
private val channelName = liveObjects.channelName
37+
private val adapter: LiveObjectsAdapter get() = liveObjects.adapter
38+
internal val objectsPool: ObjectsPool get() = liveObjects.objectsPool
5639

5740
override fun get(keyName: String): Any? {
5841
TODO("Not yet implemented")
@@ -114,8 +97,8 @@ internal class DefaultLiveMap private constructor(
11497
* Creates a zero-value map object.
11598
* @spec RTLM4 - Returns LiveMap with empty map data
11699
*/
117-
internal fun zeroValue(objectId: String, adapter: LiveObjectsAdapter, objectsPool: ObjectsPool): DefaultLiveMap {
118-
return DefaultLiveMap(objectId, adapter, objectsPool)
100+
internal fun zeroValue(objectId: String, objects: DefaultLiveObjects): DefaultLiveMap {
101+
return DefaultLiveMap(objectId, objects)
119102
}
120103
}
121104
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package io.ably.lib.objects.type.livemap
2+
3+
import io.ably.lib.objects.ObjectData
4+
import io.ably.lib.objects.ObjectsPool
5+
import io.ably.lib.objects.ObjectsPoolDefaults
6+
7+
/**
8+
* @spec RTLM3 - Map data structure storing entries
9+
*/
10+
internal data class LiveMapEntry(
11+
val isTombstoned: Boolean = false,
12+
val tombstonedAt: Long? = null,
13+
val timeserial: String? = null,
14+
val data: ObjectData? = null
15+
)
16+
17+
/**
18+
* Checks if entry is directly tombstoned or references a tombstoned object. Spec: RTLM14
19+
* @param objectsPool The object pool containing referenced LiveObjects
20+
*/
21+
internal fun LiveMapEntry.isEntryOrRefTombstoned(objectsPool: ObjectsPool): Boolean {
22+
if (isTombstoned) {
23+
return true // RTLM14a
24+
}
25+
data?.objectId?.let { refId -> // RTLM5d2f -has an objectId reference
26+
objectsPool.get(refId)?.let { refObject ->
27+
if (refObject.isTombstoned) {
28+
return true
29+
}
30+
}
31+
}
32+
return false // RTLM14b
33+
}
34+
35+
/**
36+
* Returns value as is if object data stores a primitive type or
37+
* a reference to another LiveObject from the pool if it stores an objectId.
38+
*/
39+
internal fun LiveMapEntry.getResolvedValue(objectsPool: ObjectsPool): Any? {
40+
if (isTombstoned) { return null } // RTLM5d2a
41+
42+
data?.value?.let { return it.value } // RTLM5d2b, RTLM5d2c, RTLM5d2d, RTLM5d2e
43+
44+
data?.objectId?.let { refId -> // RTLM5d2f -has an objectId reference
45+
objectsPool.get(refId)?.let { refObject ->
46+
if (refObject.isTombstoned) {
47+
return null // tombstoned objects must not be surfaced to the end users
48+
}
49+
return refObject // RTLM5d2f2
50+
}
51+
}
52+
return null // RTLM5d2g, RTLM5d2f1
53+
}
54+
55+
/**
56+
* Extension function to check if a LiveMapEntry is expired and ready for garbage collection
57+
*/
58+
internal fun LiveMapEntry.isEligibleForGc(): Boolean {
59+
val currentTime = System.currentTimeMillis()
60+
return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true
61+
}

0 commit comments

Comments
 (0)