Skip to content

Commit e8bba9e

Browse files
authored
Merge pull request #1139 from ably/feature/object-subscriptions-refactored
[ECO-5458][LiveObjects] Implement object subscriptions
2 parents da85ed6 + 4f1fc2d commit e8bba9e

30 files changed

Lines changed: 796 additions & 112 deletions

lib/src/main/java/io/ably/lib/objects/LiveObjects.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.ably.lib.objects;
22

33
import io.ably.lib.objects.state.ObjectsStateChange;
4+
import io.ably.lib.objects.type.counter.LiveCounter;
5+
import io.ably.lib.objects.type.map.LiveMap;
46
import org.jetbrains.annotations.Blocking;
57
import org.jetbrains.annotations.NonBlocking;
68
import org.jetbrains.annotations.NotNull;

lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
* s.unsubscribe();
1212
* }
1313
* </pre>
14+
* Spec: RTLO4b5
1415
*/
1516
public interface ObjectsSubscription {
1617
/**
1718
* This method should be called when the subscription is no longer needed,
1819
* it will make sure no further events will be sent to the subscriber and
1920
* that references to the subscriber are cleaned up.
21+
* Spec: RTLO4b5a
2022
*/
2123
void unsubscribe();
2224
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.ably.lib.objects.type;
2+
3+
import org.jetbrains.annotations.Nullable;
4+
5+
/**
6+
* Abstract base class for all LiveObject update notifications.
7+
* Provides common structure for updates that occur on LiveMap and LiveCounter objects.
8+
* Contains the update data that describes what changed in the live object.
9+
* Spec: RTLO4b4
10+
*/
11+
public abstract class LiveObjectUpdate {
12+
/**
13+
* The update data containing details about the change that occurred
14+
* Spec: RTLO4b4a
15+
*/
16+
@Nullable
17+
protected final Object update;
18+
19+
/**
20+
* Creates a LiveObjectUpdate with the specified update data.
21+
*
22+
* @param update the data describing the change, or null for no-op updates
23+
*/
24+
protected LiveObjectUpdate(@Nullable Object update) {
25+
this.update = update;
26+
}
27+
}

lib/src/main/java/io/ably/lib/objects/LiveCounter.java renamed to lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
package io.ably.lib.objects;
1+
package io.ably.lib.objects.type.counter;
22

3+
import io.ably.lib.objects.ObjectsCallback;
34
import org.jetbrains.annotations.Blocking;
45
import org.jetbrains.annotations.NonBlocking;
56
import org.jetbrains.annotations.NotNull;
@@ -10,7 +11,7 @@
1011
* It allows incrementing, decrementing, and retrieving the current value of the counter,
1112
* both synchronously and asynchronously.
1213
*/
13-
public interface LiveCounter {
14+
public interface LiveCounter extends LiveCounterChange {
1415

1516
/**
1617
* Increments the value of the counter by 1.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.ably.lib.objects.type.counter;
2+
3+
import io.ably.lib.objects.ObjectsSubscription;
4+
import org.jetbrains.annotations.NonBlocking;
5+
import org.jetbrains.annotations.NotNull;
6+
7+
/**
8+
* Provides methods to subscribe to real-time updates on LiveCounter objects.
9+
* Enables clients to receive notifications when counter values change due to
10+
* operations performed by any client connected to the same channel.
11+
*/
12+
public interface LiveCounterChange {
13+
14+
/**
15+
* Subscribes to real-time updates on this LiveCounter object.
16+
* Multiple listeners can be subscribed to the same object independently.
17+
* Spec: RTLO4b
18+
*
19+
* @param listener the listener to be notified of counter updates
20+
* @return an ObjectsSubscription for managing this specific listener
21+
*/
22+
@NonBlocking
23+
@NotNull ObjectsSubscription subscribe(@NotNull Listener listener);
24+
25+
/**
26+
* Unsubscribes a specific listener from receiving updates.
27+
* Has no effect if the listener is not currently subscribed.
28+
* Spec: RTLO4c
29+
*
30+
* @param listener the listener to be unsubscribed
31+
*/
32+
@NonBlocking
33+
void unsubscribe(@NotNull Listener listener);
34+
35+
/**
36+
* Unsubscribes all listeners from receiving updates.
37+
* No notifications will be delivered until new listeners are subscribed.
38+
* Spec: RTLO4d
39+
*/
40+
@NonBlocking
41+
void unsubscribeAll();
42+
43+
/**
44+
* Listener interface for receiving LiveCounter updates.
45+
* Spec: RTLO4b3
46+
*/
47+
interface Listener {
48+
/**
49+
* Called when the LiveCounter has been updated.
50+
* Should execute quickly as it's called from the real-time processing thread.
51+
*
52+
* @param update details about the counter change
53+
*/
54+
void onUpdated(@NotNull LiveCounterUpdate update);
55+
}
56+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package io.ably.lib.objects.type.counter;
2+
3+
import io.ably.lib.objects.type.LiveObjectUpdate;
4+
import org.jetbrains.annotations.NotNull;
5+
6+
/**
7+
* Represents an update that occurred on a LiveCounter object.
8+
* Contains information about counter value changes from increment/decrement operations.
9+
* Updates can represent positive changes (increments) or negative changes (decrements).
10+
*
11+
* @spec RTLC11, RTLC11a - LiveCounter update structure and behavior
12+
*/
13+
public class LiveCounterUpdate extends LiveObjectUpdate {
14+
15+
/**
16+
* Creates a no-op LiveCounterUpdate representing no actual change.
17+
*/
18+
public LiveCounterUpdate() {
19+
super(null);
20+
}
21+
22+
/**
23+
* Creates a LiveCounterUpdate with the specified amount change.
24+
*
25+
* @param amount the amount by which the counter changed (positive = increment, negative = decrement)
26+
*/
27+
public LiveCounterUpdate(@NotNull Double amount) {
28+
super(new Update(amount));
29+
}
30+
31+
/**
32+
* Gets the update information containing the amount of change.
33+
*
34+
* @return the Update object with the counter modification amount
35+
*/
36+
@NotNull
37+
public LiveCounterUpdate.Update getUpdate() {
38+
return (Update) update;
39+
}
40+
41+
/**
42+
* Returns a string representation of this LiveCounterUpdate.
43+
*
44+
* @return a string showing the amount of change to the counter
45+
*/
46+
@Override
47+
public String toString() {
48+
if (update == null) {
49+
return "LiveCounterUpdate{no change}";
50+
}
51+
return "LiveCounterUpdate{amount=" + getUpdate().getAmount() + "}";
52+
}
53+
54+
/**
55+
* Contains the specific details of a counter update operation.
56+
*
57+
* @spec RTLC11b, RTLC11b1 - Counter update data structure
58+
*/
59+
public static class Update {
60+
private final @NotNull Double amount;
61+
62+
/**
63+
* Creates an Update with the specified amount.
64+
*
65+
* @param amount the counter change amount (positive = increment, negative = decrement)
66+
*/
67+
public Update(@NotNull Double amount) {
68+
this.amount = amount;
69+
}
70+
71+
/**
72+
* Gets the amount by which the counter value was modified.
73+
*
74+
* @return the change amount (positive for increments, negative for decrements)
75+
*/
76+
public @NotNull Double getAmount() {
77+
return amount;
78+
}
79+
}
80+
}

lib/src/main/java/io/ably/lib/objects/LiveMap.java renamed to lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
package io.ably.lib.objects;
1+
package io.ably.lib.objects.type.map;
22

3+
import io.ably.lib.objects.ObjectsCallback;
34
import org.jetbrains.annotations.Blocking;
45
import org.jetbrains.annotations.NonBlocking;
56
import org.jetbrains.annotations.Contract;
@@ -13,7 +14,7 @@
1314
* The LiveMap interface provides methods to interact with a live, real-time map structure.
1415
* It supports both synchronous and asynchronous operations for managing key-value pairs.
1516
*/
16-
public interface LiveMap {
17+
public interface LiveMap extends LiveMapChange {
1718

1819
/**
1920
* Retrieves the value associated with the specified key.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.ably.lib.objects.type.map;
2+
3+
import io.ably.lib.objects.ObjectsSubscription;
4+
import org.jetbrains.annotations.NonBlocking;
5+
import org.jetbrains.annotations.NotNull;
6+
7+
/**
8+
* Provides methods to subscribe to real-time updates on LiveMap objects.
9+
* Enables clients to receive notifications when map entries are added, updated, or removed.
10+
* Uses last-write-wins conflict resolution when multiple clients modify the same key.
11+
*/
12+
public interface LiveMapChange {
13+
14+
/**
15+
* Subscribes to real-time updates on this LiveMap object.
16+
* Multiple listeners can be subscribed to the same object independently.
17+
* Spec: RTLO4b
18+
*
19+
* @param listener the listener to be notified of map updates
20+
* @return an ObjectsSubscription for managing this specific listener
21+
*/
22+
@NonBlocking
23+
@NotNull ObjectsSubscription subscribe(@NotNull Listener listener);
24+
25+
/**
26+
* Unsubscribes a specific listener from receiving updates.
27+
* Has no effect if the listener is not currently subscribed.
28+
* Spec: RTLO4c
29+
*
30+
* @param listener the listener to be unsubscribed
31+
*/
32+
@NonBlocking
33+
void unsubscribe(@NotNull Listener listener);
34+
35+
/**
36+
* Unsubscribes all listeners from receiving updates.
37+
* No notifications will be delivered until new listeners are subscribed.
38+
* Spec: RTLO4d
39+
*/
40+
@NonBlocking
41+
void unsubscribeAll();
42+
43+
/**
44+
* Listener interface for receiving LiveMap updates.
45+
* Spec: RTLO4b3
46+
*/
47+
interface Listener {
48+
/**
49+
* Called when the LiveMap has been updated.
50+
* Should execute quickly as it's called from the real-time processing thread.
51+
*
52+
* @param update details about which keys were modified and how
53+
*/
54+
void onUpdated(@NotNull LiveMapUpdate update);
55+
}
56+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package io.ably.lib.objects.type.map;
2+
3+
import io.ably.lib.objects.type.LiveObjectUpdate;
4+
import org.jetbrains.annotations.NotNull;
5+
6+
import java.util.Map;
7+
8+
/**
9+
* Represents an update that occurred on a LiveMap object.
10+
* Contains information about which keys were modified and whether they were updated or removed.
11+
*
12+
* @spec RTLM18, RTLM18a - LiveMap update structure and behavior
13+
*/
14+
public class LiveMapUpdate extends LiveObjectUpdate {
15+
16+
/**
17+
* Creates a no-op LiveMapUpdate representing no actual change.
18+
*/
19+
public LiveMapUpdate() {
20+
super(null);
21+
}
22+
23+
/**
24+
* Creates a LiveMapUpdate with the specified key changes.
25+
*
26+
* @param update map of key names to their change types (UPDATED or REMOVED)
27+
*/
28+
public LiveMapUpdate(@NotNull Map<String, Change> update) {
29+
super(update);
30+
}
31+
32+
/**
33+
* Gets the map of key changes that occurred in this update.
34+
*
35+
* @return map of key names to their change types
36+
*/
37+
@NotNull
38+
public Map<String, Change> getUpdate() {
39+
return (Map<String, Change>) update;
40+
}
41+
42+
/**
43+
* Returns a string representation of this LiveMapUpdate.
44+
*
45+
* @return a string showing the map key changes in this update
46+
*/
47+
@Override
48+
public String toString() {
49+
if (update == null) {
50+
return "LiveMapUpdate{no change}";
51+
}
52+
return "LiveMapUpdate{changes=" + getUpdate() + "}";
53+
}
54+
55+
/**
56+
* Indicates the type of change that occurred to a map key.
57+
*
58+
* @spec RTLM18b - Map change types
59+
*/
60+
public enum Change {
61+
/** The key was added or its value was modified */
62+
UPDATED,
63+
/** The key was removed from the map */
64+
REMOVED
65+
}
66+
}

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package io.ably.lib.objects
22

33
import io.ably.lib.objects.state.ObjectsStateChange
44
import io.ably.lib.objects.state.ObjectsStateEvent
5+
import io.ably.lib.objects.type.counter.LiveCounter
6+
import io.ably.lib.objects.type.map.LiveMap
57
import io.ably.lib.realtime.ChannelState
68
import io.ably.lib.types.AblyException
79
import io.ably.lib.types.ProtocolMessage
@@ -125,7 +127,7 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val
125127
* @spec OM2 - Populates missing fields from parent protocol message
126128
*/
127129
private fun initializeHandlerForIncomingObjectMessages(): Job {
128-
return sequentialScope.launch {
130+
return sequentialScope.launch {
129131
objectsEventBus.collect { protocolMessage ->
130132
// OM2 - Populate missing fields from parent
131133
val objects = protocolMessage.state.filterIsInstance<ObjectMessage>()

0 commit comments

Comments
 (0)