Skip to content

Commit 6b70d37

Browse files
Replace @mainactor isolation with DispatchQueueMutex in EventEmitter
The liveobjects plugin uses DispatchQueue-based concurrency, not @mainactor. Adapt the EventEmitter (copied from ably-swift) to match: - Remove @mainactor from all protocols and classes - Add Sendable conformance throughout - Store mutable state in DispatchQueueMutex - Prefix all methods with nosync_ (must be called on internal queue) - Rename typealiases: MainActorEventListener → EventListener, MainActorNamedEventListener → NamedEventListener (now @sendable) - SubscriptionController takes internalQueue in init, uses DispatchQueueMutex for its registrations - DefaultInternalEventEmitter snapshots listeners inside withoutSync, calls them outside to avoid exclusivity violations on re-entry - Update tests to use ably_syncNoDeadlock and nosync_ methods Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent ef66240 commit 6b70d37

4 files changed

Lines changed: 628 additions & 519 deletions

File tree

Sources/AblyLiveObjects/Internal/DefaultInternalEventEmitter.swift

Lines changed: 89 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,147 +2,178 @@ import Foundation
22

33
/// An implementation of ``InternalEventEmitter``.
44
/// Conforms to the RTE specification for EventEmitter behavior.
5-
@MainActor
6-
internal class DefaultInternalEventEmitter<Event: Hashable, Data>: InternalEventEmitter {
7-
// Storage for listener registrations
8-
private var allEventListeners: [ListenerRegistration<MainActorEventListener<Event, Data>>] = []
9-
private var namedEventListeners: [Event: [ListenerRegistration<MainActorNamedEventListener<Data>>]] = [:]
5+
///
6+
/// All `nosync_` methods must be called on the internal queue (enforced at runtime).
7+
internal final class DefaultInternalEventEmitter<Event: Hashable & Sendable, Data: Sendable>: InternalEventEmitter, Sendable {
8+
private let mutableStateMutex: DispatchQueueMutex<MutableState>
9+
10+
internal init(internalQueue: DispatchQueue) {
11+
mutableStateMutex = .init(dispatchQueue: internalQueue, initialValue: .init())
12+
}
13+
14+
// MARK: - MutableState
15+
16+
private struct MutableState {
17+
var allEventListeners: [ListenerRegistration<EventListener<Event, Data>>] = []
18+
var namedEventListeners: [Event: [ListenerRegistration<NamedEventListener<Data>>]] = [:]
19+
}
1020

1121
// MARK: - EventEmitter conformance
1222

1323
/// RTE3: Registers listener for all events
14-
internal func on(_ listener: @escaping MainActorEventListener<Event, Data>) {
24+
internal func nosync_on(_ listener: @escaping EventListener<Event, Data>) {
1525
let registration = ListenerRegistration(listener: listener, once: false)
16-
allEventListeners.append(registration)
26+
mutableStateMutex.withoutSync { state in
27+
state.allEventListeners.append(registration)
28+
}
1729
}
1830

1931
/// RTE3: Registers listener for all events with signal
20-
internal func on(signalledBy signal: SubscriptionController.Signal, _ listener: @escaping MainActorEventListener<Event, Data>) {
32+
internal func nosync_on(signalledBy signal: SubscriptionController.Signal, _ listener: @escaping EventListener<Event, Data>) {
2133
let registration = ListenerRegistration(listener: listener, once: false)
22-
allEventListeners.append(registration)
34+
mutableStateMutex.withoutSync { state in
35+
state.allEventListeners.append(registration)
36+
}
2337

24-
// Register this registration with the controller so it can remove it when off() is called
25-
signal.controller?.addRegistration(self, registrationId: registration.id)
38+
// Register this registration with the controller so it can remove it when nosync_off() is called
39+
signal.controller?.nosync_addRegistration(self, registrationId: registration.id)
2640
}
2741

2842
/// RTE3: Registers listener for specific event
29-
internal func on(_ event: Event, _ listener: @escaping MainActorNamedEventListener<Data>) {
43+
internal func nosync_on(_ event: Event, _ listener: @escaping NamedEventListener<Data>) {
3044
let registration = ListenerRegistration(listener: listener, once: false)
31-
namedEventListeners[event, default: []].append(registration)
45+
mutableStateMutex.withoutSync { state in
46+
state.namedEventListeners[event, default: []].append(registration)
47+
}
3248
}
3349

3450
/// RTE3: Registers listener for specific event with signal
35-
internal func on(_ event: Event, signalledBy signal: SubscriptionController.Signal, _ listener: @escaping MainActorNamedEventListener<Data>) {
51+
internal func nosync_on(_ event: Event, signalledBy signal: SubscriptionController.Signal, _ listener: @escaping NamedEventListener<Data>) {
3652
let registration = ListenerRegistration(listener: listener, once: false)
37-
namedEventListeners[event, default: []].append(registration)
53+
mutableStateMutex.withoutSync { state in
54+
state.namedEventListeners[event, default: []].append(registration)
55+
}
3856

3957
// Register this registration with the controller
40-
signal.controller?.addRegistration(self, registrationId: registration.id)
58+
signal.controller?.nosync_addRegistration(self, registrationId: registration.id)
4159
}
4260

4361
/// RTE4: Registers one-time listener for all events
44-
internal func once(_ listener: @escaping MainActorEventListener<Event, Data>) {
62+
internal func nosync_once(_ listener: @escaping EventListener<Event, Data>) {
4563
let registration = ListenerRegistration(listener: listener, once: true)
46-
allEventListeners.append(registration)
64+
mutableStateMutex.withoutSync { state in
65+
state.allEventListeners.append(registration)
66+
}
4767
}
4868

4969
/// RTE4: Registers one-time listener for all events with signal
50-
internal func once(signalledBy signal: SubscriptionController.Signal, _ listener: @escaping MainActorEventListener<Event, Data>) {
70+
internal func nosync_once(signalledBy signal: SubscriptionController.Signal, _ listener: @escaping EventListener<Event, Data>) {
5171
let registration = ListenerRegistration(listener: listener, once: true)
52-
allEventListeners.append(registration)
72+
mutableStateMutex.withoutSync { state in
73+
state.allEventListeners.append(registration)
74+
}
5375

5476
// Register this registration with the controller
55-
signal.controller?.addRegistration(self, registrationId: registration.id)
77+
signal.controller?.nosync_addRegistration(self, registrationId: registration.id)
5678
}
5779

5880
/// RTE4: Registers one-time listener for specific event
59-
internal func once(_ event: Event, _ listener: @escaping MainActorNamedEventListener<Data>) {
81+
internal func nosync_once(_ event: Event, _ listener: @escaping NamedEventListener<Data>) {
6082
let registration = ListenerRegistration(listener: listener, once: true)
61-
namedEventListeners[event, default: []].append(registration)
83+
mutableStateMutex.withoutSync { state in
84+
state.namedEventListeners[event, default: []].append(registration)
85+
}
6286
}
6387

6488
/// RTE4: Registers one-time listener for specific event with signal
65-
internal func once(_ event: Event, signalledBy signal: SubscriptionController.Signal, _ listener: @escaping MainActorNamedEventListener<Data>) {
89+
internal func nosync_once(_ event: Event, signalledBy signal: SubscriptionController.Signal, _ listener: @escaping NamedEventListener<Data>) {
6690
let registration = ListenerRegistration(listener: listener, once: true)
67-
namedEventListeners[event, default: []].append(registration)
91+
mutableStateMutex.withoutSync { state in
92+
state.namedEventListeners[event, default: []].append(registration)
93+
}
6894

6995
// Register this registration with the controller
70-
signal.controller?.addRegistration(self, registrationId: registration.id)
96+
signal.controller?.nosync_addRegistration(self, registrationId: registration.id)
7197
}
7298

7399
/// RTE5: Removes all listeners
74-
internal func off() {
75-
allEventListeners.removeAll()
76-
namedEventListeners.removeAll()
100+
internal func nosync_off() {
101+
mutableStateMutex.withoutSync { state in
102+
state.allEventListeners.removeAll()
103+
state.namedEventListeners.removeAll()
104+
}
77105
}
78106

79107
/// RTE5: Removes all listeners for a specific event
80-
internal func off(_ event: Event) {
81-
namedEventListeners.removeValue(forKey: event)
108+
internal func nosync_off(_ event: Event) {
109+
mutableStateMutex.withoutSync { state in
110+
_ = state.namedEventListeners.removeValue(forKey: event)
111+
}
82112
}
83113

84114
// MARK: - InternalEventEmitter conformance
85115

86116
/// RTE6: Emits an event, calling registered listeners
87117
/// RTE6a: The set of listeners must not change during emit
88-
internal func emit(event: Event, data: Data) {
89-
// Create snapshots to ensure RTE6a compliance - listeners called during emit don't affect this invocation
90-
let allEventListenersSnapshot = allEventListeners
91-
let namedEventListenersSnapshot = namedEventListeners[event] ?? []
118+
internal func nosync_emit(event: Event, data: Data) {
119+
// Take snapshots inside withoutSync to ensure RTE6a compliance
120+
let (allSnapshot, namedSnapshot) = mutableStateMutex.withoutSync { state -> ([ListenerRegistration<EventListener<Event, Data>>], [ListenerRegistration<NamedEventListener<Data>>]) in
121+
let allListeners = state.allEventListeners
122+
let namedListeners = state.namedEventListeners[event] ?? []
123+
return (allListeners, namedListeners)
124+
}
92125

93-
// Collect all listener IDs that need to be removed
126+
// Collect all once-listener IDs that need to be removed
94127
var idsToRemove: [UUID] = []
95128

96-
// Call all-event listeners
97-
for registration in allEventListenersSnapshot {
98-
// Call listener per RTE6
129+
// Call all-event listeners from the snapshot (outside withoutSync to avoid exclusivity violation if a listener re-enters the emitter)
130+
for registration in allSnapshot {
99131
registration.listener(event, data)
100132

101-
// Mark for removal if it was a once listener
102133
if registration.once {
103134
idsToRemove.append(registration.id)
104135
}
105136
}
106137

107-
// Call named event listeners
108-
for registration in namedEventListenersSnapshot {
109-
// Call listener per RTE6
138+
// Call named event listeners from the snapshot
139+
for registration in namedSnapshot {
110140
registration.listener(data)
111141

112-
// Mark for removal if it was a once listener
113142
if registration.once {
114143
idsToRemove.append(registration.id)
115144
}
116145
}
117146

118147
// Remove all once listeners in one pass
119148
for id in idsToRemove {
120-
removeRegistration(id: id)
149+
nosync_removeRegistration(id: id)
121150
}
122151
}
123152

124153
// MARK: - Registration removal (called internally and by SubscriptionController)
125154

126-
internal func removeRegistration(id: UUID) {
127-
// Remove from all-event listeners
128-
allEventListeners.removeAll { registration in
129-
registration.id == id
130-
}
131-
132-
// Remove from named-event listeners
133-
for (event, listeners) in namedEventListeners {
134-
namedEventListeners[event] = listeners.filter { registration in
135-
registration.id != id
155+
internal func nosync_removeRegistration(id: UUID) {
156+
mutableStateMutex.withoutSync { state in
157+
// Remove from all-event listeners
158+
state.allEventListeners.removeAll { registration in
159+
registration.id == id
136160
}
137-
if namedEventListeners[event]?.isEmpty == true {
138-
namedEventListeners.removeValue(forKey: event)
161+
162+
// Remove from named-event listeners
163+
for (event, listeners) in state.namedEventListeners {
164+
state.namedEventListeners[event] = listeners.filter { registration in
165+
registration.id != id
166+
}
167+
if state.namedEventListeners[event]?.isEmpty == true {
168+
_ = state.namedEventListeners.removeValue(forKey: event)
169+
}
139170
}
140171
}
141172
}
142173

143174
// MARK: - Private Types
144175

145-
private struct ListenerRegistration<Listener> {
176+
private struct ListenerRegistration<Listener>: Identifiable {
146177
let id = UUID()
147178
let listener: Listener
148179
let once: Bool

Sources/AblyLiveObjects/Internal/EventEmitter.swift

Lines changed: 48 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,105 +1,108 @@
11
import Foundation
22

3-
public typealias MainActorEventListener<Event, Data> = @MainActor (Event, Data) -> Void
4-
public typealias MainActorNamedEventListener<Event> = @MainActor (Event) -> Void
3+
internal typealias EventListener<Event, Data> = @Sendable (Event, Data) -> Void
4+
internal typealias NamedEventListener<Event> = @Sendable (Event) -> Void
55

6-
@MainActor
76
internal protocol EventEmitter<Event, Data>: AnyObject {
87
associatedtype Event
98
associatedtype Data
109
associatedtype Signal
1110

12-
func on(_ listener: @escaping MainActorEventListener<Event, Data>)
13-
func on(signalledBy signal: Signal, _ listener: @escaping MainActorEventListener<Event, Data>)
11+
func nosync_on(_ listener: @escaping EventListener<Event, Data>)
12+
func nosync_on(signalledBy signal: Signal, _ listener: @escaping EventListener<Event, Data>)
1413

15-
func on(_ event: Event, _ listener: @escaping MainActorNamedEventListener<Data>)
16-
func on(_ event: Event, signalledBy signal: Signal, _ listener: @escaping MainActorNamedEventListener<Data>)
14+
func nosync_on(_ event: Event, _ listener: @escaping NamedEventListener<Data>)
15+
func nosync_on(_ event: Event, signalledBy signal: Signal, _ listener: @escaping NamedEventListener<Data>)
1716

18-
func once(_ listener: @escaping MainActorEventListener<Event, Data>)
19-
func once(signalledBy signal: Signal, _ listener: @escaping MainActorEventListener<Event, Data>)
17+
func nosync_once(_ listener: @escaping EventListener<Event, Data>)
18+
func nosync_once(signalledBy signal: Signal, _ listener: @escaping EventListener<Event, Data>)
2019

21-
func once(_ event: Event, _ listener: @escaping MainActorNamedEventListener<Data>)
22-
func once(_ event: Event, signalledBy signal: Signal, _ listener: @escaping MainActorNamedEventListener<Data>)
20+
func nosync_once(_ event: Event, _ listener: @escaping NamedEventListener<Data>)
21+
func nosync_once(_ event: Event, signalledBy signal: Signal, _ listener: @escaping NamedEventListener<Data>)
2322

24-
func off()
25-
func off(_ event: Event)
23+
func nosync_off()
24+
func nosync_off(_ event: Event)
2625
}
2726

2827
// Note: We _have_ to do something other than the off(listener:) that the IDL gives, because closures don't have identity in Swift. I've gone for this approach, instead of the return value used in chat-js and LiveObjects, because it makes it easy to unsubscribe from _within_ the closure, which happens e.g. if you want to listen for one of various state changes and then unsubscribe. The "controller" and "signal" language was taken from the AbortController used in the Web's `fetch()` API.
2928

30-
/// A subscription controller's `signal` can be passed to `EventEmitter`'s `on` or `once` methods. If you call `off()` on the controller then the listener will no longer be called.
29+
/// A subscription controller's `signal` can be passed to `EventEmitter`'s `nosync_on` or `nosync_once` methods. If you call `nosync_off()` on the controller then the listener will no longer be called.
3130
///
32-
/// - Note: Subscription lifetime is independent of that of the controller. That is, if you relinquish all references to a controller then the listener will still be called. Only calling `off()` (on the controller or on the `EventEmitter`) will end the subscription.
33-
@MainActor
34-
public protocol SubscriptionControllerProtocol {
31+
/// - Note: Subscription lifetime is independent of that of the controller. That is, if you relinquish all references to a controller then the listener will still be called. Only calling `nosync_off()` (on the controller or on the `EventEmitter`) will end the subscription.
32+
internal protocol SubscriptionControllerProtocol {
3533
associatedtype Signal
3634

3735
var signal: Signal { get }
3836

39-
/// Cancels any subscriptions for which this controller's signal was used. The listener that was passed to `on` or `once` will not be called again.
37+
/// Cancels any subscriptions for which this controller's signal was used. The listener that was passed to `nosync_on` or `nosync_once` will not be called again.
4038
///
4139
/// Calling this method will not affect any future subscriptions that use the same signal.
42-
func off()
40+
func nosync_off()
4341
}
4442

4543
/// The `SubscriptionControllerProtocol` implementation used by the SDK.
46-
@MainActor
47-
public final class SubscriptionController: SubscriptionControllerProtocol {
48-
public init() {
49-
signal = Signal()
44+
internal final class SubscriptionController: SubscriptionControllerProtocol, Sendable {
45+
internal init(internalQueue: DispatchQueue) {
46+
let signal = Signal()
47+
self.signal = signal
48+
registrations = DispatchQueueMutex(dispatchQueue: internalQueue, initialValue: [])
5049
signal.controller = self
5150
}
5251

53-
public class Signal {
52+
internal final class Signal: Sendable {
5453
internal let id = UUID()
55-
// Store weak reference to the controller that owns this signal
56-
internal weak var controller: SubscriptionController?
54+
// Store weak reference to the controller that owns this signal.
55+
// Safe because all access occurs on the internal queue.
56+
internal nonisolated(unsafe) weak var controller: SubscriptionController?
5757
}
5858

59-
public let signal: Signal
59+
internal let signal: Signal
6060

6161
// Store registrations that this controller manages
62-
private var registrations: [Registration] = []
62+
private let registrations: DispatchQueueMutex<[Registration]>
6363

64-
public func off() {
65-
// Remove all registrations that this controller created
66-
for registration in registrations {
67-
registration.removeFromEmitter()
64+
internal func nosync_off() {
65+
registrations.withoutSync { registrations in
66+
// Remove all registrations that this controller created
67+
for registration in registrations {
68+
registration.removeFromEmitter()
69+
}
70+
registrations.removeAll()
6871
}
69-
registrations.removeAll()
7072
}
7173

72-
internal func addRegistration(_ emitter: DefaultInternalEventEmitter<some Hashable, some Any>, registrationId: UUID) {
73-
let registration = Registration(emitter: emitter, registrationId: registrationId)
74-
registrations.append(registration)
74+
internal func nosync_addRegistration(_ emitter: DefaultInternalEventEmitter<some Hashable & Sendable, some Sendable>, registrationId: UUID) {
75+
registrations.withoutSync { registrations in
76+
let registration = Registration(emitter: emitter, registrationId: registrationId)
77+
registrations.append(registration)
7578

76-
// Clean up deallocated emitters periodically
77-
registrations.removeAll { registration in
78-
registration.isEmitterDeallocated
79+
// Clean up deallocated emitters periodically
80+
registrations.removeAll { registration in
81+
registration.isEmitterDeallocated
82+
}
7983
}
8084
}
8185

8286
// MARK: - Private Types
8387

84-
private struct Registration {
88+
private struct Registration: @unchecked Sendable {
8589
private weak var emitter: (any AnyObject)?
86-
private let emitterRemoveMethod: @MainActor (UUID) -> Void
90+
private let emitterRemoveMethod: @Sendable (UUID) -> Void
8791
private let registrationId: UUID
8892

89-
init(emitter: DefaultInternalEventEmitter<some Hashable, some Any>, registrationId: UUID) {
93+
init(emitter: DefaultInternalEventEmitter<some Hashable & Sendable, some Sendable>, registrationId: UUID) {
9094
self.emitter = emitter
9195
self.registrationId = registrationId
9296
// Capture the remove method to avoid protocol overhead
93-
emitterRemoveMethod = { @MainActor [weak emitter] id in
94-
emitter?.removeRegistration(id: id)
97+
emitterRemoveMethod = { [weak emitter] id in
98+
emitter?.nosync_removeRegistration(id: id)
9599
}
96100
}
97101

98102
var isEmitterDeallocated: Bool {
99103
emitter == nil
100104
}
101105

102-
@MainActor
103106
func removeFromEmitter() {
104107
emitterRemoveMethod(registrationId)
105108
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/// An extension of ``EventEmitter`` for use inside the codebase. It represents an `EventEmitter` that can be instructed to emit an event.
22
internal protocol InternalEventEmitter<Event, Data>: EventEmitter {
3-
func emit(event: Event, data: Data)
3+
func nosync_emit(event: Event, data: Data)
44
}
55

66
// (Note: I wonder whether having a mock InternalEventEmitter might be useful sometimes, when all you care about is whether a given event was emitted without having to go through the palaver of getting this data back out using a callback.)

0 commit comments

Comments
 (0)