1313
1414import json
1515import logging
16- from datetime import datetime
16+ from datetime import datetime , timezone
1717from pathlib import Path
1818from typing import Any , Literal , Optional
1919
@@ -43,7 +43,7 @@ def record_feedback(
4343 feedback : Literal ["tp" , "fp" ],
4444 reason : str ,
4545 finding_details : Optional [dict [str , Any ]] = None ,
46- user : str = "user"
46+ user : str = "user" ,
4747 ) -> bool :
4848 """
4949 Store feedback for future model improvement
@@ -69,7 +69,7 @@ def record_feedback(
6969 "feedback_label" : "true_positive" if feedback == "tp" else "false_positive" ,
7070 "reason" : reason ,
7171 "user" : user ,
72- "timestamp" : datetime .utcnow ( ).isoformat (),
72+ "timestamp" : datetime .now ( tz = timezone . utc ).isoformat (),
7373 }
7474
7575 # Include finding details if provided
@@ -86,21 +86,15 @@ def record_feedback(
8686 with open (self .feedback_file , "a" ) as f :
8787 f .write (json .dumps (feedback_entry ) + "\n " )
8888
89- logger .info (
90- f"Recorded feedback: finding={ finding_id } , "
91- f"feedback={ feedback } , reason='{ reason [:50 ]} ...'"
92- )
89+ logger .info (f"Recorded feedback: finding={ finding_id } , feedback={ feedback } , reason='{ reason [:50 ]} ...'" )
9390
9491 return True
9592
9693 except Exception as e :
9794 logger .error (f"Failed to record feedback: { e } " )
9895 return False
9996
100- def get_all_feedback (
101- self ,
102- feedback_type : Optional [Literal ["tp" , "fp" ]] = None
103- ) -> list [dict [str , Any ]]:
97+ def get_all_feedback (self , feedback_type : Optional [Literal ["tp" , "fp" ]] = None ) -> list [dict [str , Any ]]:
10498 """
10599 Retrieve all feedback entries
106100
@@ -157,12 +151,7 @@ def get_feedback_by_id(self, finding_id: str) -> Optional[dict[str, Any]]:
157151
158152 return None
159153
160- def get_similar_findings (
161- self ,
162- finding_type : str ,
163- scanner : str ,
164- limit : int = 5
165- ) -> list [dict [str , Any ]]:
154+ def get_similar_findings (self , finding_type : str , scanner : str , limit : int = 5 ) -> list [dict [str , Any ]]:
166155 """
167156 Retrieve past feedback for similar findings (for few-shot prompting)
168157
@@ -185,24 +174,15 @@ def get_similar_findings(
185174 similar = []
186175 for entry in all_feedback :
187176 finding = entry .get ("finding" , {})
188- if (finding .get ("scanner" ) == scanner and
189- finding .get ("finding_type" ) == finding_type ):
177+ if finding .get ("scanner" ) == scanner and finding .get ("finding_type" ) == finding_type :
190178 similar .append (entry )
191179
192180 # Sort by timestamp (most recent first)
193- similar .sort (
194- key = lambda x : x .get ("timestamp" , "" ),
195- reverse = True
196- )
181+ similar .sort (key = lambda x : x .get ("timestamp" , "" ), reverse = True )
197182
198183 return similar [:limit ]
199184
200- def generate_few_shot_examples (
201- self ,
202- finding_type : str ,
203- scanner : str ,
204- max_examples : int = 3
205- ) -> str :
185+ def generate_few_shot_examples (self , finding_type : str , scanner : str , max_examples : int = 3 ) -> str :
206186 """
207187 Generate few-shot prompt examples from historical feedback
208188
@@ -229,9 +209,9 @@ def generate_few_shot_examples(
229209
230210 example = f"""
231211Example { i } :
232- Finding Type: { finding .get (' finding_type' , ' unknown' )}
233- Scanner: { finding .get (' scanner' , ' unknown' )}
234- Description: { finding .get (' description' , '' )[:150 ]}
212+ Finding Type: { finding .get (" finding_type" , " unknown" )}
213+ Scanner: { finding .get (" scanner" , " unknown" )}
214+ Description: { finding .get (" description" , "" )[:150 ]}
235215User Feedback: { feedback_label .upper ()}
236216Reason: { reason }
237217"""
@@ -344,19 +324,16 @@ def export_for_training(self, output_file: str) -> bool:
344324 "messages" : [
345325 {
346326 "role" : "system" ,
347- "content" : "You are a security analysis expert. Evaluate if findings are true positives or false positives."
327+ "content" : "You are a security analysis expert. Evaluate if findings are true positives or false positives." ,
348328 },
349329 {
350330 "role" : "user" ,
351331 "content" : f"Finding Type: { finding .get ('finding_type' )} \n "
352- f"Scanner: { finding .get ('scanner' )} \n "
353- f"Description: { finding .get ('description' )} \n "
354- f"Is this a true positive or false positive?"
332+ f"Scanner: { finding .get ('scanner' )} \n "
333+ f"Description: { finding .get ('description' )} \n "
334+ f"Is this a true positive or false positive?" ,
355335 },
356- {
357- "role" : "assistant" ,
358- "content" : f"{ feedback_label .upper ()} : { reason } "
359- }
336+ {"role" : "assistant" , "content" : f"{ feedback_label .upper ()} : { reason } " },
360337 ]
361338 }
362339
@@ -392,25 +369,16 @@ def main():
392369 """CLI interface for feedback management"""
393370 import argparse
394371
395- parser = argparse .ArgumentParser (
396- description = "Manage user feedback on security findings"
397- )
398- parser .add_argument (
399- "--feedback-dir" ,
400- default = ".argus/feedback" ,
401- help = "Feedback directory path"
402- )
372+ parser = argparse .ArgumentParser (description = "Manage user feedback on security findings" )
373+ parser .add_argument ("--feedback-dir" , default = ".argus/feedback" , help = "Feedback directory path" )
403374
404375 subparsers = parser .add_subparsers (dest = "command" , help = "Command to execute" )
405376
406377 # Record feedback
407378 record_parser = subparsers .add_parser ("record" , help = "Record feedback for a finding" )
408379 record_parser .add_argument ("finding_id" , help = "Finding ID" )
409380 record_parser .add_argument (
410- "--mark" ,
411- choices = ["tp" , "fp" ],
412- required = True ,
413- help = "Mark as true positive (tp) or false positive (fp)"
381+ "--mark" , choices = ["tp" , "fp" ], required = True , help = "Mark as true positive (tp) or false positive (fp)"
414382 )
415383 record_parser .add_argument ("--reason" , required = True , help = "Reason for feedback" )
416384
@@ -423,11 +391,7 @@ def main():
423391
424392 # List feedback
425393 list_parser = subparsers .add_parser ("list" , help = "List all feedback" )
426- list_parser .add_argument (
427- "--type" ,
428- choices = ["tp" , "fp" ],
429- help = "Filter by feedback type"
430- )
394+ list_parser .add_argument ("--type" , choices = ["tp" , "fp" ], help = "Filter by feedback type" )
431395
432396 args = parser .parse_args ()
433397
@@ -438,11 +402,7 @@ def main():
438402 collector = FeedbackCollector (args .feedback_dir )
439403
440404 if args .command == "record" :
441- success = collector .record_feedback (
442- finding_id = args .finding_id ,
443- feedback = args .mark ,
444- reason = args .reason
445- )
405+ success = collector .record_feedback (finding_id = args .finding_id , feedback = args .mark , reason = args .reason )
446406 if success :
447407 print (f"✅ Recorded feedback: { args .mark .upper ()} for finding { args .finding_id } " )
448408 else :
@@ -463,8 +423,9 @@ def main():
463423 print ("\n By Scanner:" )
464424 for scanner , scanner_stats in stats ["by_scanner" ].items ():
465425 fp_rate = (scanner_stats ["fp" ] / scanner_stats ["total" ] * 100 ) if scanner_stats ["total" ] > 0 else 0
466- print (f" { scanner :20s} : { scanner_stats ['total' ]:3d} total, "
467- f"{ scanner_stats ['fp' ]:3d} FP ({ fp_rate :.0f} %)" )
426+ print (
427+ f" { scanner :20s} : { scanner_stats ['total' ]:3d} total, { scanner_stats ['fp' ]:3d} FP ({ fp_rate :.0f} %)"
428+ )
468429
469430 if stats .get ("recent_feedback" ):
470431 print ("\n Recent Feedback:" )
@@ -484,7 +445,7 @@ def main():
484445 elif args .command == "list" :
485446 feedback_list = collector .get_all_feedback (feedback_type = args .type )
486447
487- print (f"\n { '=' * 80 } " )
448+ print (f"\n { '=' * 80 } " )
488449 print (f"FEEDBACK LIST ({ len (feedback_list )} entries)" )
489450 print ("=" * 80 )
490451
0 commit comments