@@ -54,16 +54,21 @@ pub struct MockMicrogridApiClient {
5454 clock : TokioSyncedClock ,
5555}
5656
57+ /// One row per emitted telemetry frame: `(power, reactive_power, voltage,
58+ /// current)`. Each field is independently optional so individual metrics
59+ /// can be omitted from a frame.
60+ pub type MockMetricRow = (
61+ Option < Power > ,
62+ Option < ReactivePower > ,
63+ Option < Voltage > ,
64+ Option < Current > ,
65+ ) ;
66+
5767#[ derive( Default , Debug , Clone ) ]
5868pub struct MockComponent {
5969 pub component : ElectricalComponent ,
6070 pub children : Vec < MockComponent > ,
61- pub metrics : Vec < (
62- Option < Power > ,
63- Option < ReactivePower > ,
64- Option < Voltage > ,
65- Option < Current > ,
66- ) > ,
71+ pub metrics : Vec < MockMetricRow > ,
6772 /// Overrides the state code reported in each telemetry sample. `None`
6873 /// defaults to `Ready`.
6974 state_code : Option < ElectricalComponentStateCode > ,
@@ -173,7 +178,7 @@ impl MockComponent {
173178 if self . component . category == ElectricalComponentCategory :: Unspecified as i32 {
174179 panic ! ( "Cannot add children to a hidden load component" ) ;
175180 }
176- self . children . extend ( children. into_iter ( ) ) ;
181+ self . children . extend ( children) ;
177182 self
178183 }
179184
@@ -356,134 +361,134 @@ impl MicrogridApiClient for MockMicrogridApiClient {
356361 . find ( |c| c. component . id == comp_id)
357362 . cloned ( ) ;
358363
359- if let Some ( component) = component {
360- if !component. metrics . is_empty ( ) {
361- let metrics = component . metrics . clone ( ) ;
362- let state_code = component
363- . state_code
364- . unwrap_or ( ElectricalComponentStateCode :: Ready ) ;
365- let silence_after_metrics = component . silence_after_metrics ;
366- let clock = self . clock . clone ( ) ;
367- tokio :: spawn ( async move {
368- let dur = std :: time :: Duration :: from_millis ( 200 ) ;
369- let mut interval = tokio :: time:: interval ( dur ) ;
370- let offset = chrono :: TimeDelta :: from_std ( dur) . unwrap_or_default ( ) ;
371-
372- for metrics in metrics . iter ( ) {
373- interval . tick ( ) . await ;
374- // `tokio::time:: interval`'s first tick fires
375- // immediately, so `clock.wall_now()` is still the
376- // anchor here. Add one interval so the first sample
377- // is timestamped at `anchor + dur`, matching the
378- // resampler's first interval boundary.
379- let wall = clock . wall_now ( ) + offset ;
380- let sys_delta =
381- wall . signed_duration_since ( chrono :: DateTime :: < chrono :: Utc > :: UNIX_EPOCH ) ;
382- let next_ts = SystemTime :: UNIX_EPOCH
383- + std :: time :: Duration :: from_nanos (
384- sys_delta . num_nanoseconds ( ) . unwrap_or ( 0 ) . max ( 0 ) as u64 ,
385- ) ;
386- let duration_since_epoch =
387- next_ts . duration_since ( SystemTime :: UNIX_EPOCH ) . unwrap ( ) ;
388- let ts = Some ( protobuf :: Timestamp {
389- seconds : duration_since_epoch . as_secs ( ) as i64 ,
390- nanos : duration_since_epoch. subsec_nanos ( ) as i32 ,
391- } ) ;
392- let mut metric_samples = vec ! [ ] ;
393- if let Some ( power ) = metrics . 0 {
394- metric_samples . push ( MetricSample {
395- sample_time : ts . clone ( ) ,
396- metric : Metric :: AcPowerActive as i32 ,
397- value : Some ( MetricValueVariant {
398- metric_value_variant : Some (
399- metric_value_variant:: MetricValueVariant :: SimpleMetric (
400- SimpleMetricValue {
401- value : power . as_watts ( ) ,
402- } ,
403- ) ,
364+ if let Some ( component) = component
365+ && !component. metrics . is_empty ( )
366+ {
367+ let metrics = component. metrics . clone ( ) ;
368+ let state_code = component
369+ . state_code
370+ . unwrap_or ( ElectricalComponentStateCode :: Ready ) ;
371+ let silence_after_metrics = component . silence_after_metrics ;
372+ let clock = self . clock . clone ( ) ;
373+ tokio :: spawn ( async move {
374+ let dur = std :: time:: Duration :: from_millis ( 200 ) ;
375+ let mut interval = tokio :: time :: interval ( dur) ;
376+ let offset = chrono :: TimeDelta :: from_std ( dur ) . unwrap_or_default ( ) ;
377+
378+ for metrics in metrics . iter ( ) {
379+ interval. tick ( ) . await ;
380+ // `tokio::time::interval`'s first tick fires
381+ // immediately, so `clock.wall_now()` is still the
382+ // anchor here. Add one interval so the first sample
383+ // is timestamped at `anchor + dur`, matching the
384+ // resampler's first interval boundary.
385+ let wall = clock . wall_now ( ) + offset ;
386+ let sys_delta =
387+ wall . signed_duration_since ( chrono :: DateTime :: < chrono :: Utc > :: UNIX_EPOCH ) ;
388+ let next_ts = SystemTime :: UNIX_EPOCH
389+ + std :: time :: Duration :: from_nanos (
390+ sys_delta . num_nanoseconds ( ) . unwrap_or ( 0 ) . max ( 0 ) as u64 ,
391+ ) ;
392+ let duration_since_epoch =
393+ next_ts . duration_since ( SystemTime :: UNIX_EPOCH ) . unwrap ( ) ;
394+ let ts = Some ( protobuf :: Timestamp {
395+ seconds : duration_since_epoch. as_secs ( ) as i64 ,
396+ nanos : duration_since_epoch . subsec_nanos ( ) as i32 ,
397+ } ) ;
398+ let mut metric_samples = vec ! [ ] ;
399+ if let Some ( power ) = metrics . 0 {
400+ metric_samples . push ( MetricSample {
401+ sample_time : ts ,
402+ metric : Metric :: AcPowerActive as i32 ,
403+ value : Some ( MetricValueVariant {
404+ metric_value_variant : Some (
405+ metric_value_variant :: MetricValueVariant :: SimpleMetric (
406+ SimpleMetricValue {
407+ value : power . as_watts ( ) ,
408+ } ,
404409 ) ,
405- } ) ,
406- bounds : vec ! [ ] ,
407- connection : None ,
408- } ) ;
409- }
410- if let Some ( reactive_power ) = metrics . 1 {
411- metric_samples . push ( MetricSample {
412- sample_time : ts . clone ( ) ,
413- metric : Metric :: AcPowerReactive as i32 ,
414- value : Some ( MetricValueVariant {
415- metric_value_variant : Some (
416- metric_value_variant:: MetricValueVariant :: SimpleMetric (
417- SimpleMetricValue {
418- value : reactive_power . as_volt_amperes_reactive ( ) ,
419- } ,
420- ) ,
410+ ) ,
411+ } ) ,
412+ bounds : vec ! [ ] ,
413+ connection : None ,
414+ } ) ;
415+ }
416+ if let Some ( reactive_power ) = metrics . 1 {
417+ metric_samples . push ( MetricSample {
418+ sample_time : ts ,
419+ metric : Metric :: AcPowerReactive as i32 ,
420+ value : Some ( MetricValueVariant {
421+ metric_value_variant : Some (
422+ metric_value_variant :: MetricValueVariant :: SimpleMetric (
423+ SimpleMetricValue {
424+ value : reactive_power . as_volt_amperes_reactive ( ) ,
425+ } ,
421426 ) ,
422- } ) ,
423- bounds : vec ! [ ] ,
424- connection : None ,
425- } ) ;
426- }
427- if let Some ( voltage ) = metrics . 2 {
428- metric_samples . push ( MetricSample {
429- sample_time : ts . clone ( ) ,
430- metric : Metric :: AcVoltage as i32 ,
431- value : Some ( MetricValueVariant {
432- metric_value_variant : Some (
433- metric_value_variant:: MetricValueVariant :: SimpleMetric (
434- SimpleMetricValue {
435- value : voltage . as_volts ( ) ,
436- } ,
437- ) ,
427+ ) ,
428+ } ) ,
429+ bounds : vec ! [ ] ,
430+ connection : None ,
431+ } ) ;
432+ }
433+ if let Some ( voltage ) = metrics . 2 {
434+ metric_samples . push ( MetricSample {
435+ sample_time : ts ,
436+ metric : Metric :: AcVoltage as i32 ,
437+ value : Some ( MetricValueVariant {
438+ metric_value_variant : Some (
439+ metric_value_variant :: MetricValueVariant :: SimpleMetric (
440+ SimpleMetricValue {
441+ value : voltage . as_volts ( ) ,
442+ } ,
438443 ) ,
439- } ) ,
440- bounds : vec ! [ ] ,
441- connection : None ,
442- } ) ;
443- }
444- if let Some ( current ) = metrics . 3 {
445- metric_samples . push ( MetricSample {
446- sample_time : ts . clone ( ) ,
447- metric : Metric :: AcCurrent as i32 ,
448- value : Some ( MetricValueVariant {
449- metric_value_variant : Some (
450- metric_value_variant:: MetricValueVariant :: SimpleMetric (
451- SimpleMetricValue {
452- value : current . as_amperes ( ) ,
453- } ,
454- ) ,
444+ ) ,
445+ } ) ,
446+ bounds : vec ! [ ] ,
447+ connection : None ,
448+ } ) ;
449+ }
450+ if let Some ( current ) = metrics . 3 {
451+ metric_samples . push ( MetricSample {
452+ sample_time : ts ,
453+ metric : Metric :: AcCurrent as i32 ,
454+ value : Some ( MetricValueVariant {
455+ metric_value_variant : Some (
456+ metric_value_variant :: MetricValueVariant :: SimpleMetric (
457+ SimpleMetricValue {
458+ value : current . as_amperes ( ) ,
459+ } ,
455460 ) ,
456- } ) ,
457- bounds : vec ! [ ] ,
458- connection : None ,
459- } ) ;
460- }
461-
462- let resp = ReceiveElectricalComponentTelemetryStreamResponse {
463- telemetry : Some ( ElectricalComponentTelemetry {
464- electrical_component_id : comp_id,
465- metric_samples,
466- // TODO: support sending errors
467- state_snapshots : vec ! [ ElectricalComponentStateSnapshot {
468- origin_time: ts,
469- states: vec![ state_code as i32 ] ,
470- warnings: vec![ ] ,
471- errors: vec![ ] ,
472- } ] ,
461+ ) ,
473462 } ) ,
474- } ;
475- if tx. send ( Ok ( resp) ) . await . is_err ( ) {
476- break ;
477- }
463+ bounds : vec ! [ ] ,
464+ connection : None ,
465+ } ) ;
478466 }
479- if silence_after_metrics {
480- // Hold the sender open indefinitely so the client
481- // actor doesn't see the stream end and reconnect.
482- let _keep_open = tx;
483- std:: future:: pending :: < ( ) > ( ) . await ;
467+
468+ let resp = ReceiveElectricalComponentTelemetryStreamResponse {
469+ telemetry : Some ( ElectricalComponentTelemetry {
470+ electrical_component_id : comp_id,
471+ metric_samples,
472+ // TODO: support sending errors
473+ state_snapshots : vec ! [ ElectricalComponentStateSnapshot {
474+ origin_time: ts,
475+ states: vec![ state_code as i32 ] ,
476+ warnings: vec![ ] ,
477+ errors: vec![ ] ,
478+ } ] ,
479+ } ) ,
480+ } ;
481+ if tx. send ( Ok ( resp) ) . await . is_err ( ) {
482+ break ;
484483 }
485- } ) ;
486- }
484+ }
485+ if silence_after_metrics {
486+ // Hold the sender open indefinitely so the client
487+ // actor doesn't see the stream end and reconnect.
488+ let _keep_open = tx;
489+ std:: future:: pending :: < ( ) > ( ) . await ;
490+ }
491+ } ) ;
487492 }
488493
489494 let stream = tokio_stream:: wrappers:: ReceiverStream :: new ( rx) ;
0 commit comments