@@ -242,24 +242,24 @@ def ensure_n_jobs(extractor, n_jobs=1):
     return n_jobs


-def chunk_duration_to_chunk_size(chunk_duration, chunkable: "TimeSeries"):
+def chunk_duration_to_chunk_size(chunk_duration, time_series: "TimeSeries"):
     if isinstance(chunk_duration, float):
-        chunk_size = int(chunk_duration * chunkable.get_sampling_frequency())
+        chunk_size = int(chunk_duration * time_series.get_sampling_frequency())
     elif isinstance(chunk_duration, str):
         if chunk_duration.endswith("ms"):
             chunk_duration = float(chunk_duration.replace("ms", "")) / 1000.0
         elif chunk_duration.endswith("s"):
             chunk_duration = float(chunk_duration.replace("s", ""))
         else:
             raise ValueError("chunk_duration must ends with s or ms")
-        chunk_size = int(chunk_duration * chunkable.get_sampling_frequency())
+        chunk_size = int(chunk_duration * time_series.get_sampling_frequency())
     else:
         raise ValueError("chunk_duration must be str or float")
     return chunk_size


 def ensure_chunk_size(
-    chunkable: "TimeSeries",
+    time_series: "TimeSeries",
     total_memory=None,
     chunk_size=None,
     chunk_memory=None,
@@ -299,20 +299,20 @@ def ensure_chunk_size(
         assert total_memory is None
         # set by memory per worker size
         chunk_memory = convert_string_to_bytes(chunk_memory)
-        chunk_size = int(chunk_memory / chunkable.get_sample_size_in_bytes())
+        chunk_size = int(chunk_memory / time_series.get_sample_size_in_bytes())
     elif total_memory is not None:
         # clip by total memory size
-        n_jobs = ensure_n_jobs(chunkable, n_jobs=n_jobs)
+        n_jobs = ensure_n_jobs(time_series, n_jobs=n_jobs)
         total_memory = convert_string_to_bytes(total_memory)
-        chunk_size = int(total_memory / (chunkable.get_sample_size_in_bytes() * n_jobs))
+        chunk_size = int(total_memory / (time_series.get_sample_size_in_bytes() * n_jobs))
     elif chunk_duration is not None:
-        chunk_size = chunk_duration_to_chunk_size(chunk_duration, chunkable)
+        chunk_size = chunk_duration_to_chunk_size(chunk_duration, time_series)
     else:
         # Edge case to define single chunk per segment for n_jobs=1.
         # All chunking parameters equal None mean single chunk per segment
         if n_jobs == 1:
-            num_segments = chunkable.get_num_segments()
-            samples_in_larger_segment = max([chunkable.get_num_samples(segment) for segment in range(num_segments)])
+            num_segments = time_series.get_num_segments()
+            samples_in_larger_segment = max([time_series.get_num_samples(segment) for segment in range(num_segments)])
             chunk_size = samples_in_larger_segment
         else:
             raise ValueError("For n_jobs >1 you must specify total_memory or chunk_size or chunk_memory")
@@ -322,7 +322,7 @@ def ensure_chunk_size(


 class TimeSeriesChunkExecutor:
     """
-    Core class for parallel processing to run a "function" over chunks on a chunkable extractor.
+    Core class for parallel processing to run a "function" over chunks on a time_series extractor.

     It supports running a function:
       * in loop with chunk processing (low RAM usage)
@@ -334,8 +334,8 @@ class TimeSeriesChunkExecutor:

     Parameters
     ----------
-    chunkable : TimeSeries
-        The chunkable object to be processed.
+    time_series : TimeSeries
+        The time_series object to be processed.
     func : function
         Function that runs on each chunk
     init_func : function
@@ -383,7 +383,7 @@ class TimeSeriesChunkExecutor:

     def __init__(
         self,
-        chunkable: "TimeSeries",
+        time_series: "TimeSeries",
         func,
         init_func,
         init_args,
@@ -402,7 +402,7 @@ def __init__(
         max_threads_per_worker=1,
         need_worker_index=False,
     ):
-        self.chunkable = chunkable
+        self.time_series = time_series
         self.func = func
         self.init_func = init_func
         self.init_args = init_args
@@ -421,7 +421,7 @@ def __init__(
            else:
                mp_context = "spawn"

-           preferred_mp_context = chunkable.get_preferred_mp_context()
+           preferred_mp_context = time_series.get_preferred_mp_context()
            if preferred_mp_context is not None and preferred_mp_context != mp_context:
                warnings.warn(
                    f"Your processing chain using pool_engine='process' and mp_context='{mp_context}' is not possible."
@@ -437,7 +437,7 @@ def __init__(
         self.handle_returns = handle_returns
         self.gather_func = gather_func

-        self.n_jobs = ensure_n_jobs(self.chunkable, n_jobs=n_jobs)
+        self.n_jobs = ensure_n_jobs(self.time_series, n_jobs=n_jobs)
         self.chunk_size = self.ensure_chunk_size(
             total_memory=total_memory,
             chunk_size=chunk_size,
@@ -455,7 +455,7 @@ def __init__(
         if verbose:
             chunk_memory = self.get_chunk_memory()
             total_memory = chunk_memory * self.n_jobs
-            chunk_duration = self.chunk_size / chunkable.sampling_frequency
+            chunk_duration = self.chunk_size / time_series.sampling_frequency
             chunk_memory_str = convert_bytes_to_str(chunk_memory)
             total_memory_str = convert_bytes_to_str(total_memory)
             chunk_duration_str = convert_seconds_to_str(chunk_duration)
@@ -471,13 +471,13 @@ def __init__(
         )

     def get_chunk_memory(self):
-        return self.chunk_size * self.chunkable.get_sample_size_in_bytes()
+        return self.chunk_size * self.time_series.get_sample_size_in_bytes()

     def ensure_chunk_size(
         self, total_memory=None, chunk_size=None, chunk_memory=None, chunk_duration=None, n_jobs=1, **other_kwargs
     ):
         return ensure_chunk_size(
-            self.chunkable, total_memory, chunk_size, chunk_memory, chunk_duration, n_jobs, **other_kwargs
+            self.time_series, total_memory, chunk_size, chunk_memory, chunk_duration, n_jobs, **other_kwargs
         )

     def run(self, slices=None):
@@ -487,7 +487,7 @@ def run(self, slices=None):

         if slices is None:
             # TODO: rename
-            slices = divide_time_series_into_chunks(self.chunkable, self.chunk_size)
+            slices = divide_time_series_into_chunks(self.time_series, self.chunk_size)

         if self.handle_returns:
             returns = []
0 commit comments