Skip to content

Commit 9e48ef3

Browse files
Refactor SubscriptionStorage to wrap DefaultInternalEventEmitter
SubscriptionStorage was a struct that manually managed subscription bookkeeping. Because it was a value type, unsubscription required a verbose updateSelfLater closure-threading pattern at every call site. Replace it with a class wrapping DefaultInternalEventEmitter. A private EmitData struct bundles the update with the dispatch queue so nosync_emit can still accept a queue parameter. Each nosync_subscribe creates a SubscriptionController whose signal is passed to the emitter; the returned SubscribeResponse captures the controller and synchronously dispatches nosync_off() on unsubscribe. Because SubscriptionStorage is now a reference type, the updateSelfLater closures throughout LiveObjectMutableState, InternalDefaultLiveCounter, InternalDefaultLiveMap, and InternalDefaultRealtimeObjects are no longer needed and are removed. The subscription/emit methods on LiveObjectMutableState drop their mutating qualifier and gain a nosync_ prefix for consistency. TODO: Lawrence check that this resolves #102 TODO: There's still a mention of updateSelfLater Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6b70d37 commit 9e48ef3

7 files changed

Lines changed: 126 additions & 228 deletions

File tree

Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
5252
) {
5353
mutableStateMutex = .init(
5454
dispatchQueue: internalQueue,
55-
initialValue: .init(liveObjectMutableState: .init(objectID: objectID), data: data),
55+
initialValue: .init(liveObjectMutableState: .init(objectID: objectID, internalQueue: internalQueue), data: data),
5656
)
5757
self.logger = logger
5858
self.userCallbackQueue = userCallbackQueue
@@ -150,44 +150,26 @@ internal final class InternalDefaultLiveCounter: Sendable {
150150
@discardableResult
151151
internal func subscribe(listener: @escaping LiveObjectUpdateCallback<DefaultLiveCounterUpdate>, coreSDK: CoreSDK) throws(ARTErrorInfo) -> any SubscribeResponse {
152152
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
153-
// swiftlint:disable:next trailing_closure
154-
try mutableState.liveObjectMutableState.nosync_subscribe(listener: listener, coreSDK: coreSDK, updateSelfLater: { [weak self] action in
155-
guard let self else {
156-
return
157-
}
158-
159-
mutableStateMutex.withSync { mutableState in
160-
action(&mutableState.liveObjectMutableState)
161-
}
162-
})
153+
try mutableState.liveObjectMutableState.nosync_subscribe(listener: listener, coreSDK: coreSDK)
163154
}
164155
}
165156

166157
internal func unsubscribeAll() {
167158
mutableStateMutex.withSync { mutableState in
168-
mutableState.liveObjectMutableState.unsubscribeAll()
159+
mutableState.liveObjectMutableState.nosync_unsubscribeAll()
169160
}
170161
}
171162

172163
@discardableResult
173164
internal func on(event: LiveObjectLifecycleEvent, callback: @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
174165
mutableStateMutex.withSync { mutableState in
175-
// swiftlint:disable:next trailing_closure
176-
mutableState.liveObjectMutableState.on(event: event, callback: callback, updateSelfLater: { [weak self] action in
177-
guard let self else {
178-
return
179-
}
180-
181-
mutableStateMutex.withSync { mutableState in
182-
action(&mutableState.liveObjectMutableState)
183-
}
184-
})
166+
mutableState.liveObjectMutableState.nosync_on(event: event, callback: callback)
185167
}
186168
}
187169

188170
internal func offAll() {
189171
mutableStateMutex.withSync { mutableState in
190-
mutableState.liveObjectMutableState.offAll()
172+
mutableState.liveObjectMutableState.nosync_offAll()
191173
}
192174
}
193175

@@ -198,7 +180,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
198180
/// This is used to instruct this counter to emit updates during an `OBJECT_SYNC`.
199181
internal func nosync_emit(_ update: LiveObjectUpdate<DefaultLiveCounterUpdate>) {
200182
mutableStateMutex.withoutSync { mutableState in
201-
mutableState.liveObjectMutableState.emit(update, on: userCallbackQueue)
183+
mutableState.liveObjectMutableState.nosync_emit(update, on: userCallbackQueue)
202184
}
203185
}
204186

@@ -425,14 +407,14 @@ internal final class InternalDefaultLiveCounter: Sendable {
425407
logger: logger,
426408
)
427409
// RTLC7d1a
428-
liveObjectMutableState.emit(update, on: userCallbackQueue)
410+
liveObjectMutableState.nosync_emit(update, on: userCallbackQueue)
429411
// RTLC7d1b
430412
return true
431413
case .known(.counterInc):
432414
// RTLC7d5
433415
let update = applyCounterIncOperation(operation.counterInc)
434416
// RTLC7d5a
435-
liveObjectMutableState.emit(update, on: userCallbackQueue)
417+
liveObjectMutableState.nosync_emit(update, on: userCallbackQueue)
436418
// RTLC7d5b
437419
return true
438420
case .known(.objectDelete):
@@ -447,7 +429,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
447429
)
448430

449431
// RTLC7d4a
450-
liveObjectMutableState.emit(.update(.init(amount: -dataBeforeApplyingOperation)), on: userCallbackQueue)
432+
liveObjectMutableState.nosync_emit(.update(.init(amount: -dataBeforeApplyingOperation)), on: userCallbackQueue)
451433
// RTLC7d4b
452434
return true
453435
default:

Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ internal final class InternalDefaultLiveMap: Sendable {
7878
) {
7979
mutableStateMutex = .init(
8080
dispatchQueue: internalQueue,
81-
initialValue: .init(liveObjectMutableState: .init(objectID: objectID), data: data, semantics: semantics),
81+
initialValue: .init(liveObjectMutableState: .init(objectID: objectID, internalQueue: internalQueue), data: data, semantics: semantics),
8282
)
8383
self.logger = logger
8484
self.userCallbackQueue = userCallbackQueue
@@ -232,44 +232,26 @@ internal final class InternalDefaultLiveMap: Sendable {
232232
@discardableResult
233233
internal func subscribe(listener: @escaping LiveObjectUpdateCallback<DefaultLiveMapUpdate>, coreSDK: CoreSDK) throws(ARTErrorInfo) -> any SubscribeResponse {
234234
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
235-
// swiftlint:disable:next trailing_closure
236-
try mutableState.liveObjectMutableState.nosync_subscribe(listener: listener, coreSDK: coreSDK, updateSelfLater: { [weak self] action in
237-
guard let self else {
238-
return
239-
}
240-
241-
mutableStateMutex.withSync { mutableState in
242-
action(&mutableState.liveObjectMutableState)
243-
}
244-
})
235+
try mutableState.liveObjectMutableState.nosync_subscribe(listener: listener, coreSDK: coreSDK)
245236
}
246237
}
247238

248239
internal func unsubscribeAll() {
249240
mutableStateMutex.withSync { mutableState in
250-
mutableState.liveObjectMutableState.unsubscribeAll()
241+
mutableState.liveObjectMutableState.nosync_unsubscribeAll()
251242
}
252243
}
253244

254245
@discardableResult
255246
internal func on(event: LiveObjectLifecycleEvent, callback: @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
256247
mutableStateMutex.withSync { mutableState in
257-
// swiftlint:disable:next trailing_closure
258-
mutableState.liveObjectMutableState.on(event: event, callback: callback, updateSelfLater: { [weak self] action in
259-
guard let self else {
260-
return
261-
}
262-
263-
mutableStateMutex.withSync { mutableState in
264-
action(&mutableState.liveObjectMutableState)
265-
}
266-
})
248+
mutableState.liveObjectMutableState.nosync_on(event: event, callback: callback)
267249
}
268250
}
269251

270252
internal func offAll() {
271253
mutableStateMutex.withSync { mutableState in
272-
mutableState.liveObjectMutableState.offAll()
254+
mutableState.liveObjectMutableState.nosync_offAll()
273255
}
274256
}
275257

@@ -280,7 +262,7 @@ internal final class InternalDefaultLiveMap: Sendable {
280262
/// This is used to instruct this map to emit updates during an `OBJECT_SYNC`.
281263
internal func nosync_emit(_ update: LiveObjectUpdate<DefaultLiveMapUpdate>) {
282264
mutableStateMutex.withoutSync { mutableState in
283-
mutableState.liveObjectMutableState.emit(update, on: userCallbackQueue)
265+
mutableState.liveObjectMutableState.nosync_emit(update, on: userCallbackQueue)
284266
}
285267
}
286268

@@ -663,7 +645,7 @@ internal final class InternalDefaultLiveMap: Sendable {
663645
clock: clock,
664646
)
665647
// RTLM15d1a
666-
liveObjectMutableState.emit(update, on: userCallbackQueue)
648+
liveObjectMutableState.nosync_emit(update, on: userCallbackQueue)
667649
// RTLM15d1b
668650
return true
669651
case .known(.mapSet):
@@ -688,7 +670,7 @@ internal final class InternalDefaultLiveMap: Sendable {
688670
clock: clock,
689671
)
690672
// RTLM15d6a
691-
liveObjectMutableState.emit(update, on: userCallbackQueue)
673+
liveObjectMutableState.nosync_emit(update, on: userCallbackQueue)
692674
// RTLM15d6b
693675
return true
694676
case .known(.mapRemove):
@@ -705,7 +687,7 @@ internal final class InternalDefaultLiveMap: Sendable {
705687
clock: clock,
706688
)
707689
// RTLM15d7a
708-
liveObjectMutableState.emit(update, on: userCallbackQueue)
690+
liveObjectMutableState.nosync_emit(update, on: userCallbackQueue)
709691
// RTLM15d7b
710692
return true
711693
case .known(.objectDelete):
@@ -720,7 +702,7 @@ internal final class InternalDefaultLiveMap: Sendable {
720702
)
721703

722704
// RTLM15d5a
723-
liveObjectMutableState.emit(.update(.init(update: dataBeforeApplyingOperation.mapValues { _ in .removed })), on: userCallbackQueue)
705+
liveObjectMutableState.nosync_emit(.update(.init(update: dataBeforeApplyingOperation.mapValues { _ in .removed })), on: userCallbackQueue)
724706
// RTLM15d5b
725707
return true
726708
case .known(.mapClear):
@@ -729,7 +711,7 @@ internal final class InternalDefaultLiveMap: Sendable {
729711
serial: applicableOperation.objectMessageSerial,
730712
)
731713
// RTLM15d8a
732-
liveObjectMutableState.emit(update, on: userCallbackQueue)
714+
liveObjectMutableState.nosync_emit(update, on: userCallbackQueue)
733715
// RTLM15d8b
734716
return true
735717
default:
@@ -950,7 +932,7 @@ internal final class InternalDefaultLiveMap: Sendable {
950932

951933
// RTO4b2a
952934
let mapUpdate = DefaultLiveMapUpdate(update: previousData.mapValues { _ in .removed })
953-
liveObjectMutableState.emit(.update(mapUpdate), on: userCallbackQueue)
935+
liveObjectMutableState.nosync_emit(.update(mapUpdate), on: userCallbackQueue)
954936
}
955937

956938
/// Needed for ``InternalLiveObject`` conformance.

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 27 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
135135
userCallbackQueue: userCallbackQueue,
136136
clock: clock,
137137
),
138+
internalQueue: internalQueue,
138139
garbageCollectionGracePeriod: garbageCollectionOptions.gracePeriod,
139140
),
140141
)
@@ -322,40 +323,21 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
322323
@discardableResult
323324
internal func on(event: ObjectsEvent, callback: @escaping ObjectsEventCallback) -> any OnObjectsEventResponse {
324325
mutableStateMutex.withSync { mutableState in
325-
// swiftlint:disable:next trailing_closure
326-
mutableState.on(event: event, callback: callback, updateSelfLater: { [weak self] action in
327-
guard let self else {
328-
return
329-
}
330-
331-
mutableStateMutex.withSync { mutableState in
332-
action(&mutableState)
333-
}
334-
})
326+
mutableState.nosync_on(event: event, callback: callback)
335327
}
336328
}
337329

338330
/// Adds a subscriber to the ``internalObjectsEventSubscriptionStorage`` (i.e. unaffected by `offAll()`).
339331
@discardableResult
340332
internal func onInternal(event: ObjectsEvent, callback: @escaping ObjectsEventCallback) -> any OnObjectsEventResponse {
341-
// TODO: Looking at this again later the whole process for adding a subscriber is really verbose and boilerplate-y, and I think the unfortunate result of me trying to be clever at some point; revisit in https://github.com/ably/ably-liveobjects-swift-plugin/issues/102
342333
mutableStateMutex.withSync { mutableState in
343-
// swiftlint:disable:next trailing_closure
344-
mutableState.onInternal(event: event, callback: callback, updateSelfLater: { [weak self] action in
345-
guard let self else {
346-
return
347-
}
348-
349-
mutableStateMutex.withSync { mutableState in
350-
action(&mutableState)
351-
}
352-
})
334+
mutableState.nosync_onInternal(event: event, callback: callback)
353335
}
354336
}
355337

356338
internal func offAll() {
357339
mutableStateMutex.withSync { mutableState in
358-
mutableState.offAll()
340+
mutableState.nosync_offAll()
359341
}
360342
}
361343

@@ -643,10 +625,10 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
643625
private struct MutableState {
644626
internal var objectsPool: ObjectsPool
645627
internal var onChannelAttachedHasObjects: Bool?
646-
internal var objectsEventSubscriptionStorage = SubscriptionStorage<ObjectsEvent, Void>()
628+
internal let objectsEventSubscriptionStorage: SubscriptionStorage<ObjectsEvent, Void>
647629

648630
/// Used when the object wishes to subscribe to its own events (i.e. unaffected by `offAll()`); used e.g. to wait for a sync before returning from `getRoot()`, per RTO1c.
649-
internal var internalObjectsEventSubscriptionStorage = SubscriptionStorage<ObjectsEvent, Void>()
631+
internal let internalObjectsEventSubscriptionStorage: SubscriptionStorage<ObjectsEvent, Void>
650632

651633
/// The RTO10b grace period for which we will retain tombstoned objects and map entries.
652634
internal var garbageCollectionGracePeriod: GarbageCollectionOptions.GracePeriod
@@ -678,6 +660,17 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
678660
/// The RTO17 sync state. Also stores the sync sequence data.
679661
internal var state = State.initialized
680662

663+
init(
664+
objectsPool: ObjectsPool,
665+
internalQueue: DispatchQueue,
666+
garbageCollectionGracePeriod: GarbageCollectionOptions.GracePeriod,
667+
) {
668+
self.objectsPool = objectsPool
669+
self.garbageCollectionGracePeriod = garbageCollectionGracePeriod
670+
objectsEventSubscriptionStorage = SubscriptionStorage(internalQueue: internalQueue)
671+
internalObjectsEventSubscriptionStorage = SubscriptionStorage(internalQueue: internalQueue)
672+
}
673+
681674
/// Has the same cases as `ObjectsSyncState` but with associated data to store the sync sequence data and represent the constraint that you only have a sync sequence if you're SYNCING.
682675
internal enum State {
683676
case initialized
@@ -726,7 +719,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
726719
return
727720
}
728721
// RTO17b
729-
emitObjectsEvent(event, on: userCallbackQueue)
722+
nosync_emitObjectsEvent(event, on: userCallbackQueue)
730723
}
731724

732725
internal mutating func nosync_onChannelAttached(
@@ -1018,45 +1011,28 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
10181011
}
10191012
}
10201013

1021-
internal typealias UpdateMutableState = @Sendable (_ action: (inout Self) -> Void) -> Void
1022-
10231014
@discardableResult
1024-
internal mutating func on(event: ObjectsEvent, callback: @escaping ObjectsEventCallback, updateSelfLater: @escaping UpdateMutableState) -> any OnObjectsEventResponse {
1025-
let updateSubscriptionStorage: SubscriptionStorage<ObjectsEvent, Void>.UpdateSubscriptionStorage = { action in
1026-
updateSelfLater { mutableState in
1027-
action(&mutableState.objectsEventSubscriptionStorage)
1028-
}
1029-
}
1030-
1031-
let subscription = objectsEventSubscriptionStorage.subscribe(
1015+
internal func nosync_on(event: ObjectsEvent, callback: @escaping ObjectsEventCallback) -> any OnObjectsEventResponse {
1016+
let subscription = objectsEventSubscriptionStorage.nosync_subscribe(
10321017
listener: { _, subscriptionInCallback in
10331018
let response = ObjectsEventResponse(subscription: subscriptionInCallback)
10341019
callback(response)
10351020
},
10361021
eventName: event,
1037-
updateSelfLater: updateSubscriptionStorage,
10381022
)
10391023

10401024
return ObjectsEventResponse(subscription: subscription)
10411025
}
10421026

10431027
/// Adds a subscriber to the ``internalObjectsEventSubscriptionStorage`` (i.e. unaffected by `offAll()`).
10441028
@discardableResult
1045-
internal mutating func onInternal(event: ObjectsEvent, callback: @escaping ObjectsEventCallback, updateSelfLater: @escaping UpdateMutableState) -> any OnObjectsEventResponse {
1046-
// TODO: Looking at this again later the whole process for adding a subscriber is really verbose and boilerplate-y, and I think the unfortunate result of me trying to be clever at some point; revisit in https://github.com/ably/ably-liveobjects-swift-plugin/issues/102. Also as things stand we end up not being able to use this method because we run into Swift exclusivity violations when we try to unsubscribe from within a listener that's invoked when the mutable state mutex is already held (see https://github.com/ably/ably-liveobjects-swift-plugin/issues/120), so e.g. the RTO20 wait-for-synced can't use this mechanism, which it should be able to.
1047-
let updateSubscriptionStorage: SubscriptionStorage<ObjectsEvent, Void>.UpdateSubscriptionStorage = { action in
1048-
updateSelfLater { mutableState in
1049-
action(&mutableState.internalObjectsEventSubscriptionStorage)
1050-
}
1051-
}
1052-
1053-
let subscription = internalObjectsEventSubscriptionStorage.subscribe(
1029+
internal func nosync_onInternal(event: ObjectsEvent, callback: @escaping ObjectsEventCallback) -> any OnObjectsEventResponse {
1030+
let subscription = internalObjectsEventSubscriptionStorage.nosync_subscribe(
10541031
listener: { _, subscriptionInCallback in
10551032
let response = ObjectsEventResponse(subscription: subscriptionInCallback)
10561033
callback(response)
10571034
},
10581035
eventName: event,
1059-
updateSelfLater: updateSubscriptionStorage,
10601036
)
10611037

10621038
return ObjectsEventResponse(subscription: subscription)
@@ -1071,13 +1047,13 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
10711047
}
10721048
}
10731049

1074-
internal mutating func offAll() {
1075-
objectsEventSubscriptionStorage.unsubscribeAll()
1050+
internal func nosync_offAll() {
1051+
objectsEventSubscriptionStorage.nosync_unsubscribeAll()
10761052
}
10771053

1078-
internal func emitObjectsEvent(_ event: ObjectsEvent, on queue: DispatchQueue) {
1079-
objectsEventSubscriptionStorage.emit(eventName: event, on: queue)
1080-
internalObjectsEventSubscriptionStorage.emit(eventName: event, on: queue)
1054+
internal func nosync_emitObjectsEvent(_ event: ObjectsEvent, on queue: DispatchQueue) {
1055+
objectsEventSubscriptionStorage.nosync_emit(eventName: event, on: queue)
1056+
internalObjectsEventSubscriptionStorage.nosync_emit(eventName: event, on: queue)
10811057
}
10821058
}
10831059
}

Sources/AblyLiveObjects/Internal/InternalLiveObject.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ internal extension InternalLiveObject {
3737
// Emit the deleted lifecycle event
3838
// Taken from https://github.com/ably/ably-js/blob/e280bff11a4a7627362c5185e764b7ebd0490570/src/plugins/objects/liveobject.ts#L168
3939
// TODO: Bring in line with spec once it exists (https://github.com/ably/ably-liveobjects-swift-plugin/issues/77)
40-
liveObjectMutableState.emitLifecycleEvent(.deleted, on: userCallbackQueue)
40+
liveObjectMutableState.nosync_emitLifecycleEvent(.deleted, on: userCallbackQueue)
4141
}
4242

4343
/// Applies an `OBJECT_DELETE` operation, per RTLO5.

0 commit comments

Comments
 (0)