Skip to content

Commit a44f1d2

Browse files
committed
QueryStream
1 parent 4b6614e commit a44f1d2

14 files changed

Lines changed: 308 additions & 88 deletions

Sources/Otter/Cursor.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77

88
import SQLite3
99

10+
/// A low-level iterator over the results of a prepared database statement.
11+
///
12+
/// `Cursor` wraps a `Statement` and allows stepping through query results one
13+
/// row at a time. Use `next()` functions to get the next row in the iteration.
1014
public struct Cursor<Element>: ~Copyable {
1115
private let statement: Statement
1216

Sources/Otter/OtterError.swift

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
// Created by Wes Wickwire on 2/16/25.
66
//
77

8+
import Foundation
9+
810
public enum OtterError: Error, Equatable {
911
case failedToOpenConnection(path: String)
1012
case failedToInitializeStatement
@@ -46,3 +48,68 @@ public enum OtterError: Error, Equatable {
4648
return .cannotEncode("\(type)", to: "\(otherType)", reason: reason)
4749
}
4850
}
51+
52+
extension OtterError: CustomStringConvertible {
53+
public var description: String {
54+
switch self {
55+
case .failedToOpenConnection(let path):
56+
return "Failed to open database connection at path '\(path)'."
57+
case .failedToInitializeStatement:
58+
return "Failed to initialize SQL statement."
59+
case .columnIsNil(let index):
60+
return "Column at index \(index) is nil."
61+
case .noMoreColumns:
62+
return "No more columns available in the row."
63+
case .queryReturnedNoValue:
64+
return "Query returned no value."
65+
case .sqlite(let code, let message):
66+
if let message = message {
67+
return "SQLite error \(code): \(message)"
68+
} else {
69+
return "SQLite error \(code)"
70+
}
71+
case .txNoLongerValid:
72+
return "Transaction is no longer valid."
73+
case .failedToGetConnection:
74+
return "Failed to get a connection from the pool."
75+
case .poolCannotHaveZeroConnections:
76+
return "Connection pool cannot have zero connections."
77+
case .alreadyCommited:
78+
return "Transaction has already been committed."
79+
case .entityWasNotFound:
80+
return "Requested entity was not found."
81+
case .subscriptionAlreadyStarted:
82+
return "Query observation has already been started."
83+
case .invalidUuidString:
84+
return "Invalid UUID string."
85+
case .cannotDecode(let type, let from, let reason):
86+
if let reason = reason {
87+
return "Cannot decode \(type) from \(from): \(reason)"
88+
} else {
89+
return "Cannot decode \(type) from \(from)."
90+
}
91+
case .cannotEncode(let type, let to, let reason):
92+
if let reason = reason {
93+
return "Cannot encode \(type) to \(to): \(reason)"
94+
} else {
95+
return "Cannot encode \(type) to \(to)."
96+
}
97+
case .decodingError(let message):
98+
return "Decoding error: \(message)"
99+
case .encodingError(let message):
100+
return "Encoding error: \(message)"
101+
case .requiredAssociationFailed(let parent, let childKey):
102+
return "Required association failed: \(parent).\(childKey)"
103+
case .cannotObserveWriteQuery:
104+
return "Cannot observe a write query."
105+
case .cannotWriteInAReadTransaction:
106+
return "Cannot perform a write in a read-only transaction."
107+
case .unexpectedNil:
108+
return "Unexpected nil encountered."
109+
}
110+
}
111+
}
112+
113+
extension OtterError: LocalizedError {
114+
public var errorDescription: String? { description }
115+
}

Sources/Otter/Queries/AnyQuery.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@ public struct AnyQuery<Input: Sendable, Output: Sendable>: Query {
3636
connection: query.connection,
3737
watchedTables: query.watchedTables,
3838
execute: { try query.execute(with: $0, tx: $1) },
39-
observe: { query.observe(with: $0) }
39+
observe: { query.observation(with: $0) }
4040
)
4141
}
4242

4343
public func execute(with input: Input, tx: borrowing Transaction) throws -> Output {
4444
try _execute(input, tx)
4545
}
4646

47-
public func observe(with input: Input) -> any QueryObservation<Output> {
47+
public func observation(with input: Input) -> any QueryObservation<Output> {
4848
_observe(input)
4949
}
5050
}

Sources/Otter/Queries/Fail.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public extension Queries {
4949
throw error
5050
}
5151

52-
public func observe(with input: Input) -> any QueryObservation<Output> {
52+
public func observation(with input: Input) -> any QueryObservation<Output> {
5353
return Observation(error: error)
5454
}
5555

Sources/Otter/Queries/Just.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public extension Queries {
7272
return output
7373
}
7474

75-
public func observe(with input: Input) -> any QueryObservation<Output> {
75+
public func observation(with input: Input) -> any QueryObservation<Output> {
7676
return Observation(output: output)
7777
}
7878

Sources/Otter/Queries/Map.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public extension Queries {
3131
try transform(input, base.execute(with: input, tx: tx))
3232
}
3333

34-
public func observe(with input: Base.Input) -> any QueryObservation<Output> {
35-
return Observation(base: base.observe(with: input), input: input, transform: transform)
34+
public func observation(with input: Base.Input) -> any QueryObservation<Output> {
35+
return Observation(base: base.observation(with: input), input: input, transform: transform)
3636
}
3737

3838
struct Observation: QueryObservation {

Sources/Otter/Queries/MapInput.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ public extension Queries {
3030
try base.execute(with: transform(input), tx: tx)
3131
}
3232

33-
public func observe(with input: Input) -> any QueryObservation<Output> {
34-
return base.observe(with: transform(input))
33+
public func observation(with input: Input) -> any QueryObservation<Output> {
34+
return base.observation(with: transform(input))
3535
}
3636
}
3737
}

Sources/Otter/Queries/Test.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public extension Queries {
6060
return try execute(input)
6161
}
6262

63-
public nonisolated func observe(with input: Input) -> any QueryObservation<Output> {
63+
public nonisolated func observation(with input: Input) -> any QueryObservation<Output> {
6464
lock.withLock { observeCallCount += 1 }
6565
return Observation(input: input, query: self)
6666
}

Sources/Otter/Query+Combine.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public struct QueryPublisher<Output>: Publisher {
4646
switch state {
4747
case .pending:
4848
// Received first demand, start observation
49-
let observation = query.observe()
49+
let observation = query.observation(with: ())
5050

5151
observation.start { [weak self] output in
5252
self?.receive(output: output)

Sources/Otter/Query.swift

Lines changed: 94 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -28,78 +28,137 @@ public protocol Query<Input, Output>: Sendable {
2828
/// if those tables changed.
2929
var watchedTables: Set<String> { get }
3030

31-
/// Executes the query
32-
///
33-
/// - Parameters:
34-
/// - input: The query's input
35-
/// - tx: The transaction to run the query in
36-
/// - Returns: The query's output
37-
func execute(
38-
with input: Input,
39-
tx: borrowing Transaction
40-
) throws -> Output
41-
42-
/// Observes the query's value over time. When the database
43-
/// changes new values will automatically be refreshed.
44-
///
45-
/// The `QueryObservation` is an `AsyncSequence` and can
46-
/// be observed with a for loop.
31+
/// Executes the query once within the given transaction.
4732
///
33+
/// Example:
4834
/// ```swift
49-
/// for try await value in query.observe() {
50-
/// print(value)
35+
/// try await queries.begin(.read) { tx in
36+
/// let user = try userQuery.execute(with: 42, tx: tx)
37+
/// print("Fetched user:", user)
5138
/// }
5239
/// ```
5340
///
54-
/// - Parameter input: The query's input
55-
/// - Returns: The observation.
56-
func observe(with input: Input) -> any QueryObservation<Output>
41+
/// - Parameters:
42+
/// - input: The query input or parameters to use for execution.
43+
/// - tx: The active transaction in which the query will be executed.
44+
/// - Returns: The decoded `Output` of the query.
45+
/// - Throws: An error if the query fails to execute or if the results
46+
/// cannot be decoded into the expected type.
47+
func execute(with input: Input, tx: borrowing Transaction) throws -> Output
48+
49+
/// Initializes a QueryObservation that watches the database for
50+
/// changes on anything that affects the query and emits changes
51+
/// overtime.
52+
///
53+
/// This likely will not be used directly yet using `observe` instead.
54+
func observation(with input: Input) -> any QueryObservation<Output>
5755
}
5856

5957
public extension Query {
58+
/// Executes the query once and returns the result.
59+
///
60+
/// Example:
61+
/// ```swift
62+
/// let user = try await userQuery.execute(with: 42, tx: tx)
63+
/// ```
64+
///
65+
/// - Parameters:
66+
/// - input: The query input or parameters to use for execution.
67+
/// - tx: The active transaction in which the query will be executed.
68+
/// - Returns: The decoded `Output` of the query.
69+
/// - Throws: An error if the query fails to execute or if the results
70+
/// cannot be decoded into the expected type.
6071
func execute(with input: Input) async throws -> Output {
6172
try await connection.begin(transactionKind) { tx in
6273
try execute(with: input, tx: tx)
6374
}
6475
}
6576

66-
func observe(with input: Input) -> any QueryObservation<Output> {
67-
return DatabaseQueryObservation(
77+
func observation(with input: Input) -> any QueryObservation<Output> {
78+
// By default just return a DatabaseQueryObservation
79+
DatabaseQueryObservation(
6880
query: self,
6981
input: input,
7082
watchedTables: watchedTables,
7183
connection: connection
7284
)
7385
}
86+
87+
/// Observes the results of a database query and streams updates as the
88+
/// underlying data changes.
89+
///
90+
/// This method returns an `AsyncSequence` that first yields the current
91+
/// results of the query, then continues to emit new values whenever the
92+
/// relevant database tables are modified. Use this when you need to react
93+
/// to live changes in the database.
94+
///
95+
/// Example:
96+
/// ```swift
97+
/// for await row in query.observe(with: input) {
98+
/// print("Row updated:", row)
99+
/// }
100+
/// ```
101+
///
102+
/// - Parameter input: The query or input definition used to fetch results.
103+
/// - Returns: A `QueryStream` sequence of `Output` values that reflect
104+
/// both the initial results and subsequent changes.
105+
func observe(with input: Input) -> QueryStream<Output> {
106+
QueryStream(observation(with: input))
107+
}
74108
}
75109

76110
public extension Query where Input == () {
77-
/// Executes the query in the given transaction
78-
/// - Parameter tx: The transaction to execute the query in
79-
/// - Returns: The query's output
111+
/// Executes the query once within the given transaction.
112+
///
113+
/// Example:
114+
/// ```swift
115+
/// try await queries.begin(.read) { tx in
116+
/// let user = try userQuery.execute(tx: tx)
117+
/// print("Fetched user:", user)
118+
/// }
119+
/// ```
120+
///
121+
/// - Parameters:
122+
/// - tx: The active transaction in which the query will be executed.
123+
/// - Returns: The decoded `Output` of the query.
124+
/// - Throws: An error if the query fails to execute or if the results
125+
/// cannot be decoded into the expected type.
80126
func execute(tx: borrowing Transaction) throws -> Output {
81127
return try execute(with: (), tx: tx)
82128
}
83129

84-
/// Executes the query
130+
/// Executes the query once and returns the result.
131+
///
132+
/// Example:
133+
/// ```swift
134+
/// let user = try await userQuery.execute()
135+
/// ```
136+
///
137+
/// - Returns: The decoded `Output` of the query.
138+
/// - Throws: An error if the query fails to execute or if the results
139+
/// cannot be decoded into the expected type.
85140
func execute() async throws -> Output {
86141
return try await execute(with: ())
87142
}
88143

89-
/// Observes the query's value over time. When the database
90-
/// changes new values will automatically be refreshed.
144+
/// Observes the results of a database query and streams updates as the
145+
/// underlying data changes.
91146
///
92-
/// The `QueryObservation` is an `AsyncSequence` and can
93-
/// be observed with a for loop.
147+
/// This method returns an `AsyncSequence` that first yields the current
148+
/// results of the query, then continues to emit new values whenever the
149+
/// relevant database tables are modified. Use this when you need to react
150+
/// to live changes in the database.
94151
///
152+
/// Example:
95153
/// ```swift
96-
/// for try await value in query.observe() {
97-
/// print(value)
154+
/// for await row in query.observe() {
155+
/// print("Row updated:", row)
98156
/// }
99157
/// ```
100158
///
101-
/// - Returns: The observation.
102-
func observe() -> any QueryObservation<Output> {
159+
/// - Returns: A `QueryStream` sequence of `Output` values that reflect
160+
/// both the initial results and subsequent changes.
161+
func observe() -> QueryStream<Output> {
103162
return observe(with: ())
104163
}
105164
}

0 commit comments

Comments
 (0)