Skip to content

Commit 74faef5

Browse files
authored
Add tests for the logical meter (#13)
2 parents 122626d + ff8ce5d commit 74faef5

14 files changed

Lines changed: 2152 additions & 433 deletions

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ chrono = "0.4"
1313
frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "0b28f99" }
1414
frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "463414a" }
1515
frequenz-resampling = { git = "https://github.com/frequenz-floss/frequenz-resampling-rs.git", rev = "ce84d66" }
16+
futures = "0.3.31"
1617
prost = "0.13"
1718
prost-types = "0.13"
1819
tokio = { version = "1.45", features = ["rt", "rt-multi-thread"] }
@@ -22,6 +23,9 @@ tracing-subscriber = { version = "0.3" }
2223

2324
[dev-dependencies]
2425
tracing-subscriber = { version = "0.3", features = ["std", "env-filter"] }
26+
tokio = { version = "1.45", features = ["test-util"] }
27+
tokio-stream = { version = "0.1.17", features = ["sync"] }
28+
2529

2630
[build-dependencies]
2731
prost-build = { version = "0.13", features = ["cleanup-markdown"] }

examples/logical_meter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ async fn main() -> Result<(), Error> {
1818
.with(fmt::layer().with_file(true).with_line_number(true))
1919
.init();
2020

21-
let client = MicrogridClientHandle::new("http://[::1]:8800");
21+
let client = MicrogridClientHandle::try_new("http://[::1]:8800").await?;
2222
let mut logical_meter = LogicalMeterHandle::try_new(
2323
client,
2424
LogicalMeterConfig {

src/client.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,11 @@ mod instruction;
77
mod microgrid_client_actor;
88
mod retry_tracker;
99

10+
mod microgrid_api_client;
11+
pub use microgrid_api_client::MicrogridApiClient;
12+
1013
mod microgrid_client_handle;
1114
pub use microgrid_client_handle::MicrogridClientHandle;
15+
16+
#[cfg(test)]
17+
pub(crate) mod test_utils;

src/client/microgrid_api_client.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// License: MIT
2+
// Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
//! The microgrid API client trait and its implementation for the generated gRPC
5+
//! client.
6+
7+
use tonic::transport::Channel;
8+
9+
use crate::proto::microgrid::v1alpha18::{
10+
ListElectricalComponentConnectionsRequest, ListElectricalComponentConnectionsResponse,
11+
ListElectricalComponentsRequest, ListElectricalComponentsResponse,
12+
ReceiveElectricalComponentTelemetryStreamRequest,
13+
ReceiveElectricalComponentTelemetryStreamResponse,
14+
};
15+
16+
/// A trait representing the microgrid API client.
17+
///
18+
/// Includes all the client methods that are used by the `MicrogridClientActor`.
19+
#[async_trait::async_trait]
20+
pub trait MicrogridApiClient: Send + Sync + 'static {
21+
async fn list_electrical_components(
22+
&mut self,
23+
request: impl tonic::IntoRequest<ListElectricalComponentsRequest> + Send,
24+
) -> std::result::Result<tonic::Response<ListElectricalComponentsResponse>, tonic::Status>;
25+
26+
async fn list_electrical_component_connections(
27+
&mut self,
28+
request: impl tonic::IntoRequest<ListElectricalComponentConnectionsRequest> + Send,
29+
) -> std::result::Result<
30+
tonic::Response<ListElectricalComponentConnectionsResponse>,
31+
tonic::Status,
32+
>;
33+
34+
type TelemetryStream: futures::Stream<
35+
Item = std::result::Result<
36+
ReceiveElectricalComponentTelemetryStreamResponse,
37+
tonic::Status,
38+
>,
39+
> + Send
40+
+ Unpin
41+
+ 'static;
42+
43+
async fn receive_electrical_component_telemetry_stream(
44+
&mut self,
45+
request: impl tonic::IntoRequest<ReceiveElectricalComponentTelemetryStreamRequest> + Send,
46+
) -> std::result::Result<tonic::Response<Self::TelemetryStream>, tonic::Status>;
47+
}
48+
49+
/// Implement the MicrogridApiClient trait for the generated gRPC client.
50+
///
51+
/// Forwards calls to the underlying gRPC client methods, without any additional logic.
52+
#[async_trait::async_trait]
53+
impl MicrogridApiClient
54+
for crate::proto::microgrid::v1alpha18::microgrid_client::MicrogridClient<Channel>
55+
{
56+
async fn list_electrical_components(
57+
&mut self,
58+
request: impl tonic::IntoRequest<ListElectricalComponentsRequest> + Send,
59+
) -> std::result::Result<tonic::Response<ListElectricalComponentsResponse>, tonic::Status> {
60+
self.list_electrical_components(request).await
61+
}
62+
63+
async fn list_electrical_component_connections(
64+
&mut self,
65+
request: impl tonic::IntoRequest<ListElectricalComponentConnectionsRequest> + Send,
66+
) -> std::result::Result<
67+
tonic::Response<ListElectricalComponentConnectionsResponse>,
68+
tonic::Status,
69+
> {
70+
self.list_electrical_component_connections(request).await
71+
}
72+
73+
type TelemetryStream =
74+
tonic::codec::Streaming<ReceiveElectricalComponentTelemetryStreamResponse>;
75+
76+
async fn receive_electrical_component_telemetry_stream(
77+
&mut self,
78+
request: impl tonic::IntoRequest<ReceiveElectricalComponentTelemetryStreamRequest> + Send,
79+
) -> std::result::Result<tonic::Response<Self::TelemetryStream>, tonic::Status> {
80+
self.receive_electrical_component_telemetry_stream(request)
81+
.await
82+
}
83+
}

src/client/microgrid_client_actor.rs

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! The microgrid client actor that handles communication with the microgrid API.
55
66
use crate::{
7-
client::{instruction::Instruction, retry_tracker::RetryTracker},
7+
client::{MicrogridApiClient, instruction::Instruction, retry_tracker::RetryTracker},
88
proto::microgrid::v1alpha18::{
99
ListElectricalComponentConnectionsRequest, ListElectricalComponentsRequest,
1010
ReceiveElectricalComponentTelemetryStreamRequest,
@@ -13,19 +13,15 @@ use crate::{
1313
};
1414
use std::collections::HashMap;
1515

16+
use futures::{Stream, StreamExt};
1617
use tokio::{
1718
select,
1819
sync::{broadcast, mpsc},
1920
};
20-
use tonic::transport::Channel;
2121
use tracing::Instrument as _;
2222

2323
use crate::{
24-
Error,
25-
proto::{
26-
common::v1alpha8::microgrid::electrical_components::ElectricalComponentTelemetry,
27-
microgrid::v1alpha18::microgrid_client::MicrogridClient,
28-
},
24+
Error, proto::common::v1alpha8::microgrid::electrical_components::ElectricalComponentTelemetry,
2925
};
3026

3127
enum StreamStatus {
@@ -39,28 +35,20 @@ enum StreamStatus {
3935
///
4036
/// It allows there to be multiple `MicrogridClientHandle` instances, all
4137
/// sharing the same connection to the microgrid API.
42-
pub(super) struct MicrogridClientActor {
43-
url: String,
38+
pub(super) struct MicrogridClientActor<T> {
39+
client: T,
4440
instructions_rx: mpsc::Receiver<Instruction>,
4541
}
4642

47-
impl MicrogridClientActor {
48-
pub(super) fn new(url: String, instructions_rx: mpsc::Receiver<Instruction>) -> Self {
43+
impl<T: MicrogridApiClient> MicrogridClientActor<T> {
44+
pub(super) fn new_from_client(client: T, instructions_rx: mpsc::Receiver<Instruction>) -> Self {
4945
Self {
50-
url,
46+
client,
5147
instructions_rx,
5248
}
5349
}
5450

5551
pub(super) async fn run(mut self) {
56-
let mut client = match MicrogridClient::<Channel>::connect(self.url).await {
57-
Ok(t) => t,
58-
Err(e) => {
59-
tracing::error!("Could not connect to server: {e}");
60-
return;
61-
}
62-
};
63-
6452
let mut component_streams: HashMap<u64, broadcast::Sender<ElectricalComponentTelemetry>> =
6553
HashMap::new();
6654

@@ -73,7 +61,7 @@ impl MicrogridClientActor {
7361
select! {
7462
instruction = self.instructions_rx.recv() => {
7563
if let Err(e) = handle_instruction(
76-
&mut client,
64+
&mut self.client,
7765
&mut component_streams,
7866
instruction,
7967
stream_status_tx.clone(),
@@ -102,7 +90,7 @@ impl MicrogridClientActor {
10290
}
10391
now = retry_timer.tick() => {
10492
if let Err(e) = handle_retry_timer(
105-
&mut client,
93+
&mut self.client,
10694
&mut component_streams,
10795
&mut components_to_retry,
10896
stream_status_tx.clone(),
@@ -117,8 +105,8 @@ impl MicrogridClientActor {
117105
}
118106

119107
/// Handles the instructions received from the `MicrogridClientHandle` instances.
120-
async fn handle_instruction(
121-
client: &mut MicrogridClient<Channel>,
108+
async fn handle_instruction<T: MicrogridApiClient>(
109+
client: &mut T,
122110
component_streams: &mut HashMap<u64, broadcast::Sender<ElectricalComponentTelemetry>>,
123111
instruction: Option<Instruction>,
124112
stream_status_tx: mpsc::Sender<StreamStatus>,
@@ -199,8 +187,8 @@ async fn handle_instruction(
199187

200188
/// Handles the retry timer, checking if the data streams for any components
201189
/// need to be retried and restarting their streaming tasks if necessary.
202-
async fn handle_retry_timer(
203-
client: &mut MicrogridClient<Channel>,
190+
async fn handle_retry_timer<T: MicrogridApiClient>(
191+
client: &mut T,
204192
component_streams: &mut HashMap<u64, broadcast::Sender<ElectricalComponentTelemetry>>,
205193
components_to_retry: &mut HashMap<u64, RetryTracker>,
206194
stream_status_tx: mpsc::Sender<StreamStatus>,
@@ -234,8 +222,8 @@ async fn handle_retry_timer(
234222

235223
/// Creates a new data stream for the given component ID and starts a task to
236224
/// fetch data from it in a loop.
237-
async fn start_electrical_component_telemetry_stream(
238-
client: &mut MicrogridClient<Channel>,
225+
async fn start_electrical_component_telemetry_stream<T: MicrogridApiClient>(
226+
client: &mut T,
239227
electrical_component_id: u64,
240228
tx: broadcast::Sender<ElectricalComponentTelemetry>,
241229
stream_status_tx: mpsc::Sender<StreamStatus>,
@@ -283,7 +271,9 @@ async fn start_electrical_component_telemetry_stream(
283271
}
284272

285273
async fn run_electrical_component_telemetry_stream(
286-
mut stream: tonic::Streaming<ReceiveElectricalComponentTelemetryStreamResponse>,
274+
mut stream: impl Stream<
275+
Item = Result<ReceiveElectricalComponentTelemetryStreamResponse, tonic::Status>,
276+
> + Unpin,
287277
electrical_component_id: u64,
288278
tx: broadcast::Sender<ElectricalComponentTelemetry>,
289279
stream_status_tx: mpsc::Sender<StreamStatus>,
@@ -306,30 +296,30 @@ async fn run_electrical_component_telemetry_stream(
306296
});
307297
return;
308298
}
309-
let message = match stream.message().await {
310-
Ok(m) => m,
311-
Err(e) => {
299+
let message = match stream.next().await {
300+
Some(m) => m,
301+
None => {
312302
tracing::error!(
313-
"get_component_data stream failed for {:?}: {:?}",
303+
"get_component_data stream failed for {}",
314304
electrical_component_id,
315-
e
316305
);
317306
break;
318307
}
319308
};
320309
let data = match message {
321-
Some(ReceiveElectricalComponentTelemetryStreamResponse { telemetry: Some(d) }) => d,
322-
Some(ReceiveElectricalComponentTelemetryStreamResponse { telemetry: None }) => {
310+
Ok(ReceiveElectricalComponentTelemetryStreamResponse { telemetry: Some(d) }) => d,
311+
Ok(ReceiveElectricalComponentTelemetryStreamResponse { telemetry: None }) => {
323312
tracing::warn!(
324313
"get_component_data stream returned empty data for {}",
325314
electrical_component_id
326315
);
327316
continue;
328317
}
329-
None => {
318+
Err(e) => {
330319
tracing::warn!(
331-
"get_component_data stream ended for {:?}",
332-
electrical_component_id
320+
"get_component_data stream ended for {}: {:?}",
321+
electrical_component_id,
322+
e
333323
);
334324
break;
335325
}

0 commit comments

Comments
 (0)