Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions Sources/AblyLiveObjects/Internal/DefaultInternalEventEmitter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import Foundation

/// An implementation of ``InternalEventEmitter``.
/// Conforms to the RTE specification for EventEmitter behavior.
///
/// All `nosync_` methods must be called on the internal queue (enforced at runtime).
internal final class DefaultInternalEventEmitter<Event: Hashable & Sendable, Data: Sendable>: InternalEventEmitter, Sendable {
private let mutableStateMutex: DispatchQueueMutex<MutableState>

internal init(internalQueue: DispatchQueue) {
mutableStateMutex = .init(dispatchQueue: internalQueue, initialValue: .init())
}

// MARK: - MutableState

private struct MutableState {
var allEventListeners: [ListenerRegistration<EventListener<Event, Data>>] = []
var namedEventListeners: [Event: [ListenerRegistration<NamedEventListener<Data>>]] = [:]
}

// MARK: - EventEmitter conformance

/// RTE3: Registers listener for all events
internal func nosync_on(_ listener: @escaping EventListener<Event, Data>) {
let registration = ListenerRegistration(listener: listener, once: false)
mutableStateMutex.withoutSync { state in
state.allEventListeners.append(registration)
}
}

/// RTE3: Registers listener for all events with signal
internal func nosync_on(signalledBy signal: SubscriptionController.Signal, _ listener: @escaping EventListener<Event, Data>) {
let registration = ListenerRegistration(listener: listener, once: false)
mutableStateMutex.withoutSync { state in
state.allEventListeners.append(registration)
}

// Register this registration with the controller so it can remove it when nosync_off() is called
signal.controller?.nosync_addRegistration(self, registrationId: registration.id)
}

/// RTE3: Registers listener for specific event
internal func nosync_on(_ event: Event, _ listener: @escaping NamedEventListener<Data>) {
let registration = ListenerRegistration(listener: listener, once: false)
mutableStateMutex.withoutSync { state in
state.namedEventListeners[event, default: []].append(registration)
}
}

/// RTE3: Registers listener for specific event with signal
internal func nosync_on(_ event: Event, signalledBy signal: SubscriptionController.Signal, _ listener: @escaping NamedEventListener<Data>) {
let registration = ListenerRegistration(listener: listener, once: false)
mutableStateMutex.withoutSync { state in
state.namedEventListeners[event, default: []].append(registration)
}

// Register this registration with the controller
signal.controller?.nosync_addRegistration(self, registrationId: registration.id)
}

/// RTE4: Registers one-time listener for all events
internal func nosync_once(_ listener: @escaping EventListener<Event, Data>) {
let registration = ListenerRegistration(listener: listener, once: true)
mutableStateMutex.withoutSync { state in
state.allEventListeners.append(registration)
}
}

/// RTE4: Registers one-time listener for all events with signal
internal func nosync_once(signalledBy signal: SubscriptionController.Signal, _ listener: @escaping EventListener<Event, Data>) {
let registration = ListenerRegistration(listener: listener, once: true)
mutableStateMutex.withoutSync { state in
state.allEventListeners.append(registration)
}

// Register this registration with the controller
signal.controller?.nosync_addRegistration(self, registrationId: registration.id)
}

/// RTE4: Registers one-time listener for specific event
internal func nosync_once(_ event: Event, _ listener: @escaping NamedEventListener<Data>) {
let registration = ListenerRegistration(listener: listener, once: true)
mutableStateMutex.withoutSync { state in
state.namedEventListeners[event, default: []].append(registration)
}
}

/// RTE4: Registers one-time listener for specific event with signal
internal func nosync_once(_ event: Event, signalledBy signal: SubscriptionController.Signal, _ listener: @escaping NamedEventListener<Data>) {
let registration = ListenerRegistration(listener: listener, once: true)
mutableStateMutex.withoutSync { state in
state.namedEventListeners[event, default: []].append(registration)
}

// Register this registration with the controller
signal.controller?.nosync_addRegistration(self, registrationId: registration.id)
}

/// RTE5: Removes all listeners
internal func nosync_off() {
mutableStateMutex.withoutSync { state in
state.allEventListeners.removeAll()
state.namedEventListeners.removeAll()
}
}

/// RTE5: Removes all listeners for a specific event
internal func nosync_off(_ event: Event) {
mutableStateMutex.withoutSync { state in
_ = state.namedEventListeners.removeValue(forKey: event)
}
}

// MARK: - InternalEventEmitter conformance

/// RTE6: Emits an event, calling registered listeners
/// RTE6a: The set of listeners must not change during emit
internal func nosync_emit(event: Event, data: Data) {
// Take snapshots inside withoutSync to ensure RTE6a compliance
let (allSnapshot, namedSnapshot) = mutableStateMutex.withoutSync { state -> ([ListenerRegistration<EventListener<Event, Data>>], [ListenerRegistration<NamedEventListener<Data>>]) in
let allListeners = state.allEventListeners
let namedListeners = state.namedEventListeners[event] ?? []
return (allListeners, namedListeners)
}

// Collect all once-listener IDs that need to be removed
var idsToRemove: [UUID] = []

// Call all-event listeners from the snapshot (outside withoutSync to avoid exclusivity violation if a listener re-enters the emitter)
for registration in allSnapshot {
registration.listener(event, data)

if registration.once {
idsToRemove.append(registration.id)
}
}

// Call named event listeners from the snapshot
for registration in namedSnapshot {
registration.listener(data)

if registration.once {
idsToRemove.append(registration.id)
}
}

// Remove all once listeners in one pass
for id in idsToRemove {
nosync_removeRegistration(id: id)
}
}

// MARK: - Registration removal (called internally and by SubscriptionController)

internal func nosync_removeRegistration(id: UUID) {
mutableStateMutex.withoutSync { state in
// Remove from all-event listeners
state.allEventListeners.removeAll { registration in
registration.id == id
}

// Remove from named-event listeners
for (event, listeners) in state.namedEventListeners {
state.namedEventListeners[event] = listeners.filter { registration in
registration.id != id
}
if state.namedEventListeners[event]?.isEmpty == true {
_ = state.namedEventListeners.removeValue(forKey: event)
}
}
}
}

// MARK: - Private Types

private struct ListenerRegistration<Listener>: Identifiable {
let id = UUID()
let listener: Listener
let once: Bool

init(listener: Listener, once: Bool) {
self.listener = listener
self.once = once
}
}
}
110 changes: 110 additions & 0 deletions Sources/AblyLiveObjects/Internal/EventEmitter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import Foundation

internal typealias EventListener<Event, Data> = @Sendable (Event, Data) -> Void
internal typealias NamedEventListener<Event> = @Sendable (Event) -> Void

internal protocol EventEmitter<Event, Data>: AnyObject {
associatedtype Event
associatedtype Data
associatedtype Signal

func nosync_on(_ listener: @escaping EventListener<Event, Data>)
func nosync_on(signalledBy signal: Signal, _ listener: @escaping EventListener<Event, Data>)

func nosync_on(_ event: Event, _ listener: @escaping NamedEventListener<Data>)
func nosync_on(_ event: Event, signalledBy signal: Signal, _ listener: @escaping NamedEventListener<Data>)

func nosync_once(_ listener: @escaping EventListener<Event, Data>)
func nosync_once(signalledBy signal: Signal, _ listener: @escaping EventListener<Event, Data>)

func nosync_once(_ event: Event, _ listener: @escaping NamedEventListener<Data>)
func nosync_once(_ event: Event, signalledBy signal: Signal, _ listener: @escaping NamedEventListener<Data>)

func nosync_off()
func nosync_off(_ event: Event)
}

// Note: We _have_ to do something other than the off(listener:) that the IDL gives, because closures don't have identity in Swift. I've gone for this approach, instead of the return value used in chat-js and LiveObjects, because it makes it easy to unsubscribe from _within_ the closure, which happens e.g. if you want to listen for one of various state changes and then unsubscribe. The "controller" and "signal" language was taken from the AbortController used in the Web's `fetch()` API.

/// A subscription controller's `signal` can be passed to `EventEmitter`'s `nosync_on` or `nosync_once` methods. If you call `nosync_off()` on the controller then the listener will no longer be called.
///
/// - Note: Subscription lifetime is independent of that of the controller. That is, if you relinquish all references to a controller then the listener will still be called. Only calling `nosync_off()` (on the controller or on the `EventEmitter`) will end the subscription.
internal protocol SubscriptionControllerProtocol {
associatedtype Signal

var signal: Signal { get }

/// Cancels any subscriptions for which this controller's signal was used. The listener that was passed to `nosync_on` or `nosync_once` will not be called again.
///
/// Calling this method will not affect any future subscriptions that use the same signal.
func nosync_off()
}

/// The `SubscriptionControllerProtocol` implementation used by the SDK.
internal final class SubscriptionController: SubscriptionControllerProtocol, Sendable {
internal init(internalQueue: DispatchQueue) {
let signal = Signal()
self.signal = signal
registrations = DispatchQueueMutex(dispatchQueue: internalQueue, initialValue: [])
signal.controller = self
}

internal final class Signal: Sendable {
internal let id = UUID()
// Store weak reference to the controller that owns this signal.
// Safe because all access occurs on the internal queue.
internal nonisolated(unsafe) weak var controller: SubscriptionController?
}

internal let signal: Signal

// Store registrations that this controller manages
private let registrations: DispatchQueueMutex<[Registration]>

internal func nosync_off() {
registrations.withoutSync { registrations in
// Remove all registrations that this controller created
for registration in registrations {
registration.removeFromEmitter()
}
registrations.removeAll()
}
}

internal func nosync_addRegistration(_ emitter: DefaultInternalEventEmitter<some Hashable & Sendable, some Sendable>, registrationId: UUID) {
registrations.withoutSync { registrations in
let registration = Registration(emitter: emitter, registrationId: registrationId)
registrations.append(registration)

// Clean up deallocated emitters periodically
registrations.removeAll { registration in
registration.isEmitterDeallocated
}
}
}

// MARK: - Private Types

private struct Registration: @unchecked Sendable {
private weak var emitter: (any AnyObject)?
private let emitterRemoveMethod: @Sendable (UUID) -> Void
private let registrationId: UUID

init(emitter: DefaultInternalEventEmitter<some Hashable & Sendable, some Sendable>, registrationId: UUID) {
self.emitter = emitter
self.registrationId = registrationId
// Capture the remove method to avoid protocol overhead
emitterRemoveMethod = { [weak emitter] id in
emitter?.nosync_removeRegistration(id: id)
}
}

var isEmitterDeallocated: Bool {
emitter == nil
}

func removeFromEmitter() {
emitterRemoveMethod(registrationId)
}
}
}
36 changes: 9 additions & 27 deletions Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
) {
mutableStateMutex = .init(
dispatchQueue: internalQueue,
initialValue: .init(liveObjectMutableState: .init(objectID: objectID), data: data),
initialValue: .init(liveObjectMutableState: .init(objectID: objectID, internalQueue: internalQueue), data: data),
)
self.logger = logger
self.userCallbackQueue = userCallbackQueue
Expand Down Expand Up @@ -150,44 +150,26 @@ internal final class InternalDefaultLiveCounter: Sendable {
@discardableResult
internal func subscribe(listener: @escaping LiveObjectUpdateCallback<DefaultLiveCounterUpdate>, coreSDK: CoreSDK) throws(ARTErrorInfo) -> any SubscribeResponse {
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
// swiftlint:disable:next trailing_closure
try mutableState.liveObjectMutableState.nosync_subscribe(listener: listener, coreSDK: coreSDK, updateSelfLater: { [weak self] action in
guard let self else {
return
}

mutableStateMutex.withSync { mutableState in
action(&mutableState.liveObjectMutableState)
}
})
try mutableState.liveObjectMutableState.nosync_subscribe(listener: listener, coreSDK: coreSDK)
}
}

internal func unsubscribeAll() {
mutableStateMutex.withSync { mutableState in
mutableState.liveObjectMutableState.unsubscribeAll()
mutableState.liveObjectMutableState.nosync_unsubscribeAll()
}
}

@discardableResult
internal func on(event: LiveObjectLifecycleEvent, callback: @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
mutableStateMutex.withSync { mutableState in
// swiftlint:disable:next trailing_closure
mutableState.liveObjectMutableState.on(event: event, callback: callback, updateSelfLater: { [weak self] action in
guard let self else {
return
}

mutableStateMutex.withSync { mutableState in
action(&mutableState.liveObjectMutableState)
}
})
mutableState.liveObjectMutableState.nosync_on(event: event, callback: callback)
}
}

internal func offAll() {
mutableStateMutex.withSync { mutableState in
mutableState.liveObjectMutableState.offAll()
mutableState.liveObjectMutableState.nosync_offAll()
}
}

Expand All @@ -198,7 +180,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
/// This is used to instruct this counter to emit updates during an `OBJECT_SYNC`.
internal func nosync_emit(_ update: LiveObjectUpdate<DefaultLiveCounterUpdate>) {
mutableStateMutex.withoutSync { mutableState in
mutableState.liveObjectMutableState.emit(update, on: userCallbackQueue)
mutableState.liveObjectMutableState.nosync_emit(update, on: userCallbackQueue)
}
}

Expand Down Expand Up @@ -425,14 +407,14 @@ internal final class InternalDefaultLiveCounter: Sendable {
logger: logger,
)
// RTLC7d1a
liveObjectMutableState.emit(update, on: userCallbackQueue)
liveObjectMutableState.nosync_emit(update, on: userCallbackQueue)
// RTLC7d1b
return true
case .known(.counterInc):
// RTLC7d5
let update = applyCounterIncOperation(operation.counterInc)
// RTLC7d5a
liveObjectMutableState.emit(update, on: userCallbackQueue)
liveObjectMutableState.nosync_emit(update, on: userCallbackQueue)
// RTLC7d5b
return true
case .known(.objectDelete):
Expand All @@ -447,7 +429,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
)

// RTLC7d4a
liveObjectMutableState.emit(.update(.init(amount: -dataBeforeApplyingOperation)), on: userCallbackQueue)
liveObjectMutableState.nosync_emit(.update(.init(amount: -dataBeforeApplyingOperation)), on: userCallbackQueue)
// RTLC7d4b
return true
default:
Expand Down
Loading
Loading