55// Created by Wes Wickwire on 11/9/24.
66//
77
8- public typealias Query < Input, Output> = Queryable < Input , Output , ( ) >
8+ public typealias Query < Input, Output> = Queryable < Input , Output >
99
10- public protocol Queryable < Input, Output, DB > : Sendable {
10+ public protocol Queryable < Input, Output> : Sendable {
1111 associatedtype Input : Sendable
1212 associatedtype Output : Sendable
13- associatedtype DB : Sendable
1413
1514 /// Whether the query requires a read or write transaction.
1615 var transactionKind : TransactionKind { get }
1716
1817 func execute(
1918 with input: Input ,
20- in database: DB
19+ in database: any Database
2120 ) async throws -> Output
2221
2322 func execute(
@@ -26,18 +25,53 @@ public protocol Queryable<Input, Output, DB>: Sendable {
2625 ) throws -> Output
2726}
2827
29- extension Queryable where DB == any Database {
30- public func execute(
28+ public extension Queryable {
29+ func execute(
3130 with input: Input ,
32- in database: DB
31+ in database: any Database
3332 ) async throws -> Output {
3433 let tx = try await database. begin ( transactionKind)
3534 return try execute ( with: input, tx: tx)
3635 }
36+
37+ func observe(
38+ with input: Input ,
39+ in database: any Database ,
40+ handle: @Sendable @escaping ( Output ) -> Void ,
41+ cancelled: @Sendable @escaping ( ) -> Void
42+ ) -> QueryObservation < Input , Output > {
43+ return QueryObservation (
44+ query: self . with ( database: database) ,
45+ input: input,
46+ database: database,
47+ handle: handle,
48+ cancelled: cancelled
49+ )
50+ }
51+
52+ func stream(
53+ with input: Input ,
54+ in database: any Database
55+ ) -> AsyncThrowingStream < Output , Error > {
56+ return AsyncThrowingStream < Output , Error > { continuation in
57+ let observation = self . observe (
58+ with: input,
59+ in: database
60+ ) { output in
61+ continuation. yield ( output)
62+ } cancelled: {
63+ // Nothing to do
64+ }
65+
66+ continuation. onTermination = { _ in
67+ observation. cancel ( )
68+ }
69+ }
70+ }
3771}
3872
3973extension Queryable where Input == ( ) {
40- func execute( in database: DB ) async throws -> Output {
74+ func execute( in database: any Database ) async throws -> Output {
4175 return try await execute ( with: ( ) , in: database)
4276 }
4377
@@ -46,53 +80,10 @@ extension Queryable where Input == () {
4680 }
4781
4882 func observe(
49- in database: DB ,
83+ in database: any Database ,
5084 handle: @Sendable @escaping ( Output ) -> Void ,
5185 cancelled: @Sendable @escaping ( ) -> Void
52- ) -> QueryObservation < Input , Output , DB > {
86+ ) -> QueryObservation < Input , Output > {
5387 return observe ( with: ( ) , in: database, handle: handle, cancelled: cancelled)
5488 }
55-
56- func stream( in database: DB ) -> AsyncThrowingStream < Output , Error > {
57- return stream ( with: ( ) , in: database)
58- }
59- }
60-
61- public extension Queryable where Input == ( ) , DB == ( ) {
62- func execute( ) async throws -> Output {
63- return try await execute ( with: ( ) , in: ( ) )
64- }
6589}
66-
67-
68- /// An injectable query that can be executed without explicitly
69- /// sending in the database.
70- // func execute(
71- // with input: Input
72- // ) async throws -> Output
73- //
74- // func observe(
75- // with input: Input,
76- // handle: @Sendable @escaping (Output) -> Void,
77- // cancelled: @Sendable @escaping () -> Void
78- // ) -> QueryObservation<Input, Output>
79-
80- //
81- //public extension Query {
82- // func stream(
83- // with input: Input
84- // ) -> AsyncThrowingStream<Output, Error> {
85- // return AsyncThrowingStream<Output, Error> { continuation in
86- // let observation = self.observe(with: input) { output in
87- // continuation.yield(output)
88- // } cancelled: {
89- // // Nothing to do
90- // }
91- //
92- // continuation.onTermination = { _ in
93- // observation.cancel()
94- // }
95- // }
96- // }
97- //}
98-
0 commit comments