Skip to content

Commit 48eee27

Browse files
committed
End ComponentData streams that have no receivers
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 569b110 commit 48eee27

1 file changed

Lines changed: 23 additions & 10 deletions

File tree

src/client/microgrid_client_actor.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::{
2727
enum StreamStatus {
2828
Failed(u64),
2929
Connected(u64),
30+
Ended(u64),
3031
}
3132

3233
/// This actor owns the connection to the microgrid API and processes instructions
@@ -85,6 +86,9 @@ impl MicrogridClientActor {
8586
Some(StreamStatus::Connected(component_id)) => {
8687
components_to_retry.remove(&component_id);
8788
}
89+
Some(StreamStatus::Ended(component_id)) => {
90+
components_to_retry.remove(&component_id);
91+
}
8892
None => {
8993
tracing::error!("MicrogridClientActor: Stream status channel closed, exiting.");
9094
return;
@@ -263,6 +267,23 @@ async fn run_component_data_stream(
263267
stream_stopped_tx: mpsc::Sender<StreamStatus>,
264268
) {
265269
loop {
270+
if tx.receiver_count() == 0 {
271+
tracing::debug!(
272+
"Dropping ComponentData stream for component_id:{:?}",
273+
component_id
274+
);
275+
stream_stopped_tx
276+
.send(StreamStatus::Ended(component_id))
277+
.await
278+
.unwrap_or_else(|e| {
279+
tracing::error!(
280+
"Failed to send stream ended message for {:?}: {:?}",
281+
component_id,
282+
e
283+
);
284+
});
285+
return;
286+
}
266287
let message = match stream.message().await {
267288
Ok(m) => m,
268289
Err(e) => {
@@ -289,16 +310,8 @@ async fn run_component_data_stream(
289310
}
290311
};
291312

292-
match tx.send(data) {
293-
Ok(_) => {}
294-
Err(e) => {
295-
tracing::error!(
296-
"Unable to send component data for {:?}: {}. Closing stream.",
297-
component_id,
298-
e
299-
);
300-
break;
301-
}
313+
if tx.send(data).is_err() {
314+
continue;
302315
};
303316
}
304317

0 commit comments

Comments
 (0)