5656import os
5757import re
5858import time
59- from typing import Dict , Tuple
59+ from typing import Dict , Iterable , Tuple
6060
6161import sams .base
6262import sams .core
6565
6666
6767class Process :
68+ """ Object representing a single process with a single PID.
69+
70+ Parameters
71+ ----------
72+ pid : int
73+ Process ID.
74+ jobid : int
75+ Slurm job ID.
76+ """
6877 def __init__ (self ,
6978 pid : int ,
7079 jobid : int ):
@@ -81,19 +90,18 @@ def __init__(self,
8190 self .exe = os .readlink (f'/proc/{ self .pid :d} /exe' )
8291 logger .debug (f'Pid: { pid } (JobId: { jobid } ) has exe: { self .exe } ' )
8392 except Exception :
84- logger .debug (f'Pid: { pid } (JobId: { jobid } ) has no exe or pid has disapeard ' )
93+ logger .debug (f'Pid: { pid } (JobId: { jobid } ) has no exe or pid has disapeared ' )
8594 self .ignore = True
86- return
8795
88- def _parse_stat (self , stat ):
96+ def _get_parsed_stats (self , stat ) -> dict :
8997 """Parse the relevant content from /proc/***/stat"""
9098
9199 m = re .search (r'^\d+ \(.*\) [RSDZTyEXxKWPI] (.*)' , stat )
92100 stats = m .group (1 ).split (r' ' )
93101 return dict (user = float (stats [14 - 4 ]) / self .clock_tics , # User CPU time in s.
94102 system = float (stats [15 - 4 ]) / self .clock_tics ) # System CPU time in s.
95103
96- def update (self , uptime ):
104+ def update (self , uptime ) -> None :
97105 """Update information about pids"""
98106 if self .done :
99107 logger .debug (f'Pid: { self .pid :d} is done' )
@@ -112,7 +120,7 @@ def update(self, uptime):
112120 try :
113121 with open (f'/proc/{ self .pid :d} /task/{ task :d} /stat' ) as f :
114122 stat = f .read ()
115- stats = self ._parse_stat (stat )
123+ stats = self ._get_parsed_stats (stat )
116124 self .tasks [task ] = dict (
117125 user = stats ["user" ],
118126 system = stats ["system" ])
@@ -123,7 +131,7 @@ def update(self, uptime):
123131
124132 self .updated = time .time ()
125133
126- def aggregate (self ):
134+ def get_aggregated_task_info (self ) -> Dict :
127135 """Return the aggregated information for all tasks"""
128136 return dict (starttime = self .starttime ,
129137 exe = self .exe ,
@@ -181,9 +189,9 @@ def _collect_sample(self) -> Tuple[Dict, Dict]:
181189 self .processes [pid ] = Process (pid , self .jobid )
182190 self .processes [pid ].update (uptime )
183191 # Send information about current usage
184- return self ._aggregate
192+ return self ._get_aggregated_processes ()
185193
186- def sample (self ):
194+ def sample (self ) -> None :
187195 logger .debug ('sample()' )
188196 aggr , total = self ._collect_sample ()
189197 # Initial call
@@ -207,31 +215,32 @@ def sample(self):
207215 self ._most_recent_sample = self .storage_wrapping (sample )
208216
209217 @property
210- def valid_procs (self ):
218+ def valid_procs (self ) -> Iterable :
211219 """ List of procs for which p.ignore is False """
212220 logger .debug (f'procs: { [p for p in self .processes .values ()]} ' )
213221 logger .debug (f'valid_procs: { [p for p in self .processes .values () if not p .ignore ]} ' )
214222 return [p for p in self .processes .values () if not p .ignore ]
215223
216- def last_updated (self ):
224+ def get_update_time (self ) -> int :
217225 procs = self .valid_procs
218226 if len (procs ) == 0 :
219227 return self .create_time
220228 return int (max (p .updated for p in procs ))
221229
222- def start_time (self ):
230+ def get_start_time (self ) -> int :
223231 procs = self .valid_procs
224232 if not procs :
225233 return 0
226234 return int (min (p .starttime for p in procs ))
227235
228- def _aggregate (self ) -> Tuple [Dict , Dict ]:
236+ def _get_aggregated_processes (self ) -> Tuple [Dict , Dict ]:
229237 aggr = dict ()
230238 total = dict (user = 0.0 ,
231239 system = 0.0 )
232- aggregated_procs = [p .aggregate () for p in self .valid_procs ]
240+ aggregated_procs = [p .get_aggregated_task_info () for p in self .valid_procs ]
233241 for a in aggregated_procs :
234- logger .debug (f'_aggregate: exe: { a ["exe" ]} , user: { a ["user" ]} , system: { a ["system" ]} ' )
242+ logger .debug (f'_get_aggregated_processes: exe: { a ["exe" ]} , '
243+ f'user: { a ["user" ]} , system: { a ["system" ]} ' )
235244 exe = a ['exe' ]
236245 if exe not in aggr :
237246 aggr [exe ] = dict (user = 0.0 , system = 0.0 )
@@ -243,7 +252,7 @@ def _aggregate(self) -> Tuple[Dict, Dict]:
243252
244253 def final_data (self ) -> Dict :
245254 logger .debug ('{self.id} final_data' )
246- aggr , _ = self ._aggregate ()
255+ aggr , _ = self ._get_aggregated_processes ()
247256 return dict (execs = aggr ,
248- start_time = self .start_time (),
249- end_time = self .last_updated ())
257+ start_time = self .get_start_time (),
258+ end_time = self .get_update_time ())
0 commit comments