Merged
6 changes: 4 additions & 2 deletions RELEASE_NOTES.md
@@ -6,11 +6,13 @@

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
- `MicrogridClientHandle::try_new`, `LogicalMeterHandle::try_new`, and `Microgrid::try_new` no longer return an error when the microgrid API server is unreachable at startup or when the server returns data that doesn't yet form a valid component graph; instead they wait for the server to recover. Callers that relied on a quick failure to detect a misconfigured or unavailable endpoint should wrap the call in `tokio::time::timeout` (or equivalent) to bound the wait. URL validation still fails fast: a malformed endpoint URL is still surfaced as `ConnectionFailure` from `MicrogridClientHandle::try_new`, and an invalid `LogicalMeterConfig` still surfaces synchronously from `LogicalMeterHandle::try_new`.

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- The microgrid client now tolerates the API server being absent or returning incomplete data at startup. `MicrogridClientHandle::try_new` establishes the gRPC connection lazily, so it succeeds regardless of whether the server is reachable; transient stream errors are then handled by the existing per-stream retry loop. `LogicalMeterHandle::try_new` (and therefore `Microgrid::try_new`) wraps the entire component-graph setup — listing components, listing connections, and building the graph — in a single retry loop that sleeps 3 seconds between attempts, so applications block waiting for the server and a valid graph instead of exiting with an error.

- `Bounds::combine_parallel`, `Bounds::intersect`, and `Bounds::merge_if_overlapping` are now public, allowing external callers to combine bounds without going through higher-level types.
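
As a rough illustration of what the newly public `intersect` and `merge_if_overlapping` compute, here is a minimal sketch using a hypothetical, simplified `SimpleBounds` type over `f64`. It is not the crate's `Bounds<Q>`. The `map_or_any` helper and the `lower > upper` emptiness check are assumptions, because the diff does not show those parts; `None` is taken to mean "unbounded on that side".

```rust
/// Hypothetical stand-in for the crate's `Bounds<Q>`: plain `f64` limits,
/// where `None` means "unbounded on that side".
#[derive(Debug, Clone, Copy, PartialEq)]
struct SimpleBounds {
    lower: Option<f64>,
    upper: Option<f64>,
}

impl SimpleBounds {
    fn new(lower: Option<f64>, upper: Option<f64>) -> Self {
        Self { lower, upper }
    }

    /// Assumed behavior of `map_or_any`: apply `f` when both limits are
    /// present, otherwise return whichever limit exists.
    fn map_or_any(f: fn(f64, f64) -> f64, a: Option<f64>, b: Option<f64>) -> Option<f64> {
        match (a, b) {
            (Some(a), Some(b)) => Some(f(a, b)),
            (a, b) => a.or(b),
        }
    }

    /// Intersection of the two intervals, or `None` if it is empty
    /// (assumed here to mean `lower > upper`).
    fn intersect(&self, other: &Self) -> Option<Self> {
        let lower = Self::map_or_any(f64::max, self.lower, other.lower);
        let upper = Self::map_or_any(f64::min, self.upper, other.upper);
        if let (Some(lower), Some(upper)) = (lower, upper) {
            if lower > upper {
                return None;
            }
        }
        Some(Self { lower, upper })
    }

    /// Smallest single interval containing both, if they overlap.
    /// A side that is unbounded in either input stays unbounded.
    fn merge_if_overlapping(&self, other: &Self) -> Option<Self> {
        self.intersect(other)?;
        Some(Self {
            lower: self.lower.and_then(|a| other.lower.map(|b| a.min(b))),
            upper: self.upper.and_then(|a| other.upper.map(|b| a.max(b))),
        })
    }
}

fn main() {
    let a = SimpleBounds::new(Some(-10.0), Some(5.0));
    let b = SimpleBounds::new(Some(0.0), Some(20.0));
    let c = SimpleBounds::new(Some(30.0), None);

    println!("a ∩ b = {:?}", a.intersect(&b));
    println!("a ∪ b = {:?}", a.merge_if_overlapping(&b));
    println!("a ∩ c = {:?}", a.intersect(&c));
}
```

The disjoint case (`a` and `c`) is the one where `combine_parallel` returns both inputs unchanged, as the early return in the diff shows.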

## Bug Fixes

6 changes: 3 additions & 3 deletions src/bounds.rs
@@ -34,7 +34,7 @@ impl<Q: Quantity> Bounds<Q> {
}

/// Combines two bounds as if their components were connected in parallel.
pub(crate) fn combine_parallel(&self, other: &Self) -> Vec<Self> {
pub fn combine_parallel(&self, other: &Self) -> Vec<Self> {
if self.intersect(other).is_none() {
return vec![self.clone(), other.clone()];
}
@@ -67,7 +67,7 @@ impl<Q: Quantity> Bounds<Q> {

/// Returns the intersection of `self` and `other`, or `None` if the
/// intersection is empty.
pub(crate) fn intersect(&self, other: &Self) -> Option<Self> {
pub fn intersect(&self, other: &Self) -> Option<Self> {
let lower = Self::map_or_any(Q::max, self.lower, other.lower);
let upper = Self::map_or_any(Q::min, self.upper, other.upper);
if let (Some(lower), Some(upper)) = (lower, upper)
@@ -80,7 +80,7 @@ impl<Q: Quantity> Bounds<Q> {

/// If `self` and `other` overlap, returns the smallest single interval
/// that contains both; otherwise returns `None`.
pub(crate) fn merge_if_overlapping(&self, other: &Self) -> Option<Self> {
pub fn merge_if_overlapping(&self, other: &Self) -> Option<Self> {
self.intersect(other)?;
Some(Bounds {
lower: self.lower.and_then(|a| other.lower.map(|b| a.min(b))),
33 changes: 19 additions & 14 deletions src/client/microgrid_client_handle.rs
@@ -8,7 +8,7 @@

use chrono::TimeDelta;
use tokio::sync::{broadcast, mpsc, oneshot};
use tonic::transport::Channel;
use tonic::transport::{Channel, Endpoint};

use crate::{
Bounds, Error,
@@ -36,20 +36,25 @@ pub struct MicrogridClientHandle {
}

impl MicrogridClientHandle {
/// Creates a new `MicrogridClientHandle` that connects to the microgrid API
/// at the specified URL.
/// Creates a new `MicrogridClientHandle` for the microgrid API at the
/// specified URL.
///
/// The connection is established lazily on the first RPC, so this method
/// succeeds even when no server is reachable yet. Per-call errors will
/// surface from the individual RPC methods, and the actor's per-stream
/// retry loop will keep attempting to reconnect telemetry streams.
///
/// Returns an error only if `url` is not a valid endpoint URL.
pub async fn try_new(url: impl Into<String>) -> Result<Self, Error> {
let client = match MicrogridClient::<Channel>::connect(url.into()).await {
Ok(t) => t,
Err(e) => {
tracing::error!("Could not connect to server: {e}");
return Err(Error::connection_failure(format!(
"Could not connect to server: {e}"
)));
}
};

Ok(Self::new_from_client(client))
let url = url.into();
let channel = Endpoint::from_shared(url.clone())
.map_err(|e| {
Error::connection_failure(format!("Invalid microgrid API URL {url}: {e}"))
})?
.connect_lazy();
Ok(Self::new_from_client(MicrogridClient::<Channel>::new(
channel,
)))
}

pub fn new_from_client(client: impl MicrogridApiClient) -> Self {
61 changes: 46 additions & 15 deletions src/logical_meter/logical_meter_handle.rs
@@ -13,6 +13,7 @@ use crate::{
};
use frequenz_microgrid_component_graph::{self, ComponentGraph};
use std::collections::BTreeSet;
use std::time::Duration;
use tokio::sync::mpsc;

use super::{LogicalMeterConfig, logical_meter_actor::LogicalMeterActor};
@@ -26,6 +27,11 @@ pub struct LogicalMeterHandle {

impl LogicalMeterHandle {
/// Creates a new LogicalMeter instance.
///
/// Listing the components and connections from the API and building the
/// component graph is retried indefinitely with a fixed 3-second delay, so
/// this call blocks until the server is reachable and returns data that
/// forms a valid graph. Returns an error only if `config` is invalid.
pub async fn try_new(
client: MicrogridClientHandle,
config: LogicalMeterConfig,
@@ -39,21 +45,19 @@ impl LogicalMeterHandle {
clock: C,
) -> Result<Self, Error> {
let (sender, receiver) = mpsc::channel(8);
let graph = ComponentGraph::try_new(
client.list_electrical_components(vec![], vec![]).await?,
client
.list_electrical_component_connections(vec![], vec![])
.await?,
frequenz_microgrid_component_graph::ComponentGraphConfig {
allow_component_validation_failures: true,
allow_unconnected_components: true,
allow_unspecified_inverters: false,
disable_fallback_components: false,
},
)
.map_err(|e| {
Error::component_graph_error(format!("Unable to create a component graph: {e}"))
})?;
const RETRY_DELAY: Duration = Duration::from_secs(3);
let graph = loop {
match build_component_graph(&client).await {
Ok(g) => break g,
Err(reason) => {
tracing::warn!(
"Microgrid logical-meter setup failed, retrying in {:?}: {reason}",
RETRY_DELAY
);
tokio::time::sleep(RETRY_DELAY).await;
}
}
};

let logical_meter = LogicalMeterActor::try_new(receiver, client, config, clock)?;

@@ -174,6 +178,33 @@ impl LogicalMeterHandle {
}
}

/// Lists the components and connections from the API and builds the
/// component graph. Errors from each step are stringified with a prefix so
/// the retry loop can log a concise reason.
async fn build_component_graph(
client: &MicrogridClientHandle,
) -> Result<ComponentGraph<ElectricalComponent, ElectricalComponentConnection>, String> {
let components = client
.list_electrical_components(vec![], vec![])
.await
.map_err(|e| format!("fetching components failed: {e}"))?;
let connections = client
.list_electrical_component_connections(vec![], vec![])
.await
.map_err(|e| format!("fetching component connections failed: {e}"))?;
ComponentGraph::try_new(
components,
connections,
frequenz_microgrid_component_graph::ComponentGraphConfig {
allow_component_validation_failures: true,
allow_unconnected_components: true,
allow_unspecified_inverters: false,
disable_fallback_components: false,
},
)
.map_err(|e| format!("building component graph failed: {e}"))
}
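
The retry-until-success pattern used around `build_component_graph` can be sketched in isolation. The version below is a synchronous, std-only stand-in so that it runs without an async runtime. `retry_until_ok` is a hypothetical helper, not part of this crate, and it uses `std::thread::sleep` where the diff uses `tokio::time::sleep`.

```rust
use std::fmt::Display;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: calls `attempt` until it succeeds, sleeping for a
/// fixed `delay` after each failure. Returns the successful value together
/// with the number of attempts that were made.
fn retry_until_ok<T, E: Display>(
    delay: Duration,
    mut attempt: impl FnMut() -> Result<T, E>,
) -> (T, u32) {
    let mut attempts = 0;
    loop {
        attempts += 1;
        match attempt() {
            Ok(value) => break (value, attempts),
            Err(reason) => {
                // Log a concise reason, then wait before the next attempt.
                eprintln!("setup failed, retrying in {delay:?}: {reason}");
                thread::sleep(delay);
            }
        }
    }
}

fn main() {
    // Simulate a server that only becomes usable on the third call.
    let mut calls = 0;
    let (graph, attempts) = retry_until_ok(Duration::from_millis(10), || {
        calls += 1;
        if calls < 3 {
            Err(format!("server unreachable (call {calls})"))
        } else {
            Ok("graph")
        }
    });
    println!("got {graph} after {attempts} attempts");
}
```

Because the loop has no attempt limit, it never returns while the operation keeps failing. This is why callers that need a bounded wait have to impose a timeout from the outside.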

#[cfg(test)]
mod tests {
use chrono::TimeDelta;
7 changes: 5 additions & 2 deletions src/microgrid.rs
@@ -21,8 +21,11 @@ impl Microgrid {
/// Creates a new `Microgrid` instance with the given microgrid API URL and
/// logical meter configuration.
///
/// Returns an error if the URL is unreachable, or if the component graph
/// cannot be created with the given configuration.
/// The microgrid API connection is established lazily and connection or
/// component-graph build errors during setup are retried indefinitely, so
/// this call blocks until the server is reachable and returns valid data.
/// Returns an error only if the URL is malformed or if the provided
/// logical meter configuration is invalid.
pub async fn try_new(
url: impl Into<String>,
config: LogicalMeterConfig,