Skip to content

Commit 608a688

Browse files
committed
[ECO-5458] Enhanced LiveCounter with subscription support
- Updated BaseLiveObject with subscription coordination capabilities - Modified DefaultLiveCounter to integrate with LiveCounterChangeCoordinator - Enhanced LiveCounterManager to return LiveCounterUpdate objects
1 parent 36efe86 commit 608a688

3 files changed

Lines changed: 59 additions & 24 deletions

File tree

live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,18 @@ import io.ably.lib.objects.ObjectOperation
55
import io.ably.lib.objects.ObjectState
66
import io.ably.lib.objects.ObjectsPoolDefaults
77
import io.ably.lib.objects.objectError
8+
import io.ably.lib.objects.type.livecounter.noOpCounterUpdate
9+
import io.ably.lib.objects.type.livemap.noOpMapUpdate
810
import io.ably.lib.util.Log
911

1012
internal enum class ObjectType(val value: String) {
1113
Map("map"),
1214
Counter("counter")
1315
}
1416

17+
// Spec: RTLO4b4b
18+
internal val LiveObjectUpdate.noOp get() = this.update == null
19+
1520
/**
1621
* Base implementation of LiveObject interface.
1722
* Provides common functionality for all live objects.
@@ -42,7 +47,7 @@ internal abstract class BaseLiveObject(
4247
*
4348
* @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter
4449
*/
45-
internal fun applyObjectSync(objectState: ObjectState): Map<String, Any> {
50+
internal fun applyObjectSync(objectState: ObjectState): LiveObjectUpdate {
4651
validate(objectState)
4752
// object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation.
4853
// should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object.
@@ -51,7 +56,10 @@ internal abstract class BaseLiveObject(
5156

5257
if (isTombstoned) {
5358
// this object is tombstoned. this is a terminal state which can't be overridden. skip the rest of object state message processing
54-
return mapOf()
59+
if (objectType == ObjectType.Map) {
60+
return noOpMapUpdate
61+
}
62+
return noOpCounterUpdate
5563
}
5664
return applyObjectState(objectState) // RTLM6, RTLC6
5765
}
@@ -74,7 +82,7 @@ internal abstract class BaseLiveObject(
7482
Log.v(
7583
tag,
7684
"Skipping ${objectOperation.action} op: op serial $msgTimeSerial <= site serial ${siteTimeserials[msgSiteCode]}; " +
77-
"objectId=$objectId"
85+
"objectId=$objectId"
7886
)
7987
return
8088
}
@@ -89,11 +97,6 @@ internal abstract class BaseLiveObject(
8997
applyObjectOperation(objectOperation, objectMessage) // RTLC7d
9098
}
9199

92-
internal fun notifyUpdated(update: Any) {
93-
// TODO: Implement event emission for updates
94-
Log.v(tag, "Object $objectId updated: $update")
95-
}
96-
97100
/**
98101
* Checks if an operation can be applied based on serial comparison.
99102
*
@@ -119,7 +122,7 @@ internal abstract class BaseLiveObject(
119122
/**
120123
* Marks the object as tombstoned.
121124
*/
122-
internal fun tombstone(): Any {
125+
internal fun tombstone(): LiveObjectUpdate {
123126
isTombstoned = true
124127
tombstonedAt = System.currentTimeMillis()
125128
val update = clearData()
@@ -150,7 +153,7 @@ internal abstract class BaseLiveObject(
150153
* @return A map describing the changes made to the object's data
151154
*
152155
*/
153-
abstract fun applyObjectState(objectState: ObjectState): Map<String, Any>
156+
abstract fun applyObjectState(objectState: ObjectState): LiveObjectUpdate
154157

155158
/**
156159
* Applies an operation to this live object.
@@ -176,7 +179,14 @@ internal abstract class BaseLiveObject(
176179
*
177180
* @return A map representing the diff of changes made
178181
*/
179-
abstract fun clearData(): Map<String, Any>
182+
abstract fun clearData(): LiveObjectUpdate
183+
184+
/**
185+
* Notifies subscribers about changes made to this live object. Propagates updates through the
186+
* appropriate manager after converting the generic update map to type-specific update objects.
187+
* Spec: RTLO4b4c
188+
*/
189+
abstract fun notifyUpdated(update: LiveObjectUpdate)
180190

181191
/**
182192
* Called during garbage collection intervals to clean up expired entries.

live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,14 @@ import io.ably.lib.objects.*
44
import io.ably.lib.objects.ObjectOperation
55
import io.ably.lib.objects.ObjectState
66
import io.ably.lib.objects.type.BaseLiveObject
7+
import io.ably.lib.objects.type.LiveObjectUpdate
78
import io.ably.lib.objects.type.ObjectType
9+
import io.ably.lib.objects.type.counter.LiveCounter
10+
import io.ably.lib.objects.type.counter.LiveCounterChange
11+
import io.ably.lib.objects.type.counter.LiveCounterUpdate
12+
import io.ably.lib.objects.type.noOp
813
import java.util.concurrent.atomic.AtomicReference
14+
import io.ably.lib.util.Log
915

1016
/**
1117
* Implementation of LiveObject for LiveCounter.
@@ -54,18 +60,35 @@ internal class DefaultLiveCounter private constructor(
5460
return data.get()
5561
}
5662

63+
override fun subscribe(listener: LiveCounterChange.Listener): ObjectsSubscription {
64+
adapter.throwIfInvalidAccessApiConfiguration(channelName)
65+
return liveCounterManager.subscribe(listener)
66+
}
67+
68+
override fun unsubscribe(listener: LiveCounterChange.Listener) = liveCounterManager.unsubscribe(listener)
69+
70+
override fun unsubscribeAll() = liveCounterManager.unsubscribeAll()
71+
5772
override fun validate(state: ObjectState) = liveCounterManager.validate(state)
5873

59-
override fun applyObjectState(objectState: ObjectState): Map<String, Double> {
74+
override fun applyObjectState(objectState: ObjectState): LiveCounterUpdate {
6075
return liveCounterManager.applyState(objectState)
6176
}
6277

6378
override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
6479
liveCounterManager.applyOperation(operation)
6580
}
6681

67-
override fun clearData(): Map<String, Double> {
68-
return mapOf("amount" to data.get()).apply { data.set(0.0) }
82+
override fun clearData(): LiveCounterUpdate {
83+
return LiveCounterUpdate(data.get()).apply { data.set(0.0) }
84+
}
85+
86+
override fun notifyUpdated(update: LiveObjectUpdate) {
87+
if (update.noOp) {
88+
return
89+
}
90+
Log.v(tag, "Object $objectId updated: $update")
91+
liveCounterManager.notify(update as LiveCounterUpdate)
6992
}
7093

7194
override fun onGCInterval() {

live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,19 @@ import io.ably.lib.objects.ObjectOperation
55
import io.ably.lib.objects.ObjectOperationAction
66
import io.ably.lib.objects.ObjectState
77
import io.ably.lib.objects.objectError
8+
import io.ably.lib.objects.type.counter.LiveCounterUpdate
89
import io.ably.lib.util.Log
910

10-
internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
11+
internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter): LiveCounterChangeCoordinator() {
12+
1113
private val objectId = liveCounter.objectId
1214

1315
private val tag = "LiveCounterManager"
1416

1517
/**
1618
* @spec RTLC6 - Overrides counter data with state from sync
1719
*/
18-
internal fun applyState(objectState: ObjectState): Map<String, Double> {
20+
internal fun applyState(objectState: ObjectState): LiveCounterUpdate {
1921
val previousData = liveCounter.data.get()
2022

2123
if (objectState.tombstone) {
@@ -31,7 +33,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
3133
}
3234
}
3335

34-
return mapOf("amount" to (liveCounter.data.get() - previousData))
36+
return LiveCounterUpdate(liveCounter.data.get() - previousData)
3537
}
3638

3739
/**
@@ -51,13 +53,13 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
5153
else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3
5254
}
5355

54-
liveCounter.notifyUpdated(update)
56+
liveCounter.notifyUpdated(update) // RTLC7d1a, RTLC7d2a
5557
}
5658

5759
/**
5860
* @spec RTLC8 - Applies counter create operation
5961
*/
60-
private fun applyCounterCreate(operation: ObjectOperation): Map<String, Double> {
62+
private fun applyCounterCreate(operation: ObjectOperation): LiveCounterUpdate {
6163
if (liveCounter.createOperationIsMerged) {
6264
// RTLC8b
6365
// There can't be two different create operation for the same object id, because the object id
@@ -67,7 +69,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
6769
tag,
6870
"Skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=$objectId"
6971
)
70-
return mapOf()
72+
return noOpCounterUpdate // RTLC8c
7173
}
7274

7375
return mergeInitialDataFromCreateOperation(operation) // RTLC8c
@@ -76,17 +78,17 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
7678
/**
7779
* @spec RTLC9 - Applies counter increment operation
7880
*/
79-
private fun applyCounterInc(counterOp: ObjectCounterOp): Map<String, Double> {
81+
private fun applyCounterInc(counterOp: ObjectCounterOp): LiveCounterUpdate {
8082
val amount = counterOp.amount ?: 0.0
8183
val previousValue = liveCounter.data.get()
8284
liveCounter.data.set(previousValue + amount) // RTLC9b
83-
return mapOf("amount" to amount)
85+
return LiveCounterUpdate(amount)
8486
}
8587

8688
/**
8789
* @spec RTLC10 - Merges initial data from create operation
8890
*/
89-
private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): Map<String, Double> {
91+
private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): LiveCounterUpdate {
9092
// if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case.
9193
// note that it is intentional to SUM the incoming count from the create op.
9294
// if we got here, it means that current counter instance is missing the initial value in its data reference,
@@ -95,7 +97,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
9597
val previousValue = liveCounter.data.get()
9698
liveCounter.data.set(previousValue + count) // RTLC10a
9799
liveCounter.createOperationIsMerged = true // RTLC10b
98-
return mapOf("amount" to count)
100+
return LiveCounterUpdate(count)
99101
}
100102

101103
internal fun validate(state: ObjectState) {

0 commit comments

Comments
 (0)