Skip to content

Commit e14f8b0

Browse files
Merge pull request #122 from ably/AIT-454-map-clear
[AIT-454] Implement `MAP_CLEAR` operation
2 parents 30d867b + 99fe9ba commit e14f8b0

10 files changed

Lines changed: 1153 additions & 15 deletions

Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ internal final class InternalDefaultLiveMap: Sendable {
3535
}
3636
}
3737

38+
internal var testsOnly_clearTimeserial: String? {
39+
mutableStateMutex.withSync { mutableState in
40+
mutableState.clearTimeserial
41+
}
42+
}
43+
3844
private let logger: Logger
3945
private let userCallbackQueue: DispatchQueue
4046
private let clock: SimpleClock
@@ -396,6 +402,15 @@ internal final class InternalDefaultLiveMap: Sendable {
396402
}
397403
}
398404

405+
/// Test-only method to apply a MAP_CLEAR operation, per RTLM24.
406+
internal func testsOnly_applyMapClearOperation(serial: String?) -> LiveObjectUpdate<DefaultLiveMapUpdate> {
407+
mutableStateMutex.withSync { mutableState in
408+
mutableState.applyMapClearOperation(
409+
serial: serial,
410+
)
411+
}
412+
}
413+
399414
/// Resets the map's data, per RTO4b2. This is to be used when an `ATTACHED` ProtocolMessage indicates that the only object in a channel is an empty root map.
400415
internal func nosync_resetData() {
401416
mutableStateMutex.withoutSync { mutableState in
@@ -452,6 +467,9 @@ internal final class InternalDefaultLiveMap: Sendable {
452467
/// The "private `semantics` field" of RTO5c1b1b.
453468
internal var semantics: WireEnum<ObjectsMapSemantics>?
454469

470+
/// RTLM25
471+
internal var clearTimeserial: String?
472+
455473
/// Replaces the internal data of this map with the provided ObjectState, per RTLM6.
456474
///
457475
/// - Parameters:
@@ -492,6 +510,9 @@ internal final class InternalDefaultLiveMap: Sendable {
492510
// RTLM6g: Store the current data value as previousData for use in RTLM6h
493511
let previousData = data
494512

513+
// RTLM6i
514+
clearTimeserial = state.map?.clearTimeserial
515+
495516
// RTLM6b: Set the private flag createOperationIsMerged to false
496517
liveObjectMutableState.createOperationIsMerged = false
497518

@@ -702,6 +723,15 @@ internal final class InternalDefaultLiveMap: Sendable {
702723
liveObjectMutableState.emit(.update(.init(update: dataBeforeApplyingOperation.mapValues { _ in .removed })), on: userCallbackQueue)
703724
// RTLM15d5b
704725
return true
726+
case .known(.mapClear):
727+
// RTLM15d8
728+
let update = applyMapClearOperation(
729+
serial: applicableOperation.objectMessageSerial,
730+
)
731+
// RTLM15d8a
732+
liveObjectMutableState.emit(update, on: userCallbackQueue)
733+
// RTLM15d8b
734+
return true
705735
default:
706736
// RTLM15d4
707737
logger.log("Operation \(operation) has unsupported action for LiveMap; discarding", level: .warn)
@@ -720,6 +750,11 @@ internal final class InternalDefaultLiveMap: Sendable {
720750
userCallbackQueue: DispatchQueue,
721751
clock: SimpleClock,
722752
) -> LiveObjectUpdate<DefaultLiveMapUpdate> {
753+
// RTLM7h
754+
if let clearTimeserial, operationTimeserial.map({ $0 <= clearTimeserial }) ?? true {
755+
return .noop
756+
}
757+
723758
// RTLM7a: If an entry exists in the private data for the specified key
724759
if let existingEntry = data[key] {
725760
// RTLM7a1: If the operation cannot be applied as per RTLM9, discard the operation
@@ -762,6 +797,11 @@ internal final class InternalDefaultLiveMap: Sendable {
762797
internal mutating func applyMapRemoveOperation(key: String, operationTimeserial: String?, operationSerialTimestamp: Date?, logger: Logger, clock: SimpleClock) -> LiveObjectUpdate<DefaultLiveMapUpdate> {
763798
// (Note that, where the spec tells us to set ObjectsMapEntry.data to nil, we actually set it to an empty ObjectData, which is equivalent, since it contains no data)
764799

800+
// RTLM8g
801+
if let clearTimeserial, operationTimeserial.map({ $0 <= clearTimeserial }) ?? true {
802+
return .noop
803+
}
804+
765805
// Calculate the tombstonedAt for the new or updated entry per RTLM8f
766806
let tombstonedAt: Date?
767807
if let operationSerialTimestamp {
@@ -869,11 +909,44 @@ internal final class InternalDefaultLiveMap: Sendable {
869909
)
870910
}
871911

912+
/// Applies a `MAP_CLEAR` operation, per RTLM24.
913+
internal mutating func applyMapClearOperation(
914+
serial: String?,
915+
) -> LiveObjectUpdate<DefaultLiveMapUpdate> {
916+
guard let serial else {
917+
return .noop
918+
}
919+
920+
// RTLM24c
921+
if let clearTimeserial, serial <= clearTimeserial {
922+
return .noop
923+
}
924+
925+
// RTLM24d
926+
clearTimeserial = serial
927+
928+
// RTLM24e, RTLM24e1: entry timeserial is nil, or serial > entry timeserial
929+
let keysToRemove = data.filter { _, entry in
930+
guard let entryTimeserial = entry.timeserial else {
931+
return true
932+
}
933+
return serial > entryTimeserial
934+
}.keys
935+
936+
for key in keysToRemove {
937+
data.removeValue(forKey: key)
938+
}
939+
940+
// RTLM24e1b, RTLM24f
941+
let removedKeys = Dictionary(uniqueKeysWithValues: keysToRemove.map { ($0, LiveMapUpdateAction.removed) })
942+
return .update(DefaultLiveMapUpdate(update: removedKeys))
943+
}
944+
872945
/// Resets the map's data and emits a `removed` event for the existing keys, per RTO4b2 and RTO4b2a. This is to be used when an `ATTACHED` ProtocolMessage indicates that the only object in a channel is an empty root map.
873946
internal mutating func resetData(userCallbackQueue: DispatchQueue) {
874947
// RTO4b2
875948
let previousData = data
876-
data = [:]
949+
resetDataToZeroValued()
877950

878951
// RTO4b2a
879952
let mapUpdate = DefaultLiveMapUpdate(update: previousData.mapValues { _ in .removed })
@@ -884,6 +957,7 @@ internal final class InternalDefaultLiveMap: Sendable {
884957
mutating func resetDataToZeroValued() {
885958
// RTLM4
886959
data = [:]
960+
clearTimeserial = nil
887961
}
888962

889963
/// Releases entries that were tombstoned more than `gracePeriod` ago, per RTLM19.

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -957,7 +957,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
957957
switch operation.action {
958958
case let .known(action):
959959
switch action {
960-
case .mapCreate, .mapSet, .mapRemove, .counterCreate, .counterInc, .objectDelete:
960+
case .mapCreate, .mapSet, .mapRemove, .counterCreate, .counterInc, .objectDelete, .mapClear:
961961
// RTO9a2a3
962962
let applied = entry.nosync_apply(
963963
operation,

Sources/AblyLiveObjects/Protocol/ObjectMessage.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ internal struct ObjectOperation: Equatable {
4545
internal var objectDelete: WireObjectDelete? // OOP3o
4646
internal var mapCreateWithObjectId: MapCreateWithObjectId? // OOP3p
4747
internal var counterCreateWithObjectId: CounterCreateWithObjectId? // OOP3q
48+
internal var mapClear: WireMapClear? // OOP3r
4849
}
4950

5051
internal struct ObjectData: Equatable {
@@ -96,6 +97,7 @@ internal struct ObjectsMapEntry: Equatable {
9697
internal struct ObjectsMap: Equatable {
9798
internal var semantics: WireEnum<ObjectsMapSemantics> // OMP3a
9899
internal var entries: [String: ObjectsMapEntry]? // OMP3b
100+
internal var clearTimeserial: String? // OMP3c
99101
}
100102

101103
internal struct ObjectState: Equatable {
@@ -178,6 +180,7 @@ internal extension ObjectOperation {
178180
counterCreate = wireObjectOperation.counterCreate
179181
counterInc = wireObjectOperation.counterInc
180182
objectDelete = wireObjectOperation.objectDelete
183+
mapClear = wireObjectOperation.mapClear
181184
// Outbound-only — do not access on inbound data
182185
mapCreateWithObjectId = nil
183186
counterCreateWithObjectId = nil
@@ -199,6 +202,7 @@ internal extension ObjectOperation {
199202
objectDelete: objectDelete,
200203
mapCreateWithObjectId: mapCreateWithObjectId?.toWire(),
201204
counterCreateWithObjectId: counterCreateWithObjectId?.toWire(),
205+
mapClear: mapClear,
202206
)
203207
}
204208
}
@@ -414,6 +418,7 @@ internal extension ObjectsMap {
414418
entries = try wireObjectsMap.entries?.ablyLiveObjects_mapValuesWithTypedThrow { wireMapEntry throws(ARTErrorInfo) in
415419
try .init(wireObjectsMapEntry: wireMapEntry, format: format)
416420
}
421+
clearTimeserial = wireObjectsMap.clearTimeserial
417422
}
418423

419424
/// Converts this `ObjectsMap` to a `WireObjectsMap`, applying the data encoding rules of OD4.
@@ -424,6 +429,7 @@ internal extension ObjectsMap {
424429
.init(
425430
semantics: semantics,
426431
entries: entries?.mapValues { $0.toWire(format: format) },
432+
clearTimeserial: clearTimeserial,
427433
)
428434
}
429435
}
@@ -520,6 +526,7 @@ extension ObjectOperation: CustomDebugStringConvertible {
520526
if let objectDelete { parts.append("objectDelete: \(objectDelete)") }
521527
if let mapCreateWithObjectId { parts.append("mapCreateWithObjectId: \(mapCreateWithObjectId)") }
522528
if let counterCreateWithObjectId { parts.append("counterCreateWithObjectId: \(counterCreateWithObjectId)") }
529+
if let mapClear { parts.append("mapClear: \(mapClear)") }
523530

524531
return "{ " + parts.joined(separator: ", ") + " }"
525532
}
@@ -553,6 +560,7 @@ extension ObjectsMap: CustomDebugStringConvertible {
553560
.joined(separator: ", ")
554561
parts.append("entries: { \(formattedEntries) }")
555562
}
563+
if let clearTimeserial { parts.append("clearTimeserial: \(clearTimeserial)") }
556564

557565
return "{ " + parts.joined(separator: ", ") + " }"
558566
}

Sources/AblyLiveObjects/Protocol/WireObjectMessage.swift

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ internal enum ObjectOperationAction: Int {
151151
case counterCreate = 3
152152
case counterInc = 4
153153
case objectDelete = 5
154+
case mapClear = 6
154155
}
155156

156157
// OMP2
@@ -169,6 +170,7 @@ internal struct WireObjectOperation {
169170
internal var objectDelete: WireObjectDelete? // OOP3o
170171
internal var mapCreateWithObjectId: WireMapCreateWithObjectId? // OOP3p
171172
internal var counterCreateWithObjectId: WireCounterCreateWithObjectId? // OOP3q
173+
internal var mapClear: WireMapClear? // OOP3r
172174
}
173175

174176
extension WireObjectOperation: WireObjectCodable {
@@ -183,6 +185,7 @@ extension WireObjectOperation: WireObjectCodable {
183185
case objectDelete
184186
case mapCreateWithObjectId
185187
case counterCreateWithObjectId
188+
case mapClear
186189
}
187190

188191
internal init(wireObject: [String: WireValue]) throws(ARTErrorInfo) {
@@ -195,6 +198,7 @@ extension WireObjectOperation: WireObjectCodable {
195198
counterCreate = try wireObject.optionalDecodableValueForKey(WireKey.counterCreate.rawValue)
196199
counterInc = try wireObject.optionalDecodableValueForKey(WireKey.counterInc.rawValue)
197200
objectDelete = try wireObject.optionalDecodableValueForKey(WireKey.objectDelete.rawValue)
201+
mapClear = try wireObject.optionalDecodableValueForKey(WireKey.mapClear.rawValue)
198202
// Outbound-only — do not access on inbound data
199203
mapCreateWithObjectId = nil
200204
counterCreateWithObjectId = nil
@@ -230,6 +234,9 @@ extension WireObjectOperation: WireObjectCodable {
230234
if let counterCreateWithObjectId {
231235
result[WireKey.counterCreateWithObjectId.rawValue] = .object(counterCreateWithObjectId.toWireObject)
232236
}
237+
if let mapClear {
238+
result[WireKey.mapClear.rawValue] = .object(mapClear.toWireObject)
239+
}
233240

234241
return result
235242
}
@@ -292,12 +299,14 @@ extension WireObjectState: WireObjectCodable {
292299
internal struct WireObjectsMap {
293300
internal var semantics: WireEnum<ObjectsMapSemantics> // OMP3a
294301
internal var entries: [String: WireObjectsMapEntry]? // OMP3b
302+
internal var clearTimeserial: String? // OMP3c
295303
}
296304

297305
extension WireObjectsMap: WireObjectCodable {
298306
internal enum WireKey: String {
299307
case semantics
300308
case entries
309+
case clearTimeserial
301310
}
302311

303312
internal init(wireObject: [String: WireValue]) throws(ARTErrorInfo) {
@@ -308,6 +317,7 @@ extension WireObjectsMap: WireObjectCodable {
308317
}
309318
return try WireObjectsMapEntry(wireObject: object)
310319
}
320+
clearTimeserial = try wireObject.optionalStringValueForKey(WireKey.clearTimeserial.rawValue)
311321
}
312322

313323
internal var toWireObject: [String: WireValue] {
@@ -318,6 +328,9 @@ extension WireObjectsMap: WireObjectCodable {
318328
if let entries {
319329
result[WireKey.entries.rawValue] = .object(entries.mapValues { .object($0.toWireObject) })
320330
}
331+
if let clearTimeserial {
332+
result[WireKey.clearTimeserial.rawValue] = .string(clearTimeserial)
333+
}
321334

322335
return result
323336
}
@@ -484,6 +497,20 @@ extension WireObjectDelete: WireObjectCodable {
484497
}
485498
}
486499

500+
internal struct WireMapClear: Equatable {
501+
// Empty struct
502+
}
503+
504+
extension WireMapClear: WireObjectCodable {
505+
internal init(wireObject _: [String: WireValue]) throws(ARTErrorInfo) {
506+
// No fields to decode
507+
}
508+
509+
internal var toWireObject: [String: WireValue] {
510+
[:]
511+
}
512+
}
513+
487514
internal struct WireMapCreateWithObjectId: Equatable {
488515
internal var initialValue: String // MCRO2a
489516
internal var nonce: String // MCRO2b

Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ struct TestFactories {
346346
objectDelete: WireObjectDelete? = nil,
347347
mapCreateWithObjectId: MapCreateWithObjectId? = nil,
348348
counterCreateWithObjectId: CounterCreateWithObjectId? = nil,
349+
mapClear: WireMapClear? = nil,
349350
) -> ObjectOperation {
350351
ObjectOperation(
351352
action: action,
@@ -358,6 +359,7 @@ struct TestFactories {
358359
objectDelete: objectDelete,
359360
mapCreateWithObjectId: mapCreateWithObjectId,
360361
counterCreateWithObjectId: counterCreateWithObjectId,
362+
mapClear: mapClear,
361363
)
362364
}
363365

@@ -533,10 +535,12 @@ struct TestFactories {
533535
static func objectsMap(
534536
semantics: WireEnum<ObjectsMapSemantics> = .known(.lww),
535537
entries: [String: ObjectsMapEntry]? = nil,
538+
clearTimeserial: String? = nil,
536539
) -> ObjectsMap {
537540
ObjectsMap(
538541
semantics: semantics,
539542
entries: entries,
543+
clearTimeserial: clearTimeserial,
540544
)
541545
}
542546

@@ -599,6 +603,23 @@ struct TestFactories {
599603
)
600604
}
601605

606+
/// Creates an InboundObjectMessage with a MAP_CLEAR operation
607+
static func mapClearOperationMessage(
608+
objectId: String = "map:test@123",
609+
serial: String = "ts1",
610+
siteCode: String = "site1",
611+
) -> InboundObjectMessage {
612+
inboundObjectMessage(
613+
operation: objectOperation(
614+
action: .known(.mapClear),
615+
objectId: objectId,
616+
mapClear: WireMapClear(),
617+
),
618+
serial: serial,
619+
siteCode: siteCode,
620+
)
621+
}
622+
602623
/// Creates an InboundObjectMessage with a MAP_CREATE operation
603624
static func mapCreateOperationMessage(
604625
objectId: String = "map:test@123",

0 commit comments

Comments
 (0)