Skip to content

Commit 4df4816

Browse files
Merge pull request #121 from ably/AIT-287-new-rules-for-discarding-buffered-events
[AIT-287] Implement new rules for discarding ops buffered during sync
2 parents 761376b + f15cfac commit 4df4816

3 files changed

Lines changed: 227 additions & 52 deletions

File tree

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,17 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
8484
}
8585
}
8686

87+
/// Returns the number of buffered object operations if in the SYNCING state, or nil otherwise.
88+
internal var testsOnly_bufferedObjectOperationsCount: Int? {
89+
mutableStateMutex.withSync { mutableState in
90+
if case let .syncing(syncingData) = mutableState.state {
91+
syncingData.bufferedObjectOperations.count
92+
} else {
93+
nil
94+
}
95+
}
96+
}
97+
8798
// These drive the testsOnly_waitingForSyncEvents property that informs the test suite when `getRoot()` is waiting for the object sync sequence to complete per RTO1c.
8899
private let waitingForSyncEvents: AsyncStream<Void>
89100
private let waitingForSyncEventsContinuation: AsyncStream<Void>.Continuation
@@ -728,8 +739,12 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
728739
onChannelAttachedHasObjects = hasObjects
729740

730741
// We will subsequently transition to .synced either by the completion of the RTO4a OBJECT_SYNC, or by the RTO4b no-HAS_OBJECTS case below
731-
if state.toObjectsSyncState != .syncing {
732-
// RTO4c
742+
switch state {
743+
case let .syncing(syncingData):
744+
// RTO4d
745+
syncingData.bufferedObjectOperations = []
746+
case .initialized, .synced:
747+
// RTO4c, RTO4d
733748
transition(to: .syncing(.init(bufferedObjectOperations: [], syncSequence: nil)), userCallbackQueue: userCallbackQueue)
734749
}
735750

@@ -743,7 +758,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
743758

744759
// I have, for now, not directly implemented the "perform the actions for object sync completion" of RTO4b4 since my implementation doesn't quite match the model given there; here you only have a SyncObjectsPool if you have an OBJECT_SYNC in progress, which you might not have upon receiving an ATTACHED. Instead I've just implemented what seem like the relevant side effects. Can revisit this if "the actions for object sync completion" get more complex.
745760

746-
// RTO4b3, RTO4b4, RTO4b5, RTO5c3, RTO5c4, RTO5c5, RTO5c9, RTO5c8
761+
// RTO4b3, RTO4b4, RTO5c3, RTO5c4, RTO5c5, RTO5c9, RTO5c8
747762
appliedOnAckSerials.removeAll()
748763
transition(to: .synced, userCallbackQueue: userCallbackQueue)
749764

@@ -784,9 +799,8 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
784799
// Figure out whether to continue any existing sync sequence or start a new one
785800
let isNewSyncSequence = syncCursor == nil || syncingData.syncSequence?.id != syncCursor?.sequenceID
786801
if isNewSyncSequence {
787-
// RTO5a2a, RTO5a2b: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3
802+
// RTO5a2a: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3
788803
syncingData.syncSequence = nil
789-
syncingData.bufferedObjectOperations = []
790804
}
791805
}
792806

Tests/AblyLiveObjectsTests/InternalDefaultRealtimeObjectsTests.swift

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ struct InternalDefaultRealtimeObjectsTests {
126126

127127
// @spec RTO5a2
128128
// @spec RTO5a2a
129-
// @spec RTO5a2b
130129
@Test
131130
func newSequenceIdDiscardsInFlightSync() async throws {
132131
let internalQueue = TestFactories.createInternalQueue()
@@ -145,13 +144,6 @@ struct InternalDefaultRealtimeObjectsTests {
145144

146145
#expect(realtimeObjects.testsOnly_hasSyncSequence)
147146

148-
// Inject an OBJECT; it will get buffered per RTO8a and subsequently discarded per RTO5a2b
149-
internalQueue.ably_syncNoDeadlock {
150-
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
151-
TestFactories.mapCreateOperationMessage(objectId: "map:3@789"),
152-
])
153-
}
154-
155147
// Start new sequence with different ID (RTO5a2)
156148
let secondMessages = [TestFactories.simpleMapMessage(objectId: "map:2@456")]
157149
internalQueue.ably_syncNoDeadlock {
@@ -175,7 +167,6 @@ struct InternalDefaultRealtimeObjectsTests {
175167
// Verify only the second sequence's objects were applied (RTO5a2a - previous cleared)
176168
let pool = realtimeObjects.testsOnly_objectsPool
177169
#expect(pool.entries["map:1@123"] == nil) // From discarded first sequence
178-
#expect(pool.entries["map:3@789"] == nil) // Check we discarded the OBJECT that was buffered during discarded first sequence (RTO5a2b)
179170
#expect(pool.entries["map:2@456"] != nil) // From completed second sequence
180171
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
181172
}
@@ -366,9 +357,10 @@ struct InternalDefaultRealtimeObjectsTests {
366357
struct OnChannelAttachedTests {
367358
// MARK: - RTO4a Tests
368359

369-
// @spec RTO4a - Checks that when the `HAS_OBJECTS` flag is 1 (i.e. the server will shortly perform an `OBJECT_SYNC` sequence) we don't modify any internal state
360+
// @spec RTO4a - Checks that when the `HAS_OBJECTS` flag is 1 (i.e. the server will shortly perform an `OBJECT_SYNC` sequence) we don't modify the objects pool or sync sequence
361+
// @specOneOf(1/2) RTO4d
370362
@Test
371-
func doesNotModifyStateWhenHasObjectsIsTrue() {
363+
func handlesHasObjectTrue() {
372364
let internalQueue = TestFactories.createInternalQueue()
373365
let realtimeObjects = InternalDefaultRealtimeObjectsTests.createDefaultRealtimeObjects(internalQueue: internalQueue)
374366

@@ -389,12 +381,21 @@ struct InternalDefaultRealtimeObjectsTests {
389381

390382
#expect(realtimeObjects.testsOnly_hasSyncSequence)
391383

384+
// Inject a buffered OBJECT operation
385+
internalQueue.ably_syncNoDeadlock {
386+
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
387+
TestFactories.mapCreateOperationMessage(objectId: "map:buffered@789"),
388+
])
389+
}
390+
391+
#expect(realtimeObjects.testsOnly_bufferedObjectOperationsCount == 1)
392+
392393
// When: onChannelAttached is called with hasObjects = true
393394
internalQueue.ably_syncNoDeadlock {
394395
realtimeObjects.nosync_onChannelAttached(hasObjects: true)
395396
}
396397

397-
// Then: Nothing should be modified
398+
// Then:
398399
#expect(realtimeObjects.testsOnly_onChannelAttachedHasObjects == true)
399400

400401
// Verify ObjectsPool is unchanged
@@ -405,6 +406,9 @@ struct InternalDefaultRealtimeObjectsTests {
405406

406407
// Verify sync sequence is still active
407408
#expect(realtimeObjects.testsOnly_hasSyncSequence)
409+
410+
// RTO4d: Verify buffered object operations were cleared
411+
#expect(realtimeObjects.testsOnly_bufferedObjectOperationsCount == 0)
408412
}
409413

410414
// MARK: - RTO4b Tests
@@ -414,7 +418,7 @@ struct InternalDefaultRealtimeObjectsTests {
414418
// @spec RTO4b2a
415419
// @spec RTO4b3
416420
// @spec RTO4b4
417-
// @spec RTO4b5
421+
// @specOneOf(2/2) RTO4d
418422
@available(iOS 17.0.0, tvOS 17.0.0, *)
419423
@Test
420424
func handlesHasObjectsFalse() async throws {
@@ -490,7 +494,7 @@ struct InternalDefaultRealtimeObjectsTests {
490494
#expect(newRoot as AnyObject === originalPool.root as AnyObject) // Should be same instance
491495
#expect(newRoot.testsOnly_data.isEmpty) // Should be zero-valued (empty)
492496

493-
// RTO4b3, RTO4b4, RTO4b5: SyncObjectsPool must be cleared, sync sequence cleared, BufferedObjectOperations cleared, appliedOnAckSerials cleared
497+
// RTO4b3, RTO4b4, RTO4d: SyncObjectsPool must be cleared, sync sequence cleared, BufferedObjectOperations cleared, appliedOnAckSerials cleared
494498
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
495499
#expect(realtimeObjects.testsOnly_appliedOnAckSerials.isEmpty)
496500
}

0 commit comments

Comments
 (0)