Skip to content

Commit 5045f8e

Browse files
committed
[ECO-5447] Updated BaseLiveObject#tombstone method to accept serialTimestamp
1. Updated LiveMap and LiveCounter impl to accept serialTimestamp from ObjectMessage 2. Updated unit tests for both LiveMap and LiveCounter
1 parent 091560b commit 5045f8e

12 files changed

Lines changed: 195 additions & 49 deletions

File tree

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
1515
/**
1616
* @spec RTO5 - Sync objects data pool for collecting sync messages
1717
*/
18-
private val syncObjectsDataPool = mutableMapOf<String, ObjectState>()
18+
private val syncObjectsDataPool = mutableMapOf<String, ObjectMessage>()
1919
private var currentSyncId: String? = null
2020
/**
2121
* @spec RTO7 - Buffered object operations during sync
@@ -130,19 +130,20 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
130130
val existingObjectUpdates = mutableListOf<Pair<BaseLiveObject, LiveObjectUpdate>>()
131131

132132
// RTO5c1
133-
for ((objectId, objectState) in syncObjectsDataPool) {
133+
for ((objectId, objectMessage) in syncObjectsDataPool) {
134+
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
134135
receivedObjectIds.add(objectId)
135136
val existingObject = liveObjects.objectsPool.get(objectId)
136137

137138
// RTO5c1a
138139
if (existingObject != null) {
139140
// Update existing object
140-
val update = existingObject.applyObjectSync(objectState) // RTO5c1a1
141+
val update = existingObject.applyObjectSync(objectMessage) // RTO5c1a1
141142
existingObjectUpdates.add(Pair(existingObject, update))
142143
} else { // RTO5c1b
143144
// RTO5c1b1, RTO5c1b1a, RTO5c1b1b - Create new object and add it to the pool
144145
val newObject = createObjectFromState(objectState)
145-
newObject.applyObjectSync(objectState)
146+
newObject.applyObjectSync(objectMessage)
146147
liveObjects.objectsPool.set(objectId, newObject)
147148
}
148149
}
@@ -201,7 +202,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
201202

202203
val objectState: ObjectState = objectMessage.objectState
203204
if (objectState.counter != null || objectState.map != null) {
204-
syncObjectsDataPool[objectState.objectId] = objectState
205+
syncObjectsDataPool[objectState.objectId] = objectMessage
205206
} else {
206207
// RTO5c1b1c - object state must contain either counter or map data
207208
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")

live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ internal abstract class BaseLiveObject(
4747
*
4848
* @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter
4949
*/
50-
internal fun applyObjectSync(objectState: ObjectState): LiveObjectUpdate {
50+
internal fun applyObjectSync(objectMessage: ObjectMessage): LiveObjectUpdate {
51+
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
5152
validate(objectState)
5253
// object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation.
5354
// should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object.
@@ -61,7 +62,7 @@ internal abstract class BaseLiveObject(
6162
}
6263
return noOpCounterUpdate
6364
}
64-
return applyObjectState(objectState) // RTLM6, RTLC6
65+
return applyObjectState(objectState, objectMessage) // RTLM6, RTLC6
6566
}
6667

6768
/**
@@ -122,11 +123,14 @@ internal abstract class BaseLiveObject(
122123
/**
123124
* Marks the object as tombstoned.
124125
*/
125-
internal fun tombstone(): LiveObjectUpdate {
126+
internal fun tombstone(serialTimestamp: Long?): LiveObjectUpdate {
127+
if (serialTimestamp == null) {
128+
Log.w(tag, "Tombstoning object $objectId without serial timestamp, using local timestamp instead")
129+
}
126130
isTombstoned = true
127-
tombstonedAt = System.currentTimeMillis()
131+
tombstonedAt = serialTimestamp?: System.currentTimeMillis()
128132
val update = clearData()
129-
// TODO: Emit lifecycle events
133+
// TODO: Emit BaseLiveObject lifecycle events
130134
return update
131135
}
132136

@@ -153,7 +157,7 @@ internal abstract class BaseLiveObject(
153157
* @return A map describing the changes made to the object's data
154158
*
155159
*/
156-
abstract fun applyObjectState(objectState: ObjectState): LiveObjectUpdate
160+
abstract fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveObjectUpdate
157161

158162
/**
159163
* Applies an operation to this live object.

live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ internal class DefaultLiveCounter private constructor(
7272

7373
override fun validate(state: ObjectState) = liveCounterManager.validate(state)
7474

75-
override fun applyObjectState(objectState: ObjectState): LiveCounterUpdate {
76-
return liveCounterManager.applyState(objectState)
75+
override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveCounterUpdate {
76+
return liveCounterManager.applyState(objectState, message.serialTimestamp)
7777
}
7878

7979
override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
80-
liveCounterManager.applyOperation(operation)
80+
liveCounterManager.applyOperation(operation, message.serialTimestamp)
8181
}
8282

8383
override fun clearData(): LiveCounterUpdate {

live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
1717
/**
1818
* @spec RTLC6 - Overrides counter data with state from sync
1919
*/
20-
internal fun applyState(objectState: ObjectState): LiveCounterUpdate {
20+
internal fun applyState(objectState: ObjectState, serialTimestamp: Long?): LiveCounterUpdate {
2121
val previousData = liveCounter.data.get()
2222

2323
if (objectState.tombstone) {
24-
liveCounter.tombstone()
24+
liveCounter.tombstone(serialTimestamp)
2525
} else {
2626
// override data for this object with data from the object state
2727
liveCounter.createOperationIsMerged = false // RTLC6b
@@ -39,7 +39,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
3939
/**
4040
* @spec RTLC7 - Applies operations to LiveCounter
4141
*/
42-
internal fun applyOperation(operation: ObjectOperation) {
42+
internal fun applyOperation(operation: ObjectOperation, serialTimestamp: Long?) {
4343
val update = when (operation.action) {
4444
ObjectOperationAction.CounterCreate -> applyCounterCreate(operation) // RTLC7d1
4545
ObjectOperationAction.CounterInc -> {
@@ -49,7 +49,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
4949
throw objectError("No payload found for ${operation.action} op for LiveCounter objectId=${objectId}")
5050
}
5151
}
52-
ObjectOperationAction.ObjectDelete -> liveCounter.tombstone()
52+
ObjectOperationAction.ObjectDelete -> liveCounter.tombstone(serialTimestamp)
5353
else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3
5454
}
5555

live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ internal class DefaultLiveMap private constructor(
119119

120120
override fun unsubscribeAll() = liveMapManager.unsubscribeAll()
121121

122-
override fun applyObjectState(objectState: ObjectState): LiveMapUpdate {
123-
return liveMapManager.applyState(objectState)
122+
override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveMapUpdate {
123+
return liveMapManager.applyState(objectState, message.serialTimestamp)
124124
}
125125

126126
override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {

live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
2020
/**
2121
* @spec RTLM6 - Overrides object data with state from sync
2222
*/
23-
internal fun applyState(objectState: ObjectState): LiveMapUpdate {
23+
internal fun applyState(objectState: ObjectState, serialTimestamp: Long?): LiveMapUpdate {
2424
val previousData = liveMap.data.toMap()
2525

2626
if (objectState.tombstone) {
27-
liveMap.tombstone()
27+
liveMap.tombstone(serialTimestamp)
2828
} else {
2929
// override data for this object with data from the object state
3030
liveMap.createOperationIsMerged = false // RTLM6b
@@ -68,7 +68,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
6868
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
6969
}
7070
}
71-
ObjectOperationAction.ObjectDelete -> liveMap.tombstone()
71+
ObjectOperationAction.ObjectDelete -> liveMap.tombstone(serialTimestamp)
7272
else -> throw objectError("Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4
7373
}
7474

live-objects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ internal fun ObjectsPool.size(): Int {
5757
return pool.size
5858
}
5959

60+
internal val BaseLiveObject.TombstonedAt: Long?
61+
get() = this.getPrivateField("tombstonedAt")
62+
6063
/**
6164
* ======================================
6265
* START - DefaultLiveObjects dep mocks

live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,17 +80,17 @@ class ObjectsManagerTest {
8080
val testObject1 = objectsPool.get("map:testObject@1")
8181
assertNotNull(testObject1, "map:testObject@1 should exist in pool after sync")
8282
verify(exactly = 1) {
83-
testObject1.applyObjectSync(any<ObjectState>())
83+
testObject1.applyObjectSync(any<ObjectMessage>())
8484
}
8585
val testObject2 = objectsPool.get("counter:testObject@2")
8686
assertNotNull(testObject2, "counter:testObject@2 should exist in pool after sync")
8787
verify(exactly = 1) {
88-
testObject2.applyObjectSync(any<ObjectState>())
88+
testObject2.applyObjectSync(any<ObjectMessage>())
8989
}
9090
val testObject3 = objectsPool.get("map:testObject@3")
9191
assertNotNull(testObject3, "map:testObject@3 should exist in pool after sync")
9292
verify(exactly = 1) {
93-
testObject3.applyObjectSync(any<ObjectState>())
93+
testObject3.applyObjectSync(any<ObjectMessage>())
9494
}
9595
}
9696

live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,15 @@ class DefaultLiveCounterTest {
2626
siteTimeserials = mapOf("site3" to "serial3", "site4" to "serial4"),
2727
tombstone = false,
2828
)
29-
liveCounter.applyObjectSync(objectState)
29+
30+
val objectMessage = ObjectMessage(
31+
id = "testId",
32+
objectState = objectState,
33+
serial = "serial1",
34+
siteCode = "site1"
35+
)
36+
37+
liveCounter.applyObjectSync(objectMessage)
3038
assertEquals(mapOf("site3" to "serial3", "site4" to "serial4"), liveCounter.siteTimeserials) // RTLC6a
3139
}
3240

live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt

Lines changed: 67 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package io.ably.lib.objects.unit.type.livecounter
22

33
import io.ably.lib.objects.*
44
import io.ably.lib.objects.unit.LiveCounterManager
5+
import io.ably.lib.objects.unit.TombstonedAt
56
import io.ably.lib.objects.unit.getDefaultLiveCounterWithMockedDeps
67
import io.ably.lib.types.AblyException
78
import org.junit.Test
@@ -24,7 +25,7 @@ class DefaultLiveCounterManagerTest {
2425
tombstone = false,
2526
)
2627

27-
val update = liveCounterManager.applyState(objectState)
28+
val update = liveCounterManager.applyState(objectState, null)
2829

2930
assertFalse(liveCounter.createOperationIsMerged) // RTLC6b
3031
assertEquals(25.0, liveCounter.data.get()) // RTLC6c
@@ -55,7 +56,7 @@ class DefaultLiveCounterManagerTest {
5556
)
5657

5758
// RTLC6d - Merge initial data from create operation
58-
val update = liveCounterManager.applyState(objectState)
59+
val update = liveCounterManager.applyState(objectState, null)
5960

6061
assertEquals(25.0, liveCounter.data.get()) // 15 from state + 10 from create op
6162
assertEquals(20.0, update.update.amount) // Total change
@@ -75,7 +76,7 @@ class DefaultLiveCounterManagerTest {
7576

7677
// RTLC7d3 - Should throw error for unsupported action
7778
val exception = assertFailsWith<AblyException> {
78-
liveCounterManager.applyOperation(operation)
79+
liveCounterManager.applyOperation(operation, null)
7980
}
8081

8182
val errorInfo = exception.errorInfo
@@ -96,7 +97,7 @@ class DefaultLiveCounterManagerTest {
9697
)
9798

9899
// RTLC7d1 - Apply counter create operation
99-
liveCounterManager.applyOperation(operation)
100+
liveCounterManager.applyOperation(operation, null)
100101

101102
assertEquals(20.0, liveCounter.data.get()) // Should be set to counter count
102103
assertTrue(liveCounter.createOperationIsMerged) // Should be marked as merged
@@ -119,7 +120,7 @@ class DefaultLiveCounterManagerTest {
119120
)
120121

121122
// RTLC8b - Should skip if already merged
122-
liveCounterManager.applyOperation(operation)
123+
liveCounterManager.applyOperation(operation, null)
123124

124125
assertEquals(4.0, liveCounter.data.get()) // Should not change (still 0)
125126
assertTrue(liveCounter.createOperationIsMerged) // Should remain merged
@@ -142,7 +143,7 @@ class DefaultLiveCounterManagerTest {
142143
)
143144

144145
// RTLC8c - Should apply if not merged
145-
liveCounterManager.applyOperation(operation)
146+
liveCounterManager.applyOperation(operation, null)
146147
assertTrue(liveCounter.createOperationIsMerged) // Should be marked as merged
147148

148149
assertEquals(30.0, liveCounter.data.get()) // Should be set to counter count
@@ -165,7 +166,7 @@ class DefaultLiveCounterManagerTest {
165166

166167
// RTLC10a - Should default to 0
167168
// RTLC10b - Mark as merged
168-
liveCounterManager.applyOperation(operation)
169+
liveCounterManager.applyOperation(operation, null)
169170

170171
assertEquals(10.0, liveCounter.data.get()) // No change (null defaults to 0)
171172
assertTrue(liveCounter.createOperationIsMerged) // RTLC10b
@@ -186,7 +187,7 @@ class DefaultLiveCounterManagerTest {
186187
)
187188

188189
// RTLC7d2 - Apply counter increment operation
189-
liveCounterManager.applyOperation(operation)
190+
liveCounterManager.applyOperation(operation, null)
190191

191192
assertEquals(15.0, liveCounter.data.get()) // RTLC9b - 10 + 5
192193
}
@@ -204,7 +205,7 @@ class DefaultLiveCounterManagerTest {
204205

205206
// RTLC7d2 - Should throw error for missing payload
206207
val exception = assertFailsWith<AblyException> {
207-
liveCounterManager.applyOperation(operation)
208+
liveCounterManager.applyOperation(operation, null)
208209
}
209210

210211
val errorInfo = exception.errorInfo
@@ -229,7 +230,7 @@ class DefaultLiveCounterManagerTest {
229230
action = ObjectOperationAction.CounterInc,
230231
objectId = "testCounterId",
231232
counterOp = counterOp
232-
))
233+
), null)
233234

234235
assertEquals(17.0, liveCounter.data.get()) // 10 + 7
235236
}
@@ -249,8 +250,63 @@ class DefaultLiveCounterManagerTest {
249250
action = ObjectOperationAction.CounterInc,
250251
objectId = "testCounterId",
251252
counterOp = counterOp
252-
))
253+
), null)
253254

254255
assertEquals(10.0, liveCounter.data.get()) // Should not change (null defaults to 0)
255256
}
257+
258+
@Test
259+
fun `(RTLC6, OM2j) DefaultLiveCounter should handle tombstone with serialTimestamp in state`() {
260+
val liveCounter = getDefaultLiveCounterWithMockedDeps()
261+
val liveCounterManager = liveCounter.LiveCounterManager
262+
263+
// Set initial data
264+
liveCounter.data.set(10.0)
265+
266+
val expectedTimestamp = 1234567890L
267+
val objectState = ObjectState(
268+
objectId = "testCounterId",
269+
counter = null, // Null counter for tombstone
270+
siteTimeserials = mapOf("site1" to "serial1"),
271+
tombstone = true, // Object is tombstoned
272+
)
273+
274+
val update = liveCounterManager.applyState(objectState, expectedTimestamp)
275+
276+
assertTrue(liveCounter.isTombstoned) // Should be tombstoned
277+
assertEquals(expectedTimestamp, liveCounter.TombstonedAt) // Should use provided timestamp
278+
assertEquals(0.0, liveCounter.data.get()) // Should be reset after tombstone
279+
280+
// Assert on update field - should show the change
281+
assertEquals(-10.0, update.update.amount) // Difference from 10.0 to 0.0
282+
}
283+
284+
@Test
285+
fun `(RTLC6, OM2j) DefaultLiveCounter should handle tombstone without serialTimestamp in state`() {
286+
val liveCounter = getDefaultLiveCounterWithMockedDeps()
287+
val liveCounterManager = liveCounter.LiveCounterManager
288+
289+
// Set initial data
290+
liveCounter.data.set(10.0)
291+
292+
val objectState = ObjectState(
293+
objectId = "testCounterId",
294+
counter = null, // Null counter for tombstone
295+
siteTimeserials = mapOf("site1" to "serial1"),
296+
tombstone = true, // Object is tombstoned
297+
)
298+
299+
val beforeOperation = System.currentTimeMillis()
300+
val update = liveCounterManager.applyState(objectState, null)
301+
val afterOperation = System.currentTimeMillis()
302+
303+
assertTrue(liveCounter.isTombstoned) // Should be tombstoned
304+
assertNotNull(liveCounter.TombstonedAt) // Should have timestamp
305+
assertTrue(liveCounter.TombstonedAt!! >= beforeOperation) // Should be after operation start
306+
assertTrue(liveCounter.TombstonedAt!! <= afterOperation) // Should be before operation end
307+
assertEquals(0.0, liveCounter.data.get()) // Should be reset after tombstone
308+
309+
// Assert on update field - should show the change
310+
assertEquals(-10.0, update.update.amount) // Difference from 10.0 to 0.0
311+
}
256312
}

0 commit comments

Comments
 (0)