@@ -8,6 +8,17 @@ import PostgresKit
88/// and metadata from PostgreSQL database. It handles the actual database operations
99/// that the PostgreSQLCorpus uses for persistence and synchronization.
1010public actor PostgresSQLStorage {
11+ public struct ProgramSyncRecord {
12+ public let program : Program
13+ public let hash : String
14+ public let insertedAt : Date
15+
16+ public init ( program: Program , hash: String , insertedAt: Date ) {
17+ self . program = program
18+ self . hash = hash
19+ self . insertedAt = insertedAt
20+ }
21+ }
1122
1223 private let databasePool : DatabasePool
1324 private let logger : Logging . Logger
@@ -139,6 +150,27 @@ public actor PostgresSQLStorage {
139150 self . enableLogging = enableLogging
140151 self . logger = Logging . Logger ( label: " PostgresSQLStorage " )
141152 }
153+
154+ public func fetchLatestProgramSyncCursor( ) async throws -> ( insertedAt: Date , hash: String ) ? {
155+ return try await databasePool. withConnection { connection in
156+ let query = PostgresQuery ( stringLiteral: """
157+ SELECT inserted_at, program_hash
158+ FROM program
159+ ORDER BY inserted_at DESC, program_hash DESC
160+ LIMIT 1
161+ """ )
162+
163+ let result = try await connection. query ( query, logger: self . logger)
164+ let rows = try await result. collect ( )
165+
166+ guard let row = rows. first else {
167+ return nil
168+ }
169+
170+ let ( insertedAt, hash) = try row. decode ( ( Date, String) . self, context: . default)
171+ return ( insertedAt, hash)
172+ }
173+ }
142174
143175 /// Register a new fuzzer instance in the database or register worker with an existing fuzzer
144176 ///
@@ -769,22 +801,24 @@ public actor PostgresSQLStorage {
769801 /// - Parameters:
770802 /// - since: The timestamp to fetch programs from
771803 /// - limit: Maximum number of programs to fetch
772- public func fetchNewPrograms( since: Date, limit: Int = 100 ) async throws -> [ ( Program , String ) ] {
804+ public func fetchNewPrograms( since insertedAt : Date, after hash : String , limit: Int = 100 ) async throws -> [ ProgramSyncRecord ] {
773805 if self . enableLogging {
774- self . logger. info ( " Fetching new programs since : \( since ) " )
806+ self . logger. info ( " Fetching new programs after cursor : \( insertedAt ) / \( hash ) " )
775807 }
776808
777809 return try await databasePool. withConnection { connection in
778810 // Format the date for PostgreSQL
779811 let dateFormatter = ISO8601DateFormatter ( )
780812 dateFormatter. formatOptions = [ . withInternetDateTime, . withFractionalSeconds]
781- let sinceString = dateFormatter. string ( from: since)
813+ let sinceString = dateFormatter. string ( from: insertedAt)
814+ let escapedHash = hash. replacingOccurrences ( of: " ' " , with: " '' " )
782815
783816 let query = PostgresQuery ( stringLiteral: """
784- SELECT program_hash, program_base64
817+ SELECT program_hash, program_base64, inserted_at
785818 FROM program
786819 WHERE inserted_at > ' \( sinceString) '
787- ORDER BY inserted_at ASC
820+ OR (inserted_at = ' \( sinceString) ' AND program_hash > ' \( escapedHash) ')
821+ ORDER BY inserted_at ASC, program_hash ASC
788822 LIMIT \( limit)
789823 """ )
790824
@@ -799,14 +833,14 @@ public actor PostgresSQLStorage {
799833 self . logger. info ( " Fetch query returned \( rows. count) rows " )
800834 }
801835
802- var programs : [ ( Program , String ) ] = [ ]
836+ var programs : [ ProgramSyncRecord ] = [ ]
803837
804838 for row in rows {
805839 do {
806- let ( hash, data) = try row. decode ( ( String, String) . self, context: . default)
840+ let ( hash, data, rowInsertedAt ) = try row. decode ( ( String, String, Date ) . self, context: . default)
807841
808842 if let program = try ? DatabaseUtils . decodeProgramFromBase64 ( base64: data) {
809- programs. append ( ( program, hash) )
843+ programs. append ( ProgramSyncRecord ( program: program , hash: hash , insertedAt : rowInsertedAt ) )
810844 } else if self . enableLogging {
811845 self . logger. warning ( " Failed to decode program for hash: \( hash) " )
812846 }
@@ -828,6 +862,18 @@ public actor PostgresSQLStorage {
828862 public func refreshMaterializedViews( ) async throws {
829863 try await databasePool. withConnection { connection in
830864 do {
865+ let lockQuery = PostgresQuery ( stringLiteral: " SELECT pg_try_advisory_lock(937451) " )
866+ let lockResult = try await connection. query ( lockQuery, logger: self . logger)
867+ let lockRows = try await lockResult. collect ( )
868+ let shouldRefresh = try lockRows. first? . decode ( Bool . self, context: . default) ?? false
869+
870+ guard shouldRefresh else {
871+ if self . enableLogging {
872+ self . logger. info ( " Skipping materialized view refresh because another worker holds the refresh lock " )
873+ }
874+ return
875+ }
876+
831877 let startTime = Date ( )
832878
833879 // Call the PostgreSQL function that refreshes all materialized views
@@ -846,7 +892,12 @@ public actor PostgresSQLStorage {
846892 }
847893 }
848894 }
895+
896+ let unlockQuery = PostgresQuery ( stringLiteral: " SELECT pg_advisory_unlock(937451) " )
897+ _ = try await connection. query ( unlockQuery, logger: self . logger)
849898 } catch {
899+ let unlockQuery = PostgresQuery ( stringLiteral: " SELECT pg_advisory_unlock(937451) " )
900+ _ = try ? await connection. query ( unlockQuery, logger: self . logger)
850901 if self . enableLogging {
851902 self . logger. error ( " Failed to refresh materialized views: \( String ( reflecting: error) ) " )
852903 }
@@ -874,4 +925,4 @@ public actor PostgresSQLStorage {
874925 }
875926 }
876927 }
877- }
928+ }
0 commit comments