Skip to content

Commit 30d867b

Browse files
Merge pull request #117 from ably/AIT-207-partial-object-sync
[AIT-207] Partial object sync
2 parents 060be55 + 650f876 commit 30d867b

11 files changed

Lines changed: 639 additions & 117 deletions

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
109109
internal var id: String
110110

111111
/// The `ObjectMessage`s gathered during this sync sequence.
112-
internal var syncObjectsPool: [SyncObjectsPoolEntry]
112+
internal var syncObjectsPool: SyncObjectsPool
113113
}
114114

115115
internal init(
@@ -804,16 +804,8 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
804804
}
805805
}
806806

807-
let syncObjectsPoolEntries = objectMessages.compactMap { objectMessage in
808-
if let object = objectMessage.object {
809-
SyncObjectsPoolEntry(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp)
810-
} else {
811-
nil
812-
}
813-
}
814-
815807
// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
816-
let completedSyncObjectsPool: [SyncObjectsPoolEntry]?
808+
let completedSyncObjectsPool: SyncObjectsPool?
817809
// The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC.
818810
let syncSequenceForSyncingState: SyncSequence?
819811

@@ -823,15 +815,17 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
823815
} else {
824816
nil
825817
}
826-
var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: [])
827-
// RTO5b
828-
updatedSyncSequence.syncObjectsPool.append(contentsOf: syncObjectsPoolEntries)
818+
var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: .init())
819+
// RTO5f
820+
updatedSyncSequence.syncObjectsPool.accumulate(objectMessages, logger: logger)
829821
syncSequenceForSyncingState = updatedSyncSequence
830822

831823
completedSyncObjectsPool = syncCursor.isEndOfSequence ? updatedSyncSequence.syncObjectsPool : nil
832824
} else {
833825
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
834-
completedSyncObjectsPool = syncObjectsPoolEntries
826+
var pool = SyncObjectsPool()
827+
pool.accumulate(objectMessages, logger: logger)
828+
completedSyncObjectsPool = pool
835829
syncSequenceForSyncingState = nil
836830
}
837831

Sources/AblyLiveObjects/Internal/ObjectsPool.swift

Lines changed: 79 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ internal struct ObjectsPool {
278278

279279
/// Applies the objects gathered during an `OBJECT_SYNC` to this `ObjectsPool`, per RTO5c1 and RTO5c2.
280280
internal mutating func nosync_applySyncObjectsPool(
281-
_ syncObjectsPool: [SyncObjectsPoolEntry],
281+
_ syncObjectsPool: SyncObjectsPool,
282282
logger: Logger,
283283
internalQueue: DispatchQueue,
284284
userCallbackQueue: DispatchQueue,
@@ -293,72 +293,37 @@ internal struct ObjectsPool {
293293
var updatesToExistingObjects: [ObjectsPool.Entry.DeferredUpdate] = []
294294

295295
// RTO5c1: For each ObjectState member in the SyncObjectsPool list
296-
for syncObjectsPoolEntry in syncObjectsPool {
297-
receivedObjectIds.insert(syncObjectsPoolEntry.state.objectId)
296+
for objectMessage in syncObjectsPool {
297+
// Every message yielded by SyncObjectsPool is guaranteed to have a non-nil `.object` with `.map` or `.counter`.
298+
guard let state = objectMessage.object else {
299+
preconditionFailure("SyncObjectsPool yielded a message with nil object")
300+
}
301+
receivedObjectIds.insert(state.objectId)
298302

299303
// RTO5c1a: If an object with ObjectState.objectId exists in the internal ObjectsPool
300-
if let existingEntry = entries[syncObjectsPoolEntry.state.objectId] {
301-
logger.log("Updating existing object with ID: \(syncObjectsPoolEntry.state.objectId)", level: .debug)
304+
if let existingEntry = entries[state.objectId] {
305+
logger.log("Updating existing object with ID: \(state.objectId)", level: .debug)
302306

303307
// RTO5c1a1: Override the internal data for the object as per RTLC6, RTLM6
304308
let deferredUpdate = existingEntry.nosync_replaceData(
305-
using: syncObjectsPoolEntry.state,
306-
objectMessageSerialTimestamp: syncObjectsPoolEntry.objectMessageSerialTimestamp,
309+
using: state,
310+
objectMessageSerialTimestamp: objectMessage.serialTimestamp,
307311
objectsPool: &self,
308312
userCallbackQueue: userCallbackQueue,
309313
)
310314
// RTO5c1a2: Store this update to emit at end
311315
updatesToExistingObjects.append(deferredUpdate)
312316
} else {
313317
// RTO5c1b: If an object with ObjectState.objectId does not exist in the internal ObjectsPool
314-
logger.log("Creating new object with ID: \(syncObjectsPoolEntry.state.objectId)", level: .debug)
315-
316-
// RTO5c1b1: Create a new LiveObject using the data from ObjectState and add it to the internal ObjectsPool:
317-
let newEntry: Entry?
318-
319-
if syncObjectsPoolEntry.state.counter != nil {
320-
// RTO5c1b1a: If ObjectState.counter is present, create a zero-value LiveCounter,
321-
// set its private objectId equal to ObjectState.objectId and override its internal data per RTLC6
322-
let counter = InternalDefaultLiveCounter.createZeroValued(
323-
objectID: syncObjectsPoolEntry.state.objectId,
324-
logger: logger,
325-
internalQueue: internalQueue,
326-
userCallbackQueue: userCallbackQueue,
327-
clock: clock,
328-
)
329-
_ = counter.nosync_replaceData(
330-
using: syncObjectsPoolEntry.state,
331-
objectMessageSerialTimestamp: syncObjectsPoolEntry.objectMessageSerialTimestamp,
332-
)
333-
newEntry = .counter(counter)
334-
} else if let objectsMap = syncObjectsPoolEntry.state.map {
335-
// RTO5c1b1b: If ObjectState.map is present, create a zero-value LiveMap,
336-
// set its private objectId equal to ObjectState.objectId, set its private semantics
337-
// equal to ObjectState.map.semantics and override its internal data per RTLM6
338-
let map = InternalDefaultLiveMap.createZeroValued(
339-
objectID: syncObjectsPoolEntry.state.objectId,
340-
semantics: objectsMap.semantics,
341-
logger: logger,
342-
internalQueue: internalQueue,
343-
userCallbackQueue: userCallbackQueue,
344-
clock: clock,
345-
)
346-
_ = map.nosync_replaceData(
347-
using: syncObjectsPoolEntry.state,
348-
objectMessageSerialTimestamp: syncObjectsPoolEntry.objectMessageSerialTimestamp,
349-
objectsPool: &self,
350-
)
351-
newEntry = .map(map)
352-
} else {
353-
// RTO5c1b1c: Otherwise, log a warning that an unsupported object state message has been received, and discard the current ObjectState without taking any action
354-
logger.log("Unsupported object state message received for objectId: \(syncObjectsPoolEntry.state.objectId)", level: .warn)
355-
newEntry = nil
356-
}
357-
358-
if let newEntry {
359-
// Note that we will never replace the root object here, and thus never break the RTO3b invariant that the root object is always a map. This is because the pool always contains a root object and thus we always go through the RTO5c1a branch of the `if` above.
360-
entries[syncObjectsPoolEntry.state.objectId] = newEntry
361-
}
318+
// (The nosync_createObjectFromSync precondition that this is not the root object is satisfied because the pool always contains a root object. The precondition that state has counter or map is satisfied because SyncObjectsPool guarantees this for every yielded message.)
319+
nosync_createObjectFromSync(
320+
state: state,
321+
objectMessage: objectMessage,
322+
logger: logger,
323+
internalQueue: internalQueue,
324+
userCallbackQueue: userCallbackQueue,
325+
clock: clock,
326+
)
362327
}
363328
}
364329

@@ -380,6 +345,65 @@ internal struct ObjectsPool {
380345
logger.log("applySyncObjectsPool completed. Pool now contains \(entries.count) objects", level: .debug)
381346
}
382347

348+
/// Creates a new object from a sync entry and adds it to the pool, per RTO5c1b.
349+
///
350+
/// - Precondition: `state.objectId` must not be the root object ID, in order to preserve the RTO3b invariant that the root is always a map.
351+
/// - Precondition: `state` must have either `.counter` or `.map` populated.
352+
private mutating func nosync_createObjectFromSync(
353+
state: ObjectState,
354+
objectMessage: InboundObjectMessage,
355+
logger: Logger,
356+
internalQueue: DispatchQueue,
357+
userCallbackQueue: DispatchQueue,
358+
clock: SimpleClock,
359+
) {
360+
precondition(state.objectId != ObjectsPool.rootKey)
361+
362+
logger.log("Creating new object with ID: \(state.objectId)", level: .debug)
363+
364+
// RTO5c1b1: Create a new LiveObject using the data from ObjectState and add it to the internal ObjectsPool:
365+
let newEntry: Entry
366+
367+
if state.counter != nil {
368+
// RTO5c1b1a: If ObjectState.counter is present, create a zero-value LiveCounter,
369+
// set its private objectId equal to ObjectState.objectId and override its internal data per RTLC6
370+
let counter = InternalDefaultLiveCounter.createZeroValued(
371+
objectID: state.objectId,
372+
logger: logger,
373+
internalQueue: internalQueue,
374+
userCallbackQueue: userCallbackQueue,
375+
clock: clock,
376+
)
377+
_ = counter.nosync_replaceData(
378+
using: state,
379+
objectMessageSerialTimestamp: objectMessage.serialTimestamp,
380+
)
381+
newEntry = .counter(counter)
382+
} else if let objectsMap = state.map {
383+
// RTO5c1b1b: If ObjectState.map is present, create a zero-value LiveMap,
384+
// set its private objectId equal to ObjectState.objectId, set its private semantics
385+
// equal to ObjectState.map.semantics and override its internal data per RTLM6
386+
let map = InternalDefaultLiveMap.createZeroValued(
387+
objectID: state.objectId,
388+
semantics: objectsMap.semantics,
389+
logger: logger,
390+
internalQueue: internalQueue,
391+
userCallbackQueue: userCallbackQueue,
392+
clock: clock,
393+
)
394+
_ = map.nosync_replaceData(
395+
using: state,
396+
objectMessageSerialTimestamp: objectMessage.serialTimestamp,
397+
objectsPool: &self,
398+
)
399+
newEntry = .map(map)
400+
} else {
401+
preconditionFailure("state for objectId \(state.objectId) has neither counter nor map")
402+
}
403+
404+
entries[state.objectId] = newEntry
405+
}
406+
383407
/// Removes all entries except the root, and clears the root's data. This is to be used when an `ATTACHED` ProtocolMessage indicates that the only object in a channel is an empty root map, per RTO4b.
384408
internal mutating func nosync_reset() {
385409
let root = root
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import Foundation
2+
3+
/// The RTO5f collection of objects gathered during an `OBJECT_SYNC` sequence, ready to be applied to the `ObjectsPool`.
4+
///
5+
/// Every stored message is guaranteed to have a non-nil `.object` with either `.map` or `.counter` populated.
6+
internal struct SyncObjectsPool: Collection {
7+
/// Keyed by `objectId`. Every value has a non-nil `.object` with either `.map` or `.counter` populated; the
8+
/// `accumulate` method enforces this invariant.
9+
private var objectMessages: [String: InboundObjectMessage]
10+
11+
/// Creates an empty pool.
12+
internal init() {
13+
objectMessages = [:]
14+
}
15+
16+
/// Accumulates object messages into the pool per RTO5f.
17+
internal mutating func accumulate(
18+
_ objectMessages: [InboundObjectMessage],
19+
logger: Logger,
20+
) {
21+
for objectMessage in objectMessages {
22+
accumulate(objectMessage, logger: logger)
23+
}
24+
}
25+
26+
/// Accumulates a single `ObjectMessage` into the pool per RTO5f.
27+
private mutating func accumulate(
28+
_ objectMessage: InboundObjectMessage,
29+
logger: Logger,
30+
) {
31+
// RTO5f3: Reject unsupported object types before pool lookup. Only messages whose `.object` has `.map` or `.counter`
32+
// are stored, which callers of the iteration can rely on.
33+
guard let object = objectMessage.object, object.map != nil || object.counter != nil else {
34+
logger.log("Skipping unsupported object type during sync for objectId \(objectMessage.object?.objectId ?? "unknown")", level: .warn)
35+
return
36+
}
37+
38+
let objectId = object.objectId
39+
40+
if let existing = objectMessages[objectId] {
41+
// RTO5f2: An entry already exists for this objectId (partial object state).
42+
if object.map != nil {
43+
// RTO5f2a: Incoming message has a map.
44+
if object.tombstone {
45+
// RTO5f2a1: Incoming tombstone is true — replace the entire entry.
46+
objectMessages[objectId] = objectMessage
47+
} else {
48+
// RTO5f2a2: Merge map entries into the existing message.
49+
var merged = existing
50+
if let incomingEntries = object.map?.entries {
51+
var mergedObject = merged.object!
52+
guard var mergedMap = mergedObject.map else {
53+
// Not a specified scenario — the server won't send a map and a non-map for the same
54+
// objectId in practice. Guard defensively rather than force-unwrapping.
55+
logger.log("Existing entry for objectId \(objectId) is not a map; replacing with incoming message", level: .error)
56+
objectMessages[objectId] = objectMessage
57+
return
58+
}
59+
var mergedEntries = mergedMap.entries ?? [:]
60+
mergedEntries.merge(incomingEntries) { _, new in new }
61+
mergedMap.entries = mergedEntries
62+
mergedObject.map = mergedMap
63+
merged.object = mergedObject
64+
}
65+
objectMessages[objectId] = merged
66+
}
67+
} else {
68+
// RTO5f2b: Incoming message has a counter — log error, skip.
69+
logger.log("Received partial counter sync for objectId \(objectId); skipping", level: .error)
70+
}
71+
} else {
72+
// RTO5f1: No entry exists for this objectId — store the message.
73+
objectMessages[objectId] = objectMessage
74+
}
75+
}
76+
77+
// MARK: - Collection conformance
78+
79+
internal typealias Index = Dictionary<String, InboundObjectMessage>.Values.Index
80+
internal typealias Element = InboundObjectMessage
81+
82+
internal var startIndex: Index { objectMessages.values.startIndex }
83+
internal var endIndex: Index { objectMessages.values.endIndex }
84+
internal func index(after i: Index) -> Index { objectMessages.values.index(after: i) }
85+
internal subscript(position: Index) -> Element { objectMessages.values[position] }
86+
}

Sources/AblyLiveObjects/Internal/SyncObjectsPoolEntry.swift

Lines changed: 0 additions & 15 deletions
This file was deleted.

Sources/AblyLiveObjects/Protocol/ObjectMessage.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import Foundation
55
// This file contains the ObjectMessage types that we use within the codebase. We convert them to and from the corresponding wire types (e.g. `InboundWireObjectMessage`) for sending and receiving over the wire.
66

77
/// An `ObjectMessage` received in the `state` property of an `OBJECT` or `OBJECT_SYNC` `ProtocolMessage`.
8-
internal struct InboundObjectMessage {
8+
internal struct InboundObjectMessage: Equatable {
99
internal var id: String? // OM2a
1010
internal var clientId: String? // OM2b
1111
internal var connectionId: String? // OM2c

Sources/AblyLiveObjects/Protocol/WireObjectMessage.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ extension WireObjectData: WireObjectCodable {
631631
/// A type that can be either a string or binary data.
632632
///
633633
/// Used to represent the values that `WireObjectData.bytes` might hold, after being encoded per OD4 or before being decoded per OD5.
634-
internal enum StringOrData: WireCodable {
634+
internal enum StringOrData: Equatable, WireCodable {
635635
case string(String)
636636
case data(Data)
637637

Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ struct TestFactories {
268268
object: ObjectState? = nil,
269269
serial: String? = nil,
270270
siteCode: String? = nil,
271+
serialTimestamp: Date? = nil,
271272
) -> InboundObjectMessage {
272273
InboundObjectMessage(
273274
id: id,
@@ -279,6 +280,7 @@ struct TestFactories {
279280
object: object,
280281
serial: serial,
281282
siteCode: siteCode,
283+
serialTimestamp: serialTimestamp,
282284
)
283285
}
284286

Tests/AblyLiveObjectsTests/InternalDefaultRealtimeObjectsTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ struct InternalDefaultRealtimeObjectsTests {
6060
// @spec RTO5a1
6161
// @spec RTO5a3
6262
// @spec RTO5a4
63-
// @spec RTO5b
63+
// @spec RTO5f
6464
// @spec RTO5c3
6565
// @spec RTO5c4
6666
// @spec RTO5c5

0 commit comments

Comments
 (0)