2424INITIAL_OPTIMIZER_POINTS = settings_var ("fuzzer.initial_optimizer_points" , 25 )
2525MIN_FAILURE_SAMPLES = settings_var ("fuzzer.min_failure_samples" , 5 )
2626FAILURE_RATE_THRESHOLD = settings_var ("fuzzer.failure_rate_threshold" , 0.5 )
27+ FAILURES_CSV_PATH = settings_var ("fuzzer.failures_csv_path" , "failures.csv" )
28+ FULL_LOG_CSV_PATH = settings_var ("fuzzer.full_log_csv_path" , "full_scan_log.csv" )
29+ MAX_INJECTION_ATTEMPTS = settings_var ("fuzzer.max_injection_attempts" , 20 )
2730
2831
2932async def generate_prompts (
@@ -111,9 +114,7 @@ async def process_prompt(
111114
112115 if response .status_code >= 400 :
113116 logger .error (f"HTTP { response .status_code } { response .content = } " )
114- fuzzer_state .add_error (
115- module_name , prompt , response .status_code , response .text
116- )
117+ fuzzer_state .add_error (module_name , prompt , response .status_code , response .text )
117118 return tokens , True
118119
119120 # Process successful response
@@ -123,9 +124,7 @@ async def process_prompt(
123124 # Check if the response indicates a refusal
124125 refused = refusal_heuristic (response .json ())
125126 if refused :
126- fuzzer_state .add_refusal (
127- module_name , prompt , response .status_code , response_text
128- )
127+ fuzzer_state .add_refusal (module_name , prompt , response .status_code , response_text )
129128
130129 fuzzer_state .add_output (module_name , prompt , response_text , refused )
131130 return tokens , refused
@@ -169,10 +168,7 @@ async def process_prompt_batch(
169168 - Total number of tokens processed.
170169 - Number of failed prompts.
171170 """
172- tasks = [
173- process_prompt (request_factory , p , tokens , module_name , fuzzer_state )
174- for p in prompts
175- ]
171+ tasks = [process_prompt (request_factory , p , tokens , module_name , fuzzer_state ) for p in prompts ]
176172 results = await asyncio .gather (* tasks )
177173 total_tokens = sum (r [0 ] for r in results )
178174 failures = sum (1 for r in results if r [1 ])
@@ -216,11 +212,7 @@ async def scan_module(
216212
217213 # Initialize optimizer if optimization is enabled
218214 optimizer = (
219- Optimizer (
220- [Real (0 , 1 )], base_estimator = "GP" , n_initial_points = INITIAL_OPTIMIZER_POINTS
221- )
222- if optimize
223- else None
215+ Optimizer ([Real (0 , 1 )], base_estimator = "GP" , n_initial_points = INITIAL_OPTIMIZER_POINTS ) if optimize else None
224216 )
225217
226218 module_size = 0 if module .lazy else len (module .prompts )
@@ -422,8 +414,8 @@ async def perform_single_shot_scan(
422414 processed_prompts += module_size
423415
424416 yield ScanResult .status_msg ("Scan completed." )
425- fuzzer_state .export_failures ("failures.csv" )
426- fuzzer_state .export_full_log ("full_scan_log.csv" )
417+ fuzzer_state .export_failures (FAILURES_CSV_PATH )
418+ fuzzer_state .export_full_log (FULL_LOG_CSV_PATH )
427419
428420
429421async def perform_many_shot_scan (
@@ -515,7 +507,7 @@ async def perform_many_shot_scan(
515507 tokens += prompt_tokens
516508
517509 injected = False
518- for _ in range (20 ):
510+ for _ in range (MAX_INJECTION_ATTEMPTS ):
519511 if injected :
520512 break
521513
@@ -552,14 +544,12 @@ async def perform_many_shot_scan(
552544 ).model_dump_json ()
553545
554546 if optimize and len (failure_rates ) >= MIN_FAILURE_SAMPLES :
555- yield ScanResult .status_msg (
556- f"High failure rate detected ({ failure_rate :.2%} ). Stopping this module..."
557- )
547+ yield ScanResult .status_msg (f"High failure rate detected ({ failure_rate :.2%} ). Stopping this module..." )
558548 break
559549
560550 yield ScanResult .status_msg ("Scan completed." )
561- fuzzer_state .export_failures ("failures.csv" )
562- fuzzer_state .export_full_log ("full_scan_log.csv" )
551+ fuzzer_state .export_failures (FAILURES_CSV_PATH )
552+ fuzzer_state .export_full_log (FULL_LOG_CSV_PATH )
563553
564554
565555def scan_router (
0 commit comments