@@ -51,17 +51,9 @@ public actor ConnectionPool: Sendable {
5151 // Turn on WAL mode
5252 try connection. execute ( sql: " PRAGMA journal_mode=WAL; " )
5353
54- let tx = try Transaction (
55- connection: connection,
56- kind: . write,
57- pool: nil
58- )
59-
54+ let tx = try Transaction ( connection: connection, kind: . write)
6055 try MigrationRunner . execute ( migrations: migrations, tx: tx)
61-
62- // We don't want an async inti so we can skip the reclaim to remove the await
63- // and manually add it to the availableConnections manually.
64- try tx. commitWithoutReclaim ( )
56+ try tx. commit ( )
6557
6658 self . availableConnections = [ connection]
6759 }
@@ -71,47 +63,26 @@ public actor ConnectionPool: Sendable {
7163 return count >= limit
7264 }
7365
74- /// Gives the connection back to the pool.
75- func reclaim(
76- connection: SQLiteConnection ,
77- txKind: TransactionKind
78- ) async {
79- availableConnections. append ( connection)
80- alertAnyWaitersOfAvailableConnection ( )
81-
82- if txKind == . write {
83- await writeLock. unlock ( )
84- }
85- }
86- }
87-
88- extension ConnectionPool : Connection {
89- public nonisolated func observe( subscriber: any DatabaseSubscriber ) {
90- observer. subscribe ( subscriber: subscriber)
91- }
92-
93- public nonisolated func cancel( subscriber: any DatabaseSubscriber ) {
94- observer. cancel ( subscriber: subscriber)
95- }
96-
97- public nonisolated func didCommit( transaction: borrowing Transaction ) {
98- observer. didCommit ( )
99- }
100-
10166 /// Starts a transaction.
102- public func begin(
103- _ kind: TransactionKind
67+ private func begin(
68+ _ kind: Transaction . Kind
10469 ) async throws ( FeatherError) -> sending Transaction {
10570 // Writes must be exclusive, make sure to wait on any pending writes.
10671 if kind == . write {
10772 await writeLock. lock ( )
10873 }
10974
110- return try await Transaction (
111- connection: getConnection ( ) ,
112- kind: kind,
113- pool: self
114- )
75+ return try await Transaction ( connection: getConnection ( ) , kind: kind)
76+ }
77+
78+ /// Gives the connection back to the pool.
79+ private func reclaim( tx: borrowing Transaction ) async {
80+ availableConnections. append ( tx. connection)
81+ alertAnyWaitersOfAvailableConnection ( )
82+
83+ if tx. kind == . write {
84+ await writeLock. unlock ( )
85+ }
11586 }
11687
11788 /// Will get, wait or create a connection to the database
@@ -147,3 +118,41 @@ extension ConnectionPool: Connection {
147118 waiter. resume ( with: . success( connection) )
148119 }
149120}
121+
122+ extension ConnectionPool : Connection {
123+ public nonisolated func observe( subscriber: any DatabaseSubscriber ) {
124+ observer. subscribe ( subscriber: subscriber)
125+ }
126+
127+ public nonisolated func cancel( subscriber: any DatabaseSubscriber ) {
128+ observer. cancel ( subscriber: subscriber)
129+ }
130+
131+ /// Starts a transaction.
132+ public func begin< Output> (
133+ _ kind: Transaction . Kind ,
134+ execute: ( borrowing Transaction ) throws -> Output
135+ ) async throws -> Output {
136+ let tx = try await begin ( kind)
137+
138+ // The `Result` wrapper seems weird, but allows us to keep
139+ // tx functions consuming. Cause we cannot call `commit` in
140+ // the `do` and on failure call `rollback` since it would
141+ // have been consumed in the `commit`.
142+ let result = Result {
143+ try execute ( tx)
144+ }
145+
146+ await reclaim ( tx: tx)
147+
148+ switch result {
149+ case . success( let output) :
150+ try tx. commit ( )
151+ observer. didCommit ( )
152+ return output
153+ case . failure( let error) :
154+ try tx. commitOrRollback ( )
155+ throw error
156+ }
157+ }
158+ }
0 commit comments