@@ -724,7 +724,15 @@ async fn get_retention_policy(
724724//
725725// This is a WAG, but the computations we do today to report table usage are
726726// pretty inexpensive.
727+ //
728+ // NOTE: We explicitly use a smaller value during testing. This is to avoid
729+ // manipulating time with Tokio's test utils. That cannot work, because we run
730+ // every query to ClickHouse with a timeout, and that causes the tokio timers to
731+ // "auto-advance" to the end of that timeout when we pause in a test.
732+ #[ cfg( not( test) ) ]
727733const USAGE_UPDATE_INTERVAL : Duration = Duration :: from_mins ( 2 ) ;
734+ #[ cfg( test) ]
735+ const USAGE_UPDATE_INTERVAL : Duration = Duration :: from_millis ( 250 ) ;
728736
729737async fn long_running_usage_task (
730738 tx : watch:: Sender < DatabaseUsageResult > ,
@@ -981,42 +989,57 @@ mod tests {
981989 let usage = context. database_usage ( ) ;
982990 println ! ( "{usage:#?}" ) ;
983991 assert ! ( usage. last_success. is_some( ) ) ;
984- assert ! (
985- usage
986- . last_success
987- . expect( "Should have successfully computed something" )
988- . tables
989- . is_empty( )
990- ) ;
991992
992- // Jump forward until we actually do compute the usage again.
993- tokio:: time:: pause ( ) ;
994- let now = tokio:: time:: Instant :: now ( ) ;
995- while now. elapsed ( ) < 2 * USAGE_UPDATE_INTERVAL {
996- tokio:: time:: advance ( std:: time:: Duration :: from_millis ( 10 ) ) . await ;
997- }
998- tokio:: time:: resume ( ) ;
999- let usage = context. database_usage ( ) ;
993+ // Wait until we actually do compute the usage again.
994+ let usage = dev:: poll:: wait_for_condition (
995+ || async {
996+ let usage = context. database_usage ( ) ;
997+ match & usage. last_success {
998+ Some ( success) => {
999+ if success. tables . is_empty ( ) {
1000+ Err ( dev:: poll:: CondCheckError :: < ( ) > :: NotYet )
1001+ } else {
1002+ Ok ( usage)
1003+ }
1004+ }
1005+ None => Err ( dev:: poll:: CondCheckError :: < ( ) > :: NotYet ) ,
1006+ }
1007+ } ,
1008+ & std:: time:: Duration :: from_millis ( 50 ) ,
1009+ & ( 2 * USAGE_UPDATE_INTERVAL ) ,
1010+ )
1011+ . await
1012+ . unwrap ( ) ;
10001013 println ! ( "{usage:#?}" ) ;
10011014 let tables = & usage
10021015 . last_success
10031016 . as_ref ( )
10041017 . expect ( "Should have computed something" )
10051018 . tables ;
1006- tables. contains_key ( & String :: from ( "oximeter.measurements_u64" ) ) ;
1007- tables. contains_key ( & String :: from ( "oximeter.measurements_u64" ) ) ;
1008- let version = tables. get ( & String :: from ( "oximeter.version" ) ) . unwrap ( ) ;
1009- assert_eq ! ( version. n_rows, 1 ) ;
1019+ assert ! (
1020+ tables. contains_key( & String :: from( "oximeter.measurements_u64" ) )
1021+ ) ;
1022+ assert ! (
1023+ tables. contains_key( & String :: from( "oximeter.measurements_f64" ) )
1024+ ) ;
1025+ assert ! ( tables. contains_key( & String :: from( "oximeter.version" ) ) ) ;
10101026
1011- // Kill the database, and force another collection.
1027+ // Kill the database, and wait for another collection. This one should
1028+ // fail.
10121029 clickhouse. cleanup ( ) . await . unwrap ( ) ;
1013- tokio:: time:: pause ( ) ;
1014- let now = tokio:: time:: Instant :: now ( ) ;
1015- while now. elapsed ( ) < 2 * USAGE_UPDATE_INTERVAL {
1016- tokio:: time:: advance ( std:: time:: Duration :: from_millis ( 10 ) ) . await ;
1017- }
1018- tokio:: time:: resume ( ) ;
1019- let usage = context. database_usage ( ) ;
1030+ let usage = dev:: poll:: wait_for_condition (
1031+ || async {
1032+ let usage = context. database_usage ( ) ;
1033+ match & usage. last_error {
1034+ Some ( _) => Ok ( usage) ,
1035+ None => Err ( dev:: poll:: CondCheckError :: < ( ) > :: NotYet ) ,
1036+ }
1037+ } ,
1038+ & std:: time:: Duration :: from_millis ( 100 ) ,
1039+ & ( 2 * USAGE_UPDATE_INTERVAL ) ,
1040+ )
1041+ . await
1042+ . unwrap ( ) ;
10201043 println ! ( "{usage:#?}" ) ;
10211044 assert ! (
10221045 usage. last_success. is_some( ) ,
@@ -1025,7 +1048,11 @@ mod tests {
10251048 let Some ( err) = usage. last_error . as_ref ( ) else {
10261049 panic ! ( "expected an error to have occurred, but found None" ) ;
10271050 } ;
1028- assert ! ( err. error. starts_with( "Failed to check out" ) ) ;
1051+ let is_network_err = |msg : & str | -> bool {
1052+ msg. starts_with ( "Failed to check out" )
1053+ || msg. starts_with ( "Native protocol error" )
1054+ } ;
1055+ assert ! ( is_network_err( & err. error) , "Expected a network error error" ) ;
10291056
10301057 logctx. cleanup_successful ( ) ;
10311058 }
0 commit comments