@@ -342,6 +342,39 @@ impl LogicalMeterActor {
342342 }
343343 }
344344
345+ async fn start_resamplers (
346+ & mut self ,
347+ components : & HashSet < u64 > ,
348+ metric : Metric ,
349+ resamplers : & mut HashMap < ( u64 , Metric ) , ComponentDataResampler > ,
350+ ) -> Result < ( ) , Error > {
351+ for component_id in components {
352+ let resampler_key = & ( * component_id, metric) ;
353+ if resamplers. contains_key ( resampler_key) {
354+ continue ;
355+ }
356+ let resampler = ComponentDataResampler {
357+ component_id : * component_id,
358+ metric,
359+ resampler : frequenz_resampling:: Resampler :: new (
360+ self . config . resampling_interval ,
361+ ResamplingFunction :: Average ,
362+ 3 ,
363+ Utc :: now ( )
364+ . with_nanosecond ( 0 )
365+ . ok_or_else ( || Error :: chrono_error ( "Failed to get current time." ) ) ?,
366+ false ,
367+ ) ,
368+ receiver : self
369+ . client
370+ . receive_electrical_component_telemetry_stream ( * component_id)
371+ . await ?,
372+ } ;
373+ resamplers. insert ( * resampler_key, resampler) ;
374+ }
375+ Ok ( ( ) )
376+ }
377+
345378 /// Handles SubscribeFormula instructions.
346379 ///
347380 /// If the formula already exists, it sends the existing receiver to the
@@ -372,30 +405,8 @@ impl LogicalMeterActor {
372405 . map_err ( |e| Error :: formula_engine_error ( format ! ( "Failed to parse formula: {e}" ) ) ) ?;
373406 let ( sender, receiver) = broadcast:: channel ( 8 ) ;
374407
375- for component_id in formula_engine. components ( ) {
376- let resampler_key = ( * component_id, metric) ;
377- if resamplers. contains_key ( & resampler_key) {
378- continue ;
379- }
380- let resampler = ComponentDataResampler {
381- component_id : * component_id,
382- metric,
383- resampler : frequenz_resampling:: Resampler :: new (
384- self . config . resampling_interval ,
385- ResamplingFunction :: Average ,
386- 3 ,
387- Utc :: now ( )
388- . with_nanosecond ( 0 )
389- . ok_or_else ( || Error :: chrono_error ( "Failed to get current time." ) ) ?,
390- false ,
391- ) ,
392- receiver : self
393- . client
394- . receive_electrical_component_telemetry_stream ( * component_id)
395- . await ?,
396- } ;
397- resamplers. insert ( resampler_key, resampler) ;
398- }
408+ self . start_resamplers ( formula_engine. components ( ) , metric, resamplers)
409+ . await ?;
399410
400411 formulas. insert (
401412 formula_key,
0 commit comments