Skip to content

Commit 22c71cf

Browse files
Buffer OBJECT ProtocolMessages during a sync
Based on [1] at 29276a5. I wrote the implementation, and for the tests followed the development approach described in cb427d8. [1] ably/specification#343
1 parent 6430358 commit 22c71cf

2 files changed

Lines changed: 144 additions & 21 deletions

File tree

Sources/AblyLiveObjects/DefaultRealtimeObjects.swift

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
2323
}
2424
}
2525

26-
/// If this returns false, it means that there is currently no stored sync sequence ID or SyncObjectsPool
26+
/// If this returns false, it means that there is currently no stored sync sequence ID, SyncObjectsPool, or BufferedObjectOperations.
2727
internal var testsOnly_hasSyncSequence: Bool {
2828
mutex.withLock {
2929
mutableState.syncSequence != nil
@@ -45,6 +45,9 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
4545

4646
/// The `ObjectMessage`s gathered during this sync sequence.
4747
internal var syncObjectsPool: [ObjectState]
48+
49+
/// `OBJECT` ProtocolMessages that were received during this sync sequence, to be applied once the sync sequence is complete, per RTO7a.
50+
internal var bufferedObjectOperations: [InboundObjectMessage]
4851
}
4952

5053
/// Tracks whether an object sync sequence has happened yet. This allows us to wait for a sync before returning from `getRoot()`, per RTO1c.
@@ -230,6 +233,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
230233

231234
private struct MutableState {
232235
internal var objectsPool: ObjectsPool
236+
/// Note that we only ever populate this during a multi-`ProtocolMessage` sync sequence. It is not used in the RTO4b or RTO5a5 cases where the sync data is entirely contained within a single ProtocolMessage, because an individual ProtocolMessage is processed atomically and so no other operations that might wish to query this property can occur concurrently with the handling of these cases.
233237
internal var syncSequence: SyncSequence?
234238
internal var syncStatus = SyncStatus()
235239
internal var onChannelAttachedHasObjects: Bool?
@@ -255,7 +259,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
255259

256260
// I have, for now, not directly implemented the "perform the actions for object sync completion" of RTO4b4 since my implementation doesn't quite match the model given there; here you only have a SyncObjectsPool if you have an OBJECT_SYNC in progress, which you might not have upon receiving an ATTACHED. Instead I've just implemented what seem like the relevant side effects. Can revisit this if "the actions for object sync completion" get more complex.
257261

258-
// RTO4b3, RTO4b4, RTO5c3, RTO5c4
262+
// RTO4b3, RTO4b4, RTO4b5, RTO5c3, RTO5c4, RTO5c5
259263
syncSequence = nil
260264
syncStatus.signalSyncComplete()
261265
}
@@ -275,6 +279,8 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
275279

276280
// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
277281
let completedSyncObjectsPool: [ObjectState]?
282+
// If populated, this contains a set of buffered inbound OBJECT messages that should be applied.
283+
let completedSyncBufferedObjectOperations: [InboundObjectMessage]?
278284

279285
if let protocolMessageChannelSerial {
280286
let syncCursor: SyncCursor
@@ -292,27 +298,28 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
292298
// RTO5a3: Continue existing sync sequence
293299
syncSequence
294300
} else {
295-
// RTO5a2: new sequence started, discard previous
296-
.init(id: syncCursor.sequenceID, syncObjectsPool: [])
301+
// RTO5a2a, RTO5a2b: new sequence started, discard previous
302+
.init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: [])
297303
}
298304
} else {
299305
// There's no current sync sequence; start one
300-
.init(id: syncCursor.sequenceID, syncObjectsPool: [])
306+
.init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: [])
301307
}
302308

303309
// RTO5b
304310
updatedSyncSequence.syncObjectsPool.append(contentsOf: objectMessages.compactMap(\.object))
305311

306312
syncSequence = updatedSyncSequence
307313

308-
completedSyncObjectsPool = if syncCursor.isEndOfSequence {
309-
updatedSyncSequence.syncObjectsPool
314+
(completedSyncObjectsPool, completedSyncBufferedObjectOperations) = if syncCursor.isEndOfSequence {
315+
(updatedSyncSequence.syncObjectsPool, updatedSyncSequence.bufferedObjectOperations)
310316
} else {
311-
nil
317+
(nil, nil)
312318
}
313319
} else {
314320
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
315321
completedSyncObjectsPool = objectMessages.compactMap(\.object)
322+
completedSyncBufferedObjectOperations = nil
316323
}
317324

318325
if let completedSyncObjectsPool {
@@ -323,7 +330,21 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
323330
coreSDK: coreSDK,
324331
logger: logger,
325332
)
326-
// RTO5c3, RTO5c4
333+
334+
// RTO5c6
335+
if let completedSyncBufferedObjectOperations, !completedSyncBufferedObjectOperations.isEmpty {
336+
logger.log("Applying \(completedSyncBufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug)
337+
for objectMessage in completedSyncBufferedObjectOperations {
338+
applyObjectProtocolMessageObjectMessage(
339+
objectMessage,
340+
logger: logger,
341+
mapDelegate: mapDelegate,
342+
coreSDK: coreSDK,
343+
)
344+
}
345+
}
346+
347+
// RTO5c3, RTO5c4, RTO5c5
327348
syncSequence = nil
328349

329350
syncStatus.signalSyncComplete()
@@ -342,16 +363,22 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
342363

343364
logger.log("handleObjectProtocolMessage(objectMessages: \(objectMessages))", level: .debug)
344365

345-
// TODO: RTO8a's buffering
346-
347-
// RTO8b
348-
for objectMessage in objectMessages {
349-
applyObjectProtocolMessageObjectMessage(
350-
objectMessage,
351-
logger: logger,
352-
mapDelegate: mapDelegate,
353-
coreSDK: coreSDK,
354-
)
366+
if let existingSyncSequence = syncSequence {
367+
// RTO8a: Buffer the OBJECT message, to be handled once the sync completes
368+
logger.log("Buffering OBJECT message due to in-progress sync", level: .debug)
369+
var newSyncSequence = existingSyncSequence
370+
newSyncSequence.bufferedObjectOperations.append(contentsOf: objectMessages)
371+
syncSequence = newSyncSequence
372+
} else {
373+
// RTO8b: Handle the OBJECT message immediately
374+
for objectMessage in objectMessages {
375+
applyObjectProtocolMessageObjectMessage(
376+
objectMessage,
377+
logger: logger,
378+
mapDelegate: mapDelegate,
379+
coreSDK: coreSDK,
380+
)
381+
}
355382
}
356383
}
357384

Tests/AblyLiveObjectsTests/DefaultRealtimeObjectsTests.swift

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ struct DefaultRealtimeObjectsTests {
5353
// @spec RTO5b
5454
// @spec RTO5c3
5555
// @spec RTO5c4
56+
// @spec RTO5c5
5657
@Test
5758
func handlesMultiProtocolMessageSync() async throws {
5859
let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects()
@@ -94,7 +95,7 @@ struct DefaultRealtimeObjectsTests {
9495
protocolMessageChannelSerial: "\(sequenceId):", // Empty cursor indicates end
9596
)
9697

97-
// Verify sync sequence is cleared and there is no SyncObjectsPool (RTO5c3, RTO5c4)
98+
// Verify sync sequence is cleared and there is no SyncObjectsPool or BufferedObjectOperations (RTO5c3, RTO5c4, RTO5c5)
9899
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
99100

100101
// Verify all objects were applied to pool (side effect of applySyncObjectsPool per RTO5c1b1b)
@@ -108,6 +109,7 @@ struct DefaultRealtimeObjectsTests {
108109

109110
// @spec RTO5a2
110111
// @spec RTO5a2a
112+
// @spec RTO5a2b
111113
@Test
112114
func newSequenceIdDiscardsInFlightSync() async throws {
113115
let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects()
@@ -123,6 +125,11 @@ struct DefaultRealtimeObjectsTests {
123125

124126
#expect(realtimeObjects.testsOnly_hasSyncSequence)
125127

128+
// Inject an OBJECT; it will get buffered per RTO8a and subsequently discarded per RTO5a2b
129+
realtimeObjects.handleObjectProtocolMessage(objectMessages: [
130+
TestFactories.mapCreateOperationMessage(objectId: "map:3@789"),
131+
])
132+
126133
// Start new sequence with different ID (RTO5a2)
127134
let secondMessages = [TestFactories.simpleMapMessage(objectId: "map:2@456")]
128135
realtimeObjects.handleObjectSyncProtocolMessage(
@@ -142,6 +149,7 @@ struct DefaultRealtimeObjectsTests {
142149
// Verify only the second sequence's objects were applied (RTO5a2a - previous cleared)
143150
let pool = realtimeObjects.testsOnly_objectsPool
144151
#expect(pool.entries["map:1@123"] == nil) // From discarded first sequence
152+
#expect(pool.entries["map:3@789"] == nil) // Check we discarded the OBJECT that was buffered during discarded first sequence (RTO5a2b)
145153
#expect(pool.entries["map:2@456"] != nil) // From completed second sequence
146154
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
147155
}
@@ -336,6 +344,7 @@ struct DefaultRealtimeObjectsTests {
336344
// @spec RTO4b2
337345
// @spec RTO4b3
338346
// @spec RTO4b4
347+
// @spec RTO4b5
339348
@Test
340349
func handlesHasObjectsFalse() {
341350
let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects()
@@ -381,7 +390,7 @@ struct DefaultRealtimeObjectsTests {
381390
#expect(newRoot as AnyObject !== originalPool.root as AnyObject) // Should be a new instance
382391
#expect(newRoot.testsOnly_data.isEmpty) // Should be zero-valued (empty)
383392

384-
// RTO4b3, RTO4b4: SyncObjectsPool must be cleared, sync sequence cleared
393+
// RTO4b3, RTO4b4, RTO4b5: SyncObjectsPool must be cleared, sync sequence cleared, BufferedObjectOperations cleared
385394
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
386395
}
387396

@@ -922,5 +931,92 @@ struct DefaultRealtimeObjectsTests {
922931
#expect(counter.testsOnly_siteTimeserials["site1"] == "ts2")
923932
}
924933
}
934+
935+
// Tests that when an OBJECT ProtocolMessage is received during a sync sequence, its operations are buffered per RTO8a and applied after sync completion per RTO5c6.
936+
struct BufferOperationTests {
937+
// @spec RTO8a
938+
// @spec RTO5c6
939+
@Test
940+
func buffersObjectOperationsDuringSyncAndAppliesAfterCompletion() async throws {
941+
let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects()
942+
let sequenceId = "seq123"
943+
944+
// Start sync sequence with first OBJECT_SYNC message
945+
let (entryKey, entry) = TestFactories.stringMapEntry(key: "existingKey", value: "existingValue")
946+
let firstSyncMessages = [
947+
TestFactories.mapObjectMessage(
948+
objectId: "map:1@123",
949+
siteTimeserials: ["site1": "ts1"], // Explicit sync data siteCode and serial
950+
entries: [entryKey: entry],
951+
),
952+
]
953+
realtimeObjects.handleObjectSyncProtocolMessage(
954+
objectMessages: firstSyncMessages,
955+
protocolMessageChannelSerial: "\(sequenceId):cursor1",
956+
)
957+
958+
// Verify sync sequence is active
959+
#expect(realtimeObjects.testsOnly_hasSyncSequence)
960+
961+
// Inject first OBJECT ProtocolMessage during sync (RTO8a)
962+
let firstObjectMessage = TestFactories.mapSetOperationMessage(
963+
objectId: "map:1@123",
964+
key: "key1",
965+
value: "value1",
966+
serial: "ts3", // Higher than sync data "ts1"
967+
siteCode: "site1",
968+
)
969+
realtimeObjects.handleObjectProtocolMessage(objectMessages: [firstObjectMessage])
970+
971+
// Verify the operation was buffered and not applied yet
972+
let poolAfterFirstObject = realtimeObjects.testsOnly_objectsPool
973+
#expect(poolAfterFirstObject.entries["map:1@123"] == nil) // Object not yet created from sync
974+
975+
// Inject second OBJECT ProtocolMessage during sync (RTO8a)
976+
let secondObjectMessage = TestFactories.counterIncOperationMessage(
977+
objectId: "counter:1@456",
978+
amount: 10,
979+
serial: "ts4", // Higher than sync data "ts2"
980+
siteCode: "site1",
981+
)
982+
realtimeObjects.handleObjectProtocolMessage(objectMessages: [secondObjectMessage])
983+
984+
// Verify the second operation was also buffered and not applied yet
985+
let poolAfterSecondObject = realtimeObjects.testsOnly_objectsPool
986+
#expect(poolAfterSecondObject.entries["counter:1@456"] == nil) // Object not yet created from sync
987+
988+
// Complete sync sequence with final OBJECT_SYNC message
989+
let finalSyncMessages = [
990+
TestFactories.counterObjectMessage(
991+
objectId: "counter:1@456",
992+
siteTimeserials: ["site1": "ts2"],
993+
count: 5,
994+
),
995+
]
996+
realtimeObjects.handleObjectSyncProtocolMessage(
997+
objectMessages: finalSyncMessages,
998+
protocolMessageChannelSerial: "\(sequenceId):", // Empty cursor indicates end
999+
)
1000+
1001+
// Verify sync sequence is cleared
1002+
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
1003+
1004+
// Verify all objects were applied to pool from sync
1005+
let finalPool = realtimeObjects.testsOnly_objectsPool
1006+
let map = try #require(finalPool.entries["map:1@123"]?.mapValue)
1007+
let counter = try #require(finalPool.entries["counter:1@456"]?.counterValue)
1008+
1009+
// Verify the buffered operations were applied after sync completion (RTO5c6)
1010+
// Check that MAP_SET operation was applied to the map
1011+
let mapValue = try #require(map.get(key: "key1")?.stringValue)
1012+
#expect(mapValue == "value1")
1013+
#expect(map.testsOnly_siteTimeserials["site1"] == "ts3")
1014+
1015+
// Check that COUNTER_INC operation was applied to the counter
1016+
let counterValue = try counter.value
1017+
#expect(counterValue == 15) // 5 (from sync) + 10 (from buffered operation)
1018+
#expect(counter.testsOnly_siteTimeserials["site1"] == "ts4")
1019+
}
1020+
}
9251021
}
9261022
}

0 commit comments

Comments
 (0)