@@ -20,6 +20,46 @@ type ActorFactory = Arc<dyn Fn(ActorConfig) -> Box<dyn TestActor> + Send + Sync>
2020
2121pub type TestEnvoy = Envoy ;
2222
23+ #[ derive( Clone , Debug , PartialEq , Eq ) ]
24+ pub enum EnvoyConnectionEvent {
25+ Connected ,
26+ Disconnected ,
27+ }
28+
29+ pub struct EnvoyConnectionEventWaiter {
30+ rx : broadcast:: Receiver < EnvoyConnectionEvent > ,
31+ expected : EnvoyConnectionEvent ,
32+ timeout : std:: time:: Duration ,
33+ }
34+
35+ impl EnvoyConnectionEventWaiter {
36+ pub fn assert_no_event ( & mut self ) {
37+ match self . rx . try_recv ( ) {
38+ Err ( tokio:: sync:: broadcast:: error:: TryRecvError :: Empty ) => { }
39+ Ok ( event) => panic ! ( "unexpected Envoy connection event before fault: {event:?}" ) ,
40+ Err ( tokio:: sync:: broadcast:: error:: TryRecvError :: Lagged ( count) ) => {
41+ panic ! ( "missed {count} Envoy connection events before fault" )
42+ }
43+ Err ( err) => panic ! ( "Envoy connection event channel closed: {err}" ) ,
44+ }
45+ }
46+
47+ pub async fn wait ( mut self ) {
48+ tokio:: time:: timeout ( self . timeout , async {
49+ loop {
50+ match self . rx . recv ( ) . await {
51+ Ok ( event) if event == self . expected => break ,
52+ Ok ( _) => { }
53+ Err ( tokio:: sync:: broadcast:: error:: RecvError :: Lagged ( _) ) => { }
54+ Err ( err) => panic ! ( "Envoy connection event channel closed: {err}" ) ,
55+ }
56+ }
57+ } )
58+ . await
59+ . expect ( "timed out waiting for Envoy connection event" ) ;
60+ }
61+ }
62+
2363#[ derive( Clone ) ]
2464pub struct EnvoyConfig {
2565 endpoint : String ,
@@ -119,6 +159,7 @@ impl EnvoyBuilder {
119159 actor_factories : self . actor_factories ,
120160 actors : tokio:: sync:: Mutex :: new ( HashMap :: new ( ) ) ,
121161 lifecycle_tx,
162+ connection_tx : broadcast:: channel ( 100 ) . 0 ,
122163 } ) ,
123164 handle : tokio:: sync:: Mutex :: new ( None ) ,
124165 envoy_key : uuid:: Uuid :: new_v4 ( ) . to_string ( ) ,
@@ -130,6 +171,7 @@ struct EnvoyInner {
130171 actor_factories : HashMap < String , ActorFactory > ,
131172 actors : tokio:: sync:: Mutex < HashMap < String , Box < dyn TestActor > > > ,
132173 lifecycle_tx : broadcast:: Sender < ActorLifecycleEvent > ,
174+ connection_tx : broadcast:: Sender < EnvoyConnectionEvent > ,
133175}
134176
135177pub struct Envoy {
@@ -197,6 +239,21 @@ impl Envoy {
197239 self . inner . lifecycle_tx . subscribe ( )
198240 }
199241
242+ pub fn subscribe_connection_events ( & self ) -> broadcast:: Receiver < EnvoyConnectionEvent > {
243+ self . inner . connection_tx . subscribe ( )
244+ }
245+
246+ pub fn wait_for_next_connection_event (
247+ & self ,
248+ expected : EnvoyConnectionEvent ,
249+ ) -> EnvoyConnectionEventWaiter {
250+ EnvoyConnectionEventWaiter {
251+ rx : self . subscribe_connection_events ( ) ,
252+ expected,
253+ timeout : std:: time:: Duration :: from_secs ( 20 ) ,
254+ }
255+ }
256+
200257 pub async fn shutdown ( & self ) {
201258 if let Some ( handle) = self . handle . lock ( ) . await . take ( ) {
202259 handle. shutdown_and_wait ( false ) . await ;
@@ -241,6 +298,17 @@ impl TestEnvoyCallbacks {
241298}
242299
243300impl rivet_test_envoy:: EnvoyCallbacks for TestEnvoyCallbacks {
301+ fn on_connect ( & self , _handle : EnvoyHandle ) {
302+ let _ = self . inner . connection_tx . send ( EnvoyConnectionEvent :: Connected ) ;
303+ }
304+
305+ fn on_disconnect ( & self , _handle : EnvoyHandle ) {
306+ let _ = self
307+ . inner
308+ . connection_tx
309+ . send ( EnvoyConnectionEvent :: Disconnected ) ;
310+ }
311+
244312 fn on_actor_start (
245313 & self ,
246314 handle : EnvoyHandle ,
@@ -526,6 +594,7 @@ pub struct TestEnvoyBuilder {
526594 namespace : String ,
527595 pool_name : String ,
528596 version : u32 ,
597+ endpoint : Option < String > ,
529598 actor_factories : HashMap < String , ActorFactory > ,
530599}
531600
@@ -535,6 +604,7 @@ impl TestEnvoyBuilder {
535604 namespace : namespace. to_string ( ) ,
536605 pool_name : "test-envoy" . to_string ( ) ,
537606 version : 1 ,
607+ endpoint : None ,
538608 actor_factories : HashMap :: new ( ) ,
539609 }
540610 }
@@ -549,6 +619,11 @@ impl TestEnvoyBuilder {
549619 self
550620 }
551621
622+ pub fn with_endpoint ( mut self , endpoint : impl Into < String > ) -> Self {
623+ self . endpoint = Some ( endpoint. into ( ) ) ;
624+ self
625+ }
626+
552627 pub fn with_actor_behavior < F > ( mut self , actor_name : & str , factory : F ) -> Self
553628 where
554629 F : Fn ( ActorConfig ) -> Box < dyn TestActor > + Send + Sync + ' static ,
@@ -560,7 +635,10 @@ impl TestEnvoyBuilder {
560635
561636 pub async fn build ( self , dc : & super :: TestDatacenter ) -> Result < Envoy > {
562637 let config = EnvoyConfig :: builder ( )
563- . endpoint ( format ! ( "http://127.0.0.1:{}" , dc. guard_port( ) ) )
638+ . endpoint (
639+ self . endpoint
640+ . unwrap_or_else ( || format ! ( "http://127.0.0.1:{}" , dc. guard_port( ) ) ) ,
641+ )
564642 . token ( "dev" )
565643 . namespace ( & self . namespace )
566644 . pool_name ( & self . pool_name )
0 commit comments