Skip to content

Commit 59bf9b3

Browse files
committed
[ECO-5458] Enhanced LiveMap with subscription support
- Modified DefaultLiveMap to integrate with LiveMapChangeCoordinator - Enhanced LiveMapManager to return LiveMapUpdate objects - Implemented comprehensive map change notification system
1 parent 608a688 commit 59bf9b3

2 files changed

Lines changed: 58 additions & 27 deletions

File tree

live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,13 @@ import io.ably.lib.objects.ObjectMessage
66
import io.ably.lib.objects.ObjectOperation
77
import io.ably.lib.objects.ObjectState
88
import io.ably.lib.objects.type.BaseLiveObject
9+
import io.ably.lib.objects.type.LiveObjectUpdate
910
import io.ably.lib.objects.type.ObjectType
11+
import io.ably.lib.objects.type.map.LiveMap
12+
import io.ably.lib.objects.type.map.LiveMapChange
13+
import io.ably.lib.objects.type.map.LiveMapUpdate
14+
import io.ably.lib.objects.type.noOp
15+
import io.ably.lib.util.Log
1016
import java.util.concurrent.ConcurrentHashMap
1117
import java.util.AbstractMap
1218

@@ -103,19 +109,36 @@ internal class DefaultLiveMap private constructor(
103109

104110
override fun validate(state: ObjectState) = liveMapManager.validate(state)
105111

106-
override fun applyObjectState(objectState: ObjectState): Map<String, String> {
112+
override fun subscribe(listener: LiveMapChange.Listener): ObjectsSubscription {
113+
adapter.throwIfInvalidAccessApiConfiguration(channelName)
114+
return liveMapManager.subscribe(listener)
115+
}
116+
117+
override fun unsubscribe(listener: LiveMapChange.Listener) = liveMapManager.unsubscribe(listener)
118+
119+
override fun unsubscribeAll() = liveMapManager.unsubscribeAll()
120+
121+
override fun applyObjectState(objectState: ObjectState): LiveMapUpdate {
107122
return liveMapManager.applyState(objectState)
108123
}
109124

110125
override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
111126
liveMapManager.applyOperation(operation, message.serial)
112127
}
113128

114-
override fun clearData(): Map<String, String> {
129+
override fun clearData(): LiveMapUpdate {
115130
return liveMapManager.calculateUpdateFromDataDiff(data.toMap(), emptyMap())
116131
.apply { data.clear() }
117132
}
118133

134+
override fun notifyUpdated(update: LiveObjectUpdate) {
135+
if (update.noOp) {
136+
return
137+
}
138+
Log.v(tag, "Object $objectId updated: $update")
139+
liveMapManager.notify(update as LiveMapUpdate)
140+
}
141+
119142
override fun onGCInterval() {
120143
data.entries.removeIf { (_, entry) -> entry.isEligibleForGc() }
121144
}

live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,20 @@ import io.ably.lib.objects.ObjectOperationAction
77
import io.ably.lib.objects.ObjectState
88
import io.ably.lib.objects.isInvalid
99
import io.ably.lib.objects.objectError
10+
import io.ably.lib.objects.type.map.LiveMapUpdate
11+
import io.ably.lib.objects.type.noOp
1012
import io.ably.lib.util.Log
1113

12-
internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
14+
internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChangeCoordinator() {
15+
1316
private val objectId = liveMap.objectId
1417

1518
private val tag = "LiveMapManager"
1619

1720
/**
1821
* @spec RTLM6 - Overrides object data with state from sync
1922
*/
20-
internal fun applyState(objectState: ObjectState): Map<String, String> {
23+
internal fun applyState(objectState: ObjectState): LiveMapUpdate {
2124
val previousData = liveMap.data.toMap()
2225

2326
if (objectState.tombstone) {
@@ -69,13 +72,13 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
6972
else -> throw objectError("Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4
7073
}
7174

72-
liveMap.notifyUpdated(update)
75+
liveMap.notifyUpdated(update) // RTLM15d1a, RTLM15d2a, RTLM15d3a
7376
}
7477

7578
/**
7679
* @spec RTLM16 - Applies map create operation
7780
*/
78-
private fun applyMapCreate(operation: ObjectOperation): Map<String, String> {
81+
private fun applyMapCreate(operation: ObjectOperation): LiveMapUpdate {
7982
if (liveMap.createOperationIsMerged) {
8083
// RTLM16b
8184
// There can't be two different create operation for the same object id, because the object id
@@ -85,7 +88,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
8588
tag,
8689
"Skipping applying MAP_CREATE op on a map instance as it was already applied before; objectId=${objectId}"
8790
)
88-
return mapOf()
91+
return noOpMapUpdate
8992
}
9093

9194
validateMapSemantics(operation.map?.semantics) // RTLM16c
@@ -99,7 +102,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
99102
private fun applyMapSet(
100103
mapOp: ObjectMapOp, // RTLM7d1
101104
timeSerial: String?, // RTLM7d2
102-
): Map<String, String> {
105+
): LiveMapUpdate {
103106
val existingEntry = liveMap.data[mapOp.key]
104107

105108
// RTLM7a
@@ -109,7 +112,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
109112
"Skipping update for key=\"${mapOp.key}\": op serial $timeSerial <= entry serial ${existingEntry.timeserial};" +
110113
" objectId=${objectId}"
111114
)
112-
return mapOf()
115+
return noOpMapUpdate
113116
}
114117

115118
if (mapOp.data.isInvalid()) {
@@ -142,7 +145,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
142145
)
143146
}
144147

145-
return mapOf(mapOp.key to "updated")
148+
return LiveMapUpdate(mapOf(mapOp.key to LiveMapUpdate.Change.UPDATED))
146149
}
147150

148151
/**
@@ -151,7 +154,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
151154
private fun applyMapRemove(
152155
mapOp: ObjectMapOp, // RTLM8c1
153156
timeSerial: String?, // RTLM8c2
154-
): Map<String, String> {
157+
): LiveMapUpdate {
155158
val existingEntry = liveMap.data[mapOp.key]
156159

157160
// RTLM8a
@@ -162,7 +165,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
162165
"Skipping remove for key=\"${mapOp.key}\": op serial $timeSerial <= entry serial ${existingEntry.timeserial}; " +
163166
"objectId=${objectId}"
164167
)
165-
return mapOf()
168+
return noOpMapUpdate
166169
}
167170

168171
if (existingEntry != null) {
@@ -182,7 +185,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
182185
)
183186
}
184187

185-
return mapOf(mapOp.key to "removed")
188+
return LiveMapUpdate(mapOf(mapOp.key to LiveMapUpdate.Change.REMOVED))
186189
}
187190

188191
/**
@@ -206,12 +209,12 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
206209
/**
207210
* @spec RTLM17 - Merges initial data from create operation
208211
*/
209-
private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): Map<String, String> {
212+
private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): LiveMapUpdate {
210213
if (operation.map?.entries.isNullOrEmpty()) { // no map entries in MAP_CREATE op
211-
return mapOf()
214+
return noOpMapUpdate
212215
}
213216

214-
val aggregatedUpdate = mutableMapOf<String, String>()
217+
val aggregatedUpdate = mutableListOf<LiveMapUpdate>()
215218

216219
// RTLM17a
217220
// in order to apply MAP_CREATE op for an existing map, we should merge their underlying entries keys.
@@ -228,25 +231,30 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
228231
}
229232

230233
// skip noop updates
231-
if (update.isEmpty()) {
234+
if (update.noOp) {
232235
return@forEach
233236
}
234237

235-
aggregatedUpdate.putAll(update)
238+
aggregatedUpdate.add(update)
236239
}
237240

238241
liveMap.createOperationIsMerged = true // RTLM17b
239242

240-
return aggregatedUpdate
243+
return LiveMapUpdate(
244+
aggregatedUpdate.map { it.update }.fold(emptyMap()) { acc, map -> acc + map }
245+
)
241246
}
242247

243-
internal fun calculateUpdateFromDataDiff(prevData: Map<String, LiveMapEntry>, newData: Map<String, LiveMapEntry>): Map<String, String> {
244-
val update = mutableMapOf<String, String>()
248+
internal fun calculateUpdateFromDataDiff(
249+
prevData: Map<String, LiveMapEntry>,
250+
newData: Map<String, LiveMapEntry>
251+
): LiveMapUpdate {
252+
val update = mutableMapOf<String, LiveMapUpdate.Change>()
245253

246254
// Check for removed entries
247255
for ((key, prevEntry) in prevData) {
248256
if (!prevEntry.isTombstoned && !newData.containsKey(key)) {
249-
update[key] = "removed"
257+
update[key] = LiveMapUpdate.Change.REMOVED
250258
}
251259
}
252260

@@ -255,7 +263,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
255263
if (!prevData.containsKey(key)) {
256264
// if property does not exist in current map, but new data has it as non-tombstoned property - got updated
257265
if (!newEntry.isTombstoned) {
258-
update[key] = "updated"
266+
update[key] = LiveMapUpdate.Change.UPDATED
259267
}
260268
// otherwise, if new data has this prop tombstoned - do nothing, as property didn't exist anyway
261269
continue
@@ -267,12 +275,12 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
267275
// compare tombstones first
268276
if (prevEntry.isTombstoned && !newEntry.isTombstoned) {
269277
// prev prop is tombstoned, but new is not. it means prop was updated to a meaningful value
270-
update[key] = "updated"
278+
update[key] = LiveMapUpdate.Change.UPDATED
271279
continue
272280
}
273281
if (!prevEntry.isTombstoned && newEntry.isTombstoned) {
274282
// prev prop is not tombstoned, but new is. it means prop was removed
275-
update[key] = "removed"
283+
update[key] = LiveMapUpdate.Change.REMOVED
276284
continue
277285
}
278286
if (prevEntry.isTombstoned && newEntry.isTombstoned) {
@@ -283,12 +291,12 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) {
283291
// both props exist and are not tombstoned, need to compare values to see if it was changed
284292
val valueChanged = prevEntry.data != newEntry.data
285293
if (valueChanged) {
286-
update[key] = "updated"
294+
update[key] = LiveMapUpdate.Change.UPDATED
287295
continue
288296
}
289297
}
290298

291-
return update
299+
return LiveMapUpdate(update)
292300
}
293301

294302
internal fun validate(state: ObjectState) {

0 commit comments

Comments
 (0)