Skip to content

Commit bcf19c2

Browse files
committed
Rename stream status channel for clarity
The channel used to communicate the status of a component data stream was called `stream_stopped_tx/rx`, which is no longer accurate. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent adac1a1 commit bcf19c2

1 file changed

Lines changed: 15 additions & 15 deletions

File tree

src/client/microgrid_client_actor.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl MicrogridClientActor {
5959

6060
let mut component_streams: HashMap<u64, broadcast::Sender<ComponentData>> = HashMap::new();
6161

62-
let (stream_stopped_tx, mut stream_stopped_rx) = mpsc::channel(50);
62+
let (stream_status_tx, mut stream_status_rx) = mpsc::channel(50);
6363
let mut retry_timer = tokio::time::interval(std::time::Duration::from_secs(1));
6464
retry_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
6565
let mut components_to_retry = HashMap::new();
@@ -71,12 +71,12 @@ impl MicrogridClientActor {
7171
&mut client,
7272
&mut component_streams,
7373
instruction,
74-
stream_stopped_tx.clone(),
74+
stream_status_tx.clone(),
7575
).await {
7676
tracing::error!("MicrogridClientActor: Error handling instruction: {e}");
7777
}
7878
}
79-
stream_status = stream_stopped_rx.recv() => {
79+
stream_status = stream_status_rx.recv() => {
8080
match stream_status {
8181
Some(StreamStatus::Failed(component_id)) => {
8282
components_to_retry.entry(component_id).or_insert_with(
@@ -100,7 +100,7 @@ impl MicrogridClientActor {
100100
&mut client,
101101
&mut component_streams,
102102
&mut components_to_retry,
103-
stream_stopped_tx.clone(),
103+
stream_status_tx.clone(),
104104
now,
105105
).await {
106106
tracing::error!("MicrogridClientActor: Error handling retry timer: {e}");
@@ -116,7 +116,7 @@ async fn handle_instruction(
116116
client: &mut MicrogridClient<Channel>,
117117
component_streams: &mut HashMap<u64, broadcast::Sender<ComponentData>>,
118118
instruction: Option<Instruction>,
119-
stream_stopped_tx: mpsc::Sender<StreamStatus>,
119+
stream_status_tx: mpsc::Sender<StreamStatus>,
120120
) -> Result<(), Error> {
121121
match instruction {
122122
Some(Instruction::GetComponentDataStream {
@@ -137,7 +137,7 @@ async fn handle_instruction(
137137
// API service into the channel.
138138
let (tx, rx) = broadcast::channel::<ComponentData>(100);
139139
component_streams.insert(component_id, tx.clone());
140-
start_component_data_stream(client, component_id, tx, stream_stopped_tx).await?;
140+
start_component_data_stream(client, component_id, tx, stream_status_tx).await?;
141141

142142
response_tx.send(rx).map_err(|_| {
143143
tracing::error!("failed to send response");
@@ -189,7 +189,7 @@ async fn handle_retry_timer(
189189
client: &mut MicrogridClient<Channel>,
190190
component_streams: &mut HashMap<u64, broadcast::Sender<ComponentData>>,
191191
components_to_retry: &mut HashMap<u64, RetryTracker>,
192-
stream_stopped_tx: mpsc::Sender<StreamStatus>,
192+
stream_status_tx: mpsc::Sender<StreamStatus>,
193193
now: tokio::time::Instant,
194194
) -> Result<(), Error> {
195195
for item in components_to_retry.iter_mut() {
@@ -200,7 +200,7 @@ async fn handle_retry_timer(
200200
item.1.mark_new_retry();
201201
let (component_id, _) = item;
202202
if let Some(tx) = component_streams.get(component_id).cloned() {
203-
start_component_data_stream(client, *component_id, tx, stream_stopped_tx.clone())
203+
start_component_data_stream(client, *component_id, tx, stream_status_tx.clone())
204204
.await?;
205205
} else {
206206
tracing::error!("Component stream not found for retry: {component_id}");
@@ -219,7 +219,7 @@ async fn start_component_data_stream(
219219
client: &mut MicrogridClient<Channel>,
220220
component_id: u64,
221221
tx: broadcast::Sender<ComponentData>,
222-
stream_stopped_tx: mpsc::Sender<StreamStatus>,
222+
stream_status_tx: mpsc::Sender<StreamStatus>,
223223
) -> Result<(), Error> {
224224
let stream = match client
225225
.receive_component_data_stream(ReceiveComponentDataStreamRequest {
@@ -230,7 +230,7 @@ async fn start_component_data_stream(
230230
{
231231
Ok(s) => s.into_inner(),
232232
Err(e) => {
233-
stream_stopped_tx
233+
stream_status_tx
234234
.send(StreamStatus::Failed(component_id))
235235
.await
236236
.map_err(|e| {
@@ -244,7 +244,7 @@ async fn start_component_data_stream(
244244
}
245245
};
246246

247-
stream_stopped_tx
247+
stream_status_tx
248248
.send(StreamStatus::Connected(component_id))
249249
.await
250250
.map_err(|e| {
@@ -255,7 +255,7 @@ async fn start_component_data_stream(
255255

256256
// create a task to fetch data from the stream in a loop and put into a channel.
257257
tokio::spawn(
258-
run_component_data_stream(stream, component_id, tx, stream_stopped_tx).in_current_span(),
258+
run_component_data_stream(stream, component_id, tx, stream_status_tx).in_current_span(),
259259
);
260260
Ok(())
261261
}
@@ -264,15 +264,15 @@ async fn run_component_data_stream(
264264
mut stream: tonic::Streaming<ReceiveComponentDataStreamResponse>,
265265
component_id: u64,
266266
tx: broadcast::Sender<ComponentData>,
267-
stream_stopped_tx: mpsc::Sender<StreamStatus>,
267+
stream_status_tx: mpsc::Sender<StreamStatus>,
268268
) {
269269
loop {
270270
if tx.receiver_count() == 0 {
271271
tracing::debug!(
272272
"Dropping ComponentData stream for component_id:{:?}",
273273
component_id
274274
);
275-
stream_stopped_tx
275+
stream_status_tx
276276
.send(StreamStatus::Ended(component_id))
277277
.await
278278
.unwrap_or_else(|e| {
@@ -315,7 +315,7 @@ async fn run_component_data_stream(
315315
};
316316
}
317317

318-
if let Err(e) = stream_stopped_tx
318+
if let Err(e) = stream_status_tx
319319
.send(StreamStatus::Failed(component_id))
320320
.await
321321
{

0 commit comments

Comments
 (0)