Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ path = "src/lib.rs"

[dependencies]
chrono = "0.4"
frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "b2fd616" }
frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "0b28f99" }
frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "463414a" }
frequenz-resampling = { git = "https://github.com/frequenz-floss/frequenz-resampling-rs.git", rev = "ce84d66" }
prost = "0.13"
Expand Down
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ fn main() -> Result<(), std::io::Error> {
tonic_build::configure()
.compile_protos_with_config(
config,
&["submodules/frequenz-api-microgrid/proto/frequenz/api/microgrid/v1/microgrid.proto"],
&["submodules/frequenz-api-microgrid/proto/frequenz/api/microgrid/v1alpha18/microgrid.proto"],
&[
"submodules/frequenz-api-microgrid/proto",
"submodules/frequenz-api-microgrid/submodules/frequenz-api-common/proto",
Expand Down
12 changes: 6 additions & 6 deletions examples/logical_meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ async fn main() -> Result<(), Error> {
.await?;

// Create a formula that calculates `grid_power - battery_power`.
let formula_grid = logical_meter.grid(metric::AcActivePower)?;
let formula_battery = logical_meter.battery(None, metric::AcActivePower)?;
let formula_consumer = logical_meter.consumer(metric::AcActivePower)?;
let formula_grid = logical_meter.grid(metric::AcPowerActive)?;
let formula_battery = logical_meter.battery(None, metric::AcPowerActive)?;
let formula_consumer = logical_meter.consumer(metric::AcPowerActive)?;

let formula = (logical_meter.grid(metric::AcActivePower)?
- logical_meter.battery(None, metric::AcActivePower)?
+ logical_meter.consumer(metric::AcActivePower)?)?;
let formula = (logical_meter.grid(metric::AcPowerActive)?
- logical_meter.battery(None, metric::AcPowerActive)?
+ logical_meter.consumer(metric::AcPowerActive)?)?;

let mut rx = formula.subscribe().await?;
let mut grid_rx = formula_grid.subscribe().await?;
Expand Down
26 changes: 14 additions & 12 deletions src/client/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,26 @@ use tokio::sync::{broadcast, oneshot};

use crate::{
Error,
proto::common::v1::microgrid::components::{Component, ComponentConnection, ComponentData},
proto::common::v1alpha8::microgrid::electrical_components::{
ElectricalComponent, ElectricalComponentConnection, ElectricalComponentTelemetry,
},
};

/// Instructions that can be sent to the client actor from client handles.
#[derive(Debug)]
pub(super) enum Instruction {
GetComponentDataStream {
component_id: u64,
response_tx: oneshot::Sender<broadcast::Receiver<ComponentData>>,
ReceiveElectricalComponentTelemetryStream {
electrical_component_id: u64,
response_tx: oneshot::Sender<broadcast::Receiver<ElectricalComponentTelemetry>>,
},
ListComponents {
component_ids: Vec<u64>,
categories: Vec<i32>,
response_tx: oneshot::Sender<Result<Vec<Component>, Error>>,
ListElectricalComponents {
electrical_component_ids: Vec<u64>,
electrical_component_categories: Vec<i32>,
response_tx: oneshot::Sender<Result<Vec<ElectricalComponent>, Error>>,
},
ListConnections {
starts: Vec<u64>,
ends: Vec<u64>,
response_tx: oneshot::Sender<Result<Vec<ComponentConnection>, Error>>,
ListElectricalComponentConnections {
source_electrical_component_ids: Vec<u64>,
destination_electrical_component_ids: Vec<u64>,
response_tx: oneshot::Sender<Result<Vec<ElectricalComponentConnection>, Error>>,
},
}
148 changes: 89 additions & 59 deletions src/client/microgrid_client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@

//! The microgrid client actor that handles communication with the microgrid API.

use crate::client::{instruction::Instruction, retry_tracker::RetryTracker};
use crate::{
client::{instruction::Instruction, retry_tracker::RetryTracker},
proto::microgrid::v1alpha18::{
ListElectricalComponentConnectionsRequest, ListElectricalComponentsRequest,
ReceiveElectricalComponentTelemetryStreamRequest,
ReceiveElectricalComponentTelemetryStreamResponse,
},
};
use std::collections::HashMap;

use tokio::{
Expand All @@ -16,11 +23,8 @@ use tracing::Instrument as _;
use crate::{
Error,
proto::{
common::v1::microgrid::components::ComponentData,
microgrid::v1::{
ListComponentsRequest, ListConnectionsRequest, ReceiveComponentDataStreamRequest,
ReceiveComponentDataStreamResponse, microgrid_client::MicrogridClient,
},
common::v1alpha8::microgrid::electrical_components::ElectricalComponentTelemetry,
microgrid::v1alpha18::microgrid_client::MicrogridClient,
},
};

Expand Down Expand Up @@ -57,7 +61,8 @@ impl MicrogridClientActor {
}
};

let mut component_streams: HashMap<u64, broadcast::Sender<ComponentData>> = HashMap::new();
let mut component_streams: HashMap<u64, broadcast::Sender<ElectricalComponentTelemetry>> =
HashMap::new();

let (stream_status_tx, mut stream_status_rx) = mpsc::channel(50);
let mut retry_timer = tokio::time::interval(std::time::Duration::from_secs(1));
Expand Down Expand Up @@ -114,64 +119,73 @@ impl MicrogridClientActor {
/// Handles the instructions received from the `MicrogridClientHandle` instances.
async fn handle_instruction(
client: &mut MicrogridClient<Channel>,
component_streams: &mut HashMap<u64, broadcast::Sender<ComponentData>>,
component_streams: &mut HashMap<u64, broadcast::Sender<ElectricalComponentTelemetry>>,
instruction: Option<Instruction>,
stream_status_tx: mpsc::Sender<StreamStatus>,
) -> Result<(), Error> {
match instruction {
Some(Instruction::GetComponentDataStream {
component_id,
Some(Instruction::ReceiveElectricalComponentTelemetryStream {
electrical_component_id,
response_tx,
}) => {
// If a stream for the given component already exists, subscribe to
// it and return.
if let Some(stream) = component_streams.get(&component_id) {
if let Some(stream) = component_streams.get(&electrical_component_id) {
response_tx
.send(stream.subscribe())
.map_err(|_| Error::internal("failed to send response"))?;
return Ok(());
}

// If a stream for the given component does not exist, create a new
// channel and start a task for streaming component data from the
// API service into the channel.
let (tx, rx) = broadcast::channel::<ComponentData>(100);
component_streams.insert(component_id, tx.clone());
start_component_data_stream(client, component_id, tx, stream_status_tx).await?;
// If a stream for the given electrical component does not exist,
// create a new channel and start a task for streaming telemetry
// from the API service into the channel.
let (tx, rx) = broadcast::channel::<ElectricalComponentTelemetry>(100);
component_streams.insert(electrical_component_id, tx.clone());
start_electrical_component_telemetry_stream(
client,
electrical_component_id,
tx,
stream_status_tx,
)
.await?;

response_tx.send(rx).map_err(|_| {
tracing::error!("failed to send response");
Error::internal("failed to send response")
})?;
}
Some(Instruction::ListComponents {
Some(Instruction::ListElectricalComponents {
response_tx,
component_ids,
categories,
electrical_component_ids,
electrical_component_categories,
}) => {
let components = client
.list_components(ListComponentsRequest {
component_ids,
categories,
.list_electrical_components(ListElectricalComponentsRequest {
electrical_component_ids,
electrical_component_categories,
})
.await
.map_err(|e| Error::connection_failure(format!("list_components failed: {e}")))
.map(|r| r.into_inner().components);
.map(|r| r.into_inner().electrical_components);

response_tx
.send(components)
.map_err(|_| Error::internal("failed to send response"))?;
}
Some(Instruction::ListConnections {
Some(Instruction::ListElectricalComponentConnections {
response_tx,
starts,
ends,
source_electrical_component_ids,
destination_electrical_component_ids,
}) => {
let connections = client
.list_connections(ListConnectionsRequest { starts, ends })
.list_electrical_component_connections(ListElectricalComponentConnectionsRequest {
source_electrical_component_ids,
destination_electrical_component_ids,
})
.await
.map_err(|e| Error::connection_failure(format!("list_connections failed: {e}")))
.map(|r| r.into_inner().connections);
.map(|r| r.into_inner().electrical_component_connections);

response_tx
.send(connections)
Expand All @@ -187,7 +201,7 @@ async fn handle_instruction(
/// need to be retried and restarting their streaming tasks if necessary.
async fn handle_retry_timer(
client: &mut MicrogridClient<Channel>,
component_streams: &mut HashMap<u64, broadcast::Sender<ComponentData>>,
component_streams: &mut HashMap<u64, broadcast::Sender<ElectricalComponentTelemetry>>,
components_to_retry: &mut HashMap<u64, RetryTracker>,
stream_status_tx: mpsc::Sender<StreamStatus>,
now: tokio::time::Instant,
Expand All @@ -200,8 +214,13 @@ async fn handle_retry_timer(
item.1.mark_new_retry();
let (component_id, _) = item;
if let Some(tx) = component_streams.get(component_id).cloned() {
start_component_data_stream(client, *component_id, tx, stream_status_tx.clone())
.await?;
start_electrical_component_telemetry_stream(
client,
*component_id,
tx,
stream_status_tx.clone(),
)
.await?;
} else {
tracing::error!("Component stream not found for retry: {component_id}");
return Err(Error::internal(format!(
Expand All @@ -215,70 +234,78 @@ async fn handle_retry_timer(

/// Creates a new data stream for the given component ID and starts a task to
/// fetch data from it in a loop.
async fn start_component_data_stream(
async fn start_electrical_component_telemetry_stream(
client: &mut MicrogridClient<Channel>,
component_id: u64,
tx: broadcast::Sender<ComponentData>,
electrical_component_id: u64,
tx: broadcast::Sender<ElectricalComponentTelemetry>,
stream_status_tx: mpsc::Sender<StreamStatus>,
) -> Result<(), Error> {
let stream = match client
.receive_component_data_stream(ReceiveComponentDataStreamRequest {
component_id,
filter: None,
})
.receive_electrical_component_telemetry_stream(
ReceiveElectricalComponentTelemetryStreamRequest {
electrical_component_id,
filter: None,
},
)
.await
{
Ok(s) => s.into_inner(),
Err(e) => {
stream_status_tx
.send(StreamStatus::Failed(component_id))
.send(StreamStatus::Failed(electrical_component_id))
.await
.map_err(|e| {
Error::connection_failure(format!(
"receive_component_data_stream failed for {component_id}: {e}",
"receive_component_data_stream failed for {electrical_component_id}: {e}",
))
})?;
return Err(Error::connection_failure(format!(
"receive_component_data_stream failed for {component_id}: {e}",
"receive_component_data_stream failed for {electrical_component_id}: {e}",
)));
}
};

stream_status_tx
.send(StreamStatus::Connected(component_id))
.send(StreamStatus::Connected(electrical_component_id))
.await
.map_err(|e| {
Error::connection_failure(format!(
"Failed to send stream recovered message for {component_id}: {e}",
"Failed to send stream recovered message for {electrical_component_id}: {e}",
))
})?;

// create a task to fetch data from the stream in a loop and put into a channel.
tokio::spawn(
run_component_data_stream(stream, component_id, tx, stream_status_tx).in_current_span(),
run_electrical_component_telemetry_stream(
stream,
electrical_component_id,
tx,
stream_status_tx,
)
.in_current_span(),
);
Ok(())
}

async fn run_component_data_stream(
mut stream: tonic::Streaming<ReceiveComponentDataStreamResponse>,
component_id: u64,
tx: broadcast::Sender<ComponentData>,
async fn run_electrical_component_telemetry_stream(
mut stream: tonic::Streaming<ReceiveElectricalComponentTelemetryStreamResponse>,
electrical_component_id: u64,
tx: broadcast::Sender<ElectricalComponentTelemetry>,
stream_status_tx: mpsc::Sender<StreamStatus>,
) {
loop {
if tx.receiver_count() == 0 {
tracing::debug!(
"Dropping ComponentData stream for component_id:{:?}",
component_id
electrical_component_id
);
stream_status_tx
.send(StreamStatus::Ended(component_id))
.send(StreamStatus::Ended(electrical_component_id))
.await
.unwrap_or_else(|e| {
tracing::error!(
"Failed to send stream ended message for {:?}: {:?}",
component_id,
electrical_component_id,
e
);
});
Expand All @@ -289,23 +316,26 @@ async fn run_component_data_stream(
Err(e) => {
tracing::error!(
"get_component_data stream failed for {:?}: {:?}",
component_id,
electrical_component_id,
e
);
break;
}
};
let data = match message {
Some(ReceiveComponentDataStreamResponse { data: Some(d) }) => d,
Some(ReceiveComponentDataStreamResponse { data: None }) => {
Some(ReceiveElectricalComponentTelemetryStreamResponse { telemetry: Some(d) }) => d,
Some(ReceiveElectricalComponentTelemetryStreamResponse { telemetry: None }) => {
tracing::warn!(
"get_component_data stream returned empty data for {}",
component_id
electrical_component_id
);
continue;
}
None => {
tracing::warn!("get_component_data stream ended for {:?}", component_id);
tracing::warn!(
"get_component_data stream ended for {:?}",
electrical_component_id
);
break;
}
};
Expand All @@ -316,12 +346,12 @@ async fn run_component_data_stream(
}

if let Err(e) = stream_status_tx
.send(StreamStatus::Failed(component_id))
.send(StreamStatus::Failed(electrical_component_id))
.await
{
tracing::error!(
"Failed to send stream stopped message for {:?}: {:?}",
component_id,
electrical_component_id,
e
);
}
Expand Down
Loading