@@ -28,24 +28,28 @@ internal final class InternalDefaultLiveCounter: Sendable {
2828 }
2929
3030 private let logger : AblyPlugin . Logger
31+ private let userCallbackQueue : DispatchQueue
3132
3233 // MARK: - Initialization
3334
3435 internal convenience init (
3536 testsOnly_data data: Double ,
3637 objectID: String ,
37- logger: AblyPlugin . Logger
38+ logger: AblyPlugin . Logger ,
39+ userCallbackQueue: DispatchQueue
3840 ) {
39- self . init ( data: data, objectID: objectID, logger: logger)
41+ self . init ( data: data, objectID: objectID, logger: logger, userCallbackQueue : userCallbackQueue )
4042 }
4143
4244 private init (
4345 data: Double ,
4446 objectID: String ,
45- logger: AblyPlugin . Logger
47+ logger: AblyPlugin . Logger ,
48+ userCallbackQueue: DispatchQueue
4649 ) {
4750 mutableState = . init( liveObject: . init( objectID: objectID) , data: data)
4851 self . logger = logger
52+ self . userCallbackQueue = userCallbackQueue
4953 }
5054
5155 /// Creates a "zero-value LiveCounter", per RTLC4.
@@ -55,11 +59,13 @@ internal final class InternalDefaultLiveCounter: Sendable {
5559 internal static func createZeroValued(
5660 objectID: String ,
5761 logger: AblyPlugin . Logger ,
62+ userCallbackQueue: DispatchQueue ,
5863 ) -> Self {
5964 . init(
6065 data: 0 ,
6166 objectID: objectID,
6267 logger: logger,
68+ userCallbackQueue: userCallbackQueue,
6369 )
6470 }
6571
@@ -90,47 +96,73 @@ internal final class InternalDefaultLiveCounter: Sendable {
9096 notYetImplemented ( )
9197 }
9298
93- internal func subscribe( listener _: ( sending any LiveCounterUpdate ) -> Void ) -> any SubscribeResponse {
94- notYetImplemented ( )
99+ @discardableResult
100+ internal func subscribe( listener: @escaping LiveObjectUpdateCallback < DefaultLiveCounterUpdate > , coreSDK: CoreSDK ) throws ( ARTErrorInfo) -> any SubscribeResponse {
101+ try mutex. ablyLiveObjects_withLockWithTypedThrow { ( ) throws ( ARTErrorInfo) in
102+ // swiftlint:disable:next trailing_closure
103+ try mutableState. liveObject. subscribe ( listener: listener, coreSDK: coreSDK, updateSelfLater: { [ weak self] action in
104+ guard let self else {
105+ return
106+ }
107+
108+ mutex. withLock {
109+ action ( & mutableState. liveObject)
110+ }
111+ } )
112+ }
95113 }
96114
97115 internal func unsubscribeAll( ) {
98- notYetImplemented ( )
116+ mutex. withLock {
117+ mutableState. liveObject. unsubscribeAll ( )
118+ }
99119 }
100120
101- internal func on( event _: LiveObjectLifecycleEvent , callback _: ( ) -> Void ) -> any OnLiveObjectLifecycleEventResponse {
121+ @discardableResult
122+ internal func on( event _: LiveObjectLifecycleEvent , callback _: @escaping LiveObjectLifecycleEventCallback ) -> any OnLiveObjectLifecycleEventResponse {
102123 notYetImplemented ( )
103124 }
104125
105126 internal func offAll( ) {
106127 notYetImplemented ( )
107128 }
108129
130+ // MARK: - Emitting update from external sources
131+
132+ /// Emit an event from this `LiveCounter`.
133+ ///
134+ /// This is used to instruct this counter to emit updates during an `OBJECT_SYNC`.
135+ internal func emit( _ update: LiveObjectUpdate < DefaultLiveCounterUpdate > ) {
136+ mutex. withLock {
137+ mutableState. liveObject. emit ( update, on: userCallbackQueue)
138+ }
139+ }
140+
109141 // MARK: - Data manipulation
110142
111143 /// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
112- internal func replaceData( using state: ObjectState ) {
144+ internal func replaceData( using state: ObjectState ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
113145 mutex. withLock {
114146 mutableState. replaceData ( using: state)
115147 }
116148 }
117149
118150 /// Test-only method to merge initial value from an ObjectOperation, per RTLC10.
119- internal func testsOnly_mergeInitialValue( from operation: ObjectOperation ) {
151+ internal func testsOnly_mergeInitialValue( from operation: ObjectOperation ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
120152 mutex. withLock {
121153 mutableState. mergeInitialValue ( from: operation)
122154 }
123155 }
124156
125157 /// Test-only method to apply a COUNTER_CREATE operation, per RTLC8.
126- internal func testsOnly_applyCounterCreateOperation( _ operation: ObjectOperation ) {
158+ internal func testsOnly_applyCounterCreateOperation( _ operation: ObjectOperation ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
127159 mutex. withLock {
128160 mutableState. applyCounterCreateOperation ( operation, logger: logger)
129161 }
130162 }
131163
132164 /// Test-only method to apply a COUNTER_INC operation, per RTLC9.
133- internal func testsOnly_applyCounterIncOperation( _ operation: WireObjectsCounterOp ? ) {
165+ internal func testsOnly_applyCounterIncOperation( _ operation: WireObjectsCounterOp ? ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
134166 mutex. withLock {
135167 mutableState. applyCounterIncOperation ( operation)
136168 }
@@ -150,6 +182,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
150182 objectMessageSiteCode: objectMessageSiteCode,
151183 objectsPool: & objectsPool,
152184 logger: logger,
185+ userCallbackQueue: userCallbackQueue,
153186 )
154187 }
155188 }
@@ -158,13 +191,13 @@ internal final class InternalDefaultLiveCounter: Sendable {
158191
159192 private struct MutableState {
160193 /// The mutable state common to all LiveObjects.
161- internal var liveObject : LiveObjectMutableState
194+ internal var liveObject : LiveObjectMutableState < DefaultLiveCounterUpdate >
162195
163196 /// The internal data that this map holds, per RTLC3.
164197 internal var data : Double
165198
166199 /// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
167- internal mutating func replaceData( using state: ObjectState ) {
200+ internal mutating func replaceData( using state: ObjectState ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
168201 // RTLC6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials
169202 liveObject. siteTimeserials = state. siteTimeserials
170203
@@ -175,19 +208,32 @@ internal final class InternalDefaultLiveCounter: Sendable {
175208 data = state. counter? . count? . doubleValue ?? 0
176209
177210 // RTLC6d: If ObjectState.createOp is present, merge the initial value into the LiveCounter as described in RTLC10
178- if let createOp = state. createOp {
211+ return if let createOp = state. createOp {
179212 mergeInitialValue ( from: createOp)
213+ } else {
214+ // TODO: I assume this is what to do, clarify in https://github.com/ably/specification/pull/346/files#r2201363446
215+ . noop
180216 }
181217 }
182218
183219 /// Merges the initial value from an ObjectOperation into this LiveCounter, per RTLC10.
184- internal mutating func mergeInitialValue( from operation: ObjectOperation ) {
220+ internal mutating func mergeInitialValue( from operation: ObjectOperation ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
221+ let update : LiveObjectUpdate < DefaultLiveCounterUpdate >
222+
185223 // RTLC10a: Add ObjectOperation.counter.count to data, if it exists
186224 if let operationCount = operation. counter? . count? . doubleValue {
187225 data += operationCount
226+ // RTLC10c
227+ update = . update( DefaultLiveCounterUpdate ( amount: operationCount) )
228+ } else {
229+ // RTLC10d
230+ update = . noop
188231 }
232+
189233 // RTLC10b: Set the private flag createOperationIsMerged to true
190234 liveObject. createOperationIsMerged = true
235+
236+ return update
191237 }
192238
193239 /// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
@@ -197,6 +243,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
197243 objectMessageSiteCode: String ? ,
198244 objectsPool: inout ObjectsPool ,
199245 logger: Logger ,
246+ userCallbackQueue: DispatchQueue ,
200247 ) {
201248 guard let applicableOperation = liveObject. canApplyOperation ( objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
202249 // RTLC7b
@@ -210,13 +257,17 @@ internal final class InternalDefaultLiveCounter: Sendable {
210257 switch operation. action {
211258 case . known( . counterCreate) :
212259 // RTLC7d1
213- applyCounterCreateOperation (
260+ let update = applyCounterCreateOperation (
214261 operation,
215262 logger: logger,
216263 )
264+ // RTLC7d1a
265+ liveObject. emit ( update, on: userCallbackQueue)
217266 case . known( . counterInc) :
218267 // RTLC7d2
219- applyCounterIncOperation ( operation. counterOp)
268+ let update = applyCounterIncOperation ( operation. counterOp)
269+ // RTLC7d2a
270+ liveObject. emit ( update, on: userCallbackQueue)
220271 default :
221272 // RTLC7d3
222273 logger. log ( " Operation \( operation) has unsupported action for LiveCounter; discarding " , level: . warn)
@@ -227,25 +278,28 @@ internal final class InternalDefaultLiveCounter: Sendable {
227278 internal mutating func applyCounterCreateOperation(
228279 _ operation: ObjectOperation ,
229280 logger: Logger ,
230- ) {
281+ ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
231282 if liveObject. createOperationIsMerged {
232283 // RTLC8b
233284 logger. log ( " Not applying COUNTER_CREATE because a COUNTER_CREATE has already been applied " , level: . warn)
234- return
285+ return . noop
235286 }
236287
237- // RTLC8c
238- mergeInitialValue ( from: operation)
288+ // RTLC8c, RTLC8e
289+ return mergeInitialValue ( from: operation)
239290 }
240291
241292 /// Applies a `COUNTER_INC` operation, per RTLC9.
242- internal mutating func applyCounterIncOperation( _ operation: WireObjectsCounterOp ? ) {
293+ internal mutating func applyCounterIncOperation( _ operation: WireObjectsCounterOp ? ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
243294 guard let operation else {
244- return
295+ // RTL9e
296+ return . noop
245297 }
246298
247- // RTLC9b
248- data += operation. amount. doubleValue
299+ // RTLC9b, RTLC9d
300+ let amount = operation. amount. doubleValue
301+ data += amount
302+ return . update( DefaultLiveCounterUpdate ( amount: amount) )
249303 }
250304 }
251305}
0 commit comments