@@ -982,335 +982,7 @@ pub(crate) fn make_connection_key(conn_id: &str) -> Vec<u8> {
982982 key
983983}
984984
985+ // Test shim keeps moved tests in crate-root tests/ with private-module access.
985986#[ cfg( test) ]
986- mod tests {
987- use std:: collections:: BTreeSet ;
988- use std:: sync:: Arc ;
989- use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
990-
991- use parking_lot:: Mutex ;
992- use tokio:: sync:: { Barrier , mpsc} ;
993- use tokio:: task:: yield_now;
994-
995- use super :: {
996- HibernatableConnectionMetadata , PersistedConnection , decode_persisted_connection,
997- encode_persisted_connection, hibernatable_id_from_slice, make_connection_key,
998- } ;
999- use crate :: actor:: context:: ActorContext ;
1000- use crate :: actor:: messages:: ActorEvent ;
1001- use crate :: actor:: preload:: PreloadedKv ;
1002- use crate :: actor:: task:: LifecycleEvent ;
1003- use crate :: kv:: Kv ;
1004-
1005- fn next_non_activity_lifecycle_event (
1006- rx : & mut mpsc:: Receiver < LifecycleEvent > ,
1007- ) -> Option < LifecycleEvent > {
1008- rx. try_recv ( ) . ok ( )
1009- }
1010-
1011- #[ tokio:: test]
1012- async fn restore_persisted_uses_preloaded_connection_prefix_when_present ( ) {
1013- let ctx = ActorContext :: new_with_kv (
1014- "actor-preload" ,
1015- "actor" ,
1016- Vec :: new ( ) ,
1017- "local" ,
1018- Kv :: new_in_memory ( ) ,
1019- ) ;
1020- let persisted = PersistedConnection {
1021- id : "conn-preloaded" . to_owned ( ) ,
1022- parameters : vec ! [ 1 ] ,
1023- state : vec ! [ 2 ] ,
1024- gateway_id : [ 1 , 2 , 3 , 4 ] ,
1025- request_id : [ 5 , 6 , 7 , 8 ] ,
1026- request_path : "/socket" . to_owned ( ) ,
1027- ..PersistedConnection :: default ( )
1028- } ;
1029- let preloaded = PreloadedKv :: new_with_requested_get_keys (
1030- [ (
1031- make_connection_key ( & persisted. id ) ,
1032- encode_persisted_connection ( & persisted)
1033- . expect ( "persisted connection should encode" ) ,
1034- ) ] ,
1035- Vec :: new ( ) ,
1036- vec ! [ vec![ 2 ] ] ,
1037- ) ;
1038-
1039- let restored = ctx
1040- . restore_persisted ( Some ( & preloaded) )
1041- . await
1042- . expect ( "restore should use preloaded entries instead of unconfigured kv" ) ;
1043-
1044- assert_eq ! ( restored. len( ) , 1 ) ;
1045- assert_eq ! ( restored[ 0 ] . id( ) , "conn-preloaded" ) ;
1046- assert_eq ! ( restored[ 0 ] . state( ) , vec![ 2 ] ) ;
1047- assert ! ( ctx. connection( "conn-preloaded" ) . is_some( ) ) ;
1048- }
1049-
1050- #[ test]
1051- fn persisted_connection_uses_ts_v4_fixed_id_wire_format ( ) {
1052- let persisted = PersistedConnection {
1053- id : "c" . to_owned ( ) ,
1054- parameters : vec ! [ 1 , 2 ] ,
1055- state : vec ! [ 3 ] ,
1056- gateway_id : [ 10 , 11 , 12 , 13 ] ,
1057- request_id : [ 20 , 21 , 22 , 23 ] ,
1058- server_message_index : 9 ,
1059- client_message_index : 10 ,
1060- request_path : "/" . to_owned ( ) ,
1061- ..PersistedConnection :: default ( )
1062- } ;
1063-
1064- let encoded =
1065- encode_persisted_connection ( & persisted) . expect ( "persisted connection should encode" ) ;
1066-
1067- assert_eq ! (
1068- encoded,
1069- vec![
1070- 4 , 0 , // embedded version
1071- 1 , b'c' , // id
1072- 2 , 1 , 2 , // parameters
1073- 1 , 3 , // state
1074- 0 , // subscriptions
1075- 10 , 11 , 12 , 13 , // gatewayId fixed data[4]
1076- 20 , 21 , 22 , 23 , // requestId fixed data[4]
1077- 9 , 0 , // serverMessageIndex
1078- 10 , 0 , // clientMessageIndex
1079- 1 , b'/' , // requestPath
1080- 0 , // requestHeaders
1081- ]
1082- ) ;
1083-
1084- let decoded =
1085- decode_persisted_connection ( & encoded) . expect ( "persisted connection should decode" ) ;
1086- assert_eq ! ( decoded. gateway_id, [ 10 , 11 , 12 , 13 ] ) ;
1087- assert_eq ! ( decoded. request_id, [ 20 , 21 , 22 , 23 ] ) ;
1088- }
1089-
1090- #[ test]
1091- fn hibernatable_id_validation_returns_rivet_error ( ) {
1092- let error = hibernatable_id_from_slice ( "gateway_id" , & [ 1 , 2 , 3 ] )
1093- . expect_err ( "invalid id should fail" ) ;
1094- let error = rivet_error:: RivetError :: extract ( & error) ;
1095-
1096- assert_eq ! ( error. group( ) , "actor" ) ;
1097- assert_eq ! ( error. code( ) , "invalid_request" ) ;
1098- }
1099-
1100- #[ tokio:: test( start_paused = true ) ]
1101- async fn concurrent_disconnects_only_emit_one_close_and_one_hibernation_removal ( ) {
1102- let ctx = ActorContext :: new_with_kv (
1103- "actor-race" ,
1104- "actor" ,
1105- Vec :: new ( ) ,
1106- "local" ,
1107- Kv :: new_in_memory ( ) ,
1108- ) ;
1109- ctx. configure_connection_runtime ( crate :: actor:: config:: ActorConfig :: default ( ) ) ;
1110- let ( events_tx, mut events_rx) = mpsc:: unbounded_channel ( ) ;
1111- ctx. configure_actor_events ( Some ( events_tx) ) ;
1112- let closed = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
1113- let observed_conn_id = Arc :: new ( Mutex :: new ( None :: < String > ) ) ;
1114-
1115- let recv = tokio:: spawn ( {
1116- let closed = closed. clone ( ) ;
1117- let observed_conn_id = observed_conn_id. clone ( ) ;
1118- async move {
1119- while let Some ( event) = events_rx. recv ( ) . await {
1120- match event {
1121- ActorEvent :: ConnectionOpen { reply, .. } => reply. send ( Ok ( ( ) ) ) ,
1122- ActorEvent :: ConnectionClosed { conn } => {
1123- * observed_conn_id. lock ( ) = Some ( conn. id ( ) . to_owned ( ) ) ;
1124- closed. fetch_add ( 1 , Ordering :: SeqCst ) ;
1125- break ;
1126- }
1127- other => panic ! ( "unexpected event: {other:?}" ) ,
1128- }
1129- }
1130- }
1131- } ) ;
1132-
1133- let conn = ctx
1134- . connect_with_state (
1135- vec ! [ 1 ] ,
1136- true ,
1137- Some ( HibernatableConnectionMetadata {
1138- gateway_id : [ 1 , 2 , 3 , 4 ] ,
1139- request_id : [ 5 , 6 , 7 , 8 ] ,
1140- ..HibernatableConnectionMetadata :: default ( )
1141- } ) ,
1142- None ,
1143- async { Ok ( vec ! [ 9 ] ) } ,
1144- )
1145- . await
1146- . expect ( "connection should open" ) ;
1147- let conn_id = conn. id ( ) . to_owned ( ) ;
1148- ctx. record_connections_updated ( ) ;
1149- ctx. reset_sleep_timer ( ) ;
1150-
1151- let barrier = Arc :: new ( Barrier :: new ( 2 ) ) ;
1152- conn. configure_transport_disconnect_handler ( Some ( Arc :: new ( {
1153- let barrier = barrier. clone ( ) ;
1154- move |_reason| {
1155- let barrier = barrier. clone ( ) ;
1156- Box :: pin ( async move {
1157- barrier. wait ( ) . await ;
1158- Ok ( ( ) )
1159- } )
1160- }
1161- } ) ) ) ;
1162-
1163- let first = tokio:: spawn ( {
1164- let conn = conn. clone ( ) ;
1165- async move { conn. disconnect ( Some ( "first" ) ) . await }
1166- } ) ;
1167- let second = tokio:: spawn ( {
1168- let conn = conn. clone ( ) ;
1169- async move { conn. disconnect ( Some ( "second" ) ) . await }
1170- } ) ;
1171-
1172- yield_now ( ) . await ;
1173- first
1174- . await
1175- . expect ( "first disconnect task should join" )
1176- . expect ( "first disconnect should succeed" ) ;
1177- second
1178- . await
1179- . expect ( "second disconnect task should join" )
1180- . expect ( "second disconnect should succeed" ) ;
1181- recv. await . expect ( "event receiver should join" ) ;
1182-
1183- assert_eq ! ( closed. load( Ordering :: SeqCst ) , 1 ) ;
1184- assert_eq ! ( observed_conn_id. lock( ) . as_deref( ) , Some ( conn_id. as_str( ) ) ) ;
1185- assert ! ( ctx. connection( & conn_id) . is_none( ) ) ;
1186-
1187- let pending = ctx. take_pending_hibernation_changes_inner ( ) ;
1188- assert ! ( pending. updated. is_empty( ) ) ;
1189- assert_eq ! ( pending. removed, BTreeSet :: from( [ conn_id] ) ) ;
1190- }
1191-
1192- #[ tokio:: test]
1193- async fn hibernatable_set_state_queues_save_and_non_hibernatable_stays_memory_only ( ) {
1194- let ctx = ActorContext :: new_with_kv (
1195- "actor-state-dirty" ,
1196- "actor" ,
1197- Vec :: new ( ) ,
1198- "local" ,
1199- Kv :: new_in_memory ( ) ,
1200- ) ;
1201- let ( actor_events_tx, mut actor_events_rx) = mpsc:: unbounded_channel ( ) ;
1202- let ( lifecycle_events_tx, mut lifecycle_events_rx) = mpsc:: channel ( 4 ) ;
1203- ctx. configure_actor_events ( Some ( actor_events_tx) ) ;
1204- ctx. configure_lifecycle_events ( Some ( lifecycle_events_tx) ) ;
1205-
1206- let open_replies = tokio:: spawn ( async move {
1207- for _ in 0 ..2 {
1208- match actor_events_rx
1209- . recv ( )
1210- . await
1211- . expect ( "open event should arrive" )
1212- {
1213- ActorEvent :: ConnectionOpen { reply, .. } => reply. send ( Ok ( ( ) ) ) ,
1214- other => panic ! ( "unexpected actor event: {other:?}" ) ,
1215- }
1216- }
1217- } ) ;
1218-
1219- let non_hibernatable = ctx
1220- . connect_with_state ( vec ! [ 1 ] , false , None , None , async { Ok ( vec ! [ 2 ] ) } )
1221- . await
1222- . expect ( "non-hibernatable connection should open" ) ;
1223- non_hibernatable. set_state ( vec ! [ 3 ] ) ;
1224- assert_eq ! ( non_hibernatable. state( ) , vec![ 3 ] ) ;
1225- assert ! (
1226- ctx. dirty_hibernatable_conns_inner( ) . is_empty( ) ,
1227- "non-hibernatable state changes should not queue persistence"
1228- ) ;
1229- assert ! (
1230- next_non_activity_lifecycle_event( & mut lifecycle_events_rx) . is_none( ) ,
1231- "non-hibernatable state changes should not request actor save"
1232- ) ;
1233-
1234- let hibernatable = ctx
1235- . connect_with_state (
1236- vec ! [ 4 ] ,
1237- true ,
1238- Some ( HibernatableConnectionMetadata {
1239- gateway_id : [ 1 , 2 , 3 , 4 ] ,
1240- request_id : [ 5 , 6 , 7 , 8 ] ,
1241- ..HibernatableConnectionMetadata :: default ( )
1242- } ) ,
1243- None ,
1244- async { Ok ( vec ! [ 5 ] ) } ,
1245- )
1246- . await
1247- . expect ( "hibernatable connection should open" ) ;
1248- hibernatable. set_state ( vec ! [ 6 ] ) ;
1249-
1250- assert_eq ! (
1251- ctx. dirty_hibernatable_conns_inner( )
1252- . into_iter( )
1253- . map( |conn| conn. id( ) . to_owned( ) )
1254- . collect:: <Vec <_>>( ) ,
1255- vec![ hibernatable. id( ) . to_owned( ) ]
1256- ) ;
1257- assert_eq ! (
1258- next_non_activity_lifecycle_event( & mut lifecycle_events_rx)
1259- . expect( "hibernatable state change should request save" ) ,
1260- LifecycleEvent :: SaveRequested { immediate: false }
1261- ) ;
1262-
1263- open_replies
1264- . await
1265- . expect ( "open reply task should join cleanly" ) ;
1266- }
1267-
1268- #[ tokio:: test( start_paused = true ) ]
1269- async fn remove_existing_for_disconnect_has_exactly_one_winner ( ) {
1270- let ctx = ActorContext :: new_with_kv (
1271- "actor-race" ,
1272- "actor" ,
1273- Vec :: new ( ) ,
1274- "local" ,
1275- Kv :: new_in_memory ( ) ,
1276- ) ;
1277- let conn = super :: ConnHandle :: new ( "conn-race" , vec ! [ 1 ] , vec ! [ 2 ] , true ) ;
1278- conn. configure_hibernation ( Some ( HibernatableConnectionMetadata {
1279- gateway_id : [ 1 , 2 , 3 , 4 ] ,
1280- request_id : [ 5 , 6 , 7 , 8 ] ,
1281- ..HibernatableConnectionMetadata :: default ( )
1282- } ) ) ;
1283- ctx. insert_existing ( conn) ;
1284-
1285- let barrier = Arc :: new ( Barrier :: new ( 2 ) ) ;
1286- let first = tokio:: spawn ( {
1287- let ctx = ctx. clone ( ) ;
1288- let barrier = barrier. clone ( ) ;
1289- async move {
1290- barrier. wait ( ) . await ;
1291- ctx. remove_existing_for_disconnect ( "conn-race" )
1292- . map ( |conn| conn. id ( ) . to_owned ( ) )
1293- }
1294- } ) ;
1295- let second = tokio:: spawn ( {
1296- let ctx = ctx. clone ( ) ;
1297- let barrier = barrier. clone ( ) ;
1298- async move {
1299- barrier. wait ( ) . await ;
1300- ctx. remove_existing_for_disconnect ( "conn-race" )
1301- . map ( |conn| conn. id ( ) . to_owned ( ) )
1302- }
1303- } ) ;
1304-
1305- let first = first. await . expect ( "first task should join" ) ;
1306- let second = second. await . expect ( "second task should join" ) ;
1307- let winners = [ first, second] . into_iter ( ) . flatten ( ) . collect :: < Vec < _ > > ( ) ;
1308-
1309- assert_eq ! ( winners, vec![ "conn-race" . to_owned( ) ] ) ;
1310- assert ! ( ctx. connection( "conn-race" ) . is_none( ) ) ;
1311-
1312- let pending = ctx. take_pending_hibernation_changes_inner ( ) ;
1313- assert ! ( pending. updated. is_empty( ) ) ;
1314- assert_eq ! ( pending. removed, BTreeSet :: from( [ "conn-race" . to_owned( ) ] ) ) ;
1315- }
1316- }
987+ #[ path = "../../tests/connection.rs" ]
988+ mod tests;
0 commit comments