Skip to content

Commit ea874a9

Browse files
committed
[AIT-296] fix: clear buffered operations unconditionally on ATTACHED (RTO4d)
- Ensured `BufferedObjectOperations` is always cleared when channel state transitions to ATTACHED, regardless of the `hasObjects` flag. - Updated tests to verify buffer clearing behavior (RTO4d).
1 parent 33855f8 commit ea874a9

3 files changed

Lines changed: 156 additions & 4 deletions

File tree

liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,8 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
296296
ChannelState.attached -> {
297297
Log.v(tag, "Objects.onAttached() channel=$channelName, hasObjects=$hasObjects")
298298

299+
objectsManager.clearBufferedObjectOperations() // RTO4d - clear unconditionally on ATTACHED
300+
299301
// RTO4a
300302
val fromInitializedState = this@DefaultRealtimeObjects.state == ObjectsState.Initialized
301303
if (hasObjects || fromInitializedState) {
@@ -310,7 +312,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
310312
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
311313
objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2
312314
objectsManager.clearSyncObjectsDataPool() // RTO4b3
313-
objectsManager.clearBufferedObjectOperations() // RTO4b5
315+
// RTO4b5 removed — buffer already cleared by RTO4d above
314316
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
315317
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
316318
objectsManager.endSync() // RTO4b4

liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
7777
internal fun startNewSync(syncId: String?) {
7878
Log.v(tag, "Starting new sync sequence: syncId=$syncId")
7979

80-
// need to discard all buffered object operation messages on new sync start
81-
bufferedObjectOperations.clear() // RTO5a2b
8280
syncObjectsDataPool.clear() // RTO5a2a
8381
currentSyncId = syncId
8482
syncCompletionWaiter = CompletableDeferred()

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import io.ably.lib.objects.type.livemap.LiveMapEntry
1515
import io.ably.lib.objects.unit.BufferedObjectOperations
1616
import io.ably.lib.objects.unit.ObjectsManager
1717
import io.ably.lib.objects.unit.SyncObjectsDataPool
18+
import io.ably.lib.objects.unit.getMockObjectsAdapter
1819
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
1920
import io.ably.lib.objects.unit.getMockRealtimeChannel
2021
import io.ably.lib.objects.unit.size
@@ -26,12 +27,22 @@ import io.mockk.every
2627
import io.mockk.verify
2728
import kotlinx.coroutines.CompletableDeferred
2829
import kotlinx.coroutines.test.runTest
30+
import org.junit.After
2931
import org.junit.Test
3032
import kotlin.test.assertEquals
3133
import kotlin.test.assertNotNull
3234

3335
class DefaultRealtimeObjectsTest {
3436

37+
private val testInstances = mutableListOf<DefaultRealtimeObjects>()
38+
39+
@After
40+
fun tearDown() {
41+
val cleanupError = AblyException.fromErrorInfo(ErrorInfo("test cleanup", 500))
42+
testInstances.forEach { it.dispose(cleanupError) }
43+
testInstances.clear()
44+
}
45+
3546
@Test
3647
fun `(RTO4, RTO4a) When channel ATTACHED with HAS_OBJECTS flag true should start sync sequence`() = runTest {
3748
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
@@ -74,7 +85,7 @@ class DefaultRealtimeObjectsTest {
7485
}
7586

7687
assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsDataPool.size) // RTO4b3
77-
assertEquals(0, defaultRealtimeObjects.ObjectsManager.BufferedObjectOperations.size) // RTO4b5
88+
assertEquals(0, defaultRealtimeObjects.ObjectsManager.BufferedObjectOperations.size) // RTO4d
7889
assertEquals(1, defaultRealtimeObjects.objectsPool.size()) // RTO4b1 - Only root remains
7990
assertEquals(rootObject, defaultRealtimeObjects.objectsPool.get(ROOT_OBJECT_ID)) // points to previously created root object
8091
assertEquals(0, rootObject.data.size) // RTO4b2 - root object must be empty
@@ -258,6 +269,147 @@ class DefaultRealtimeObjectsTest {
258269
verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
259270
}
260271

272+
@Test
273+
fun `(RTO4d) ATTACHED with hasObjects=true still clears bufferedObjectOperations`() = runTest {
274+
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
275+
val manager = defaultRealtimeObjects.ObjectsManager
276+
277+
// Pre-populate bufferedObjectOperations with a dummy operation
278+
@Suppress("UNCHECKED_CAST")
279+
(manager.BufferedObjectOperations as MutableList<ObjectMessage>).add(
280+
ObjectMessage(
281+
id = "pre-attach-op",
282+
operation = ObjectOperation(
283+
action = ObjectOperationAction.CounterInc,
284+
objectId = "counter:test@1",
285+
counterOp = ObjectsCounterOp(amount = 5.0)
286+
)
287+
)
288+
)
289+
assertEquals(1, manager.BufferedObjectOperations.size)
290+
291+
// ATTACHED with hasObjects=true — RTO4d must clear the buffer before starting sync
292+
defaultRealtimeObjects.handleStateChange(ChannelState.attached, true)
293+
294+
assertWaiter { defaultRealtimeObjects.state == ObjectsState.Syncing }
295+
assertEquals(0, manager.BufferedObjectOperations.size, "RTO4d - buffer must be cleared unconditionally on ATTACHED")
296+
}
297+
298+
@Test
299+
fun `(RTO4d) Pre-ATTACHED buffered operations are discarded, not applied after sync`() = runTest {
300+
val defaultRealtimeObjects = DefaultRealtimeObjects("testChannel", getMockObjectsAdapter())
301+
.also { testInstances.add(it) }
302+
303+
// Set up a counter in the pool
304+
val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
305+
defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
306+
307+
val objectsManager = defaultRealtimeObjects.ObjectsManager
308+
309+
// Pre-populate bufferedObjectOperations with a COUNTER_INC — simulates an op received before ATTACHED
310+
@Suppress("UNCHECKED_CAST")
311+
(objectsManager.BufferedObjectOperations as MutableList<ObjectMessage>).add(
312+
ObjectMessage(
313+
id = "pre-attach-inc",
314+
operation = ObjectOperation(
315+
action = ObjectOperationAction.CounterInc,
316+
objectId = "counter:test@1",
317+
counterOp = ObjectsCounterOp(amount = 5.0)
318+
)
319+
)
320+
)
321+
assertEquals(1, objectsManager.BufferedObjectOperations.size)
322+
323+
// ATTACHED with hasObjects=true: RTO4d clears the buffer, then starts sync
324+
defaultRealtimeObjects.handleStateChange(ChannelState.attached, true)
325+
assertWaiter { defaultRealtimeObjects.state == ObjectsState.Syncing }
326+
assertEquals(0, objectsManager.BufferedObjectOperations.size, "buffer must be cleared by RTO4d")
327+
328+
// Complete sync by calling handleObjectSyncMessages directly (sequentialScope is idle now)
329+
objectsManager.handleObjectSyncMessages(
330+
listOf(
331+
ObjectMessage(
332+
id = "sync-msg-1",
333+
objectState = ObjectState(
334+
objectId = "counter:test@1",
335+
tombstone = false,
336+
siteTimeserials = mapOf("site1" to "serial1"),
337+
counter = ObjectsCounter(count = 0.0)
338+
)
339+
)
340+
),
341+
"sync-id:" // empty cursor — ends sync (RTO5a4)
342+
)
343+
344+
assertEquals(ObjectsState.Synced, defaultRealtimeObjects.state)
345+
346+
// The pre-ATTACHED COUNTER_INC was discarded — counter should remain at 0
347+
assertEquals(0.0, counter.data.get(), "RTO4d - pre-ATTACHED buffered op must be discarded, not applied after sync")
348+
}
349+
350+
@Test
351+
fun `(RTO5a2b removed) Buffered operations survive a server-initiated resync (new OBJECT_SYNC without ATTACHED)`() {
352+
val defaultRealtimeObjects = DefaultRealtimeObjects("testChannel", getMockObjectsAdapter())
353+
.also { testInstances.add(it) }
354+
355+
// Set up a counter in the pool
356+
val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
357+
counter.data.set(5.0)
358+
defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
359+
360+
val objectsManager = defaultRealtimeObjects.ObjectsManager
361+
362+
// sync-1 is in progress
363+
objectsManager.startNewSync("sync-1")
364+
assertEquals(ObjectsState.Syncing, defaultRealtimeObjects.state)
365+
366+
// Buffer a COUNTER_INC during sync-1
367+
objectsManager.handleObjectMessages(
368+
listOf(
369+
ObjectMessage(
370+
id = "channel-op-1",
371+
operation = ObjectOperation(
372+
action = ObjectOperationAction.CounterInc,
373+
objectId = "counter:test@1",
374+
counterOp = ObjectsCounterOp(amount = 3.0)
375+
),
376+
serial = "serial-op-1",
377+
siteCode = "site1"
378+
)
379+
)
380+
)
381+
assertEquals(1, objectsManager.BufferedObjectOperations.size, "op buffered during sync-1")
382+
383+
// Server sends a new OBJECT_SYNC with a different sync-id — triggers startNewSync("sync-2") internally
384+
// OLD behaviour (RTO5a2b): startNewSync would have cleared bufferedObjectOperations here
385+
// NEW behaviour (RTO5a2b removed): buffer is preserved
386+
objectsManager.handleObjectSyncMessages(
387+
listOf(
388+
ObjectMessage(
389+
id = "sync2-msg-1",
390+
objectState = ObjectState(
391+
objectId = "counter:test@1",
392+
tombstone = false,
393+
siteTimeserials = mapOf("site1" to "resync-serial"),
394+
counter = ObjectsCounter(count = 5.0)
395+
)
396+
)
397+
),
398+
"sync-2:cursor-1" // has cursor — not ending yet
399+
)
400+
401+
assertEquals(1, objectsManager.BufferedObjectOperations.size,
402+
"startNewSync must NOT clear bufferedObjectOperations (RTO5a2b removed)")
403+
404+
// Complete sync-2 (ending serial, no new messages)
405+
objectsManager.handleObjectSyncMessages(emptyList(), "sync-2:")
406+
407+
assertEquals(ObjectsState.Synced, defaultRealtimeObjects.state)
408+
// sync-2 restored counter to 5.0; buffered COUNTER_INC (+3.0) applied after sync → 8.0
409+
assertEquals(8.0, counter.data.get(),
410+
"buffered COUNTER_INC from before server-initiated resync must be applied after sync completes")
411+
}
412+
261413
@Test
262414
fun `(OM2) Populate objectMessage missing id, timestamp and connectionId from protocolMessage`() = runTest {
263415
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()

0 commit comments

Comments
 (0)