|
5 | 5 | // Created by Wes Wickwire on 3/10/25. |
6 | 6 | // |
7 | 7 |
|
8 | | -public final class QueryObservation<Input, Output>: DatabaseSubscriber, Sendable |
| 8 | +import Foundation |
| 9 | + |
| 10 | +public final class DatabaseQueryObservation<Input, Output>: DatabaseSubscriber, QueryObservation, @unchecked Sendable |
9 | 11 | where Input: Sendable, Output: Sendable |
10 | 12 | { |
11 | 13 | private let query: any DatabaseQuery<Input, Output> |
12 | 14 | private let input: Input |
13 | 15 | private let database: any Database |
14 | | - private let handle: @Sendable (Output) -> Void |
15 | | - private let cancelled: @Sendable () -> Void |
| 16 | + private let lock = NSLock() |
| 17 | + private let queue = Queue() |
| 18 | + |
| 19 | + private var onChange: (@Sendable (Output) -> Void)? |
| 20 | + private var onError: (@Sendable (Error) -> Void)? |
16 | 21 |
|
17 | 22 | init( |
18 | 23 | query: any DatabaseQuery<Input, Output>, |
19 | 24 | input: Input, |
20 | | - database: any Database, |
21 | | - handle: @Sendable @escaping (Output) -> Void, |
22 | | - cancelled: @Sendable @escaping () -> Void |
| 25 | + database: any Database |
23 | 26 | ) { |
24 | 27 | self.query = query |
25 | 28 | self.input = input |
26 | 29 | self.database = database |
27 | | - self.handle = handle |
28 | | - self.cancelled = cancelled |
29 | 30 | } |
30 | 31 |
|
31 | 32 | public func receive(event: DatabaseEvent) { |
32 | | - Task { |
33 | | - try await handle(query.execute(with: input, in: database)) |
| 33 | + enqueueNext() |
| 34 | + } |
| 35 | + |
| 36 | + public func cancel() { |
| 37 | + lock.withLock { |
| 38 | + onChange = nil |
| 39 | + onError = nil |
| 40 | + } |
| 41 | + |
| 42 | + database.cancel(subscriber: self) |
| 43 | + } |
| 44 | + |
| 45 | + public func start( |
| 46 | + onChange: @escaping @Sendable (Output) -> Void, |
| 47 | + onError: @escaping @Sendable (Error) -> Void |
| 48 | + ) { |
| 49 | + lock.withLock { |
| 50 | + self.onChange = onChange |
| 51 | + self.onError = onError |
34 | 52 | } |
| 53 | + |
| 54 | + database.observe(subscriber: self) |
| 55 | + enqueueNext() |
35 | 56 | } |
36 | 57 |
|
37 | | - public func onCancel() { |
38 | | - cancelled() |
| 58 | + private func emitNext() async { |
| 59 | + guard let onChange else { |
| 60 | + return assertionFailure("Started without handle set") |
| 61 | + } |
| 62 | + |
| 63 | + do { |
| 64 | + let output = try await query.execute(with: input, in: database) |
| 65 | + onChange(output) |
| 66 | + } catch { |
| 67 | + onError?(error) |
| 68 | + cancel() |
| 69 | + } |
39 | 70 | } |
40 | 71 |
|
41 | | - public func cancel() { |
42 | | - database.cancel(subscriber: self) |
| 72 | + private func enqueueNext() { |
| 73 | + queue.enqueue { [weak self] in |
| 74 | + await self?.emitNext() |
| 75 | + } |
| 76 | + } |
| 77 | +} |
| 78 | + |
| 79 | +public protocol QueryObservation<Output>: Sendable, AsyncSequence { |
| 80 | + associatedtype Output: Sendable |
| 81 | + |
| 82 | + func start( |
| 83 | + onChange: @escaping @Sendable (Output) -> Void, |
| 84 | + onError: @escaping @Sendable (Error) -> Void |
| 85 | + ) |
| 86 | + |
| 87 | + func cancel() |
| 88 | +} |
| 89 | + |
| 90 | +extension QueryObservation { |
| 91 | + public func makeAsyncIterator() -> AsyncThrowingStream<Output, Error>.AsyncIterator { |
| 92 | + return asStream().makeAsyncIterator() |
| 93 | + } |
| 94 | + |
| 95 | + func asStream() -> AsyncThrowingStream<Output, Error> { |
| 96 | + AsyncThrowingStream<Output, Error> { continuation in |
| 97 | + start { output in |
| 98 | + continuation.yield(output) |
| 99 | + } onError: { error in |
| 100 | + continuation.finish(throwing: error) |
| 101 | + } |
| 102 | + |
| 103 | + continuation.onTermination = { _ in |
| 104 | + cancel() |
| 105 | + } |
| 106 | + } |
| 107 | + } |
| 108 | +} |
| 109 | + |
| 110 | +final class Queue: Sendable { |
| 111 | + typealias Action = @Sendable () async -> Void |
| 112 | + |
| 113 | + private let task: Task<(), Never> |
| 114 | + private let stream: AsyncStream<Action> |
| 115 | + private let continuation: AsyncStream<Action>.Continuation |
| 116 | + |
| 117 | + init() { |
| 118 | + let (stream, continuation) = AsyncStream<Action>.makeStream() |
| 119 | + self.stream = stream |
| 120 | + self.continuation = continuation |
| 121 | + self.task = Task { |
| 122 | + for await action in stream { |
| 123 | + await action() |
| 124 | + } |
| 125 | + } |
43 | 126 | } |
44 | 127 |
|
45 | | - public func start() async throws { |
46 | | - try await database.observe(subscriber: self) |
47 | | - try await emitNext() |
| 128 | + deinit { |
| 129 | + task.cancel() |
48 | 130 | } |
49 | 131 |
|
50 | | - private func emitNext() async throws { |
51 | | - let output = try await query.execute(with: input, in: (database)) |
52 | | - handle(output) |
| 132 | + func enqueue(_ action: @escaping Action) { |
| 133 | + continuation.yield(action) |
53 | 134 | } |
54 | 135 | } |
0 commit comments