@@ -959,4 +959,304 @@ mod timed_tests {
959959 . unwrap ( ) ;
960960 assert ! ( raw_chunks. is_empty( ) ) ;
961961 }
962+
963+ #[ async_test]
964+ async fn test_reload_when_dirty ( ) {
965+ let user_id = user_id ! ( "@mnt_io:matrix.org" ) ;
966+ let room_id = room_id ! ( "!raclette:patate.ch" ) ;
967+
968+ // The storage shared by the two clients.
969+ let event_cache_store = MemoryStore :: new ( ) ;
970+
971+ // Client for the process 0.
972+ let client_p0 = MockClientBuilder :: new ( None )
973+ . on_builder ( |builder| {
974+ builder
975+ . store_config (
976+ StoreConfig :: new ( CrossProcessLockConfig :: multi_process ( "process #0" ) )
977+ . event_cache_store ( event_cache_store. clone ( ) ) ,
978+ )
979+ . with_threading_support ( ThreadingSupport :: Enabled { with_subscriptions : true } )
980+ } )
981+ . build ( )
982+ . await ;
983+
984+ // Client for the process 1.
985+ let client_p1 = MockClientBuilder :: new ( None )
986+ . on_builder ( |builder| {
987+ builder
988+ . store_config (
989+ StoreConfig :: new ( CrossProcessLockConfig :: multi_process ( "process #1" ) )
990+ . event_cache_store ( event_cache_store) ,
991+ )
992+ . with_threading_support ( ThreadingSupport :: Enabled { with_subscriptions : true } )
993+ } )
994+ . build ( )
995+ . await ;
996+
997+ let event_factory = EventFactory :: new ( ) . room ( room_id) . sender ( user_id) ;
998+
999+ let thread_root = event_id ! ( "$t0" ) ;
1000+ let thread_event_id_0 = event_id ! ( "$t0_ev0" ) ;
1001+ let thread_event_id_1 = event_id ! ( "$t0_ev1" ) ;
1002+
1003+ let thread_event_0 = event_factory
1004+ . text_msg ( "comté" )
1005+ . event_id ( thread_event_id_0)
1006+ . in_thread ( thread_root, thread_root)
1007+ . into_event ( ) ;
1008+ let thread_event_1 = event_factory
1009+ . text_msg ( "morbier" )
1010+ . event_id ( thread_event_id_1)
1011+ . in_thread ( thread_root, thread_event_id_0)
1012+ . into_event ( ) ;
1013+
1014+ // Add events to the storage (shared by the two clients!).
1015+ client_p0
1016+ . event_cache_store ( )
1017+ . lock ( )
1018+ . await
1019+ . expect ( "[p0] Could not acquire the event cache lock" )
1020+ . as_clean ( )
1021+ . expect ( "[p0] Could not acquire a clean event cache lock" )
1022+ . handle_linked_chunk_updates (
1023+ LinkedChunkId :: Thread ( room_id, thread_root) ,
1024+ vec ! [
1025+ Update :: NewItemsChunk {
1026+ previous: None ,
1027+ new: ChunkIdentifier :: new( 0 ) ,
1028+ next: None ,
1029+ } ,
1030+ Update :: PushItems {
1031+ at: Position :: new( ChunkIdentifier :: new( 0 ) , 0 ) ,
1032+ items: vec![ thread_event_0] ,
1033+ } ,
1034+ Update :: NewItemsChunk {
1035+ previous: Some ( ChunkIdentifier :: new( 0 ) ) ,
1036+ new: ChunkIdentifier :: new( 1 ) ,
1037+ next: None ,
1038+ } ,
1039+ Update :: PushItems {
1040+ at: Position :: new( ChunkIdentifier :: new( 1 ) , 0 ) ,
1041+ items: vec![ thread_event_1] ,
1042+ } ,
1043+ ] ,
1044+ )
1045+ . await
1046+ . unwrap ( ) ;
1047+
1048+ // Subscribe the event caches, and create the room.
1049+ let ( room_event_cache_p0, room_event_cache_p1) = {
1050+ let event_cache_p0 = client_p0. event_cache ( ) ;
1051+ event_cache_p0. subscribe ( ) . unwrap ( ) ;
1052+
1053+ let event_cache_p1 = client_p1. event_cache ( ) ;
1054+ event_cache_p1. subscribe ( ) . unwrap ( ) ;
1055+
1056+ client_p0. base_client ( ) . get_or_create_room ( room_id, RoomState :: Joined ) ;
1057+ client_p1. base_client ( ) . get_or_create_room ( room_id, RoomState :: Joined ) ;
1058+
1059+ let ( room_event_cache_p0, _drop_handles) =
1060+ client_p0. get_room ( room_id) . unwrap ( ) . event_cache ( ) . await . unwrap ( ) ;
1061+ let ( room_event_cache_p1, _drop_handles) =
1062+ client_p1. get_room ( room_id) . unwrap ( ) . event_cache ( ) . await . unwrap ( ) ;
1063+
1064+ ( room_event_cache_p0, room_event_cache_p1)
1065+ } ;
1066+
1067+ // Okay. We are ready for the test!
1068+ //
1069+ // First off, let's check `room_event_cache_p0` has access to the first event
1070+ // loaded in-memory, then do a pagination, and see more events.
1071+ let mut updates_stream_p0 = {
1072+ let room_event_cache = & room_event_cache_p0;
1073+
1074+ let ( initial_updates, mut updates_stream) =
1075+ room_event_cache_p0. subscribe_to_thread ( thread_root. to_owned ( ) ) . await . unwrap ( ) ;
1076+
1077+ // Initial updates contain `thread_event_id_1` only.
1078+ assert_eq ! ( initial_updates. len( ) , 1 ) ;
1079+ assert_eq ! ( initial_updates[ 0 ] . event_id( ) . as_deref( ) , Some ( thread_event_id_1) ) ;
1080+ assert ! ( updates_stream. is_empty( ) ) ;
1081+
1082+ // Load one more event with a backpagination.
1083+ room_event_cache
1084+ . thread_pagination ( thread_root. to_owned ( ) )
1085+ . await
1086+ . unwrap ( )
1087+ . run_backwards_once ( 1 )
1088+ . await
1089+ . unwrap ( ) ;
1090+
1091+ // A new update for `ev_id_0` must be present.
1092+ assert_matches ! (
1093+ updates_stream. recv( ) . await . unwrap( ) ,
1094+ TimelineVectorDiffs { diffs, .. } => {
1095+ assert_eq!( diffs. len( ) , 1 , "{diffs:#?}" ) ;
1096+ assert_matches!(
1097+ & diffs[ 0 ] ,
1098+ VectorDiff :: Insert { index: 0 , value: event } => {
1099+ assert_eq!( event. event_id( ) . as_deref( ) , Some ( thread_event_id_0) ) ;
1100+ }
1101+ ) ;
1102+ }
1103+ ) ;
1104+
1105+ updates_stream
1106+ } ;
1107+
1108+ // Second, let's check `room_event_cache_p1` has the same accesses.
1109+ let mut updates_stream_p1 = {
1110+ let room_event_cache = & room_event_cache_p1;
1111+ let ( initial_updates, mut updates_stream) =
1112+ room_event_cache_p1. subscribe_to_thread ( thread_root. to_owned ( ) ) . await . unwrap ( ) ;
1113+
1114+ // Initial updates contain `thread_event_id_1` only.
1115+ assert_eq ! ( initial_updates. len( ) , 1 ) ;
1116+ assert_eq ! ( initial_updates[ 0 ] . event_id( ) . as_deref( ) , Some ( thread_event_id_1) ) ;
1117+ assert ! ( updates_stream. is_empty( ) ) ;
1118+
1119+ // Load one more event with a backpagination.
1120+ room_event_cache
1121+ . thread_pagination ( thread_root. to_owned ( ) )
1122+ . await
1123+ . unwrap ( )
1124+ . run_backwards_once ( 1 )
1125+ . await
1126+ . unwrap ( ) ;
1127+
1128+ // A new update for `thread_event_id_0` must be present.
1129+ assert_matches ! (
1130+ updates_stream. recv( ) . await . unwrap( ) ,
1131+ TimelineVectorDiffs { diffs, .. } => {
1132+ assert_eq!( diffs. len( ) , 1 , "{diffs:#?}" ) ;
1133+ assert_matches!(
1134+ & diffs[ 0 ] ,
1135+ VectorDiff :: Insert { index: 0 , value: event } => {
1136+ assert_eq!( event. event_id( ) . as_deref( ) , Some ( thread_event_id_0) ) ;
1137+ }
1138+ ) ;
1139+ }
1140+ ) ;
1141+
1142+ updates_stream
1143+ } ;
1144+
1145+ // Do this a couple times, for the fun.
1146+ for _ in 0 ..3 {
1147+ // Third, because `room_event_cache_p1` has locked the store, the lock
1148+ // is dirty for `room_event_cache_p0`, so it will shrink to its last
1149+ // chunk for the thread!
1150+ {
1151+ let room_event_cache = & room_event_cache_p0;
1152+ let updates_stream = & mut updates_stream_p0;
1153+
1154+ // `thread_event_id_1` must be loaded in memory, just like before.
1155+ // However, `thread_event_id_0` must NOT be loaded in memory. It WAS loaded, but
1156+ // the state has been reloaded to its last chunk.
1157+ let ( initial_updates, _) =
1158+ room_event_cache. subscribe_to_thread ( thread_root. to_owned ( ) ) . await . unwrap ( ) ;
1159+
1160+ assert_eq ! ( initial_updates. len( ) , 1 ) ;
1161+ assert_eq ! ( initial_updates[ 0 ] . event_id( ) . as_deref( ) , Some ( thread_event_id_1) ) ;
1162+
1163+ // The reload can be observed via the updates too.
1164+ assert_matches ! (
1165+ updates_stream. recv( ) . await . unwrap( ) ,
1166+ TimelineVectorDiffs { diffs, .. } => {
1167+ assert_eq!( diffs. len( ) , 2 , "{diffs:#?}" ) ;
1168+ assert_matches!( & diffs[ 0 ] , VectorDiff :: Clear ) ;
1169+ assert_matches!(
1170+ & diffs[ 1 ] ,
1171+ VectorDiff :: Append { values: events } => {
1172+ assert_eq!( events. len( ) , 1 ) ;
1173+ assert_eq!( events[ 0 ] . event_id( ) . as_deref( ) , Some ( thread_event_id_1) ) ;
1174+ }
1175+ ) ;
1176+ }
1177+ ) ;
1178+
1179+ // Load one more event with a backpagination.
1180+ room_event_cache
1181+ . thread_pagination ( thread_root. to_owned ( ) )
1182+ . await
1183+ . unwrap ( )
1184+ . run_backwards_once ( 1 )
1185+ . await
1186+ . unwrap ( ) ;
1187+
1188+ // `thread_event_id_0` must now be loaded in memory.
1189+ // The pagination can be observed via the updates.
1190+ assert_matches ! (
1191+ updates_stream. recv( ) . await . unwrap( ) ,
1192+ TimelineVectorDiffs { diffs, .. } => {
1193+ assert_eq!( diffs. len( ) , 1 , "{diffs:#?}" ) ;
1194+ assert_matches!(
1195+ & diffs[ 0 ] ,
1196+ VectorDiff :: Insert { index: 0 , value: event } => {
1197+ assert_eq!( event. event_id( ) . as_deref( ) , Some ( thread_event_id_0) ) ;
1198+ }
1199+ ) ;
1200+ }
1201+ ) ;
1202+ }
1203+
1204+ // Fourth, because `room_event_cache_p0` has locked the store again, the lock
1205+ // is dirty for `room_event_cache_p1` too!, so it will shrink to its last
1206+ // chunk for the thread!
1207+ {
1208+ let room_event_cache = & room_event_cache_p1;
1209+ let updates_stream = & mut updates_stream_p1;
1210+
1211+ // `thread_event_id_1` must be loaded in memory, just like before.
1212+ // However, `thread_event_id_0` must NOT be loaded in memory. It WAS loaded, but
1213+ // the state has shrunk to its last chunk.
1214+ let ( initial_updates, _) =
1215+ room_event_cache. subscribe_to_thread ( thread_root. to_owned ( ) ) . await . unwrap ( ) ;
1216+
1217+ assert_eq ! ( initial_updates. len( ) , 1 ) ;
1218+ assert_eq ! ( initial_updates[ 0 ] . event_id( ) . as_deref( ) , Some ( thread_event_id_1) ) ;
1219+
1220+ // The reload can be observed via the updates too.
1221+ assert_matches ! (
1222+ updates_stream. recv( ) . await . unwrap( ) ,
1223+ TimelineVectorDiffs { diffs, .. } => {
1224+ assert_eq!( diffs. len( ) , 2 , "{diffs:#?}" ) ;
1225+ assert_matches!( & diffs[ 0 ] , VectorDiff :: Clear ) ;
1226+ assert_matches!(
1227+ & diffs[ 1 ] ,
1228+ VectorDiff :: Append { values: events } => {
1229+ assert_eq!( events. len( ) , 1 ) ;
1230+ assert_eq!( events[ 0 ] . event_id( ) . as_deref( ) , Some ( thread_event_id_1) ) ;
1231+ }
1232+ ) ;
1233+ }
1234+ ) ;
1235+
1236+ // Load one more event with a backpagination.
1237+ room_event_cache
1238+ . thread_pagination ( thread_root. to_owned ( ) )
1239+ . await
1240+ . unwrap ( )
1241+ . run_backwards_once ( 1 )
1242+ . await
1243+ . unwrap ( ) ;
1244+
1245+ // `thread_event_id_0` must now be loaded in memory.
1246+ // The pagination can be observed via the updates.
1247+ assert_matches ! (
1248+ updates_stream. recv( ) . await . unwrap( ) ,
1249+ TimelineVectorDiffs { diffs, .. } => {
1250+ assert_eq!( diffs. len( ) , 1 , "{diffs:#?}" ) ;
1251+ assert_matches!(
1252+ & diffs[ 0 ] ,
1253+ VectorDiff :: Insert { index: 0 , value: event } => {
1254+ assert_eq!( event. event_id( ) . as_deref( ) , Some ( thread_event_id_0) ) ;
1255+ }
1256+ ) ;
1257+ }
1258+ ) ;
1259+ }
1260+ }
1261+ }
9621262}
0 commit comments