diff --git a/Cargo.lock b/Cargo.lock index b9cc5cf992b..23037388f12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3222,6 +3222,7 @@ dependencies = [ "mime2ext", "oauth2", "oauth2-reqwest", + "once_cell", "percent-encoding", "pin-project-lite", "proptest", diff --git a/Cargo.toml b/Cargo.toml index aea2a91c70c..e95cd3e3044 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ js-sys = { version = "0.3.82", default-features = false, features = ["std"] } mime = { version = "0.3.17", default-features = false } oauth2 = { version = "5.0.0", default-features = false, features = ["timing-resistant-secret-traits"] } oauth2-reqwest = { version = "0.1.0-alpha.3", default-features = false } +once_cell = { version = "1.21", default-features = false } pbkdf2 = { version = "0.12.2", default-features = false } pin-project-lite = { version = "0.2.16", default-features = false } proc-macro2 = { version = "1.0.106", default-features = false } diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index ab875b93960..bb1ae9b3f68 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -27,8 +27,8 @@ use imbl::Vector; use matrix_sdk::{ deserialized_responses::TimelineEvent, event_cache::{ - DecryptionRetryRequest, EventCache, EventFocusedCache, PaginationStatus, RoomEventCache, - ThreadEventCache, TimelineVectorDiffs, + DecryptionRetryRequest, EventCache, EventFocusedCache, PaginationStatus, PinnedEventsCache, + RoomEventCache, ThreadEventCache, TimelineVectorDiffs, }, send_queue::{ LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle, @@ -148,7 +148,7 @@ pub(in crate::timeline) enum TimelineFocusKind { PinnedEvents { /// The cache holding all the events for this focus. - event_cache: RoomEventCache, + event_cache: PinnedEventsCache, }, } @@ -395,9 +395,9 @@ impl TimelineController

{ root_event_id: root_event_id.clone(), }, - TimelineFocus::PinnedEvents => { - TimelineFocusKind::PinnedEvents { event_cache: event_cache.room(room_id).await?.0 } - } + TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents { + event_cache: event_cache.pinned_events(room_id).await?.0, + }, }; let focus = Arc::new(focus); @@ -1447,8 +1447,7 @@ impl TimelineController { } TimelineFocusKind::PinnedEvents { event_cache } => { - let (initial_events, pinned_events_recv) = - event_cache.subscribe_to_pinned_events().await?; + let (initial_events, pinned_events_recv) = event_cache.subscribe().await?; let has_events = !initial_events.is_empty(); @@ -1463,7 +1462,7 @@ impl TimelineController { .client() .task_monitor() .spawn_infinite_task( - "timeline::pinned_event_cache_updates", + "timeline::pinned_events_cache_updates", pinned_events_task(event_cache.clone(), self.clone(), pinned_events_recv), ) .abort_on_drop(); diff --git a/crates/matrix-sdk-ui/src/timeline/tasks.rs b/crates/matrix-sdk-ui/src/timeline/tasks.rs index 5f411933b18..490170800b4 100644 --- a/crates/matrix-sdk-ui/src/timeline/tasks.rs +++ b/crates/matrix-sdk-ui/src/timeline/tasks.rs @@ -18,7 +18,7 @@ use std::collections::BTreeSet; use matrix_sdk::{ event_cache::{ - EventFocusThreadMode, EventFocusedCache, EventsOrigin, RoomEventCache, + EventFocusThreadMode, EventFocusedCache, EventsOrigin, PinnedEventsCache, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate, ThreadEventCache, TimelineVectorDiffs, }, send_queue::RoomSendQueueUpdate, @@ -38,7 +38,7 @@ use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEvent ) )] pub(in crate::timeline) async fn pinned_events_task( - room_event_cache: RoomEventCache, + pinned_events_cache: PinnedEventsCache, timeline_controller: TimelineController, mut pinned_events_recv: Receiver, ) { @@ -54,8 +54,7 @@ pub(in crate::timeline) async fn pinned_events_task( // The updates might have lagged, but the room event cache might have // events, so retrieve them and add them back again to the timeline, // after clearing it. - let (initial_events, _) = match room_event_cache.subscribe_to_pinned_events().await - { + let (initial_events, _) = match pinned_events_cache.subscribe().await { Ok(initial_events) => initial_events, Err(err) => { error!( diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs b/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs index 53b4b933ae8..cf138ef7969 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs @@ -771,10 +771,10 @@ async fn test_redacted_events_are_reflected_in_sync() { let room_id = room_id!("!test:localhost"); let f = EventFactory::new().room(room_id).sender(*BOB); - let event_id = event_id!("$1"); + let pinned_event_id = event_id!("$1"); let pinned_event = f .text_msg("in the end") - .event_id(event_id!("$1")) + .event_id(pinned_event_id) .server_ts(MilliSecondsSinceUnixEpoch::now()) .into_raw_sync(); @@ -784,7 +784,7 @@ async fn test_redacted_events_are_reflected_in_sync() { // Mock /relations for pinned timeline event. server .mock_room_relations() - .match_target_event(event_id.to_owned()) + .match_target_event(pinned_event_id.to_owned()) .match_limit(256) .ok(RoomRelationsResponseTemplate::default().events(Vec::>::new())) .mock_once() @@ -794,7 +794,7 @@ async fn test_redacted_events_are_reflected_in_sync() { // Load initial timeline items: a text message and a `m.room.pinned_events` with // event $1 let room = PinnedEventsSync::new(room_id) - .with_pinned_event_ids(vec!["$1"]) + .with_pinned_event_ids(vec![pinned_event_id.as_str()]) .mock_and_sync(&client, &server) .await .expect("Sync failed"); @@ -818,12 +818,13 @@ async fn test_redacted_events_are_reflected_in_sync() { assert_eq!(items.len(), 1 + 1); // event item + a date divider assert!(items[0].is_date_divider()); - assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "in the end"); + assert_eq!(items[1].as_event().unwrap().event_id(), Some(pinned_event_id)); assert_pending!(timeline_stream); + let redaction_event_id = event_id!("$2"); let redaction_event = f - .redaction(event_id!("$1")) - .event_id(event_id!("$2")) + .redaction(pinned_event_id) + .event_id(redaction_event_id) .server_ts(MilliSecondsSinceUnixEpoch::now()) .into_raw_sync(); diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index 97e4ea36afe..8bdc1cd10f4 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -128,6 +128,7 @@ mime.workspace = true mime2ext = "0.1.54" oauth2.workspace = true oauth2-reqwest.workspace = true +once_cell.workspace = true percent-encoding = "2.3.2" pin-project-lite.workspace = true rand = { workspace = true, optional = true } diff --git a/crates/matrix-sdk/src/event_cache/caches/aggregator.rs b/crates/matrix-sdk/src/event_cache/caches/aggregator.rs index 59c93e75205..238668d08fd 100644 --- a/crates/matrix-sdk/src/event_cache/caches/aggregator.rs +++ b/crates/matrix-sdk/src/event_cache/caches/aggregator.rs @@ -129,3 +129,57 @@ pub async fn aggregate_timeline_for_threads( Ok(new_events_by_thread) } + +pub fn aggregate_timeline_for_pinned_events( + timeline: &Timeline, + pinned_event_ids: &[OwnedEventId], + redaction_rules: &RedactionRules, +) -> Timeline { + let mut new_timeline = Timeline { + limited: timeline.limited, + prev_batch: timeline.prev_batch.clone(), + events: Vec::new(), + }; + + // No events are pinned? The `Timeline` must be empty. + if pinned_event_ids.is_empty() { + return new_timeline; + } + + // Look for events that relate to pinned events. We already know the + // pinned-events, we don't need to look for them. We are only interested by + // related events. + for event in &timeline.events { + match extract_relation(event.raw()) { + // Ohh, this event relates to another event! + Some((relation_type, related_event_id)) => match relation_type { + // `event` relates to a thread: not what we want. + RelationType::Thread => {} + + // `event` represents an annotation (e.g. reactions), a replacement (an edit), a + // reference or something custom. Let's see if the `related_event_id` is a + // pinned-event. + RelationType::Annotation + | RelationType::Replacement + | RelationType::Reference + | _ => { + if pinned_event_ids.contains(&related_event_id) { + new_timeline.events.push(event.clone()); + } + } + }, + + // No explicit relation, but it can be a redaction of a pinned-event! + None => { + if let Some(redaction_target) = + extract_redaction_target(event.raw(), redaction_rules) + && pinned_event_ids.contains(&redaction_target) + { + new_timeline.events.push(event.clone()); + } + } + } + } + + new_timeline +} diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 94d8f17ff91..a9026d2251f 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -22,6 +22,7 @@ use matrix_sdk_base::{ linked_chunk::Position, sync::{JoinedRoomUpdate, LeftRoomUpdate}, }; +use once_cell::sync::OnceCell; use ruma::{OwnedEventId, OwnedRoomId, RoomId, room_version_rules::RoomVersionRules}; use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, broadcast::Sender, mpsc}; @@ -43,6 +44,7 @@ pub mod thread; pub(super) struct Caches { pub room: room::RoomEventCache, pub threads: Arc>>, + pub pinned_events: OnceCell, internals: CachesInternals, } @@ -124,6 +126,7 @@ impl Caches { Ok(Self { room: room_event_cache, threads: Arc::new(RwLock::new(HashMap::new())), + pinned_events: OnceCell::new(), internals: CachesInternals { store, linked_chunk_update_sender, room_version_rules }, }) } @@ -183,9 +186,34 @@ impl Caches { ) } + /// Get or create a [`PinnedEventsCache`]. + /// + /// [`PinnedEventsCache`]: pinned_events::PinnedEventsCache + pub fn pinned_events(&self) -> Result<&pinned_events::PinnedEventsCache> { + self.pinned_events.get_or_try_init(|| { + pinned_events::PinnedEventsCache::new( + self.room.weak_room(), + self.room.own_user_id().clone(), + self.internals.room_version_rules.clone(), + self.internals.linked_chunk_update_sender.clone(), + self.internals.store.clone(), + ) + }) + } + + /// Get a [`PinnedEventsCache`] if it has been initialised. + /// + /// [`PinnedEventsCache`]: pinned_events::PinnedEventsCache + #[cfg(feature = "e2e-encryption")] + pub(super) fn pinned_events_without_initialisation( + &self, + ) -> Option<&pinned_events::PinnedEventsCache> { + self.pinned_events.get() + } + /// Update all the event caches with a [`JoinedRoomUpdate`]. pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { - let Self { room, threads, internals } = &self; + let Self { room, threads, pinned_events, internals } = &self; // Room. { @@ -224,12 +252,24 @@ impl Caches { } } + // Pinned-events. + if let Some(pinned_events) = pinned_events.get() { + let mut updates = updates.clone(); + updates.timeline = aggregator::aggregate_timeline_for_pinned_events( + &updates.timeline, + &pinned_events.state().read().await?.current_event_ids(), + &internals.room_version_rules.redaction, + ); + + pinned_events.handle_joined_room_update(updates).await?; + } + Ok(()) } /// Update all the event caches with a [`LeftRoomUpdate`]. pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { - let Self { room, threads, internals } = &self; + let Self { room, threads, pinned_events, internals } = &self; // Room. { @@ -268,6 +308,18 @@ impl Caches { } } + // Pinned-events. + if let Some(pinned_events) = pinned_events.get() { + let mut updates = updates.clone(); + updates.timeline = aggregator::aggregate_timeline_for_pinned_events( + &updates.timeline, + &pinned_events.state().read().await?.current_event_ids(), + &internals.room_version_rules.redaction, + ); + + pinned_events.handle_left_room_update(updates).await?; + } + Ok(()) } @@ -305,13 +357,19 @@ pub(super) struct ResetCaches<'c> { thread::OwnedThreadEventCacheStateLockWriteGuard, thread::ThreadEventCacheUpdateSender, )>, + pinned_events_lock: Option<( + pinned_events::PinnedEventsCacheStateLockWriteGuard<'c>, + pinned_events::PinnedEventsCacheUpdateSender, + )>, } impl<'c> ResetCaches<'c> { /// Create a new [`ResetCaches`]. /// /// It can fail if acquiring an exclusive lock fails. - async fn new(Caches { room, threads, internals: _ }: &'c mut Caches) -> Result { + async fn new( + Caches { room, threads, pinned_events, internals: _ }: &'c mut Caches, + ) -> Result { // Acquire an exclusive access to the state of the room. let room_lock = (room.state().write().await?, room.update_sender().clone()); @@ -325,7 +383,14 @@ impl<'c> ResetCaches<'c> { .push((thread.state().write_owned().await?, thread.update_sender().clone())); } - Ok(Self { room_lock, threads_lock, thread_locks }) + // Acquire an exclusive access to the pinned-events if any. + let pinned_events_lock = if let Some(pinned_events) = pinned_events.get_mut() { + Some((pinned_events.state().write().await?, pinned_events.update_sender().clone())) + } else { + None + }; + + Ok(Self { room_lock, threads_lock, thread_locks, pinned_events_lock }) } /// Reset all the event caches, and broadcast the [`TimelineVectorDiffs`]. @@ -335,8 +400,9 @@ impl<'c> ResetCaches<'c> { /// /// It can fail if resetting an event cache fails. pub async fn reset_all(self) -> Result<()> { - let Self { room_lock, threads_lock, thread_locks } = self; + let Self { room_lock, threads_lock, thread_locks, pinned_events_lock } = self; + // Room. { let (mut room_state, room_update_sender) = room_lock; @@ -350,6 +416,7 @@ impl<'c> ResetCaches<'c> { ); } + // Threads. { for thread_lock in thread_locks { let (mut thread_state, thread_update_sender) = thread_lock; @@ -370,6 +437,18 @@ impl<'c> ResetCaches<'c> { drop(threads_lock); } + // Pinned-events. + { + if let Some((mut pinned_events_state, pinned_events_update_sender)) = pinned_events_lock + { + let updates_as_vector_diffs = pinned_events_state.reset().await?; + pinned_events_update_sender.send(TimelineVectorDiffs { + diffs: updates_as_vector_diffs, + origin: EventsOrigin::Cache, + }); + } + } + Ok(()) } } diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index 3c3fd1304a6..3f56228309e 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -12,44 +12,54 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::BTreeSet, sync::Arc}; +mod updates; +use std::{collections::BTreeSet, fmt, sync::Arc}; + +use eyeball_im::VectorDiff; use futures_util::{StreamExt as _, stream}; use matrix_sdk_base::{ - event_cache::{Event, store::EventCacheStoreLock}, - linked_chunk::{LinkedChunkId, OwnedLinkedChunkId}, - serde_helpers::extract_relation, + apply_redaction, + event_cache::{Event, Gap, store::EventCacheStoreLock}, + linked_chunk::{LinkedChunkId, OwnedLinkedChunkId, Position, Update}, + serde_helpers::extract_redaction_target, + sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, task_monitor::BackgroundTaskHandle, }; +use matrix_sdk_common::executor::spawn; use ruma::{ - MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, - events::{ - AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType, relation::RelationType, - }, - room_version_rules::RedactionRules, - serde::Raw, + EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, + events::{relation::RelationType, room::redaction::SyncRoomRedactionEvent}, + room_version_rules::RoomVersionRules, }; use tokio::sync::broadcast::{Receiver, Sender}; use tracing::{debug, instrument, trace, warn}; +pub(super) use self::updates::PinnedEventsCacheUpdateSender; #[cfg(feature = "e2e-encryption")] use super::super::redecryptor::ResolvedUtd; use super::{ - super::{EventCacheError, EventsOrigin, Result, persistence::send_updates_to_store}, - TimelineVectorDiffs, - event_linked_chunk::EventLinkedChunk, + super::{ + EventCacheError, EventsOrigin, Result, + deduplicator::{DeduplicationOutcome, filter_duplicate_events}, + persistence::{find_event, send_updates_to_store}, + }, + EventLocation, TimelineVectorDiffs, + event_linked_chunk::{EventLinkedChunk, sort_positions_descending}, lock, - lock::Reload as _, room::RoomEventCacheLinkedChunkUpdate, }; use crate::{Room, client::WeakClient, config::RequestConfig, room::WeakRoom}; -pub(in super::super) struct PinnedEventCacheState { +pub(in super::super) struct PinnedEventsCacheState { /// The ID of the room owning this list of pinned events. room_id: OwnedRoomId, - /// A sender for live events updates in this room's pinned events list. - sender: Sender, + /// The user's own user id. + own_user_id: OwnedUserId, + + /// The rules for the version of this room. + room_version_rules: RoomVersionRules, /// The linked chunk representing this room's pinned events. /// @@ -60,9 +70,14 @@ pub(in super::super) struct PinnedEventCacheState { chunk: EventLinkedChunk, /// Reference to the underlying backing store. - // TODO: can be removed? store: EventCacheStoreLock, + /// A clone of [`PinnedEventsCacheInner::update_sender`]. + /// + /// This is used only by the [`LockedPinnedEventsCacheState::read`] and + /// [`LockedPinnedEventsCacheState::write`] when the state must be reset. + update_sender: PinnedEventsCacheUpdateSender, + /// A sender for the globally observable linked chunk updates that happened /// during a sync or a back-pagination. /// @@ -70,16 +85,16 @@ pub(in super::super) struct PinnedEventCacheState { linked_chunk_update_sender: Sender, } -impl lock::Store for PinnedEventCacheState { +impl lock::Store for PinnedEventsCacheState { fn store(&self) -> &EventCacheStoreLock { &self.store } } #[cfg(not(tarpaulin_include))] -impl std::fmt::Debug for PinnedEventCacheState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PinnedEventCacheState") +impl fmt::Debug for PinnedEventsCacheState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PinnedEventsCacheState") .field("room_id", &self.room_id) .field("chunk", &self.chunk) .finish_non_exhaustive() @@ -90,12 +105,12 @@ impl std::fmt::Debug for PinnedEventCacheState { /// /// This contains all the inner mutable states that ought to be updated at /// the same time. -pub type PinnedEventCacheStateLock = lock::StateLock; +pub type LockedPinnedEventsCacheState = lock::StateLock; -pub type PinnedEventCacheStateLockWriteGuard<'a> = - lock::StateLockWriteGuard<'a, PinnedEventCacheState>; +pub type PinnedEventsCacheStateLockWriteGuard<'a> = + lock::StateLockWriteGuard<'a, PinnedEventsCacheState>; -impl<'a> lock::Reload for PinnedEventCacheStateLockWriteGuard<'a> { +impl<'a> lock::Reload for PinnedEventsCacheStateLockWriteGuard<'a> { async fn reload(&mut self) -> Result<()> { self.reload_from_storage().await?; @@ -103,7 +118,225 @@ impl<'a> lock::Reload for PinnedEventCacheStateLockWriteGuard<'a> { } } -impl<'a> PinnedEventCacheStateLockWriteGuard<'a> { +impl<'a> PinnedEventsCacheStateLockWriteGuard<'a> { + /// Reset this data structure as if it were brand new. + /// + /// However, pinned-events are not only stored here. They are also coming at + /// a state-event containing all the pinned-events ID list in the `Room`. + /// This is a best-effort here: we are resetting the events stored in this + /// cache only. + /// + /// Return a single diff update that is a clear of all events; as a + /// result, the caller may override any pending diff updates + /// with the result of this function. + pub async fn reset(&mut self) -> Result>> { + self.state.chunk.reset(); + self.propagate_changes().await?; + + let diff_updates = self.state.chunk.updates_as_vector_diffs(); + + // Ensure the contract defined in the doc comment is true: + debug_assert_eq!(diff_updates.len(), 1); + debug_assert!(matches!(diff_updates[0], VectorDiff::Clear)); + + Ok(diff_updates) + } + + async fn handle_sync(&mut self, timeline: Timeline) -> Result<()> { + let DeduplicationOutcome { + all_events: events, + in_memory_duplicated_event_ids, + in_store_duplicated_event_ids, + non_empty_all_duplicates: all_duplicates, + } = filter_duplicate_events( + &self.state.own_user_id, + &self.store, + LinkedChunkId::PinnedEvents(&self.state.room_id), + &self.state.chunk, + timeline.events, + ) + .await?; + + if all_duplicates { + // If all events are duplicates, we don't need to do anything; ignore + // the new events. + return Ok(()); + } + + // Remove the old duplicated events. + // + // We don't have to worry about the removals can change the position of the + // existing events, because we are pushing all _new_ `events` at the back. + self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids).await?; + + // We've found new relations; append them to the linked chunk. + self.state.chunk.push_live_events(None, &events); + + self.propagate_changes().await?; + self.notify_subscribers(EventsOrigin::Sync); + + // Do stuff for each event. + for event in &events { + // Handle redaction. + self.maybe_apply_new_redaction(event).await?; + } + + Ok(()) + } + + /// Remove events by their position, in `EventLinkedChunk`. + /// + /// This method is purposely isolated because it must ensure that + /// positions are sorted appropriately or it can be disastrous. + #[instrument(skip_all)] + pub async fn remove_events( + &mut self, + in_memory_events: Vec<(OwnedEventId, Position)>, + in_store_events: Vec<(OwnedEventId, Position)>, + ) -> Result<()> { + // In-store events. + if !in_store_events.is_empty() { + let mut positions = in_store_events + .into_iter() + .map(|(_event_id, position)| position) + .collect::>(); + + sort_positions_descending(&mut positions); + + let updates = + positions.into_iter().map(|pos| Update::RemoveItem { at: pos }).collect::>(); + + self.apply_store_only_updates(updates).await?; + } + + // In-memory events. + if in_memory_events.is_empty() { + // Nothing else to do, return early. + return Ok(()); + } + + // `remove_events_by_position` is responsible of sorting positions. + self.state + .chunk + .remove_events_by_position( + in_memory_events.into_iter().map(|(_event_id, position)| position).collect(), + ) + .expect("failed to remove an event"); + + self.propagate_changes().await + } + + /// Apply some updates that are effective only on the store itself. + /// + /// This method should be used only for updates that happen *outside* + /// the in-memory linked chunk. Such updates must be applied + /// onto the ordering tracker as well as to the persistent + /// storage. + async fn apply_store_only_updates(&mut self, updates: Vec>) -> Result<()> { + self.state.chunk.order_tracker.map_updates(&updates); + self.send_updates_to_store(updates).await + } + + /// If the given event is a redaction, try to retrieve the + /// to-be-redacted event in the chunk, and replace it by the + /// redacted form. + #[instrument(skip_all)] + async fn maybe_apply_new_redaction(&mut self, event: &Event) -> Result<()> { + let Some(event_id) = + extract_redaction_target(event.raw(), &self.room_version_rules.redaction) + else { + return Ok(()); + }; + + // Replace the redacted event by a redacted form, if we knew about it. + let Some((location, mut target_event)) = self.find_event(&event_id).await? else { + trace!("redacted event is missing from the linked chunk"); + return Ok(()); + }; + + let target_event_raw = target_event.raw(); + + // Don't redact already redacted events. + if let Ok(deserialized) = target_event_raw.deserialize() + && deserialized.is_redacted() + { + return Ok(()); + } + + if let Some(redacted_event) = apply_redaction( + target_event_raw, + event.raw().cast_ref_unchecked::(), + &self.room_version_rules.redaction, + ) { + // It's safe to cast `redacted_event` here: + // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent` + // when calling .raw(), so it's still one under the hood. + // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case. + target_event.replace_raw(redacted_event.cast_unchecked()); + + self.replace_event_at(location, target_event.clone()).await?; + } + + Ok(()) + } + + /// See documentation of [`find_event`]. + pub(super) async fn find_event( + &self, + event_id: &EventId, + ) -> Result> { + find_event(event_id, &self.room_id, &self.chunk, &self.store).await + } + + /// Replaces a single event, be it saved in memory or in the store. + /// + /// If it was saved in memory, this will emit a notification to + /// observers that a single item has been replaced. Otherwise, + /// such a notification is not emitted, because observers are + /// unlikely to observe the store updates directly. + pub async fn replace_event_at( + &mut self, + location: EventLocation, + new_event: Event, + ) -> Result<()> { + match location { + EventLocation::Memory(position) => { + self.state + .chunk + .replace_event_at(position, new_event) + .expect("should have been a valid position of an item"); + // We just changed the in-memory representation; synchronize this with + // the store. + self.propagate_changes().await?; + } + EventLocation::Store => { + self.save_events([new_event]).await?; + } + } + + Ok(()) + } + + /// Save events into the database, without notifying observers. + pub async fn save_events(&mut self, events: impl IntoIterator) -> Result<()> { + let store = self.store.clone(); + let room_id = self.state.room_id.clone(); + let events = events.into_iter().collect::>(); + + // Spawn a task so the save is uninterrupted by task cancellation. + spawn(async move { + for event in events { + store.save_event(&room_id, event).await?; + } + + Result::Ok(()) + }) + .await + .expect("joining failed")?; + + Ok(()) + } + /// Reload all the pinned events from storage, replacing the current linked /// chunk. async fn reload_from_storage(&mut self) -> Result<()> { @@ -170,9 +403,15 @@ impl<'a> PinnedEventCacheStateLockWriteGuard<'a> { /// Propagate the changes in this linked chunk to observers, and save the /// changes on disk. - async fn propagate_changes(&mut self) -> Result<()> { + pub async fn propagate_changes(&mut self) -> Result<()> { let updates = self.state.chunk.store_updates().take(); - let linked_chunk_id = OwnedLinkedChunkId::PinnedEvents(self.state.room_id.clone()); + + self.send_updates_to_store(updates).await + } + + async fn send_updates_to_store(&mut self, updates: Vec>) -> Result<()> { + let linked_chunk_id = OwnedLinkedChunkId::PinnedEvents(self.room_id.clone()); + send_updates_to_store( &self.store, linked_chunk_id, @@ -185,67 +424,103 @@ impl<'a> PinnedEventCacheStateLockWriteGuard<'a> { /// Notify subscribers of timeline updates. fn notify_subscribers(&mut self, origin: EventsOrigin) { let diffs = self.state.chunk.updates_as_vector_diffs(); + if !diffs.is_empty() { - let _ = self.state.sender.send(TimelineVectorDiffs { diffs, origin }); + self.update_sender.send(TimelineVectorDiffs { diffs, origin }); } } } -impl PinnedEventCacheState { +impl PinnedEventsCacheState { /// Return a list of the current event IDs in this linked chunk. - fn current_event_ids(&self) -> Vec { + pub(super) fn current_event_ids(&self) -> Vec { self.chunk.events().filter_map(|(_position, event)| event.event_id()).collect() } } -/// All the information related to a room's pinned events cache. +/// All the information related to room's pinned events.. /// -/// This is cheap to clone, because it's a shallow data type. +/// Cloning is shallow, and thus is cheap to do. #[derive(Clone)] -pub struct PinnedEventCache { - state: Arc, +pub struct PinnedEventsCache { + inner: Arc, +} + +/// The (non-cloneable) details of the `PinnedEventsCache`. +struct PinnedEventsCacheInner { + /// The ID of the room owning this list of pinned events. + room_id: OwnedRoomId, + + /// State of this `PinnedEventsCache`. + /// + /// It is behind an `Arc` because it is shared with the task. + state: Arc, + + /// Update sender for this pinned events cache. + update_sender: PinnedEventsCacheUpdateSender, /// The task handling the refreshing of pinned events for this specific /// room. - _task: Arc, + _task: BackgroundTaskHandle, } -impl PinnedEventCache { - /// Creates a new [`PinnedEventCache`] for the given room. +impl PinnedEventsCache { + /// Creates a new [`PinnedEventsCache`] for the given room. pub(in super::super) fn new( - room: Room, + weak_room: &WeakRoom, + own_user_id: OwnedUserId, + room_version_rules: RoomVersionRules, linked_chunk_update_sender: Sender, store: EventCacheStoreLock, - ) -> Self { - let sender = Sender::new(32); - + ) -> Result { + let room = weak_room.get().ok_or(EventCacheError::ClientDropped)?; let room_id = room.room_id().to_owned(); + let update_sender = PinnedEventsCacheUpdateSender::new(); + let chunk = EventLinkedChunk::new(); - let state = - PinnedEventCacheState { room_id, chunk, sender, linked_chunk_update_sender, store }; - let state = Arc::new(PinnedEventCacheStateLock::new_inner(state)); - - let task = Arc::new( - room.client() - .task_monitor() - .spawn_infinite_task( - "pinned_event_listener_task", - Self::pinned_event_listener_task(room, state.clone()), - ) - .abort_on_drop(), - ); - - Self { state, _task: task } + let state = PinnedEventsCacheState { + room_id: room_id.clone(), + own_user_id, + room_version_rules, + chunk, + update_sender: update_sender.clone(), + linked_chunk_update_sender, + store, + }; + let state = Arc::new(LockedPinnedEventsCacheState::new_inner(state)); + + let task = room + .client() + .task_monitor() + .spawn_infinite_task( + "pinned_event_listener_task", + Self::pinned_event_listener_task(room, state.clone()), + ) + .abort_on_drop(); + + Ok(Self { + inner: Arc::new(PinnedEventsCacheInner { room_id, state, update_sender, _task: task }), + }) + } + + /// Return a reference to the state. + pub(super) fn state(&self) -> &LockedPinnedEventsCacheState { + &self.inner.state + } + + /// Get a reference to the [`RoomEventCacheUpdateSender`]. + pub(in super::super) fn update_sender(&self) -> &PinnedEventsCacheUpdateSender { + &self.inner.update_sender } /// Subscribe to live events from this room's pinned events cache. pub async fn subscribe(&self) -> Result<(Vec, Receiver)> { - let guard = self.state.read().await?; + let guard = self.inner.state.read().await?; let events = guard.state.chunk.events().map(|(_position, item)| item.clone()).collect(); - let recv = guard.state.sender.subscribe(); + let recv = guard.state.update_sender.new_pinned_events_receiver(); Ok((events, recv)) } @@ -254,8 +529,8 @@ impl PinnedEventCache { /// list of decrypted events, and replace them, while alerting observers /// about the update. #[cfg(feature = "e2e-encryption")] - pub(in crate::event_cache) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> { - let mut guard = self.state.write().await?; + pub(in super::super) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> { + let mut guard = self.inner.state.write().await?; if guard.state.chunk.replace_utds(events) { guard.propagate_changes().await?; @@ -265,99 +540,38 @@ impl PinnedEventCache { Ok(()) } - /// Given a raw event, try to extract the target event ID of a relation as - /// defined with `m.relates_to`. - fn extract_relation_target(raw: &Raw) -> Option { - let (rel_type, event_id) = extract_relation(raw)?; + /// Handle a [`JoinedRoomUpdate`]. + #[instrument(skip_all, fields(room_id = %self.inner.room_id))] + pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { + self.handle_timeline(updates.timeline).await?; - // Don't include thread responses in the pinned event chunk. - match rel_type { - RelationType::Thread => None, - _ => Some(event_id), - } + Ok(()) } - /// Given a raw event, try to extract the target event ID of a live - /// redaction. - fn extract_redaction_target( - raw: &Raw, - room_redaction_rules: &RedactionRules, - ) -> Option { - // Try to find a redaction, but do not deserialize the entire event if we aren't - // certain it's a `m.room.redaction`. - if raw.get_field::("type").ok()?? - != MessageLikeEventType::RoomRedaction - { - return None; - } + /// Handle a [`LeftRoomUpdate`]. + #[instrument(skip_all, fields(room_id = %self.inner.room_id))] + pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { + self.handle_timeline(updates.timeline).await?; - let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(redaction)) = - raw.deserialize().ok()? - else { - return None; - }; - - redaction.redacts(room_redaction_rules).map(ToOwned::to_owned).or_else(|| { - warn!("missing target event id from the redaction event"); - None - }) + Ok(()) } - /// Check if any of the given events relate to an event in the pinned events - /// linked chunk, and append it, in this case. - pub(in super::super) async fn maybe_add_live_related_events( - &mut self, - events: &[Event], - room_redaction_rules: &RedactionRules, - ) -> Result<()> { - trace!("checking live events for relations to pinned events"); - let mut guard = self.state.write().await?; - - let pinned_event_ids: BTreeSet = - guard.state.current_event_ids().into_iter().collect(); - - if pinned_event_ids.is_empty() { + /// Handle a [`Timeline`], i.e. new events received by a sync for this + /// thread. + async fn handle_timeline(&self, timeline: Timeline) -> Result<()> { + if timeline.events.is_empty() { return Ok(()); } - let mut new_relations = Vec::new(); - - // For all events that relate to an event in the pinned events chunk, push this - // event to the linked chunk, and propagate changes to observers. - for ev in events { - // Try to find a regular relation in ev. - if let Some(relation_target) = Self::extract_relation_target(ev.raw()) - && pinned_event_ids.contains(&relation_target) - { - new_relations.push(ev.clone()); - continue; - } - - // Try to find a redaction in ev. - if let Some(redaction_target) = - Self::extract_redaction_target(ev.raw(), room_redaction_rules) - && pinned_event_ids.contains(&redaction_target) - { - new_relations.push(ev.clone()); - continue; - } - } - - if !new_relations.is_empty() { - trace!("found {} new related events to pinned events", new_relations.len()); + trace!("adding new {} events", timeline.events.len()); - // We've found new relations; append them to the linked chunk. - guard.state.chunk.push_live_events(None, &new_relations); - - guard.propagate_changes().await?; - guard.notify_subscribers(EventsOrigin::Sync); - } + self.inner.state.write().await?.handle_sync(timeline).await?; Ok(()) } #[instrument(fields(%room_id = room.room_id()), skip(room, state))] - async fn pinned_event_listener_task(room: Room, state: Arc) { + async fn pinned_event_listener_task(room: Room, state: Arc) { debug!("pinned events listener task started"); let reload_from_network = async |room: Room| { @@ -545,12 +759,10 @@ impl PinnedEventCache { Ok(Some(loaded_events)) } +} - /// Force to reload the pinned events. - // - // TODO(@hywan): Temporary fix. All the states must be in a single struct behind - // the cross-process lock instead of being dispatched in each cache. - pub(super) async fn reload(&self) -> Result<()> { - self.state.write().await?.reload().await +impl fmt::Debug for PinnedEventsCache { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PinnedEventsCache").finish_non_exhaustive() } } diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/updates.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/updates.rs new file mode 100644 index 00000000000..8feb576a14c --- /dev/null +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/updates.rs @@ -0,0 +1,40 @@ +// Copyright 2026 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use tokio::sync::broadcast::{Receiver, Sender}; + +use super::super::TimelineVectorDiffs; + +/// A small type to send updates in all channels. +#[derive(Clone)] +pub struct PinnedEventsCacheUpdateSender { + thread_sender: Sender, +} + +impl PinnedEventsCacheUpdateSender { + /// Create a new [`PinnedEventsCacheUpdateSender`]. + pub fn new() -> Self { + Self { thread_sender: Sender::new(32) } + } + + /// Send a [`TimelineVectorDiffs`]. + pub fn send(&self, thread_update: TimelineVectorDiffs) { + let _ = self.thread_sender.send(thread_update); + } + + /// Create a new [`Receiver`] of [`TimelineVectorDiffs`]. + pub(super) fn new_pinned_events_receiver(&self) -> Receiver { + self.thread_sender.subscribe() + } +} diff --git a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs index 9eb26092123..d5a1e39153d 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs @@ -34,7 +34,7 @@ use ruma::{ events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType}, serde::Raw, }; -use tokio::sync::{Notify, broadcast::Receiver, mpsc}; +use tokio::sync::{Notify, mpsc}; use tracing::{instrument, trace, warn}; pub(super) use self::state::{ @@ -150,24 +150,6 @@ impl RoomEventCache { Ok((events, subscriber)) } - /// Subscribe to the pinned event cache for this room. - /// - /// This is a persisted view over the pinned events of a room. - /// - /// The pinned events will be initially reloaded from storage, and/or loaded - /// from a network request to fetch the latest pinned events and their - /// relations, to update it as needed. The list of pinned events will - /// also be kept up-to-date as new events are pinned, and new - /// related events show up from other sources. - pub async fn subscribe_to_pinned_events( - &self, - ) -> Result<(Vec, Receiver)> { - let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?; - let state = self.inner.state.read().await?; - - state.subscribe_to_pinned_events(room).await - } - /// Create or get an event-focused timeline cache for this room. /// /// This creates a timeline centered around a specific event (e.g., for diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index 1bd2d073b2d..f2a2f3facc7 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -15,7 +15,7 @@ use std::{ collections::HashMap, sync::{ - Arc, OnceLock, + Arc, atomic::{AtomicUsize, Ordering}, }, }; @@ -47,7 +47,7 @@ use ruma::{ room_version_rules::RoomVersionRules, serde::Raw, }; -use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::broadcast::Sender; use tracing::{debug, error, instrument, trace, warn}; use super::{ @@ -66,13 +66,12 @@ use super::{ event_linked_chunk::EventLinkedChunk, lock, pagination::SharedPaginationStatus, - pinned_events::PinnedEventCache, read_receipts::compute_unread_counts, }, EventsOrigin, RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate, RoomEventCacheUpdateSender, sort_positions_descending, }; -use crate::{Room, room::WeakRoom}; +use crate::room::WeakRoom; /// Key for the event-focused caches. #[derive(Hash, PartialEq, Eq)] @@ -110,9 +109,6 @@ pub struct RoomEventCacheState { /// permalink). event_focused_caches: HashMap, - /// Cache for pinned events in this room, initialized on-demand. - pinned_event_cache: OnceLock, - pagination_status: SharedObservable, /// A clone of [`super::RoomEventCacheInner::update_sender`]. @@ -260,7 +256,6 @@ impl LockedRoomEventCacheState { room_version_rules, waited_for_initial_prev_token: false, subscriber_count: Default::default(), - pinned_event_cache: OnceLock::new(), automatic_pagination, })) } @@ -281,11 +276,6 @@ impl<'a> lock::Reload for RoomEventCacheStateLockWriteGuard<'a> { async fn reload(&mut self) -> Result<(), EventCacheError> { self.shrink_to_last_chunk().await?; - // Reload the pinned-events. - if let Some(pinned_event_cache) = self.pinned_event_cache.get_mut() { - pinned_event_cache.reload().await?; - } - let diffs = self.state.room_linked_chunk.updates_as_vector_diffs(); // Notify observers about the update. @@ -360,32 +350,6 @@ impl<'a> RoomEventCacheStateLockReadGuard<'a> { EventCacheStoreLockGuard::is_dirty(&self.store) } - /// Subscribe to the lazily initialized pinned event cache for this - /// room. - /// - /// This is a persisted view over the pinned events of a room. The - /// pinned events will be initially loaded from a network - /// request to fetch the latest pinned events will be performed, - /// to update it as needed. The list of pinned events will also - /// be kept up-to-date as new events are pinned, and new related - /// events show up from sync or backpagination. - /// - /// This requires the room's event cache to be initialized. - pub async fn subscribe_to_pinned_events( - &self, - room: Room, - ) -> Result<(Vec, Receiver), EventCacheError> { - let pinned_event_cache = self.state.pinned_event_cache.get_or_init(|| { - PinnedEventCache::new( - room, - self.state.linked_chunk_update_sender.clone(), - self.state.store.clone(), - ) - }); - - pinned_event_cache.subscribe().await - } - /// Get an event-focused cache for this event and thread mode, if it /// exists. /// @@ -405,13 +369,6 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { &mut self.state.room_linked_chunk } - /// Get a reference to the [`pinned_event_cache`] if it has been - /// initialized. - #[cfg(any(feature = "e2e-encryption", test))] - pub fn pinned_event_cache(&self) -> Option<&PinnedEventCache> { - self.state.pinned_event_cache.get() - } - /// Get a reference to all the live [`event_focused_caches`]. #[cfg(feature = "e2e-encryption")] pub fn event_focused_caches(&self) -> impl Iterator { @@ -737,16 +694,6 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { // Update the store before doing the post-processing. self.propagate_changes().await?; - // Need an explicit re-borrow to avoid a deref vs deref-mut borrowck conflict - // below. - let state = &mut *self.state; - - if let Some(pinned_event_cache) = state.pinned_event_cache.get_mut() { - pinned_event_cache - .maybe_add_live_related_events(&events, &state.room_version_rules.redaction) - .await?; - } - for event in events { self.maybe_apply_new_redaction(&event).await?; diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs index 5151085a019..f2837458e6e 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs @@ -54,7 +54,7 @@ pub struct ThreadEventCache { inner: Arc, } -/// The (non-cloneable) details of the `RoomEventCache`. +/// The (non-cloneable) details of the `ThreadEventCache`. struct ThreadEventCacheInner { /// The room ID. room_id: OwnedRoomId, @@ -71,7 +71,7 @@ struct ThreadEventCacheInner { /// A notifier that we received a new pagination token. pagination_batch_token_notifier: Notify, - /// Update sender for this room. + /// Update sender for this thread. update_sender: ThreadEventCacheUpdateSender, } @@ -164,7 +164,7 @@ impl ThreadEventCache { &self.inner.state } - /// Get a reference to the [`RoomEventCacheUpdateSender`]. + /// Get a reference to the [`ThreadEventCacheUpdateSender`]. pub(in super::super) fn update_sender(&self) -> &ThreadEventCacheUpdateSender { &self.inner.update_sender } diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs index a77ca7d671c..bdb7e7fc9ec 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs @@ -75,8 +75,8 @@ pub struct ThreadEventCacheState { /// A clone of [`super::ThreadEventCacheInner::update_sender`]. /// - /// This is used only by the [`ThreadEventCacheStateLock::read`] and - /// [`ThreadEventCacheStateLock::write`] when the state must be reset. + /// This is used only by the [`LockedThreadEventCacheState::read`] and + /// [`LockedThreadEventCacheState::write`] when the state must be reset. update_sender: ThreadEventCacheUpdateSender, /// A sender for the globally observable linked chunk updates that happened diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index c94c96ae3c4..14878b4487b 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -75,6 +75,7 @@ pub use self::{ TimelineVectorDiffs, event_focused::{EventFocusThreadMode, EventFocusedCache}, pagination::{BackPaginationOutcome, PaginationStatus}, + pinned_events::PinnedEventsCache, room::{ RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheSubscriber, RoomEventCacheUpdate, pagination::RoomPagination, @@ -388,6 +389,32 @@ impl EventCache { Ok((caches_for_room.thread(thread_id.to_owned()).await?.deref().clone(), drop_handles)) } + /// Return a pinned-events-specific view over the [`EventCache`]. + pub async fn pinned_events( + &self, + room_id: &RoomId, + ) -> Result<(PinnedEventsCache, Arc)> { + let Some(drop_handles) = self.inner.drop_handles.get().cloned() else { + return Err(EventCacheError::NotSubscribedYet); + }; + + let caches_for_room = self.inner.all_caches_for_room(room_id).await?; + + Ok((caches_for_room.pinned_events()?.clone(), drop_handles)) + } + + /// Return a pinned-events-specific view over the [`EventCache`] if it has + /// been initialised. + #[cfg(feature = "e2e-encryption")] + async fn pinned_events_without_initialisation( + &self, + room_id: &RoomId, + ) -> Result> { + let caches_for_room = self.inner.all_caches_for_room(room_id).await?; + + Ok(caches_for_room.pinned_events_without_initialisation().cloned()) + } + /// Cleanly clear all the rooms' event caches. /// /// This will notify any live observers that the room has been cleared. diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 17d62c18738..56b1903e158 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -370,14 +370,14 @@ impl EventCache { // Phase 1: under the room state write lock, collect cache handles and // perform all room-linked-chunk mutations. We deliberately do NOT call - // replace_utds() on event-focused/pinned caches here to avoid an ABBA deadlock: - // pagination holds an event-focused cache lock and then tries to acquire the - // room state lock (via `save_events`), while this method would hold the - // room state lock and try to acquire event-focused cache locks. - let (pinned_cache, ef_caches) = { + // replace_utds() on event-focused/pinned-evenst caches here to avoid an ABBA + // deadlock: pagination holds an event-focused cache lock and then tries + // to acquire the room state lock (via `save_events`), while this method + // would hold the room state lock and try to acquire event-focused cache + // locks. + let ef_caches = { let mut state = room_cache.state().write().await?; - let pinned_cache = state.pinned_event_cache().cloned(); let ef_caches: Vec<_> = state.event_focused_caches().cloned().collect(); // Consider the room linked chunk. @@ -416,15 +416,17 @@ impl EventCache { Some(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() }), ); - (pinned_cache, ef_caches) + ef_caches }; // Room state write lock is dropped here. - // Phase 2: replace UTDs in pinned and event-focused caches WITHOUT - // holding the room state lock. These caches have their own internal - // locks and don't need the room state lock. - if let Some(pinned_cache) = pinned_cache { - pinned_cache.replace_utds(&events).await?; + // Phase 2: replace UTDs in pinned-events and event-focused caches + // WITHOUT holding the room state lock. These caches have their own + // internal locks and don't need the room state lock. + if let Ok(Some(pinned_events_cache)) = + self.pinned_events_without_initialisation(room_id).await + { + pinned_events_cache.replace_utds(&events).await?; } // TODO: This ain't great for performance; there shouldn't be that many diff --git a/crates/matrix-sdk/tests/integration/room/pinned_events.rs b/crates/matrix-sdk/tests/integration/room/pinned_events.rs index 69b6d41a3a6..b1d8f12c311 100644 --- a/crates/matrix-sdk/tests/integration/room/pinned_events.rs +++ b/crates/matrix-sdk/tests/integration/room/pinned_events.rs @@ -171,8 +171,10 @@ async fn test_pinned_events_are_loaded_from_network_then_are_reloaded_from_stora .build() .await; + let event_cache = client.event_cache(); + // Subscribe the event cache to sync updates. - client.event_cache().subscribe().unwrap(); + event_cache.subscribe().unwrap(); // Sync the room with the pinned event ID in the room state. // @@ -181,19 +183,20 @@ async fn test_pinned_events_are_loaded_from_network_then_are_reloaded_from_stora // events. let pinned_events_state = f.room_pinned_events(vec![pinned_event_id.to_owned()]); - let room = server + let _room = server .sync_room( &client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![pinned_events_state.into()]), ) .await; - // Get the room event cache and subscribe to pinned events. - let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + // Get the pinned events cache and subscribe to it. + let (pinned_events_cache, _drop_handles) = + event_cache.pinned_events(room_id).await.unwrap(); - // Subscribe to pinned events - this triggers PinnedEventCache::new() which - // spawns a task that calls reload_from_storage() first. - let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); + // Getting the pinned events cache triggers `PinnedEventsCache::new()` which + // spawns a task that calls `reload_from_storage()` first. + let (events, mut subscriber) = pinned_events_cache.subscribe().await.unwrap(); let mut events = events.into(); // Wait for the background task to reload the events. @@ -229,17 +232,19 @@ async fn test_pinned_events_are_loaded_from_network_then_are_reloaded_from_stora .build() .await; + let event_cache = client.event_cache(); + // Subscribe the event cache to sync updates. - client.event_cache().subscribe().unwrap(); + event_cache.subscribe().unwrap(); - let room = client.get_room(room_id).unwrap(); + let _room = client.get_room(room_id).unwrap(); - // Get the room event cache and subscribe to pinned events. - let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + // Get the pinned events cache and subscribe to it. + let (pinned_events_cache, _drop_handles) = event_cache.pinned_events(room_id).await.unwrap(); - // Subscribe to pinned events - this triggers PinnedEventCache::new() which - // spawns a task that calls reload_from_storage() first. - let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); + // Getting the pinned events cache triggers `PinnedEventsCache::new()` which + // spawns a task that calls `reload_from_storage()` first. + let (events, mut subscriber) = pinned_events_cache.subscribe().await.unwrap(); let mut events = events.into(); // Wait for the background task to reload the events from storage. @@ -277,8 +282,10 @@ async fn test_pinned_events_are_reloaded_from_storage_from_many_chunks() { let server = MatrixMockServer::new().await; let client = server.client_builder().build().await; + let event_cache = client.event_cache(); + // Create a non empty Event Cache store containing two chunks. - client.event_cache().subscribe().unwrap(); + event_cache.subscribe().unwrap(); client .event_cache_store() .lock() @@ -308,14 +315,14 @@ async fn test_pinned_events_are_reloaded_from_storage_from_many_chunks() { .await .unwrap(); - let room = server.sync_room(&client, JoinedRoomBuilder::new(room_id)).await; + let _room = server.sync_room(&client, JoinedRoomBuilder::new(room_id)).await; - // Get the room event cache and subscribe to pinned events. - let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + // Get the pinned events cache and subscribe to it. + let (pinned_events_cache, _drop_handles) = event_cache.pinned_events(room_id).await.unwrap(); - // Subscribe to pinned events - this triggers PinnedEventCache::new() which - // spawns a task that calls reload_from_storage() first. - let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); + // Getting the pinned events cache triggers `PinnedEventsCache::new()` which + // spawns a task that calls `reload_from_storage()` first. + let (events, mut subscriber) = pinned_events_cache.subscribe().await.unwrap(); let mut events = events.into(); // Wait for the background task to reload the events. @@ -368,9 +375,10 @@ async fn test_pinned_events_dont_include_thread_responses() { .await; let client = server.client_builder().build().await; + let event_cache = client.event_cache(); // Subscribe the event cache to sync updates. - client.event_cache().subscribe().unwrap(); + event_cache.subscribe().unwrap(); // Sync the room with the pinned event ID in the room state. // @@ -379,7 +387,7 @@ async fn test_pinned_events_dont_include_thread_responses() { // events. let pinned_events_state = f.room_pinned_events(vec![pinned_event_id.to_owned()]); - let room = server + let _room = server .sync_room( &client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![pinned_events_state.into()]), @@ -387,11 +395,11 @@ async fn test_pinned_events_dont_include_thread_responses() { .await; // Get the room event cache and subscribe to pinned events. - let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + let (pinned_events_cache, _drop_handles) = event_cache.pinned_events(room_id).await.unwrap(); - // Subscribe to pinned events - this triggers PinnedEventCache::new() which - // spawns a task that calls reload_from_storage() first. - let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); + // Getting the pinned events cache triggers `PinnedEventsCache::new()` which + // spawns a task that calls `reload_from_storage()` first. + let (events, mut subscriber) = pinned_events_cache.subscribe().await.unwrap(); let mut events = events.into(); // Wait for the background task to reload the events.