Skip to content

Commit f73de9f

Browse files
committed
Some additional cleanup
1 parent 562aa16 commit f73de9f

5 files changed

Lines changed: 70 additions & 49 deletions

File tree

Sources/PowerSync/Implementation/AsyncConnectionPool.swift

Lines changed: 58 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -61,26 +61,27 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
6161
private let initialStatements: [String]
6262
private let logger: any LoggerProtocol
6363
private let tableUpdatesStream = BroadcastStream<Set<String>>()
64-
private let inner: AsyncSemaphore<NativeConnectionPool?> = AsyncSemaphore(singleElement: nil)
64+
private let opener = PoolOpener()
6565

6666
init(location: DatabaseLocation, logger: any LoggerProtocol, initialStatements: [String] = []) {
6767
self.location = location
6868
self.logger = logger
6969
self.initialStatements = initialStatements
7070
}
71-
71+
7272
var tableUpdates: AsyncStream<Set<String>> {
7373
tableUpdatesStream.subscribe()
7474
}
75-
75+
76+
/// Asyncifies a synchronous unit of work on by running it on a suitable background thread.
7677
private func runBlocking<T>(action: @escaping @Sendable () throws -> T, qos: DispatchQoS.QoSClass = .userInitiated) async throws -> T {
7778
return try await withCheckedThrowingContinuation { continuation in
7879
DispatchQueue.global(qos: qos).async {
7980
continuation.resume(with: Result(catching: { try action() }))
8081
}
8182
}
8283
}
83-
84+
8485
private func configureConnection(connection: borrowing RawSqliteConnection, isWriter: Bool) throws {
8586
let context = connection.asLease()
8687
for stmt in initialStatements {
@@ -111,40 +112,10 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
111112
let _ = try context.execute(sql: "select powersync_update_hooks('install')", parameters: [])
112113
}
113114
}
114-
115-
private func obtainInner() async throws -> NativeConnectionPool {
116-
var lease = try await inner.acquire(count: 1)
117-
if let pool = lease.acquiredItems[0] {
118-
return pool
119-
} else {
120-
try registerPowerSyncCoreExtension()
121-
122-
@Sendable func handleUpdates(_ updates: Set<String>) {
123-
self.tableUpdatesStream.dispatch(event: updates)
124-
}
125-
126-
let pool = try await runBlocking { [self] in
127-
let writer = try location.openConnection(writer: true)
128-
try configureConnection(connection: writer, isWriter: true)
129115

130-
if case .inMemory = location {
131-
return NativeConnectionPool(singleConnection: writer, logger: logger, handleUpdates: handleUpdates)
132-
} else {
133-
let numReaders = 4
134-
var readers = RigidDeque<RawSqliteConnection>(capacity: numReaders)
135-
while !readers.isFull {
136-
let connection = try location.openConnection(writer: false)
137-
try configureConnection(connection: connection, isWriter: false)
138-
readers.append(connection)
139-
}
140-
141-
return NativeConnectionPool(writer: writer, readers: readers, logger: logger, handleUpdates: handleUpdates)
142-
}
143-
}
144-
145-
lease.acquiredItems[0] = pool
146-
return pool
147-
}
116+
/// Opens connections on a background thread to obtain the native connection pool.
117+
private func obtainInner() async throws -> NativeConnectionPool {
118+
try await opener.obtainPool(pool: self)
148119
}
149120

150121
func read<T>(onConnection: @escaping @Sendable (any SQLiteConnectionLease) throws -> T) async throws -> T {
@@ -160,7 +131,7 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
160131
try await runBlocking { try onConnection(connection) }
161132
}
162133
}
163-
134+
164135
func withAllConnections<T>(onConnection: @escaping @Sendable (any SQLiteConnectionLease, [any SQLiteConnectionLease]) throws -> T) async throws -> T {
165136
let pool = try await obtainInner()
166137
return try await pool.withAllConnections { writer, readers in
@@ -169,10 +140,55 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
169140
}
170141

171142
func close() async throws {
172-
var lease = try await inner.acquire(count: 1)
173-
if let pool = lease.acquiredItems[0] {
174-
try await pool.close()
175-
lease.acquiredItems[0] = nil
143+
try await self.opener.close()
144+
}
145+
146+
private actor PoolOpener {
147+
private var pool: NativeConnectionPool? = nil
148+
private var isClosed = false
149+
150+
func obtainPool(pool context: AsyncConnectionPool) async throws -> NativeConnectionPool {
151+
if let pool {
152+
return pool
153+
}
154+
155+
try registerPowerSyncCoreExtension()
156+
let handleUpdates: @Sendable (_: Set<String>) -> () = { [weak context] updates in
157+
context?.tableUpdatesStream.dispatch(event: updates)
158+
}
159+
160+
let pool = try await context.runBlocking {
161+
let writer = try context.location.openConnection(writer: true)
162+
try context.configureConnection(connection: writer, isWriter: true)
163+
164+
if case .inMemory = context.location {
165+
return NativeConnectionPool(singleConnection: writer, logger: context.logger, handleUpdates: handleUpdates)
166+
} else {
167+
let numReaders = 4
168+
var readers = RigidDeque<RawSqliteConnection>(capacity: numReaders)
169+
while !readers.isFull {
170+
let connection = try context.location.openConnection(writer: false)
171+
try context.configureConnection(connection: connection, isWriter: false)
172+
readers.append(connection)
173+
}
174+
175+
return NativeConnectionPool(writer: writer, readers: readers, logger: context.logger, handleUpdates: handleUpdates)
176+
}
177+
}
178+
179+
self.pool = pool
180+
return pool
181+
}
182+
183+
func close() async throws {
184+
if isClosed {
185+
return
186+
}
187+
188+
isClosed = true
189+
if let pool {
190+
try await pool.close()
191+
}
176192
}
177193
}
178194
}

Sources/PowerSync/Implementation/sqlite3/NativeConnectionPool.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import DequeModule
66
///
77
/// This class does not configure or open connections (that is the responsibility of ``AsyncConnectionPool``).
88
final class NativeConnectionPool: Sendable {
9+
// This could be an async mutex, but AsyncSemaphore has better cancellation support.
910
private let writer: AsyncSemaphore<RawSqliteConnection>
1011
private let readers: AsyncSemaphore<RawSqliteConnection>?
1112
private let handleUpdates: @Sendable (_: Set<String>) -> ()

Sources/PowerSync/Implementation/sqlite3/NativeStatement.swift

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,12 @@ struct NativeSqliteStatement: ~Copyable {
3535
var columnCount: Int {
3636
return Int(sqlite3_column_count(self.stmt))
3737
}
38-
39-
38+
4039
var columnNames: [String : Int] {
4140
return resolvedColumnNames!
4241
}
4342

44-
borrowing func bindValues(_ parameters: [PowerSyncDataType?]) throws (PowerSyncError) {
43+
borrowing func bindValues(_ parameters: [PowerSyncDataType?]) throws(PowerSyncError) {
4544
for (i, parameter) in parameters.enumerated() {
4645
let index = Int32(i + 1)
4746

@@ -53,7 +52,7 @@ struct NativeSqliteStatement: ~Copyable {
5352
}
5453
}
5554

56-
borrowing func bindValue(_ index: Int32, _ parameter: PowerSyncDataType?) throws (PowerSyncError) {
55+
borrowing func bindValue(_ index: Int32, _ parameter: PowerSyncDataType?) throws(PowerSyncError) {
5756
let rc: Int32
5857

5958
switch parameter {
@@ -98,7 +97,7 @@ struct NativeSqliteStatement: ~Copyable {
9897
}
9998
}
10099

101-
mutating func step() throws (PowerSyncError) -> Bool {
100+
mutating func step() throws(PowerSyncError) -> Bool {
102101
let rc = sqlite3_step(self.stmt)
103102
if rc == SQLITE_DONE {
104103
return false

Sources/PowerSync/Utils/MergeItemSequence.swift

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
/// An ``AsyncSequence`` merging all items emitted between ``AsyncIteratorProtocol/next``.
1+
/// An ``AsyncSequence`` merging all items emitted between calls to ``AsyncIteratorProtocol/next``.
2+
///
3+
/// This is useful for sequences where we just want to know that an event has occurred, without needing
4+
/// to know about the exact event. We use this internally to implement `watch()` queries with a throttle:
5+
/// If any amount of events have occurred between throttled calls to `next()`, we want to dispatch a single
6+
/// event.
27
struct MergeItemSequence<Base: AsyncSequence & Sendable>: AsyncSequence where Base.Element == () {
38
typealias AsyncIterator = IteratorImpl
49
typealias Element = ()

Sources/PowerSync/resolvePowerSyncLoadableExtensionPath.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
/// function invokes `sqlite3_auto_extension` to load the core extension automatically.
66
///
77
/// - Returns: `nil`
8-
/// - Throws: An error if the extension could not be registered watchOS.
8+
/// - Throws: An error if the extension could not be registered.
99
public func resolvePowerSyncLoadableExtensionPath() throws(PowerSyncError) -> String? {
1010
try registerPowerSyncCoreExtension()
1111
return nil

0 commit comments

Comments
 (0)