diff --git a/src/sysdiagnose/analysers/ps_everywhere.py b/src/sysdiagnose/analysers/ps_everywhere.py
index edaca35b..8167925c 100644
--- a/src/sysdiagnose/analysers/ps_everywhere.py
+++ b/src/sysdiagnose/analysers/ps_everywhere.py
@@ -1,7 +1,7 @@
 #! /usr/bin/env python3

-from datetime import datetime
-from typing import Generator, Set, Optional
+from datetime import datetime, timedelta
+from typing import Generator, Set, Optional, Dict
 from sysdiagnose.parsers import ps
 from sysdiagnose.utils.base import BaseAnalyserInterface, SysdiagnoseConfig, logger, Event
 from sysdiagnose.parsers.ps import PsParser
@@ -22,6 +22,9 @@ class PsEverywhereAnalyser(BaseAnalyserInterface):
     to build a comprehensive list of running processes across different system logs.

     The timestamp is 'a' time the process was seen in the log, without being specifically the first or last seen.
+
+    Deduplication strategy: Processes are deduplicated within a 1-hour window. If the same process
+    appears more than 1 hour apart, both occurrences are kept to track temporal patterns.
     """

     description = "List all processes we can find a bit everywhere."
@@ -30,6 +33,10 @@ class PsEverywhereAnalyser(BaseAnalyserInterface):
     def __init__(self, config: SysdiagnoseConfig, case_id: str):
         super().__init__(__file__, config, case_id)
         self.all_ps: Set[str] = set()
+        # Last emitted timestamp per composite key (name|uid|pid|ppid), used for time-based deduplication
+        self.process_last_seen: Dict[str, datetime] = {}
+        # PID to process name mapping for parent name resolution
+        self.pid_to_name: Dict[int, str] = {}

     @staticmethod
     def _strip_flags(process: str) -> str:
@@ -42,6 +49,81 @@ def _strip_flags(process: str) -> str:
         process, *_ = process.partition(' ')
         return process

+    @staticmethod
+    def _sanitize_uid(uid: Optional[int]) -> Optional[int]:
+        """
+        Sanitizes UID values by filtering out invalid/placeholder values.
+
+        :param uid: The UID value to sanitize
+        :return: The UID if valid, None if invalid/placeholder
+        """
+        # 0xAAAAAAAA (2863311530) is a common placeholder/uninitialized value
+        # 0xFFFFFFFF (4294967295) is -1 as unsigned, also invalid
+        if uid in (2863311530, 4294967295):
+            return None
+        return uid
+
+    def _resolve_ppname(self, ppid: Optional[int]) -> Optional[str]:
+        """
+        Resolves parent process ID to parent process name using the PID mapping.
+
+        :param ppid: Parent process ID
+        :return: Parent process name if found, None otherwise
+        """
+        if ppid is None:
+            return None
+        return self.pid_to_name.get(ppid)
+
+    def _build_pid_mapping(self):
+        """
+        Builds a PID to process name mapping from available parsers.
+        This mapping is used to resolve parent process names from PPIDs.
+        """
+        # Build from ps.txt
+        try:
+            for p in PsParser(self.config, self.case_id).get_result():
+                pid = p['data'].get('pid')
+                command = p['data'].get('command')
+                if pid is not None and command:  # PID 0 is a valid PID, only skip missing values
+                    self.pid_to_name[pid] = self._strip_flags(command)
+        except Exception as e:
+            logger.debug(f"Could not build PID mapping from ps.txt: {e}")
+
+        # Build from psthread.txt
+        try:
+            for p in PsThreadParser(self.config, self.case_id).get_result():
+                pid = p['data'].get('pid')
+                command = p['data'].get('command')
+                if pid is not None and command:  # PID 0 is a valid PID, only skip missing values
+                    self.pid_to_name[pid] = self._strip_flags(command)
+        except Exception as e:
+            logger.debug(f"Could not build PID mapping from psthread.txt: {e}")
+
+        # Build from spindump
+        try:
+            for event in SpindumpNoSymbolsParser(self.config, self.case_id).get_result():
+                p = event['data']
+                if 'process' in p:
+                    pid = p.get('pid')
+                    process_name = p.get('path', p['process'])
+                    if pid is not None and process_name:  # PID 0 is a valid PID, only skip missing values
+                        self.pid_to_name[pid] = self._strip_flags(process_name)
+        except Exception as e:
+            logger.debug(f"Could not build PID mapping from spindump: {e}")
+
+        # Build from taskinfo
+        try:
+            for p in TaskinfoParser(self.config, self.case_id).get_result():
+                if 'name' in p['data']:
+                    pid = p['data'].get('pid')
+                    name = p['data'].get('name')
+                    if pid is not None and name:  # PID 0 is a valid PID, only skip missing values
+                        self.pid_to_name[pid] = self._strip_flags(name)
+        except Exception as e:
+            logger.debug(f"Could not build PID mapping from taskinfo: {e}")
+
+        logger.info(f"Built PID mapping with {len(self.pid_to_name)} entries")
+
     @staticmethod
     def message_extract_binary(process: str, message: str) -> Optional[str | list[str]]:
         """
@@ -163,6 +245,9 @@ def execute(self) -> Generator[dict, None, None]:

         :yield: A dictionary containing process details from various sources.
         """
+        # Build PID to name mapping first for parent name resolution
+        self._build_pid_mapping()
+
         for func in dir(self):
             if func.startswith(f"_{self.__class__.__name__}__extract_ps_"):
                 yield from getattr(self, func)()  # Dynamically call extract methods
@@ -176,14 +261,18 @@ def __extract_ps_base_file(self) -> Generator[dict, None, None]:
         entity_type = 'ps.txt'
         try:
             for p in PsParser(self.config, self.case_id).get_result():
+                uid = self._sanitize_uid(p['data'].get('uid'))
+                pid = p['data'].get('pid')
+                ppid = p['data'].get('ppid')
+                ppname = self._resolve_ppname(ppid)
                 ps_event = Event(
                     datetime=datetime.fromisoformat(p['datetime']),
-                    message= self._strip_flags(p['data']['command']),
+                    message=self._strip_flags(p['data']['command']),
                     timestamp_desc=p['timestamp_desc'],
                     module=self.module_name,
-                    data={'source': entity_type}
+                    data={'source': entity_type, 'uid': uid, 'pid': pid, 'ppid': ppid, 'ppname': ppname}
                 )
-                if self.add_if_full_command_is_not_in_set(ps_event.message):
+                if self.add_if_full_command_is_not_in_set(ps_event.message, ps_event.datetime, uid, pid, ppid):
                     yield ps_event.to_dict()
         except Exception as e:
             logger.exception(f"ERROR while extracting {entity_type} file. {e}")
@@ -197,14 +286,18 @@ def __extract_ps_thread_file(self) -> Generator[dict, None, None]:
         entity_type = 'psthread.txt'
         try:
             for p in PsThreadParser(self.config, self.case_id).get_result():
+                uid = self._sanitize_uid(p['data'].get('uid'))
+                pid = p['data'].get('pid')
+                ppid = p['data'].get('ppid')
+                ppname = self._resolve_ppname(ppid)
                 ps_event = Event(
                     datetime=datetime.fromisoformat(p['datetime']),
                     message=self._strip_flags(p['data']['command']),
                     timestamp_desc=p['timestamp_desc'],
                     module=self.module_name,
-                    data={'source': entity_type}
+                    data={'source': entity_type, 'uid': uid, 'pid': pid, 'ppid': ppid, 'ppname': ppname}
                 )
-                if self.add_if_full_command_is_not_in_set(ps_event.message):
+                if self.add_if_full_command_is_not_in_set(ps_event.message, ps_event.datetime, uid, pid, ppid):
                     yield ps_event.to_dict()
         except Exception as e:
             logger.exception(f"ERROR while extracting {entity_type} file. {e}")
@@ -222,26 +315,32 @@ def __extract_ps_spindump_nosymbols_file(self) -> Generator[dict, None, None]:
                 if 'process' not in p:
                     continue
                 process_name = p.get('path', '/kernel' if p['process'] == 'kernel_task [0]' else p['process'])
-
-                if self.add_if_full_command_is_not_in_set(self._strip_flags(process_name)):
+                event_datetime = datetime.fromisoformat(event['datetime'])
+                uid = self._sanitize_uid(p.get('uid'))
+                pid = p.get('pid')
+                ppid = p.get('ppid')
+                # Spindump has a direct 'parent' field with the parent process name
+                ppname = p.get('parent')
+
+                if self.add_if_full_command_is_not_in_set(self._strip_flags(process_name), event_datetime, uid, pid, ppid):
                     yield Event(
-                        datetime=datetime.fromisoformat(event['datetime']),
+                        datetime=event_datetime,
                         message=self._strip_flags(process_name),
                         timestamp_desc=event['timestamp_desc'],
                         module=self.module_name,
-                        data={'source': entity_type}
+                        data={'source': entity_type, 'uid': uid, 'pid': pid, 'ppid': ppid, 'ppname': ppname}
                     ).to_dict()

                 for t in p['threads']:
                     try:
                         thread_name = f"{self._strip_flags(process_name)}::{t['thread_name']}"
-                        if self.add_if_full_command_is_not_in_set(thread_name):
+                        if self.add_if_full_command_is_not_in_set(thread_name, event_datetime, uid, pid, ppid):
                             yield Event(
-                                datetime=datetime.fromisoformat(event['datetime']),
+                                datetime=event_datetime,
                                 message=self._strip_flags(thread_name),
                                 timestamp_desc=event['timestamp_desc'],
                                 module=self.module_name,
-                                data={'source': entity_type}
+                                data={'source': entity_type, 'uid': uid, 'pid': pid, 'ppid': ppid, 'ppname': ppname}
                             ).to_dict()
                     except KeyError:
                         pass
@@ -252,19 +351,24 @@ def __extract_ps_shutdownlogs(self) -> Generator[dict, None, None]:
         """
         Extracts process data from shutdown logs.

+        Note: Unlike other sources, shutdown logs always keep all entries even if duplicate,
+        as each entry represents a different shutdown event where the process was blocking.
+
         :return: A generator yielding dictionaries containing process details from shutdown logs.
         """
         entity_type = 'shutdown.logs'
         try:
             for p in ShutdownLogsParser(self.config, self.case_id).get_result():
-                if self.add_if_full_command_is_not_in_set(self._strip_flags(p['data']['command'])):
-                    yield Event(
-                        datetime=datetime.fromisoformat(p['datetime']),
-                        message=self._strip_flags(p['data']['command']),
-                        timestamp_desc=p['timestamp_desc'],
-                        module=self.module_name,
-                        data={'source': entity_type}
-                    ).to_dict()
+                # Always yield shutdown log entries, even if duplicate
+                # Each occurrence represents a different shutdown event
+                pid = p['data'].get('pid')
+                yield Event(
+                    datetime=datetime.fromisoformat(p['datetime']),
+                    message=self._strip_flags(p['data']['command']),
+                    timestamp_desc=p['timestamp_desc'],
+                    module=self.module_name,
+                    data={'source': entity_type, 'uid': None, 'pid': pid, 'ppid': None, 'ppname': None}
+                ).to_dict()
         except Exception as e:
             logger.exception(f"ERROR while extracting {entity_type}. {e}")

@@ -277,39 +381,43 @@ def __extract_ps_logarchive(self) -> Generator[dict, None, None]:
         entity_type = 'log archive'
         try:
             for p in LogarchiveParser(self.config, self.case_id).get_result():
+                p_datetime = datetime.fromisoformat(p['datetime'])
+                euid = self._sanitize_uid(p['data'].get('euid'))
+                pid = p['data'].get('pid')
+
                 # First check if we can extract a binary from the message
                 extracted_process = self.message_extract_binary(p['data']['process'], p['message'])
                 if extracted_process:
                     # Handle the case where extracted_process is a list of paths
                     if isinstance(extracted_process, list):
                         for proc_path in extracted_process:
-                            if self.add_if_full_command_is_not_in_set(self._strip_flags(proc_path)):
+                            if self.add_if_full_command_is_not_in_set(self._strip_flags(proc_path), p_datetime, None, None, None):
                                 yield Event(
-                                    datetime.fromisoformat(p['datetime']),
+                                    p_datetime,
                                     message=self._strip_flags(proc_path),
                                     timestamp_desc=p['timestamp_desc'],
                                     module=self.module_name,
-                                    data={'source': entity_type}
+                                    data={'source': entity_type, 'uid': None, 'pid': None, 'ppid': None, 'ppname': None}
                                 ).to_dict()
                     else:
                         # Handle the case where it's a single string
-                        if self.add_if_full_command_is_not_in_set(self._strip_flags(extracted_process)):
+                        if self.add_if_full_command_is_not_in_set(self._strip_flags(extracted_process), p_datetime, None, None, None):
                             yield Event(
-                                datetime=datetime.fromisoformat(p['datetime']),
+                                datetime=p_datetime,
                                 message=self._strip_flags(extracted_process),
                                 timestamp_desc=p['timestamp_desc'],
                                 module=self.module_name,
-                                data={'source': entity_type}
+                                data={'source': entity_type, 'uid': None, 'pid': None, 'ppid': None, 'ppname': None}
                             ).to_dict()

                 # Process the original process name
-                if self.add_if_full_command_is_not_in_set(self._strip_flags(p['data']['process'])):
+                if self.add_if_full_command_is_not_in_set(self._strip_flags(p['data']['process']), p_datetime, euid, pid, None):
                     yield Event(
-                        datetime=datetime.fromisoformat(p['datetime']),
+                        datetime=p_datetime,
                         message=self._strip_flags(p['data']['process']),
                         timestamp_desc=p['timestamp_desc'],
                         module=self.module_name,
-                        data={'source': entity_type}
+                        data={'source': entity_type, 'uid': euid, 'pid': pid, 'ppid': None, 'ppname': None}
                     ).to_dict()
         except Exception as e:
             logger.exception(f"ERROR while extracting {entity_type}. {e}")
@@ -323,14 +431,13 @@ def __extract_ps_uuid2path(self) -> Generator[dict, None, None]:
         entity_type = 'uuid2path'
         try:
             for p in UUID2PathParser(self.config, self.case_id).get_result().values():
-                if self.add_if_full_command_is_not_in_set(self._strip_flags(p)):
-                    # FIXME: what timestamp to use here?
+                if self.add_if_full_command_is_not_in_set(self._strip_flags(p), self.sysdiagnose_creation_datetime, None, None, None):
                     yield Event(
                         datetime=self.sysdiagnose_creation_datetime,
                         message=self._strip_flags(p),
                         timestamp_desc="Process path from UUID existing at sysdiagnose creation time",
                         module=self.module_name,
-                        data={'source': entity_type}
+                        data={'source': entity_type, 'uid': None, 'pid': None, 'ppid': None, 'ppname': None}
                     ).to_dict()
         except Exception as e:
             logger.exception(f"ERROR while extracting {entity_type}. {e}")
@@ -347,25 +454,27 @@ def __extract_ps_taskinfo(self) -> Generator[dict, None, None]:
                 if 'name' not in p['data']:
                     continue

-                if self.add_if_full_path_is_not_in_set(self._strip_flags(p['data']['name'])):
+                p_datetime = datetime.fromisoformat(p['datetime'])
+                pid = p['data'].get('pid')
+                if self.add_if_full_path_is_not_in_set(self._strip_flags(p['data']['name']), p_datetime, None, pid, None):
                     yield Event(
-                        datetime=datetime.fromisoformat(p['datetime']),
+                        datetime=p_datetime,
                         message=self._strip_flags(p['data']['name']),
                         timestamp_desc=p['timestamp_desc'],
                         module=self.module_name,
-                        data={'source': entity_type}
+                        data={'source': entity_type, 'uid': None, 'pid': pid, 'ppid': None, 'ppname': None}
                     ).to_dict()

                 for t in p['data']['threads']:
                     try:
                         thread_name = f"{self._strip_flags(p['data']['name'])}::{t['thread name']}"
-                        if self.add_if_full_path_is_not_in_set(thread_name):
+                        if self.add_if_full_path_is_not_in_set(thread_name, p_datetime, None, pid, None):
                             yield Event(
-                                datetime.fromisoformat(p['datetime']),
+                                p_datetime,
                                 message=thread_name,
                                 timestamp_desc=p['timestamp_desc'],
                                 module=self.module_name,
-                                data={'source': entity_type}
+                                data={'source': entity_type, 'uid': None, 'pid': pid, 'ppid': None, 'ppname': None}
                             ).to_dict()
                     except KeyError:
                         pass
@@ -383,14 +492,13 @@ def __extract_ps_remotectl_dumpstate(self) -> Generator[dict, None, None]:
             remotectl_dumpstate_json = RemotectlDumpstateParser(self.config, self.case_id).get_result()
             if remotectl_dumpstate_json:
                 for p in remotectl_dumpstate_json['Local device']['Services']:
-                    if self.add_if_full_path_is_not_in_set(self._strip_flags(p)):
-                        # FIXME: what timestamp to use here?
+                    if self.add_if_full_path_is_not_in_set(self._strip_flags(p), self.sysdiagnose_creation_datetime, None, None, None):
                         yield Event(
                             datetime=self.sysdiagnose_creation_datetime,
                             message=self._strip_flags(p),
                             timestamp_desc="Existing service at sysdiagnose creation time",
                             module=self.module_name,
-                            data={'source': entity_type}
+                            data={'source': entity_type, 'uid': None, 'pid': None, 'ppid': None, 'ppname': None}
                         ).to_dict()
         except Exception as e:
             logger.exception(f"ERROR while extracting {entity_type}. {e}")
@@ -404,13 +512,14 @@ def __extract_ps_logdata_statistics(self) -> Generator[dict, None, None]:
         entity_type = 'logdata.statistics.jsonl'
         try:
             for p in LogDataStatisticsParser(self.config, self.case_id).get_result():
-                if self.add_if_full_command_is_not_in_set(self._strip_flags(p['data']['process'])):
+                p_datetime = datetime.fromisoformat(p['datetime'])
+                if self.add_if_full_command_is_not_in_set(self._strip_flags(p['data']['process']), p_datetime, None, None, None):
                     yield Event(
-                        datetime=datetime.fromisoformat(p['datetime']),
+                        datetime=p_datetime,
                         message=self._strip_flags(p['data']['process']),
                         timestamp_desc=p['timestamp_desc'],
                         module=self.module_name,
-                        data={'source': entity_type}
+                        data={'source': entity_type, 'uid': None, 'pid': None, 'ppid': None, 'ppname': None}
                     ).to_dict()
         except Exception as e:
             logger.exception(f"ERROR while extracting {entity_type}. {e}")
@@ -425,44 +534,91 @@ def __extract_ps_logdata_statistics_txt(self) -> Generator[dict, None, None]:

         try:
             for p in LogDataStatisticsTxtParser(self.config, self.case_id).get_result():
-                if self.add_if_full_path_is_not_in_set(self._strip_flags(p['data']['process'])):
+                p_datetime = datetime.fromisoformat(p['datetime'])
+                if self.add_if_full_path_is_not_in_set(self._strip_flags(p['data']['process']), p_datetime, None, None, None):
                     yield Event(
-                        datetime=datetime.fromisoformat(p['datetime']),
+                        datetime=p_datetime,
                         message=self._strip_flags(p['data']['process']),
                         timestamp_desc=p['timestamp_desc'],
                         module=self.module_name,
-                        data={'source': entity_type}
+                        data={'source': entity_type, 'uid': None, 'pid': None, 'ppid': None, 'ppname': None}
                     ).to_dict()
         except Exception as e:
             logger.exception(f"ERROR while extracting {entity_type}. {e}")

-    def add_if_full_path_is_not_in_set(self, name: str) -> bool:
+    def add_if_full_path_is_not_in_set(self, name: str, timestamp: Optional[datetime] = None, uid: Optional[int] = None, pid: Optional[int] = None, ppid: Optional[int] = None) -> bool:
         """
-        Ensures that a process path is unique before adding it to the shared set.
+        Ensures that a process path is unique before adding it to the shared set,
+        with time-based deduplication: only keep duplicates if they occur at least 1 hour apart.
+        UID, PID, and PPID are considered part of the uniqueness - same process with different values is treated as separate.

         :param name: Process path name
-        :return: True if the process was not in the set and was added, False otherwise.
+        :param timestamp: Timestamp of the process occurrence (optional, for time-based deduplication)
+        :param uid: User ID of the process (optional, considered in uniqueness check)
+        :param pid: Process ID (optional, considered in uniqueness check)
+        :param ppid: Parent Process ID (optional, considered in uniqueness check)
+        :return: True if the process was not in the set or its last recorded occurrence is at least 1 hour away, False otherwise.
         """
-        for item in self.all_ps:
-            if item.endswith(name):
-                return False
-            if item.split('::')[0].endswith(name):
+        # Create a unique key that includes name, UID, PID, and PPID
+        unique_key = f"{name}|uid:{uid}|pid:{pid}|ppid:{ppid}"
+
+        # If no timestamp provided, use old behavior (always check for duplicates)
+        if timestamp is None:
+            for item in (key.rsplit('|uid:', 1)[0] for key in self.all_ps):  # compare against the name part of the stored composite keys
+                if item.endswith(name):
+                    return False
+                if item.split('::')[0].endswith(name):
+                    return False
+                if '::' not in item and item.split(' ')[0].endswith(name):
+                    return False  # This covers cases with space-separated commands
+            self.all_ps.add(unique_key)
+            return True
+
+        # Time-based deduplication: check if we've seen this name/uid/pid/ppid combination recently
+        if unique_key in self.process_last_seen:
+            time_diff = abs(timestamp - self.process_last_seen[unique_key])
+            # Drop occurrences less than 1 hour away; abs() because sources are not read in chronological order
+            if time_diff < timedelta(hours=1):
                 return False
-            if '::' not in item and item.split(' ')[0].endswith(name):
-                return False  # This covers cases with space-separated commands
-        self.all_ps.add(name)
+
+        # Add or update the process
+        self.all_ps.add(unique_key)
+        self.process_last_seen[unique_key] = timestamp
         return True

-    def add_if_full_command_is_not_in_set(self, name: str) -> bool:
+    def add_if_full_command_is_not_in_set(self, name: str, timestamp: Optional[datetime] = None, uid: Optional[int] = None, pid: Optional[int] = None, ppid: Optional[int] = None) -> bool:
         """
-        Ensures that a process command is unique before adding it to the shared set.
+        Ensures that a process command is unique before adding it to the shared set,
+        with time-based deduplication: only keep duplicates if they occur at least 1 hour apart.
+        UID, PID, and PPID are considered part of the uniqueness - same process with different values is treated as separate.

         :param name: Process command name
-        :return: True if the process was not in the set and was added, False otherwise.
+        :param timestamp: Timestamp of the process occurrence (optional, for time-based deduplication)
+        :param uid: User ID of the process (optional, considered in uniqueness check)
+        :param pid: Process ID (optional, considered in uniqueness check)
+        :param ppid: Parent Process ID (optional, considered in uniqueness check)
+        :return: True if the process was not in the set or its last recorded occurrence is at least 1 hour away, False otherwise.
         """
-        for item in self.all_ps:
-            if item.startswith(name):
+        # Create a unique key that includes name, UID, PID, and PPID
+        unique_key = f"{name}|uid:{uid}|pid:{pid}|ppid:{ppid}"
+
+        # If no timestamp provided, use old behavior (always check for duplicates)
+        if timestamp is None:
+            for item in (key.rsplit('|uid:', 1)[0] for key in self.all_ps):  # compare against the name part of the stored composite keys
+                if item.startswith(name):
+                    return False
+            self.all_ps.add(unique_key)
+            return True
+
+        # Time-based deduplication: check if we've seen this name/uid/pid/ppid combination recently
+        if unique_key in self.process_last_seen:
+            time_diff = abs(timestamp - self.process_last_seen[unique_key])
+            # Drop occurrences less than 1 hour away; abs() because sources are not read in chronological order
+            if time_diff < timedelta(hours=1):
                 return False
-        self.all_ps.add(name)
+
+        # Add or update the process
+        self.all_ps.add(unique_key)
+        self.process_last_seen[unique_key] = timestamp
         return True
