@@ -249,6 +249,7 @@ mod timed_tests {
249249 use assert_matches:: assert_matches;
250250 use assert_matches2:: assert_let;
251251 use eyeball_im:: VectorDiff ;
252+ use futures_util:: FutureExt as _;
252253 use matrix_sdk_base:: {
253254 RoomState , ThreadingSupport ,
254255 cross_process_lock:: CrossProcessLockConfig ,
@@ -697,4 +698,181 @@ mod timed_tests {
697698 // contains a single items chunk that's empty.
698699 assert_eq ! ( linked_chunk. num_items( ) , 0 ) ;
699700 }
701+
702+ #[ async_test]
703+ async fn test_load_from_storage ( ) {
704+ let room_id = room_id ! ( "!r0" ) ;
705+ let f = EventFactory :: new ( ) . room ( room_id) . sender ( user_id ! ( "@mnt_io:matrix.org" ) ) ;
706+
707+ let event_cache_store = Arc :: new ( MemoryStore :: new ( ) ) ;
708+
709+ let thread_root = event_id ! ( "$t0" ) ;
710+ let thread_event_id_0 = event_id ! ( "$t0_ev0" ) ;
711+ let thread_event_id_1 = event_id ! ( "$t0_ev1" ) ;
712+
713+ let thread_event_0 = f
714+ . text_msg ( "hello world" )
715+ . event_id ( thread_event_id_0)
716+ . in_thread ( thread_root, thread_root)
717+ . into_event ( ) ;
718+ let thread_event_1 = f
719+ . text_msg ( "how's it going" )
720+ . event_id ( thread_event_id_1)
721+ . in_thread ( thread_root, thread_event_id_1)
722+ . into_event ( ) ;
723+
724+ // Prefill the store with some data. The room usually has all events duplicated
725+ // from the threads. It's important to make the test pass when checking the
726+ // generic update.
727+ let updates = vec ! [
728+ // An empty items chunk.
729+ Update :: NewItemsChunk { previous: None , new: ChunkIdentifier :: new( 0 ) , next: None } ,
730+ // A gap chunk.
731+ Update :: NewGapChunk {
732+ previous: Some ( ChunkIdentifier :: new( 0 ) ) ,
733+ // Chunk IDs aren't supposed to be ordered, so use a random value here.
734+ new: ChunkIdentifier :: new( 42 ) ,
735+ next: None ,
736+ gap: Gap { token: "gruyère" . to_owned( ) } ,
737+ } ,
738+ // Another items chunk, non-empty this time.
739+ Update :: NewItemsChunk {
740+ previous: Some ( ChunkIdentifier :: new( 42 ) ) ,
741+ new: ChunkIdentifier :: new( 1 ) ,
742+ next: None ,
743+ } ,
744+ Update :: PushItems {
745+ at: Position :: new( ChunkIdentifier :: new( 1 ) , 0 ) ,
746+ items: vec![ thread_event_0. clone( ) ] ,
747+ } ,
748+ // And another items chunk, non-empty again.
749+ Update :: NewItemsChunk {
750+ previous: Some ( ChunkIdentifier :: new( 1 ) ) ,
751+ new: ChunkIdentifier :: new( 2 ) ,
752+ next: None ,
753+ } ,
754+ Update :: PushItems {
755+ at: Position :: new( ChunkIdentifier :: new( 2 ) , 0 ) ,
756+ items: vec![ thread_event_1. clone( ) ] ,
757+ } ,
758+ ] ;
759+ event_cache_store
760+ . handle_linked_chunk_updates ( LinkedChunkId :: Room ( room_id) , updates. clone ( ) )
761+ . await
762+ . unwrap ( ) ;
763+ event_cache_store
764+ . handle_linked_chunk_updates ( LinkedChunkId :: Thread ( room_id, thread_root) , updates)
765+ . await
766+ . unwrap ( ) ;
767+
768+ let client = MockClientBuilder :: new ( None )
769+ . on_builder ( |builder| {
770+ builder
771+ . store_config (
772+ StoreConfig :: new ( CrossProcessLockConfig :: multi_process ( "hodor" ) )
773+ . event_cache_store ( event_cache_store. clone ( ) ) ,
774+ )
775+ . with_threading_support ( ThreadingSupport :: Enabled { with_subscriptions : true } )
776+ } )
777+ . build ( )
778+ . await ;
779+
780+ let event_cache = client. event_cache ( ) ;
781+ event_cache. subscribe ( ) . unwrap ( ) ;
782+
783+ client. base_client ( ) . get_or_create_room ( room_id, RoomState :: Joined ) ;
784+ let room = client. get_room ( room_id) . unwrap ( ) ;
785+
786+ // Let's check whether the generic updates are received for the initialisation.
787+ let mut generic_stream = event_cache. subscribe_to_room_generic_updates ( ) ;
788+ let ( room_event_cache, _drop_handles) = room. event_cache ( ) . await . unwrap ( ) ;
789+ let ( thread_events, mut thread_stream) =
790+ room_event_cache. subscribe_to_thread ( thread_root. to_owned ( ) ) . await . unwrap ( ) ;
791+
792+ // The room **and** the thread have been loaded. Two generic updates must have
793+ // been triggered.
794+ for _ in 0 ..2 {
795+ assert_matches ! (
796+ generic_stream. recv( ) . await ,
797+ Ok ( RoomEventCacheGenericUpdate { room_id: expected_room_id } ) => {
798+ assert_eq!( room_id, expected_room_id) ;
799+ }
800+ ) ;
801+ }
802+ assert ! ( generic_stream. is_empty( ) ) ;
803+
804+ // The initial events contain one event because only the last chunk is loaded by
805+ // default.
806+ assert_eq ! ( thread_events. len( ) , 1 ) ;
807+ assert_eq ! ( thread_events[ 0 ] . event_id( ) . unwrap( ) , thread_event_id_1) ;
808+ assert ! ( thread_stream. is_empty( ) ) ;
809+
810+ // The thread knows all events in the storage though, even if they aren't
811+ // loaded.
812+ assert ! (
813+ room_event_cache
814+ . find_event_in_thread( thread_root. to_owned( ) , thread_event_id_0)
815+ . await
816+ . unwrap( )
817+ . is_some( )
818+ ) ;
819+ assert ! (
820+ room_event_cache
821+ . find_event_in_thread( thread_root. to_owned( ) , thread_event_id_1)
822+ . await
823+ . unwrap( )
824+ . is_some( )
825+ ) ;
826+
827+ // Let's paginate to load more events.
828+ room_event_cache
829+ . thread_pagination ( thread_root. to_owned ( ) )
830+ . await
831+ . unwrap ( )
832+ . run_backwards_once ( 20 )
833+ . await
834+ . unwrap ( ) ;
835+
836+ assert_matches ! (
837+ thread_stream. recv( ) . await ,
838+ Ok ( TimelineVectorDiffs { diffs, .. } ) => {
839+ assert_eq!( diffs. len( ) , 1 ) ;
840+ assert_matches!( & diffs[ 0 ] , VectorDiff :: Insert { index: 0 , value: event } => {
841+ assert_eq!( event. event_id( ) . as_deref( ) , Some ( thread_event_id_0) ) ;
842+ } ) ;
843+ }
844+ ) ;
845+ assert ! ( thread_stream. is_empty( ) ) ;
846+
847+ // A generic update is triggered too.
848+ assert_matches ! (
849+ generic_stream. recv( ) . await ,
850+ Ok ( RoomEventCacheGenericUpdate { room_id: expected_room_id } ) => {
851+ assert_eq!( expected_room_id, room_id) ;
852+ }
853+ ) ;
854+ assert ! ( generic_stream. is_empty( ) ) ;
855+
856+ // A new update with one of these events leads to deduplication.
857+ let timeline = Timeline { limited : false , prev_batch : None , events : vec ! [ thread_event_1] } ;
858+
859+ room_event_cache
860+ . handle_joined_room_update ( JoinedRoomUpdate { timeline, ..Default :: default ( ) } )
861+ . await
862+ . unwrap ( ) ;
863+
864+ // Just checking the generic update is correct. There is a duplicate event, so
865+ // no generic changes whatsoever!
866+ assert ! ( generic_stream. recv( ) . now_or_never( ) . is_none( ) ) ;
867+
868+ // The stream doesn't report these changes *yet*. Use the events vector given
869+ // when subscribing, to check that the events correspond to their new
870+ // positions. The duplicated item is removed (so it's not the first
871+ // element anymore), and it's added to the back of the list.
872+ let ( thread_events, _) =
873+ room_event_cache. subscribe_to_thread ( thread_root. to_owned ( ) ) . await . unwrap ( ) ;
874+ assert_eq ! ( thread_events. len( ) , 2 ) ;
875+ assert_eq ! ( thread_events[ 0 ] . event_id( ) . as_deref( ) , Some ( thread_event_id_0) ) ;
876+ assert_eq ! ( thread_events[ 1 ] . event_id( ) . as_deref( ) , Some ( thread_event_id_1) ) ;
877+ }
700878}
0 commit comments