Skip to content

Commit 0453284

Browse files
authored
Multi-process database support: absolute paths, open retry, cross-process change signal (#147)
1 parent 1c7e77c commit 0453284

12 files changed

Lines changed: 473 additions & 51 deletions

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
* `PowerSyncDatabase(dbFilename:)` now accepts an absolute path (starting with `/`), used
6+
as-is so the database can live in an App Group container shared with app extensions.
7+
Plain filenames keep the existing behavior. The SDK coordinates opening the database to
8+
avoid conflicts and can share update notifications across the main app and extensions.
9+
310
## 1.14.4
411

512
- Fix crash when running a statement in a cursor callback ([#148](https://github.com/powersync-ja/powersync-swift/issues/148)).

Sources/PowerSync/Implementation/AsyncConnectionPool.swift

Lines changed: 110 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,37 +5,49 @@ import Foundation
55
enum DatabaseLocation {
66
case inMemory
77
case inDefaultDirectory(name: String)
8-
8+
case atPath(String)
9+
10+
/// The on-disk path other processes can share, or `nil` when the database can't be
11+
/// shared. Only absolute paths (typically an App Group container) are shareable; the
12+
/// default directory is inside the app's own sandbox, which extensions cannot reach.
13+
var sharedPath: String? {
14+
switch self {
15+
case .inMemory, .inDefaultDirectory:
16+
return nil
17+
case let .atPath(path):
18+
return path
19+
}
20+
}
21+
922
func openConnection(writer: Bool) throws -> RawSqliteConnection {
10-
var db: OpaquePointer?
11-
let rc: Int32
12-
let path: String
13-
1423
switch self {
1524
case .inMemory:
16-
path = ":memory:"
17-
rc = sqlite3_open_v2(path, &db, SQLITE_OPEN_READWRITE, nil)
18-
case .inDefaultDirectory(let name):
19-
let fileManager = FileManager.default
20-
let databaseDirectory = (try DatabaseLocation.appleDefaultDatabaseDirectory()).path
21-
22-
if !fileManager.fileExists(atPath: databaseDirectory) {
23-
try fileManager.createDirectory(atPath: databaseDirectory, withIntermediateDirectories: true)
24-
}
25+
return try DatabaseLocation.open(path: ":memory:", flags: SQLITE_OPEN_READWRITE)
26+
case let .inDefaultDirectory(name):
27+
let directory = (try DatabaseLocation.appleDefaultDatabaseDirectory()).path
28+
return try DatabaseLocation.openFile(at: "\(directory)/\(name)", in: directory, writer: writer)
29+
case let .atPath(absolutePath):
30+
let directory = (absolutePath as NSString).deletingLastPathComponent
31+
return try DatabaseLocation.openFile(at: absolutePath, in: directory, writer: writer)
32+
}
33+
}
2534

26-
path = "\(databaseDirectory)/\(name)"
27-
let flags = if writer {
28-
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE
29-
} else {
30-
SQLITE_OPEN_READONLY
31-
}
32-
rc = sqlite3_open_v2(path, &db, flags, nil)
35+
/// Creates `directory` if needed, then opens the database file with the right flags.
36+
private static func openFile(at path: String, in directory: String, writer: Bool) throws -> RawSqliteConnection {
37+
let fileManager = FileManager.default
38+
if !fileManager.fileExists(atPath: directory) {
39+
try fileManager.createDirectory(atPath: directory, withIntermediateDirectories: true)
3340
}
41+
let flags = writer ? (SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE) : SQLITE_OPEN_READONLY
42+
return try open(path: path, flags: flags)
43+
}
3444

45+
private static func open(path: String, flags: Int32) throws -> RawSqliteConnection {
46+
var db: OpaquePointer?
47+
let rc = sqlite3_open_v2(path, &db, flags, nil)
3548
if rc != 0 {
3649
throw PowerSyncError.sqliteError(extendedResultCode: rc, offset: nil, message: "Could not open database \(path)", errorString: nil, sql: nil)
3750
}
38-
3951
return RawSqliteConnection(connection: db!)
4052
}
4153

@@ -51,7 +63,8 @@ enum DatabaseLocation {
5163
throw PowerSyncError.operationFailed(message: "Unable to find application support directory")
5264
}
5365

54-
return documentsDirectory.appendingPathComponent("databases")
66+
// `isDirectory: true` avoids a blocking stat to infer the URL kind.
67+
return documentsDirectory.appendingPathComponent("databases", isDirectory: true)
5568
}
5669
}
5770

@@ -62,11 +75,16 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
6275
private let logger: any LoggerProtocol
6376
private let tableUpdatesStream = BroadcastStream<Set<String>>()
6477
private let opener = PoolOpener()
78+
/// Cross-process change signaling; `nil` for in-memory databases (nothing to share).
79+
private let changeSignal: CrossProcessChangeSignal?
6580

6681
init(location: DatabaseLocation, logger: any LoggerProtocol, initialStatements: [String] = []) {
6782
self.location = location
6883
self.logger = logger
6984
self.initialStatements = initialStatements
85+
self.changeSignal = location.sharedPath.map {
86+
CrossProcessChangeSignal(databasePath: $0, logger: logger)
87+
}
7088
}
7189

7290
var tableUpdates: AsyncStream<Set<String>> {
@@ -88,12 +106,15 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
88106
let _ = try context.execute(sql: stmt, parameters: [])
89107
}
90108

109+
// The busy handler is installed first so later statements wait instead of failing,
110+
// but note it does NOT apply to the WAL transition below.
111+
let _ = try context.execute(sql: "pragma busy_timeout = 30000", parameters: [])
112+
91113
if isWriter {
92114
let _ = try context.execute(sql: "pragma journal_mode = WAL", parameters: [])
93115
}
94116

95117
let _ = try context.execute(sql: "pragma journal_size_limit = \(6 * 1024 * 1024)", parameters: [])
96-
let _ = try context.execute(sql: "pragma busy_timeout = 30000", parameters: [])
97118
let _ = try context.execute(sql: "pragma cache_size = -\(50 * 1024)", parameters: [])
98119

99120
if isWriter {
@@ -110,6 +131,63 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
110131
}
111132
}
112133

134+
/// Whether an error from opening/configuring a connection is transient contention
135+
/// (another process holds the file, e.g. mid WAL-recovery) and worth retrying.
136+
/// `pragma journal_mode = WAL` reports SQLITE_BUSY/SQLITE_BUSY_RECOVERY without
137+
/// consulting the busy handler, so `busy_timeout` cannot cover the open path.
138+
private static func isTransientOpenError(_ error: any Error) -> Bool {
139+
guard case let PowerSyncError.sqliteError(extendedResultCode, _, _, _, _) = error else {
140+
return false
141+
}
142+
let primary = extendedResultCode & 0xFF
143+
return primary == SQLITE_BUSY || primary == SQLITE_LOCKED
144+
}
145+
146+
/// Opens and configures all connections of the pool in a single blocking unit of work.
147+
/// One attempt: `RawSqliteConnection` is `~Copyable` and cannot cross the async
148+
/// boundary, so the whole pool is built here and the retry/backoff lives in the async
149+
/// caller (``buildPoolWithRetry(handleUpdates:)``).
150+
fileprivate func buildPool(handleUpdates: @escaping @Sendable (Set<String>) -> Void) throws -> NativeConnectionPool {
151+
let writer = try location.openConnection(writer: true)
152+
try configureConnection(connection: writer, isWriter: true)
153+
154+
if case .inMemory = location {
155+
return NativeConnectionPool(singleConnection: writer, logger: logger, handleUpdates: handleUpdates)
156+
}
157+
let numReaders = 4
158+
var readers = RigidDeque<RawSqliteConnection>(capacity: numReaders)
159+
while !readers.isFull {
160+
let reader = try location.openConnection(writer: false)
161+
try configureConnection(connection: reader, isWriter: false)
162+
readers.append(reader)
163+
}
164+
return NativeConnectionPool(writer: writer, readers: readers, logger: logger, handleUpdates: handleUpdates)
165+
}
166+
167+
/// Builds the pool, retrying with asynchronous backoff while another process holds the
168+
/// database (apps and their widgets/extensions open concurrently). Awaiting between
169+
/// attempts pins no thread and is cancellable, unlike a blocking sleep.
170+
private func buildPoolWithRetry(handleUpdates: @escaping @Sendable (Set<String>) -> Void) async throws -> NativeConnectionPool {
171+
// ~5s total budget: 10ms doubling to a 250ms cap. Concurrent opens (app + widget)
172+
// resolve in tens of milliseconds; a database still busy after seconds is stuck.
173+
// `Task.sleep(nanoseconds:)` keeps the package's iOS 15 / macOS 12 floor while
174+
// staying async and cancellable.
175+
var delayNanoseconds: UInt64 = 10_000_000
176+
let deadline = DispatchTime.now() + .seconds(5)
177+
while true {
178+
do {
179+
return try await runBlocking { try self.buildPool(handleUpdates: handleUpdates) }
180+
} catch where Self.isTransientOpenError(error) && DispatchTime.now() < deadline {
181+
logger.debug(
182+
"database busy while opening (another process holds it); retrying in \(delayNanoseconds / 1_000_000)ms",
183+
tag: "AsyncConnectionPool"
184+
)
185+
try await Task.sleep(nanoseconds: delayNanoseconds)
186+
delayNanoseconds = min(delayNanoseconds * 2, 250_000_000)
187+
}
188+
}
189+
}
190+
113191
/// Opens connections on a background thread to obtain the native connection pool.
114192
private func obtainInner() async throws -> NativeConnectionPool {
115193
try await opener.obtainPool(pool: self)
@@ -137,6 +215,7 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
137215
}
138216

139217
func close() async throws {
218+
changeSignal?.stop()
140219
try await self.opener.close()
141220
}
142221

@@ -152,27 +231,16 @@ final class AsyncConnectionPool: SQLiteConnectionPoolProtocol {
152231
try registerPowerSyncCoreExtension()
153232
let handleUpdates: @Sendable (_: Set<String>) -> () = { [weak context] updates in
154233
context?.tableUpdatesStream.dispatch(event: updates)
234+
// Tell other processes sharing this file that tables changed.
235+
context?.changeSignal?.post()
155236
}
156-
157-
let pool = try await context.runBlocking {
158-
let writer = try context.location.openConnection(writer: true)
159-
try context.configureConnection(connection: writer, isWriter: true)
160-
161-
if case .inMemory = context.location {
162-
return NativeConnectionPool(singleConnection: writer, logger: context.logger, handleUpdates: handleUpdates)
163-
} else {
164-
let numReaders = 4
165-
var readers = RigidDeque<RawSqliteConnection>(capacity: numReaders)
166-
while !readers.isFull {
167-
let connection = try context.location.openConnection(writer: false)
168-
try context.configureConnection(connection: connection, isWriter: false)
169-
readers.append(connection)
170-
}
171-
172-
return NativeConnectionPool(writer: writer, readers: readers, logger: context.logger, handleUpdates: handleUpdates)
173-
}
237+
context.changeSignal?.start { [weak context] in
238+
// Another process (or this one; harmless, throttled downstream) changed
239+
// the database outside this pool's update hooks.
240+
context?.tableUpdatesStream.dispatch(event: [EXTERNAL_CHANGES_MARKER])
174241
}
175242

243+
let pool = try await context.buildPoolWithRetry(handleUpdates: handleUpdates)
176244
self.pool = pool
177245
return pool
178246
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import Foundation
2+
import notify
3+
4+
/// Cross-process change signaling for a database file, built on Darwin notifications —
5+
/// the same mechanism Core Data uses for its remote-change notifications.
6+
///
7+
/// PowerSync's update hooks only observe writes made through the local connection pool.
8+
/// When several processes share a database file (an app and its widgets or App Intents
9+
/// extensions), each process posts this signal after committing a write; the others
10+
/// re-emit their `tableUpdates` with ``EXTERNAL_CHANGES_MARKER`` so `watch` queries
11+
/// re-run and the upload client checks `ps_crud`.
12+
///
13+
/// Darwin notifications carry no payload and are coalesced under pressure, which is fine:
14+
/// the marker means "something changed, re-query". Deliveries to the posting process
15+
/// itself are not suppressed — a redundant re-query (already throttled) is preferable to
16+
/// the race a sender-stamp suppression scheme introduces, where an external change could
17+
/// be misattributed and silently dropped.
18+
final class CrossProcessChangeSignal: @unchecked Sendable {
19+
private let name: String
20+
private let logger: any LoggerProtocol
21+
private var token: Int32 = NOTIFY_TOKEN_INVALID
22+
private let queue = DispatchQueue(label: "powersync.cross-process-signal")
23+
24+
init(databasePath: String, logger: any LoggerProtocol) {
25+
// Stable across processes: both sides derive the name from the canonical path.
26+
let canonical = URL(fileURLWithPath: databasePath).standardizedFileURL.path
27+
self.name = "com.powersync.changes.\(Self.fnv1a(canonical))"
28+
self.logger = logger
29+
}
30+
31+
/// Starts listening; `onExternalChange` runs on a private queue for every signal
32+
/// (including this process's own posts).
33+
func start(onChange: @escaping @Sendable () -> Void) {
34+
guard token == NOTIFY_TOKEN_INVALID else {
35+
return
36+
}
37+
let status = notify_register_dispatch(name, &token, queue) { _ in
38+
onChange()
39+
}
40+
if status != NOTIFY_STATUS_OK {
41+
logger.warning(
42+
"could not register cross-process change signal (status \(status)); "
43+
+ "changes from other processes will not wake watch queries",
44+
tag: "CrossProcessChangeSignal"
45+
)
46+
token = NOTIFY_TOKEN_INVALID
47+
}
48+
}
49+
50+
/// Posts the signal; called after every committed write.
51+
func post() {
52+
notify_post(name)
53+
}
54+
55+
func stop() {
56+
if token != NOTIFY_TOKEN_INVALID {
57+
notify_cancel(token)
58+
token = NOTIFY_TOKEN_INVALID
59+
}
60+
}
61+
62+
deinit {
63+
stop()
64+
}
65+
66+
/// FNV-1a 64-bit, hex-encoded: deterministic and dependency-free.
67+
private static func fnv1a(_ input: String) -> String {
68+
var hash: UInt64 = 0xCBF2_9CE4_8422_2325
69+
for byte in input.utf8 {
70+
hash ^= UInt64(byte)
71+
hash = hash &* 0x0000_0100_0000_01B3
72+
}
73+
return String(hash, radix: 16)
74+
}
75+
}

Sources/PowerSync/Implementation/PowerSyncDatabaseImpl.swift

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,13 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
121121
func close(deleteDatabase: Bool) async throws {
122122
try await close()
123123
if deleteDatabase, let dbFilename {
124-
// We can use the supplied dbLocation when we support that in future
125-
let directory = try DatabaseLocation.appleDefaultDatabaseDirectory()
126-
try deleteSQLiteFiles(dbFilename: dbFilename, in: directory)
124+
if dbFilename.hasPrefix("/") {
125+
let url = URL(fileURLWithPath: dbFilename)
126+
try deleteSQLiteFiles(dbFilename: url.lastPathComponent, in: url.deletingLastPathComponent())
127+
} else {
128+
let directory = try DatabaseLocation.appleDefaultDatabaseDirectory()
129+
try deleteSQLiteFiles(dbFilename: dbFilename, in: directory)
130+
}
127131
}
128132
}
129133

Sources/PowerSync/Implementation/queries/watch.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ func watchImpl<RowType: Sendable>(db: PowerSyncDatabaseImpl, options: WatchOptio
2929

3030
let updateNotifications = pool.tableUpdates.filter { changedTables in
3131
changedTables.contains(where: watchedTables.contains)
32+
// Another process changed unknown tables: conservatively re-query.
33+
|| changedTables.contains(EXTERNAL_CHANGES_MARKER)
3234
}.map { _ in () }
3335
// Allows emitting the first result even if there aren't changes
3436
let withInitial = AsyncAlgorithms.merge([()].async, updateNotifications)

Sources/PowerSync/Implementation/sync/StreamingSyncClient.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ final class StreamingSyncClient: Sendable {
3737
}
3838

3939
private func uploadLoop(signals: SyncSignals) async throws {
40-
let updates = db.pool.tableUpdates.filter { updates in updates.contains("ps_crud") }.map { _ in () }
40+
let updates = db.pool.tableUpdates.filter { updates in
41+
updates.contains("ps_crud") || updates.contains(EXTERNAL_CHANGES_MARKER)
42+
}.map { _ in () }
4143
let allTriggers = MergeItemSequence(inner: AsyncAlgorithms.merge(updates, signals.signalCrudUpload.subscribe())).makeAsyncIterator()
4244

4345
// Use a do-while loop to ensure we start an upload iteration even if we can't connect to the service.

Sources/PowerSync/PowerSyncDatabase.swift

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,13 @@ public let DEFAULT_DB_FILENAME = "powersync.db"
66
/// Creates a PowerSyncDatabase instance
77
/// - Parameters:
88
/// - schema: The database schema
9-
/// - dbFilename: The database filename. Defaults to "powersync.db"
9+
/// - dbFilename: The database filename. Defaults to "powersync.db". Plain names are
10+
/// stored in the default databases directory; an absolute path (starting with "/") is
11+
/// used as-is, which allows sharing the database with app extensions through an App
12+
/// Group container. The database itself can be used concurrently from the main app and
13+
/// its extensions, but only the main app should call `connect`. Two sync connections on
14+
/// the same database waste resources and are untested (and could corrupt the sync
15+
/// client); let extensions read and write, and leave syncing to the app.
1016
/// - logger: Optional logging interface
1117
/// - initialStatements: An optional list of statements to run as the database is opened.
1218
/// - Returns: A configured PowerSyncDatabase instance
@@ -18,6 +24,8 @@ public func PowerSyncDatabase(
1824
) -> PowerSyncDatabaseProtocol {
1925
let (location, group) = if dbFilename == ":memory:" {
2026
(DatabaseLocation.inMemory, DatabaseGroupCollection())
27+
} else if dbFilename.hasPrefix("/") {
28+
(DatabaseLocation.atPath(dbFilename), .shared)
2129
} else {
2230
(DatabaseLocation.inDefaultDirectory(name: dbFilename), .shared)
2331
}

Sources/PowerSync/Protocol/SQLiteConnectionPool.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ public protocol SQLiteStatementIteratorProtocol {
5757
/// This is the underlying pool implementation on which the higher-level PowerSync Swift SDK is built on.
5858
///
5959
/// This is an internal protocol and should not be implemented outside of the PowerSync SDK.
60+
/// Emitted on `tableUpdates` when another process changed the database. The concrete
61+
/// tables are unknown (cross-process signals carry no payload), so consumers must treat
62+
/// this as potentially matching every table they watch.
63+
let EXTERNAL_CHANGES_MARKER = "__powersync_external_changes__"
64+
6065
public protocol SQLiteConnectionPoolProtocol: Sendable {
6166
var tableUpdates: AsyncStream<Set<String>> { get }
6267

0 commit comments

Comments
 (0)