Skip to content

Commit 33855f8

Browse files
committed
[AIT-276] fix: clear sync objects data pool only when channel state is not suspended
1 parent d732e91 commit 33855f8

3 files changed

Lines changed: 115 additions & 6 deletions

File tree

liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -319,19 +319,24 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
319319
ChannelState.detached,
320320
ChannelState.suspended,
321321
ChannelState.failed -> {
322-
val errorReason = try { adapter.getChannel(channelName).reason } catch (e: Exception) { null }
322+
val errorReason = try {
323+
adapter.getChannel(channelName).reason
324+
} catch (e: Exception) {
325+
null
326+
}
323327
val error = ablyException(
324328
"publishAndApply could not be applied locally: channel entered $state whilst waiting for objects sync",
325329
ErrorCode.PublishAndApplyFailedDueToChannelState,
326330
HttpStatusCode.BadRequest,
327331
cause = errorReason?.let { AblyException.fromErrorInfo(it) }
328332
)
329333
objectsManager.failBufferedAcks(error) // RTO20e1
330-
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
331-
objectsPool.clearObjectsData(false)
332-
objectsManager.clearSyncObjectsDataPool()
334+
if (state != ChannelState.suspended) {
335+
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
336+
objectsPool.clearObjectsData(false)
337+
objectsManager.clearSyncObjectsDataPool()
338+
}
333339
}
334-
335340
else -> {
336341
// No action needed for other states
337342
}

liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
122122
*/
123123
internal fun failBufferedAcks(error: AblyException) {
124124
syncCompletionWaiter?.completeExceptionally(error)
125+
syncCompletionWaiter = null
125126
}
126127

127128
/**

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,19 @@ import io.ably.lib.objects.unit.BufferedObjectOperations
1616
import io.ably.lib.objects.unit.ObjectsManager
1717
import io.ably.lib.objects.unit.SyncObjectsDataPool
1818
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
19+
import io.ably.lib.objects.unit.getMockRealtimeChannel
1920
import io.ably.lib.objects.unit.size
2021
import io.ably.lib.realtime.ChannelState
22+
import io.ably.lib.types.AblyException
23+
import io.ably.lib.types.ErrorInfo
2124
import io.ably.lib.types.ProtocolMessage
25+
import io.mockk.every
2226
import io.mockk.verify
27+
import kotlinx.coroutines.CompletableDeferred
2328
import kotlinx.coroutines.test.runTest
2429
import org.junit.Test
2530
import kotlin.test.assertEquals
26-
import io.mockk.every
31+
import kotlin.test.assertNotNull
2732

2833
class DefaultRealtimeObjectsTest {
2934

@@ -155,6 +160,104 @@ class DefaultRealtimeObjectsTest {
155160
}
156161
}
157162

163+
@Test
164+
fun `(RTO20e1) handleStateChange(DETACHED) fails pending ACK waiters with error 92008`() = runTest {
165+
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
166+
167+
// Capture the error passed to failBufferedAcks via a CompletableDeferred
168+
val capturedError = CompletableDeferred<AblyException>()
169+
every { defaultRealtimeObjects.ObjectsManager.failBufferedAcks(any()) } answers {
170+
capturedError.complete(firstArg())
171+
callOriginal()
172+
}
173+
174+
defaultRealtimeObjects.handleStateChange(ChannelState.detached, false)
175+
176+
val error = capturedError.await()
177+
assertEquals(92008, error.errorInfo.code) // PublishAndApplyFailedDueToChannelState
178+
}
179+
180+
@Test
181+
fun `(RTO20e1) handleStateChange(SUSPENDED) fails pending ACK waiters with error 92008`() = runTest {
182+
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
183+
184+
val capturedError = CompletableDeferred<AblyException>()
185+
every { defaultRealtimeObjects.ObjectsManager.failBufferedAcks(any()) } answers {
186+
capturedError.complete(firstArg())
187+
callOriginal()
188+
}
189+
190+
defaultRealtimeObjects.handleStateChange(ChannelState.suspended, false)
191+
192+
val error = capturedError.await()
193+
assertEquals(92008, error.errorInfo.code) // PublishAndApplyFailedDueToChannelState
194+
}
195+
196+
@Test
197+
fun `(RTO20e1) handleStateChange(FAILED) fails pending ACK waiters and propagates channel reason`() = runTest {
198+
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
199+
200+
// Override the channel returned by the adapter to carry a non-null reason
201+
val channelReason = ErrorInfo("channel failed due to auth error", 40100, 401)
202+
val channelWithReason = getMockRealtimeChannel("testChannelName")
203+
channelWithReason.reason = channelReason
204+
every { defaultRealtimeObjects.adapter.getChannel(any()) } returns channelWithReason
205+
206+
val capturedError = CompletableDeferred<AblyException>()
207+
every { defaultRealtimeObjects.ObjectsManager.failBufferedAcks(any()) } answers {
208+
capturedError.complete(firstArg())
209+
callOriginal()
210+
}
211+
212+
defaultRealtimeObjects.handleStateChange(ChannelState.failed, false)
213+
214+
val error = capturedError.await()
215+
assertEquals(92008, error.errorInfo.code)
216+
val causeException = error.cause as? AblyException
217+
assertNotNull(causeException, "Error cause must include the channel's reason")
218+
assertEquals(channelReason.code, causeException.errorInfo.code)
219+
assertEquals(channelReason.message, causeException.errorInfo.message)
220+
}
221+
222+
@Test
223+
fun `(RTO4) handleStateChange(SUSPENDED) does NOT clear objects data`() = runTest {
224+
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
225+
226+
// Use the failBufferedAcks call as a signal that the state-change coroutine has run to completion
227+
val failCalled = CompletableDeferred<Unit>()
228+
every { defaultRealtimeObjects.ObjectsManager.failBufferedAcks(any()) } answers {
229+
callOriginal()
230+
failCalled.complete(Unit)
231+
}
232+
233+
defaultRealtimeObjects.handleStateChange(ChannelState.suspended, false)
234+
235+
// For SUSPENDED, the coroutine ends immediately after failBufferedAcks (no clear calls)
236+
failCalled.await()
237+
238+
verify(exactly = 0) { defaultRealtimeObjects.objectsPool.clearObjectsData(any()) }
239+
verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
240+
}
241+
242+
@Test
243+
fun `(RTO4) handleStateChange(DETACHED) clears objects data and sync pool`() = runTest {
244+
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
245+
246+
// Use clearSyncObjectsDataPool (the last operation in the coroutine) as the completion signal
247+
val syncPoolCleared = CompletableDeferred<Unit>()
248+
every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() } answers {
249+
callOriginal()
250+
syncPoolCleared.complete(Unit)
251+
}
252+
253+
defaultRealtimeObjects.handleStateChange(ChannelState.detached, false)
254+
255+
syncPoolCleared.await()
256+
257+
verify(exactly = 1) { defaultRealtimeObjects.objectsPool.clearObjectsData(false) }
258+
verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
259+
}
260+
158261
@Test
159262
fun `(OM2) Populate objectMessage missing id, timestamp and connectionId from protocolMessage`() = runTest {
160263
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()

0 commit comments

Comments
 (0)