@@ -4,6 +4,7 @@ import io.ably.lib.objects.type.BaseLiveObject
44import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
55import io.ably.lib.objects.type.livemap.DefaultLiveMap
66import io.ably.lib.util.Log
7+ import kotlinx.coroutines.*
78
89/* *
910 * @spec RTO5 - Processes OBJECT and OBJECT_SYNC messages during sync sequences
@@ -21,6 +22,13 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
2122 */
2223 private val bufferedObjectOperations = mutableListOf<ObjectMessage >() // RTO7a
2324
25+ // composition over inheritance, used to handle object state changes internally
26+ private val internalObjectStateEmitter = ObjectsStateEmitter ()
27+ // related to RTC10, should have a separate EventEmitter for users of the library
28+ private val publicObjectStateEmitter = ObjectsStateEmitter ()
29+ // Coroutine scope for running sequential operations on a single thread, used to avoid concurrency issues.
30+ private val emitterScope = CoroutineScope (Dispatchers .Default .limitedParallelism(1 ) + SupervisorJob ())
31+
2432 /* *
2533 * Handles object messages (non-sync messages).
2634 *
@@ -77,7 +85,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
7785 bufferedObjectOperations.clear() // RTO5a2b
7886 syncObjectsDataPool.clear() // RTO5a2a
7987 currentSyncId = syncId
80- liveObjects. stateChange(ObjectsState .SYNCING , false )
88+ stateChange(ObjectsState .SYNCING , false )
8189 }
8290
8391 /* *
@@ -95,7 +103,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
95103 bufferedObjectOperations.clear() // RTO5c5
96104 syncObjectsDataPool.clear() // RTO5c4
97105 currentSyncId = null // RTO5c3
98- liveObjects. stateChange(ObjectsState .SYNCED , deferStateEvent)
106+ stateChange(ObjectsState .SYNCED , deferStateEvent)
99107 }
100108
101109 /* *
@@ -215,8 +223,48 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
215223 }
216224 }
217225
226+ /* *
227+ * Suspends the current coroutine until objects are synchronized.
228+ * Returns immediately if state is already SYNCED, otherwise waits for the SYNCED event.
229+ */
230+ internal suspend fun ensureSynced () {
231+ if (liveObjects.state != ObjectsState .SYNCED ) {
232+ val deferred = CompletableDeferred <Unit >()
233+ internalObjectStateEmitter.once(ObjectsEvent .SYNCED ) {
234+ Log .v(tag, " Objects state changed to SYNCED, resuming ensureSynced" )
235+ deferred.complete(Unit )
236+ }
237+ deferred.await()
238+ }
239+ }
240+
241+ /* *
242+ * Changes the state and emits events.
243+ *
244+ * @spec RTO2 - Emits state change events for syncing and synced states
245+ */
246+ private fun stateChange (newState : ObjectsState , deferEvent : Boolean ) {
247+ if (liveObjects.state == newState) {
248+ return
249+ }
250+
251+ liveObjects.state = newState
252+ Log .v(tag, " Objects state changed to: $newState " )
253+
254+ val event = objectsStateToEventMap[newState]
255+ event?.let {
256+ // Use of deferEvent not needed, since emit method is synchronized amongst different threads
257+ // emitterScope makes sure, next launch can only start when previous launch finishes processing of all events
258+ emitterScope.launch {
259+ internalObjectStateEmitter.emit(it)
260+ publicObjectStateEmitter.emit(it)
261+ }
262+ }
263+ }
264+
218265 internal fun dispose () {
219266 syncObjectsDataPool.clear()
220267 bufferedObjectOperations.clear()
268+ emitterScope.cancel(" ObjectsManager disposed" )
221269 }
222270}
0 commit comments