From a32893c6e2d62f37e165d0ff77ac2413bca91819 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Fri, 8 Aug 2025 15:43:57 +0200 Subject: [PATCH 1/4] Upgrade to microgrid API 0.18 Signed-off-by: Sahas Subramanian --- Cargo.toml | 2 +- build.rs | 2 +- examples/logical_meter.rs | 12 +- src/client/instruction.rs | 10 +- src/client/microgrid_client_actor.rs | 60 +++++----- src/client/microgrid_client_handle.rs | 10 +- src/logical_meter/formula.rs | 2 +- .../formula/aggregation_formula.rs | 2 +- src/logical_meter/formula/coalesce_formula.rs | 2 +- .../formula/graph_formula_provider.rs | 10 +- src/logical_meter/logical_meter_actor.rs | 12 +- src/logical_meter/logical_meter_handle.rs | 6 +- src/logical_meter/metric.rs | 6 +- src/proto.rs | 32 +++--- src/proto/graph.rs | 105 ++++++++++-------- submodules/frequenz-api-microgrid | 2 +- 16 files changed, 156 insertions(+), 119 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 55e4dda..b4f1992 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ path = "src/lib.rs" [dependencies] chrono = "0.4" -frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "b2fd616" } +frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "0b28f99" } frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "463414a" } frequenz-resampling = { git = "https://github.com/frequenz-floss/frequenz-resampling-rs.git", rev = "ce84d66" } prost = "0.13" diff --git a/build.rs b/build.rs index 4737d2b..9f8d502 100644 --- a/build.rs +++ b/build.rs @@ -7,7 +7,7 @@ fn main() -> Result<(), std::io::Error> { tonic_build::configure() .compile_protos_with_config( config, - &["submodules/frequenz-api-microgrid/proto/frequenz/api/microgrid/v1/microgrid.proto"], + &["submodules/frequenz-api-microgrid/proto/frequenz/api/microgrid/v1alpha18/microgrid.proto"], &[ "submodules/frequenz-api-microgrid/proto", "submodules/frequenz-api-microgrid/submodules/frequenz-api-common/proto", diff --git a/examples/logical_meter.rs b/examples/logical_meter.rs index 07ebdb5..fd71e6c 100644 --- a/examples/logical_meter.rs +++ b/examples/logical_meter.rs @@ -28,13 +28,13 @@ async fn main() -> Result<(), Error> { .await?; // Create a formula that calculates `grid_power - battery_power`. - let formula_grid = logical_meter.grid(metric::AcActivePower)?; - let formula_battery = logical_meter.battery(None, metric::AcActivePower)?; - let formula_consumer = logical_meter.consumer(metric::AcActivePower)?; + let formula_grid = logical_meter.grid(metric::AcPowerActive)?; + let formula_battery = logical_meter.battery(None, metric::AcPowerActive)?; + let formula_consumer = logical_meter.consumer(metric::AcPowerActive)?; - let formula = (logical_meter.grid(metric::AcActivePower)? - - logical_meter.battery(None, metric::AcActivePower)? - + logical_meter.consumer(metric::AcActivePower)?)?; + let formula = (logical_meter.grid(metric::AcPowerActive)? + - logical_meter.battery(None, metric::AcPowerActive)? + + logical_meter.consumer(metric::AcPowerActive)?)?; let mut rx = formula.subscribe().await?; let mut grid_rx = formula_grid.subscribe().await?; diff --git a/src/client/instruction.rs b/src/client/instruction.rs index 801b93e..17c7123 100644 --- a/src/client/instruction.rs +++ b/src/client/instruction.rs @@ -7,7 +7,9 @@ use tokio::sync::{broadcast, oneshot}; use crate::{ Error, - proto::common::v1::microgrid::components::{Component, ComponentConnection, ComponentData}, + proto::common::v1alpha8::microgrid::electrical_components::{ + ElectricalComponent, ElectricalComponentConnection, ElectricalComponentTelemetry, + }, }; /// Instructions that can be sent to the client actor from client handles. @@ -15,16 +17,16 @@ use crate::{ pub(super) enum Instruction { GetComponentDataStream { component_id: u64, - response_tx: oneshot::Sender>, + response_tx: oneshot::Sender>, }, ListComponents { component_ids: Vec, categories: Vec, - response_tx: oneshot::Sender, Error>>, + response_tx: oneshot::Sender, Error>>, }, ListConnections { starts: Vec, ends: Vec, - response_tx: oneshot::Sender, Error>>, + response_tx: oneshot::Sender, Error>>, }, } diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index 04d7a34..eb7712f 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -3,7 +3,14 @@ //! The microgrid client actor that handles communication with the microgrid API. -use crate::client::{instruction::Instruction, retry_tracker::RetryTracker}; +use crate::{ + client::{instruction::Instruction, retry_tracker::RetryTracker}, + proto::microgrid::v1alpha18::{ + ListElectricalComponentConnectionsRequest, ListElectricalComponentsRequest, + ReceiveElectricalComponentTelemetryStreamRequest, + ReceiveElectricalComponentTelemetryStreamResponse, + }, +}; use std::collections::HashMap; use tokio::{ @@ -16,11 +23,8 @@ use tracing::Instrument as _; use crate::{ Error, proto::{ - common::v1::microgrid::components::ComponentData, - microgrid::v1::{ - ListComponentsRequest, ListConnectionsRequest, ReceiveComponentDataStreamRequest, - ReceiveComponentDataStreamResponse, microgrid_client::MicrogridClient, - }, + common::v1alpha8::microgrid::electrical_components::ElectricalComponentTelemetry, + microgrid::v1alpha18::microgrid_client::MicrogridClient, }, }; @@ -57,7 +61,8 @@ impl MicrogridClientActor { } }; - let mut component_streams: HashMap> = HashMap::new(); + let mut component_streams: HashMap> = + HashMap::new(); let (stream_status_tx, mut stream_status_rx) = mpsc::channel(50); let mut retry_timer = tokio::time::interval(std::time::Duration::from_secs(1)); @@ -114,7 +119,7 @@ impl MicrogridClientActor { /// Handles the instructions received from the `MicrogridClientHandle` instances. async fn handle_instruction( client: &mut MicrogridClient, - component_streams: &mut HashMap>, + component_streams: &mut HashMap>, instruction: Option, stream_status_tx: mpsc::Sender, ) -> Result<(), Error> { @@ -135,7 +140,7 @@ async fn handle_instruction( // If a stream for the given component does not exist, create a new // channel and start a task for streaming component data from the // API service into the channel. - let (tx, rx) = broadcast::channel::(100); + let (tx, rx) = broadcast::channel::(100); component_streams.insert(component_id, tx.clone()); start_component_data_stream(client, component_id, tx, stream_status_tx).await?; @@ -150,13 +155,13 @@ async fn handle_instruction( categories, }) => { let components = client - .list_components(ListComponentsRequest { - component_ids, - categories, + .list_electrical_components(ListElectricalComponentsRequest { + electrical_component_ids: component_ids, + electrical_component_categories: categories, }) .await .map_err(|e| Error::connection_failure(format!("list_components failed: {e}"))) - .map(|r| r.into_inner().components); + .map(|r| r.into_inner().electrical_components); response_tx .send(components) @@ -168,10 +173,13 @@ async fn handle_instruction( ends, }) => { let connections = client - .list_connections(ListConnectionsRequest { starts, ends }) + .list_electrical_component_connections(ListElectricalComponentConnectionsRequest { + source_electrical_component_ids: starts, + destination_electrical_component_ids: ends, + }) .await .map_err(|e| Error::connection_failure(format!("list_connections failed: {e}"))) - .map(|r| r.into_inner().connections); + .map(|r| r.into_inner().electrical_component_connections); response_tx .send(connections) @@ -187,7 +195,7 @@ async fn handle_instruction( /// need to be retried and restarting their streaming tasks if necessary. async fn handle_retry_timer( client: &mut MicrogridClient, - component_streams: &mut HashMap>, + component_streams: &mut HashMap>, components_to_retry: &mut HashMap, stream_status_tx: mpsc::Sender, now: tokio::time::Instant, @@ -218,14 +226,16 @@ async fn handle_retry_timer( async fn start_component_data_stream( client: &mut MicrogridClient, component_id: u64, - tx: broadcast::Sender, + tx: broadcast::Sender, stream_status_tx: mpsc::Sender, ) -> Result<(), Error> { let stream = match client - .receive_component_data_stream(ReceiveComponentDataStreamRequest { - component_id, - filter: None, - }) + .receive_electrical_component_telemetry_stream( + ReceiveElectricalComponentTelemetryStreamRequest { + electrical_component_id: component_id, + filter: None, + }, + ) .await { Ok(s) => s.into_inner(), @@ -261,9 +271,9 @@ async fn start_component_data_stream( } async fn run_component_data_stream( - mut stream: tonic::Streaming, + mut stream: tonic::Streaming, component_id: u64, - tx: broadcast::Sender, + tx: broadcast::Sender, stream_status_tx: mpsc::Sender, ) { loop { @@ -296,8 +306,8 @@ async fn run_component_data_stream( } }; let data = match message { - Some(ReceiveComponentDataStreamResponse { data: Some(d) }) => d, - Some(ReceiveComponentDataStreamResponse { data: None }) => { + Some(ReceiveElectricalComponentTelemetryStreamResponse { telemetry: Some(d) }) => d, + Some(ReceiveElectricalComponentTelemetryStreamResponse { telemetry: None }) => { tracing::warn!( "get_component_data stream returned empty data for {}", component_id diff --git a/src/client/microgrid_client_handle.rs b/src/client/microgrid_client_handle.rs index 5648730..6500fd8 100644 --- a/src/client/microgrid_client_handle.rs +++ b/src/client/microgrid_client_handle.rs @@ -10,7 +10,9 @@ use tokio::sync::{broadcast, mpsc, oneshot}; use crate::{ Error, - proto::common::v1::microgrid::components::{Component, ComponentConnection, ComponentData}, + proto::common::v1alpha8::microgrid::electrical_components::{ + ElectricalComponent, ElectricalComponentConnection, ElectricalComponentTelemetry, + }, }; use super::{instruction::Instruction, microgrid_client_actor::MicrogridClientActor}; @@ -41,7 +43,7 @@ impl MicrogridClientHandle { pub async fn get_component_data_stream( &self, component_id: u64, - ) -> Result, Error> { + ) -> Result, Error> { let (response_tx, response_rx) = oneshot::channel(); self.instructions_tx @@ -79,7 +81,7 @@ impl MicrogridClientHandle { &self, component_ids: Vec, categories: Vec, - ) -> Result, Error> { + ) -> Result, Error> { let (response_tx, response_rx) = oneshot::channel(); self.instructions_tx @@ -117,7 +119,7 @@ impl MicrogridClientHandle { &self, starts: Vec, ends: Vec, - ) -> Result, Error> { + ) -> Result, Error> { let (response_tx, response_rx) = oneshot::channel(); self.instructions_tx diff --git a/src/logical_meter/formula.rs b/src/logical_meter/formula.rs index efd3cbc..e4110fb 100644 --- a/src/logical_meter/formula.rs +++ b/src/logical_meter/formula.rs @@ -10,7 +10,7 @@ pub(crate) mod graph_formula_provider; pub use aggregation_formula::AggregationFormula; pub use coalesce_formula::CoalesceFormula; -use crate::{Error, Sample, proto::common::v1::metrics::Metric}; +use crate::{Error, Sample, proto::common::v1alpha8::metrics::Metric}; use tokio::sync::{broadcast, mpsc}; use super::logical_meter_actor; diff --git a/src/logical_meter/formula/aggregation_formula.rs b/src/logical_meter/formula/aggregation_formula.rs index 1010878..8d145b8 100644 --- a/src/logical_meter/formula/aggregation_formula.rs +++ b/src/logical_meter/formula/aggregation_formula.rs @@ -5,7 +5,7 @@ use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider}; use crate::{ - Error, Sample, logical_meter::logical_meter_actor, proto::common::v1::metrics::Metric, + Error, Sample, logical_meter::logical_meter_actor, proto::common::v1alpha8::metrics::Metric, }; use tokio::sync::{broadcast, mpsc, oneshot}; diff --git a/src/logical_meter/formula/coalesce_formula.rs b/src/logical_meter/formula/coalesce_formula.rs index 551a2dc..c52f1dc 100644 --- a/src/logical_meter/formula/coalesce_formula.rs +++ b/src/logical_meter/formula/coalesce_formula.rs @@ -5,7 +5,7 @@ use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider}; use crate::{ - Error, Sample, logical_meter::logical_meter_actor, proto::common::v1::metrics::Metric, + Error, Sample, logical_meter::logical_meter_actor, proto::common::v1alpha8::metrics::Metric, }; use tokio::sync::{broadcast, mpsc, oneshot}; diff --git a/src/logical_meter/formula/graph_formula_provider.rs b/src/logical_meter/formula/graph_formula_provider.rs index 5b0fd94..0d816a7 100644 --- a/src/logical_meter/formula/graph_formula_provider.rs +++ b/src/logical_meter/formula/graph_formula_provider.rs @@ -6,8 +6,10 @@ use crate::Error; use crate::logical_meter::formula::FormulaParams; use crate::logical_meter::logical_meter_actor; -use crate::proto::common::v1::microgrid::components::Component; -use crate::proto::common::v1::microgrid::components::ComponentConnection; +use crate::proto::common::v1alpha8::microgrid::electrical_components::{ + ElectricalComponent, ElectricalComponentConnection, +}; + use frequenz_microgrid_component_graph::ComponentGraph; use std::collections::BTreeSet; use tokio::sync::mpsc; @@ -18,7 +20,7 @@ macro_rules! graph_formula_provider { ($(($fnname:ident $(, ids:$idsparam:ident)? $(, id:$idparam:ident)?)),+ $(,)?) => {$( fn $fnname( - _graph: &ComponentGraph, + _graph: &ComponentGraph, _metric: M, _instructions_tx: mpsc::Sender, $($idsparam: Option>,)? @@ -64,7 +66,7 @@ macro_rules! impl_graph_formula_provider { )),+ $(,)?) => {$( fn $fnname( - graph: &ComponentGraph, + graph: &ComponentGraph, _metric: M, instructions_tx: mpsc::Sender, $($idsparam: Option>,)? diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index 2a1424e..6382ecf 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -12,10 +12,10 @@ use std::collections::{HashMap, HashSet}; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{MissedTickBehavior, interval}; -use crate::proto::common::v1::metrics::Metric; -use crate::proto::common::v1::metrics::metric_value_variant::MetricValueVariant; +use crate::proto::common::v1alpha8::metrics::{Metric, metric_value_variant::MetricValueVariant}; use crate::{ - Error, MicrogridClientHandle, Sample, proto::common::v1::microgrid::components::ComponentData, + Error, MicrogridClientHandle, Sample, + proto::common::v1alpha8::microgrid::electrical_components::ElectricalComponentTelemetry, }; use super::config::LogicalMeterConfig; @@ -29,7 +29,7 @@ struct ComponentDataResampler { component_id: u64, metric: Metric, resampler: frequenz_resampling::Resampler, - receiver: broadcast::Receiver, + receiver: broadcast::Receiver, } pub(crate) enum Instruction { @@ -281,7 +281,7 @@ impl LogicalMeterActor { fn push_to_resampler( &mut self, resampler: &mut ComponentDataResampler, - data: ComponentData, + data: ElectricalComponentTelemetry, metric: Metric, ) { let Some(dd) = data @@ -296,7 +296,7 @@ impl LogicalMeterActor { ); return; }; - let timestamp = if let Some(timestamp) = dd.sampled_at { + let timestamp = if let Some(timestamp) = dd.sample_time { if let Some(timestamp) = DateTime::from_timestamp(timestamp.seconds, timestamp.nanos as u32) { diff --git a/src/logical_meter/logical_meter_handle.rs b/src/logical_meter/logical_meter_handle.rs index 55b9634..52c159c 100644 --- a/src/logical_meter/logical_meter_handle.rs +++ b/src/logical_meter/logical_meter_handle.rs @@ -5,7 +5,9 @@ use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider; use crate::{ client::MicrogridClientHandle, error::Error, - proto::common::v1::microgrid::components::{Component, ComponentConnection}, + proto::common::v1alpha8::microgrid::electrical_components::{ + ElectricalComponent, ElectricalComponentConnection, + }, }; use frequenz_microgrid_component_graph::{self, ComponentGraph}; use std::collections::BTreeSet; @@ -17,7 +19,7 @@ use super::{LogicalMeterConfig, logical_meter_actor::LogicalMeterActor}; #[derive(Clone)] pub struct LogicalMeterHandle { instructions_tx: mpsc::Sender, - graph: ComponentGraph, + graph: ComponentGraph, } impl LogicalMeterHandle { diff --git a/src/logical_meter/metric.rs b/src/logical_meter/metric.rs index 16ec880..c42c50b 100644 --- a/src/logical_meter/metric.rs +++ b/src/logical_meter/metric.rs @@ -3,7 +3,7 @@ //! Metrics supported by the logical meter. -use crate::proto::common::v1::metrics::Metric as MetricPb; +use crate::proto::common::v1alpha8::metrics::Metric as MetricPb; use super::formula; @@ -38,8 +38,8 @@ macro_rules! define_metric { } define_metric! { - {name: AcActivePower, formula: AggregationFormula}, - {name: AcReactivePower, formula: AggregationFormula}, + {name: AcPowerActive, formula: AggregationFormula}, + {name: AcPowerReactive, formula: AggregationFormula}, {name: AcCurrent, formula: AggregationFormula}, {name: AcCurrentPhase1, formula: AggregationFormula}, {name: AcCurrentPhase2, formula: AggregationFormula}, diff --git a/src/proto.rs b/src/proto.rs index 9deef5b..d39985e 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -5,41 +5,45 @@ mod graph; -#[allow(clippy::doc_lazy_continuation)] +#[allow(clippy::doc_lazy_continuation, clippy::doc_overindented_list_items)] pub mod common { - pub mod v1 { - #![allow(clippy::derive_partial_eq_without_eq)] - tonic::include_proto!("frequenz.api.common.v1"); - + pub mod v1alpha8 { pub mod grid { #![allow(clippy::derive_partial_eq_without_eq)] - tonic::include_proto!("frequenz.api.common.v1.grid"); + tonic::include_proto!("frequenz.api.common.v1alpha8.grid"); } pub mod microgrid { #![allow(clippy::derive_partial_eq_without_eq)] - tonic::include_proto!("frequenz.api.common.v1.microgrid"); - pub mod components { + tonic::include_proto!("frequenz.api.common.v1alpha8.microgrid"); + pub mod electrical_components { #![allow(clippy::derive_partial_eq_without_eq)] - tonic::include_proto!("frequenz.api.common.v1.microgrid.components"); + tonic::include_proto!( + "frequenz.api.common.v1alpha8.microgrid.electrical_components" + ); } pub mod sensors { #![allow(clippy::derive_partial_eq_without_eq)] - tonic::include_proto!("frequenz.api.common.v1.microgrid.sensors"); + tonic::include_proto!("frequenz.api.common.v1alpha8.microgrid.sensors"); } } pub mod metrics { #![allow(clippy::derive_partial_eq_without_eq)] - tonic::include_proto!("frequenz.api.common.v1.metrics"); + tonic::include_proto!("frequenz.api.common.v1alpha8.metrics"); + } + + pub mod types { + #![allow(clippy::derive_partial_eq_without_eq)] + tonic::include_proto!("frequenz.api.common.v1alpha8.types"); } } } -#[allow(clippy::doc_lazy_continuation)] +#[allow(clippy::doc_lazy_continuation, clippy::doc_overindented_list_items)] pub mod microgrid { - pub mod v1 { + pub mod v1alpha18 { #![allow(clippy::derive_partial_eq_without_eq)] - tonic::include_proto!("frequenz.api.microgrid.v1"); + tonic::include_proto!("frequenz.api.microgrid.v1alpha18"); } } diff --git a/src/proto/graph.rs b/src/proto/graph.rs index ccec7c0..329cd93 100644 --- a/src/proto/graph.rs +++ b/src/proto/graph.rs @@ -6,43 +6,46 @@ use tracing::{error, warn}; impl frequenz_microgrid_component_graph::Node - for super::common::v1::microgrid::components::Component + for super::common::v1alpha8::microgrid::electrical_components::ElectricalComponent { fn component_id(&self) -> u64 { self.id } fn category(&self) -> frequenz_microgrid_component_graph::ComponentCategory { - use super::common::v1::microgrid::components as pb; + use super::common::v1alpha8::microgrid::electrical_components as pb; use frequenz_microgrid_component_graph as gr; - let category = pb::ComponentCategory::try_from(self.category).unwrap_or_else(|e| { - error!("Error converting component category: {}", e); - pb::ComponentCategory::Unspecified - }); + let category = + pb::ElectricalComponentCategory::try_from(self.category).unwrap_or_else(|e| { + error!("Error converting component category: {}", e); + pb::ElectricalComponentCategory::Unspecified + }); match category { - pb::ComponentCategory::Unspecified => gr::ComponentCategory::Unspecified, - pb::ComponentCategory::Grid => gr::ComponentCategory::Grid, - pb::ComponentCategory::Meter => gr::ComponentCategory::Meter, - pb::ComponentCategory::Inverter => { - gr::ComponentCategory::Inverter(match self.category_type { - Some(pb::ComponentCategoryMetadataVariant { metadata }) => match metadata { - Some(pb::component_category_metadata_variant::Metadata::Inverter( + pb::ElectricalComponentCategory::Unspecified => gr::ComponentCategory::Unspecified, + pb::ElectricalComponentCategory::GridConnectionPoint => { + gr::ComponentCategory::GridConnectionPoint + } + pb::ElectricalComponentCategory::Meter => gr::ComponentCategory::Meter, + pb::ElectricalComponentCategory::Inverter => { + gr::ComponentCategory::Inverter(match self.category_specific_info { + Some(pb::ElectricalComponentCategorySpecificInfo { kind }) => match kind { + Some(pb::electrical_component_category_specific_info::Kind::Inverter( inverter, )) => { match pb::InverterType::try_from(inverter.r#type).unwrap_or_else(|e| { error!("Error converting inverter type: {}", e); pb::InverterType::Unspecified }) { - pb::InverterType::Solar => gr::InverterType::Solar, + pb::InverterType::Pv => gr::InverterType::Pv, pb::InverterType::Battery => gr::InverterType::Battery, pb::InverterType::Hybrid => gr::InverterType::Hybrid, pb::InverterType::Unspecified => gr::InverterType::Unspecified, } } Some(_) => { - warn!("Unknown metadata variant for inverter: {:?}", metadata); + warn!("Unknown component specific info for inverter: {:?}", kind); gr::InverterType::Unspecified } None => gr::InverterType::Unspecified, @@ -50,22 +53,24 @@ impl frequenz_microgrid_component_graph::Node _ => gr::InverterType::Unspecified, }) } - pb::ComponentCategory::Converter => gr::ComponentCategory::Converter, - pb::ComponentCategory::Battery => { - gr::ComponentCategory::Battery(match self.category_type { - Some(pb::ComponentCategoryMetadataVariant { metadata }) => match metadata { - Some(pb::component_category_metadata_variant::Metadata::Battery( + pb::ElectricalComponentCategory::Converter => gr::ComponentCategory::Converter, + pb::ElectricalComponentCategory::Battery => { + gr::ComponentCategory::Battery(match self.category_specific_info { + Some(pb::ElectricalComponentCategorySpecificInfo { kind }) => match kind { + Some(pb::electrical_component_category_specific_info::Kind::Battery( battery, - )) => match pb::BatteryType::try_from(battery.r#type).unwrap_or_else(|e| { - error!("Error converting battery type: {}", e); - pb::BatteryType::Unspecified - }) { - pb::BatteryType::LiIon => gr::BatteryType::LiIon, - pb::BatteryType::NaIon => gr::BatteryType::NaIon, - pb::BatteryType::Unspecified => gr::BatteryType::Unspecified, - }, + )) => { + match pb::BatteryType::try_from(battery.r#type).unwrap_or_else(|e| { + error!("Error converting battery type: {}", e); + pb::BatteryType::Unspecified + }) { + pb::BatteryType::LiIon => gr::BatteryType::LiIon, + pb::BatteryType::NaIon => gr::BatteryType::NaIon, + pb::BatteryType::Unspecified => gr::BatteryType::Unspecified, + } + } Some(_) => { - warn!("Unknown metadata variant for battery: {:?}", metadata); + warn!("Unknown component specific info for battery: {:?}", kind); gr::BatteryType::Unspecified } None => gr::BatteryType::Unspecified, @@ -73,10 +78,10 @@ impl frequenz_microgrid_component_graph::Node _ => gr::BatteryType::Unspecified, }) } - pb::ComponentCategory::EvCharger => { - gr::ComponentCategory::EvCharger(match self.category_type { - Some(pb::ComponentCategoryMetadataVariant { metadata }) => match metadata { - Some(pb::component_category_metadata_variant::Metadata::EvCharger( + pb::ElectricalComponentCategory::EvCharger => { + gr::ComponentCategory::EvCharger(match self.category_specific_info { + Some(pb::ElectricalComponentCategorySpecificInfo { kind }) => match kind { + Some(pb::electrical_component_category_specific_info::Kind::EvCharger( ev_charger, )) => match pb::EvChargerType::try_from(ev_charger.r#type).unwrap_or_else( |e| { @@ -90,7 +95,7 @@ impl frequenz_microgrid_component_graph::Node pb::EvChargerType::Unspecified => gr::EvChargerType::Unspecified, }, Some(_) => { - warn!("Unknown metadata variant for ev charger: {:?}", metadata); + warn!("Unknown component specific info for ev charger: {:?}", kind); gr::EvChargerType::Unspecified } None => gr::EvChargerType::Unspecified, @@ -98,26 +103,36 @@ impl frequenz_microgrid_component_graph::Node _ => gr::EvChargerType::Unspecified, }) } - pb::ComponentCategory::CryptoMiner => gr::ComponentCategory::CryptoMiner, - pb::ComponentCategory::Electrolyzer => gr::ComponentCategory::Electrolyzer, - pb::ComponentCategory::Chp => gr::ComponentCategory::Chp, - pb::ComponentCategory::Relay => gr::ComponentCategory::Relay, - pb::ComponentCategory::Precharger => gr::ComponentCategory::Precharger, - pb::ComponentCategory::Fuse => gr::ComponentCategory::Fuse, - pb::ComponentCategory::VoltageTransformer => gr::ComponentCategory::VoltageTransformer, - pb::ComponentCategory::Hvac => gr::ComponentCategory::Hvac, + pb::ElectricalComponentCategory::CryptoMiner => gr::ComponentCategory::CryptoMiner, + pb::ElectricalComponentCategory::Electrolyzer => gr::ComponentCategory::Electrolyzer, + pb::ElectricalComponentCategory::Chp => gr::ComponentCategory::Chp, + pb::ElectricalComponentCategory::Hvac => gr::ComponentCategory::Hvac, + pb::ElectricalComponentCategory::Breaker => gr::ComponentCategory::Breaker, + pb::ElectricalComponentCategory::Precharger => gr::ComponentCategory::Precharger, + pb::ElectricalComponentCategory::PowerTransformer => { + gr::ComponentCategory::PowerTransformer + } + pb::ElectricalComponentCategory::Plc => gr::ComponentCategory::Plc, + pb::ElectricalComponentCategory::StaticTransferSwitch => { + gr::ComponentCategory::StaticTransferSwitch + } + pb::ElectricalComponentCategory::UninterruptiblePowerSupply => { + gr::ComponentCategory::UninterruptiblePowerSupply + } + pb::ElectricalComponentCategory::CapacitorBank => gr::ComponentCategory::CapacitorBank, + pb::ElectricalComponentCategory::WindTurbine => gr::ComponentCategory::WindTurbine, } } } impl frequenz_microgrid_component_graph::Edge - for super::common::v1::microgrid::components::ComponentConnection + for super::common::v1alpha8::microgrid::electrical_components::ElectricalComponentConnection { fn source(&self) -> u64 { - self.source_component_id + self.source_electrical_component_id } fn destination(&self) -> u64 { - self.destination_component_id + self.destination_electrical_component_id } } diff --git a/submodules/frequenz-api-microgrid b/submodules/frequenz-api-microgrid index b97288d..0e5529e 160000 --- a/submodules/frequenz-api-microgrid +++ b/submodules/frequenz-api-microgrid @@ -1 +1 @@ -Subproject commit b97288d951a64e79e2b9a3f59c984d26b71b6abe +Subproject commit 0e5529ed0ebd59cf6ef4c26e54f0c2381dad9a16 From c67da7069319d49f388cd591e0bab90f3e948587 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Fri, 8 Aug 2025 16:56:31 +0200 Subject: [PATCH 2/4] Align client's component telemetry stream method with microgrid API Rename functions, variables, and instructions related to streaming component data to use the more specific "electrical component telemetry". This change improves clarity and makes the client-side code more consistent with the gRPC service definitions. Signed-off-by: Sahas Subramanian --- src/client/instruction.rs | 4 +- src/client/microgrid_client_actor.rs | 78 +++++++++++++++--------- src/client/microgrid_client_handle.rs | 10 +-- src/logical_meter/logical_meter_actor.rs | 5 +- 4 files changed, 60 insertions(+), 37 deletions(-) diff --git a/src/client/instruction.rs b/src/client/instruction.rs index 17c7123..44a1ae4 100644 --- a/src/client/instruction.rs +++ b/src/client/instruction.rs @@ -15,8 +15,8 @@ use crate::{ /// Instructions that can be sent to the client actor from client handles. #[derive(Debug)] pub(super) enum Instruction { - GetComponentDataStream { - component_id: u64, + ReceiveElectricalComponentTelemetryStream { + electrical_component_id: u64, response_tx: oneshot::Sender>, }, ListComponents { diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index eb7712f..e73d579 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -124,25 +124,31 @@ async fn handle_instruction( stream_status_tx: mpsc::Sender, ) -> Result<(), Error> { match instruction { - Some(Instruction::GetComponentDataStream { - component_id, + Some(Instruction::ReceiveElectricalComponentTelemetryStream { + electrical_component_id, response_tx, }) => { // If a stream for the given component already exists, subscribe to // it and return. - if let Some(stream) = component_streams.get(&component_id) { + if let Some(stream) = component_streams.get(&electrical_component_id) { response_tx .send(stream.subscribe()) .map_err(|_| Error::internal("failed to send response"))?; return Ok(()); } - // If a stream for the given component does not exist, create a new - // channel and start a task for streaming component data from the - // API service into the channel. + // If a stream for the given electrical component does not exist, + // create a new channel and start a task for streaming telemetry + // from the API service into the channel. let (tx, rx) = broadcast::channel::(100); - component_streams.insert(component_id, tx.clone()); - start_component_data_stream(client, component_id, tx, stream_status_tx).await?; + component_streams.insert(electrical_component_id, tx.clone()); + start_electrical_component_telemetry_stream( + client, + electrical_component_id, + tx, + stream_status_tx, + ) + .await?; response_tx.send(rx).map_err(|_| { tracing::error!("failed to send response"); @@ -208,8 +214,13 @@ async fn handle_retry_timer( item.1.mark_new_retry(); let (component_id, _) = item; if let Some(tx) = component_streams.get(component_id).cloned() { - start_component_data_stream(client, *component_id, tx, stream_status_tx.clone()) - .await?; + start_electrical_component_telemetry_stream( + client, + *component_id, + tx, + stream_status_tx.clone(), + ) + .await?; } else { tracing::error!("Component stream not found for retry: {component_id}"); return Err(Error::internal(format!( @@ -223,16 +234,16 @@ async fn handle_retry_timer( /// Creates a new data stream for the given component ID and starts a task to /// fetch data from it in a loop. -async fn start_component_data_stream( +async fn start_electrical_component_telemetry_stream( client: &mut MicrogridClient, - component_id: u64, + electrical_component_id: u64, tx: broadcast::Sender, stream_status_tx: mpsc::Sender, ) -> Result<(), Error> { let stream = match client .receive_electrical_component_telemetry_stream( ReceiveElectricalComponentTelemetryStreamRequest { - electrical_component_id: component_id, + electrical_component_id, filter: None, }, ) @@ -241,38 +252,44 @@ async fn start_component_data_stream( Ok(s) => s.into_inner(), Err(e) => { stream_status_tx - .send(StreamStatus::Failed(component_id)) + .send(StreamStatus::Failed(electrical_component_id)) .await .map_err(|e| { Error::connection_failure(format!( - "receive_component_data_stream failed for {component_id}: {e}", + "receive_component_data_stream failed for {electrical_component_id}: {e}", )) })?; return Err(Error::connection_failure(format!( - "receive_component_data_stream failed for {component_id}: {e}", + "receive_component_data_stream failed for {electrical_component_id}: {e}", ))); } }; stream_status_tx - .send(StreamStatus::Connected(component_id)) + .send(StreamStatus::Connected(electrical_component_id)) .await .map_err(|e| { Error::connection_failure(format!( - "Failed to send stream recovered message for {component_id}: {e}", + "Failed to send stream recovered message for {electrical_component_id}: {e}", )) })?; // create a task to fetch data from the stream in a loop and put into a channel. tokio::spawn( - run_component_data_stream(stream, component_id, tx, stream_status_tx).in_current_span(), + run_electrical_component_telemetry_stream( + stream, + electrical_component_id, + tx, + stream_status_tx, + ) + .in_current_span(), ); Ok(()) } -async fn run_component_data_stream( +async fn run_electrical_component_telemetry_stream( mut stream: tonic::Streaming, - component_id: u64, + electrical_component_id: u64, tx: broadcast::Sender, stream_status_tx: mpsc::Sender, ) { @@ -280,15 +297,15 @@ async fn run_component_data_stream( if tx.receiver_count() == 0 { tracing::debug!( "Dropping ComponentData stream for component_id:{:?}", - component_id + electrical_component_id ); stream_status_tx - .send(StreamStatus::Ended(component_id)) + .send(StreamStatus::Ended(electrical_component_id)) .await .unwrap_or_else(|e| { tracing::error!( "Failed to send stream ended message for {:?}: {:?}", - component_id, + electrical_component_id, e ); }); @@ -299,7 +316,7 @@ async fn run_component_data_stream( Err(e) => { tracing::error!( "get_component_data stream failed for {:?}: {:?}", - component_id, + electrical_component_id, e ); break; @@ -310,12 +327,15 @@ async fn run_component_data_stream( Some(ReceiveElectricalComponentTelemetryStreamResponse { telemetry: None }) => { tracing::warn!( "get_component_data stream returned empty data for {}", - component_id + electrical_component_id ); continue; } None => { - tracing::warn!("get_component_data stream ended for {:?}", component_id); + tracing::warn!( + "get_component_data stream ended for {:?}", + electrical_component_id + ); break; } }; @@ -326,12 +346,12 @@ async fn run_component_data_stream( } if let Err(e) = stream_status_tx - .send(StreamStatus::Failed(component_id)) + .send(StreamStatus::Failed(electrical_component_id)) .await { tracing::error!( "Failed to send stream stopped message for {:?}: {:?}", - component_id, + electrical_component_id, e ); } diff --git a/src/client/microgrid_client_handle.rs b/src/client/microgrid_client_handle.rs index 6500fd8..cad5a5c 100644 --- a/src/client/microgrid_client_handle.rs +++ b/src/client/microgrid_client_handle.rs @@ -35,20 +35,20 @@ impl MicrogridClientHandle { Self { instructions_tx } } - /// Returns a stream containing data from a component with a given ID. + /// Returns a telemetry stream from an electrical component with a given ID. /// /// When a connection to the API service is lost, reconnecting is handled /// automatically, and the receiver will resume receiving data from the /// component once the connection is re-established. - pub async fn get_component_data_stream( + pub async fn receive_electrical_component_telemetry_stream( &self, - component_id: u64, + electrical_component_id: u64, ) -> Result, Error> { let (response_tx, response_rx) = oneshot::channel(); self.instructions_tx - .send(Instruction::GetComponentDataStream { - component_id, + .send(Instruction::ReceiveElectricalComponentTelemetryStream { + electrical_component_id, response_tx, }) .await diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index 6382ecf..60bcdd8 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -180,7 +180,10 @@ impl LogicalMeterActor { .ok_or_else(|| Error::chrono_error("Failed to get current time."))?, false, ), - receiver: self.client.get_component_data_stream(*component_id).await?, + receiver: self + .client + .receive_electrical_component_telemetry_stream(*component_id) + .await?, }; resamplers.insert(resampler_key, resampler); } From fa073ccab3a0df4b6f1bd4564cb15fe9d7b55e4d Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Fri, 8 Aug 2025 18:01:34 +0200 Subject: [PATCH 3/4] =?UTF-8?q?Rename=20MicrogridClient::list=5Fcomponents?= =?UTF-8?q?=20=E2=86=92=20list=5Felectrical=5Fcomponents?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sahas Subramanian --- src/client/instruction.rs | 6 +++--- src/client/microgrid_client_actor.rs | 10 +++++----- src/client/microgrid_client_handle.rs | 12 ++++++------ src/logical_meter/logical_meter_handle.rs | 2 +- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/client/instruction.rs b/src/client/instruction.rs index 44a1ae4..4ea6439 100644 --- a/src/client/instruction.rs +++ b/src/client/instruction.rs @@ -19,9 +19,9 @@ pub(super) enum Instruction { electrical_component_id: u64, response_tx: oneshot::Sender>, }, - ListComponents { - component_ids: Vec, - categories: Vec, + ListElectricalComponents { + electrical_component_ids: Vec, + electrical_component_categories: Vec, response_tx: oneshot::Sender, Error>>, }, ListConnections { diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index e73d579..3138748 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -155,15 +155,15 @@ async fn handle_instruction( Error::internal("failed to send response") })?; } - Some(Instruction::ListComponents { + Some(Instruction::ListElectricalComponents { response_tx, - component_ids, - categories, + electrical_component_ids, + electrical_component_categories, }) => { let components = client .list_electrical_components(ListElectricalComponentsRequest { - electrical_component_ids: component_ids, - electrical_component_categories: categories, + electrical_component_ids, + electrical_component_categories, }) .await .map_err(|e| Error::connection_failure(format!("list_components failed: {e}"))) diff --git a/src/client/microgrid_client_handle.rs b/src/client/microgrid_client_handle.rs index cad5a5c..abaa4b8 100644 --- a/src/client/microgrid_client_handle.rs +++ b/src/client/microgrid_client_handle.rs @@ -77,18 +77,18 @@ impl MicrogridClientHandle { /// `ComponentCategory::COMPONENT_CATEGORY_BATTERY`. /// /// If a filter list is empty, then that filter is not applied. - pub async fn list_components( + pub async fn list_electrical_components( &self, - component_ids: Vec, - categories: Vec, + electrical_component_ids: Vec, + electrical_component_categories: Vec, ) -> Result, Error> { let (response_tx, response_rx) = oneshot::channel(); self.instructions_tx - .send(Instruction::ListComponents { + .send(Instruction::ListElectricalComponents { response_tx, - component_ids, - categories, + electrical_component_ids, + electrical_component_categories, }) .await .map_err(|_| Error::internal("failed to send instruction"))?; diff --git a/src/logical_meter/logical_meter_handle.rs b/src/logical_meter/logical_meter_handle.rs index 52c159c..85fd885 100644 --- a/src/logical_meter/logical_meter_handle.rs +++ b/src/logical_meter/logical_meter_handle.rs @@ -30,7 +30,7 @@ impl LogicalMeterHandle { ) -> Result { let (sender, receiver) = mpsc::channel(8); let graph = ComponentGraph::try_new( - client.list_components(vec![], vec![]).await?, + client.list_electrical_components(vec![], vec![]).await?, client.list_connections(vec![], vec![]).await?, frequenz_microgrid_component_graph::ComponentGraphConfig { allow_component_validation_failures: true, From dfd7bbb6bd9bd861e02ed53ad0ea5a8fb1e6d2b6 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 11 Aug 2025 09:46:05 +0200 Subject: [PATCH 4/4] =?UTF-8?q?Rename=20list=5Fconnections=20=E2=86=92=20l?= =?UTF-8?q?ist=5Felectrical=5Fcomponent=5Fconnections?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sahas Subramanian --- src/client/instruction.rs | 6 +++--- src/client/microgrid_client_actor.rs | 10 +++++----- src/client/microgrid_client_handle.rs | 12 ++++++------ src/logical_meter/logical_meter_handle.rs | 4 +++- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/client/instruction.rs b/src/client/instruction.rs index 4ea6439..ca9d8ca 100644 --- a/src/client/instruction.rs +++ b/src/client/instruction.rs @@ -24,9 +24,9 @@ pub(super) enum Instruction { electrical_component_categories: Vec, response_tx: oneshot::Sender, Error>>, }, - ListConnections { - starts: Vec, - ends: Vec, + ListElectricalComponentConnections { + source_electrical_component_ids: Vec, + destination_electrical_component_ids: Vec, response_tx: oneshot::Sender, Error>>, }, } diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index 3138748..51355ea 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -173,15 +173,15 @@ async fn handle_instruction( .send(components) .map_err(|_| Error::internal("failed to send response"))?; } - Some(Instruction::ListConnections { + Some(Instruction::ListElectricalComponentConnections { response_tx, - starts, - ends, + source_electrical_component_ids, + destination_electrical_component_ids, }) => { let connections = client .list_electrical_component_connections(ListElectricalComponentConnectionsRequest { - source_electrical_component_ids: starts, - destination_electrical_component_ids: ends, + source_electrical_component_ids, + destination_electrical_component_ids, }) .await .map_err(|e| Error::connection_failure(format!("list_connections failed: {e}"))) diff --git a/src/client/microgrid_client_handle.rs b/src/client/microgrid_client_handle.rs index abaa4b8..93752a0 100644 --- a/src/client/microgrid_client_handle.rs +++ b/src/client/microgrid_client_handle.rs @@ -115,18 +115,18 @@ impl MicrogridClientHandle { /// * each `start` component ID is either `1`, `2`, OR `3`, /// AND /// * each `end` component ID is either `4`, `5`, OR `6`. - pub async fn list_connections( + pub async fn list_electrical_component_connections( &self, - starts: Vec, - ends: Vec, + source_electrical_component_ids: Vec, + destination_electrical_component_ids: Vec, ) -> Result, Error> { let (response_tx, response_rx) = oneshot::channel(); self.instructions_tx - .send(Instruction::ListConnections { + .send(Instruction::ListElectricalComponentConnections { response_tx, - starts, - ends, + source_electrical_component_ids, + destination_electrical_component_ids, }) .await .map_err(|_| Error::internal("failed to send instruction"))?; diff --git a/src/logical_meter/logical_meter_handle.rs b/src/logical_meter/logical_meter_handle.rs index 85fd885..68350ef 100644 --- a/src/logical_meter/logical_meter_handle.rs +++ b/src/logical_meter/logical_meter_handle.rs @@ -31,7 +31,9 @@ impl LogicalMeterHandle { let (sender, receiver) = mpsc::channel(8); let graph = ComponentGraph::try_new( client.list_electrical_components(vec![], vec![]).await?, - client.list_connections(vec![], vec![]).await?, + client + .list_electrical_component_connections(vec![], vec![]) + .await?, frequenz_microgrid_component_graph::ComponentGraphConfig { allow_component_validation_failures: true, allow_unconnected_components: true,