Skip to content

Commit 708fe84

Browse files
committed
[AIT-276] refactor: fixes based on review comments
1 parent f57632f commit 708fe84

4 files changed

Lines changed: 175 additions & 101 deletions

File tree

liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,15 +319,14 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
319319
ChannelState.detached,
320320
ChannelState.suspended,
321321
ChannelState.failed -> {
322-
// RTO20e1 - fail any publishAndApply operations waiting for sync
323322
val errorReason = try { adapter.getChannel(channelName).reason } catch (e: Exception) { null }
324323
val error = ablyException(
325324
"publishAndApply could not be applied locally: channel entered $state whilst waiting for objects sync",
326325
ErrorCode.PublishAndApplyFailedDueToChannelState,
327326
HttpStatusCode.BadRequest,
328327
cause = errorReason?.let { AblyException.fromErrorInfo(it) }
329328
)
330-
objectsManager.failBufferedAcks(error)
329+
objectsManager.failBufferedAcks(error) // RTO20e1
331330
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
332331
objectsPool.clearObjectsData(false)
333332
objectsManager.clearSyncObjectsDataPool()

liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 18 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
2323
* @spec RTO7 - Buffered object operations during sync
2424
*/
2525
private val bufferedObjectOperations = mutableListOf<ObjectMessage>() // RTO7a
26-
/**
27-
* @spec RTO22 - ACK results buffered during sync, with deferred for caller waiting
28-
*/
29-
private val bufferedAcks = mutableListOf<Pair<List<ObjectMessage>, CompletableDeferred<Unit>>>()
26+
private var syncCompletionWaiter: CompletableDeferred<Unit>? = null
3027

3128
/**
3229
* Handles object messages (non-sync messages).
@@ -83,11 +80,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
8380
// need to discard all buffered object operation messages on new sync start
8481
bufferedObjectOperations.clear() // RTO5a2b
8582
syncObjectsDataPool.clear() // RTO5a2a
86-
// RTO21b - clear ACK tracking state on new sync (safety guard; RTO20e1 should have already failed deferreds)
87-
for ((_, deferred) in bufferedAcks) { deferred.cancel() }
88-
bufferedAcks.clear()
89-
realtimeObjects.appliedOnAckSerials.clear()
9083
currentSyncId = syncId
84+
syncCompletionWaiter = CompletableDeferred()
9185
stateChange(ObjectsState.Syncing, false)
9286
}
9387

@@ -98,47 +92,36 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
9892
*/
9993
internal fun endSync(deferStateEvent: Boolean) {
10094
Log.v(tag, "Ending sync sequence")
101-
applySync()
102-
realtimeObjects.appliedOnAckSerials.clear() // RTO5c9 - sync replaces state
103-
104-
// RTO5c6b: apply buffered ACKs before buffered OBJECT messages (proper dedup ordering)
105-
for ((messages, deferred) in bufferedAcks) {
106-
applyObjectMessages(messages, ObjectsOperationSource.LOCAL)
107-
deferred.complete(Unit) // signal publishAndApply to resume
108-
}
109-
bufferedAcks.clear()
110-
95+
applySync() // RTO5c1/2/7
11196
applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6
112-
bufferedObjectOperations.clear() // RTO5c5
113-
syncObjectsDataPool.clear() // RTO5c4
114-
currentSyncId = null // RTO5c3
115-
stateChange(ObjectsState.Synced, deferStateEvent) // RTO5c8
97+
bufferedObjectOperations.clear() // RTO5c5
98+
syncObjectsDataPool.clear() // RTO5c4
99+
currentSyncId = null // RTO5c3
100+
realtimeObjects.appliedOnAckSerials.clear() // RTO5c9
101+
stateChange(ObjectsState.Synced, deferStateEvent) // RTO5c8
102+
syncCompletionWaiter?.complete(Unit)
103+
syncCompletionWaiter = null
116104
}
117105

118106
/**
119107
* Called from publishAndApply (via withContext sequentialScope).
120108
* If SYNCED: apply immediately with LOCAL source.
121-
* If SYNCING: buffer, suspend deferred until endSync processes it (RTO20e).
109+
* If not SYNCED: suspend until endSync transitions to SYNCED (RTO20e), then apply.
122110
*/
123111
internal suspend fun applyAckResult(messages: List<ObjectMessage>) {
124-
if (realtimeObjects.state == ObjectsState.Synced) {
125-
applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f
126-
} else {
127-
val deferred = CompletableDeferred<Unit>()
128-
bufferedAcks.add(Pair(messages, deferred))
129-
deferred.await() // RTO20e - suspends until endSync completes or channel fails (RTO20e1)
112+
if (realtimeObjects.state != ObjectsState.Synced) {
113+
if (syncCompletionWaiter == null) syncCompletionWaiter = CompletableDeferred()
114+
syncCompletionWaiter?.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
130115
}
116+
applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f
131117
}
132118

133119
/**
134-
* Fails all buffered ACK deferreds.
120+
* Fails all pending apply waiters.
135121
* Called when the channel enters DETACHED/SUSPENDED/FAILED (RTO20e1).
136122
*/
137123
internal fun failBufferedAcks(error: AblyException) {
138-
for ((_, deferred) in bufferedAcks) {
139-
deferred.completeExceptionally(error)
140-
}
141-
bufferedAcks.clear()
124+
syncCompletionWaiter?.completeExceptionally(error)
142125
}
143126

144127
/**
@@ -301,8 +284,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
301284
}
302285

303286
internal fun dispose() {
304-
for ((_, deferred) in bufferedAcks) { deferred.cancel() }
305-
bufferedAcks.clear()
287+
syncCompletionWaiter?.cancel()
306288
syncObjectsDataPool.clear()
307289
bufferedObjectOperations.clear()
308290
disposeObjectsStateListeners()

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import io.ably.lib.types.ClientOptions
1717
import io.mockk.every
1818
import io.mockk.mockk
1919
import io.mockk.spyk
20+
import kotlinx.coroutines.CompletableDeferred
2021

2122
internal fun getMockRealtimeChannel(
2223
channelName: String,
@@ -73,8 +74,8 @@ internal val ObjectsManager.SyncObjectsDataPool: Map<String, ObjectState>
7374
internal val ObjectsManager.BufferedObjectOperations: List<ObjectMessage>
7475
get() = this.getPrivateField("bufferedObjectOperations")
7576

76-
internal val ObjectsManager.BufferedAcks: List<Pair<List<ObjectMessage>, *>>
77-
get() = this.getPrivateField("bufferedAcks")
77+
internal val ObjectsManager.SyncCompletionWaiter: CompletableDeferred<Unit>?
78+
get() = this.getPrivateField("syncCompletionWaiter")
7879

7980
internal var DefaultRealtimeObjects.ObjectsManager: ObjectsManager
8081
get() = this.getPrivateField("objectsManager")

0 commit comments

Comments
 (0)