Skip to content

Commit a7f768b

Browse files
Implement the subscriptionsCallbacksScenarios
As in 8be3aed. Cursor struggled a bit when porting the JS tests, not considering some Swift concurrency subtleties (the fact that `async let` or `Task` tasks do not have any "synchronous part" thus needing to be careful about not missing events, same as in c75fa2a, and the lack of any ordering guarantees when triggering `Task`s), so I introduced the Subscription.addListener and MainActorStorage types to perform more work synchronously to be in line with the JS tests.
1 parent 70306a0 commit a7f768b

2 files changed

Lines changed: 540 additions & 1 deletion

File tree

Tests/AblyLiveObjectsTests/Helpers/Subscriber.swift

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ final class Subscriber<each CallbackArg: Sendable>: Sendable {
99
// Used to synchronize access to the nonisolated(unsafe) mutable state.
1010
private let mutex = NSLock()
1111
private nonisolated(unsafe) var invocations: [(repeat each CallbackArg)] = []
12+
private nonisolated(unsafe) var listeners: [CallbackWrapper] = []
1213

1314
/// Creates a `Subscriber`.
1415
///
@@ -39,13 +40,50 @@ final class Subscriber<each CallbackArg: Sendable>: Sendable {
3940
guard let self else {
4041
return
4142
}
42-
mutex.withLock {
43+
let callListeners = mutex.withLock {
4344
let invocation = (repeat each arg)
4445
invocations.append(invocation)
46+
47+
return { [listeners] in
48+
for listener in listeners {
49+
listener.callAsFunction(repeat each invocation)
50+
}
51+
}
4552
}
4653
if let action {
4754
action(repeat each arg)
4855
}
56+
callListeners()
4957
}
5058
}
59+
60+
/// A wrapper that allows us to store a callback that takes variadic args.
61+
///
62+
/// This allows us to avoid the error "Cannot fully abstract a value of variadic function type '@Sendable (repeat each CallbackArg) -> ()' because different contexts will not be able to reliably agree on a calling convention; try wrapping it in a struct" that we get if we try to directly store the callback in an array. Claude suggested this solution.
63+
private struct CallbackWrapper {
64+
let callback: @Sendable (repeat each CallbackArg) -> Void
65+
66+
func callAsFunction(_ args: repeat each CallbackArg) {
67+
callback(repeat each args)
68+
}
69+
}
70+
71+
/// Adds a listener which replays all previously buffered and future invocations of any function previously created by ``createListener(_:)``.
72+
///
73+
/// This is useful for the scenario where you want to set up a subscription synchronously (so as not to miss any events) but then in an `async` context perform actions as a result of the invocation of the listener. (You could equally use the SDK's `AsyncSequence` interface but the approach here is a closer mapping of the ported JS integration tests that call `subscribe`.)
74+
func addListener(_ listener: @escaping (@Sendable (repeat each CallbackArg) -> Void)) {
75+
let performInvocations = mutex.withLock {
76+
listeners.append(.init(callback: listener))
77+
78+
return { [invocations, callbackQueue] in
79+
for invocation in invocations {
80+
callbackQueue.async {
81+
listener(repeat each invocation)
82+
}
83+
}
84+
}
85+
}
86+
87+
performInvocations()
88+
}
5189
}

0 commit comments

Comments
 (0)