Skip to content

Commit 6757e45

Browse files
committed
update batching, update registration semaphore, log message
1 parent 79456b4 commit 6757e45

3 files changed

Lines changed: 119 additions & 145 deletions

File tree

Sources/Fuzzilli/Database/PostgresSQLStorage.swift

Lines changed: 108 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -444,89 +444,74 @@ public actor PostgresSQLStorage {
444444
}
445445
}
446446

447+
private let maxBatchSize = 500
448+
447449
public func storeProgramsBatch(programInputs: [ProgramInput], fuzzerId: Int) async throws -> [String] {
448450
guard !programInputs.isEmpty else { return [] }
449451

450452
return try await retryOnDeadlock {
451453
try await self._storeProgramsBatchImpl(programInputs: programInputs, fuzzerId: fuzzerId)
452454
}
453455
}
454-
456+
455457
private func _storeProgramsBatchImpl(programInputs: [ProgramInput], fuzzerId: Int) async throws -> [String] {
456-
// Sort by hash - ensures all workers acquire locks in the same order
457-
let sortedProgramInputs = programInputs.sorted { $0.programHash < $1.programHash }
458+
// Sort by hash so all workers acquire locks in the same order
459+
let sortedInputs = programInputs.sorted { $0.programHash < $1.programHash }
458460

459461
return try await databasePool.withConnection { connection in
460-
var programHashes: [String] = []
462+
var allHashes: [String] = []
461463

462-
try await connection.query("BEGIN", logger:self.logger)
464+
for chunk in sortedInputs.chunked(into: self.maxBatchSize) {
465+
let hashes = try await self.insertProgramChunk(chunk, fuzzerId: fuzzerId, connection: connection)
466+
allHashes.append(contentsOf: hashes)
467+
}
468+
return allHashes
469+
}
470+
}
463471

464-
do {
465-
var insertedCount = 0
466-
var skippedCount = 0
467-
468-
for input in sortedProgramInputs {
469-
let programHash = input.programHash
470-
let mutatorNames = input.mutatorNames
471-
let contributorNames = input.contributorNames
472-
473-
// Format as PostgreSQL arrays: ARRAY['name1', 'name2', ...]
474-
let mutatorsArray: String
475-
if !mutatorNames.isEmpty {
476-
let escapedMutators = mutatorNames.map { "'\($0.replacingOccurrences(of: "'", with: "''"))'" }
477-
mutatorsArray = "ARRAY[\(escapedMutators.joined(separator: ", "))]"
478-
} else {
479-
mutatorsArray = "NULL"
480-
}
472+
private func insertProgramChunk(_ chunk: [ProgramInput], fuzzerId: Int, connection: PostgresConnection) async throws -> [String] {
473+
guard !chunk.isEmpty else { return [] }
481474

482-
let contributorsArray: String
483-
if !contributorNames.isEmpty {
484-
let escapedContributors = contributorNames.map { "'\($0.replacingOccurrences(of: "'", with: "''"))'" }
485-
contributorsArray = "ARRAY[\(escapedContributors.joined(separator: ", "))]"
486-
} else {
487-
contributorsArray = "NULL"
488-
}
475+
var bindings = PostgresBindings()
476+
var valueClauses: [String] = []
477+
var i = 1
489478

490-
// Use ON CONFLICT DO NOTHING with RETURNING to detect if insert succeeded
491-
// First worker to insert wins, others skip gracefully
492-
// Single unified INSERT into the program table (previously split between fuzzer and program tables)
493-
let programQuery = PostgresQuery(stringLiteral: """
494-
INSERT INTO program (program_hash, fuzzer_id, inserted_at, program_base64, created_at, source_mutators, contributors, parent_program_hash)
495-
VALUES ('\(programHash)', \(fuzzerId), NOW(), '\(input.programBase64)', NOW(), \(mutatorsArray), \(contributorsArray), \(input.parentHash != nil ? "'\(input.parentHash!)'" : "NULL"))
496-
ON CONFLICT (program_hash) DO NOTHING
497-
RETURNING program_hash
498-
""")
479+
for input in chunk {
480+
let clause = "($\(i), $\(i+1), NOW(), $\(i+2), NOW(), $\(i+3), $\(i+4), $\(i+5))"
481+
valueClauses.append(clause)
499482

500-
let programResult = try await connection.query(programQuery, logger:self.logger)
501-
let programRows = try await programResult.collect()
502-
503-
// If RETURNING gave us a row, the insert succeeded
504-
// If no rows, it was skipped due to conflict
505-
if !programRows.isEmpty {
506-
programHashes.append(programHash)
507-
insertedCount += 1
508-
} else {
509-
skippedCount += 1
510-
}
511-
}
512-
513-
try await connection.query("COMMIT", logger:self.logger)
483+
bindings.append(input.programHash)
484+
bindings.append(fuzzerId)
485+
bindings.append(input.programBase64)
514486

515-
if self.enableLogging {
516-
self.logger.info("Successfully batch stored \(insertedCount) programs in database (\(skippedCount) skipped as duplicates)")
517-
}
487+
let mutatorStr = input.mutatorNames.isEmpty ? nil : "{" + input.mutatorNames.joined(separator: ",") + "}"
488+
bindings.append(mutatorStr)
518489

519-
return programHashes
490+
let contributorStr = input.contributorNames.isEmpty ? nil : "{" + input.contributorNames.joined(separator: ",") + "}"
491+
bindings.append(contributorStr)
520492

521-
} catch {
522-
_ = try? await connection.query("ROLLBACK", logger:self.logger)
523-
524-
if self.enableLogging {
525-
self.logger.error("Failed to batch store programs: \(String(reflecting: error))")
526-
}
527-
throw error
528-
}
493+
bindings.append(input.parentHash)
494+
i += 6
495+
}
496+
497+
let sql = """
498+
INSERT INTO program (
499+
program_hash, fuzzer_id, inserted_at,
500+
program_base64, created_at, source_mutators,
501+
contributors, parent_program_hash)
502+
VALUES \(valueClauses.joined(separator: ",\n "))
503+
ON CONFLICT (program_hash) DO NOTHING
504+
RETURNING program_hash;
505+
"""
506+
507+
let query = PostgresQuery(unsafeSQL: sql, binds: bindings)
508+
let result = try await connection.query(query, logger: self.logger)
509+
510+
var hashes: [String] = []
511+
for try await (hash,) in result.decode((String).self) {
512+
hashes.append(hash)
529513
}
514+
return hashes
530515
}
531516

532517
public func storeExecutionsBatch(executions: [ExecutionInput]) async throws -> [Int] {
@@ -538,55 +523,62 @@ public actor PostgresSQLStorage {
538523
}
539524

540525
private func _storeExecutionsBatchImpl(executions: [ExecutionInput]) async throws -> [Int] {
526+
let sortedExecutions = executions.sorted { $0.programHash < $1.programHash }
527+
541528
return try await databasePool.withConnection { connection in
542529
var executionIds: [Int] = []
543-
544-
try await connection.query("BEGIN", logger:self.logger)
545-
546-
do {
547-
// Sort by program_hash for consistent lock ordering
548-
let sortedExecutions = executions.sorted { $0.programHash < $1.programHash }
549-
550-
for execution in sortedExecutions {
551-
let query: PostgresQuery = """
552-
INSERT INTO execution (
553-
program_hash, execution_outcome_id, coverage_total,
554-
edges_found, total_edges, is_new_edge,
555-
stdout, stderr, fuzzout,
556-
turbofan_optimization_bits, feedback_nexus_count, created_at
557-
) VALUES (
558-
\(execution.programHash), \(execution.executionOutcomeId), \(execution.coverageTotal),
559-
\(execution.edgesFound), \(execution.totalEdges), \(execution.isNewEdge),
560-
\(execution.stdout), \(execution.stderr), \(execution.fuzzout),
561-
\(execution.turbofanOptimizationBits), \(execution.feedbackNexusCount), NOW()
562-
) RETURNING execution_id
563-
"""
564-
565-
let result = try await connection.query(query, logger:self.logger)
566530

567-
for row in try await result.collect() {
568-
let id = try row.decode(Int.self)
569-
executionIds.append(id)
570-
}
571-
}
572-
573-
try await connection.query("COMMIT", logger:self.logger)
574-
575-
if self.enableLogging {
576-
self.logger.info("Successfully batch stored \(executionIds.count) executions")
577-
}
578-
579-
return executionIds
580-
581-
} catch {
582-
_ = try? await connection.query("ROLLBACK", logger:self.logger)
583-
if self.enableLogging {
584-
self.logger.error("Failed to batch store executions: \(String(reflecting: error))")
585-
}
586-
throw error
531+
for chunk in sortedExecutions.chunked(into: self.maxBatchSize) {
532+
let ids = try await self.insertExecutionChunk(chunk, connection: connection)
533+
executionIds.append(contentsOf: ids)
587534
}
535+
return executionIds
588536
}
589-
}
537+
}
538+
539+
private func insertExecutionChunk(_ chunk: [ExecutionInput], connection: PostgresConnection) async throws -> [Int] {
540+
guard !chunk.isEmpty else { return [] }
541+
542+
var bindings = PostgresBindings()
543+
var valueClauses: [String] = []
544+
var i = 1
545+
546+
for execution in chunk {
547+
let clause = "($\(i), $\(i+1), $\(i+2), $\(i+3), $\(i+4), $\(i+5), $\(i+6), $\(i+7), $\(i+8), $\(i+9), $\(i+10), NOW())"
548+
valueClauses.append(clause)
549+
bindings.append(execution.programHash)
550+
bindings.append(execution.executionOutcomeId)
551+
bindings.append(execution.coverageTotal)
552+
bindings.append(execution.edgesFound)
553+
bindings.append(execution.totalEdges)
554+
bindings.append(execution.isNewEdge)
555+
bindings.append(execution.stdout)
556+
bindings.append(execution.stderr)
557+
bindings.append(execution.fuzzout)
558+
bindings.append(execution.turbofanOptimizationBits)
559+
bindings.append(execution.feedbackNexusCount)
560+
i += 11
561+
}
562+
563+
let sql = """
564+
INSERT INTO execution (
565+
program_hash, execution_outcome_id, coverage_total,
566+
edges_found, total_edges, is_new_edge,
567+
stdout, stderr, fuzzout,
568+
turbofan_optimization_bits, feedback_nexus_count, created_at)
569+
VALUES \(valueClauses.joined(separator: ",\n "))
570+
RETURNING execution_id;
571+
"""
572+
573+
let query = PostgresQuery(unsafeSQL: sql, binds: bindings)
574+
let result = try await connection.query(query, logger: self.logger)
575+
576+
var ids: [Int] = []
577+
for try await (id,) in result.decode((Int).self) {
578+
ids.append(id)
579+
}
580+
return ids
581+
}
590582

591583
/// Retry wrapper for deadlock handling
592584
/// Detects PostgreSQL deadlock errors (40P01) and retries with exponential backoff
@@ -829,4 +821,12 @@ public actor PostgresSQLStorage {
829821
}
830822
}
831823
}
824+
825+
}
826+
extension Array {
827+
func chunked(into size: Int) -> [[Element]] {
828+
stride(from: 0, to: count, by: size).map {
829+
Array(self[$0..<Swift.min($0 + size, count)])
830+
}
831+
}
832832
}

Sources/Fuzzilli/Fuzzer.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -949,9 +949,10 @@ public class Fuzzer {
949949

950950
// Condition 3: Corpus size-based limit
951951
// If we have a large corpus (e.g., from PostgreSQL sync), we should move to fuzzing
952-
if corpus.size >= 1500 {
952+
let ceiling = 1500
953+
if corpus.size >= ceiling {
953954
shouldExitCorpusGeneration = true
954-
exitReason = "corpus size threshold reached (\(corpus.size) samples)"
955+
exitReason = "corpus size threshold reached (\(ceiling) samples)"
955956
}
956957

957958
if shouldExitCorpusGeneration {

Sources/Fuzzilli/Modules/PostgreSQLSync.swift

Lines changed: 8 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -40,56 +40,29 @@ public class PostgreSQLSync: Module {
4040
}
4141

4242
// Register fuzzer synchronously before setting up event listeners
43-
// This ensures cachedFuzzerId is set before any events start firing
44-
// Use a semaphore to block until registration completes
45-
let registrationSemaphore = DispatchSemaphore(value: 0)
46-
43+
let semaphore = DispatchSemaphore(value: 0)
4744
Task {
4845
do {
49-
// Sleep for a random amount of time to avoid stampedes
50-
let jitter = TimeInterval.random(in: 0...60)
51-
try? await Task.sleep(nanoseconds: UInt64(jitter * 1_000_000_000))
52-
53-
// Step 1: Register this worker with the database (blocking via semaphore)
5446
let engineArgs = fuzzer.config.arguments
5547
self.cachedFuzzerId = try await storage.registerFuzzer(engineArguments: engineArgs)
5648
if enableLogging {
5749
logger.info("Fuzzer registered with PostgreSQL database: fuzzerId \(self.cachedFuzzerId ?? -1)")
5850
}
59-
60-
// Registration complete
61-
registrationSemaphore.signal()
51+
semaphore.signal()
6252

63-
// Step 2: Sync corpus from database to in-memory basicCorpus (can be async)
53+
// Sync corpus from database to in-memory basicCorpus
54+
let jitter = TimeInterval.random(in: 0...60)
55+
try? await Task.sleep(nanoseconds: UInt64(jitter * 1_000_000_000))
6456
let programs = try await storage.syncCorpusFromDatabase()
65-
logger.info("Found: \(programs.count) programs from db")
66-
if enableLogging {
67-
logger.info("Syncing \(programs.count) programs from database to corpus")
68-
}
69-
70-
// Import each program into the fuzzer's corpus
7157
for program in programs {
72-
fuzzer.async {
73-
fuzzer.importProgram(program, origin: .corpusImport(mode: .databaseSync), enableDropout: false)
74-
}
75-
}
76-
77-
if enableLogging {
78-
logger.info("Corpus synchronization complete: imported \(programs.count) programs")
58+
fuzzer.async { fuzzer.importProgram(program, origin: .corpusImport(mode: .databaseSync), enableDropout: false) }
7959
}
8060
} catch {
8161
logger.error("Failed to register fuzzer with PostgreSQL database: \(String(reflecting: error))")
82-
// Signal even on error so we don't deadlock
83-
registrationSemaphore.signal()
62+
semaphore.signal()
8463
}
8564
}
86-
87-
// Wait for registration to complete before continuing with event listener setup
88-
registrationSemaphore.wait()
89-
90-
if enableLogging {
91-
logger.info("Fuzzer registration complete, proceeding with event listener setup")
92-
}
65+
semaphore.wait()
9366

9467
// Track program ID for each execution to correlate with outputs
9568
var currentExecutingProgramId: String? = nil

0 commit comments

Comments
 (0)