Skip to content

Commit 9f99848

Browse files
committed
rust(feat): Add test server to CLI
1 parent b146ea4 commit 9f99848

8 files changed

Lines changed: 596 additions & 1 deletion

File tree

rust/crates/sift_cli/Cargo.toml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,24 @@ flate2 = "1.1.2"
2323
indicatif = "0.18.0"
2424
parquet = "56.2.0"
2525
pbjson-types = { workspace = true }
26+
prost = "0.13.5"
2627
reqwest = "0.12.23"
2728
sift_pbfs = { workspace = true }
2829
sift_rs = { workspace = true }
29-
tokio = { version = "1.47.1", features = ["full", "net", "time"] }
30+
sift_stream = { workspace = true }
31+
tokio = { version = "1.47.1", features = ["full", "net", "time", "macros", "rt-multi-thread", "signal"] }
3032
tokio-stream = "0.1.17"
33+
tonic = { workspace = true }
34+
tonic-reflection = "0.12"
3135
toml = "0.8.23"
3236
zip = "6.0.0"
3337

3438
[dev-dependencies]
3539
indoc = "2.0.6"
40+
41+
[dependencies.uuid]
42+
version = "1.19.0"
43+
features = ["v4"]
44+
45+
[build-dependencies]
46+
tonic-build = "0.12"

rust/crates/sift_cli/build.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/// Build descriptor's so that the Black Hole gRPC server can
2+
/// stand up the reflection service.
3+
fn main() -> Result<(), Box<dyn std::error::Error>> {
4+
tonic_build::configure()
5+
.file_descriptor_set_path("descriptor.bin")
6+
.build_client(false)
7+
.build_server(false)
8+
.compile_protos(
9+
&[
10+
"/tmp/exported-protos/sift/assets/v1/assets.proto",
11+
"/tmp/exported-protos/sift/ping/v1/ping.proto",
12+
"/tmp/exported-protos/sift/ingest/v1/ingest.proto",
13+
"/tmp/exported-protos/sift/ingestion_configs/v2/ingestion_configs.proto",
14+
],
15+
// Run the following command to generate exported-protos:
16+
// buf export ../../../protos --output /tmp/exported-protos
17+
&["/tmp/exported-protos"],
18+
)?;
19+
20+
Ok(())
21+
}

rust/crates/sift_cli/src/cli/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ pub enum Cmd {
4848
/// Import time series files into Sift
4949
#[command(subcommand)]
5050
Import(ImportCmd),
51+
52+
/// Start a test gRPC server for streaming.
53+
#[command(subcommand)]
54+
TestServer(TestServerCmd),
5155
}
5256

5357
#[derive(Subcommand)]
@@ -164,6 +168,27 @@ pub enum ConfigCmd {
164168
Update(ConfigUpdateArgs),
165169
}
166170

171+
#[derive(Subcommand)]
172+
pub enum TestServerCmd {
173+
/// Start a test ingestion server.
174+
Run(TestServerArgs),
175+
}
176+
177+
#[derive(clap::Args)]
178+
pub struct TestServerArgs {
179+
/// The address to serve gRPC server. Default is 127.0.0.1:50051.
180+
#[arg(short, long)]
181+
pub local_address: Option<String>,
182+
183+
/// Whether to stream metrics to Sift.
184+
#[arg(short, long)]
185+
pub stream_metrics: Option<bool>,
186+
187+
/// The asset name to use when streaming server ingestion metrics.
188+
#[arg(short, long)]
189+
pub metrics_asset_name: Option<String>,
190+
}
191+
167192
#[derive(clap::Args)]
168193
pub struct ConfigUpdateArgs {
169194
/// Edit or create a profile interactively (ignores other flags)

rust/crates/sift_cli/src/cmd/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub mod completions;
88
pub mod config;
99
pub mod export;
1010
pub mod import;
11+
pub mod test_server;
1112

1213
pub struct Context {
1314
pub grpc_uri: String,
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use super::Context;
2+
use anyhow::{Ok, anyhow};
3+
use crossterm::style::Stylize;
4+
use sift_stream::{
5+
ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowBuilder, FlowConfig,
6+
IngestionConfigEncoder, IngestionConfigForm, RecoveryStrategy, RetryPolicy, RunForm,
7+
SiftStream, SiftStreamBuilder, TimeValue,
8+
};
9+
10+
/// Streams metrics to Sift.
11+
pub struct MetricsStreamingClient {
12+
ctx: Context,
13+
asset_name: String,
14+
sift_stream: Option<SiftStream<IngestionConfigEncoder>>,
15+
}
16+
17+
impl MetricsStreamingClient {
18+
pub fn build(
19+
ctx: Context,
20+
stream_metrics: &Option<bool>,
21+
asset_name: &Option<String>,
22+
) -> Result<Option<MetricsStreamingClient>, anyhow::Error> {
23+
if !stream_metrics.unwrap_or(false) {
24+
return Ok(None);
25+
}
26+
27+
let Some(asset_name) = asset_name else {
28+
return Err(anyhow!(
29+
"must specify {} with streaming enabled",
30+
"--metrics_asset_name".cyan()
31+
));
32+
};
33+
34+
Ok(Some(MetricsStreamingClient {
35+
ctx: ctx,
36+
asset_name: asset_name.clone(),
37+
sift_stream: None,
38+
}))
39+
}
40+
41+
/// Initialize SiftStream and create ingestion config.
42+
pub async fn initialize(&mut self) -> Result<(), anyhow::Error> {
43+
let credentials = Credentials::Config {
44+
apikey: self.ctx.api_key.clone(),
45+
uri: self.ctx.grpc_uri.clone(),
46+
};
47+
48+
let ingestion_config = IngestionConfigForm {
49+
asset_name: self.asset_name.to_string(),
50+
client_key: "stress-test-ingestion-config-test".into(),
51+
flows: vec![FlowConfig {
52+
name: "metrics".into(),
53+
channels: vec![
54+
ChannelConfig {
55+
name: "total_num_streams".into(),
56+
description: "Total number of streams created".into(),
57+
data_type: ChannelDataType::Uint32.into(),
58+
..Default::default()
59+
},
60+
ChannelConfig {
61+
name: "total_num_bytes_read".into(),
62+
description: "Total number of bytes read".into(),
63+
unit: "B".into(),
64+
data_type: ChannelDataType::Uint64.into(),
65+
..Default::default()
66+
},
67+
ChannelConfig {
68+
name: "total_num_messages".into(),
69+
description: "Total number of messages received".into(),
70+
unit: "message".into(),
71+
data_type: ChannelDataType::Uint64.into(),
72+
..Default::default()
73+
},
74+
ChannelConfig {
75+
name: "bytes_per_s".into(),
76+
description: "Number of bytes received per second".into(),
77+
data_type: ChannelDataType::Double.into(),
78+
unit: "B/s".into(),
79+
..Default::default()
80+
},
81+
ChannelConfig {
82+
name: "messages_per_s".into(),
83+
description: "Number of messages received per second".into(),
84+
unit: "message/s".into(),
85+
data_type: ChannelDataType::Double.into(),
86+
..Default::default()
87+
},
88+
],
89+
}],
90+
};
91+
92+
let sift_stream = SiftStreamBuilder::new(credentials)
93+
.ingestion_config(ingestion_config)
94+
.recovery_strategy(RecoveryStrategy::RetryOnly(RetryPolicy::default()))
95+
.build()
96+
.await?;
97+
98+
self.sift_stream = Some(sift_stream);
99+
100+
Ok(())
101+
}
102+
103+
/// Send metrics to Sift.
104+
pub async fn ingest(&mut self, metrics: Metrics) {
105+
let flow = Flow::new(
106+
"metrics",
107+
TimeValue::now(),
108+
&[
109+
ChannelValue::new("total_num_streams", metrics.total_num_streams),
110+
ChannelValue::new("total_num_bytes_read", metrics.total_num_bytes_read),
111+
ChannelValue::new("total_num_messages", metrics.total_num_messages),
112+
ChannelValue::new("bytes_per_s", metrics.bytes_per_s),
113+
ChannelValue::new("messages_per_s", metrics.messages_per_s),
114+
],
115+
);
116+
117+
self.sift_stream.as_mut().unwrap().send(flow).await.unwrap();
118+
}
119+
}
120+
121+
pub struct Metrics {
122+
pub total_num_streams: u32,
123+
pub total_num_bytes_read: u64,
124+
pub total_num_messages: u64,
125+
pub bytes_per_s: f64,
126+
pub messages_per_s: f64,
127+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use super::Context;
2+
use crate::cmd::test_server::metrics_streaming_client::MetricsStreamingClient;
3+
use crate::{cli::TestServerArgs, util::tty::Output};
4+
use anyhow::Result;
5+
use server::TestServer;
6+
use sift_rs::assets::v1::asset_service_server::AssetServiceServer;
7+
use sift_rs::ingest::v1::ingest_service_server::IngestServiceServer;
8+
use sift_rs::ingestion_configs::v2::ingestion_config_service_server::IngestionConfigServiceServer;
9+
use sift_rs::ping::v1::ping_service_server::PingServiceServer;
10+
use std::process::ExitCode;
11+
use std::sync::Arc;
12+
use tokio::sync::mpsc;
13+
use tokio::sync::watch;
14+
use tonic::transport::Server;
15+
use tonic_reflection::server::Builder;
16+
17+
pub const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../../descriptor.bin");
18+
pub mod metrics_streaming_client;
19+
pub mod server;
20+
use crate::cmd::test_server::metrics_streaming_client::Metrics;
21+
22+
pub async fn run(ctx: Context, args: TestServerArgs) -> Result<ExitCode> {
23+
let local_address = args.local_address.unwrap_or("127.0.0.1:50051".into());
24+
let addr = local_address.parse()?;
25+
26+
// Initialize streaming client.
27+
let mut streaming_client =
28+
MetricsStreamingClient::build(ctx, &args.stream_metrics, &args.metrics_asset_name)?;
29+
if streaming_client.is_some() {
30+
streaming_client.as_mut().unwrap().initialize().await?;
31+
}
32+
33+
// Channel to signal program exit.
34+
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
35+
let mut shutdown_rx2 = shutdown_rx.clone();
36+
37+
// Channel to send metrics.
38+
let (metrics_tx, mut metrics_rx) = mpsc::channel::<Metrics>(1024);
39+
40+
// Initialize gRPC server.
41+
let server = Arc::new(TestServer::default());
42+
43+
// Start task to calculate ingestion metrics.
44+
let server_arc = Arc::clone(&server);
45+
let calc_stats_task = tokio::spawn(async move {
46+
server_arc
47+
.calculate_metrics(
48+
&mut shutdown_rx,
49+
metrics_tx,
50+
args.stream_metrics.unwrap_or(false),
51+
)
52+
.await;
53+
});
54+
55+
// Start task to ingest metrics to Sift.
56+
let ingest_metrics_task = tokio::spawn(async move {
57+
if streaming_client.is_none() {
58+
return;
59+
}
60+
61+
let mut client = streaming_client.unwrap();
62+
loop {
63+
tokio::select! {
64+
_ = shutdown_rx2.changed() => {
65+
Output::new().line("Ingest task shutting down").print();
66+
break;
67+
}
68+
Some(metrics) = metrics_rx.recv() => {
69+
client.ingest(metrics).await;
70+
}
71+
};
72+
}
73+
});
74+
75+
let reflection_service = Builder::configure()
76+
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
77+
.build_v1()?;
78+
79+
Output::new()
80+
.line(format!("Server listening on {addr}"))
81+
.print();
82+
83+
Server::builder()
84+
.add_service(reflection_service)
85+
.add_service(PingServiceServer::from_arc(server.clone()))
86+
.add_service(IngestServiceServer::from_arc(server.clone()))
87+
.add_service(IngestionConfigServiceServer::from_arc(server.clone()))
88+
.add_service(AssetServiceServer::from_arc(server.clone()))
89+
.serve_with_shutdown(addr, async move {
90+
tokio::signal::ctrl_c().await.unwrap();
91+
let _ = shutdown_tx.send(true);
92+
})
93+
.await?;
94+
95+
calc_stats_task.await?;
96+
ingest_metrics_task.await?;
97+
98+
Output::new().line("Exiting.").print();
99+
100+
Ok(ExitCode::SUCCESS)
101+
}

0 commit comments

Comments
 (0)