Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions crates/matrix-sdk-ui/src/timeline/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,15 +374,9 @@ impl<P: RoomDataProvider> TimelineController<P> {
TimelineFocus::Event { target, thread_mode, num_context_events, .. } => {
TimelineFocusKind::Event {
event_cache: event_cache
.room(room_id)
.event_focused(room_id, target, (*thread_mode).into(), *num_context_events)
.await?
.0
.get_or_create_event_focused_cache(
target.clone(),
*num_context_events,
(*thread_mode).into(),
)
.await?,
.0,
focused_event_id: target.clone(),
// This will be initialized in `Self::init_focus`.
thread_root: OnceLock::new(),
Expand Down
59 changes: 50 additions & 9 deletions crates/matrix-sdk/src/event_cache/caches/event_focused/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

use std::sync::Arc;

use eyeball_im::VectorDiff;
use matrix_sdk_base::{
deserialized_responses::TimelineEvent,
event_cache::{Event, Gap},
Expand Down Expand Up @@ -90,7 +91,7 @@ pub(crate) enum EventFocusedPaginationMode {
},
}

struct EventFocusedCacheInner {
pub(super) struct EventFocusedCacheState {
/// The room owning this event-focused cache.
room: WeakRoom,

Expand All @@ -104,13 +105,13 @@ struct EventFocusedCacheInner {
chunk: EventLinkedChunk,

/// A sender of timeline updates.
sender: Sender<TimelineVectorDiffs>,
sender: EventFocusedCacheUpdateSender,

/// A sender for globally observable linked chunk updates.
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
}

impl EventFocusedCacheInner {
impl EventFocusedCacheState {
/// Initialize the cache from a focused event.
///
/// This uses `/context` to fetch the event with surrounding context.
Expand All @@ -121,13 +122,14 @@ impl EventFocusedCacheInner {
/// Pagination tokens are stored as gaps in the linked chunk:
/// - Backward token (start): Gap at the front of the linked chunk.
/// - Forward token (end): Gap at the back of the linked chunk.
#[instrument(skip(self, room), fields(room_id = %self.room.room_id(), event_id = %self.focused_event_id))]
#[instrument(skip(self), fields(room_id = %self.room.room_id(), event_id = %self.focused_event_id))]
async fn start_from(
&mut self,
room: Room,
num_context_events: u16,
thread_mode: EventFocusThreadMode,
) -> Result<StartFromResult> {
let room = self.room.get().ok_or(EventCacheError::ClientDropped)?;

trace!(num_context_events, "fetching event with context via /context");

let paginator = Paginator::new(room);
Expand Down Expand Up @@ -502,6 +504,24 @@ impl EventFocusedCacheInner {

Ok((result.chunk, result.next_batch_token))
}

/// Reset this data structure as if it were brand new.
///
/// Return a single diff update that is a clear of all events; as a
/// result, the caller may override any pending diff updates
/// with the result of this function.
pub fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>> {
self.chunk.reset();
self.propagate_changes();

let diff_updates = self.chunk.updates_as_vector_diffs();

// Ensure the contract defined in the doc comment is true:
debug_assert_eq!(diff_updates.len(), 1);
debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));

Ok(diff_updates)
}
}

/// A cache for an event-focused timeline.
Expand All @@ -520,7 +540,7 @@ impl EventFocusedCacheInner {
/// This is a shallow data structure, and can be cloned cheaply.
#[derive(Clone)]
pub struct EventFocusedCache {
inner: Arc<RwLock<EventFocusedCacheInner>>,
inner: Arc<RwLock<EventFocusedCacheState>>,
}

impl EventFocusedCache {
Expand All @@ -531,7 +551,7 @@ impl EventFocusedCache {
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
) -> Self {
Self {
inner: Arc::new(RwLock::new(EventFocusedCacheInner {
inner: Arc::new(RwLock::new(EventFocusedCacheState {
room,
focused_event_id,
pagination_mode: EventFocusedPaginationMode::Room { hide_thread_events: false },
Expand All @@ -542,6 +562,16 @@ impl EventFocusedCache {
}
}

/// Return a reference to the state.
pub(super) fn state(&self) -> &Arc<RwLock<EventFocusedCacheState>> {
&self.inner
}

/// Get a reference to the _update sender_.
pub(super) async fn update_sender(&self) -> EventFocusedCacheUpdateSender {
self.inner.read().await.sender.clone()
}

/// Subscribe to updates from this event-focused timeline.
pub async fn subscribe(&self) -> (Vec<Event>, Receiver<TimelineVectorDiffs>) {
let inner = self.inner.read().await;
Expand All @@ -566,11 +596,10 @@ impl EventFocusedCache {
/// context events and detecting thread membership.
pub(super) async fn start_from(
&self,
room: Room,
num_context_events: u16,
thread_mode: EventFocusThreadMode,
) -> Result<StartFromResult> {
self.inner.write().await.start_from(room, num_context_events, thread_mode).await
self.inner.write().await.start_from(num_context_events, thread_mode).await
}

/// Paginate backwards in this event-focused timeline, be it room or thread
Expand Down Expand Up @@ -612,3 +641,15 @@ impl std::fmt::Debug for EventFocusedCache {
f.debug_struct("EventFocusedCache").finish_non_exhaustive()
}
}

/// Key for the event-focused caches.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub(in super::super) struct EventFocusedCacheKey {
/// The event ID that the cache is focused on.
pub focused_event_id: OwnedEventId,
/// The thread mode for this cache.
pub thread_mode: EventFocusThreadMode,
}

/// A small type to send updates in all channels.
pub type EventFocusedCacheUpdateSender = Sender<TimelineVectorDiffs>;
144 changes: 129 additions & 15 deletions crates/matrix-sdk/src/event_cache/caches/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,30 @@ pub mod thread;
/// A type to hold all the caches for a given room.
#[derive(Debug)]
pub(super) struct Caches {
/// The one and only [`RoomEventCache`].
Comment thread
Hywan marked this conversation as resolved.
///
/// [`RoomEventCache`]: room::RoomEventCache
pub room: room::RoomEventCache,

/// All the lazily-loaded [`ThreadEventCache`].
///
/// [`ThreadEventCache`]: thread::ThreadEventCache
// An `Arc` is used to get an owned lock.
pub threads: Arc<RwLock<HashMap<OwnedEventId, thread::ThreadEventCache>>>,

/// The one and only [`PinnedEventsCache`].
///
/// [`PinnedEventsCache`]: pinned_events::PinnedEventsCache
pub pinned_events: OnceCell<pinned_events::PinnedEventsCache>,

/// All the lazily-loaded [`EventFocusedCache`].
///
/// [`EventFocusedCache`]: event_focused::EventFocusedCache
// An `Arc` is used to get an owned lock.
pub event_focused:
Arc<RwLock<HashMap<event_focused::EventFocusedCacheKey, event_focused::EventFocusedCache>>>,

/// Internals data, used to lazily create caches.
internals: CachesInternals,
}

Expand Down Expand Up @@ -127,6 +148,7 @@ impl Caches {
room: room_event_cache,
threads: Arc::new(RwLock::new(HashMap::new())),
pinned_events: OnceCell::new(),
event_focused: Arc::new(RwLock::new(HashMap::new())),
internals: CachesInternals { store, linked_chunk_update_sender, room_version_rules },
})
}
Expand Down Expand Up @@ -201,19 +223,52 @@ impl Caches {
})
}

/// Get a [`PinnedEventsCache`] if it has been initialised.
/// Get or create a [`EventFocusedCache`].
///
/// [`PinnedEventsCache`]: pinned_events::PinnedEventsCache
#[cfg(feature = "e2e-encryption")]
pub(super) fn pinned_events_without_initialisation(
/// [`EventFocusedCache`]: event_focused::EventFocusedCache
pub async fn event_focused(
&self,
) -> Option<&pinned_events::PinnedEventsCache> {
self.pinned_events.get()
event_id: OwnedEventId,
thread_mode: event_focused::EventFocusThreadMode,
number_of_initial_events: u16,
) -> Result<
OwnedRwLockReadGuard<
HashMap<event_focused::EventFocusedCacheKey, event_focused::EventFocusedCache>,
event_focused::EventFocusedCache,
>,
> {
let key = event_focused::EventFocusedCacheKey { focused_event_id: event_id, thread_mode };

Ok(
match OwnedRwLockWriteGuard::try_downgrade_map(
self.event_focused.clone().write_owned().await,
|event_focused_caches| event_focused_caches.get(&key),
) {
// Event-focused cache exists.
Ok(locked_cache) => locked_cache,
// Event-focused cache does not exist, let's create it.
Err(mut event_focused_caches) => {
let cache = event_focused::EventFocusedCache::new(
self.room.weak_room().clone(),
key.focused_event_id.clone(),
self.internals.linked_chunk_update_sender.clone(),
);
cache.start_from(number_of_initial_events, thread_mode).await?;

event_focused_caches.insert(key.clone(), cache);

OwnedRwLockWriteGuard::downgrade_map(
event_focused_caches,
|event_focused_caches| event_focused_caches.get(&key).unwrap(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's safe to unwrap here because we just added it above?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct!

)
}
},
)
}

/// Update all the event caches with a [`JoinedRoomUpdate`].
pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
let Self { room, threads, pinned_events, internals } = &self;
let Self { room, threads, pinned_events, event_focused, internals } = &self;

// Room.
{
Expand Down Expand Up @@ -264,12 +319,19 @@ impl Caches {
pinned_events.handle_joined_room_update(updates).await?;
}

// Event-focused.
{
// An event-focused cache isn't listening to live update. Consequently, it is
// not interested by this kind of update.
let _ = event_focused;
}

Ok(())
}

/// Update all the event caches with a [`LeftRoomUpdate`].
pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
let Self { room, threads, pinned_events, internals } = &self;
let Self { room, threads, pinned_events, event_focused, internals } = &self;

// Room.
{
Expand Down Expand Up @@ -320,6 +382,13 @@ impl Caches {
pinned_events.handle_left_room_update(updates).await?;
}

// Event-focused.
{
// An event-focused cache isn't listening to live update. Consequently, it is
// not interested by this kind of update.
let _ = event_focused;
}

Ok(())
}

Expand All @@ -334,7 +403,7 @@ impl Caches {
ResetCaches::new(self).await
}

/// Get all events from all the event caches manged by this [`Cacches`].
/// Get all events from all the event caches manged by this [`Caches`].
///
/// Events can be duplicated if present in different event caches.
#[cfg(feature = "e2e-encryption")]
Expand All @@ -361,14 +430,21 @@ pub(super) struct ResetCaches<'c> {
pinned_events::PinnedEventsCacheStateLockWriteGuard<'c>,
pinned_events::PinnedEventsCacheUpdateSender,
)>,
event_focused_lock: OwnedRwLockWriteGuard<
HashMap<event_focused::EventFocusedCacheKey, event_focused::EventFocusedCache>,
>,
event_focused_locks: Vec<(
OwnedRwLockWriteGuard<event_focused::EventFocusedCacheState>,
event_focused::EventFocusedCacheUpdateSender,
)>,
}

impl<'c> ResetCaches<'c> {
/// Create a new [`ResetCaches`].
///
/// It can fail if acquiring an exclusive lock fails.
async fn new(
Caches { room, threads, pinned_events, internals: _ }: &'c mut Caches,
Caches { room, threads, pinned_events, event_focused, internals: _ }: &'c mut Caches,
) -> Result<Self> {
// Acquire an exclusive access to the state of the room.
let room_lock = (room.state().write().await?, room.update_sender().clone());
Expand All @@ -390,7 +466,26 @@ impl<'c> ResetCaches<'c> {
None
};

Ok(Self { room_lock, threads_lock, thread_locks, pinned_events_lock })
// Acquire an exclusive access to the event-focused caches.
// Then, for each event-focused, acquire an exclusive access to its state.
let event_focused_lock = event_focused.clone().write_owned().await;
let mut event_focused_locks = Vec::new();

for event_focused in event_focused_lock.values() {
event_focused_locks.push((
event_focused.state().clone().write_owned().await,
event_focused.update_sender().await,
));
}

Ok(Self {
room_lock,
threads_lock,
thread_locks,
pinned_events_lock,
event_focused_lock,
event_focused_locks,
})
}

/// Reset all the event caches, and broadcast the [`TimelineVectorDiffs`].
Expand All @@ -400,7 +495,14 @@ impl<'c> ResetCaches<'c> {
///
/// It can fail if resetting an event cache fails.
pub async fn reset_all(self) -> Result<()> {
let Self { room_lock, threads_lock, thread_locks, pinned_events_lock } = self;
let Self {
room_lock,
threads_lock,
thread_locks,
pinned_events_lock,
event_focused_lock,
event_focused_locks,
} = self;

// Room.
{
Expand All @@ -418,9 +520,7 @@ impl<'c> ResetCaches<'c> {

// Threads.
{
for thread_lock in thread_locks {
let (mut thread_state, thread_update_sender) = thread_lock;

for (mut thread_state, thread_update_sender) in thread_locks {
let updates_as_vector_diffs = thread_state.reset().await?;
thread_update_sender.send(
TimelineVectorDiffs {
Expand Down Expand Up @@ -449,6 +549,20 @@ impl<'c> ResetCaches<'c> {
}
}

// Event-focused.
{
for (mut event_focused_state, event_focused_update_sender) in event_focused_locks {
let updates_as_vector_diffs = event_focused_state.reset()?;
let _ = event_focused_update_sender.send(TimelineVectorDiffs {
diffs: updates_as_vector_diffs,
origin: EventsOrigin::Cache,
});
}

// Now we can release the exclusive access over the event-focused caches.
drop(event_focused_lock);
}

Ok(())
}
}
Expand Down
Loading
Loading