Skip to content

Commit b2a4de8

Browse files
Make callbacks Sendable
Didn't do this in ce8c022 because I didn't have a good idea of our threading approach. But for the initial approach that we'll be taking — namely, calling the callbacks on ably-cocoa's callback queue — I think we'll need it. Haven't done it for batch stuff yet because I don't yet know whether it'll be necessary (will get a better idea when we implement it). Now that the callback can be called on any thread, we can no longer easily use the "capture the return value of `subscribe` pattern so that we can unsubscribe later" pattern. So, I've decided to pass the subscripiton as a second argument to the callback. Might be there's a better pattern we can use (e.g. pass in an object to the `subscribe` call, like JS `fetch`'s AbortController), but this'll do for now.
1 parent 0f13d54 commit b2a4de8

3 files changed

Lines changed: 20 additions & 17 deletions

File tree

Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicProxyObjects.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ internal final class PublicDefaultRealtimeObjects: RealtimeObjects {
5454
try await proxied.batch(callback: callback)
5555
}
5656

57-
internal func on(event: ObjectsEvent, callback: sending ObjectsEventCallback) -> any OnObjectsEventResponse {
57+
internal func on(event: ObjectsEvent, callback: ObjectsEventCallback) -> any OnObjectsEventResponse {
5858
proxied.on(event: event, callback: callback)
5959
}
6060

@@ -159,15 +159,15 @@ internal final class PublicDefaultLiveMap: LiveMap {
159159
try await proxied.remove(key: key)
160160
}
161161

162-
internal func subscribe(listener: sending @escaping LiveObjectUpdateCallback<LiveMapUpdate>) -> any SubscribeResponse {
162+
internal func subscribe(listener: @escaping LiveObjectUpdateCallback<LiveMapUpdate>) -> any SubscribeResponse {
163163
proxied.subscribe(listener: listener)
164164
}
165165

166166
internal func unsubscribeAll() {
167167
proxied.unsubscribeAll()
168168
}
169169

170-
internal func on(event: LiveObjectLifecycleEvent, callback: sending @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
170+
internal func on(event: LiveObjectLifecycleEvent, callback: @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
171171
proxied.on(event: event, callback: callback)
172172
}
173173

@@ -206,15 +206,15 @@ internal final class PublicDefaultLiveCounter: LiveCounter {
206206
try await proxied.decrement(amount: amount)
207207
}
208208

209-
internal func subscribe(listener: sending @escaping LiveObjectUpdateCallback<LiveCounterUpdate>) -> any SubscribeResponse {
209+
internal func subscribe(listener: @escaping LiveObjectUpdateCallback<LiveCounterUpdate>) -> any SubscribeResponse {
210210
proxied.subscribe(listener: listener)
211211
}
212212

213213
internal func unsubscribeAll() {
214214
proxied.unsubscribeAll()
215215
}
216216

217-
internal func on(event: LiveObjectLifecycleEvent, callback: sending @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
217+
internal func on(event: LiveObjectLifecycleEvent, callback: @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
218218
proxied.on(event: event, callback: callback)
219219
}
220220

Sources/AblyLiveObjects/Public/PublicTypes.swift

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@ import Ably
22

33
/// A callback used in ``LiveObject`` to listen for updates to the object.
44
///
5-
/// - Parameter update: The update object describing the changes made to the object.
6-
public typealias LiveObjectUpdateCallback<T> = (_ update: sending T) -> Void
5+
/// - Parameters:
6+
/// - update: The update object describing the changes made to the object.
7+
/// - subscription: A ``SubscribeResponse`` object that allows the provided listener to deregister itself from future updates.
8+
public typealias LiveObjectUpdateCallback<T> = @Sendable (_ update: sending T, _ subscription: SubscribeResponse) -> Void
79

810
/// The callback used for the events emitted by ``RealtimeObjects``.
9-
public typealias ObjectsEventCallback = () -> Void
11+
///
12+
/// - Parameter subscription: An ``OnObjectsEventResponse`` object that allows the provided listener to deregister itself from future updates.
13+
public typealias ObjectsEventCallback = @Sendable (_ subscription: OnObjectsEventResponse) -> Void
1014

1115
/// The callback used for the lifecycle events emitted by ``LiveObject``.
12-
public typealias LiveObjectLifecycleEventCallback = () -> Void
16+
/// - Parameter subscription: A ``OnLiveObjectLifecycleEventResponse`` object that allows the provided listener to deregister itself from future updates.
17+
public typealias LiveObjectLifecycleEventCallback = @Sendable (_ subscription: OnLiveObjectLifecycleEventResponse) -> Void
1318

1419
/// A function passed to ``RealtimeObjects/batch(callback:)`` to group multiple Objects operations into a single channel message.
1520
///
@@ -70,7 +75,7 @@ public protocol RealtimeObjects: Sendable {
7075
/// - callback: The event listener.
7176
/// - Returns: An ``OnObjectsEventResponse`` object that allows the provided listener to be deregistered from future updates.
7277
@discardableResult
73-
func on(event: ObjectsEvent, callback: sending @escaping ObjectsEventCallback) -> OnObjectsEventResponse
78+
func on(event: ObjectsEvent, callback: @escaping ObjectsEventCallback) -> OnObjectsEventResponse
7479

7580
/// Deregisters all registrations, for all events and listeners.
7681
func offAll()
@@ -337,7 +342,7 @@ public protocol LiveObject: AnyObject, Sendable {
337342
/// - Parameter listener: An event listener function that is called with an update object whenever this LiveObject is updated.
338343
/// - Returns: A ``SubscribeResponse`` object that allows the provided listener to be deregistered from future updates.
339344
@discardableResult
340-
func subscribe(listener: sending @escaping LiveObjectUpdateCallback<Update>) -> SubscribeResponse
345+
func subscribe(listener: @escaping LiveObjectUpdateCallback<Update>) -> SubscribeResponse
341346

342347
/// Deregisters all listeners from updates for this LiveObject.
343348
func unsubscribeAll()
@@ -349,7 +354,7 @@ public protocol LiveObject: AnyObject, Sendable {
349354
/// - callback: The event listener.
350355
/// - Returns: A ``OnLiveObjectLifecycleEventResponse`` object that allows the provided listener to be deregistered from future updates.
351356
@discardableResult
352-
func on(event: LiveObjectLifecycleEvent, callback: sending @escaping LiveObjectLifecycleEventCallback) -> OnLiveObjectLifecycleEventResponse
357+
func on(event: LiveObjectLifecycleEvent, callback: @escaping LiveObjectLifecycleEventCallback) -> OnLiveObjectLifecycleEventResponse
353358

354359
/// Deregisters all registrations, for all events and listeners.
355360
func offAll()

Tests/AblyLiveObjectsTests/JS Integration Tests/ObjectsIntegrationTests.swift

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,7 @@ func waitFixtureChannelIsReady(_: ARTRealtime) async throws {
107107

108108
func waitForMapKeyUpdate(_ map: any LiveMap, _ key: String) async {
109109
await withCheckedContinuation { (continuation: CheckedContinuation<Void, _>) in
110-
var subscription: SubscribeResponse!
111-
subscription = map.subscribe { update in
110+
map.subscribe { update, subscription in
112111
if update.update[key] != nil {
113112
subscription.unsubscribe()
114113
continuation.resume()
@@ -119,8 +118,7 @@ func waitForMapKeyUpdate(_ map: any LiveMap, _ key: String) async {
119118

120119
func waitForCounterUpdate(_ counter: any LiveCounter) async {
121120
await withCheckedContinuation { (continuation: CheckedContinuation<Void, _>) in
122-
var subscription: SubscribeResponse!
123-
subscription = counter.subscribe { _ in
121+
counter.subscribe { _, subscription in
124122
subscription.unsubscribe()
125123
continuation.resume()
126124
}
@@ -666,7 +664,7 @@ private struct ObjectsIntegrationTests {
666664

667665
async let counterSubPromise: Void = withCheckedThrowingContinuation { continuation in
668666
do {
669-
try #require(root.get(key: "counter")?.liveCounterValue).subscribe { update in
667+
try #require(root.get(key: "counter")?.liveCounterValue).subscribe { update, _ in
670668
#expect(update.amount == -1, "Check counter subscription callback is called with an expected update object after OBJECT_SYNC sequence with \"tombstone=true\"")
671669
continuation.resume()
672670
}

0 commit comments

Comments
 (0)