33from asyncio import AbstractEventLoop
44from asyncio import Queue as AsyncQueue
55from asyncio import new_event_loop
6+ from io import BufferedReader
67from queue import Queue
78from subprocess import Popen
89from time import sleep
910from typing import List
1011
11- from recoverpy .lib .search .result_processor import ResultProcessor
12+ from recoverpy .lib .helper import get_dd_output , decode_result
13+ from recoverpy .lib .search .result_filter import ResultFilter
1214from recoverpy .lib .search .thread_factory import (
1315 start_grep_process ,
1416 start_progress_monitoring_thread ,
15- start_result_dequeue_thread ,
16- start_result_enqueue_thread ,
17+ start_result_formatter_thread ,
18+ start_grep_stdout_consumer_thread ,
1719)
1820from recoverpy .log .logger import log
1921from recoverpy .models .grep_result import GrepResult
2022from recoverpy .models .search_params import SearchParams
2123from recoverpy .models .search_progress import SearchProgress
2224
2325
26+ def _consume_grep_stdout (out : BufferedReader , queue : Queue [bytes ]) -> None :
27+ log .debug ("grep_consumer - Grep output enqueue thread started" )
28+ for line in iter (out .readline , b"" ):
29+ queue .put (line )
30+ out .close ()
31+
32+
2433class SearchEngine :
2534 _grep_process : Popen [bytes ]
2635 _seen_inodes : set [int ] = set ()
@@ -35,35 +44,37 @@ def _initialize_search_components(
3544 ) -> None :
3645 self .search_params = SearchParams (partition , searched_string )
3746 self .search_progress = SearchProgress ()
38- self .result_processor = ResultProcessor (self .search_params )
47+ self .result_processor = ResultFilter (self .search_params )
3948
4049 async def start_search (self ) -> None :
41- self ._initialize_grep_process ()
50+ self ._start_grep_process ()
4251 self ._start_auxiliary_threads ()
4352
4453 def stop_search (self ) -> None :
4554 self ._grep_process .kill ()
4655
47- def _initialize_grep_process (self ) -> None :
56+ def _start_grep_process (self ) -> None :
4857 self ._grep_process = start_grep_process (
4958 searched_string = self .search_params .searched_lines [0 ],
5059 partition = self .search_params .partition ,
5160 )
5261
5362 def _start_auxiliary_threads (self ) -> None :
54- start_result_enqueue_thread (self ._grep_process , self .results_queue )
55- start_result_dequeue_thread (self ._dequeue_results )
63+ start_grep_stdout_consumer_thread (
64+ _consume_grep_stdout , self ._grep_process , self .results_queue
65+ )
66+ start_result_formatter_thread (self ._format_results )
5667 start_progress_monitoring_thread (self ._grep_process , self .search_progress )
5768
58- def _dequeue_results (self ) -> None :
69+ def _format_results (self ) -> None :
5970 loop = new_event_loop ()
6071 while True :
61- results = self .result_processor .get_new_results (self .results_queue )
62- self ._add_new_results (results , loop )
72+ results = self .result_processor .filter_results (self .results_queue )
73+ self ._process_new_results (results , loop )
6374 log .debug (f"search_engine - Dequeued { len (results )} results" )
6475 sleep (0.1 )
6576
66- def _add_new_results (self , results : List [str ], loop : AbstractEventLoop ) -> None :
77+ def _process_new_results (self , results : List [str ], loop : AbstractEventLoop ) -> None :
6778 for result in results :
6879 grep_result = self ._create_grep_result (
6980 result , self .search_progress .result_count
@@ -83,8 +94,24 @@ def _create_grep_result(self, result: str, result_index: int) -> GrepResult:
8394 def _configure_grep_result (
8495 self , grep_result : GrepResult , result_index : int
8596 ) -> None :
86- grep_result .inode = self .result_processor . fix_inode (grep_result .inode )
87- grep_result .line = self .result_processor . fix_line_start (grep_result .line )
97+ grep_result .inode = self ._fix_inode (grep_result .inode )
98+ grep_result .line = self ._fix_line_start (grep_result .line )
8899 grep_result .css_class = (
89100 "grep-result-odd" if result_index % 2 == 0 else "grep-result-even"
90101 )
102+
103+ def _fix_line_start (self , line : str ) -> str :
104+ result_index : int = line .find (self .search_params .searched_lines [0 ])
105+ return line [min (result_index , 15 ) :]
106+
107+ def _fix_inode (self , inode : int ) -> int :
108+ inode //= self .search_params .block_size
109+
110+ for _ in range (10 ):
111+ dd_output = get_dd_output (
112+ self .search_params .partition , self .search_params .block_size , inode
113+ )
114+ if self .search_params .searched_lines [0 ] in decode_result (dd_output ):
115+ return inode
116+ inode += 1
117+ return inode
0 commit comments