Skip to content

Commit 838bf14

Browse files
committed
[ECO-5426] Added spec annotation comments for LiveMap, LiveCounter and LiveObjects
1 parent 4c8a3bc commit 838bf14

4 files changed

Lines changed: 87 additions & 60 deletions

File tree

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
4444
private val syncObjectsDataPool = ConcurrentHashMap<String, ObjectState>()
4545
private var currentSyncId: String? = null
4646
/**
47-
* @spec RTO5 - Buffered object operations during sync
47+
* @spec RTO7 - Buffered object operations during sync
4848
*/
49-
private val bufferedObjectOperations = mutableListOf<ObjectMessage>()
49+
private val bufferedObjectOperations = mutableListOf<ObjectMessage>() // RTO7a
5050

5151
/**
5252
* @spec RTO1 - Returns the root LiveMap object with proper validation and sync waiting
@@ -130,18 +130,21 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
130130
/**
131131
* Handles object messages (non-sync messages).
132132
*
133-
* @spec RTO5 - Buffers messages if not synced, applies immediately if synced
133+
* @spec RTO8 - Buffers messages if not synced, applies immediately if synced
134134
*/
135135
private fun handleObjectMessages(objectMessages: List<ObjectMessage>) {
136136
if (state != ObjectsState.SYNCED) {
137-
// Buffer messages if not synced yet
137+
// RTO7 - The client receives object messages in realtime over the channel concurrently with the sync sequence.
138+
// Some of the incoming object messages may have already been applied to the objects described in
139+
// the sync sequence, but others may not; therefore we must buffer these messages so that we can apply
140+
// them to the objects once the sync is complete.
138141
Log.v(tag, "Buffering ${objectMessages.size} object messages, state: $state")
139-
bufferedObjectOperations.addAll(objectMessages)
142+
bufferedObjectOperations.addAll(objectMessages) // RTO8a
140143
return
141144
}
142145

143146
// Apply messages immediately if synced
144-
applyObjectMessages(objectMessages)
147+
applyObjectMessages(objectMessages) // RTO8b
145148
}
146149

147150
/**
@@ -154,7 +157,7 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
154157
val newSyncSequence = currentSyncId != syncId
155158
if (newSyncSequence) {
156159
// RTO5a2 - new sync sequence started
157-
startNewSync(syncId) // RTO5a2a
160+
startNewSync(syncId)
158161
}
159162

160163
// RTO5a3 - continue current sync sequence
@@ -198,8 +201,8 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
198201
Log.v(tag, "Starting new sync sequence: syncId=$syncId")
199202

200203
// need to discard all buffered object operation messages on new sync start
201-
bufferedObjectOperations.clear()
202-
syncObjectsDataPool.clear()
204+
bufferedObjectOperations.clear() // RTO5a2b
205+
syncObjectsDataPool.clear() // RTO5a2a
203206
currentSyncId = syncId
204207
stateChange(ObjectsState.SYNCING, false)
205208
}
@@ -214,9 +217,9 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
214217
applySync()
215218
// should apply buffered object operations after we applied the sync.
216219
// can use regular non-sync object.operation logic
217-
applyObjectMessages(bufferedObjectOperations)
220+
applyObjectMessages(bufferedObjectOperations) // RTO5c6
218221

219-
bufferedObjectOperations.clear()
222+
bufferedObjectOperations.clear() // RTO5c5
220223
syncObjectsDataPool.clear() // RTO5c4
221224
currentSyncId = null // RTO5c3
222225
stateChange(ObjectsState.SYNCED, deferStateEvent)
@@ -264,19 +267,26 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
264267
/**
265268
* Applies object messages to objects.
266269
*
267-
* @spec RTO6 - Creates zero-value objects if they don't exist
270+
* @spec RTO9 - Creates zero-value objects if they don't exist
268271
*/
269272
private fun applyObjectMessages(objectMessages: List<ObjectMessage>) {
273+
// RTO9a
270274
for (objectMessage in objectMessages) {
271275
if (objectMessage.operation == null) {
276+
// RTO9a1
272277
Log.w(tag, "Object message received without operation field, skipping message: ${objectMessage.id}")
273278
continue
274279
}
275280

276-
val objectOperation: ObjectOperation = objectMessage.operation
277-
// RTO6a - get or create the zero value object in the pool
278-
val obj = objectsPool.createZeroValueObjectIfNotExists(objectOperation.objectId)
279-
obj.applyOperation(objectOperation, objectMessage)
281+
val objectOperation: ObjectOperation = objectMessage.operation // RTO9a2
282+
// RTO9a2a - we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
283+
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
284+
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
285+
// since they need to be able to eventually initialize themselves from that *_CREATE op.
286+
// so to simplify operations handling, we always try to create a zero-value object in the pool first,
287+
// and then we can always apply the operation on the existing object in the pool.
288+
val obj = objectsPool.createZeroValueObjectIfNotExists(objectOperation.objectId) // RTO9a2a1
289+
obj.applyOperation(objectOperation, objectMessage) // RTO9a2a2, RTO9a2a3
280290
}
281291
}
282292

live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,23 @@ import io.ably.lib.util.Log
1212
* Base implementation of LiveObject interface.
1313
* Provides common functionality for all live objects.
1414
*
15-
* @spec RTLM1/RTLC1 - Base class for LiveMap/LiveCounter objects
15+
* @spec RTLO1/RTLO2 - Base class for LiveMap/LiveCounter objects
1616
*/
1717
internal abstract class BaseLiveObject(
18-
protected val objectId: String,
18+
protected val objectId: String, // // RTLO3a
1919
protected val adapter: LiveObjectsAdapter
2020
) {
2121

2222
protected open val tag = "BaseLiveObject"
2323
internal var isTombstoned = false
2424
internal var tombstonedAt: Long? = null
2525

26-
/**
27-
* @spec RTLM6/RTLC6 - Map of serials keyed by site code for LiveMap/LiveCounter
28-
*/
29-
protected val siteTimeserials = mutableMapOf<String, String>()
26+
protected val siteTimeserials = mutableMapOf<String, String>() // RTLO3b
3027

3128
/**
32-
* @spec RTLM6/RTLC6 - Flag to track if create operation has been merged for LiveMap/LiveCounter
29+
* @spec RTLO3 - Flag to track if create operation has been merged for LiveMap/LiveCounter
3330
*/
34-
protected var createOperationIsMerged = false
31+
protected var createOperationIsMerged = false // RTLO3c
3532

3633
fun notifyUpdated(update: Any) {
3734
// TODO: Implement event emission for updates
@@ -41,17 +38,17 @@ internal abstract class BaseLiveObject(
4138
/**
4239
* Checks if an operation can be applied based on serial comparison.
4340
*
44-
* @spec RTLM9/RTLC9 - Serial comparison logic for LiveMap/LiveCounter operations
41+
* @spec RTLO4a - Serial comparison logic for LiveMap/LiveCounter operations
4542
*/
4643
protected fun canApplyOperation(siteCode: String?, serial: String?): Boolean {
4744
if (serial.isNullOrEmpty()) {
48-
throw objectError("Invalid serial: $serial")
45+
throw objectError("Invalid serial: $serial") // RTLO4a3
4946
}
5047
if (siteCode.isNullOrEmpty()) {
51-
throw objectError("Invalid site code: $siteCode")
48+
throw objectError("Invalid site code: $siteCode") // RTLO4a3
5249
}
53-
val existingSiteSerial = siteTimeserials[siteCode]
54-
return existingSiteSerial == null || serial > existingSiteSerial
50+
val existingSiteSerial = siteTimeserials[siteCode] // RTLO4a4
51+
return existingSiteSerial == null || serial > existingSiteSerial // RTLO4a5, RTLO4a6
5552
}
5653

5754
/**

live-objects/src/main/kotlin/io/ably/lib/objects/type/DefaultLiveCounter.kt

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import io.ably.lib.objects.ObjectMessage
88
import io.ably.lib.objects.ObjectOperation
99
import io.ably.lib.objects.ObjectOperationAction
1010
import io.ably.lib.objects.ObjectState
11-
import io.ably.lib.objects.ObjectsPool
1211
import io.ably.lib.objects.ablyException
1312
import io.ably.lib.objects.objectError
1413
import io.ably.lib.types.AblyException
@@ -23,7 +22,7 @@ import io.ably.lib.util.Log
2322
internal class DefaultLiveCounter(
2423
objectId: String,
2524
adapter: LiveObjectsAdapter,
26-
) : BaseLiveObject(objectId, adapter), LiveCounter {
25+
) : LiveCounter, BaseLiveObject(objectId, adapter) {
2726

2827
override val tag = "LiveCounter"
2928
/**
@@ -86,6 +85,7 @@ internal class DefaultLiveCounter(
8685
val opSiteCode = message.siteCode
8786

8887
if (!canApplyOperation(opSiteCode, opSerial)) {
88+
// RTLC7b
8989
Log.v(
9090
tag,
9191
"Skipping ${operation.action} op: op serial $opSerial <= site serial ${siteTimeserials[opSiteCode]}; objectId=$objectId"
@@ -94,24 +94,24 @@ internal class DefaultLiveCounter(
9494
}
9595
// should update stored site serial immediately. doesn't matter if we successfully apply the op,
9696
// as it's important to mark that the op was processed by the object
97-
updateTimeSerial(opSiteCode!!, opSerial!!)
97+
updateTimeSerial(opSiteCode!!, opSerial!!) // RTLC7c
9898

9999
if (isTombstoned) {
100100
// this object is tombstoned so the operation cannot be applied
101101
return;
102102
}
103103

104104
val update = when (operation.action) {
105-
ObjectOperationAction.CounterCreate -> applyCounterCreate(operation)
105+
ObjectOperationAction.CounterCreate -> applyCounterCreate(operation) // RTLC7d1
106106
ObjectOperationAction.CounterInc -> {
107107
if (operation.counterOp != null) {
108-
applyCounterInc(operation.counterOp)
108+
applyCounterInc(operation.counterOp) // RTLC7d2
109109
} else {
110110
throw payloadError(operation)
111111
}
112112
}
113113
ObjectOperationAction.ObjectDelete -> applyObjectDelete()
114-
else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}")
114+
else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3
115115
}
116116

117117
notifyUpdated(update)
@@ -124,40 +124,44 @@ internal class DefaultLiveCounter(
124124
}
125125

126126
/**
127-
* @spec RTLC6d - Merges initial data from create operation
127+
* @spec RTLC8 - Applies counter create operation
128128
*/
129129
private fun applyCounterCreate(operation: ObjectOperation): Map<String, Long> {
130130
if (createOperationIsMerged) {
131+
// RTLC8b
132+
// There can't be two different create operation for the same object id, because the object id
133+
// fully encodes that operation. This means we can safely ignore any new incoming create operations
134+
// if we already merged it once.
131135
Log.v(
132136
tag,
133137
"Skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=$objectId"
134138
)
135139
return mapOf()
136140
}
137141

138-
return mergeInitialDataFromCreateOperation(operation)
142+
return mergeInitialDataFromCreateOperation(operation) // RTLC8c
139143
}
140144

141145
/**
142-
* @spec RTLC8 - Applies counter increment operation
146+
* @spec RTLC9 - Applies counter increment operation
143147
*/
144148
private fun applyCounterInc(counterOp: ObjectCounterOp): Map<String, Long> {
145149
val amount = counterOp.amount?.toLong() ?: 0
146-
data += amount
150+
data += amount // RTLC9b
147151
return mapOf("amount" to amount)
148152
}
149153

150154
/**
151-
* @spec RTLC6d - Merges initial data from create operation
155+
* @spec RTLC10 - Merges initial data from create operation
152156
*/
153157
private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): Map<String, Long> {
154158
// if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case.
155159
// note that it is intentional to SUM the incoming count from the create op.
156160
// if we got here, it means that current counter instance is missing the initial value in its data reference,
157161
// which we're going to add now.
158162
val count = operation.counter?.count?.toLong() ?: 0
159-
data += count // RTLC6d1
160-
createOperationIsMerged = true // RTLC6d2
163+
data += count // RTLC10a
164+
createOperationIsMerged = true // RTLC10b
161165
return mapOf("amount" to count)
162166
}
163167

@@ -186,8 +190,12 @@ internal class DefaultLiveCounter(
186190
TODO("Not yet implemented")
187191
}
188192

193+
/**
194+
* @spec RTLC5 - Returns the current counter value
195+
*/
189196
override fun value(): Long {
190-
TODO("Not yet implemented")
197+
// RTLC5a, RTLC5b - Configuration validation would be done here
198+
return data // RTLC5c
191199
}
192200

193201
companion object {

0 commit comments

Comments
 (0)