Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/spikeinterface/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,14 @@
get_best_job_kwargs,
ensure_n_jobs,
ensure_chunk_size,
ChunkExecutor,
TimeSeriesChunkExecutor,
split_job_kwargs,
fix_job_kwargs,
)
from .recording_tools import (
write_binary_recording,
write_memory_recording,
write_recording_to_zarr,
write_to_h5_dataset_format,
get_random_data_chunks,
get_channel_distances,
Expand Down
8 changes: 4 additions & 4 deletions src/spikeinterface/core/baserecording.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import numpy as np
from probeinterface import read_probeinterface, write_probeinterface

from .chunkable import ChunkableSegment, ChunkableMixin
from .time_series import TimeSeriesSegment, TimeSeries
from .baserecordingsnippets import BaseRecordingSnippets
from .core_tools import convert_bytes_to_str, convert_seconds_to_str
from .job_tools import split_job_kwargs


class BaseRecording(BaseRecordingSnippets, ChunkableMixin):
class BaseRecording(BaseRecordingSnippets, TimeSeries):
"""
Abstract class representing several multichannel timeseries (or blocks of raw ephys traces).
Internally handles a list of RecordingSegment
Expand Down Expand Up @@ -311,7 +311,7 @@ def _save(self, format="binary", verbose: bool = False, **save_kwargs):
kwargs, job_kwargs = split_job_kwargs(save_kwargs)

if format == "binary":
from .chunkable_tools import write_binary
from .time_series_tools import write_binary

folder = kwargs["folder"]
file_paths = [folder / f"traces_cached_seg{i}.raw" for i in range(self.get_num_segments())]
Expand Down Expand Up @@ -637,7 +637,7 @@ def astype(self, dtype, round: bool | None = None):
return astype(self, dtype=dtype, round=round)


class BaseRecordingSegment(ChunkableSegment):
class BaseRecordingSegment(TimeSeriesSegment):
"""
Abstract class representing a multichannel timeseries, or block of raw ephys traces
"""
Expand Down
14 changes: 7 additions & 7 deletions src/spikeinterface/core/job_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def divide_segment_into_chunks(num_frames, chunk_size):
return chunks


def divide_chunkable_into_chunks(recording, chunk_size):
def divide_time_series_into_chunks(recording, chunk_size):
slices = []
for segment_index in range(recording.get_num_segments()):
num_frames = recording.get_num_samples(segment_index)
Expand Down Expand Up @@ -242,7 +242,7 @@ def ensure_n_jobs(extractor, n_jobs=1):
return n_jobs


def chunk_duration_to_chunk_size(chunk_duration, chunkable: "ChunkableMixin"):
def chunk_duration_to_chunk_size(chunk_duration, chunkable: "TimeSeries"):
if isinstance(chunk_duration, float):
chunk_size = int(chunk_duration * chunkable.get_sampling_frequency())
elif isinstance(chunk_duration, str):
Expand All @@ -259,7 +259,7 @@ def chunk_duration_to_chunk_size(chunk_duration, chunkable: "ChunkableMixin"):


def ensure_chunk_size(
chunkable: "ChunkableMixin",
chunkable: "TimeSeries",
total_memory=None,
chunk_size=None,
chunk_memory=None,
Expand Down Expand Up @@ -320,7 +320,7 @@ def ensure_chunk_size(
return chunk_size


class ChunkExecutor:
class TimeSeriesChunkExecutor:
"""
Core class for parallel processing to run a "function" over chunks on a chunkable extractor.

Expand All @@ -334,7 +334,7 @@ class ChunkExecutor:

Parameters
----------
chunkable : ChunkableMixin
chunkable : TimeSeries
The chunkable object to be processed.
func : function
Function that runs on each chunk
Expand Down Expand Up @@ -383,7 +383,7 @@ class ChunkExecutor:

def __init__(
self,
chunkable: "ChunkableMixin",
chunkable: "TimeSeries",
func,
init_func,
init_args,
Expand Down Expand Up @@ -487,7 +487,7 @@ def run(self, slices=None):

if slices is None:
# TODO: rename
slices = divide_chunkable_into_chunks(self.chunkable, self.chunk_size)
slices = divide_time_series_into_chunks(self.chunkable, self.chunk_size)

if self.handle_returns:
returns = []
Expand Down
14 changes: 7 additions & 7 deletions src/spikeinterface/core/node_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import numpy as np

from spikeinterface.core.base import base_peak_dtype, spike_peak_dtype
from spikeinterface.core.chunkable import ChunkableMixin
from spikeinterface.core.time_series import TimeSeries
from spikeinterface.core import BaseRecording, get_chunk_with_margin
from spikeinterface.core.job_tools import ChunkExecutor, fix_job_kwargs, _shared_job_kwargs_doc
from spikeinterface.core.job_tools import TimeSeriesChunkExecutor, fix_job_kwargs, _shared_job_kwargs_doc
from spikeinterface.core import get_channel_distances
from spikeinterface.core.core_tools import ms_to_samples

Expand All @@ -26,7 +26,7 @@ class PipelineNode:

def __init__(
self,
chunkable: ChunkableMixin,
chunkable: TimeSeries,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
chunkable: TimeSeries,
time_series: TimeSeries,

?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return_output: bool | tuple[bool] = True,
parents: list[Type["PipelineNode"]] | None = None,
):
Expand All @@ -38,7 +38,7 @@ def __init__(

Parameters
----------
chunkable : ChunkableMixin
chunkable : TimeSeries
The chunkable object.
return_output : bool or tuple[bool], default: True
Whether or not the output of the node is returned by the pipeline.
Expand Down Expand Up @@ -526,7 +526,7 @@ def check_graph(nodes, check_for_peak_source=True):


def run_node_pipeline(
chunkable: ChunkableMixin,
chunkable: TimeSeries,
nodes: list[PipelineNode],
job_kwargs: dict,
job_name: str = "pipeline",
Expand Down Expand Up @@ -566,7 +566,7 @@ def run_node_pipeline(

Parameters
----------
chunkable: ChunkableMixin
chunkable: TimeSeries
The chunkable object to run the pipeline on. This is typically a recording, but it can be anything that has the
same interface for getting chunks with margin.
nodes: a list of PipelineNode
Expand Down Expand Up @@ -628,7 +628,7 @@ def run_node_pipeline(

init_args = (chunkable, nodes, skip_after_n_peaks_per_worker)

processor = ChunkExecutor(
processor = TimeSeriesChunkExecutor(
chunkable,
_compute_peak_pipeline_chunk,
_init_peak_pipeline,
Expand Down
165 changes: 159 additions & 6 deletions src/spikeinterface/core/recording_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,169 @@
ensure_chunk_size,
divide_segment_into_chunks,
fix_job_kwargs,
ChunkExecutor,
TimeSeriesChunkExecutor,
_shared_job_kwargs_doc,
split_job_kwargs,
)

from .chunkable_tools import get_random_sample_slices, get_chunks, get_chunk_with_margin
from .time_series_tools import get_random_sample_slices, get_chunks, get_chunk_with_margin
from .time_series_tools import write_binary as _write_binary
from .time_series_tools import write_memory as _write_memory
from .time_series_tools import _write_time_series_to_zarr

# for back-compatibility imports
from .chunkable_tools import write_binary as write_binary_recording
from .chunkable_tools import write_memory as write_memory_recording

def write_binary_recording(
    recording,
    file_paths,
    file_timestamps_paths=None,
    dtype=None,
    add_file_extension=True,
    byte_offset=0,
    verbose=False,
    **job_kwargs,
):
    """
    Write the traces of a recording to binary files.

    Back-compatible wrapper: every argument is forwarded unchanged to
    ``write_binary`` from ``time_series_tools``.

    Parameters
    ----------
    recording : BaseRecording
        The recording whose traces are written to binary file.
    file_paths : list[Path | str] | Path | str
        The path to the files where the data of each segment is saved.
    file_timestamps_paths : list[Path | str] | Path | str | None, default: None
        The path to the timestamps file. If None, timestamps are not saved.
    dtype : dtype or None, default: None
        Type of the saved data.
    add_file_extension : bool, default: True
        If True, and the file path does not end in "raw", "bin", or "dat" then "raw" is added as an extension.
    byte_offset : int, default: 0
        Offset in bytes for the binary file (e.g. to write a header).
    verbose : bool, default: False
        Verbosity of the chunk executor.
    {}
    """
    # Gather the writer options once, then forward them alongside the job kwargs.
    writer_kwargs = dict(
        file_paths=file_paths,
        file_timestamps_paths=file_timestamps_paths,
        dtype=dtype,
        add_file_extension=add_file_extension,
        byte_offset=byte_offset,
        verbose=verbose,
    )
    return _write_binary(recording, **writer_kwargs, **job_kwargs)


# Fill the "{}" placeholder of the docstring with the shared job kwargs documentation.
write_binary_recording.__doc__ = write_binary_recording.__doc__.format(_shared_job_kwargs_doc)


def write_memory_recording(
    recording,
    dtype=None,
    verbose=False,
    buffer_type="auto",
    job_name="write_memory",
    **job_kwargs,
):
    """
    Copy the traces of a recording into in-memory numpy arrays.

    Back-compatible wrapper: every argument is forwarded unchanged to
    ``write_memory`` from ``time_series_tools``.

    Uses SharedMemory when ``n_jobs > 1``.

    Parameters
    ----------
    recording : BaseRecording
        The recording whose traces are saved to memory.
    dtype : dtype, default: None
        Type of the saved data.
    verbose : bool, default: False
        If True, output is verbose (when chunks are used).
    buffer_type : "auto" | "numpy" | "sharedmem", default: "auto"
        The type of buffer used to store the data.
    job_name : str, default: "write_memory"
        Name of the job.
    {}

    Returns
    -------
    arrays : list
        One array per segment.
    """
    # Gather the writer options once, then forward them alongside the job kwargs.
    memory_kwargs = dict(
        dtype=dtype,
        verbose=verbose,
        buffer_type=buffer_type,
        job_name=job_name,
    )
    return _write_memory(recording, **memory_kwargs, **job_kwargs)


# Fill the "{}" placeholder of the docstring with the shared job kwargs documentation.
write_memory_recording.__doc__ = write_memory_recording.__doc__.format(_shared_job_kwargs_doc)


def write_recording_to_zarr(
    recording,
    zarr_group,
    dataset_paths,
    dataset_timestamps_paths=None,
    extra_chunks=None,
    dtype=None,
    compressor_data=None,
    filters_data=None,
    compressor_times=None,
    filters_times=None,
    verbose=False,
    **job_kwargs,
):
    """
    Write the traces of a recording into a zarr group.

    Back-compatible wrapper: every argument is forwarded unchanged to
    ``_write_time_series_to_zarr`` from ``time_series_tools``.

    Parameters
    ----------
    recording : BaseRecording
        The recording to save in zarr format.
    zarr_group : zarr.Group
        The zarr group the traces are added to.
    dataset_paths : list
        List of paths to traces datasets in the zarr group.
    dataset_timestamps_paths : list or None, default: None
        List of paths to timestamps datasets in the zarr group. If None, timestamps are not saved.
    extra_chunks : tuple or None, default: None
        Extra chunking dimensions to use for the zarr dataset. The first dimension is always time and
        controlled by the job_kwargs. Useful to chunk by channel, with ``extra_chunks=(channel_chunk_size,)``.
    dtype : dtype, default: None
        Type of the saved data.
    compressor_data : zarr compressor or None, default: None
        Zarr compressor for data.
    filters_data : list, default: None
        List of zarr filters for data.
    compressor_times : zarr compressor or None, default: None
        Zarr compressor for timestamps.
    filters_times : list, default: None
        List of zarr filters for timestamps.
    verbose : bool, default: False
        If True, output is verbose (when chunks are used).
    {}
    """
    # Gather the zarr options once, then forward them alongside the job kwargs.
    zarr_kwargs = dict(
        zarr_group=zarr_group,
        dataset_paths=dataset_paths,
        dataset_timestamps_paths=dataset_timestamps_paths,
        extra_chunks=extra_chunks,
        dtype=dtype,
        compressor_data=compressor_data,
        filters_data=filters_data,
        compressor_times=compressor_times,
        filters_times=filters_times,
        verbose=verbose,
    )
    return _write_time_series_to_zarr(recording, **zarr_kwargs, **job_kwargs)


# Fill the "{}" placeholder of the docstring with the shared job kwargs documentation.
write_recording_to_zarr.__doc__ = write_recording_to_zarr.__doc__.format(_shared_job_kwargs_doc)


def read_binary_recording(file, num_channels, dtype, time_axis=0, offset=0):
Expand Down Expand Up @@ -458,7 +611,7 @@ def append_noise_chunk(res):
func = _noise_level_chunk
init_func = _noise_level_chunk_init
init_args = (recording, return_in_uV, method)
executor = ChunkExecutor(
executor = TimeSeriesChunkExecutor(
recording,
func,
init_func,
Expand Down
Loading
Loading