diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index bb1ae9b3f68..5164f421b0d 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -374,15 +374,9 @@ impl TimelineController

{ TimelineFocus::Event { target, thread_mode, num_context_events, .. } => { TimelineFocusKind::Event { event_cache: event_cache - .room(room_id) + .event_focused(room_id, target, (*thread_mode).into(), *num_context_events) .await? - .0 - .get_or_create_event_focused_cache( - target.clone(), - *num_context_events, - (*thread_mode).into(), - ) - .await?, + .0, focused_event_id: target.clone(), // This will be initialized in `Self::init_focus`. thread_root: OnceLock::new(), diff --git a/crates/matrix-sdk/src/event_cache/caches/event_focused/mod.rs b/crates/matrix-sdk/src/event_cache/caches/event_focused/mod.rs index 800011a6e38..2005c2e0c9e 100644 --- a/crates/matrix-sdk/src/event_cache/caches/event_focused/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/event_focused/mod.rs @@ -31,6 +31,7 @@ use std::sync::Arc; +use eyeball_im::VectorDiff; use matrix_sdk_base::{ deserialized_responses::TimelineEvent, event_cache::{Event, Gap}, @@ -90,7 +91,7 @@ pub(crate) enum EventFocusedPaginationMode { }, } -struct EventFocusedCacheInner { +pub(super) struct EventFocusedCacheState { /// The room owning this event-focused cache. room: WeakRoom, @@ -104,13 +105,13 @@ struct EventFocusedCacheInner { chunk: EventLinkedChunk, /// A sender of timeline updates. - sender: Sender, + sender: EventFocusedCacheUpdateSender, /// A sender for globally observable linked chunk updates. linked_chunk_update_sender: Sender, } -impl EventFocusedCacheInner { +impl EventFocusedCacheState { /// Initialize the cache from a focused event. /// /// This uses `/context` to fetch the event with surrounding context. @@ -121,13 +122,14 @@ impl EventFocusedCacheInner { /// Pagination tokens are stored as gaps in the linked chunk: /// - Backward token (start): Gap at the front of the linked chunk. /// - Forward token (end): Gap at the back of the linked chunk. - #[instrument(skip(self, room), fields(room_id = %self.room.room_id(), event_id = %self.focused_event_id))] + #[instrument(skip(self), fields(room_id = %self.room.room_id(), event_id = %self.focused_event_id))] async fn start_from( &mut self, - room: Room, num_context_events: u16, thread_mode: EventFocusThreadMode, ) -> Result { + let room = self.room.get().ok_or(EventCacheError::ClientDropped)?; + trace!(num_context_events, "fetching event with context via /context"); let paginator = Paginator::new(room); @@ -502,6 +504,24 @@ impl EventFocusedCacheInner { Ok((result.chunk, result.next_batch_token)) } + + /// Reset this data structure as if it were brand new. + /// + /// Return a single diff update that is a clear of all events; as a + /// result, the caller may override any pending diff updates + /// with the result of this function. + pub fn reset(&mut self) -> Result>> { + self.chunk.reset(); + self.propagate_changes(); + + let diff_updates = self.chunk.updates_as_vector_diffs(); + + // Ensure the contract defined in the doc comment is true: + debug_assert_eq!(diff_updates.len(), 1); + debug_assert!(matches!(diff_updates[0], VectorDiff::Clear)); + + Ok(diff_updates) + } } /// A cache for an event-focused timeline. @@ -520,7 +540,7 @@ impl EventFocusedCacheInner { /// This is a shallow data structure, and can be cloned cheaply. #[derive(Clone)] pub struct EventFocusedCache { - inner: Arc>, + inner: Arc>, } impl EventFocusedCache { @@ -531,7 +551,7 @@ impl EventFocusedCache { linked_chunk_update_sender: Sender, ) -> Self { Self { - inner: Arc::new(RwLock::new(EventFocusedCacheInner { + inner: Arc::new(RwLock::new(EventFocusedCacheState { room, focused_event_id, pagination_mode: EventFocusedPaginationMode::Room { hide_thread_events: false }, @@ -542,6 +562,16 @@ impl EventFocusedCache { } } + /// Return a reference to the state. + pub(super) fn state(&self) -> &Arc> { + &self.inner + } + + /// Get a reference to the _update sender_. + pub(super) async fn update_sender(&self) -> EventFocusedCacheUpdateSender { + self.inner.read().await.sender.clone() + } + /// Subscribe to updates from this event-focused timeline. pub async fn subscribe(&self) -> (Vec, Receiver) { let inner = self.inner.read().await; @@ -566,11 +596,10 @@ impl EventFocusedCache { /// context events and detecting thread membership. pub(super) async fn start_from( &self, - room: Room, num_context_events: u16, thread_mode: EventFocusThreadMode, ) -> Result { - self.inner.write().await.start_from(room, num_context_events, thread_mode).await + self.inner.write().await.start_from(num_context_events, thread_mode).await } /// Paginate backwards in this event-focused timeline, be it room or thread @@ -612,3 +641,15 @@ impl std::fmt::Debug for EventFocusedCache { f.debug_struct("EventFocusedCache").finish_non_exhaustive() } } + +/// Key for the event-focused caches. +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub(in super::super) struct EventFocusedCacheKey { + /// The event ID that the cache is focused on. + pub focused_event_id: OwnedEventId, + /// The thread mode for this cache. + pub thread_mode: EventFocusThreadMode, +} + +/// A small type to send updates in all channels. +pub type EventFocusedCacheUpdateSender = Sender; diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index a9026d2251f..da59a2e989d 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -42,9 +42,30 @@ pub mod thread; /// A type to hold all the caches for a given room. #[derive(Debug)] pub(super) struct Caches { + /// The one and only [`RoomEventCache`]. + /// + /// [`RoomEventCache`]: room::RoomEventCache pub room: room::RoomEventCache, + + /// All the lazily-loaded [`ThreadEventCache`]. + /// + /// [`ThreadEventCache`]: thread::ThreadEventCache + // An `Arc` is used to get an owned lock. pub threads: Arc>>, + + /// The one and only [`PinnedEventsCache`]. + /// + /// [`PinnedEventsCache`]: pinned_events::PinnedEventsCache pub pinned_events: OnceCell, + + /// All the lazily-loaded [`EventFocusedCache`]. + /// + /// [`EventFocusedCache`]: event_focused::EventFocusedCache + // An `Arc` is used to get an owned lock. + pub event_focused: + Arc>>, + + /// Internals data, used to lazily create caches. internals: CachesInternals, } @@ -127,6 +148,7 @@ impl Caches { room: room_event_cache, threads: Arc::new(RwLock::new(HashMap::new())), pinned_events: OnceCell::new(), + event_focused: Arc::new(RwLock::new(HashMap::new())), internals: CachesInternals { store, linked_chunk_update_sender, room_version_rules }, }) } @@ -201,19 +223,52 @@ impl Caches { }) } - /// Get a [`PinnedEventsCache`] if it has been initialised. + /// Get or create a [`EventFocusedCache`]. /// - /// [`PinnedEventsCache`]: pinned_events::PinnedEventsCache - #[cfg(feature = "e2e-encryption")] - pub(super) fn pinned_events_without_initialisation( + /// [`EventFocusedCache`]: event_focused::EventFocusedCache + pub async fn event_focused( &self, - ) -> Option<&pinned_events::PinnedEventsCache> { - self.pinned_events.get() + event_id: OwnedEventId, + thread_mode: event_focused::EventFocusThreadMode, + number_of_initial_events: u16, + ) -> Result< + OwnedRwLockReadGuard< + HashMap, + event_focused::EventFocusedCache, + >, + > { + let key = event_focused::EventFocusedCacheKey { focused_event_id: event_id, thread_mode }; + + Ok( + match OwnedRwLockWriteGuard::try_downgrade_map( + self.event_focused.clone().write_owned().await, + |event_focused_caches| event_focused_caches.get(&key), + ) { + // Event-focused cache exists. + Ok(locked_cache) => locked_cache, + // Event-focused cache does not exist, let's create it. + Err(mut event_focused_caches) => { + let cache = event_focused::EventFocusedCache::new( + self.room.weak_room().clone(), + key.focused_event_id.clone(), + self.internals.linked_chunk_update_sender.clone(), + ); + cache.start_from(number_of_initial_events, thread_mode).await?; + + event_focused_caches.insert(key.clone(), cache); + + OwnedRwLockWriteGuard::downgrade_map( + event_focused_caches, + |event_focused_caches| event_focused_caches.get(&key).unwrap(), + ) + } + }, + ) } /// Update all the event caches with a [`JoinedRoomUpdate`]. pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { - let Self { room, threads, pinned_events, internals } = &self; + let Self { room, threads, pinned_events, event_focused, internals } = &self; // Room. { @@ -264,12 +319,19 @@ impl Caches { pinned_events.handle_joined_room_update(updates).await?; } + // Event-focused. + { + // An event-focused cache isn't listening to live update. Consequently, it is + // not interested by this kind of update. + let _ = event_focused; + } + Ok(()) } /// Update all the event caches with a [`LeftRoomUpdate`]. pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { - let Self { room, threads, pinned_events, internals } = &self; + let Self { room, threads, pinned_events, event_focused, internals } = &self; // Room. { @@ -320,6 +382,13 @@ impl Caches { pinned_events.handle_left_room_update(updates).await?; } + // Event-focused. + { + // An event-focused cache isn't listening to live update. Consequently, it is + // not interested by this kind of update. + let _ = event_focused; + } + Ok(()) } @@ -334,7 +403,7 @@ impl Caches { ResetCaches::new(self).await } - /// Get all events from all the event caches manged by this [`Cacches`]. + /// Get all events from all the event caches manged by this [`Caches`]. /// /// Events can be duplicated if present in different event caches. #[cfg(feature = "e2e-encryption")] @@ -361,6 +430,13 @@ pub(super) struct ResetCaches<'c> { pinned_events::PinnedEventsCacheStateLockWriteGuard<'c>, pinned_events::PinnedEventsCacheUpdateSender, )>, + event_focused_lock: OwnedRwLockWriteGuard< + HashMap, + >, + event_focused_locks: Vec<( + OwnedRwLockWriteGuard, + event_focused::EventFocusedCacheUpdateSender, + )>, } impl<'c> ResetCaches<'c> { @@ -368,7 +444,7 @@ impl<'c> ResetCaches<'c> { /// /// It can fail if acquiring an exclusive lock fails. async fn new( - Caches { room, threads, pinned_events, internals: _ }: &'c mut Caches, + Caches { room, threads, pinned_events, event_focused, internals: _ }: &'c mut Caches, ) -> Result { // Acquire an exclusive access to the state of the room. let room_lock = (room.state().write().await?, room.update_sender().clone()); @@ -390,7 +466,26 @@ impl<'c> ResetCaches<'c> { None }; - Ok(Self { room_lock, threads_lock, thread_locks, pinned_events_lock }) + // Acquire an exclusive access to the event-focused caches. + // Then, for each event-focused, acquire an exclusive access to its state. + let event_focused_lock = event_focused.clone().write_owned().await; + let mut event_focused_locks = Vec::new(); + + for event_focused in event_focused_lock.values() { + event_focused_locks.push(( + event_focused.state().clone().write_owned().await, + event_focused.update_sender().await, + )); + } + + Ok(Self { + room_lock, + threads_lock, + thread_locks, + pinned_events_lock, + event_focused_lock, + event_focused_locks, + }) } /// Reset all the event caches, and broadcast the [`TimelineVectorDiffs`]. @@ -400,7 +495,14 @@ impl<'c> ResetCaches<'c> { /// /// It can fail if resetting an event cache fails. pub async fn reset_all(self) -> Result<()> { - let Self { room_lock, threads_lock, thread_locks, pinned_events_lock } = self; + let Self { + room_lock, + threads_lock, + thread_locks, + pinned_events_lock, + event_focused_lock, + event_focused_locks, + } = self; // Room. { @@ -418,9 +520,7 @@ impl<'c> ResetCaches<'c> { // Threads. { - for thread_lock in thread_locks { - let (mut thread_state, thread_update_sender) = thread_lock; - + for (mut thread_state, thread_update_sender) in thread_locks { let updates_as_vector_diffs = thread_state.reset().await?; thread_update_sender.send( TimelineVectorDiffs { @@ -449,6 +549,20 @@ impl<'c> ResetCaches<'c> { } } + // Event-focused. + { + for (mut event_focused_state, event_focused_update_sender) in event_focused_locks { + let updates_as_vector_diffs = event_focused_state.reset()?; + let _ = event_focused_update_sender.send(TimelineVectorDiffs { + diffs: updates_as_vector_diffs, + origin: EventsOrigin::Cache, + }); + } + + // Now we can release the exclusive access over the event-focused caches. + drop(event_focused_lock); + } + Ok(()) } } diff --git a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs index 6f91480c715..734896d64a8 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs @@ -37,6 +37,7 @@ use ruma::{ use tokio::sync::{Notify, mpsc}; use tracing::{instrument, trace, warn}; +use self::pagination::RoomPagination; pub(super) use self::state::{ LockedRoomEventCacheState, RoomEventCacheStateLockReadGuard, RoomEventCacheStateLockWriteGuard, }; @@ -48,18 +49,12 @@ pub use self::{ }, }; use super::{ - super::{AutoShrinkChannelPayload, EventCacheError, EventsOrigin, Result, RoomPagination}, + super::{AutoShrinkChannelPayload, EventsOrigin, Result}, TimelineVectorDiffs, event_linked_chunk::sort_positions_descending, + pagination::SharedPaginationStatus, }; -use crate::{ - client::WeakClient, - event_cache::{ - EventFocusThreadMode, - caches::{event_focused::EventFocusedCache, pagination::SharedPaginationStatus}, - }, - room::WeakRoom, -}; +use crate::room::WeakRoom; /// A subset of an event cache, for a room. /// @@ -150,82 +145,6 @@ impl RoomEventCache { Ok((events, subscriber)) } - /// Create or get an event-focused timeline cache for this room. - /// - /// This creates a timeline centered around a specific event (e.g., for - /// permalinks), in a given mode, supporting both forward and backward - /// pagination. - /// - /// If the focused event is part of a thread, the timeline will - /// automatically use thread-specific pagination. - /// - /// If the thread mode is defined to [`EventFocusThreadMode::ForceThread`], - /// the timeline will be focused on the thread root of the thread the - /// target event belongs to, or it will consider that the target event - /// itself is the thread root. - #[instrument(skip(self), fields(room_id = %self.inner.room_id, event_id = %event_id, thread_mode = ?thread_mode))] - pub async fn get_or_create_event_focused_cache( - &self, - event_id: OwnedEventId, - num_context_events: u16, - thread_mode: EventFocusThreadMode, - ) -> Result { - let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?; - let guard = self.inner.state.read().await?; - - // Check if we already have a cache for this event. - if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) { - trace!("the cache was already created, returning it"); - return Ok(cache); - } - - // Create a new cache. - let linked_chunk_update_sender = guard.state.linked_chunk_update_sender.clone(); - - // Make sure to drop the guard before calling `start_from` below, as it may need - // to lock the room event cache's state again, when memoizing events - // received from the network response. - drop(guard); - - let room_id = room.room_id().to_owned(); - let weak_room = WeakRoom::new(WeakClient::from_client(&room.client()), room_id.clone()); - - trace!("creating a fresh event-focused cache"); - let cache = EventFocusedCache::new(weak_room, event_id.clone(), linked_chunk_update_sender); - - // Initialize the cache from the server. - cache.start_from(room, num_context_events, thread_mode).await?; - - let mut guard = self.inner.state.write().await?; - - // Check again if we already have a cache for this event, just in case there was - // a race with another caller during initialization. - if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) { - trace!("another cache has been racily created, returning it"); - return Ok(cache); - } - - // Insert the cache in the map. - guard.insert_event_focused_cache(event_id, thread_mode, cache.clone()); - - Ok(cache) - } - - /// Get an event-focused cache for this event and thread mode, if it exists. - /// - /// Otherwise, returns `None`. - /// - /// Use [`Self::get_or_create_event_focused_cache`] for ensuring such a - /// cache exists. - #[instrument(skip(self), fields(room_id = %self.inner.room_id))] - pub async fn get_event_focused_cache( - &self, - event_id: OwnedEventId, - thread_mode: EventFocusThreadMode, - ) -> Result> { - Ok(self.inner.state.read().await?.get_event_focused_cache(event_id, thread_mode)) - } - /// Return a [`RoomPagination`] type useful for running back-pagination /// queries in the current room. pub fn pagination(&self) -> RoomPagination { diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index f2a2f3facc7..6bfbe755cc5 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -12,12 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{ - collections::HashMap, - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, +use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, }; use eyeball::SharedObservable; @@ -62,7 +59,6 @@ use super::{ }, }, EventLocation, TimelineVectorDiffs, - event_focused::{EventFocusThreadMode, EventFocusedCache}, event_linked_chunk::EventLinkedChunk, lock, pagination::SharedPaginationStatus, @@ -73,15 +69,6 @@ use super::{ }; use crate::room::WeakRoom; -/// Key for the event-focused caches. -#[derive(Hash, PartialEq, Eq)] -struct EventFocusedCacheKey { - /// The event ID that the cache is focused on. - focused: OwnedEventId, - /// The thread mode for this cache. - thread_mode: EventFocusThreadMode, -} - pub struct RoomEventCacheState { /// Whether thread support has been enabled for the event cache. enabled_thread_support: bool, @@ -102,13 +89,6 @@ pub struct RoomEventCacheState { /// linked chunk for this room. room_linked_chunk: EventLinkedChunk, - /// Event-focused caches for this room. - /// - /// Keyed by the focused event ID and thread mode. Each entry represents - /// a timeline centered around a specific event (e.g. from a - /// permalink). - event_focused_caches: HashMap, - pagination_status: SharedObservable, /// A clone of [`super::RoomEventCacheInner::update_sender`]. @@ -247,9 +227,6 @@ impl LockedRoomEventCacheState { linked_chunk, full_linked_chunk_metadata, ), - // Event-focused caches are created on-demand when the user navigates to a - // permalink. - event_focused_caches: HashMap::new(), pagination_status, update_sender, linked_chunk_update_sender, @@ -349,18 +326,6 @@ impl<'a> RoomEventCacheStateLockReadGuard<'a> { pub fn is_dirty(&self) -> bool { EventCacheStoreLockGuard::is_dirty(&self.store) } - - /// Get an event-focused cache for this event and thread mode, if it - /// exists. - /// - /// Otherwise, returns `None`. - pub fn get_event_focused_cache( - &self, - event_id: OwnedEventId, - thread_mode: EventFocusThreadMode, - ) -> Option { - get_event_focused_cache(&self.state, event_id, thread_mode) - } } impl<'a> RoomEventCacheStateLockWriteGuard<'a> { @@ -369,12 +334,6 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { &mut self.state.room_linked_chunk } - /// Get a reference to all the live [`event_focused_caches`]. - #[cfg(feature = "e2e-encryption")] - pub fn event_focused_caches(&self) -> impl Iterator { - self.state.event_focused_caches.values() - } - /// Get the `waited_for_initial_prev_token` value. pub fn waited_for_initial_prev_token(&self) -> bool { self.state.waited_for_initial_prev_token @@ -874,29 +833,6 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { pub fn is_dirty(&self) -> bool { EventCacheStoreLockGuard::is_dirty(&self.store) } - - /// Insert an initialized event-focused cache for the given event id. - pub fn insert_event_focused_cache( - &mut self, - event_id: OwnedEventId, - thread_mode: EventFocusThreadMode, - cache: EventFocusedCache, - ) { - let key = EventFocusedCacheKey { focused: event_id, thread_mode }; - self.state.event_focused_caches.insert(key, cache); - } - - /// Get an event-focused cache for this event and thread mode, if it - /// exists. - /// - /// Otherwise, returns `None`. - pub fn get_event_focused_cache( - &self, - event_id: OwnedEventId, - thread_mode: EventFocusThreadMode, - ) -> Option { - get_event_focused_cache(&self.state, event_id, thread_mode) - } } /// Extract a valid read receipt event from the ephemeral events, if @@ -923,18 +859,3 @@ fn extract_read_receipt( receipt_event } - -/// Get an event-focused cache for this event and thread mode, if it exists. -/// -/// Otherwise, returns `None`. -/// -/// Extracted as a separate function to avoid duplicating the implementation for -/// both the read and write guards. -fn get_event_focused_cache( - state: &RoomEventCacheState, - event_id: OwnedEventId, - thread_mode: EventFocusThreadMode, -) -> Option { - let key = EventFocusedCacheKey { focused: event_id, thread_mode }; - state.event_focused_caches.get(&key).cloned() -} diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 14878b4487b..55258736a0b 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -403,16 +403,28 @@ impl EventCache { Ok((caches_for_room.pinned_events()?.clone(), drop_handles)) } - /// Return a pinned-events-specific view over the [`EventCache`] if it has - /// been initialised. - #[cfg(feature = "e2e-encryption")] - async fn pinned_events_without_initialisation( + /// Return an event-focused view over the [`EventCache`]. + pub async fn event_focused( &self, room_id: &RoomId, - ) -> Result> { + event_id: &EventId, + thread_mode: EventFocusThreadMode, + number_of_initial_events: u16, + ) -> Result<(EventFocusedCache, Arc)> { + let Some(drop_handles) = self.inner.drop_handles.get().cloned() else { + return Err(EventCacheError::NotSubscribedYet); + }; + let caches_for_room = self.inner.all_caches_for_room(room_id).await?; - Ok(caches_for_room.pinned_events_without_initialisation().cloned()) + Ok(( + caches_for_room + .event_focused(event_id.to_owned(), thread_mode, number_of_initial_events) + .await? + .deref() + .clone(), + drop_handles, + )) } /// Cleanly clear all the rooms' event caches. diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 0b59579248b..e7c97ae55a5 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -362,26 +362,18 @@ impl EventCache { timer!("Resolving UTDs"); - // Get the cache for this particular room. - let (room_cache, _drop_handles) = self.room(room_id).await?; - let event_ids: BTreeSet<_> = events.iter().cloned().map(|(event_id, _, _)| event_id).collect(); - // Phase 1: under the room state write lock, collect cache handles and - // perform all room-linked-chunk mutations. We deliberately do NOT call - // replace_utds() on event-focused/pinned-evenst caches here to avoid an ABBA - // deadlock: pagination holds an event-focused cache lock and then tries - // to acquire the room state lock (via `save_events`), while this method - // would hold the room state lock and try to acquire event-focused cache - // locks. - let ef_caches = { - let mut state = room_cache.state().write().await?; + let all_caches = self.inner.all_caches_for_room(room_id).await?; - let ef_caches: Vec<_> = state.event_focused_caches().cloned().collect(); + // Resolve on the room cache. + { + let room_cache = &all_caches.room; + let mut state = room_cache.state().write().await?; - // Consider the room linked chunk. let mut new_events = Vec::with_capacity(events.len()); + for (event_id, decrypted, actions) in &events { if let Some((location, mut target_event)) = state.find_event(event_id).await? { target_event.kind = TimelineEventKind::Decrypted(decrypted.clone()); @@ -415,25 +407,29 @@ impl EventCache { }), Some(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() }), ); + } - ef_caches - }; - // Room state write lock is dropped here. - - // Phase 2: replace UTDs in pinned-events and event-focused caches - // WITHOUT holding the room state lock. These caches have their own - // internal locks and don't need the room state lock. - if let Ok(Some(pinned_events_cache)) = - self.pinned_events_without_initialisation(room_id).await - { + // Resolve on the pinned-events cache. + if let Some(pinned_events_cache) = all_caches.pinned_events.get() { pinned_events_cache.replace_utds(&events).await?; } - // TODO: This ain't great for performance; there shouldn't be that many - // event-focused caches alive at the same time, but they could - // accumulate over time. Consider keeping track of which linked chunk - // contain which event id, to avoid doing the linear searches here. - join_all(ef_caches.iter().map(|cache| cache.replace_utds(&events))).await; + // Resolve on the event-focused caches. + { + // TODO: This ain't great for performance; there shouldn't be that many + // event-focused caches alive at the same time, but they could + // accumulate over time. Consider keeping track of which linked chunk + // contain which event id, to avoid doing the linear searches here. + join_all( + all_caches + .event_focused + .read() + .await + .values() + .map(|event_focused_cache| event_focused_cache.replace_utds(&events)), + ) + .await; + } let report = RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids }; @@ -1762,6 +1758,10 @@ mod tests { /// Regression test: the redecryptor must NOT hold the room state write /// lock while calling replace_utds() on event-focused caches, otherwise /// an ABBA deadlock occurs with concurrent event-focused cache pagination. + /// + /// Note that this situation is no longer possible because a refactoring + /// happened after the fix. Anyway, we kept the test because it was nice + /// with us. #[async_test] async fn test_redecryptor_no_deadlock_with_event_focused_cache_pagination() { use crate::{ @@ -1818,12 +1818,8 @@ mod tests { .mount() .await; - let event_focused_cache = room_cache - .get_or_create_event_focused_cache( - focused_event_id.to_owned(), - 20, - EventFocusThreadMode::Automatic, - ) + let (event_focused_cache, _) = event_cache + .event_focused(room_id, focused_event_id, EventFocusThreadMode::Automatic, 20) .await .unwrap(); @@ -1849,10 +1845,6 @@ mod tests { // Send the room key to Bob. // // The redecryptor background task will pick it up and decrypt the UTD. - // Previously, on_resolved_utds would hold the room state write lock - // while calling replace_utds on event-focused caches, creating an ABBA - // deadlock with pagination (which holds event-focused lock and needs - // room state lock via save_events). server .mock_sync() .ok_and_run(&bob, |builder| { @@ -1868,7 +1860,8 @@ mod tests { sleep(Duration::from_secs(1)).await; // Subscribing requires a room state read lock (which would be awaited forever, - // before the fix). + // before the fix; note that the fix is no longer relevant but the test has been + // kept in case of). let (_events, _subscriber) = timeout(room_cache.subscribe(), Duration::from_millis(100)) .await .expect("subscribing shouldn't timeout")