Skip to content

Commit 43b89d6

Browse files
committed
Implement PR feedback
1 parent 9f99848 commit 43b89d6

3 files changed

Lines changed: 46 additions & 37 deletions

File tree

rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use super::Context;
22
use anyhow::{Ok, anyhow};
33
use crossterm::style::Stylize;
44
use sift_stream::{
5-
ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowBuilder, FlowConfig,
6-
IngestionConfigEncoder, IngestionConfigForm, RecoveryStrategy, RetryPolicy, RunForm,
7-
SiftStream, SiftStreamBuilder, TimeValue,
5+
ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowConfig,
6+
IngestionConfigEncoder, IngestionConfigForm, RecoveryStrategy, RetryPolicy, SiftStream,
7+
SiftStreamBuilder, TimeValue,
88
};
99

1010
/// Streams metrics to Sift.
@@ -32,7 +32,7 @@ impl MetricsStreamingClient {
3232
};
3333

3434
Ok(Some(MetricsStreamingClient {
35-
ctx: ctx,
35+
ctx,
3636
asset_name: asset_name.clone(),
3737
sift_stream: None,
3838
}))

rust/crates/sift_cli/src/cmd/test_server/mod.rs

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::Context;
22
use crate::cmd::test_server::metrics_streaming_client::MetricsStreamingClient;
33
use crate::{cli::TestServerArgs, util::tty::Output};
4-
use anyhow::Result;
4+
use anyhow::{Context as AnyhowContext, Result};
55
use server::TestServer;
66
use sift_rs::assets::v1::asset_service_server::AssetServiceServer;
77
use sift_rs::ingest::v1::ingest_service_server::IngestServiceServer;
@@ -20,14 +20,23 @@ pub mod server;
2020
use crate::cmd::test_server::metrics_streaming_client::Metrics;
2121

2222
pub async fn run(ctx: Context, args: TestServerArgs) -> Result<ExitCode> {
23-
let local_address = args.local_address.unwrap_or("127.0.0.1:50051".into());
24-
let addr = local_address.parse()?;
23+
let local_address = args
24+
.local_address
25+
.unwrap_or_else(|| "0.0.0.0:50051".to_string());
26+
let addr = local_address
27+
.parse()
28+
.context(format!("failed to parse local_address: {}", local_address))?;
2529

2630
// Initialize streaming client.
2731
let mut streaming_client =
28-
MetricsStreamingClient::build(ctx, &args.stream_metrics, &args.metrics_asset_name)?;
29-
if streaming_client.is_some() {
30-
streaming_client.as_mut().unwrap().initialize().await?;
32+
MetricsStreamingClient::build(ctx, &args.stream_metrics, &args.metrics_asset_name)
33+
.context("failed to create metrics streaming client")?;
34+
35+
if let Some(client) = streaming_client.as_mut() {
36+
client
37+
.initialize()
38+
.await
39+
.context("failed to initialize streaming client")?;
3140
}
3241

3342
// Channel to signal program exit.
@@ -54,27 +63,25 @@ pub async fn run(ctx: Context, args: TestServerArgs) -> Result<ExitCode> {
5463

5564
// Start task to ingest metrics to Sift.
5665
let ingest_metrics_task = tokio::spawn(async move {
57-
if streaming_client.is_none() {
58-
return;
59-
}
60-
61-
let mut client = streaming_client.unwrap();
62-
loop {
63-
tokio::select! {
64-
_ = shutdown_rx2.changed() => {
65-
Output::new().line("Ingest task shutting down").print();
66-
break;
67-
}
68-
Some(metrics) = metrics_rx.recv() => {
69-
client.ingest(metrics).await;
70-
}
71-
};
66+
if let Some(client) = streaming_client.as_mut() {
67+
loop {
68+
tokio::select! {
69+
_ = shutdown_rx2.changed() => {
70+
Output::new().line("Ingest task shutting down").print();
71+
break;
72+
}
73+
Some(metrics) = metrics_rx.recv() => {
74+
client.ingest(metrics).await;
75+
}
76+
};
77+
}
7278
}
7379
});
7480

7581
let reflection_service = Builder::configure()
7682
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
77-
.build_v1()?;
83+
.build_v1()
84+
.context("failed to create gRPC reflection service")?;
7885

7986
Output::new()
8087
.line(format!("Server listening on {addr}"))
@@ -92,8 +99,12 @@ pub async fn run(ctx: Context, args: TestServerArgs) -> Result<ExitCode> {
9299
})
93100
.await?;
94101

95-
calc_stats_task.await?;
96-
ingest_metrics_task.await?;
102+
calc_stats_task
103+
.await
104+
.context("failed to await calculation task")?;
105+
ingest_metrics_task
106+
.await
107+
.context("failed to await ingestion task")?;
97108

98109
Output::new().line("Exiting.").print();
99110

rust/crates/sift_cli/src/cmd/test_server/server.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ impl TestServer {
5858
metrics_tx: Sender<Metrics>,
5959
streaming_enabled: bool,
6060
) {
61+
let mut stdout = stdout();
62+
6163
let mut last_total_num_bytes_read: u64 = 0;
6264
let mut last_total_num_messages: u64 = 0;
6365

@@ -79,12 +81,12 @@ impl TestServer {
7981
last_total_num_messages = current_total_num_messages;
8082

8183
// Clear terminal and print metrics.
82-
stdout()
84+
stdout
8385
.execute(terminal::Clear(terminal::ClearType::All))
8486
.expect("");
85-
stdout().execute(cursor::MoveTo(0, 0)).expect("msg");
86-
stdout().execute(cursor::MoveUp(5)).expect("terminal error");
87-
stdout().execute(terminal::Clear(terminal::ClearType::FromCursorDown)).expect("msg");
87+
stdout.execute(cursor::MoveTo(0, 0)).expect("msg");
88+
stdout.execute(cursor::MoveUp(5)).expect("terminal error");
89+
stdout.execute(terminal::Clear(terminal::ClearType::FromCursorDown)).expect("msg");
8890

8991
Output::new().line(format!("Total num streams: {current_total_num_streams}")).print();
9092
Output::new().line(format!("Total num bytes: {current_total_num_bytes_read}")).print();
@@ -115,11 +117,7 @@ impl TestServer {
115117
#[tonic::async_trait]
116118
impl PingService for TestServer {
117119
async fn ping(&self, _request: Request<PingRequest>) -> Result<Response<PingResponse>, Status> {
118-
let resp = PingResponse {
119-
response: "".into(),
120-
};
121-
122-
Ok(Response::new(resp))
120+
Ok(Response::new(PingResponse::default()))
123121
}
124122
}
125123

0 commit comments

Comments
 (0)