Skip to content

Commit 38a3fee

Browse files
committed
[ECO-5457] Moved callbackScope at global level to be shared amongst all public api methods
1 parent 67c2b62 commit 38a3fee

5 files changed

Lines changed: 38 additions & 41 deletions

File tree

lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,15 @@ public interface LiveObjectsPlugin {
4646
* Disposes of the LiveObjects instance associated with the specified channel name.
4747
* This method removes the LiveObjects instance for the given channel, releasing any
4848
* resources associated with it.
49+
* This is invoked when ablyRealtimeClient.channels.release(channelName) is called
4950
*
5051
* @param channelName the name of the channel whose LiveObjects instance is to be removed.
5152
*/
5253
void dispose(@NotNull String channelName);
5354

5455
/**
5556
* Disposes of the plugin instance and all underlying resources.
57+
* This is invoked when ablyRealtimeClient.close() is called
5658
*/
5759
void dispose();
5860
}

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import io.ably.lib.util.Log
99
import kotlinx.coroutines.*
1010
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1111
import kotlinx.coroutines.flow.MutableSharedFlow
12+
import java.util.concurrent.CancellationException
1213

1314
/**
1415
* Default implementation of LiveObjects interface.
@@ -34,12 +35,6 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
3435
private val sequentialScope =
3536
CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName(channelName) + SupervisorJob())
3637

37-
/**
38-
* Coroutine scope for handling callbacks asynchronously.
39-
*/
40-
private val callbackScope =
41-
CoroutineScope(Dispatchers.Default + CoroutineName("LiveObjectsCallback-$channelName") + SupervisorJob())
42-
4338
/**
4439
* Event bus for handling incoming object messages sequentially.
4540
*/
@@ -50,12 +45,10 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
5045
incomingObjectsHandler = initializeHandlerForIncomingObjectMessages()
5146
}
5247

53-
override fun getRoot(): LiveMap {
54-
return runBlocking { getRootAsync() }
55-
}
48+
override fun getRoot(): LiveMap = runBlocking { getRootAsync() }
5649

5750
override fun getRootAsync(callback: Callback<LiveMap>) {
58-
callbackScope.launchWithCallback(callback) { getRootAsync() }
51+
GlobalCallbackScope.launchWithCallback(callback) { getRootAsync() }
5952
}
6053

6154
override fun createMap(liveMap: LiveMap): LiveMap {
@@ -97,12 +90,10 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
9790

9891
override fun offAll() = objectsManager.offAll()
9992

100-
private suspend fun getRootAsync(): LiveMap {
101-
return sequentialScope.async {
102-
adapter.throwIfInvalidAccessApiConfiguration(channelName)
103-
objectsManager.ensureSynced(state)
104-
objectsPool.get(ROOT_OBJECT_ID) as LiveMap
105-
}.await()
93+
private suspend fun getRootAsync(): LiveMap = withContext(sequentialScope.coroutineContext) {
94+
adapter.throwIfInvalidAccessApiConfiguration(channelName)
95+
objectsManager.ensureSynced(state)
96+
objectsPool.get(ROOT_OBJECT_ID) as LiveMap
10697
}
10798

10899
/**
@@ -188,9 +179,12 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
188179
}
189180

190181
// Dispose of any resources associated with this LiveObjects instance
191-
fun dispose() {
192-
incomingObjectsHandler.cancel() // objectsEventBus automatically garbage collected when collector is cancelled
182+
fun dispose(reason: String) {
183+
val cancellationError = CancellationException("Objects disposed for channel $channelName, reason: $reason")
184+
incomingObjectsHandler.cancel(cancellationError) // objectsEventBus automatically garbage collected when collector is cancelled
193185
objectsPool.dispose()
194186
objectsManager.dispose()
187+
// Don't cancel sequentialScope (needed in public methods), just cancel ongoing coroutines
188+
sequentialScope.coroutineContext.cancelChildren(cancellationError)
195189
}
196190
}

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) :
2222
}
2323

2424
override fun dispose(channelName: String) {
25-
liveObjects[channelName]?.dispose()
25+
liveObjects[channelName]?.dispose("Channel has ben released using channels.release()")
2626
liveObjects.remove(channelName)
2727
}
2828

2929
override fun dispose() {
3030
liveObjects.values.forEach {
31-
it.dispose()
31+
it.dispose("AblyClient has been closed using client.close()")
3232
}
3333
liveObjects.clear()
3434
}

live-objects/src/main/kotlin/io/ably/lib/objects/Utils.kt

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -49,26 +49,26 @@ internal val String.byteSize: Int
4949
get() = this.toByteArray(Charsets.UTF_8).size
5050

5151
/**
52-
* Executes a suspend function within a coroutine and handles the result via a callback.
53-
*
54-
* This utility bridges between coroutine-based implementation code and callback-based APIs.
55-
* It launches a coroutine in the current scope to execute the provided suspend block,
56-
* then routes the result or any error to the appropriate callback method.
57-
*
58-
* @param T The type of result expected from the operation
59-
* @param callback The callback to invoke with the operation result or error
60-
* @param block The suspend function to execute that returns a value of type T
52+
* A global coroutine scope for executing callbacks asynchronously.
53+
* Provides safe execution of suspend functions with results delivered via callbacks,
54+
* with proper error handling for both the execution and callback invocation.
6155
*/
62-
internal fun <T> CoroutineScope.launchWithCallback(callback: Callback<T>, block: suspend () -> T) {
63-
launch {
64-
try {
65-
val result = block()
66-
try { callback.onSuccess(result) } catch (t: Throwable) {
67-
Log.e("asyncCallback", "Error occurred while executing callback's onSuccess handler", t)
68-
} // catch and don't rethrow error from callback
69-
} catch (throwable: Throwable) {
70-
val exception = throwable as? AblyException
71-
callback.onError(exception?.errorInfo)
56+
internal object GlobalCallbackScope {
57+
private const val TAG = "GlobalCallbackScope"
58+
private val scope =
59+
CoroutineScope(Dispatchers.Default + CoroutineName("LiveObjects-GlobalCallbackScope") + SupervisorJob())
60+
61+
internal fun <T> launchWithCallback(callback: Callback<T>, block: suspend () -> T) {
62+
scope.launch {
63+
try {
64+
val result = block()
65+
try { callback.onSuccess(result) } catch (t: Throwable) {
66+
Log.e(TAG, "Error occurred while executing callback's onSuccess handler", t)
67+
} // catch and don't rethrow error from callback
68+
} catch (throwable: Throwable) {
69+
val exception = throwable as? AblyException
70+
callback.onError(exception?.errorInfo)
71+
}
7272
}
7373
}
7474
}

live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultLiveObjectsTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,16 @@ class DefaultLiveObjectsTest {
5757
// RTO4b - If the HAS_OBJECTS flag is 0, the sync sequence must be considered complete immediately
5858
defaultLiveObjects.handleStateChange(ChannelState.attached, false)
5959

60+
// Verify expected outcomes
61+
assertWaiter { defaultLiveObjects.state == ObjectsState.SYNCED } // RTO4b4
62+
6063
verify(exactly = 1) {
6164
defaultLiveObjects.objectsPool.resetToInitialPool(true)
6265
}
6366
verify(exactly = 1) {
6467
defaultLiveObjects.ObjectsManager.endSync(any<Boolean>())
6568
}
6669

67-
// Verify expected outcomes
68-
assertWaiter { defaultLiveObjects.state == ObjectsState.SYNCED } // RTO4b4
6970
assertEquals(0, defaultLiveObjects.ObjectsManager.SyncObjectsDataPool.size) // RTO4b3
7071
assertEquals(0, defaultLiveObjects.ObjectsManager.BufferedObjectOperations.size) // RTO4b5
7172
assertEquals(1, defaultLiveObjects.objectsPool.size()) // RTO4b1 - Only root remains

0 commit comments

Comments
 (0)