Skip to content

Commit 1cb99a6

Browse files
authored
Keep retrying on client connection error (#10)
When a streaming method was returning immediately, the client exited leading to the logical meter exiting as well. This PR fixes it.
2 parents ce5ff11 + a773d05 commit 1cb99a6

2 files changed

Lines changed: 25 additions & 22 deletions

File tree

src/client/microgrid_client_actor.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ async fn handle_instruction(
148148
tx,
149149
stream_status_tx,
150150
)
151-
.await?;
151+
.await;
152152

153153
response_tx.send(rx).map_err(|_| {
154154
tracing::error!("failed to send response");
@@ -220,7 +220,7 @@ async fn handle_retry_timer(
220220
tx,
221221
stream_status_tx.clone(),
222222
)
223-
.await?;
223+
.await;
224224
} else {
225225
tracing::error!("Component stream not found for retry: {component_id}");
226226
return Err(Error::internal(format!(
@@ -239,7 +239,7 @@ async fn start_electrical_component_telemetry_stream(
239239
electrical_component_id: u64,
240240
tx: broadcast::Sender<ElectricalComponentTelemetry>,
241241
stream_status_tx: mpsc::Sender<StreamStatus>,
242-
) -> Result<(), Error> {
242+
) {
243243
let stream = match client
244244
.receive_electrical_component_telemetry_stream(
245245
ReceiveElectricalComponentTelemetryStreamRequest {
@@ -251,28 +251,24 @@ async fn start_electrical_component_telemetry_stream(
251251
{
252252
Ok(s) => s.into_inner(),
253253
Err(e) => {
254-
stream_status_tx
254+
let _ = stream_status_tx
255255
.send(StreamStatus::Failed(electrical_component_id))
256-
.await
257-
.map_err(|e| {
258-
Error::connection_failure(format!(
259-
"receive_component_data_stream failed for {electrical_component_id}: {e}",
260-
))
261-
})?;
262-
return Err(Error::connection_failure(format!(
263-
"receive_component_data_stream failed for {electrical_component_id}: {e}",
264-
)));
256+
.await;
257+
258+
tracing::debug!("Failed to start telemetry stream for {electrical_component_id}: {e}",);
259+
return;
265260
}
266261
};
267262

268-
stream_status_tx
263+
if let Err(e) = stream_status_tx
269264
.send(StreamStatus::Connected(electrical_component_id))
270265
.await
271-
.map_err(|e| {
272-
Error::connection_failure(format!(
273-
"Failed to send stream recovered message for {electrical_component_id}: {e}",
274-
))
275-
})?;
266+
{
267+
tracing::error!(
268+
"Failed to send stream connected message for {electrical_component_id}: {e}",
269+
);
270+
return;
271+
}
276272

277273
// create a task to fetch data from the stream in a loop and put into a channel.
278274
tokio::spawn(
@@ -284,7 +280,6 @@ async fn start_electrical_component_telemetry_stream(
284280
)
285281
.in_current_span(),
286282
);
287-
Ok(())
288283
}
289284

290285
async fn run_electrical_component_telemetry_stream(

src/proto.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
66
mod graph;
77

8-
#[allow(clippy::doc_lazy_continuation, clippy::doc_overindented_list_items)]
8+
#[allow(
9+
clippy::doc_lazy_continuation,
10+
clippy::doc_overindented_list_items,
11+
dead_code
12+
)]
913
pub mod common {
1014
pub mod v1alpha8 {
1115
pub mod grid {
@@ -40,7 +44,11 @@ pub mod common {
4044
}
4145
}
4246

43-
#[allow(clippy::doc_lazy_continuation, clippy::doc_overindented_list_items)]
47+
#[allow(
48+
clippy::doc_lazy_continuation,
49+
clippy::doc_overindented_list_items,
50+
dead_code
51+
)]
4452
pub mod microgrid {
4553
pub mod v1alpha18 {
4654
#![allow(clippy::derive_partial_eq_without_eq)]

0 commit comments

Comments
 (0)