@@ -4,6 +4,7 @@ mod moved_tests {
44 use std:: collections:: BTreeSet ;
55 use std:: sync:: Arc ;
66 use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
7+ use std:: time:: Duration ;
78
89 use parking_lot:: Mutex ;
910 use tokio:: sync:: { Barrier , mpsc} ;
@@ -70,6 +71,101 @@ mod moved_tests {
7071 assert ! ( ctx. connection( "conn-preloaded" ) . is_some( ) ) ;
7172 }
7273
74+ #[ tokio:: test]
75+ async fn pending_connection_is_invisible_until_preflight_succeeds ( ) {
76+ let ctx = ActorContext :: new_with_kv (
77+ "actor-preflight-visibility" ,
78+ "actor" ,
79+ Vec :: new ( ) ,
80+ "local" ,
81+ Kv :: new_in_memory ( ) ,
82+ ) ;
83+ ctx. configure_connection_runtime ( crate :: actor:: config:: ActorConfig :: default ( ) ) ;
84+ let ( events_tx, mut events_rx) = mpsc:: unbounded_channel ( ) ;
85+ ctx. configure_actor_events ( Some ( events_tx) ) ;
86+
87+ let events_ctx = ctx. clone ( ) ;
88+ let event_task = tokio:: spawn ( async move {
89+ let preflight_conn_id = match events_rx. recv ( ) . await . expect ( "preflight event" ) {
90+ ActorEvent :: ConnectionPreflight { conn, reply, .. } => {
91+ assert ! ( events_ctx. connection( conn. id( ) ) . is_none( ) ) ;
92+ conn. set_state_initial ( vec ! [ 7 ] ) ;
93+ let conn_id = conn. id ( ) . to_owned ( ) ;
94+ reply. send ( Ok ( ( ) ) ) ;
95+ conn_id
96+ }
97+ other => panic ! ( "unexpected event: {other:?}" ) ,
98+ } ;
99+
100+ match events_rx. recv ( ) . await . expect ( "open event" ) {
101+ ActorEvent :: ConnectionOpen { conn, reply, .. } => {
102+ assert_eq ! ( conn. id( ) , preflight_conn_id) ;
103+ let visible = events_ctx
104+ . connection ( conn. id ( ) )
105+ . expect ( "connection should be visible for onConnect" ) ;
106+ assert_eq ! ( visible. state( ) , vec![ 7 ] ) ;
107+ reply. send ( Ok ( ( ) ) ) ;
108+ }
109+ other => panic ! ( "unexpected event: {other:?}" ) ,
110+ }
111+ } ) ;
112+
113+ let conn = ctx
114+ . connect_with_state ( vec ! [ 1 ] , false , None , None , async { Ok ( vec ! [ 2 ] ) } )
115+ . await
116+ . expect ( "connection should succeed" ) ;
117+
118+ assert_eq ! ( conn. state( ) , vec![ 7 ] ) ;
119+ assert ! ( ctx. connection( conn. id( ) ) . is_some( ) ) ;
120+ event_task. await . expect ( "event task should complete" ) ;
121+ }
122+
123+ #[ tokio:: test]
124+ async fn failed_preflight_never_exposes_connection ( ) {
125+ let ctx = ActorContext :: new_with_kv (
126+ "actor-preflight-failure" ,
127+ "actor" ,
128+ Vec :: new ( ) ,
129+ "local" ,
130+ Kv :: new_in_memory ( ) ,
131+ ) ;
132+ ctx. configure_connection_runtime ( crate :: actor:: config:: ActorConfig :: default ( ) ) ;
133+ let ( events_tx, mut events_rx) = mpsc:: unbounded_channel ( ) ;
134+ ctx. configure_actor_events ( Some ( events_tx) ) ;
135+ let failed_conn_id = Arc :: new ( Mutex :: new ( None :: < String > ) ) ;
136+
137+ let events_ctx = ctx. clone ( ) ;
138+ let event_failed_conn_id = failed_conn_id. clone ( ) ;
139+ let event_task = tokio:: spawn ( async move {
140+ match events_rx. recv ( ) . await . expect ( "preflight event" ) {
141+ ActorEvent :: ConnectionPreflight { conn, reply, .. } => {
142+ assert ! ( events_ctx. connection( conn. id( ) ) . is_none( ) ) ;
143+ * event_failed_conn_id. lock ( ) = Some ( conn. id ( ) . to_owned ( ) ) ;
144+ reply. send ( Err ( anyhow:: anyhow!( "reject preflight" ) ) ) ;
145+ }
146+ other => panic ! ( "unexpected event: {other:?}" ) ,
147+ }
148+ assert ! (
149+ tokio:: time:: timeout( Duration :: from_millis( 20 ) , events_rx. recv( ) )
150+ . await
151+ . is_err( )
152+ ) ;
153+ } ) ;
154+
155+ let error = ctx
156+ . connect_with_state ( vec ! [ 1 ] , false , None , None , async { Ok ( vec ! [ 2 ] ) } )
157+ . await
158+ . expect_err ( "connection should fail" ) ;
159+
160+ assert ! ( format!( "{error:#}" ) . contains( "reject preflight" ) ) ;
161+ let conn_id = failed_conn_id
162+ . lock ( )
163+ . clone ( )
164+ . expect ( "failed connection id should be recorded" ) ;
165+ assert ! ( ctx. connection( & conn_id) . is_none( ) ) ;
166+ event_task. await . expect ( "event task should complete" ) ;
167+ }
168+
73169 #[ test]
74170 fn persisted_connection_uses_ts_v4_fixed_id_wire_format ( ) {
75171 let persisted = PersistedConnection {
@@ -132,6 +228,7 @@ mod moved_tests {
132228 async move {
133229 while let Some ( event) = events_rx. recv ( ) . await {
134230 match event {
231+ ActorEvent :: ConnectionPreflight { reply, .. } => reply. send ( Ok ( ( ) ) ) ,
135232 ActorEvent :: ConnectionOpen { reply, .. } => reply. send ( Ok ( ( ) ) ) ,
136233 ActorEvent :: ConnectionClosed { conn } => {
137234 * observed_conn_id. lock ( ) = Some ( conn. id ( ) . to_owned ( ) ) ;
@@ -218,12 +315,13 @@ mod moved_tests {
218315 ctx. configure_lifecycle_events ( Some ( lifecycle_events_tx) ) ;
219316
220317 let open_replies = tokio:: spawn ( async move {
221- for _ in 0 ..2 {
318+ for _ in 0 ..4 {
222319 match actor_events_rx
223320 . recv ( )
224321 . await
225322 . expect ( "open event should arrive" )
226323 {
324+ ActorEvent :: ConnectionPreflight { reply, .. } => reply. send ( Ok ( ( ) ) ) ,
227325 ActorEvent :: ConnectionOpen { reply, .. } => reply. send ( Ok ( ( ) ) ) ,
228326 other => panic ! ( "unexpected actor event: {other:?}" ) ,
229327 }
0 commit comments