Skip to content

Commit 21703e4

Browse files
committed
[ECO-5457] feat: Add Live Objects state change handling
• Add ObjectsStateEvent enum for tracking Live Objects synchronization states (SYNCING/SYNCED) • Implement ObjectsStateListener interface for receiving state change notifications • Create ObjectsStateSubscription interface for managing listener subscriptions and cleanup • Update LiveObjects, DefaultLiveObjects, ObjectsManager, ObjectsState, and Utils classes to support state management
1 parent 40c7936 commit 21703e4

8 files changed

Lines changed: 123 additions & 19 deletions

File tree

lib/src/main/java/io/ably/lib/objects/LiveObjects.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package io.ably.lib.objects;
22

3+
import io.ably.lib.objects.state.ObjectsStateEvent;
4+
import io.ably.lib.objects.state.ObjectsStateListener;
5+
import io.ably.lib.objects.state.ObjectsStateSubscription;
36
import io.ably.lib.types.Callback;
47
import org.jetbrains.annotations.Blocking;
58
import org.jetbrains.annotations.NonBlocking;
@@ -148,4 +151,38 @@ public interface LiveObjects {
148151
*/
149152
@NonBlocking
150153
void createCounterAsync(@NotNull Long initialValue, @NotNull Callback<@NotNull LiveCounter> callback);
154+
155+
/**
156+
* Subscribes to a specific Live Objects synchronization state event.
157+
*
158+
* <p>This method registers the provided listener to be notified when the specified
159+
* synchronization state event occurs. The returned subscription can be used to
160+
* unsubscribe later when the notifications are no longer needed.
161+
*
162+
* @param event the synchronization state event to subscribe to (SYNCING or SYNCED)
163+
* @param listener the listener that will be called when the event occurs
164+
* @return a subscription object that can be used to unsubscribe from the event
165+
*/
166+
@NonBlocking
167+
ObjectsStateSubscription on(@NotNull ObjectsStateEvent event, @NotNull ObjectsStateListener listener);
168+
169+
/**
170+
* Unsubscribes the specified listener from all synchronization state events.
171+
*
172+
* <p>After calling this method, the provided listener will no longer receive
173+
* any synchronization state event notifications.
174+
*
175+
* @param listener the listener to unregister from all events
176+
*/
177+
@NonBlocking
178+
void off(@NotNull ObjectsStateListener listener);
179+
180+
/**
181+
* Unsubscribes all listeners from all synchronization state events.
182+
*
183+
* <p>After calling this method, no listeners will receive any synchronization
184+
* state event notifications until new listeners are registered.
185+
*/
186+
@NonBlocking
187+
void offAll();
151188
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.ably.lib.objects.state;
2+
3+
/**
4+
* Represents the synchronization state of Ably Live Objects.
5+
* <p>
6+
* This enum is used to notify listeners about state changes in the synchronization process.
7+
* Clients can register an {@link ObjectsStateListener} to receive these events.
8+
*/
9+
public enum ObjectsStateEvent {
10+
/**
11+
* Indicates that synchronization between local and remote objects is in progress.
12+
*/
13+
SYNCING,
14+
15+
/**
16+
* Indicates that synchronization has completed successfully and objects are in sync.
17+
*/
18+
SYNCED
19+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.ably.lib.objects.state;
2+
3+
/**
4+
* Interface for receiving notifications about Live Objects synchronization state changes.
5+
* <p>
6+
* Implement this interface and register it with an ObjectsStateEmitter to be notified
7+
* when synchronization state transitions occur.
8+
*/
9+
public interface ObjectsStateListener {
10+
/**
11+
* Called when the synchronization state changes.
12+
*
13+
* @param objectsStateEvent The new state event (SYNCING or SYNCED)
14+
*/
15+
void onStateChanged(ObjectsStateEvent objectsStateEvent);
16+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.ably.lib.objects.state;
2+
3+
/**
4+
* Represents a subscription that can be unsubscribed from.
5+
* This interface provides a way to clean up and remove subscriptions when they
6+
* are no longer needed.
7+
* Example usage:
8+
* ```java
9+
* ObjectsStateSubscription s = objects.subscribe(ObjectsStateEvent.SYNCING, new ObjectsStateListener() {});
10+
* // Later when done with the subscription
11+
* s.unsubscribe();
12+
*/
13+
public interface ObjectsStateSubscription {
14+
/**
15+
* This method should be called when the subscription is no longer needed,
16+
* it will make sure no further events will be sent to the subscriber and
17+
* that references to the subscriber are cleaned up.
18+
*/
19+
void unsubscribe();
20+
}

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package io.ably.lib.objects
22

3+
import io.ably.lib.objects.state.ObjectsStateEvent
4+
import io.ably.lib.objects.state.ObjectsStateListener
5+
import io.ably.lib.objects.state.ObjectsStateSubscription
36
import io.ably.lib.realtime.ChannelState
47
import io.ably.lib.types.Callback
58
import io.ably.lib.types.ProtocolMessage
@@ -52,7 +55,7 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
5255
}
5356

5457
override fun getRootAsync(callback: Callback<LiveMap>) {
55-
callbackScope.with(callback) { getRootAsync() }
58+
callbackScope.launchWithCallback(callback) { getRootAsync() }
5659
}
5760

5861
override fun createMap(liveMap: LiveMap): LiveMap {
@@ -87,6 +90,21 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
8790
TODO("Not yet implemented")
8891
}
8992

93+
override fun on(event: ObjectsStateEvent, listener: ObjectsStateListener): ObjectsStateSubscription {
94+
objectsManager.publicObjectStateEmitter.on(event, listener)
95+
return ObjectsStateSubscription {
96+
objectsManager.publicObjectStateEmitter.off(event, listener)
97+
}
98+
}
99+
100+
override fun off(listener: ObjectsStateListener) {
101+
objectsManager.publicObjectStateEmitter.off(listener)
102+
}
103+
104+
override fun offAll() {
105+
objectsManager.publicObjectStateEmitter.off()
106+
}
107+
90108
private suspend fun getRootAsync(): LiveMap {
91109
return sequentialScope.async {
92110
adapter.throwIfInvalidAccessApiConfiguration(channelName)

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.ably.lib.objects
22

3+
import io.ably.lib.objects.state.ObjectsStateEvent
34
import io.ably.lib.objects.type.BaseLiveObject
45
import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
56
import io.ably.lib.objects.type.livemap.DefaultLiveMap
@@ -25,7 +26,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
2526
// composition over inheritance, used to handle object state changes internally
2627
private val internalObjectStateEmitter = ObjectsStateEmitter()
2728
// related to RTC10, should have a separate EventEmitter for users of the library
28-
private val publicObjectStateEmitter = ObjectsStateEmitter()
29+
internal val publicObjectStateEmitter = ObjectsStateEmitter()
2930
// Coroutine scope for running sequential operations on a single thread, used to avoid concurrency issues.
3031
private val emitterScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + SupervisorJob())
3132

@@ -230,7 +231,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
230231
internal suspend fun ensureSynced() {
231232
if (liveObjects.state != ObjectsState.SYNCED) {
232233
val deferred = CompletableDeferred<Unit>()
233-
internalObjectStateEmitter.once(ObjectsEvent.SYNCED) {
234+
internalObjectStateEmitter.once(ObjectsStateEvent.SYNCED) {
234235
Log.v(tag, "Objects state changed to SYNCED, resuming ensureSynced")
235236
deferred.complete(Unit)
236237
}
@@ -253,8 +254,8 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
253254

254255
val event = objectsStateToEventMap[newState]
255256
event?.let {
256-
// Use of deferEvent not needed, since emit method is synchronized amongst different threads
257-
// emitterScope makes sure, next launch can only start when previous launch finishes processing of all events
257+
// Use of deferEvent not needed since emitterScope makes sure next launch can only start when previous launch
258+
// finishes processing of all events. Also, emit method is synchronized amongst different threads
258259
emitterScope.launch {
259260
internalObjectStateEmitter.emit(it)
260261
publicObjectStateEmitter.emit(it)
Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.ably.lib.objects
22

3+
import io.ably.lib.objects.state.ObjectsStateEvent
4+
import io.ably.lib.objects.state.ObjectsStateListener
35
import io.ably.lib.util.EventEmitter
46

57
/**
@@ -11,23 +13,14 @@ internal enum class ObjectsState {
1113
SYNCED
1214
}
1315

14-
public enum class ObjectsEvent {
15-
SYNCING,
16-
SYNCED
17-
}
18-
1916
internal val objectsStateToEventMap = mapOf(
2017
ObjectsState.INITIALIZED to null,
21-
ObjectsState.SYNCING to ObjectsEvent.SYNCING,
22-
ObjectsState.SYNCED to ObjectsEvent.SYNCED
18+
ObjectsState.SYNCING to ObjectsStateEvent.SYNCING,
19+
ObjectsState.SYNCED to ObjectsStateEvent.SYNCED
2320
)
2421

25-
public fun interface ObjectsStateListener {
26-
public fun onStateChanged(state: ObjectsEvent)
27-
}
28-
29-
internal class ObjectsStateEmitter : EventEmitter<ObjectsEvent, ObjectsStateListener>() {
30-
override fun apply(listener: ObjectsStateListener?, event: ObjectsEvent?, vararg args: Any?) {
22+
internal class ObjectsStateEmitter : EventEmitter<ObjectsStateEvent, ObjectsStateListener>() {
23+
override fun apply(listener: ObjectsStateListener?, event: ObjectsStateEvent?, vararg args: Any?) {
3124
listener?.onStateChanged(event!!)
3225
}
3326
}

live-objects/src/main/kotlin/io/ably/lib/objects/Utils.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ internal val String.byteSize: Int
5858
* @param callback The callback to invoke with the operation result or error
5959
* @param block The suspend function to execute that returns a value of type T
6060
*/
61-
internal fun <T> CoroutineScope.with(callback: Callback<T>, block: suspend () -> T) {
61+
internal fun <T> CoroutineScope.launchWithCallback(callback: Callback<T>, block: suspend () -> T) {
6262
launch {
6363
try {
6464
val result = block()

0 commit comments

Comments
 (0)