Skip to content

Commit 6fffcee

Browse files
committed
Await initialization before connecting
1 parent c49f08c commit 6fffcee

5 files changed

Lines changed: 136 additions & 146 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Changelog
22

3-
## 1.14.0-Beta.0
3+
## 1.14.0
44

55
* Remove internal dependency on the PowerSync Kotlin SDK. Going forward, the Swift SDK is implemented in Swift!
66
__Important__: While these changes are tested, they are a full rewrite of the internal connection pool logic.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
// The current version of the PowerSync Swift SDK. This should be updated to the latest version in `CHANGELOG.md` when a new version is released.
2-
let libraryVersion = "1.14.0-Beta.0"
2+
let libraryVersion = "1.14.0"
33

Sources/PowerSync/Implementation/PowerSyncDatabaseImpl.swift

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
88
private let dbFilename: String?
99
private let httpClient: HttpClient
1010
private let initializer = DatabaseInitializationAction()
11-
fileprivate let queries: ConnectionPoolQueries
11+
let pool: any SQLiteConnectionPoolProtocol
1212
let schema: AsyncMutex<Schema>
1313

1414
init(
@@ -24,7 +24,7 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
2424
self.logger = logger
2525
self.schema = AsyncMutex(schema)
2626
self.httpClient = httpClient
27-
self.queries = ConnectionPoolQueries(pool: pool)
27+
self.pool = pool
2828
self.group = activeInstanceStore.referenceGroup(identifier: identifier, logger: logger)
2929
}
3030

@@ -44,7 +44,7 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
4444

4545
fileprivate func resolveOfflineSyncStatus() async throws {
4646
// We can't use get() here because it runs as part of the initialization step.
47-
let offlineSyncStatus = try await queries.readLock { connection in
47+
let offlineSyncStatus = try await readLockInner { connection in
4848
try connection.get(sql: "SELECT powersync_offline_sync_status()", parameters: []) { cursor in
4949
let raw = try cursor.getString(index: 0)
5050
guard let data = raw.data(using: .utf8) else {
@@ -70,16 +70,17 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
7070
}
7171

7272
fileprivate func applySchema(schema: Schema) async throws {
73-
try await queries.withAll { writer, readers in
73+
try await pool.withAllConnections { writer, readers in
7474
let encoded = try StreamingSyncClient.jsonEncoder.encode(schema)
7575
guard let asString = String(data: encoded, encoding: .utf8) else {
7676
throw PowerSyncError.operationFailed(message: "Could not serialize schema")
7777
}
78-
try writer.execute(sql: "SELECT powersync_replace_schema(?)", parameters: [asString])
78+
79+
let _ = try writer.execute(sql: "SELECT powersync_replace_schema(?)", parameters: [.string(asString)])
7980

8081
for reader in readers {
8182
// Update the schema on all read connections
82-
try reader.execute(sql: "pragma table_info('sqlite_master')", parameters: [])
83+
let _ = try reader.execute(sql: "pragma table_info('sqlite_master')", parameters: [])
8384
}
8485
}
8586
}
@@ -113,7 +114,7 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
113114
try await initialize()
114115
try await initializer.close {
115116
await group.syncCoordinator.disconnect()
116-
try await queries.pool.close()
117+
try await pool.close()
117118
}
118119
}
119120

@@ -127,6 +128,7 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
127128
}
128129

129130
func connect(connector: any PowerSyncBackendConnectorProtocol, options: ConnectOptions?) async throws {
131+
try await initialize()
130132
await group.syncCoordinator.connect(db: self, connector: connector, options: options ?? ConnectOptions(), client: httpClient)
131133
}
132134

@@ -143,23 +145,35 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
143145

144146
do {
145147
let flags = flags
146-
let _ = try await queries.writeLock { ctx in try ctx.execute(sql: "SELECT powersync_clear(?)", parameters: [flags]) }
148+
let _ = try await writeLockInner { ctx in try ctx.execute(sql: "SELECT powersync_clear(?)", parameters: [flags]) }
147149
}
148150
}
149151
}
150152

151153
func writeLock<R: Sendable>(callback: @escaping @Sendable (any ConnectionContext) throws -> R) async throws -> R {
152154
try await initialize()
153-
return try await queries.writeLock(callback: callback)
155+
return try await writeLockInner(callback: callback)
156+
}
157+
158+
fileprivate func writeLockInner<R: Sendable>(callback: @escaping @Sendable (any ConnectionContext) throws -> R) async throws -> R {
159+
return try await self.pool.write { connection in
160+
try callback(ConnectionLeaseContext(lease: connection))
161+
}
154162
}
155163

156164
func readLock<R: Sendable>(callback: @escaping @Sendable (any ConnectionContext) throws -> R) async throws -> R {
157165
try await initialize()
158-
return try await queries.readLock(callback: callback)
166+
return try await readLockInner(callback: callback)
167+
}
168+
169+
fileprivate func readLockInner<R: Sendable>(callback: @escaping @Sendable (any ConnectionContext) throws -> R) async throws -> R {
170+
return try await pool.read { connection in
171+
try callback(ConnectionLeaseContext(lease: connection))
172+
}
159173
}
160174

161175
func watch<RowType: Sendable>(options: WatchOptions<RowType>) throws -> AsyncThrowingStream<[RowType], any Error> {
162-
return try queries.watch(options: options)
176+
return watchImpl(db: self, options: options)
163177
}
164178

165179
static let maxOpId = Int64.max
@@ -178,7 +192,7 @@ private actor DatabaseInitializationAction {
178192
return
179193
}
180194

181-
powerSyncVersion = try await db.queries.writeLock { conn in
195+
powerSyncVersion = try await db.writeLockInner { conn in
182196
let sqliteVersion = try conn.get(sql: "SELECT sqlite_version()", parameters: []) { try $0.getString(index: 0) }
183197
let powerSyncVersion = try conn.get(sql: "SELECT powersync_rs_version()", parameters: []) { try $0.getString(index: 0) }
184198

Sources/PowerSync/Implementation/queries/ConnectionPoolQueries.swift

Lines changed: 0 additions & 132 deletions
This file was deleted.
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import AsyncAlgorithms
2+
3+
func watchImpl<RowType: Sendable>(db: PowerSyncDatabaseImpl, options: WatchOptions<RowType>) -> AsyncThrowingStream<[RowType], any Error> {
4+
AsyncThrowingStream { continuation in
5+
// Create an outer task to monitor cancellation
6+
let task = Task {
7+
do {
8+
let watchedTables = try await getQuerySourceTables(
9+
db: db,
10+
sql: options.sql,
11+
parameters: options.parameters
12+
)
13+
14+
let updateNotifications = db.pool.tableUpdates.filter { changedTables in
15+
changedTables.contains(where: watchedTables.contains)
16+
}.map { _ in () }
17+
// Allows emitting the first result even if there aren't changes
18+
let withInitial = AsyncAlgorithms.merge([()].async, updateNotifications)
19+
let merged = MergeItemSequence(inner: withInitial)
20+
21+
for try await _ in merged {
22+
// Check if the outer task is cancelled
23+
try Task.checkCancellation()
24+
25+
try continuation.yield(await db.getAll(
26+
sql: options.sql,
27+
parameters: options.parameters,
28+
mapper: options.mapper
29+
))
30+
try await sleepForSeconds(seconds: options.throttle)
31+
}
32+
33+
continuation.finish()
34+
} catch {
35+
if error is CancellationError {
36+
continuation.finish()
37+
} else {
38+
continuation.finish(throwing: error)
39+
}
40+
}
41+
}
42+
43+
// Propagate cancellation from the outer task to the inner task
44+
continuation.onTermination = { @Sendable _ in
45+
task.cancel() // This cancels the inner task when the stream is terminated
46+
}
47+
}
48+
}
49+
50+
private func getQuerySourceTables(
51+
db: PowerSyncDatabaseImpl,
52+
sql: String,
53+
parameters: [Sendable?]
54+
) async throws -> Set<String> {
55+
let rows = try await db.getAll(
56+
sql: "EXPLAIN \(sql)",
57+
parameters: parameters,
58+
mapper: { cursor in
59+
try ExplainQueryResult(
60+
addr: cursor.getString(index: 0),
61+
opcode: cursor.getString(index: 1),
62+
p1: cursor.getInt64(index: 2),
63+
p2: cursor.getInt64(index: 3),
64+
p3: cursor.getInt64(index: 4)
65+
)
66+
}
67+
)
68+
69+
let rootPages = rows.compactMap { row in
70+
if (row.opcode == "OpenRead" || row.opcode == "OpenWrite") &&
71+
row.p3 == 0 && row.p2 != 0
72+
{
73+
return row.p2
74+
}
75+
return nil
76+
}
77+
78+
do {
79+
let pagesData = try StreamingSyncClient.jsonEncoder.encode(rootPages)
80+
guard let pagesString = String(data: pagesData, encoding: .utf8) else {
81+
throw PowerSyncError.operationFailed(
82+
message: "Failed to convert pages data to UTF-8 string"
83+
)
84+
}
85+
86+
let tableRows = try await db.getAll(
87+
sql: "SELECT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))",
88+
parameters: [
89+
pagesString,
90+
]
91+
) { try $0.getString(index: 0) }
92+
93+
return Set(tableRows)
94+
} catch {
95+
throw PowerSyncError.operationFailed(
96+
message: "Could not determine watched query tables",
97+
underlyingError: error
98+
)
99+
}
100+
}
101+
102+
private struct ExplainQueryResult {
103+
let addr: String
104+
let opcode: String
105+
let p1: Int64
106+
let p2: Int64
107+
let p3: Int64
108+
}

0 commit comments

Comments
 (0)