diff --git a/examples/har_trees/compute_features.py b/examples/har_trees/compute_features.py index 23f23a9f..978be45a 100644 --- a/examples/har_trees/compute_features.py +++ b/examples/har_trees/compute_features.py @@ -7,15 +7,22 @@ from timebased import calculate_features_xyz, DATA_TYPECODE, N_FEATURES -def compute_dataset_features(data: npyfile.Reader, +def compute_dataset_features(data: npyfile.Reader, window_length, + hop_length=None, skip_samples=0, limit_samples=None, verbose=0): + if hop_length is None: + hop_length = window_length + + if window_length % hop_length != 0: + raise ValueError(f"hop_length must be an even divisor of window_length. Got window={window_length} hop={hop_length}") + + # Check that data is expected format shape = data.shape - assert len(shape) == 3, shape - n_samples, window_length, n_axes = shape + assert len(shape) == 2, shape + n_samples, n_axes = shape assert n_axes == 3, shape - #assert window_length == 128, shape # We expect data to be h/int16 assert data.typecode == DATA_TYPECODE, data.typecode @@ -26,26 +33,39 @@ def compute_dataset_features(data: npyfile.Reader, y_values = array.array(DATA_TYPECODE, (0 for _ in range(window_length))) z_values = array.array(DATA_TYPECODE, (0 for _ in range(window_length))) - chunk_size = window_length*n_axes - sample_counter = 0 + chunk_size = hop_length*n_axes + window_counter = 0 + start_idx = 0 data_chunks = data.read_data_chunks(chunk_size, offset=chunk_size*skip_samples) + for arr in data_chunks: - # process the data + print('cc', len(arr)) + if len(arr) < chunk_size: + # short read, last data piece, ignore + continue + + # Window was full, make room for more + if start_idx >= window_length: + overlap = window_length - hop_length + if overlap > 0: + x_values[:overlap] = x_values[hop_length:] + y_values[:overlap] = y_values[hop_length:] + z_values[:overlap] = z_values[hop_length:] + start_idx = overlap + + # Copy the input data # De-interleave data from XYZ1 XYZ2... into separate continious X,Y,Z - for i in range(window_length): + for i in range(hop_length): x_values[i] = arr[(i*3)+0] y_values[i] = arr[(i*3)+1] z_values[i] = arr[(i*3)+2] + start_idx += hop_length - #print(x_values) - #print(y_values) - #print(z_values) - - assert len(x_values) == window_length - assert len(y_values) == window_length - assert len(z_values) == window_length + # waiting for window to fill + if start_idx < window_length: + continue feature_calc_start = time.ticks_ms() features = calculate_features_xyz((x_values, y_values, z_values)) @@ -54,35 +74,47 @@ def compute_dataset_features(data: npyfile.Reader, print('feature-calc-end', duration) yield features + window_counter += 1 - sample_counter += 1 - if limit_samples is not None and sample_counter > limit_samples: + if limit_samples is not None and window_counter > limit_samples: break -def main(): +def parse(): + import argparse + + parser = argparse.ArgumentParser(description='Compute features from NPY file') + parser.add_argument('--input', required=True, help='Input .npy file') + parser.add_argument('--output', required=True, help='Output .npy file') + parser.add_argument('--samplerate', type=int, default=None, help='Samplerate. Currently ignored') + parser.add_argument('--skip', type=int, default=0, help='Number of samples to skip (default: 0)') + parser.add_argument('--limit', type=int, default=None, help='Maximum number of samples to process (default: None)') + parser.add_argument('--window_length', type=int, default=128, help='Window length (default: 128)') + parser.add_argument('--hop_length', type=int, default=None, help='Hop length (default: window_length)') - if len(sys.argv) != 3: - print('Usage: compute_features.py IN.npy OUT.npy') + args = parser.parse_args() + return args - _, in_path, out_path = sys.argv +def main(): - skip_samples = 0 - limit_samples = None + args = parse() out_typecode = 'f' - n_features = N_FEATURES - + n_features = N_FEATURES features_array = array.array(out_typecode, (0 for _ in range(n_features))) - with npyfile.Reader(in_path) as data: - n_samples, window_length, n_axes = data.shape + with npyfile.Reader(args.input) as data: + n_samples, n_axes = data.shape + + n_windows = (n_samples - args.window_length) // args.hop_length - out_shape = (n_samples, n_features) - with npyfile.Writer(out_path, out_shape, out_typecode) as out: + out_shape = (n_windows, n_features) + with npyfile.Writer(args.output, out_shape, out_typecode) as out: generator = compute_dataset_features(data, - skip_samples=skip_samples, - limit_samples=limit_samples, + window_length=args.window_length, + hop_length=args.hop_length, + skip_samples=args.skip, + limit_samples=args.limit, ) for features in generator: #print('features', len(features), features) diff --git a/examples/har_trees/data/configurations/uci_har.yaml b/examples/har_trees/data/configurations/uci_har.yaml index 9d044d1c..086e1c24 100644 --- a/examples/har_trees/data/configurations/uci_har.yaml +++ b/examples/har_trees/data/configurations/uci_har.yaml @@ -5,6 +5,8 @@ data_columns: - acc_x - acc_y - acc_z +features: 'custom' +samplerate: 50 classes: # - STAND_TO_LIE # - SIT_TO_LIE diff --git a/examples/har_trees/har_train.py b/examples/har_trees/har_train.py index 8262a1e5..a097dfa5 100644 --- a/examples/har_trees/har_train.py +++ b/examples/har_trees/har_train.py @@ -7,6 +7,7 @@ import subprocess import json import itertools +from typing import Optional import yaml import pandas @@ -100,181 +101,267 @@ def evaluate(windows : pandas.DataFrame, groupby : str, hyperparameters : dict, return cv_results, estimator, splits, scores, figures -def extract_windows(sensordata : pandas.DataFrame, - window_length : int, - window_hop : int, - groupby : list[str], - time_column = 'time', - ): - - groups = sensordata.groupby(groupby, observed=True) - - - for group_idx, group_df in groups: - - windows = [] - - # make sure order is correct - group_df = group_df.reset_index().set_index(time_column).sort_index() - - # create windows - win_start = 0 - length = len(group_df) - while win_start < length: - win_end = win_start + window_length - # ignore partial window at the end - if win_end > length: - break - - win = group_df.iloc[win_start:win_end].copy() - win['window'] = win.index[0] - assert len(win) == window_length, (len(win), window_length) - - windows.append(win) - - win_start += window_hop - - yield windows -def assign_window_label(labels, majority=0.66): - """ - Assign the most common label to window, if it is above the @majority threshold - Otherwise return NA - """ - counts = labels.value_counts(dropna=True) - threshold = majority * len(labels) - if counts.iloc[0] > threshold: - return counts.index[0] - else: - return None +class DataProcessorProgram(): + def __init__(self, program : list[str], + options : dict = {}, + input_option : str = '--input', + output_option : str = '--output', + serialization : str = 'csv', + timeout : float = 10.0, + column_order : Optional[list[str]] = None, + ): -def timebased_features(windows : list[pandas.DataFrame], - columns : list[str], - python_bin='python') -> pandas.DataFrame: + self.program = program + self.options = options + self.input_option = input_option + self.output_option = output_option + self.serialization = serialization + self.timeout = timeout + self.column_order = column_order - #print('w', len(windows), columns) + supported = set(['npy', 'csv']) + if not serialization in supported: + raise ValueError(f'Unsupported serialization {serialization}. Valid: {supported}') - here = os.path.dirname(__file__) - feature_extraction_script = os.path.join(here, 'compute_features.py') - data = numpy.stack([ d[columns] for d in windows ]) - assert data.dtype == numpy.int16 - assert data.shape[2] == 3, data.shape + def process(self, data : pandas.DataFrame) -> pandas.DataFrame: + """ + Run a program (executable) to compute features - #log.debug('data-range', - # upper=numpy.quantile(data, 0.99), - # lower=numpy.quantile(data, 0.01), - #) + Takes a DataFrame with sensor data as input. + The sensor data should be continous and regular in time. + + Returns windows with features. + The windows are usually overlapping in time. + """ + + + extension = self.serialization + + mod = data.copy() + if self.column_order is not None: + mod = mod.reset_index() + mod = mod[self.column_order] + + with tempfile.TemporaryDirectory() as tempdir: + #tempdir = 'temp' # XXX, debug + + data_path = os.path.join(tempdir, f'data.{extension}') + features_path = os.path.join(tempdir, f'features.{extension}') + + # Persist the data + if self.serialization == 'npy': + # make sure C order, and non-sparse + arr = numpy.ascontiguousarray(mod) + arr = arr.astype(numpy.int16) + numpy.save(data_path, arr) + elif self.serialization == 'csv': + mod.to_csv(data_path, index=False) + else: + raise NotImplementedError(self.serialization) + + # Build arguments + args = [] + self.program + + # Input and output + if self.input_option: + args += [ self.input_option, data_path ] + else: + args += [ data_path ] + + if self.output_option: + args += [ self.output_option, features_path ] + else: + args += [ features_path ] + + # Other options + for k, v in self.options.items(): + args += [ f'--{k}', str(v) ] + + cmd = ' '.join(args) + try: + out = subprocess.check_output(args, timeout=self.timeout) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: + code = getattr(e, 'returncode', None) + log.error('preprocessor-error', + cmd=cmd, out=e.stdout, err=e.stderr, code=code) + raise e + + log.debug('preprocessor-run', + cmd=cmd, + #out=out, + ) + + # Load output + if self.serialization == 'npy': + # TODO: support feature names. Separat output file, with --features + out = numpy.load(features_path) + windows = pandas.DataFrame(out) + # FIXME: support reading times, not infer + span = (data.index.max() - data.index.min()).total_seconds() + dt = span / len(windows) + log.debug('preprocess', windows=len(windows), dt=dt) + windows['time'] = dt * numpy.arange(len(windows)) + elif self.serialization == 'csv': + windows = pandas.read_csv(features_path) + span = (data.index.max() - data.index.min()).total_seconds() + dt = span / len(windows) + log.debug('preprocess', windows=len(windows), dt=dt) + else: + raise NotImplementedError(self.serialization) + + windows['time'] = pandas.to_timedelta(windows['time'], unit='s') + + # post-conditions + time_in = data.index + time_out = windows['time'] + + window_duration = pandas.Timedelta(4.0, unit='s') # XXX: hardcoded + start_delta = time_out.min() - time_in.min() + assert abs(start_delta) <= window_duration, (start_delta, time_out.min(), time_in.min()) + end_delta = time_out.max() - time_in.max() + assert abs(end_delta) <= window_duration, (end_delta, time_out.max(), time_in.max()) + + return windows + +class TimebasedFeatureExtractor(DataProcessorProgram): + + def __init__(self, sensitivity=4.0, python_bin='python', **kwargs): + super().__init__(self, serialization='npy', **kwargs) + + here = os.path.dirname(__file__) + feature_extraction_script = os.path.join(here, 'compute_features.py') + self.program = [ python_bin, feature_extraction_script ] + + self.sensitivity = sensitivity + + def process(self, data): + + data = data.copy() + columns = self.column_order + + # Convert from floats in "g" to the sensor scaling in int16 + data.loc[:, columns] = \ + ((data.loc[:, columns] / self.sensitivity) * (2**15-1)).astype(numpy.int16) - with tempfile.TemporaryDirectory() as tempdir: - data_path = os.path.join(tempdir, 'data.npy') - features_path = os.path.join(tempdir, 'features.npy') + return super().process(data) - # Persist output - numpy.save(data_path, data) - # Run MicroPython program - args = [ - python_bin, - feature_extraction_script, - data_path, - features_path, - ] - cmd = ' '.join(args) - #log.debug('run-timebased', cmd=cmd) - try: - out = subprocess.check_output(args) - except subprocess.CalledProcessError as e: - log.error('micropython-error', out=e.stdout, err=e.stderr) - raise e - - # Load output - out = numpy.load(features_path) - assert len(out) == len(data) - - # TODO: add feature names - df = pandas.DataFrame(out) - - return df - -def batched_iterator(iterable, batch_size): - """Yield lists of size batch_size from iterable""" - iterator = iter(iterable) - while batch := list(itertools.islice(iterator, batch_size)): - yield batch - -def process_in_parallel_streaming(gen, process_item, batch_size=1000, n_jobs=-1): - for batch in batched_iterator(gen, batch_size): - yield from joblib.Parallel(n_jobs=n_jobs)( - joblib.delayed(process_item)(item) for item in batch - ) def extract_features(sensordata : pandas.DataFrame, columns : list[str], - groupby, - window_length = 128, - window_hop = 64, - features='timebased', - quant_div = 4, - quant_depth = 6, - sensitivity = 2.0, # how many g range the int16 sensor data has + groupby : list[str], + extractor, + samplerate = 50, label_column='activity', time_column='time', + parallel_jobs : int = 4 , + verbose = 1, ) -> pandas.DataFrame: """ Convert sensor data into fixed-sized time windows and extact features """ - if features == 'quant': - raise NotImplementedError - elif features == 'timebased': - feature_extractor = lambda w: timebased_features(w, columns=columns) - else: - raise ValueError(f"Unsupported features: {features}") - - # Split into fixed-length windows - features_values = [] + # Process one whole stream of sensor data at a time + # the feature extraction process might have time/history dependent logic, + # such as filters estimating gravity, background levels etc + def process_one(idx, stream : pandas.DataFrame) -> pandas.DataFrame: + + if verbose >= 4: + log.debug('process-one-start', + samples=len(stream), + idx=idx, + ) - def process_one(windows) -> pandas.DataFrame: # drop invalid data - windows = [ w for w in windows if not w[columns].isnull().values.any() ] - - # Convert from floats in "g" to the sensor scaling in int16 - data_windows = [ ((w[columns] / sensitivity) * (2**15-1)).astype(numpy.int16) for w in windows ] + stream = stream.dropna(subset=columns) # Extract features - df = feature_extractor(data_windows) + windows = extractor.process(stream) - # Convert features to 16-bit integers - # XXX: Assuming that they are already in resonable scale - # TODO: consider moving the quantization to inside timebased - quant = df.values.astype(numpy.int16) - df.loc[:,:] = quant + # Combine with identifying information + # time should come from data processing + assert time_column in windows + + # the group is our job to manage + index_columns = list(groupby) + [time_column] + for idx_column, idx_value in zip(groupby, idx): + windows[idx_column] = idx_value + + windows = windows.set_index(index_columns) + if verbose >= 4: + + log.debug('process-one-done', + columns=list(windows.columns), + index_columns=list(windows.index.names), + windows=len(windows), + samples=len(stream), + ) + return windows + + #win['window'] = win.index[0] + # PERF: may be possible to parellize within a sensordata stream, + # but then need each section to have a run-in period that is long enough + # for any time-dependent logic to stabilize, and to merge while ignoring the run-in + def split_sections(data, groupby : list[str], time_column='time'): + groups = sensordata.groupby(groupby, observed=True) + for group_idx, df in groups: - # Attach labels - df[label_column] = [ assign_window_label(w[label_column]) for w in windows ] + # ensure sorted by time + df = df.reset_index() - # Combine with identifying information - index_columns = list(groupby + ['window']) - for idx_column in index_columns: - df[idx_column] = [w[idx_column].iloc[0] for w in windows] - df = df.set_index(index_columns) + # convert to time-delta, if neeeded + #if pandas.api.types.is_datetime64_dtype(df[time_column]): + # df[time_column] = df[time_column] - df[time_column].min() - return df + df = df.set_index(time_column).sort_index() - - data_generator = extract_windows(sensordata, window_length, window_hop, groupby=groupby, time_column=time_column) - feature_generator = process_in_parallel_streaming(data_generator, process_one, batch_size=10) + expected_freq = pandas.Timedelta(1/samplerate, unit='s') + diff = df.index.to_series().diff() + holes = diff[diff > expected_freq] + irregular = diff[diff != expected_freq].dropna() + + # Convert to regular time-series + times = pandas.timedelta_range(df.index.min(), df.index.max(), freq=expected_freq) + df = df.reindex(times) + + missing = df[columns].isna().any(axis=1) + missing_ratio = numpy.count_nonzero(missing) / len(df) + if missing_ratio > 0.01: + log.debug('section-missing-data', + idx=group_idx, + rows=len(df[missing]), + ratio=missing_ratio, + irregular=len(irregular), + ) + + # Fill holes (if any) + df = df.ffill() + + assert pandas.api.types.is_timedelta64_dtype(df.index) - for df in feature_generator: + df[time_column] = df.index - features_values.append(df) + yield group_idx, df + sections = split_sections(sensordata, groupby=groupby, time_column=time_column) + jobs = [ joblib.delayed(process_one)(idx, df) for idx, df in sections] + + log.debug('process-parallel', jobs=len(jobs)) + + start_time = time.time() + features_values = joblib.Parallel(n_jobs=parallel_jobs)(jobs) out = pandas.concat(features_values) + duration = round(time.time() - start_time, 3) + + log.debug('process-parallel-done', rows=len(out), duration=duration) + + return out -def export_model(path, out): +def export_model(path, out, name='motion'): with open(path, "rb") as f: classifier = pickle.load(f) @@ -283,7 +370,7 @@ def export_model(path, out): class_mapping = dict(zip(classes, range(len(classifier.classes_)))) cmodel = emlearn.convert(classifier) - cmodel.save(name='harmodel', format='csv', file=out) + cmodel.save(name=name, format='csv', file=out) def load_config(file_path): @@ -292,6 +379,99 @@ def load_config(file_path): data = yaml.safe_load(f) return data +def label_windows(sensordata, + windows, + groupby, + label_column, + window_duration : pandas.Timedelta, + majority=0.66) -> pandas.DataFrame: + + windows = windows.copy() + # default to unknown=NA + windows[label_column] = None + + print(sensordata.head()) + + sensor_groups = {idx: df for idx, df in sensordata.groupby(groupby, group_keys=False, as_index=False) } + + log.debug('label-windows', groups=groupby, g=list(sensor_groups.keys())) + + for idx, ww in windows.groupby(groupby): + data = sensor_groups[idx] + #log.debug('label-window', idx=idx, index_dtype=data.index.dtype) + data = data.reset_index().set_index('time') # XXX: Why is this needed? + + # convert to time-delta, if neeeded + #if pandas.api.types.is_datetime64_dtype(data.index): + # data.index = data.index - data.index.min() + + for idx, w in ww.iterrows(): + window_end = idx[-1] # XXX: assuming this is time + window_start = window_end - window_duration + + labels = data.loc[window_start:window_end, label_column] + threshold = majority * len(labels) + counts = labels.value_counts(dropna=True) + label = counts.index[0] if counts.iloc[0] > threshold else None + windows.loc[idx, label_column] = label + + return windows + +def plot_timelines(sensordata, windows, groupby, sensor_columns, label_column): + + # Plot + from plotting import make_timeline_plot + + sensor_columns = [ c for c in sensor_columns if c.startswith('gyro_')] # XXX + + sensor_groups = {idx: df for idx, df in sensordata.groupby(groupby, group_keys=False, as_index=False) } + + log.debug('label-windows', groups=groupby, g=list(sensor_groups.keys())) + + for idx, ww in windows.groupby(groupby, group_keys=False, as_index=False): + data = sensor_groups[idx] + #log.debug('label-window', idx=idx, index_dtype=data.index.dtype) + + # XXX: Why is this needed? + data = data.reset_index().set_index('time') + ww = ww.reset_index().set_index('time') + + # convert to seconds + #data.index = data.index / pandas.Timedelta(seconds=1) + #ww.index = ww.index / pandas.Timedelta(seconds=1) + + feature_columns = list(ww.columns) + + feature_columns = [ + 'motion_mag_rms', 'motion_mag_p2p', 'motion_x_rms', 'motion_y_rms', 'motion_z_rms', + 'fft_0_4hz', 'fft_0_8hz', 'fft_1_2hz', 'fft_1_6hz', 'fft_1_10hz', 'fft_2_3hz', 'fft_2_7hz', 'fft_3_1hz', 'fft_3_5hz'] + + line_features = [ + #'orientation_x', 'orientation_y', 'orientation_z', + 'motion_mag_rms' + ] + + #print('pp', ww[o]) + + idx_name = '_'.join([str(s) for s in idx] ) + plot_path = f'plot_{idx_name}.png' + # Make a plot + width = 1600 + aspect = 2.0 + height = width/aspect + fig = make_timeline_plot(data, ww, + sensor_columns=sensor_columns, + label_column=label_column, + line_feature_columns=line_features, + heatmap_feature_columns=feature_columns, + colors=None, + class_names=['class_0', 'class_1'], # FIXME: pass + predictions=None, # FIXME: pass separate + width=width, aspect=aspect) + + fig.write_image(plot_path, scale=1.5, width=width, height=height) + print('Wrote plot', plot_path) + def run_pipeline(run, hyperparameters, dataset, config, data_dir, @@ -303,7 +483,6 @@ def run_pipeline(run, hyperparameters, dataset, dataset_config = load_config(config) - if not os.path.exists(out_dir): os.makedirs(out_dir) @@ -313,9 +492,6 @@ def run_pipeline(run, hyperparameters, dataset, log.info('data-load-start', dataset=dataset) data = pandas.read_parquet(data_path) - #print(data.index.names) - #print(data.columns) - groups = dataset_config['groups'] data_columns = dataset_config['data_columns'] enabled_classes = dataset_config['classes'] @@ -323,6 +499,9 @@ def run_pipeline(run, hyperparameters, dataset, time_column = dataset_config.get('time_column', 'time') sensitivity = dataset_config.get('sensitivity', 4.0) + print('dd', sorted(data.columns)) + print('dt', data.dtypes) + data[label_column] = data[label_column].astype(str) data_load_duration = time.time() - data_load_start @@ -331,18 +510,75 @@ def run_pipeline(run, hyperparameters, dataset, feature_extraction_start = time.time() log.info('feature-extraction-start', dataset=dataset, + features=features, ) window_length = model_settings['window_length'] + samplerate = dataset_config.get('samplerate', 100) + window_hop = model_settings['window_hop'] + + window_duration = (window_length / samplerate) + + remap = { + 'x': 'acc_x', + 'y': 'acc_y', + 'z': 'acc_z', + } + data = data.rename(columns=remap) + + + # convert to time-delta, if neeeded + def convert_time(data): + if pandas.api.types.is_datetime64_dtype(data.index): + data.index = data.index - data.index.min() + return data + + data = data.groupby(groups, as_index=False, group_keys=False).apply(convert_time) + + + # Setup feature extraction + extract_options = dict( + window_length=window_length, + hop_length=window_hop, + samplerate=samplerate, + ) + if features == 'timebased': + #columns = ['x', 'y', 'z'] + columns = ['acc_x', 'acc_y', 'acc_z'] + extractor = TimebasedFeatureExtractor(sensitivity=sensitivity, column_order=columns, options=extract_options) + + elif features == 'custom': + # FIXME: unhardcode path + executable = ['/home/jon/projects/emlearn/examples/motion_recognition/build/motion_preprocess'] + + columns = ['time', 'acc_x', 'acc_y', 'acc_z', 'gyro_x', 'gyro_y', 'gyro_z'] + data_columns = [ c for c in columns if not c == 'time' ] + extractor = DataProcessorProgram(program=executable, + options=extract_options, column_order=columns) + + # Feature extractor expects these to be set + data['gyro_x'] = 0.0 + data['gyro_y'] = 0.0 + data['gyro_z'] = 0.0 + else: + raise ValueError(f"Unsupported features: {features}") + features = extract_features(data, + extractor=extractor, columns=data_columns, - groupby=groups, - features=features, - sensitivity=sensitivity, - window_length=window_length, - window_hop=model_settings['window_hop'], + groupby=groups, label_column=label_column, time_column=time_column, + samplerate=samplerate, + #parallel_jobs=1, + ) + + # Attach labels + features = label_windows(data, features, + groupby=groups, + label_column=label_column, + window_duration=pandas.Timedelta(window_duration, unit='s'), ) + labeled = numpy.count_nonzero(features[label_column].notna()) feature_extraction_duration = time.time() - feature_extraction_start @@ -353,6 +589,11 @@ def run_pipeline(run, hyperparameters, dataset, duration=feature_extraction_duration, ) + #plot_timelines(data, features, groupby=groups, + # sensor_columns=data_columns, label_column=label_column) + + # FIXME: keep the windows in evaluation, only ignore for training + # Drop windows without labels features = features[features[label_column].notna()] diff --git a/examples/har_trees/plotting.py b/examples/har_trees/plotting.py new file mode 100644 index 00000000..93f6832c --- /dev/null +++ b/examples/har_trees/plotting.py @@ -0,0 +1,325 @@ + +import numpy +import pandas + +import plotly.express as px +import plotly.graph_objects as go + +from plotly.colors import qualitative + +from sklearn.preprocessing import MinMaxScaler, StandardScaler, RobustScaler + +def find_runs(labels : pandas.Series, + label_column='activity', + name='run'): + + # Detect changes + change_points = labels != labels.shift() + run_ids = change_points.cumsum() + + # Extract runs + def foo(g): + #print(g.iloc[0], g.index[0], g.index[-1]) + out = pandas.Series({ + 'label': g.iloc[0][label_column], + 'start_time': g.index[0], + 'end_time': g.index[-1], + }) + return out + + labels_df = labels.to_frame() + labels_df[name] = run_ids + runs = labels_df.groupby(name, as_index=False).apply(foo, include_groups=False) + return runs + +def get_subplot_axes(rows, cols, row, col): + subplot_num = (row - 1) * cols + col + if subplot_num == 1: + return 'x', 'y' + else: + return f'x{subplot_num}', f'y{subplot_num}' + +def add_events(fig, df, label_colors, subplot={}, cols=0, font_size=16, font_family='sans-serif'): + + xaxis, yaxis = get_subplot_axes(fig, cols=cols, **subplot) + + # Plot rectangles + for _, row in df.iterrows(): + label = row['label'] + if pandas.isna(label): + #print('Skipping', row) + continue + fig.add_shape( + type='rect', + x0=row['start_time'], + x1=row['end_time'], + y0=0, + y1=1, + xref='x', + yref=f'{yaxis} domain', + fillcolor=label_colors[row['label']], + opacity=0.3, + line_width=0, + layer='below', + **subplot, + ) + + # Optional label annotation + fig.add_annotation( + x=(row['start_time'] + (row['end_time'] - row['start_time']) / 2), + y=1.02, + text=row['label'], + showarrow=False, + xref='x', + yref=f'{yaxis} domain', + font=dict(size=font_size, family=font_family), + **subplot, + ) + +def time_ticks(times, every=30, skip_start=0): + n_times = len(times) + start = times.min() + skip_start + end = times.max() + + def minute_second_format(t : float): + """MM:SS""" + return f"{int(t//60):02}:{int(t%60):02}" + + tick_vals = numpy.arange(start, end, every) + tick_text = [ minute_second_format(t) for t in tick_vals] + + return tick_vals, tick_text + +def make_label_colors(labels) -> dict[str, object]: + + color_pool = qualitative.Set3 + qualitative.Dark24 + qualitative.Pastel1 + label_colors = dict(zip(labels, color_pool)) + assert len(labels) <= len(color_pool) + + return label_colors + +def plot_timeline(fig, df, + data: list[str], + label_colors=None, + data_colors=None, + time='time', + label='activity', + subplot={}, + cols=1, + opacity=0.5, + ): + + df = df.reset_index() + df[time] = convert_times(df[time]) + df = df.sort_values(time) + + if label_colors is None and label is not None: + label_colors = make_label_colors(df[label].unique()) + + if data_colors is None: + data_colors = make_label_colors(list(set(data))) + + if label is not None: + df = df.set_index(time) + events = find_runs(df[label], label_column=label) + events = events[~events.label.isin(['transient'])] + df = df.reset_index() + add_events(fig, events, label_colors=label_colors, subplot=subplot, cols=cols) + + # Add each axis as a line + for column in data: + y = df[column] + #print(column) + #print(df[time]) + trace = go.Scatter(x=df[time], + y=y, + mode='lines', + name=column, + line=dict(color=data_colors[column]), + opacity=opacity, + ) + fig.add_trace(trace, **subplot) + +def convert_times(times): + out = times + out = out / pandas.Timedelta(seconds=1) + out -= out.min() + return out + +def configure_xaxis(fig, times, every=60, col=1, row=1, standoff=10): + + times = convert_times(times) + + tick_vals, tick_text = time_ticks(times, skip_start=30) + + # Customize layout + fig.update_xaxes( + tickmode='array', + tickvals=tick_vals, + ticktext=tick_text, + col=col, row=row, + ) + + fig.add_annotation( + text="Time (MM:SS)", + x=0, # Left edge of plot area + y=-0.035, # Below the plot (negative moves down) + xref="x domain", # Relative to subplot domain + yref="paper", # Relative to entire figure + xanchor="left", # Left-align text + yanchor="top", # Anchor to top of text + showarrow=False, + font=dict(size=14) + ) + + +def plot_heatmap(fig, data, columns, y_labels=None, colorscale='RdBu', time='time', zmax=1.0, zmin=None, subplot={}): + + if zmin is None: + zmin = -zmax + + from plotly import graph_objects as go + + if y_labels is None: + y_labels = columns + + data[time] = convert_times(data[time]) + + x_labels = data[time] + z_data = data[columns].T.values # Transpose for proper orientation + + # Create heatmap + fig.add_heatmap( + z=z_data, + x=x_labels, + y=y_labels, + colorscale=colorscale, + showscale=False, + zmin=zmin, + zmax=zmax, + **subplot, + ) + + return fig + + +def fft_freq_from_bin(bin_idx, length, sample_rate): + """Convert FFT bin index to frequency""" + return bin_idx * sample_rate / length + +def make_timeline_plot(data, features, predictions, + colors, + class_names, + label_column, + sensor_columns=[], + line_feature_columns=[], + heatmap_feature_columns=[], + width = 1600, + aspect = 2.0, + ): + + from plotly.subplots import make_subplots + from plotting import configure_xaxis, plot_heatmap + + # Create subplots + rows = 4 + fig = make_subplots( + rows=rows, cols=1, + row_titles=('Input', 'Features', 'Predictions'), + shared_xaxes=True, + vertical_spacing=0.02, + row_heights=(0.2, 0.2, 0.6, 0.2), + ) + subplots = [ dict(col=1, row=i) for i in range(1, rows+1) ] + + # Input data + if sensor_columns: + sensor_colors = dict(zip(sensor_columns, ['red', 'green', 'blue'])) + scaled_data = data.copy() + scale_sensor = 1.0 # XXX: guessing appropriate scaling factor + scaled_data.loc[:, sensor_columns] = scaled_data.loc[:, sensor_columns] / scale_sensor + plot_timeline(fig, scaled_data, data=sensor_columns, + label=label_column, + label_colors=colors, + data_colors=sensor_colors, + subplot=subplots[0] + ) + #fig.update_yaxes(range=(-5, 5), **subplots[0], title_text="Acceleration (g)") + + # Features + if line_feature_columns: + scaler = RobustScaler(quantile_range=(5.0, 95.0), with_centering=False) + scaler.set_output(transform='pandas') + features_scaled = scaler.fit_transform(features[line_feature_columns]) + + print('features', sorted(features.columns)) + print('pf', features_scaled.head()) + + plot_timeline(fig, features, + data=line_feature_columns, + label=label_column, + label_colors=colors, + data_colors=colors, + opacity=1.0, + subplot=subplots[1]) + + + if heatmap_feature_columns: + heatmap_colorscale = 'blues' + scaler = RobustScaler(quantile_range=(5.0, 95.0), with_centering=False) + scaler.set_output(transform='pandas') + features_scaled = scaler.fit_transform(features[heatmap_feature_columns]) + plot_heatmap(fig, + features_scaled.reset_index(), + columns=heatmap_feature_columns, + #y_labels=feature_names, + subplot=subplots[2], + #zmin=0.0, zmax=3.0, + colorscale=heatmap_colorscale, + ) + + + # Predictions + if predictions is not None: + plot_timeline(fig, predictions, data=class_names, + label=label_column, + label_colors=colors, + data_colors=colors, + opacity=1.0, + subplot=subplots[3]) + fig.update_yaxes(range=(0, 1.0), title_text="Probability", **subplots[3]) + + + configure_xaxis(fig, data.index, every=60, row=rows-0) + + # Customize layout + fig.update_layout( + template='plotly_white', + ) + + fig.update_layout( + legend=dict( + orientation="h", + yanchor="top", + y=-0.03, # Adjust distance from plot + xanchor="center", + x=0.5, + font=dict(size=10), # Smaller font if needed + itemwidth=30 # Control item spacing + ), + margin=dict(b=30) # Add bottom margin for legend space + ) + + fig.update_yaxes(tickformat=".1f") + + height = int(width / aspect) + fig.update_layout( + width=width, + height=height, + margin=dict(l=10, r=10, t=10, b=10), # Fixed margins + autosize=False + ) + + return fig + +