Skip to content

Commit bdb7af0

Browse files
committed
[ECO-5426] Added separate LivemapManager for handling incoming objectMessages
- Refactored/simplified GC for LiveMapEntries and LiveObjects - Improved BaseLiveObject interface, added comprehensive doc
1 parent 838bf14 commit bdb7af0

8 files changed

Lines changed: 454 additions & 452 deletions

File tree

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
246246
// RTO5c1a
247247
if (existingObject != null) {
248248
// Update existing object
249-
val update = existingObject.overrideWithObjectState(objectState) // RTO5c1a1
249+
val update = existingObject.applyObjectState(objectState) // RTO5c1a1
250250
existingObjectUpdates.add(Pair(existingObject, update))
251251
} else { // RTO5c1b
252252
// RTO5c1b1 - Create new object and add it to the pool
@@ -323,7 +323,7 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
323323
objectState.map != null -> DefaultLiveMap.zeroValue(objectState.objectId, adapter, objectsPool) // RTO5c1b1b
324324
else -> throw clientError("Object state must contain either counter or map data") // RTO5c1b1c
325325
}.apply {
326-
overrideWithObjectState(objectState)
326+
applyObjectState(objectState)
327327
}
328328
}
329329

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ internal class ObjectsPool(
104104
/**
105105
* Clears the data stored for all objects in the pool.
106106
*/
107-
fun clearObjectsData(emitUpdateEvents: Boolean) {
107+
private fun clearObjectsData(emitUpdateEvents: Boolean) {
108108
for (obj in pool.values) {
109109
val update = obj.clearData()
110110
if (emitUpdateEvents) {
@@ -118,7 +118,7 @@ internal class ObjectsPool(
118118
*
119119
* @spec RTO6 - Creates zero-value objects when needed
120120
*/
121-
fun createZeroValueObjectIfNotExists(objectId: String): BaseLiveObject {
121+
internal fun createZeroValueObjectIfNotExists(objectId: String): BaseLiveObject {
122122
val existingObject = get(objectId)
123123
if (existingObject != null) {
124124
return existingObject // RTO6a
@@ -148,23 +148,13 @@ internal class ObjectsPool(
148148
* Garbage collection interval handler.
149149
*/
150150
private fun onGCInterval() {
151-
val toDelete = mutableListOf<String>()
152-
153-
for ((objectId, obj) in pool.entries) {
154-
// Tombstoned objects should be removed from the pool if they have been tombstoned for longer than grace period.
155-
// By removing them from the local pool, Objects plugin no longer keeps a reference to those objects, allowing JVM's
156-
// Garbage Collection to eventually free the memory for those objects, provided the user no longer references them either.
157-
if (obj.isTombstoned &&
158-
obj.tombstonedAt != null &&
159-
System.currentTimeMillis() - obj.tombstonedAt!! >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS) {
160-
toDelete.add(objectId)
161-
continue
151+
pool.entries.removeIf { (_, obj) ->
152+
if (obj.isEligibleForGc()) { true } // Remove from pool
153+
else {
154+
obj.onGCInterval()
155+
false // Keep in pool
162156
}
163-
164-
obj.onGCInterval()
165157
}
166-
167-
toDelete.forEach { pool.remove(it) }
168158
}
169159

170160
/**
Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package io.ably.lib.objects.type
22

3-
import io.ably.lib.objects.LiveObjectsAdapter
3+
import io.ably.lib.objects.*
44
import io.ably.lib.objects.ObjectMessage
55
import io.ably.lib.objects.ObjectOperation
66
import io.ably.lib.objects.ObjectState
7-
import io.ably.lib.objects.ObjectsPool
7+
import io.ably.lib.objects.ObjectsPoolDefaults
88
import io.ably.lib.objects.objectError
99
import io.ably.lib.util.Log
1010

@@ -15,20 +15,18 @@ import io.ably.lib.util.Log
1515
* @spec RTLO1/RTLO2 - Base class for LiveMap/LiveCounter objects
1616
*/
1717
internal abstract class BaseLiveObject(
18-
protected val objectId: String, // // RTLO3a
18+
internal val objectId: String, // // RTLO3a
1919
protected val adapter: LiveObjectsAdapter
2020
) {
2121

2222
protected open val tag = "BaseLiveObject"
23-
internal var isTombstoned = false
24-
internal var tombstonedAt: Long? = null
2523

26-
protected val siteTimeserials = mutableMapOf<String, String>() // RTLO3b
24+
internal val siteTimeserials = mutableMapOf<String, String>() // RTLO3b
2725

28-
/**
29-
* @spec RTLO3 - Flag to track if create operation has been merged for LiveMap/LiveCounter
30-
*/
31-
protected var createOperationIsMerged = false // RTLO3c
26+
internal var createOperationIsMerged = false // RTLO3c
27+
28+
internal var isTombstoned = false
29+
internal var tombstonedAt: Long? = null
3230

3331
fun notifyUpdated(update: Any) {
3432
// TODO: Implement event emission for updates
@@ -40,7 +38,7 @@ internal abstract class BaseLiveObject(
4038
*
4139
* @spec RTLO4a - Serial comparison logic for LiveMap/LiveCounter operations
4240
*/
43-
protected fun canApplyOperation(siteCode: String?, serial: String?): Boolean {
41+
internal fun canApplyOperation(siteCode: String?, serial: String?): Boolean {
4442
if (serial.isNullOrEmpty()) {
4543
throw objectError("Invalid serial: $serial") // RTLO4a3
4644
}
@@ -51,41 +49,31 @@ internal abstract class BaseLiveObject(
5149
return existingSiteSerial == null || serial > existingSiteSerial // RTLO4a5, RTLO4a6
5250
}
5351

54-
/**
55-
* Updates the time serial for a given site code.
56-
*/
57-
protected fun updateTimeSerial(opSiteCode: String, opSerial: String) {
58-
siteTimeserials[opSiteCode] = opSerial
59-
}
60-
61-
/**
62-
* Applies object delete operation.
63-
*
64-
* @spec RTLM10/RTLC10 - Object deletion for LiveMap/LiveCounter
65-
*/
66-
protected fun applyObjectDelete(): Any {
67-
return tombstone()
68-
}
69-
7052
/**
7153
* Marks the object as tombstoned.
72-
*
73-
* @spec RTLM11/RTLC11 - Tombstone functionality for LiveMap/LiveCounter
7454
*/
75-
protected fun tombstone(): Any {
55+
internal fun tombstone(): Any {
7656
isTombstoned = true
7757
tombstonedAt = System.currentTimeMillis()
7858
val update = clearData()
7959
// TODO: Emit lifecycle events
8060
return update
8161
}
8262

63+
/**
64+
* Checks if the object is eligible for garbage collection.
65+
*/
66+
internal fun isEligibleForGc(): Boolean {
67+
val currentTime = System.currentTimeMillis()
68+
return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true
69+
}
70+
8371
/**
8472
* This is invoked by ObjectMessage having updated data with parent `ProtocolMessageAction` as `object_sync`
8573
* @return an update describing the changes
8674
* @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter
8775
*/
88-
abstract fun overrideWithObjectState(objectState: ObjectState): Map<String, Any>
76+
abstract fun applyObjectState(objectState: ObjectState): Map<String, Any>
8977

9078
/**
9179
* This is invoked by ObjectMessage having updated data with parent `ProtocolMessageAction` as `object`
@@ -96,11 +84,28 @@ internal abstract class BaseLiveObject(
9684

9785
/**
9886
* Clears the object's data and returns an update describing the changes.
87+
* This is called during tombstoning and explicit clear operations.
88+
*
89+
* This method:
90+
* 1. Calculates a diff between the current state and an empty state
91+
* 2. Clears all entries from the underlying data structure
92+
* 3. Returns a map containing metadata about what was cleared
93+
*
94+
* The returned map is used to notifying other components about what entries were removed.
95+
*
96+
* @return A map representing the diff of changes made
9997
*/
10098
abstract fun clearData(): Map<String, Any>
10199

102100
/**
103-
* Called during garbage collection intervals.
101+
* Called during garbage collection intervals to clean up expired entries.
102+
*
103+
* This method should identify and remove entries that:
104+
* - Have been marked as tombstoned
105+
* - Have a tombstone timestamp older than the configured grace period
106+
*
107+
* Implementations typically use single-pass removal techniques to
108+
* efficiently clean up expired data without creating temporary collections.
104109
*/
105110
abstract fun onGCInterval()
106111
}

live-objects/src/main/kotlin/io/ably/lib/objects/type/DefaultLiveCounter.kt

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ internal class DefaultLiveCounter(
3333
/**
3434
* @spec RTLC6 - Overrides counter data with state from sync
3535
*/
36-
override fun overrideWithObjectState(objectState: ObjectState): Map<String, Long> {
36+
override fun applyObjectState(objectState: ObjectState): Map<String, Long> {
3737
if (objectState.objectId != objectId) {
3838
throw objectError("Invalid object state: object state objectId=${objectState.objectId}; LiveCounter objectId=$objectId")
3939
}
@@ -94,7 +94,7 @@ internal class DefaultLiveCounter(
9494
}
9595
// should update stored site serial immediately. doesn't matter if we successfully apply the op,
9696
// as it's important to mark that the op was processed by the object
97-
updateTimeSerial(opSiteCode!!, opSerial!!) // RTLC7c
97+
siteTimeserials[opSiteCode!!] = opSerial!! // RTLC7c
9898

9999
if (isTombstoned) {
100100
// this object is tombstoned so the operation cannot be applied
@@ -110,7 +110,7 @@ internal class DefaultLiveCounter(
110110
throw payloadError(operation)
111111
}
112112
}
113-
ObjectOperationAction.ObjectDelete -> applyObjectDelete()
113+
ObjectOperationAction.ObjectDelete -> tombstone()
114114
else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3
115115
}
116116

@@ -165,10 +165,6 @@ internal class DefaultLiveCounter(
165165
return mapOf("amount" to count)
166166
}
167167

168-
/**
169-
* Called during garbage collection intervals.
170-
* Nothing to GC for a counter object.
171-
*/
172168
override fun onGCInterval() {
173169
// Nothing to GC for a counter object
174170
return

0 commit comments

Comments
 (0)