Skip to content

Commit 091560b

Browse files
committed
[ECO-5447] Updated ObjectMessage with server provided serialTimestamp
1. Updated MsgpackSerialization accordingly. 2. Updated LiveMapManagerTests, added few more tests related to changes
1 parent 60ee93c commit 091560b

5 files changed

Lines changed: 250 additions & 27 deletions

File tree

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ internal data class ObjectMapEntry(
125125
*/
126126
val timeserial: String? = null,
127127

128+
/**
129+
* A timestamp from the [timeserial] field. Only present if [tombstone] is `true`
130+
* Spec: OME2d
131+
*/
132+
val serialTimestamp: Long? = null,
133+
128134
/**
129135
* The data that represents the value of the map entry.
130136
* Spec: OME2c
@@ -336,6 +342,12 @@ internal data class ObjectMessage(
336342
*/
337343
val serial: String? = null,
338344

345+
/**
346+
* A timestamp from the [serial] field.
347+
* Spec: OM2j
348+
*/
349+
val serialTimestamp: Long? = null,
350+
339351
/**
340352
* An opaque string used as a key to update the map of serial values on an object.
341353
* Spec: OM2i

live-objects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ internal fun ObjectMessage.writeMsgpack(packer: MessagePacker) {
3838
if (operation != null) fieldCount++
3939
if (objectState != null) fieldCount++
4040
if (serial != null) fieldCount++
41+
if (serialTimestamp != null) fieldCount++
4142
if (siteCode != null) fieldCount++
4243

4344
packer.packMapHeader(fieldCount)
@@ -82,6 +83,11 @@ internal fun ObjectMessage.writeMsgpack(packer: MessagePacker) {
8283
packer.packString(serial)
8384
}
8485

86+
if (serialTimestamp != null) {
87+
packer.packString("serialTimestamp")
88+
packer.packLong(serialTimestamp)
89+
}
90+
8591
if (siteCode != null) {
8692
packer.packString("siteCode")
8793
packer.packString(siteCode)
@@ -107,6 +113,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage {
107113
var operation: ObjectOperation? = null
108114
var objectState: ObjectState? = null
109115
var serial: String? = null
116+
var serialTimestamp: Long? = null
110117
var siteCode: String? = null
111118

112119
for (i in 0 until fieldCount) {
@@ -127,6 +134,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage {
127134
"operation" -> operation = readObjectOperation(unpacker)
128135
"object" -> objectState = readObjectState(unpacker)
129136
"serial" -> serial = unpacker.unpackString()
137+
"serialTimestamp" -> serialTimestamp = unpacker.unpackLong()
130138
"siteCode" -> siteCode = unpacker.unpackString()
131139
else -> unpacker.skipValue()
132140
}
@@ -141,6 +149,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage {
141149
operation = operation,
142150
objectState = objectState,
143151
serial = serial,
152+
serialTimestamp = serialTimestamp,
144153
siteCode = siteCode
145154
)
146155
}
@@ -558,6 +567,7 @@ private fun ObjectMapEntry.writeMsgpack(packer: MessagePacker) {
558567

559568
if (tombstone != null) fieldCount++
560569
if (timeserial != null) fieldCount++
570+
if (serialTimestamp != null) fieldCount++
561571
if (data != null) fieldCount++
562572

563573
packer.packMapHeader(fieldCount)
@@ -572,6 +582,11 @@ private fun ObjectMapEntry.writeMsgpack(packer: MessagePacker) {
572582
packer.packString(timeserial)
573583
}
574584

585+
if (serialTimestamp != null) {
586+
packer.packString("serialTimestamp")
587+
packer.packLong(serialTimestamp)
588+
}
589+
575590
if (data != null) {
576591
packer.packString("data")
577592
data.writeMsgpack(packer)
@@ -586,6 +601,7 @@ private fun readObjectMapEntry(unpacker: MessageUnpacker): ObjectMapEntry {
586601

587602
var tombstone: Boolean? = null
588603
var timeserial: String? = null
604+
var serialTimestamp: Long? = null
589605
var data: ObjectData? = null
590606

591607
for (i in 0 until fieldCount) {
@@ -600,12 +616,13 @@ private fun readObjectMapEntry(unpacker: MessageUnpacker): ObjectMapEntry {
600616
when (fieldName) {
601617
"tombstone" -> tombstone = unpacker.unpackBoolean()
602618
"timeserial" -> timeserial = unpacker.unpackString()
619+
"serialTimestamp" -> serialTimestamp = unpacker.unpackLong()
603620
"data" -> data = readObjectData(unpacker)
604621
else -> unpacker.skipValue()
605622
}
606623
}
607624

608-
return ObjectMapEntry(tombstone = tombstone, timeserial = timeserial, data = data)
625+
return ObjectMapEntry(tombstone = tombstone, timeserial = timeserial, serialTimestamp = serialTimestamp, data = data)
609626
}
610627

611628
/**

live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ internal class DefaultLiveMap private constructor(
124124
}
125125

126126
override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
127-
liveMapManager.applyOperation(operation, message.serial)
127+
liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp)
128128
}
129129

130130
override fun clearData(): LiveMapUpdate {

live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
3333
objectState.map?.entries?.forEach { (key, entry) ->
3434
liveMap.data[key] = LiveMapEntry(
3535
isTombstoned = entry.tombstone ?: false,
36-
tombstonedAt = if (entry.tombstone == true) System.currentTimeMillis() else null,
36+
tombstonedAt = if (entry.tombstone == true) entry.serialTimestamp ?: System.currentTimeMillis() else null,
3737
timeserial = entry.timeserial,
3838
data = entry.data
3939
)
@@ -51,19 +51,19 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
5151
/**
5252
* @spec RTLM15 - Applies operations to LiveMap
5353
*/
54-
internal fun applyOperation(operation: ObjectOperation, messageTimeserial: String?) {
54+
internal fun applyOperation(operation: ObjectOperation, serial: String?, serialTimestamp: Long?) {
5555
val update = when (operation.action) {
5656
ObjectOperationAction.MapCreate -> applyMapCreate(operation) // RTLM15d1
5757
ObjectOperationAction.MapSet -> {
5858
if (operation.mapOp != null) {
59-
applyMapSet(operation.mapOp, messageTimeserial) // RTLM15d2
59+
applyMapSet(operation.mapOp, serial) // RTLM15d2
6060
} else {
6161
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
6262
}
6363
}
6464
ObjectOperationAction.MapRemove -> {
6565
if (operation.mapOp != null) {
66-
applyMapRemove(operation.mapOp, messageTimeserial) // RTLM15d3
66+
applyMapRemove(operation.mapOp, serial, serialTimestamp) // RTLM15d3
6767
} else {
6868
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
6969
}
@@ -132,7 +132,6 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
132132
// RTLM7a2 - Replace existing entry with new one instead of mutating
133133
liveMap.data[mapOp.key] = LiveMapEntry(
134134
isTombstoned = false, // RTLM7a2c
135-
tombstonedAt = null,
136135
timeserial = timeSerial, // RTLM7a2b
137136
data = mapOp.data // RTLM7a2a
138137
)
@@ -154,6 +153,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
154153
private fun applyMapRemove(
155154
mapOp: ObjectMapOp, // RTLM8c1
156155
timeSerial: String?, // RTLM8c2
156+
timeStamp: Long?, // RTLM8c3
157157
): LiveMapUpdate {
158158
val existingEntry = liveMap.data[mapOp.key]
159159

@@ -168,19 +168,28 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
168168
return noOpMapUpdate
169169
}
170170

171+
val tombstonedAt = if (timeStamp != null) timeStamp else {
172+
Log.w(
173+
tag,
174+
"No timestamp provided for MAP_REMOVE op on key=\"${mapOp.key}\"; using current time as tombstone time; " +
175+
"objectId=${objectId}"
176+
)
177+
System.currentTimeMillis()
178+
}
179+
171180
if (existingEntry != null) {
172181
// RTLM8a2 - Replace existing entry with new one instead of mutating
173182
liveMap.data[mapOp.key] = LiveMapEntry(
174183
isTombstoned = true, // RTLM8a2c
175-
tombstonedAt = System.currentTimeMillis(),
184+
tombstonedAt = tombstonedAt,
176185
timeserial = timeSerial, // RTLM8a2b
177186
data = null // RTLM8a2a
178187
)
179188
} else {
180189
// RTLM8b, RTLM8b1
181190
liveMap.data[mapOp.key] = LiveMapEntry(
182191
isTombstoned = true, // RTLM8b2
183-
tombstonedAt = System.currentTimeMillis(),
192+
tombstonedAt = tombstonedAt,
184193
timeserial = timeSerial
185194
)
186195
}
@@ -224,7 +233,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
224233
val opTimeserial = entry.timeserial
225234
val update = if (entry.tombstone == true) {
226235
// RTLM17a2 - entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op
227-
applyMapRemove(ObjectMapOp(key), opTimeserial)
236+
applyMapRemove(ObjectMapOp(key), opTimeserial, entry.serialTimestamp)
228237
} else {
229238
// RTLM17a1 - entry in MAP_CREATE op is not removed, try to set it via MAP_SET op
230239
applyMapSet(ObjectMapOp(key, entry.data), opTimeserial)

0 commit comments

Comments
 (0)