Skip to content

Commit 95135d5

Browse files
committed
[AIT-208] feat: partial sync implementation
- rename `syncObjectsDataPool` to `syncObjectsPool` - Standardized naming across `ObjectsManager` to improve code clarity. - Updated references in implementation and tests.
1 parent 74bf748 commit 95135d5

5 files changed

Lines changed: 194 additions & 26 deletions

File tree

liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
313313
// if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel.
314314
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
315315
objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2
316-
objectsManager.clearSyncObjectsDataPool() // RTO4b3
316+
objectsManager.clearSyncObjectsPool() // RTO4b3
317317
objectsManager.clearBufferedObjectOperations() // RTO4b5
318318
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
319319
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
@@ -338,7 +338,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
338338
if (state != ChannelState.suspended) {
339339
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
340340
objectsPool.clearObjectsData(false)
341-
objectsManager.clearSyncObjectsDataPool()
341+
objectsManager.clearSyncObjectsPool()
342342
}
343343
}
344344
else -> {

liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
1717
/**
1818
* @spec RTO5 - Sync objects data pool for collecting sync messages
1919
*/
20-
private val syncObjectsDataPool = mutableMapOf<String, ObjectMessage>()
20+
private val syncObjectsPool = mutableMapOf<String, ObjectMessage>()
2121
private var currentSyncId: String? = null
2222
/**
2323
* @spec RTO7 - Buffered object operations during sync
@@ -59,7 +59,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
5959
}
6060

6161
// RTO5a3 - continue current sync sequence
62-
applyObjectSyncMessages(objectMessages) // RTO5b
62+
applyObjectSyncMessages(objectMessages) // RTO5f
6363

6464
// RTO5a4 - if this is the last (or only) message in a sequence of sync updates, end the sync
6565
if (syncTracker.hasSyncEnded()) {
@@ -79,7 +79,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
7979

8080
// need to discard all buffered object operation messages on new sync start
8181
bufferedObjectOperations.clear() // RTO5a2b
82-
syncObjectsDataPool.clear() // RTO5a2a
82+
syncObjectsPool.clear() // RTO5a2a
8383
currentSyncId = syncId
8484
syncCompletionWaiter = CompletableDeferred()
8585
stateChange(ObjectsState.Syncing)
@@ -95,7 +95,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
9595
applySync() // RTO5c1/2/7
9696
applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6
9797
bufferedObjectOperations.clear() // RTO5c5
98-
syncObjectsDataPool.clear() // RTO5c4
98+
syncObjectsPool.clear() // RTO5c4
9999
currentSyncId = null // RTO5c3
100100
realtimeObjects.appliedOnAckSerials.clear() // RTO5c9
101101
stateChange(ObjectsState.Synced) // RTO5c8
@@ -129,8 +129,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
129129
* Clears the sync objects data pool.
130130
* Used by DefaultRealtimeObjects.handleStateChange.
131131
*/
132-
internal fun clearSyncObjectsDataPool() {
133-
syncObjectsDataPool.clear()
132+
internal fun clearSyncObjectsPool() {
133+
syncObjectsPool.clear()
134134
}
135135

136136
/**
@@ -147,7 +147,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
147147
* @spec RTO5c - Processes sync data and updates objects pool
148148
*/
149149
private fun applySync() {
150-
if (syncObjectsDataPool.isEmpty()) {
150+
if (syncObjectsPool.isEmpty()) {
151151
return
152152
}
153153

@@ -156,7 +156,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
156156
val existingObjectUpdates = mutableListOf<Pair<BaseRealtimeObject, ObjectUpdate>>()
157157

158158
// RTO5c1
159-
for ((objectId, objectMessage) in syncObjectsDataPool) {
159+
for ((objectId, objectMessage) in syncObjectsPool) {
160160
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
161161
receivedObjectIds.add(objectId)
162162
val existingObject = realtimeObjects.objectsPool.get(objectId)
@@ -234,9 +234,9 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
234234
}
235235

236236
/**
237-
* Applies sync messages to sync data pool.
237+
* Applies sync messages to sync data pool, merging partial sync messages for the same objectId.
238238
*
239-
* @spec RTO5b - Collects object states during sync sequence
239+
* @spec RTO5f - Collects and merges object states during sync sequence
240240
*/
241241
private fun applyObjectSyncMessages(objectMessages: List<ObjectMessage>) {
242242
for (objectMessage in objectMessages) {
@@ -246,11 +246,44 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
246246
}
247247

248248
val objectState: ObjectState = objectMessage.objectState
249-
if (objectState.counter != null || objectState.map != null) {
250-
syncObjectsDataPool[objectState.objectId] = objectMessage
251-
} else {
252-
// RTO5c1b1c - object state must contain either counter or map data
253-
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
249+
val objectId = objectState.objectId
250+
val existingEntry = syncObjectsPool[objectId]
251+
252+
if (existingEntry == null) {
253+
// RTO5f1 - objectId not in pool, store directly
254+
if (objectState.counter != null || objectState.map != null) {
255+
syncObjectsPool[objectId] = objectMessage
256+
} else {
257+
// RTO5c1b1c - object state must contain either counter or map data
258+
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
259+
}
260+
continue
261+
}
262+
263+
// RTO5f2 - objectId already in pool; this is a partial sync message, merge based on type
264+
when {
265+
objectState.map != null -> {
266+
// RTO5f2a - map object: merge entries
267+
if (objectState.tombstone) {
268+
// RTO5f2a1 - tombstone: replace pool entry entirely
269+
syncObjectsPool[objectId] = objectMessage
270+
} else {
271+
// RTO5f2a2 - merge map entries; server guarantees no duplicate keys across partials
272+
val existingState = existingEntry.objectState!! // non-null for existing entry
273+
val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty()
274+
val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries)
275+
val mergedState = existingState.copy(map = mergedMap)
276+
syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)
277+
}
278+
}
279+
objectState.counter != null -> {
280+
// RTO5f2b - counter objects must never be split across messages
281+
Log.e(tag, "Received partial sync message for a counter object, skipping: ${objectMessage.id}")
282+
}
283+
else -> {
284+
// RTO5f2c - unsupported type, log warning and skip
285+
Log.w(tag, "Received partial sync message for an unsupported object type, skipping: ${objectMessage.id}")
286+
}
254287
}
255288
}
256289
}
@@ -286,7 +319,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
286319

287320
internal fun dispose() {
288321
syncCompletionWaiter?.cancel()
289-
syncObjectsDataPool.clear()
322+
syncObjectsPool.clear()
290323
bufferedObjectOperations.clear()
291324
disposeObjectsStateListeners()
292325
}

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ internal val BaseRealtimeObject.TombstonedAt: Long?
6868
* START - DefaultRealtimeObjects dep mocks
6969
* ======================================
7070
*/
71-
internal val ObjectsManager.SyncObjectsDataPool: Map<String, ObjectState>
72-
get() = this.getPrivateField("syncObjectsDataPool")
71+
internal val ObjectsManager.SyncObjectsPool: Map<String, ObjectMessage>
72+
get() = this.getPrivateField("syncObjectsPool")
7373

7474
internal val ObjectsManager.BufferedObjectOperations: List<ObjectMessage>
7575
get() = this.getPrivateField("bufferedObjectOperations")

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import io.ably.lib.objects.type.livemap.DefaultLiveMap
1313
import io.ably.lib.objects.type.livemap.LiveMapEntry
1414
import io.ably.lib.objects.unit.BufferedObjectOperations
1515
import io.ably.lib.objects.unit.ObjectsManager
16-
import io.ably.lib.objects.unit.SyncObjectsDataPool
16+
import io.ably.lib.objects.unit.SyncObjectsPool
1717
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
1818
import io.ably.lib.objects.unit.getMockRealtimeChannel
1919
import io.ably.lib.objects.unit.size
@@ -72,7 +72,7 @@ class DefaultRealtimeObjectsTest {
7272
defaultRealtimeObjects.ObjectsManager.endSync()
7373
}
7474

75-
assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsDataPool.size) // RTO4b3
75+
assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsPool.size) // RTO4b3
7676
assertEquals(0, defaultRealtimeObjects.ObjectsManager.BufferedObjectOperations.size) // RTO4b5
7777
assertEquals(1, defaultRealtimeObjects.objectsPool.size()) // RTO4b1 - Only root remains
7878
assertEquals(rootObject, defaultRealtimeObjects.objectsPool.get(ROOT_OBJECT_ID)) // points to previously created root object
@@ -235,16 +235,16 @@ class DefaultRealtimeObjectsTest {
235235
failCalled.await()
236236

237237
verify(exactly = 0) { defaultRealtimeObjects.objectsPool.clearObjectsData(any()) }
238-
verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
238+
verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() }
239239
}
240240

241241
@Test
242242
fun `(RTO4) handleStateChange(DETACHED) clears objects data and sync pool`() = runTest {
243243
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
244244

245-
// Use clearSyncObjectsDataPool (the last operation in the coroutine) as the completion signal
245+
// Use clearSyncObjectsPool (the last operation in the coroutine) as the completion signal
246246
val syncPoolCleared = CompletableDeferred<Unit>()
247-
every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() } answers {
247+
every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() } answers {
248248
callOriginal()
249249
syncPoolCleared.complete(Unit)
250250
}
@@ -254,7 +254,7 @@ class DefaultRealtimeObjectsTest {
254254
syncPoolCleared.await()
255255

256256
verify(exactly = 1) { defaultRealtimeObjects.objectsPool.clearObjectsData(false) }
257-
verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
257+
verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() }
258258
}
259259

260260
@Test

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import io.ably.lib.objects.unit.*
1212
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
1313
import io.ably.lib.types.AblyException
1414
import io.ably.lib.types.ErrorInfo
15+
import io.ably.lib.util.Log
1516
import io.mockk.*
1617
import kotlinx.coroutines.launch
1718
import kotlinx.coroutines.test.runTest
@@ -655,6 +656,140 @@ class ObjectsManagerTest {
655656
"appliedOnAckSerials should be empty when serials length mismatches")
656657
}
657658

659+
@Test
660+
fun `(RTO5f2a2) partial sync map entries are merged across two messages with the same objectId`() {
661+
val defaultRealtimeObjects = makeRealtimeObjects()
662+
val objectsManager = defaultRealtimeObjects.ObjectsManager
663+
664+
val msg1 = ObjectMessage(
665+
id = "msg1",
666+
objectState = ObjectState(
667+
objectId = "map:test@1",
668+
tombstone = false,
669+
siteTimeserials = mapOf("site1" to "serial1"),
670+
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1"))))
671+
)
672+
)
673+
val msg2 = ObjectMessage(
674+
id = "msg2",
675+
objectState = ObjectState(
676+
objectId = "map:test@1",
677+
tombstone = false,
678+
siteTimeserials = mapOf("site1" to "serial2"),
679+
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key2" to ObjectsMapEntry(data = ObjectData(string = "value2"))))
680+
)
681+
)
682+
683+
objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")
684+
685+
val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap
686+
assertNotNull(liveMap.data["key1"], "key1 should be present after merge")
687+
assertNotNull(liveMap.data["key2"], "key2 should be present after merge")
688+
assertEquals("value1", liveMap.data["key1"]?.data?.string)
689+
assertEquals("value2", liveMap.data["key2"]?.data?.string)
690+
}
691+
692+
@Test
693+
fun `(RTO5f2a1) tombstone on second partial message replaces pool entry entirely`() {
694+
val defaultRealtimeObjects = makeRealtimeObjects()
695+
val objectsManager = defaultRealtimeObjects.ObjectsManager
696+
697+
val msg1 = ObjectMessage(
698+
id = "msg1",
699+
objectState = ObjectState(
700+
objectId = "map:test@1",
701+
tombstone = false,
702+
siteTimeserials = mapOf("site1" to "serial1"),
703+
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1"))))
704+
)
705+
)
706+
val msg2 = ObjectMessage(
707+
id = "msg2",
708+
objectState = ObjectState(
709+
objectId = "map:test@1",
710+
tombstone = true,
711+
siteTimeserials = mapOf("site1" to "serial2"),
712+
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = emptyMap())
713+
)
714+
)
715+
716+
objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")
717+
718+
val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap
719+
// After tombstone replaces the entry, the map should have no key1
720+
assertNull(liveMap.data["key1"], "key1 should not be present after tombstone replaced the pool entry")
721+
}
722+
723+
@Test
724+
fun `(RTO5f2b) partial sync counter message logs error and is skipped`() {
725+
val defaultRealtimeObjects = makeRealtimeObjects()
726+
val objectsManager = defaultRealtimeObjects.ObjectsManager
727+
728+
mockkStatic(Log::class)
729+
every { Log.e(any(), any<String>()) } returns 0
730+
731+
val msg1 = ObjectMessage(
732+
id = "msg1",
733+
objectState = ObjectState(
734+
objectId = "counter:test@1",
735+
tombstone = false,
736+
siteTimeserials = mapOf("site1" to "serial1"),
737+
counter = ObjectsCounter(count = 10.0)
738+
)
739+
)
740+
val msg2 = ObjectMessage(
741+
id = "msg2",
742+
objectState = ObjectState(
743+
objectId = "counter:test@1",
744+
tombstone = false,
745+
siteTimeserials = mapOf("site1" to "serial2"),
746+
counter = ObjectsCounter(count = 5.0)
747+
)
748+
)
749+
750+
objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")
751+
752+
// Pool should contain only msg1 (msg2 skipped)
753+
val counter = defaultRealtimeObjects.objectsPool.get("counter:test@1") as DefaultLiveCounter
754+
assertEquals(10.0, counter.data.get(), "counter value should be from msg1 only (msg2 skipped)")
755+
verify { Log.e(any(), match<String> { it.contains("partial sync message for a counter") }) }
756+
}
757+
758+
@Test
759+
fun `(RTO5f2c) partial sync message with unsupported type logs warning and is skipped`() {
760+
val defaultRealtimeObjects = makeRealtimeObjects()
761+
val objectsManager = defaultRealtimeObjects.ObjectsManager
762+
763+
mockkStatic(Log::class)
764+
every { Log.w(any(), any<String>()) } returns 0
765+
766+
val msg1 = ObjectMessage(
767+
id = "msg1",
768+
objectState = ObjectState(
769+
objectId = "map:test@1",
770+
tombstone = false,
771+
siteTimeserials = mapOf("site1" to "serial1"),
772+
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1"))))
773+
)
774+
)
775+
// msg2 has neither map nor counter — hits the else branch (RTO5f2c)
776+
val msg2 = ObjectMessage(
777+
id = "msg2",
778+
objectState = ObjectState(
779+
objectId = "map:test@1",
780+
tombstone = false,
781+
siteTimeserials = mapOf("site1" to "serial2"),
782+
)
783+
)
784+
785+
objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")
786+
787+
// Pool entry should still be msg1 (msg2 was skipped)
788+
val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap
789+
assertNotNull(liveMap.data["key1"], "key1 should still be present (msg2 skipped)")
790+
verify { Log.w(any(), match<String> { it.contains("unsupported object type") }) }
791+
}
792+
658793
private fun mockZeroValuedObjects() {
659794
mockkObject(DefaultLiveMap.Companion)
660795
every {

0 commit comments

Comments
 (0)