11use crate :: cmd:: test_server:: metrics_streaming_client:: Metrics ;
22use crate :: util:: tty:: Output ;
3- use anyhow:: Result ;
3+ use anyhow:: { Context , Ok as AnyhowOk , Result as AnyhowResult } ;
44use crossterm:: { ExecutableCommand , cursor, terminal} ;
55use prost:: Message ;
66use sift_rs:: assets:: v1:: {
@@ -61,7 +61,7 @@ impl TestServer {
6161 shutdown : & mut watch:: Receiver < bool > ,
6262 metrics_tx : Sender < Metrics > ,
6363 streaming_enabled : bool ,
64- ) {
64+ ) -> AnyhowResult < ( ) > {
6565 let mut stdout = stdout ( ) ;
6666
6767 let mut last_total_num_bytes_read: u64 = 0 ;
@@ -72,10 +72,10 @@ impl TestServer {
7272 _ = shutdown. changed( ) => {
7373 self . done. fetch_or( true , Relaxed ) ;
7474 Output :: new( ) . line( "Metrics task shutting down" ) . print( ) ;
75- break ;
75+ return AnyhowOk ( ( ) ) ;
7676 }
7777
78- _ = tokio:: time:: sleep( Duration :: from_millis ( 100 ) ) => {
78+ _ = tokio:: time:: sleep( Duration :: from_secs ( 1 ) ) => {
7979 let current_total_num_bytes_read = self . total_num_bytes_read. load( Relaxed ) ;
8080 let current_total_num_messages = self . total_num_messages. load( Relaxed ) ;
8181 let current_total_num_streams = self . total_num_streams. load( Relaxed ) ;
@@ -88,10 +88,13 @@ impl TestServer {
8888 // Clear terminal and print metrics.
8989 stdout
9090 . execute( terminal:: Clear ( terminal:: ClearType :: All ) )
91- . expect( "" ) ;
92- stdout. execute( cursor:: MoveTo ( 0 , 0 ) ) . expect( "msg" ) ;
93- stdout. execute( cursor:: MoveUp ( 5 ) ) . expect( "terminal error" ) ;
94- stdout. execute( terminal:: Clear ( terminal:: ClearType :: FromCursorDown ) ) . expect( "msg" ) ;
91+ . context( "failed to clear terminal" ) ?;
92+ stdout. execute( cursor:: MoveTo ( 0 , 0 ) )
93+ . context( "failed to move terminal cursor" ) ?;
94+ stdout. execute( cursor:: MoveUp ( 5 ) )
95+ . context( "failed to move terminal cursor" ) ?;
96+ stdout. execute( terminal:: Clear ( terminal:: ClearType :: FromCursorDown ) )
97+ . context( "failed to move terminal cursor" ) ?;
9598
9699 Output :: new( ) . line( format!( "Total num streams: {current_total_num_streams}" ) ) . print( ) ;
97100 Output :: new( ) . line( format!( "Total num bytes: {current_total_num_bytes_read}" ) ) . print( ) ;
@@ -105,8 +108,8 @@ impl TestServer {
105108 total_num_streams: current_total_num_streams,
106109 total_num_bytes_read: current_total_num_bytes_read,
107110 total_num_messages: current_total_num_messages,
108- bytes_per_s: ( 10 * bytes_per_s ) as f64 ,
109- messages_per_s: ( 10 * messages_per_s) as f64 ,
111+ bytes_per_s: bytes_per_s as f64 ,
112+ messages_per_s: messages_per_s as f64 ,
110113 } ) ;
111114
112115 if e. is_err( ) {
0 commit comments