88import SQLite3
99import Foundation
1010
11+ /// A change that happened in the database.
12+ /// Represents many events. Changes are published
13+ /// after commit which may contain many events
14+ /// that happened within the transaction.
15+ public struct DatabaseChange {
16+ /// A set of tables that we affected by the commit
17+ public let affectedTables : Set < String >
18+ /// The raw list of events from SQLite
19+ public let events : [ DatabaseEvent ]
20+ }
21+
22+ /// The raw fields SQLite gives us doing an `update_hook`
1123public struct DatabaseEvent : Sendable {
24+ /// What kind of operation happened
1225 public let operation : Operation
26+ /// The database affected if any
1327 public let databaseName : String ?
28+ /// The table affected if any
1429 public let tableName : String ?
30+ /// The row id of the affected row.
1531 public let rowId : Int64
1632
1733 public enum Operation : Int32 , Sendable {
@@ -21,53 +37,65 @@ public struct DatabaseEvent: Sendable {
2137 }
2238}
2339
40+ /// Manages all of the subscriptions to a database.
41+ /// Will listen to SQLites hooks as well as recieve
42+ /// `didCommit` calls from the owning database `Connection`.
2443class DatabaseObserver : @unchecked Sendable {
44+ /// Lock to protect the `subscribers`.
45+ /// Would have been nice to make this an actor but it would
46+ /// have made things `async` that shouldn't be like
47+ /// cancellation and others.
2548 private let lock = NSLock ( )
49+ /// A map of all subscribers. The key is their pointer
2650 private var subscribers : [ ObjectIdentifier : any DatabaseSubscriber ] = [ : ]
27-
51+ /// We get the events from the database before the commit happens.
52+ /// So if a caller requeries the database they will get old
53+ /// data since the write would be in an uncommited transaction.
54+ /// So we keep a list of all events that happened and then
55+ /// on commit we can dispatch them all.
2856 private var pendingEvents : [ DatabaseEvent ] = [ ]
2957
58+ /// Subscribes the `subscriber` to any database events.
59+ /// Events are flushed upon commit and not as they come in.
3060 func subscribe( subscriber: any DatabaseSubscriber ) {
31- lock. lock ( )
32- defer { lock. unlock ( ) }
33-
34- let id = ObjectIdentifier ( subscriber)
35-
36- guard subscribers [ id] == nil else {
37- return
61+ lock. withLock {
62+ let id = ObjectIdentifier ( subscriber)
63+ guard subscribers [ id] == nil else { return }
64+ subscribers [ id] = subscriber
3865 }
39-
40- subscribers [ id] = subscriber
4166 }
4267
68+ /// Cancels the subscribers subscription.
4369 func cancel( subscriber: any DatabaseSubscriber ) {
44- lock. lock ( )
45- defer { lock. unlock ( ) }
46-
47- subscribers [ ObjectIdentifier ( subscriber) ] = nil
48- }
49-
50- func receive( event: DatabaseEvent ) {
51- lock. lock ( )
52- defer { lock. unlock ( ) }
53-
54- pendingEvents. append ( event)
70+ lock. withLock {
71+ subscribers [ ObjectIdentifier ( subscriber) ] = nil
72+ }
5573 }
5674
75+ /// Must be called by the owning database.
76+ /// We do not use `sqlite3_commit_hook` since it is actually
77+ /// called during the commit. Not after it.
5778 func didCommit( ) {
58- lock. lock ( )
59- defer { lock. unlock ( ) }
60-
61- let events = pendingEvents
62- pendingEvents. removeAll ( )
63-
64- for subscriber in subscribers. values {
65- for event in events {
66- subscriber. receive ( event: event)
79+ lock. withLock {
80+ let events = pendingEvents
81+ pendingEvents. removeAll ( )
82+
83+ // Merge all events into a single change
84+ let change = DatabaseChange (
85+ affectedTables: Set ( events. compactMap ( \. databaseName) ) ,
86+ events: events
87+ )
88+
89+ for subscriber in subscribers. values {
90+ subscriber. receive ( change: change)
6791 }
6892 }
6993 }
7094
95+ /// SQLite hooks are a per connection thing. Updates from
96+ /// one connection do no publish on another so the owning
97+ /// database must install the hooks for event connection
98+ /// it initializes.
7199 func installHooks( into connection: SQLiteConnection ) {
72100 sqlite3_update_hook (
73101 connection. sqliteConnection,
@@ -87,7 +115,7 @@ class DatabaseObserver: @unchecked Sendable {
87115 )
88116 }
89117
90- func receiveSqliteUpdateHook(
118+ private func receiveSqliteUpdateHook(
91119 operation: Int32 ,
92120 dbName: UnsafePointer < CChar > ? ,
93121 tableName: UnsafePointer < CChar > ? ,
@@ -104,10 +132,19 @@ class DatabaseObserver: @unchecked Sendable {
104132 rowId: rowId
105133 )
106134
107- receive ( event: event)
135+ lock. withLock {
136+ pendingEvents. append ( event)
137+ }
108138 }
109139}
110140
141+
142+ /// A subscriber that listens to database changes
111143public protocol DatabaseSubscriber : AnyObject {
112- func receive( event: DatabaseEvent )
144+ /// After a commit, this is called with the `change`
145+ /// which contains the events that happened during the
146+ /// transaction and any additional metadata
147+ ///
148+ /// - Parameter change: The metadata about the change.
149+ func receive( change: DatabaseChange )
113150}
0 commit comments