|
| 1 | +import argparse |
| 2 | +import math |
| 3 | +import time |
| 4 | + |
| 5 | +import matplotlib |
| 6 | + |
| 7 | +matplotlib.use('Agg') |
| 8 | +import matplotlib.pyplot as plt |
| 9 | +import numpy as np |
| 10 | + |
| 11 | +from brainflow.board_shim import BoardShim, BrainFlowInputParams, BoardIds, BrainFlowPresets |
| 12 | + |
| 13 | + |
| 14 | +def print_summary(optics_data, timestamps, sampling_rate, channel_labels): |
| 15 | + print(f'optics samples: {optics_data.shape[1]}') |
| 16 | + |
| 17 | + if timestamps.size > 1 and np.all(np.isfinite(timestamps)): |
| 18 | + timestamp_diffs = np.diff(timestamps) |
| 19 | + timestamp_diffs = timestamp_diffs[timestamp_diffs > 0] |
| 20 | + if timestamp_diffs.size > 0: |
| 21 | + actual_rate = 1.0 / np.median(timestamp_diffs) |
| 22 | + duration = timestamps[-1] - timestamps[0] |
| 23 | + print(f'timestamp duration: {duration:.3f} sec') |
| 24 | + print(f'timestamp sampling rate: {actual_rate:.2f} Hz, expected: {sampling_rate} Hz') |
| 25 | + |
| 26 | + nan_count = int(np.isnan(optics_data).sum()) |
| 27 | + print(f'nan values: {nan_count}') |
| 28 | + |
| 29 | + for label, row in zip(channel_labels, optics_data): |
| 30 | + finite = row[np.isfinite(row)] |
| 31 | + if finite.size == 0: |
| 32 | + print(f'{label}: no finite values') |
| 33 | + continue |
| 34 | + stddev = float(np.std(finite)) |
| 35 | + print( |
| 36 | + f'{label}: min={np.min(finite):.3f}, max={np.max(finite):.3f}, ' |
| 37 | + f'mean={np.mean(finite):.3f}, std={stddev:.3f}' |
| 38 | + ) |
| 39 | + if stddev < 1e-9: |
| 40 | + print(f'{label}: flat signal') |
| 41 | + |
| 42 | + |
| 43 | +def plot_optics(optics_data, timestamps, sampling_rate, output_file, channel_labels): |
| 44 | + if timestamps.size == optics_data.shape[1] and timestamps.size > 0 and np.all(np.isfinite(timestamps)): |
| 45 | + x_axis = timestamps - timestamps[0] |
| 46 | + x_label = 'Time, sec' |
| 47 | + else: |
| 48 | + x_axis = np.arange(optics_data.shape[1]) / sampling_rate |
| 49 | + x_label = 'Samples' |
| 50 | + |
| 51 | + num_channels = optics_data.shape[0] |
| 52 | + cols = min(4, num_channels) |
| 53 | + rows = math.ceil(num_channels / cols) |
| 54 | + fig, axes = plt.subplots(rows, cols, figsize=(4 * cols, 2.4 * rows), sharex=True) |
| 55 | + axes = np.atleast_1d(axes).reshape(-1) |
| 56 | + |
| 57 | + for index, axis in enumerate(axes): |
| 58 | + if index < num_channels: |
| 59 | + axis.plot(x_axis, optics_data[index]) |
| 60 | + axis.set_title(channel_labels[index]) |
| 61 | + axis.grid(True) |
| 62 | + else: |
| 63 | + axis.axis('off') |
| 64 | + |
| 65 | + for axis in axes[-cols:]: |
| 66 | + axis.set_xlabel(x_label) |
| 67 | + |
| 68 | + fig.tight_layout() |
| 69 | + fig.savefig(output_file, dpi=150) |
| 70 | + print(f'wrote plot: {output_file}') |
| 71 | + |
| 72 | + |
| 73 | +def main(): |
| 74 | + BoardShim.enable_board_logger() |
| 75 | + |
| 76 | + parser = argparse.ArgumentParser() |
| 77 | + parser.add_argument('--duration', type=int, required=False, default=20) |
| 78 | + parser.add_argument('--mac-address', type=str, required=False, default='') |
| 79 | + parser.add_argument('--serial-number', type=str, required=False, default='') |
| 80 | + parser.add_argument('--timeout', type=int, required=False, default=0) |
| 81 | + parser.add_argument('--other-info', type=str, required=False, default='preset=p1035;low_latency=true') |
| 82 | + parser.add_argument('--output-file', type=str, required=False, default='muse_anthena_optics_p1035.png') |
| 83 | + args = parser.parse_args() |
| 84 | + |
| 85 | + params = BrainFlowInputParams() |
| 86 | + params.mac_address = args.mac_address |
| 87 | + params.serial_number = args.serial_number |
| 88 | + params.timeout = args.timeout |
| 89 | + params.other_info = args.other_info |
| 90 | + |
| 91 | + board_id = BoardIds.MUSE_S_ANTHENA_BOARD.value |
| 92 | + preset = BrainFlowPresets.ANCILLARY_PRESET |
| 93 | + optical_channels = BoardShim.get_optical_channels(board_id, preset) |
| 94 | + timestamp_channel = BoardShim.get_timestamp_channel(board_id, preset) |
| 95 | + sampling_rate = BoardShim.get_sampling_rate(board_id, preset) |
| 96 | + |
| 97 | + board = BoardShim(board_id, params) |
| 98 | + try: |
| 99 | + board.prepare_session() |
| 100 | + board.start_stream() |
| 101 | + try: |
| 102 | + time.sleep(args.duration) |
| 103 | + data = board.get_board_data(preset=preset) |
| 104 | + finally: |
| 105 | + board.stop_stream() |
| 106 | + finally: |
| 107 | + if board.is_prepared(): |
| 108 | + board.release_session() |
| 109 | + |
| 110 | + optics_data = data[optical_channels, :] |
| 111 | + timestamps = data[timestamp_channel, :] |
| 112 | + channel_labels = [f'Optics {channel}' for channel in optical_channels] |
| 113 | + print_summary(optics_data, timestamps, sampling_rate, channel_labels) |
| 114 | + plot_optics(optics_data, timestamps, sampling_rate, args.output_file, channel_labels) |
| 115 | + |
| 116 | + |
| 117 | +if __name__ == '__main__': |
| 118 | + main() |
0 commit comments