11import AsyncAlgorithms
22
33func watchImpl< RowType: Sendable > ( db: PowerSyncDatabaseImpl , options: WatchOptions < RowType > ) -> AsyncThrowingStream < [ RowType ] , any Error > {
4+ // Note: Hold a weak reference to the database, active watch streams shouldn't prevent a database from being closed.
45 AsyncThrowingStream { continuation in
56 // Create an outer task to monitor cancellation
6- let task = Task {
7+ let task = Task { [ weak db] in
8+ var didFinish = false
9+ defer {
10+ if !didFinish {
11+ continuation. finish ( )
12+ }
13+ }
14+
715 do {
8- let watchedTables = try await getQuerySourceTables (
9- db: db,
10- sql: options. sql,
11- parameters: options. parameters
12- )
16+ let watchedTables : Set < String >
17+ let pool : any SQLiteConnectionPoolProtocol
18+
19+ if let db {
20+ watchedTables = try await getQuerySourceTables (
21+ db: db,
22+ sql: options. sql,
23+ parameters: options. parameters
24+ )
25+ pool = db. pool
26+ } else {
27+ return
28+ }
1329
14- let updateNotifications = db . pool. tableUpdates. filter { changedTables in
30+ let updateNotifications = pool. tableUpdates. filter { changedTables in
1531 changedTables. contains ( where: watchedTables. contains)
1632 } . map { _ in ( ) }
1733 // Allows emitting the first result even if there aren't changes
@@ -21,6 +37,7 @@ func watchImpl<RowType: Sendable>(db: PowerSyncDatabaseImpl, options: WatchOptio
2137 for try await _ in merged {
2238 // Check if the outer task is cancelled
2339 try Task . checkCancellation ( )
40+ guard let db else { return }
2441
2542 try continuation. yield ( await db. getAll (
2643 sql: options. sql,
@@ -29,12 +46,9 @@ func watchImpl<RowType: Sendable>(db: PowerSyncDatabaseImpl, options: WatchOptio
2946 ) )
3047 try await sleepForSeconds ( seconds: options. throttle)
3148 }
32-
33- continuation. finish ( )
3449 } catch {
35- if error is CancellationError {
36- continuation. finish ( )
37- } else {
50+ if !( error is CancellationError ) {
51+ didFinish = true
3852 continuation. finish ( throwing: error)
3953 }
4054 }
@@ -47,8 +61,19 @@ func watchImpl<RowType: Sendable>(db: PowerSyncDatabaseImpl, options: WatchOptio
4761 }
4862}
4963
64+ private func prepareWatch(
65+ db: borrowing PowerSyncDatabaseImpl ,
66+ sql: String ,
67+ parameters: [ Sendable ? ]
68+ ) async throws -> ( Set < String > , any SQLiteConnectionPoolProtocol ) {
69+ (
70+ try await getQuerySourceTables ( db: db, sql: sql, parameters: parameters) ,
71+ db. pool,
72+ )
73+ }
74+
5075private func getQuerySourceTables(
51- db: PowerSyncDatabaseImpl ,
76+ db: borrowing PowerSyncDatabaseImpl ,
5277 sql: String ,
5378 parameters: [ Sendable ? ]
5479) async throws -> Set < String > {
0 commit comments