@@ -100,41 +100,6 @@ def evaluate(windows : pandas.DataFrame, groupby : str, hyperparameters : dict,
     return cv_results, estimator, splits, scores, figures


-def extract_windows(sensordata : pandas.DataFrame,
-        window_length : int,
-        window_hop : int,
-        groupby : list[str],
-        time_column = 'time',
-        ):
-
-    groups = sensordata.groupby(groupby, observed=True)
-
-
-    for group_idx, group_df in groups:
-
-        windows = []
-
-        # make sure order is correct
-        group_df = group_df.reset_index().set_index(time_column).sort_index()
-
-        # create windows
-        win_start = 0
-        length = len(group_df)
-        while win_start < length:
-            win_end = win_start + window_length
-            # ignore partial window at the end
-            if win_end > length:
-                break
-
-            win = group_df.iloc[win_start:win_end].copy()
-            win['window'] = win.index[0]
-            assert len(win) == window_length, (len(win), window_length)
-
-            windows.append(win)
-
-            win_start += window_hop
-
-        yield windows

 def assign_window_label(labels, majority=0.66):
     """
@@ -199,95 +164,106 @@ def timebased_features(windows : list[pandas.DataFrame],
     return df


-def custom_features(windows : list[pandas.DataFrame],
-        columns : list[str],
-        executable : str = '',
-        options : dict = {},
-        input_option : str = '--input',
-        output_option : str = '--output',
-        serialization : str = 'csv') -> pandas.DataFrame:
-    """
-    Run a program (executable) to compute features
+class DataProcessorProgram():

-    """
+    def __init__(self, executable : str,
+            options : dict = {},
+            input_option : str = '--input',
+            output_option : str = '--output',
+            serialization : str = 'csv'
+            ):

-    assert serialization == 'csv' # TODO: also support .npy
-    extension = serialization
+        self.executable = executable
+        self.options = options
+        self.input_option = input_option
+        self.output_option = output_option
+        self.serialization = serialization

-    # Filter columns
-    data = pandas.concat([ d for d in windows ])
+    def process(self, data : pandas.DataFrame) -> pandas.DataFrame:
+        """
+        Run a program (executable) to compute features

-    # FIXME: unhardcode
-    data['time'] = 0
-    data['gyro_x'] = 0
-    data['gyro_y'] = 0
-    data['gyro_z'] = 0
-    columns = [ 'time', 'acc_x', 'acc_y', 'acc_z', 'gyro_x', 'gyro_y', 'gyro_z' ]
+        Takes a DataFrame with sensor data as input.
+        The sensor data should be continuous and regular in time.
+
+        Returns windows with features.
+        The windows are usually overlapping in time.
+        """

-    data = data[columns]
+        assert self.serialization == 'csv' # TODO: also support .npy
+        extension = self.serialization

-    log.debug('custom-features-start', columns=list(data.columns))
+        # FIXME: unhardcode. Maybe allow specifying column_order ?
+        data['gyro_x'] = 0
+        data['gyro_y'] = 0
+        data['gyro_z'] = 0
+        data = data.reset_index()
+        ser_columns = ['time', 'acc_x', 'acc_y', 'acc_z', 'gyro_x', 'gyro_y', 'gyro_z']
+        data = data[ser_columns]

-    with tempfile.TemporaryDirectory() as tempdir:
-        data_path = os.path.join(tempdir, f'data.{extension}')
-        features_path = os.path.join(tempdir, f'features.{extension}')
+        log.debug('custom-features-start', columns=list(data.columns))

-        # Persist the data
-        data.to_csv(data_path, index=False)
+        with tempfile.TemporaryDirectory() as tempdir:
+            data_path = os.path.join(tempdir, f'data.{extension}')
+            features_path = os.path.join(tempdir, f'features.{extension}')

-        # Build arguments
-        args = [
-            executable,
-        ]
+            # Persist the data
+            data.to_csv(data_path, index=False)

-        # Input and output
-        if input_option:
-            args += [ input_option, data_path ]
-        else:
-            args += [ data_path ]
+            # Build arguments
+            args = [
+                self.executable,
+            ]

-        if output_option:
-            args += [ output_option, features_path ]
-        else:
-            args += [ features_path ]
+            # Input and output
+            if self.input_option:
+                args += [ self.input_option, data_path ]
+            else:
+                args += [ data_path ]

-        # Other options
-        for k, v in options.items():
-            args += [ f'--{k}', v ]
+            if self.output_option:
+                args += [ self.output_option, features_path ]
+            else:
+                args += [ features_path ]

-        cmd = ' '.join(args)
-        try:
-            out = subprocess.check_output(args)
-        except subprocess.CalledProcessError as e:
-            log.error('preprocessor-error',
-                cmd=cmd, out=e.stdout, code=e.returncode, err=e.stderr)
-            raise e
+            # Other options
+            for k, v in self.options.items():
+                args += [ f'--{k}', v ]

-        # Load output
-        out = pandas.read_csv(features_path)
-        assert len(out) == len(data)
+            cmd = ' '.join(args)
+            try:
+                out = subprocess.check_output(args)
+            except subprocess.CalledProcessError as e:
+                log.error('preprocessor-error',
+                    cmd=cmd, out=e.stdout, code=e.returncode, err=e.stderr)
+                raise e

-    # TODO: add feature names
-    df = pandas.DataFrame(out)
+            # Load output
+            out = pandas.read_csv(features_path)

-    # post-conditions
-    # one feature vector per window
-    assert len(df) == len(windows), (len(df), len(windows))
+        # TODO: add feature names
+        windows = pandas.DataFrame(out)

-    return df
+        # post-conditions
+        time_in = data['time'] / pandas.Timedelta(seconds=1)
+        time_out = windows['time']

+        window_duration = 1.0 # XXX: hardcoded
+        start_delta = time_out.min() - time_in.min()
+        assert abs(start_delta) <= window_duration, (start_delta, time_out.min(), time_in.min())
+        end_delta = time_out.max() - time_in.max()
+        assert abs(end_delta) <= window_duration, (end_delta, time_out.max(), time_in.max())
+
+        return windows
+
+class TimebasedFeatureExtractor():
+
+    def __init__(self):
+
+        here = os.path.dirname(__file__)
+        feature_extraction_script = os.path.join(here, 'compute_features.py')

-def batched_iterator(iterable, batch_size):
-    """Yield lists of size batch_size from iterable"""
-    iterator = iter(iterable)
-    while batch := list(itertools.islice(iterator, batch_size)):
-        yield batch

-def process_in_parallel_streaming(gen, process_item, batch_size=1000, n_jobs=-1):
-    for batch in batched_iterator(gen, batch_size):
-        yield from joblib.Parallel(n_jobs=n_jobs)(
-            joblib.delayed(process_item)(item) for item in batch
-        )

 def extract_features(sensordata : pandas.DataFrame,
         columns : list[str],
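
For context, a minimal sketch of how the new DataProcessorProgram could be driven, assuming a hypothetical `my_preprocess` binary that reads the CSV passed via --input and writes one feature row per window, including a 'time' column in seconds, to the path passed via --output. The column layout matches the hardcoded ser_columns above.

import pandas

# 'my_preprocess' is a hypothetical preprocessor binary, standing in for
# something like the motion_preprocess example referenced further down
processor = DataProcessorProgram(
    executable='my_preprocess',
    options={'samplerate': '50'},   # forwarded on the command line as --samplerate 50
)

n = 100
stream = pandas.DataFrame({
    'time': pandas.to_timedelta(range(n), unit='s'),
    'acc_x': [0.0] * n,
    'acc_y': [0.0] * n,
    'acc_z': [1.0] * n,
}).set_index('time')

# round-trips the data through a temporary CSV and the external program,
# returning one feature row per (typically overlapping) window
windows = processor.process(stream)
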
@@ -305,59 +281,106 @@ def extract_features(sensordata : pandas.DataFrame,
     Convert sensor data into fixed-sized time windows and extract features
     """

+    # TODO: pass in the entire feature extractor
     if features == 'quant':
         raise NotImplementedError
     elif features == 'timebased':
+        raise NotImplementedError # FIXME, bring back
         feature_extractor = lambda w: timebased_features(w, columns=columns)
     elif features == 'custom':

+        # FIXME: unhardcode columns
+        log.debug('sensordata', columns=sensordata.columns, index=sensordata.index.names)
+        #data['time'] = 0
+        #data = sensordata
+        #data = data.reset_index()
+        #data = data[ser_columns]
+        #data = data.set_index(groupby)
+        #sensordata = data
+
         # FIXME: unhardcode
         executable = '/home/jon/projects/emlearn/examples/motion_recognition/build/motion_preprocess'
         options = {}

-        feature_extractor = lambda w: custom_features(w, columns=columns, executable=executable, options=options)
+        # FIXME: respect window_length, window_hop
+        feature_extractor = DataProcessorProgram(executable=executable, options=options)
     else:
         raise ValueError(f"Unsupported features: {features}")

-    # Split into fixed-length windows
-    features_values = []

-    def process_one(windows) -> pandas.DataFrame:
+    # Process one whole stream of sensor data at a time
+    # the feature extraction process might have time/history-dependent logic,
+    # such as filters estimating gravity, background levels etc
+
+    def process_one(idx, stream : pandas.DataFrame) -> pandas.DataFrame:
+
         # drop invalid data
-        windows = [ w for w in windows if not w[columns].isnull().values.any() ]
+        stream = stream.dropna(subset=columns)

         # Convert from floats in "g" to the sensor scaling in int16
-        data_windows = [ ((w[columns] / sensitivity) * (2**15 - 1)).astype(numpy.int16) for w in windows ]
+        stream.loc[:, columns] = \
+            ((stream.loc[:, columns] / sensitivity) * (2**15 - 1)).astype(numpy.int16)
+
+        # FIXME: make sure time-series is regular
+        # potentially zero-fill ?

         # Extract features
-        df = feature_extractor(data_windows)
+        windows = feature_extractor.process(stream)

         # Convert features to 16-bit integers
         # XXX: Assuming that they are already in reasonable scale
         # TODO: consider moving the quantization to inside timebased
-        quant = df.values.astype(numpy.int16)
-        df.loc[:,:] = quant
+        feature_columns = list(set(windows.columns) - set([time_column]))
+        windows.loc[:,feature_columns] = windows[feature_columns].astype(numpy.int16)

         # Attach labels
-        df[label_column] = [ assign_window_label(w[label_column]) for w in windows ]
+        windows[label_column] = assign_window_label(stream[label_column])

         # Combine with identifying information
-        index_columns = list(groupby + ['window'])
-        for idx_column in index_columns:
-            df[idx_column] = [w[idx_column].iloc[0] for w in windows]
-        df = df.set_index(index_columns)
+        # time should come from data processing
+        assert time_column in windows
+
+        # the group is our job to manage
+        index_columns = list(groupby) + [time_column]
+        for idx_column, idx_value in zip(groupby, idx):
+            windows[idx_column] = idx_value
+
+        windows = windows.set_index(index_columns)
+        log.debug('process-one-done',
+            columns=list(windows.columns),
+            index_columns=list(windows.index.names),
+            windows=len(windows),
+            samples=len(stream),
+        )
+        return windows
+
+    #win['window'] = win.index[0]
+    # PERF: may be possible to parallelize within a sensordata stream,
+    # but then need each section to have a run-in period that is long enough
+    # for any time-dependent logic to stabilize, and to merge while ignoring the run-in
+    def split_sections(data, groupby : list[str], time_column='time'):
+        groups = data.groupby(groupby, observed=True)
+        for group_idx, group_df in groups:

-        return df
+            # ensure sorted by time
+            group_df = group_df.reset_index().set_index(time_column).sort_index()

-
-    data_generator = extract_windows(sensordata, window_length, window_hop, groupby=groupby, time_column=time_column)
-    feature_generator = process_in_parallel_streaming(data_generator, process_one, batch_size=10)
+            yield group_idx, group_df

-    for df in feature_generator:
+    sections = split_sections(sensordata, groupby=groupby, time_column=time_column)
+    jobs = [ joblib.delayed(process_one)(idx, df) for idx, df in sections ]

-        features_values.append(df)
+    log.debug('process-parallel', jobs=len(jobs))
+    n_jobs = 4

+    start_time = time.time()
+    features_values = joblib.Parallel(n_jobs=n_jobs)(jobs)
     out = pandas.concat(features_values)
+    duration = round(time.time() - start_time, 3)
+
+    log.debug('process-parallel-done', rows=len(out), duration=duration)
+
+
     return out

 def export_model(path, out):
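
For context, the per-stream fan-out that extract_features now relies on, reduced to a self-contained toy: one joblib job per recording, each receiving a complete stream so that history-dependent processing stays valid inside the job. The column names and the per-stream statistic below are illustrative only, not taken from the commit.

import joblib
import pandas

data = pandas.DataFrame({
    'subject': ['a'] * 4 + ['b'] * 4,
    'time':    list(range(4)) * 2,
    'value':   list(range(8)),
})

def process_one(idx, stream : pandas.DataFrame) -> pandas.DataFrame:
    # each job sees one complete stream; here we just take a per-stream mean
    out = pandas.DataFrame({'value_mean': [stream['value'].mean()]})
    out['subject'] = idx
    return out.set_index('subject')

# one group per recording, processed in parallel and concatenated at the end
sections = data.groupby('subject', observed=True)
jobs = [ joblib.delayed(process_one)(idx, df) for idx, df in sections ]
features = pandas.concat(joblib.Parallel(n_jobs=2)(jobs))
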