Skip to content

Commit dfd3ef0

Browse files
committed
Add database groups
1 parent dd5adc3 commit dfd3ef0

9 files changed

Lines changed: 128 additions & 19 deletions

File tree

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
final class DatabaseGroupCollection: Sendable {
2+
private let groups: Mutex<[ActiveDatabaseGroupData]> = Mutex([])
3+
4+
fileprivate func closeGroup(identifier: String) {
5+
groups.withLock { $0.removeAll { group in group.identifier == identifier } }
6+
}
7+
8+
func referenceGroup(identifier: String, logger: LoggerProtocol) -> ActiveDatabaseGroup {
9+
groups.withLock { activeDatabases in
10+
let existingGroup = activeDatabases.first { $0.identifier == identifier }
11+
let data: ActiveDatabaseGroupData
12+
if let existingGroup {
13+
logger.warning("""
14+
Multiple PowerSync instances for the same database have been detected.
15+
This can cause unexpected results.
16+
Please check your PowerSync client instantiation logic if this is not intentional.
17+
""", tag: "DatabaseGroupCollection")
18+
data = existingGroup
19+
} else {
20+
data = ActiveDatabaseGroupData(identifier: identifier)
21+
activeDatabases.append(data)
22+
}
23+
24+
return ActiveDatabaseGroup(data: data, collection: self)
25+
}
26+
}
27+
28+
static let shared = DatabaseGroupCollection()
29+
}
30+
31+
private final class ActiveDatabaseGroupData: Sendable {
32+
let identifier: String
33+
let syncCoordinator = SyncCoordinator()
34+
35+
init(identifier: String) {
36+
self.identifier = identifier
37+
}
38+
}
39+
40+
/// A collection of PowerSync databases with the same path / identifier.
41+
///
42+
/// We expect that each group will only ever have one database because we encourage users to write their databases as
43+
/// singletons. We print a warning when two databasees are part of the same group.
44+
/// Additionally, we want to avoid two databases in the same group having a sync stream open at the same time to avoid
45+
/// duplicate resources being used. For this reason, each active database group has a single sync coordinator actor
46+
/// responsible for initializing the sync process for all databases in the group.
47+
final class ActiveDatabaseGroup: Sendable {
48+
fileprivate let data: ActiveDatabaseGroupData
49+
private weak let collection: DatabaseGroupCollection?
50+
51+
fileprivate init(data: ActiveDatabaseGroupData, collection: DatabaseGroupCollection) {
52+
self.data = data
53+
self.collection = collection
54+
}
55+
56+
var syncCoordinator: SyncCoordinator {
57+
data.syncCoordinator
58+
}
59+
60+
deinit {
61+
if let collection {
62+
collection.closeGroup(identifier: self.data.identifier)
63+
}
64+
}
65+
}

Sources/PowerSync/Implementation/PowerSyncDatabaseImpl.swift

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import Foundation
33

44
final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
55
let logger: any LoggerProtocol
6-
let syncCoordinator = SyncCoordinator()
6+
let group: ActiveDatabaseGroup
77
let syncStatus = SwiftSyncStatus()
88
private let dbFilename: String?
99
private let httpClient: HttpClient
@@ -13,6 +13,8 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
1313

1414
init(
1515
dbFilename: String? = nil,
16+
identifier: String,
17+
activeInstanceStore: DatabaseGroupCollection = .shared,
1618
logger: any LoggerProtocol,
1719
pool: any SQLiteConnectionPoolProtocol,
1820
httpClient: HttpClient,
@@ -23,14 +25,15 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
2325
self.schema = AsyncMutex(schema)
2426
self.httpClient = httpClient
2527
self.queries = ConnectionPoolQueries(pool: pool)
28+
self.group = activeInstanceStore.referenceGroup(identifier: identifier, logger: logger)
2629
}
27-
30+
2831
var currentStatus: any SyncStatus {
2932
syncStatus
3033
}
3134

3235
func resolveOfflineSyncStatusIfNotConnected() async throws {
33-
try await syncCoordinator.guardNotConnected(inner: {
36+
try await group.syncCoordinator.guardNotConnected(inner: {
3437
try await resolveOfflineSyncStatus()
3538
}, ifConnected: {})
3639
}
@@ -56,7 +59,7 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
5659

5760
func updateSchema(schema: any SchemaProtocol) async throws {
5861
try await initializer.ensureInitialized(db: self)
59-
try await syncCoordinator.guardNotConnected(
62+
try await group.syncCoordinator.guardNotConnected(
6063
inner: {
6164
let schema = Schema(other: schema)
6265
await self.schema.withMutex { $0 = schema }
@@ -99,7 +102,7 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
99102
}
100103

101104
func disconnect() async throws {
102-
await syncCoordinator.disconnect()
105+
await group.syncCoordinator.disconnect()
103106
}
104107

105108
func syncStream(name: String, params: JsonParam?) -> any SyncStream {
@@ -109,7 +112,7 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
109112
func close() async throws {
110113
try await initialize()
111114
try await initializer.close {
112-
await syncCoordinator.disconnect()
115+
await group.syncCoordinator.disconnect()
113116
try await queries.pool.close()
114117
}
115118
}
@@ -124,12 +127,12 @@ final class PowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
124127
}
125128

126129
func connect(connector: any PowerSyncBackendConnectorProtocol, options: ConnectOptions?) async throws {
127-
await syncCoordinator.connect(db: self, connector: connector, options: options ?? ConnectOptions(), client: httpClient)
130+
await group.syncCoordinator.connect(db: self, connector: connector, options: options ?? ConnectOptions(), client: httpClient)
128131
}
129132

130133
func disconnectAndClear(clearLocal: Bool, soft: Bool) async throws {
131134
try await initialize()
132-
try await syncCoordinator.disconnectAndThen {
135+
try await group.syncCoordinator.disconnectAndThen {
133136
var flags = 0
134137
if clearLocal {
135138
flags |= 1

Sources/PowerSync/Implementation/SyncStreams.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,11 @@ struct PendingSyncStream: SyncStream {
8888
}
8989

9090
func subscribe(ttl: TimeInterval?, priority: BucketPriority?) async throws -> any SyncStreamSubscription {
91-
return try await db.syncCoordinator.streams.subscribe(db: db, stream: self, ttl: ttl, priority: priority)
91+
return try await db.group.syncCoordinator.streams.subscribe(db: db, stream: self, ttl: ttl, priority: priority)
9292
}
9393

9494
func unsubscribeAll() async throws {
95-
let tracker = db.syncCoordinator.streams
95+
let tracker = db.group.syncCoordinator.streams
9696
let key = self.key
9797
tracker.removeStreamGroup(key: key)
9898
try await tracker.subscriptionsCommand(db: db, request: .unsubscribe(key))
@@ -125,7 +125,7 @@ final class SyncSubscriptionImplementation: SyncStreamSubscription {
125125
}
126126

127127
deinit {
128-
db.syncCoordinator.streams.decrementRefCount(key: key)
128+
db.group.syncCoordinator.streams.decrementRefCount(key: key)
129129
}
130130
}
131131

Sources/PowerSync/Implementation/sqlite3/NativeConnectionPool.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ final class NativeConnectionPool: Sendable {
2424

2525
private func dispatchWrites(lease: NativeConnectionLease) throws {
2626
try lease.withIterator(sql: "SELECT powersync_update_hooks('get')", parameters: []) { rows in
27-
var rows = rows
2827
let affectedTables = try rows.next {
2928
let decoder = JSONDecoder()
3029
return try decoder.decode(Set<String>.self, from: try $0.getString(index: 0).data(using: .utf8)!)

Sources/PowerSync/Implementation/sync/StreamingSyncClient.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ private struct ActiveSyncIteration: Sendable {
245245
parameters: syncClient.options.params,
246246
schema: await syncClient.db.schema.inner,
247247
includeDefaults: syncClient.options.includeDefaultStreams,
248-
activeStreams: syncClient.db.syncCoordinator.streams.currentStreams,
248+
activeStreams: syncClient.db.group.syncCoordinator.streams.currentStreams,
249249
appMetadata: syncClient.options.appMetadata,
250250
)))
251251

@@ -357,7 +357,7 @@ private struct ActiveSyncIteration: Sendable {
357357
}
358358

359359
private func watchSyncStreams() async throws {
360-
let changes = syncClient.db.syncCoordinator.streams.streamsChanged.subscribe()
360+
let changes = syncClient.db.group.syncCoordinator.streams.streamsChanged.subscribe()
361361
for await change in changes {
362362
self.localEvents.dispatch(event: .updateSubscriptions(streams: change))
363363
}

Sources/PowerSync/PowerSyncDatabase.swift

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@ public func PowerSyncDatabase(
1616
logger: (any LoggerProtocol) = DefaultLogger(),
1717
initialStatements: [String] = []
1818
) -> PowerSyncDatabaseProtocol {
19-
let location = if dbFilename == ":memory:" {
20-
DatabaseLocation.inMemory
19+
let (location, group) = if dbFilename == ":memory:" {
20+
(DatabaseLocation.inMemory, DatabaseGroupCollection())
2121
} else {
22-
DatabaseLocation.inDefaultDirectory(name: dbFilename)
22+
(DatabaseLocation.inDefaultDirectory(name: dbFilename), .shared)
2323
}
2424
let pool = AsyncConnectionPool(location: location, initialStatements: initialStatements)
2525
return PowerSyncDatabaseImpl(
2626
dbFilename: dbFilename,
27+
identifier: dbFilename,
28+
activeInstanceStore: group,
2729
logger: logger,
2830
pool: pool,
2931
httpClient: PlatformHttpClient.shared,
@@ -53,6 +55,7 @@ public func OpenedPowerSyncDatabase(
5355
logger: (any LoggerProtocol) = DefaultLogger()
5456
) -> PowerSyncDatabaseProtocol {
5557
return PowerSyncDatabaseImpl(
58+
identifier: identifier,
5659
logger: logger,
5760
pool: pool,
5861
httpClient: PlatformHttpClient.shared,
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
@testable import PowerSync
2+
import Testing
3+
4+
@Suite
5+
struct MultipleInstanceTest {
6+
@Test func warnsAboutMultipleInstances() async throws {
7+
let pool = AsyncConnectionPool(location: .inMemory)
8+
let logWriter = TestLogWriterAdapter()
9+
let logger = DefaultLogger(minSeverity: .warning, writers: [logWriter])
10+
let schema = Schema()
11+
12+
let a = PowerSyncDatabaseImpl(identifier: "id", logger: logger, pool: pool, httpClient: PlatformHttpClient.shared, schema: schema)
13+
try #require(logWriter.getLogs().isEmpty)
14+
15+
let b = PowerSyncDatabaseImpl(identifier: "id", logger: logger, pool: pool, httpClient: PlatformHttpClient.shared, schema: schema)
16+
let _ = try #require(logWriter.getLogs().first { $0.contains("Multiple PowerSync instances for the same database have been detected.") })
17+
18+
// Ensure databases are kept around until the end of the test (if a gets closed before, we would't see the warning).
19+
let _ = consume a
20+
let _ = consume b
21+
}
22+
23+
@Test func doesNotWarnForClosedInstances() async throws {
24+
let pool = AsyncConnectionPool(location: .inMemory)
25+
let logWriter = TestLogWriterAdapter()
26+
let logger = DefaultLogger(minSeverity: .warning, writers: [logWriter])
27+
let schema = Schema()
28+
29+
do {
30+
let _ = PowerSyncDatabaseImpl(identifier: "id2", logger: logger, pool: pool, httpClient: PlatformHttpClient.shared, schema: schema)
31+
}
32+
33+
let b = PowerSyncDatabaseImpl(identifier: "id2", logger: logger, pool: pool, httpClient: PlatformHttpClient.shared, schema: schema)
34+
try #require(logWriter.getLogs().isEmpty)
35+
let _ = consume b
36+
}
37+
}

Tests/PowerSyncTests/Kotlin/SqlCursorTests.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ final class SqlCursorTests: XCTestCase {
6666
])
6767

6868
database = PowerSyncDatabaseImpl(
69-
dbFilename: ":memory:",
69+
identifier: ":memory:",
70+
activeInstanceStore: DatabaseGroupCollection(),
7071
logger: DefaultLogger(),
7172
pool: AsyncConnectionPool(location: .inMemory),
7273
httpClient: PlatformHttpClient.shared,

Tests/PowerSyncTests/SyncTests.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,8 @@ let defaultSchema = Schema(tables: [
673673

674674
private func openDatabase(_ client: any HttpClient, schema: Schema = defaultSchema, logger: any LoggerProtocol = DefaultLogger()) -> PowerSyncDatabaseProtocol {
675675
return PowerSyncDatabaseImpl(
676-
dbFilename: ":memory:",
676+
identifier: ":memory:",
677+
activeInstanceStore: DatabaseGroupCollection(),
677678
logger: logger,
678679
pool: AsyncConnectionPool(location: .inMemory),
679680
httpClient: client,

0 commit comments

Comments
 (0)