Skip to content

Commit 3c21e9c

Browse files
committed
Keep retrying on client connection error
When a streaming method was returning immediately, the client exited leading to the logical meter exiting as well. This commit fixes it. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent ce5ff11 commit 3c21e9c

1 file changed

Lines changed: 15 additions & 20 deletions

File tree

src/client/microgrid_client_actor.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ async fn handle_instruction(
148148
tx,
149149
stream_status_tx,
150150
)
151-
.await?;
151+
.await;
152152

153153
response_tx.send(rx).map_err(|_| {
154154
tracing::error!("failed to send response");
@@ -220,7 +220,7 @@ async fn handle_retry_timer(
220220
tx,
221221
stream_status_tx.clone(),
222222
)
223-
.await?;
223+
.await;
224224
} else {
225225
tracing::error!("Component stream not found for retry: {component_id}");
226226
return Err(Error::internal(format!(
@@ -239,7 +239,7 @@ async fn start_electrical_component_telemetry_stream(
239239
electrical_component_id: u64,
240240
tx: broadcast::Sender<ElectricalComponentTelemetry>,
241241
stream_status_tx: mpsc::Sender<StreamStatus>,
242-
) -> Result<(), Error> {
242+
) {
243243
let stream = match client
244244
.receive_electrical_component_telemetry_stream(
245245
ReceiveElectricalComponentTelemetryStreamRequest {
@@ -251,28 +251,24 @@ async fn start_electrical_component_telemetry_stream(
251251
{
252252
Ok(s) => s.into_inner(),
253253
Err(e) => {
254-
stream_status_tx
254+
let _ = stream_status_tx
255255
.send(StreamStatus::Failed(electrical_component_id))
256-
.await
257-
.map_err(|e| {
258-
Error::connection_failure(format!(
259-
"receive_component_data_stream failed for {electrical_component_id}: {e}",
260-
))
261-
})?;
262-
return Err(Error::connection_failure(format!(
263-
"receive_component_data_stream failed for {electrical_component_id}: {e}",
264-
)));
256+
.await;
257+
258+
tracing::debug!("Failed to start telemetry stream for {electrical_component_id}: {e}",);
259+
return;
265260
}
266261
};
267262

268-
stream_status_tx
263+
if let Err(e) = stream_status_tx
269264
.send(StreamStatus::Connected(electrical_component_id))
270265
.await
271-
.map_err(|e| {
272-
Error::connection_failure(format!(
273-
"Failed to send stream recovered message for {electrical_component_id}: {e}",
274-
))
275-
})?;
266+
{
267+
tracing::error!(
268+
"Failed to send stream connected message for {electrical_component_id}: {e}",
269+
);
270+
return;
271+
}
276272

277273
// create a task to fetch data from the stream in a loop and put into a channel.
278274
tokio::spawn(
@@ -284,7 +280,6 @@ async fn start_electrical_component_telemetry_stream(
284280
)
285281
.in_current_span(),
286282
);
287-
Ok(())
288283
}
289284

290285
async fn run_electrical_component_telemetry_stream(

0 commit comments

Comments
 (0)