Skip to content

Commit 2ee2c79

Browse files
Make callbacks Sendable
Didn't do this in ce8c022 because I didn't have a good idea of our threading approach. But for the initial approach that we'll be taking — namely, calling the callbacks on ably-cocoa's callback queue — I think we'll need it. Haven't done it for batch stuff yet because I don't yet know whether it'll be necessary (will get a better idea when we implement it). Now that the callback can be called on any thread, we can no longer easily use the "capture the return value of `subscribe` pattern so that we can unsubscribe later" pattern. So, I've decided to pass the subscripiton as a second argument to the callback. Might be there's a better pattern we can use (e.g. pass in an object to the `subscribe` call, like JS `fetch`'s AbortController), but this'll do for now.
1 parent 50fece3 commit 2ee2c79

5 files changed

Lines changed: 20 additions & 17 deletions

File tree

Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveCounter.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ internal final class PublicDefaultLiveCounter: LiveCounter {
3434
try await proxied.decrement(amount: amount)
3535
}
3636

37-
internal func subscribe(listener: sending @escaping LiveObjectUpdateCallback<LiveCounterUpdate>) -> any SubscribeResponse {
37+
internal func subscribe(listener: @escaping LiveObjectUpdateCallback<LiveCounterUpdate>) -> any SubscribeResponse {
3838
proxied.subscribe(listener: listener)
3939
}
4040

4141
internal func unsubscribeAll() {
4242
proxied.unsubscribeAll()
4343
}
4444

45-
internal func on(event: LiveObjectLifecycleEvent, callback: sending @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
45+
internal func on(event: LiveObjectLifecycleEvent, callback: @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
4646
proxied.on(event: event, callback: callback)
4747
}
4848

Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveMap.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,15 @@ internal final class PublicDefaultLiveMap: LiveMap {
8080
try await proxied.remove(key: key)
8181
}
8282

83-
internal func subscribe(listener: sending @escaping LiveObjectUpdateCallback<LiveMapUpdate>) -> any SubscribeResponse {
83+
internal func subscribe(listener: @escaping LiveObjectUpdateCallback<LiveMapUpdate>) -> any SubscribeResponse {
8484
proxied.subscribe(listener: listener)
8585
}
8686

8787
internal func unsubscribeAll() {
8888
proxied.unsubscribeAll()
8989
}
9090

91-
internal func on(event: LiveObjectLifecycleEvent, callback: sending @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
91+
internal func on(event: LiveObjectLifecycleEvent, callback: @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
9292
proxied.on(event: event, callback: callback)
9393
}
9494

Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultRealtimeObjects.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ internal final class PublicDefaultRealtimeObjects: RealtimeObjects {
5151
try await proxied.batch(callback: callback)
5252
}
5353

54-
internal func on(event: ObjectsEvent, callback: sending @escaping ObjectsEventCallback) -> any OnObjectsEventResponse {
54+
internal func on(event: ObjectsEvent, callback: @escaping ObjectsEventCallback) -> any OnObjectsEventResponse {
5555
proxied.on(event: event, callback: callback)
5656
}
5757

Sources/AblyLiveObjects/Public/PublicTypes.swift

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@ import Ably
22

33
/// A callback used in ``LiveObject`` to listen for updates to the object.
44
///
5-
/// - Parameter update: The update object describing the changes made to the object.
6-
public typealias LiveObjectUpdateCallback<T> = (_ update: sending T) -> Void
5+
/// - Parameters:
6+
/// - update: The update object describing the changes made to the object.
7+
/// - subscription: A ``SubscribeResponse`` object that allows the provided listener to deregister itself from future updates.
8+
public typealias LiveObjectUpdateCallback<T> = @Sendable (_ update: sending T, _ subscription: SubscribeResponse) -> Void
79

810
/// The callback used for the events emitted by ``RealtimeObjects``.
9-
public typealias ObjectsEventCallback = () -> Void
11+
///
12+
/// - Parameter subscription: An ``OnObjectsEventResponse`` object that allows the provided listener to deregister itself from future updates.
13+
public typealias ObjectsEventCallback = @Sendable (_ subscription: OnObjectsEventResponse) -> Void
1014

1115
/// The callback used for the lifecycle events emitted by ``LiveObject``.
12-
public typealias LiveObjectLifecycleEventCallback = () -> Void
16+
/// - Parameter subscription: A ``OnLiveObjectLifecycleEventResponse`` object that allows the provided listener to deregister itself from future updates.
17+
public typealias LiveObjectLifecycleEventCallback = @Sendable (_ subscription: OnLiveObjectLifecycleEventResponse) -> Void
1318

1419
/// A function passed to ``RealtimeObjects/batch(callback:)`` to group multiple Objects operations into a single channel message.
1520
///
@@ -70,7 +75,7 @@ public protocol RealtimeObjects: Sendable {
7075
/// - callback: The event listener.
7176
/// - Returns: An ``OnObjectsEventResponse`` object that allows the provided listener to be deregistered from future updates.
7277
@discardableResult
73-
func on(event: ObjectsEvent, callback: sending @escaping ObjectsEventCallback) -> OnObjectsEventResponse
78+
func on(event: ObjectsEvent, callback: @escaping ObjectsEventCallback) -> OnObjectsEventResponse
7479

7580
/// Deregisters all registrations, for all events and listeners.
7681
func offAll()
@@ -337,7 +342,7 @@ public protocol LiveObject: AnyObject, Sendable {
337342
/// - Parameter listener: An event listener function that is called with an update object whenever this LiveObject is updated.
338343
/// - Returns: A ``SubscribeResponse`` object that allows the provided listener to be deregistered from future updates.
339344
@discardableResult
340-
func subscribe(listener: sending @escaping LiveObjectUpdateCallback<Update>) -> SubscribeResponse
345+
func subscribe(listener: @escaping LiveObjectUpdateCallback<Update>) -> SubscribeResponse
341346

342347
/// Deregisters all listeners from updates for this LiveObject.
343348
func unsubscribeAll()
@@ -349,7 +354,7 @@ public protocol LiveObject: AnyObject, Sendable {
349354
/// - callback: The event listener.
350355
/// - Returns: A ``OnLiveObjectLifecycleEventResponse`` object that allows the provided listener to be deregistered from future updates.
351356
@discardableResult
352-
func on(event: LiveObjectLifecycleEvent, callback: sending @escaping LiveObjectLifecycleEventCallback) -> OnLiveObjectLifecycleEventResponse
357+
func on(event: LiveObjectLifecycleEvent, callback: @escaping LiveObjectLifecycleEventCallback) -> OnLiveObjectLifecycleEventResponse
353358

354359
/// Deregisters all registrations, for all events and listeners.
355360
func offAll()

Tests/AblyLiveObjectsTests/JS Integration Tests/ObjectsIntegrationTests.swift

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ func waitFixtureChannelIsReady(_: ARTRealtime) async throws {
9090

9191
func waitForMapKeyUpdate(_ map: any LiveMap, _ key: String) async {
9292
await withCheckedContinuation { (continuation: CheckedContinuation<Void, _>) in
93-
var subscription: SubscribeResponse!
94-
subscription = map.subscribe { update in
93+
map.subscribe { update, subscription in
9594
if update.update[key] != nil {
9695
subscription.unsubscribe()
9796
continuation.resume()
@@ -102,8 +101,7 @@ func waitForMapKeyUpdate(_ map: any LiveMap, _ key: String) async {
102101

103102
func waitForCounterUpdate(_ counter: any LiveCounter) async {
104103
await withCheckedContinuation { (continuation: CheckedContinuation<Void, _>) in
105-
var subscription: SubscribeResponse!
106-
subscription = counter.subscribe { _ in
104+
counter.subscribe { _, subscription in
107105
subscription.unsubscribe()
108106
continuation.resume()
109107
}
@@ -642,7 +640,7 @@ private struct ObjectsIntegrationTests {
642640

643641
async let counterSubPromise: Void = withCheckedThrowingContinuation { continuation in
644642
do {
645-
try #require(root.get(key: "counter")?.liveCounterValue).subscribe { update in
643+
try #require(root.get(key: "counter")?.liveCounterValue).subscribe { update, _ in
646644
#expect(update.amount == -1, "Check counter subscription callback is called with an expected update object after OBJECT_SYNC sequence with \"tombstone=true\"")
647645
continuation.resume()
648646
}

0 commit comments

Comments
 (0)