Skip to content

Commit b94824d

Browse files
committed
Add flag to end test server while streaming
1 parent 1f83940 commit b94824d

1 file changed

Lines changed: 9 additions & 0 deletions

File tree

  • rust/crates/sift_cli/src/cmd/test_server

rust/crates/sift_cli/src/cmd/test_server/server.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use sift_rs::ingest::v1::{
1414
use sift_rs::ingestion_configs::v2::{ingestion_config_service_server::IngestionConfigService, *};
1515
use sift_rs::ping::v1::{PingRequest, PingResponse, ping_service_server::PingService};
1616
use std::io::stdout;
17+
use std::sync::atomic::AtomicBool;
1718
use std::time::Duration;
1819
use std::{
1920
collections::HashMap,
@@ -30,6 +31,9 @@ use uuid::Uuid;
3031

3132
#[derive(Default)]
3233
pub struct TestServer {
34+
/// Whether the server is done processing streams.
35+
done: AtomicBool,
36+
3337
/// Total number of streams created.
3438
total_num_streams: AtomicU32,
3539

@@ -66,6 +70,7 @@ impl TestServer {
6670
loop {
6771
tokio::select! {
6872
_ = shutdown.changed() => {
73+
self.done.fetch_or(true, Relaxed);
6974
Output::new().line("Metrics task shutting down").print();
7075
break;
7176
}
@@ -289,6 +294,10 @@ impl IngestService for TestServer {
289294
let inner = msg?;
290295
self.total_num_bytes_read
291296
.fetch_add(inner.encoded_len() as u64, Relaxed);
297+
298+
if self.done.load(Relaxed) {
299+
break;
300+
}
292301
}
293302

294303
Ok(Response::new(IngestWithConfigDataStreamResponse::default()))

0 commit comments

Comments
 (0)