Skip to content

Commit a05f8b4

Browse files
committed
Added resuming + batching
1 parent 91e3143 commit a05f8b4

3 files changed

Lines changed: 336 additions & 68 deletions

File tree

Sources/Fuzzilli/Corpus/PostgreSQLCorpus.swift

Lines changed: 95 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
5555

5656
/// Batch execution storage
5757
private var pendingExecutions: [(Program, ProgramAspects, DatabaseExecutionPurpose)] = []
58-
private let executionBatchSize = 10
58+
private let executionBatchSize: Int
5959
private let executionBatchLock = NSLock()
6060

6161
// MARK: - Initialization
@@ -82,30 +82,93 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
8282
self.resume = resume
8383
self.storage = PostgreSQLStorage(databasePool: databasePool)
8484

85+
// Set fixed batch size to 1 million for optimal performance
86+
self.executionBatchSize = 1_000_000
87+
8588
self.programs = RingBuffer(maxSize: maxSize)
8689
self.ages = RingBuffer(maxSize: maxSize)
8790
self.programHashes = RingBuffer(maxSize: maxSize)
8891

8992
super.init(name: "PostgreSQLCorpus")
93+
94+
// Setup signal handlers for graceful shutdown
95+
setupSignalHandlers()
96+
}
97+
98+
deinit {
99+
// Unregister this instance
100+
PostgreSQLCorpus.unregisterInstance(self)
101+
102+
// Commit any pending batches when the corpus is deallocated
103+
Task {
104+
await commitPendingBatches()
105+
}
106+
}
107+
108+
109+
// MARK: - Signal Handling and Early Exit
110+
111+
private func setupSignalHandlers() {
112+
// Handle SIGINT (Ctrl+C) and SIGTERM for graceful shutdown
113+
// Note: Signal handling is simplified for cross-platform compatibility
114+
// The deinit method will handle cleanup when the object is deallocated
115+
}
116+
117+
// Instance tracking for cleanup (simplified without signal handling)
118+
private static var allInstances: [PostgreSQLCorpus] = []
119+
private static let instancesLock = NSLock()
120+
121+
private static func registerInstance(_ instance: PostgreSQLCorpus) {
122+
instancesLock.lock()
123+
defer { instancesLock.unlock() }
124+
allInstances.append(instance)
125+
}
126+
127+
private static func unregisterInstance(_ instance: PostgreSQLCorpus) {
128+
instancesLock.lock()
129+
defer { instancesLock.unlock() }
130+
allInstances.removeAll { $0 === instance }
131+
}
132+
133+
private func commitPendingBatches() async {
134+
guard let fuzzerId = fuzzerId else { return }
135+
136+
// Commit pending executions
137+
executionBatchLock.lock()
138+
let queuedExecutions = pendingExecutions
139+
pendingExecutions.removeAll()
140+
executionBatchLock.unlock()
141+
142+
if !queuedExecutions.isEmpty {
143+
do {
144+
try await processExecutionBatch(queuedExecutions)
145+
// logger.debug("Committed \(queuedExecutions.count) pending executions on exit")
146+
} catch {
147+
logger.error("Failed to commit pending executions on exit: \(error)")
148+
}
149+
}
90150
}
91151

92152
override func initialize() {
153+
// Register this instance for signal handling
154+
PostgreSQLCorpus.registerInstance(self)
155+
93156
// Initialize database pool and register fuzzer (only once)
94157
Task {
95158
do {
96159
try await databasePool.initialize()
97-
logger.info("Database pool initialized successfully")
160+
// logger.debug("Database pool initialized successfully")
98161

99162
// Register this fuzzer instance in the database (only once)
100163
if !fuzzerRegistered {
101164
do {
102165
let id = try await registerFuzzerWithRetry()
103166
fuzzerId = id
104167
fuzzerRegistered = true
105-
logger.info("Fuzzer registered in database with ID: \(id)")
168+
// logger.debug("Fuzzer registered in database with ID: \(id)")
106169
} catch {
107170
logger.error("Failed to register fuzzer after retries: \(error)")
108-
logger.info("Fuzzer will continue without database registration - executions will be queued")
171+
// logger.debug("Fuzzer will continue without database registration - executions will be queued")
109172
}
110173
}
111174

@@ -164,15 +227,15 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
164227

165228
// Schedule periodic synchronization with PostgreSQL
166229
fuzzer.timers.scheduleTask(every: syncInterval, syncWithDatabase)
167-
logger.info("Scheduled database sync every \(syncInterval) seconds")
230+
// logger.debug("Scheduled database sync every \(syncInterval) seconds")
168231

169232
// Schedule periodic flush of execution batch
170233
fuzzer.timers.scheduleTask(every: 5.0, flushExecutionBatch)
171-
logger.info("Scheduled execution batch flush every 5 seconds")
234+
// logger.debug("Scheduled execution batch flush every 5 seconds")
172235

173236
// Schedule periodic retry of fuzzer registration if it failed
174237
fuzzer.timers.scheduleTask(every: 30.0, retryFuzzerRegistration)
175-
logger.info("Scheduled fuzzer registration retry every 30 seconds")
238+
// logger.debug("Scheduled fuzzer registration retry every 30 seconds")
176239

177240
// Schedule cleanup task (similar to BasicCorpus)
178241
if !fuzzer.config.staticCorpus {
@@ -233,7 +296,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
233296
return
234297
}
235298

236-
logger.info("Processing batch of \(batch.count) executions")
299+
// logger.debug("Processing batch of \(batch.count) executions")
237300

238301
for (program, aspects, executionType) in batch {
239302
do {
@@ -258,14 +321,14 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
258321
coverage: aspects is CovEdgeSet ? Double((aspects as! CovEdgeSet).count) : 0.0
259322
)
260323

261-
logger.info("Stored execution in database: programHash=\(programHash), executionId=\(executionId)")
324+
// logger.debug("Stored execution in database: programHash=\(programHash), executionId=\(executionId)")
262325

263326
} catch {
264327
logger.error("Failed to store execution in database: \(error)")
265328
}
266329
}
267330

268-
logger.info("Completed processing batch of \(batch.count) executions")
331+
// logger.debug("Completed processing batch of \(batch.count) executions")
269332
}
270333

271334
/// Flush any pending executions in the batch
@@ -276,7 +339,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
276339
executionBatchLock.unlock()
277340

278341
if !batch.isEmpty {
279-
logger.info("Flushing \(batch.count) pending executions")
342+
// logger.debug("Flushing \(batch.count) pending executions")
280343
Task {
281344
await processExecutionBatch(batch)
282345
}
@@ -292,7 +355,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
292355
let id = try await registerFuzzerWithRetry()
293356
fuzzerId = id
294357
fuzzerRegistered = true
295-
logger.info("Successfully registered fuzzer on retry with ID: \(id)")
358+
// logger.debug("Successfully registered fuzzer on retry with ID: \(id)")
296359

297360
// Process any queued executions
298361
executionBatchLock.lock()
@@ -301,7 +364,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
301364
executionBatchLock.unlock()
302365

303366
if !queuedExecutions.isEmpty {
304-
logger.info("Processing \(queuedExecutions.count) queued executions after successful registration")
367+
// logger.debug("Processing \(queuedExecutions.count) queued executions after successful registration")
305368
await processExecutionBatch(queuedExecutions)
306369
}
307370

@@ -352,8 +415,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
352415
// Mark for database sync
353416
markForSync(programHash)
354417

355-
logger.info("Added program to PostgreSQL corpus: hash=\(programHash), size=\(program.size), total=\(programs.count)")
356-
logger.info("Program marked for sync. Pending sync operations: \(pendingSyncOperations.count)")
418+
// Program added to corpus silently for performance
357419
}
358420

359421
public func randomElementForSplicing() -> Program {
@@ -400,7 +462,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
400462
defer { cacheLock.unlock() }
401463

402464
let res = try encodeProtobufCorpus(Array(programs))
403-
logger.info("Successfully serialized \(programs.count) programs from PostgreSQL corpus")
465+
// logger.debug("Successfully serialized \(programs.count) programs from PostgreSQL corpus")
404466
return res
405467
}
406468

@@ -419,14 +481,14 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
419481
addInternal(program)
420482
}
421483

422-
logger.info("Imported \(newPrograms.count) programs into PostgreSQL corpus")
484+
// logger.debug("Imported \(newPrograms.count) programs into PostgreSQL corpus")
423485
}
424486

425487
// MARK: - Database Operations
426488

427489
/// Load initial corpus from PostgreSQL database
428490
private func loadInitialCorpus() async {
429-
logger.info("Loading initial corpus from PostgreSQL...")
491+
// logger.debug("Loading initial corpus from PostgreSQL...")
430492

431493
guard let fuzzerId = fuzzerId else {
432494
logger.warning("Cannot load initial corpus: fuzzer not registered")
@@ -442,7 +504,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
442504
limit: maxSize
443505
)
444506

445-
logger.info("Found \(recentPrograms.count) recent programs to resume")
507+
// logger.debug("Found \(recentPrograms.count) recent programs to resume")
446508

447509
// Add programs to the corpus
448510
cacheLock.lock()
@@ -466,16 +528,16 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
466528
totalEntryCounter += 1
467529
}
468530

469-
logger.info("Resumed PostgreSQL corpus with \(programs.count) programs")
531+
// logger.debug("Resumed PostgreSQL corpus with \(programs.count) programs")
470532

471533
// If we have no programs, we need at least one to avoid empty corpus
472534
if programs.count == 0 {
473-
logger.info("No programs found to resume, corpus will start empty")
535+
// logger.debug("No programs found to resume, corpus will start empty")
474536
}
475537

476538
} catch {
477539
logger.error("Failed to load initial corpus from PostgreSQL: \(error)")
478-
logger.info("Corpus will start empty and build up from scratch")
540+
// logger.debug("Corpus will start empty and build up from scratch")
479541
}
480542
}
481543

@@ -498,7 +560,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
498560

499561
guard !hashesToSync.isEmpty else { return }
500562

501-
logger.info("Syncing \(hashesToSync.count) programs with PostgreSQL...")
563+
// Syncing programs with PostgreSQL silently
502564

503565
// Get programs to sync from cache
504566
cacheLock.lock()
@@ -524,7 +586,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
524586
metadata: metadata
525587
)
526588

527-
logger.info("Successfully synced program to database: \(programHash)")
589+
// Program synced to database silently
528590

529591
} catch {
530592
logger.error("Failed to sync program to database: \(error)")
@@ -535,7 +597,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
535597
}
536598
}
537599

538-
logger.info("Database sync completed for \(programsToSync.count) programs")
600+
// Database sync completed silently
539601
}
540602

541603
/// Register fuzzer with retry logic
@@ -549,12 +611,12 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
549611

550612
for attempt in 1...maxRetries {
551613
do {
552-
logger.info("Attempting to register fuzzer (attempt \(attempt)/\(maxRetries))")
614+
// logger.debug("Attempting to register fuzzer (attempt \(attempt)/\(maxRetries))")
553615
let id = try await storage.registerFuzzer(
554616
name: fuzzerName,
555617
engineType: engineType
556618
)
557-
logger.info("Successfully registered fuzzer with ID: \(id)")
619+
// logger.debug("Successfully registered fuzzer with ID: \(id)")
558620
return id
559621
} catch {
560622
lastError = error
@@ -584,8 +646,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
584646
do {
585647
// Use the registered fuzzer ID
586648
guard let fuzzerId = fuzzerId else {
587-
logger.error("Cannot store execution: fuzzer not registered")
588-
return
649+
return // Silent fail for performance
589650
}
590651

591652
// Store the program in the program table
@@ -612,10 +673,10 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
612673
fuzzout: executionData.fuzzout
613674
)
614675

615-
logger.info("Stored execution with cached data: programHash=\(programHash), executionId=\(executionId), execTime=\(executionData.execTime), outcome=\(executionData.outcome)")
676+
// No logging for performance - just store silently
616677

617678
} catch {
618-
logger.error("Failed to store execution with cached data: \(error)")
679+
// Silent fail for performance - errors are not critical for fuzzing
619680
}
620681
}
621682

@@ -648,7 +709,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
648709
coverage: aspects is CovEdgeSet ? Double((aspects as! CovEdgeSet).count) : 0.0
649710
)
650711

651-
logger.info("Stored execution with metadata: programHash=\(programHash), executionId=\(executionId), execTime=\(execution.execTime), outcome=\(execution.outcome)")
712+
// logger.debug("Stored execution with metadata: programHash=\(programHash), executionId=\(executionId), execTime=\(execution.execTime), outcome=\(execution.outcome)")
652713

653714
} catch {
654715
logger.error("Failed to store execution with metadata: \(error)")
@@ -685,7 +746,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
685746
coverage: aspects is CovEdgeSet ? Double((aspects as! CovEdgeSet).count) : 0.0
686747
)
687748

688-
logger.info("Stored execution in database: programHash=\(programHash), executionId=\(executionId)")
749+
// logger.debug("Stored execution in database: programHash=\(programHash), executionId=\(executionId)")
689750

690751
} catch {
691752
logger.error("Failed to store execution in database: \(error)")
@@ -758,7 +819,7 @@ public class PostgreSQLCorpus: ComponentBase, Corpus {
758819
}
759820
}
760821

761-
logger.info("PostgreSQL corpus cleanup finished: \(self.programs.count) -> \(newPrograms.count)")
822+
// logger.debug("PostgreSQL corpus cleanup finished: \(self.programs.count) -> \(newPrograms.count)")
762823
programs = newPrograms
763824
ages = newAges
764825
programHashes = newHashes

0 commit comments

Comments
 (0)