Skip to content

Commit 33ff9e2

Browse files
committed
[ECO-5457] Created separate interface for ObjectsStateChange to avoid polluting
LiveObjects - Added impl. for the same in ObjectsState.kt - Extended the impl. in objectsmanager
1 parent 21703e4 commit 33ff9e2

7 files changed

Lines changed: 158 additions & 110 deletions

File tree

lib/src/main/java/io/ably/lib/objects/LiveObjects.java

Lines changed: 2 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package io.ably.lib.objects;
22

3-
import io.ably.lib.objects.state.ObjectsStateEvent;
4-
import io.ably.lib.objects.state.ObjectsStateListener;
5-
import io.ably.lib.objects.state.ObjectsStateSubscription;
3+
import io.ably.lib.objects.state.ObjectsStateChange;
64
import io.ably.lib.types.Callback;
75
import org.jetbrains.annotations.Blocking;
86
import org.jetbrains.annotations.NonBlocking;
@@ -19,7 +17,7 @@
1917
* <p>Implementations of this interface must be thread-safe as they may be accessed
2018
* from multiple threads concurrently.
2119
*/
22-
public interface LiveObjects {
20+
public interface LiveObjects extends ObjectsStateChange {
2321

2422
/**
2523
* Retrieves the root LiveMap object.
@@ -151,38 +149,4 @@ public interface LiveObjects {
151149
*/
152150
@NonBlocking
153151
void createCounterAsync(@NotNull Long initialValue, @NotNull Callback<@NotNull LiveCounter> callback);
154-
155-
/**
156-
* Subscribes to a specific Live Objects synchronization state event.
157-
*
158-
* <p>This method registers the provided listener to be notified when the specified
159-
* synchronization state event occurs. The returned subscription can be used to
160-
* unsubscribe later when the notifications are no longer needed.
161-
*
162-
* @param event the synchronization state event to subscribe to (SYNCING or SYNCED)
163-
* @param listener the listener that will be called when the event occurs
164-
* @return a subscription object that can be used to unsubscribe from the event
165-
*/
166-
@NonBlocking
167-
ObjectsStateSubscription on(@NotNull ObjectsStateEvent event, @NotNull ObjectsStateListener listener);
168-
169-
/**
170-
* Unsubscribes the specified listener from all synchronization state events.
171-
*
172-
* <p>After calling this method, the provided listener will no longer receive
173-
* any synchronization state event notifications.
174-
*
175-
* @param listener the listener to unregister from all events
176-
*/
177-
@NonBlocking
178-
void off(@NotNull ObjectsStateListener listener);
179-
180-
/**
181-
* Unsubscribes all listeners from all synchronization state events.
182-
*
183-
* <p>After calling this method, no listeners will receive any synchronization
184-
* state event notifications until new listeners are registered.
185-
*/
186-
@NonBlocking
187-
void offAll();
188152
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package io.ably.lib.objects.state;
2+
3+
import org.jetbrains.annotations.NonBlocking;
4+
import org.jetbrains.annotations.NotNull;
5+
6+
public interface ObjectsStateChange {
7+
/**
8+
* Subscribes to a specific Live Objects synchronization state event.
9+
*
10+
* <p>This method registers the provided listener to be notified when the specified
11+
* synchronization state event occurs. The returned subscription can be used to
12+
* unsubscribe later when the notifications are no longer needed.
13+
*
14+
* @param event the synchronization state event to subscribe to (SYNCING or SYNCED)
15+
* @param listener the listener that will be called when the event occurs
16+
* @return a subscription object that can be used to unsubscribe from the event
17+
*/
18+
@NonBlocking
19+
ObjectsStateSubscription on(@NotNull ObjectsStateEvent event, @NotNull ObjectsStateChange.Listener listener);
20+
21+
/**
22+
* Unsubscribes the specified listener from all synchronization state events.
23+
*
24+
* <p>After calling this method, the provided listener will no longer receive
25+
* any synchronization state event notifications.
26+
*
27+
* @param listener the listener to unregister from all events
28+
*/
29+
@NonBlocking
30+
void off(@NotNull ObjectsStateChange.Listener listener);
31+
32+
/**
33+
* Unsubscribes all listeners from all synchronization state events.
34+
*
35+
* <p>After calling this method, no listeners will receive any synchronization
36+
* state event notifications until new listeners are registered.
37+
*/
38+
@NonBlocking
39+
void offAll();
40+
41+
/**
42+
* Interface for receiving notifications about Live Objects synchronization state changes.
43+
* <p>
44+
* Implement this interface and register it with an ObjectsStateEmitter to be notified
45+
* when synchronization state transitions occur.
46+
*/
47+
interface Listener {
48+
/**
49+
* Called when the synchronization state changes.
50+
*
51+
* @param objectsStateEvent The new state event (SYNCING or SYNCED)
52+
*/
53+
void onStateChanged(ObjectsStateEvent objectsStateEvent);
54+
}
55+
}

lib/src/main/java/io/ably/lib/objects/state/ObjectsStateEvent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
* Represents the synchronization state of Ably Live Objects.
55
* <p>
66
* This enum is used to notify listeners about state changes in the synchronization process.
7-
* Clients can register an {@link ObjectsStateListener} to receive these events.
7+
* Clients can register an {@link ObjectsStateChange.Listener} to receive these events.
88
*/
99
public enum ObjectsStateEvent {
1010
/**

lib/src/main/java/io/ably/lib/objects/state/ObjectsStateListener.java

Lines changed: 0 additions & 16 deletions
This file was deleted.

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.ably.lib.objects
22

3+
import io.ably.lib.objects.state.ObjectsStateChange
34
import io.ably.lib.objects.state.ObjectsStateEvent
4-
import io.ably.lib.objects.state.ObjectsStateListener
55
import io.ably.lib.objects.state.ObjectsStateSubscription
66
import io.ably.lib.realtime.ChannelState
77
import io.ably.lib.types.Callback
@@ -90,25 +90,17 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
9090
TODO("Not yet implemented")
9191
}
9292

93-
override fun on(event: ObjectsStateEvent, listener: ObjectsStateListener): ObjectsStateSubscription {
94-
objectsManager.publicObjectStateEmitter.on(event, listener)
95-
return ObjectsStateSubscription {
96-
objectsManager.publicObjectStateEmitter.off(event, listener)
97-
}
98-
}
93+
override fun on(event: ObjectsStateEvent, listener: ObjectsStateChange.Listener): ObjectsStateSubscription =
94+
objectsManager.on(event, listener)
9995

100-
override fun off(listener: ObjectsStateListener) {
101-
objectsManager.publicObjectStateEmitter.off(listener)
102-
}
96+
override fun off(listener: ObjectsStateChange.Listener) = objectsManager.off(listener)
10397

104-
override fun offAll() {
105-
objectsManager.publicObjectStateEmitter.off()
106-
}
98+
override fun offAll() = objectsManager.offAll()
10799

108100
private suspend fun getRootAsync(): LiveMap {
109101
return sequentialScope.async {
110102
adapter.throwIfInvalidAccessApiConfiguration(channelName)
111-
objectsManager.ensureSynced()
103+
objectsManager.ensureSynced(state)
112104
objectsPool.get(ROOT_OBJECT_ID) as LiveMap
113105
}.await()
114106
}

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
package io.ably.lib.objects
22

3-
import io.ably.lib.objects.state.ObjectsStateEvent
43
import io.ably.lib.objects.type.BaseLiveObject
54
import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
65
import io.ably.lib.objects.type.livemap.DefaultLiveMap
76
import io.ably.lib.util.Log
8-
import kotlinx.coroutines.*
97

108
/**
119
* @spec RTO5 - Processes OBJECT and OBJECT_SYNC messages during sync sequences
1210
* @spec RTO6 - Creates zero-value objects when needed
1311
*/
14-
internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
12+
internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): ObjectsStateCoordinator() {
1513
private val tag = "ObjectsManager"
1614
/**
1715
* @spec RTO5 - Sync objects data pool for collecting sync messages
@@ -23,13 +21,6 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
2321
*/
2422
private val bufferedObjectOperations = mutableListOf<ObjectMessage>() // RTO7a
2523

26-
// composition over inheritance, used to handle object state changes internally
27-
private val internalObjectStateEmitter = ObjectsStateEmitter()
28-
// related to RTC10, should have a separate EventEmitter for users of the library
29-
internal val publicObjectStateEmitter = ObjectsStateEmitter()
30-
// Coroutine scope for running sequential operations on a single thread, used to avoid concurrency issues.
31-
private val emitterScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + SupervisorJob())
32-
3324
/**
3425
* Handles object messages (non-sync messages).
3526
*
@@ -224,21 +215,6 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
224215
}
225216
}
226217

227-
/**
228-
* Suspends the current coroutine until objects are synchronized.
229-
* Returns immediately if state is already SYNCED, otherwise waits for the SYNCED event.
230-
*/
231-
internal suspend fun ensureSynced() {
232-
if (liveObjects.state != ObjectsState.SYNCED) {
233-
val deferred = CompletableDeferred<Unit>()
234-
internalObjectStateEmitter.once(ObjectsStateEvent.SYNCED) {
235-
Log.v(tag, "Objects state changed to SYNCED, resuming ensureSynced")
236-
deferred.complete(Unit)
237-
}
238-
deferred.await()
239-
}
240-
}
241-
242218
/**
243219
* Changes the state and emits events.
244220
*
@@ -248,24 +224,16 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
248224
if (liveObjects.state == newState) {
249225
return
250226
}
251-
227+
Log.v(tag, "Objects state changed to: $newState from ${liveObjects.state}")
252228
liveObjects.state = newState
253-
Log.v(tag, "Objects state changed to: $newState")
254229

255-
val event = objectsStateToEventMap[newState]
256-
event?.let {
257-
// Use of deferEvent not needed since emitterScope makes sure next launch can only start when previous launch
258-
// finishes processing of all events. Also, emit method is synchronized amongst different threads
259-
emitterScope.launch {
260-
internalObjectStateEmitter.emit(it)
261-
publicObjectStateEmitter.emit(it)
262-
}
263-
}
230+
// deferEvent not needed since objectsStateChanged processes events in a sequential coroutine scope
231+
objectsStateChanged(newState)
264232
}
265233

266234
internal fun dispose() {
267235
syncObjectsDataPool.clear()
268236
bufferedObjectOperations.clear()
269-
emitterScope.cancel("ObjectsManager disposed")
237+
disposeObjectsStateListeners()
270238
}
271239
}
Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package io.ably.lib.objects
22

3+
import io.ably.lib.objects.state.ObjectsStateChange
34
import io.ably.lib.objects.state.ObjectsStateEvent
4-
import io.ably.lib.objects.state.ObjectsStateListener
5+
import io.ably.lib.objects.state.ObjectsStateSubscription
56
import io.ably.lib.util.EventEmitter
7+
import io.ably.lib.util.Log
8+
import kotlinx.coroutines.*
69

710
/**
811
* @spec RTO2 - enum representing objects state
@@ -13,14 +16,96 @@ internal enum class ObjectsState {
1316
SYNCED
1417
}
1518

16-
internal val objectsStateToEventMap = mapOf(
19+
/**
20+
* Maps internal ObjectsState values to their corresponding public ObjectsStateEvent values.
21+
* Used to determine which events should be emitted when state changes occur.
22+
* INITIALIZED maps to null (no event), while SYNCING and SYNCED map to their respective events.
23+
*/
24+
private val objectsStateToEventMap = mapOf(
1725
ObjectsState.INITIALIZED to null,
1826
ObjectsState.SYNCING to ObjectsStateEvent.SYNCING,
1927
ObjectsState.SYNCED to ObjectsStateEvent.SYNCED
2028
)
2129

22-
internal class ObjectsStateEmitter : EventEmitter<ObjectsStateEvent, ObjectsStateListener>() {
23-
override fun apply(listener: ObjectsStateListener?, event: ObjectsStateEvent?, vararg args: Any?) {
30+
/**
31+
* An interface for managing and communicating changes in the synchronization state of live objects.
32+
*
33+
* Implementations should ensure thread-safe event emission and proper synchronization
34+
* between state change notifications.
35+
*/
36+
internal interface HandlesObjectsStateChange {
37+
/**
38+
* Handles changes in the state of live objects by notifying all registered listeners.
39+
* Implementations should ensure thread-safe event emission to both internal and public listeners.
40+
* Makes sure every event is processed in the order they were received.
41+
* @param newState The new state of the objects, SYNCING or SYNCED.
42+
*/
43+
fun objectsStateChanged(newState: ObjectsState)
44+
45+
/**
46+
* Suspends the current coroutine until objects are synchronized.
47+
* Returns immediately if state is already SYNCED, otherwise waits for the SYNCED event.
48+
*
49+
* @param currentState The current state of objects to determine if waiting is necessary
50+
*/
51+
suspend fun ensureSynced(currentState: ObjectsState)
52+
53+
/**
54+
* Disposes all registered state change listeners and cancels any pending operations.
55+
* Should be called when the associated LiveObjects instance is no longer needed.
56+
*/
57+
fun disposeObjectsStateListeners()
58+
}
59+
60+
61+
internal abstract class ObjectsStateCoordinator : ObjectsStateChange, HandlesObjectsStateChange {
62+
private val tag = "ObjectsStateCoordinator"
63+
private val internalObjectStateEmitter = ObjectsStateEmitter()
64+
// related to RTC10, should have a separate EventEmitter for users of the library
65+
private val externalObjectStateEmitter = ObjectsStateEmitter()
66+
67+
private val emitterScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + SupervisorJob())
68+
69+
override fun on(event: ObjectsStateEvent, listener: ObjectsStateChange.Listener): ObjectsStateSubscription {
70+
externalObjectStateEmitter.on(event, listener)
71+
return ObjectsStateSubscription {
72+
externalObjectStateEmitter.off(event, listener)
73+
}
74+
}
75+
76+
override fun off(listener: ObjectsStateChange.Listener) = externalObjectStateEmitter.off(listener)
77+
78+
override fun offAll() = externalObjectStateEmitter.off()
79+
80+
override fun objectsStateChanged(newState: ObjectsState) {
81+
objectsStateToEventMap[newState]?.let { objectsStateEvent ->
82+
// emitterScope makes sure next launch can only start when previous launch finishes
83+
emitterScope.launch {
84+
internalObjectStateEmitter.emit(objectsStateEvent)
85+
externalObjectStateEmitter.emit(objectsStateEvent)
86+
}
87+
}
88+
}
89+
90+
override suspend fun ensureSynced(currentState: ObjectsState) {
91+
if (currentState != ObjectsState.SYNCED) {
92+
val deferred = CompletableDeferred<Unit>()
93+
internalObjectStateEmitter.once(ObjectsStateEvent.SYNCED) {
94+
Log.v(tag, "Objects state changed to SYNCED, resuming ensureSynced")
95+
deferred.complete(Unit)
96+
}
97+
deferred.await()
98+
}
99+
}
100+
101+
override fun disposeObjectsStateListeners() {
102+
offAll()
103+
emitterScope.cancel("ObjectsManager disposed")
104+
}
105+
}
106+
107+
private class ObjectsStateEmitter : EventEmitter<ObjectsStateEvent, ObjectsStateChange.Listener>() {
108+
override fun apply(listener: ObjectsStateChange.Listener?, event: ObjectsStateEvent?, vararg args: Any?) {
24109
listener?.onStateChanged(event!!)
25110
}
26111
}

0 commit comments

Comments
 (0)