1414from io import BufferedReader
1515from queue import Queue
1616from subprocess import Popen
17- from time import sleep
1817from typing import List
1918
20- from recoverpy .lib .helper import get_dd_output , decode_result
21- from recoverpy .lib .search .result_filter import ResultFilter
19+ from recoverpy .lib .helper import get_dd_output , decode_result , get_inode
2220from recoverpy .lib .search .thread_factory import (
2321 start_grep_process ,
2422 start_progress_monitoring_thread ,
25- start_result_formatter_thread ,
23+ start_result_converter_thread ,
2624 start_grep_stdout_consumer_thread ,
2725)
2826from recoverpy .log .logger import log
@@ -43,16 +41,10 @@ class SearchEngine:
4341 _seen_inodes : set [int ] = set ()
4442
4543 def __init__ (self , partition : str , searched_string : str ):
46- self ._initialize_search_components (partition , searched_string )
47- self .raw_grep_results_queue : Queue [bytes ] = Queue ()
48- self .formatted_results_queue : AsyncQueue [GrepResult ] = AsyncQueue ()
49-
50- def _initialize_search_components (
51- self , partition : str , searched_string : str
52- ) -> None :
5344 self .search_params = SearchParams (partition , searched_string )
5445 self .search_progress = SearchProgress ()
55- self .result_filter = ResultFilter (self .search_params )
46+ self .raw_grep_results_queue : Queue [bytes ] = Queue ()
47+ self .formatted_results_queue : AsyncQueue [GrepResult ] = AsyncQueue ()
5648
5749 async def start_search (self ) -> None :
5850 self ._start_grep_process ()
@@ -71,33 +63,75 @@ def _start_auxiliary_threads(self) -> None:
7163 start_grep_stdout_consumer_thread (
7264 _consume_grep_stdout , self ._grep_process , self .raw_grep_results_queue
7365 )
74- start_result_formatter_thread (self ._format_results )
66+ start_result_converter_thread (self ._convert_results )
7567 start_progress_monitoring_thread (self ._grep_process , self .search_progress )
7668
77- def _format_results (self ) -> None :
69+ def _convert_results (self ) -> None :
7870 """Initiate formatting and filtering of raw grep results.
79- A new event loop is created to avoid blocking the main App loop ."""
71+ A new event loop is created to interact with the async queue ."""
8072 loop = new_event_loop ()
8173 while True :
82- results = self .result_filter . filter_results (self .raw_grep_results_queue )
74+ results = self ._decode_new_results (self .raw_grep_results_queue )
8375 self ._process_new_results (results , loop )
8476 log .debug (f"search_engine - Dequeued { len (results )} results" )
85- sleep (0.1 )
77+
78+ def _decode_new_results (self , queue_object : Queue [bytes ]) -> List [str ]:
79+ """Consume raw grep results, filter out false positives if multiline and return decoded results."""
80+ queue_list : List [bytes ] = list (queue_object .queue )
81+ queue_size = len (queue_list )
82+ queue_object .queue .clear ()
83+ if queue_size == 0 :
84+ return []
85+
86+ decoded_results : List [str ] = [decode_result (r ) for r in queue_list ]
87+
88+ if self .search_params .is_multi_line :
89+ return self ._filter_multiline_results (decoded_results )
90+ return decoded_results
91+
92+ def _filter_multiline_results (self , results : List [str ]) -> List [str ]:
93+ """Filter out false positives from multiline results."""
94+ return [result for result in results if self ._is_result_valid (result )]
95+
96+ def _is_result_valid (self , result : str ) -> bool :
97+ """Check if result contains all searched lines."""
98+ inode = int (get_inode (result ))
99+ block_factor = self .search_params .block_size * 8
100+
101+ both_block_output = self ._get_combined_block_output (inode , block_factor )
102+ return all (
103+ line in both_block_output for line in self .search_params .searched_lines
104+ )
105+
106+ def _get_combined_block_output (self , inode : int , block_factor : int ) -> str :
107+ """Get combined output of current and next block."""
108+ block_index = inode // block_factor
109+ block_output = get_dd_output (
110+ self .search_params .partition , block_factor , block_index
111+ )
112+ next_block_output = get_dd_output (
113+ self .search_params .partition , block_factor , block_index + 1
114+ )
115+
116+ return decode_result (block_output ) + decode_result (next_block_output )
86117
87118 def _process_new_results (self , results : List [str ], loop : AbstractEventLoop ) -> None :
88- """Consumes filtered grep results , convert them into GrepResult objects
119+ """Consumes grep result strings , convert them into GrepResult objects
89120 and enqueues them into the formatted results queue for UI."""
90121 for result in results :
91122 grep_result = self ._create_grep_result (
92123 result , self .search_progress .result_count
93124 )
94125
95- if grep_result .inode not in self ._seen_inodes :
96- self ._seen_inodes .add (grep_result .inode )
97- loop .run_until_complete (self .formatted_results_queue .put (grep_result ))
98- self .search_progress .result_count += 1
126+ if grep_result .inode in self ._seen_inodes :
127+ continue
128+
129+ self ._seen_inodes .add (grep_result .inode )
130+ loop .run_until_complete (self .formatted_results_queue .put (grep_result ))
131+ self .search_progress .result_count += 1
99132
100133 def _create_grep_result (self , result : str , result_index : int ) -> GrepResult :
134+ """Create a GrepResult object from a raw grep result string."""
101135 grep_result = GrepResult (result )
102136 self ._configure_grep_result (grep_result , result_index )
103137 log .debug (f"search_engine - Created grep result { grep_result .inode } " )
@@ -129,4 +163,4 @@ def _fix_inode(self, inode: int) -> int:
129163 def _fix_line_start (self , line : str ) -> str :
130164 """Remove unnecessary characters from the start of the line to improve readability."""
131165 result_index : int = line .find (self .search_params .searched_lines [0 ])
132- return line [min (result_index , 15 ) :]
166+ return line [min (result_index , 15 ) :]
0 commit comments