From e82d683e6490e37d62f9b19fef70b377e2d3d75e Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 12 May 2026 14:27:05 +0200 Subject: [PATCH 01/22] chore(sdk): Implement `Debug` for `PinnedEventCache`. --- .../src/event_cache/caches/pinned_events/mod.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index 3c3fd1304a6..88bed879f6a 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::BTreeSet, sync::Arc}; +use std::{collections::BTreeSet, fmt, sync::Arc}; use futures_util::{StreamExt as _, stream}; use matrix_sdk_base::{ @@ -77,8 +77,8 @@ impl lock::Store for PinnedEventCacheState { } #[cfg(not(tarpaulin_include))] -impl std::fmt::Debug for PinnedEventCacheState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Debug for PinnedEventCacheState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PinnedEventCacheState") .field("room_id", &self.room_id) .field("chunk", &self.chunk) @@ -554,3 +554,9 @@ impl PinnedEventCache { self.state.write().await?.reload().await } } + +impl fmt::Debug for PinnedEventCache { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PinnedEventCache").finish_non_exhaustive() + } +} From 3bc164f0717aaa9bf465b55fb147dc2c88b555fd Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 12 May 2026 14:53:15 +0200 Subject: [PATCH 02/22] doc(sdk): Remove a useless `TODO`. --- crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index 88bed879f6a..3c6dacc1f58 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -60,7 +60,6 @@ pub(in super::super) struct PinnedEventCacheState { chunk: EventLinkedChunk, /// Reference to the underlying backing store. - // TODO: can be removed? store: EventCacheStoreLock, /// A sender for the globally observable linked chunk updates that happened From 147d35e0a66b55a0bb1a9822d3f54c3b7ed213d8 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 12 May 2026 15:33:34 +0200 Subject: [PATCH 03/22] refactor(sdk): Rename `PinnedEventCache` to `PinnedEventsCache`. This patch renames `PinnedEventCache` to `PinnedEventsCache` (and same for all types having `PinnedEventCache` as a prefix). Why? Because it's a cache about _pinned-events_, not a single _pinned-event_ :-). --- .../src/timeline/controller/mod.rs | 2 +- .../event_cache/caches/pinned_events/mod.rs | 38 +++++++++---------- .../src/event_cache/caches/room/state.rs | 26 ++++++------- .../matrix-sdk/src/event_cache/redecryptor.rs | 2 +- .../tests/integration/room/pinned_events.rs | 8 ++-- 5 files changed, 38 insertions(+), 38 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index ab875b93960..e1c0c5eab04 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -1463,7 +1463,7 @@ impl TimelineController { .client() .task_monitor() .spawn_infinite_task( - "timeline::pinned_event_cache_updates", + "timeline::pinned_events_cache_updates", pinned_events_task(event_cache.clone(), self.clone(), pinned_events_recv), ) .abort_on_drop(); diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index 3c6dacc1f58..8776b0d5fe4 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -44,7 +44,7 @@ use super::{ }; use crate::{Room, client::WeakClient, config::RequestConfig, room::WeakRoom}; -pub(in super::super) struct PinnedEventCacheState { +pub(in super::super) struct PinnedEventsCacheState { /// The ID of the room owning this list of pinned events. room_id: OwnedRoomId, @@ -69,16 +69,16 @@ pub(in super::super) struct PinnedEventCacheState { linked_chunk_update_sender: Sender, } -impl lock::Store for PinnedEventCacheState { +impl lock::Store for PinnedEventsCacheState { fn store(&self) -> &EventCacheStoreLock { &self.store } } #[cfg(not(tarpaulin_include))] -impl fmt::Debug for PinnedEventCacheState { +impl fmt::Debug for PinnedEventsCacheState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PinnedEventCacheState") + f.debug_struct("PinnedEventsCacheState") .field("room_id", &self.room_id) .field("chunk", &self.chunk) .finish_non_exhaustive() @@ -89,12 +89,12 @@ impl fmt::Debug for PinnedEventCacheState { /// /// This contains all the inner mutable states that ought to be updated at /// the same time. -pub type PinnedEventCacheStateLock = lock::StateLock; +pub type PinnedEventsCacheStateLock = lock::StateLock; -pub type PinnedEventCacheStateLockWriteGuard<'a> = - lock::StateLockWriteGuard<'a, PinnedEventCacheState>; +pub type PinnedEventsCacheStateLockWriteGuard<'a> = + lock::StateLockWriteGuard<'a, PinnedEventsCacheState>; -impl<'a> lock::Reload for PinnedEventCacheStateLockWriteGuard<'a> { +impl<'a> lock::Reload for PinnedEventsCacheStateLockWriteGuard<'a> { async fn reload(&mut self) -> Result<()> { self.reload_from_storage().await?; @@ -102,7 +102,7 @@ impl<'a> lock::Reload for PinnedEventCacheStateLockWriteGuard<'a> { } } -impl<'a> PinnedEventCacheStateLockWriteGuard<'a> { +impl<'a> PinnedEventsCacheStateLockWriteGuard<'a> { /// Reload all the pinned events from storage, replacing the current linked /// chunk. async fn reload_from_storage(&mut self) -> Result<()> { @@ -190,7 +190,7 @@ impl<'a> PinnedEventCacheStateLockWriteGuard<'a> { } } -impl PinnedEventCacheState { +impl PinnedEventsCacheState { /// Return a list of the current event IDs in this linked chunk. fn current_event_ids(&self) -> Vec { self.chunk.events().filter_map(|(_position, event)| event.event_id()).collect() @@ -201,16 +201,16 @@ impl PinnedEventCacheState { /// /// This is cheap to clone, because it's a shallow data type. #[derive(Clone)] -pub struct PinnedEventCache { - state: Arc, +pub struct PinnedEventsCache { + state: Arc, /// The task handling the refreshing of pinned events for this specific /// room. _task: Arc, } -impl PinnedEventCache { - /// Creates a new [`PinnedEventCache`] for the given room. +impl PinnedEventsCache { + /// Creates a new [`PinnedEventsCache`] for the given room. pub(in super::super) fn new( room: Room, linked_chunk_update_sender: Sender, @@ -223,8 +223,8 @@ impl PinnedEventCache { let chunk = EventLinkedChunk::new(); let state = - PinnedEventCacheState { room_id, chunk, sender, linked_chunk_update_sender, store }; - let state = Arc::new(PinnedEventCacheStateLock::new_inner(state)); + PinnedEventsCacheState { room_id, chunk, sender, linked_chunk_update_sender, store }; + let state = Arc::new(PinnedEventsCacheStateLock::new_inner(state)); let task = Arc::new( room.client() @@ -356,7 +356,7 @@ impl PinnedEventCache { } #[instrument(fields(%room_id = room.room_id()), skip(room, state))] - async fn pinned_event_listener_task(room: Room, state: Arc) { + async fn pinned_event_listener_task(room: Room, state: Arc) { debug!("pinned events listener task started"); let reload_from_network = async |room: Room| { @@ -554,8 +554,8 @@ impl PinnedEventCache { } } -impl fmt::Debug for PinnedEventCache { +impl fmt::Debug for PinnedEventsCache { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PinnedEventCache").finish_non_exhaustive() + f.debug_struct("PinnedEventsCache").finish_non_exhaustive() } } diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index 1bd2d073b2d..a1d1a953dd4 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -66,7 +66,7 @@ use super::{ event_linked_chunk::EventLinkedChunk, lock, pagination::SharedPaginationStatus, - pinned_events::PinnedEventCache, + pinned_events::PinnedEventsCache, read_receipts::compute_unread_counts, }, EventsOrigin, RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, @@ -111,7 +111,7 @@ pub struct RoomEventCacheState { event_focused_caches: HashMap, /// Cache for pinned events in this room, initialized on-demand. - pinned_event_cache: OnceLock, + pinned_events_cache: OnceLock, pagination_status: SharedObservable, @@ -260,7 +260,7 @@ impl LockedRoomEventCacheState { room_version_rules, waited_for_initial_prev_token: false, subscriber_count: Default::default(), - pinned_event_cache: OnceLock::new(), + pinned_events_cache: OnceLock::new(), automatic_pagination, })) } @@ -282,8 +282,8 @@ impl<'a> lock::Reload for RoomEventCacheStateLockWriteGuard<'a> { self.shrink_to_last_chunk().await?; // Reload the pinned-events. - if let Some(pinned_event_cache) = self.pinned_event_cache.get_mut() { - pinned_event_cache.reload().await?; + if let Some(pinned_events_cache) = self.pinned_events_cache.get_mut() { + pinned_events_cache.reload().await?; } let diffs = self.state.room_linked_chunk.updates_as_vector_diffs(); @@ -375,15 +375,15 @@ impl<'a> RoomEventCacheStateLockReadGuard<'a> { &self, room: Room, ) -> Result<(Vec, Receiver), EventCacheError> { - let pinned_event_cache = self.state.pinned_event_cache.get_or_init(|| { - PinnedEventCache::new( + let pinned_events_cache = self.state.pinned_events_cache.get_or_init(|| { + PinnedEventsCache::new( room, self.state.linked_chunk_update_sender.clone(), self.state.store.clone(), ) }); - pinned_event_cache.subscribe().await + pinned_events_cache.subscribe().await } /// Get an event-focused cache for this event and thread mode, if it @@ -405,11 +405,11 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { &mut self.state.room_linked_chunk } - /// Get a reference to the [`pinned_event_cache`] if it has been + /// Get a reference to the [`pinned_events_cache`] if it has been /// initialized. #[cfg(any(feature = "e2e-encryption", test))] - pub fn pinned_event_cache(&self) -> Option<&PinnedEventCache> { - self.state.pinned_event_cache.get() + pub fn pinned_events_cache(&self) -> Option<&PinnedEventsCache> { + self.state.pinned_events_cache.get() } /// Get a reference to all the live [`event_focused_caches`]. @@ -741,8 +741,8 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { // below. let state = &mut *self.state; - if let Some(pinned_event_cache) = state.pinned_event_cache.get_mut() { - pinned_event_cache + if let Some(pinned_events_cache) = state.pinned_events_cache.get_mut() { + pinned_events_cache .maybe_add_live_related_events(&events, &state.room_version_rules.redaction) .await?; } diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 17d62c18738..650eba9720b 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -377,7 +377,7 @@ impl EventCache { let (pinned_cache, ef_caches) = { let mut state = room_cache.state().write().await?; - let pinned_cache = state.pinned_event_cache().cloned(); + let pinned_cache = state.pinned_events_cache().cloned(); let ef_caches: Vec<_> = state.event_focused_caches().cloned().collect(); // Consider the room linked chunk. diff --git a/crates/matrix-sdk/tests/integration/room/pinned_events.rs b/crates/matrix-sdk/tests/integration/room/pinned_events.rs index 69b6d41a3a6..9f9d2ce9815 100644 --- a/crates/matrix-sdk/tests/integration/room/pinned_events.rs +++ b/crates/matrix-sdk/tests/integration/room/pinned_events.rs @@ -191,7 +191,7 @@ async fn test_pinned_events_are_loaded_from_network_then_are_reloaded_from_stora // Get the room event cache and subscribe to pinned events. let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - // Subscribe to pinned events - this triggers PinnedEventCache::new() which + // Subscribe to pinned events - this triggers PinnedEventsCache::new() which // spawns a task that calls reload_from_storage() first. let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); let mut events = events.into(); @@ -237,7 +237,7 @@ async fn test_pinned_events_are_loaded_from_network_then_are_reloaded_from_stora // Get the room event cache and subscribe to pinned events. let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - // Subscribe to pinned events - this triggers PinnedEventCache::new() which + // Subscribe to pinned events - this triggers PinnedEventsCache::new() which // spawns a task that calls reload_from_storage() first. let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); let mut events = events.into(); @@ -313,7 +313,7 @@ async fn test_pinned_events_are_reloaded_from_storage_from_many_chunks() { // Get the room event cache and subscribe to pinned events. let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - // Subscribe to pinned events - this triggers PinnedEventCache::new() which + // Subscribe to pinned events - this triggers PinnedEventsCache::new() which // spawns a task that calls reload_from_storage() first. let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); let mut events = events.into(); @@ -389,7 +389,7 @@ async fn test_pinned_events_dont_include_thread_responses() { // Get the room event cache and subscribe to pinned events. let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - // Subscribe to pinned events - this triggers PinnedEventCache::new() which + // Subscribe to pinned events - this triggers PinnedEventsCache::new() which // spawns a task that calls reload_from_storage() first. let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); let mut events = events.into(); From c1aad243f9b1236a7c26e7dad11575946a9cc281 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 12 May 2026 16:16:46 +0200 Subject: [PATCH 04/22] chore(sdk): `PinnedEventsCache::new` takes a `WeakRoom` instead of a `Room`. This patch changes `PinnedEventsCache::new` to receive a `WeakRoom` instead of a `Room`. It then returns a `Result`, with `Err` if the `WeakRoom` doesn't point to the `Room` anymore. Thus, this patch changes `get_or_init` by `get_or_try_init`, but this one is unstable. So this patch switches from `OnceLock` to `OnceCell`, which provides a stable one. Why this change? Because in a later refactoring, providing a `Room` is a bit annoying: all we will have is a `WeakRoom`. We could do this work of upgrading the `WeakRoom` to a `Room`, but it seems akward as all caches are holding a `WeakRoom` if room is necessary. --- Cargo.lock | 1 + Cargo.toml | 1 + crates/matrix-sdk/Cargo.toml | 1 + .../src/event_cache/caches/pinned_events/mod.rs | 11 ++++++----- .../src/event_cache/caches/room/mod.rs | 3 +-- .../src/event_cache/caches/room/state.rs | 17 +++++++++-------- 6 files changed, 19 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9cc5cf992b..23037388f12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3222,6 +3222,7 @@ dependencies = [ "mime2ext", "oauth2", "oauth2-reqwest", + "once_cell", "percent-encoding", "pin-project-lite", "proptest", diff --git a/Cargo.toml b/Cargo.toml index aea2a91c70c..e95cd3e3044 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ js-sys = { version = "0.3.82", default-features = false, features = ["std"] } mime = { version = "0.3.17", default-features = false } oauth2 = { version = "5.0.0", default-features = false, features = ["timing-resistant-secret-traits"] } oauth2-reqwest = { version = "0.1.0-alpha.3", default-features = false } +once_cell = { version = "1.21", default-features = false } pbkdf2 = { version = "0.12.2", default-features = false } pin-project-lite = { version = "0.2.16", default-features = false } proc-macro2 = { version = "1.0.106", default-features = false } diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index 97e4ea36afe..8bdc1cd10f4 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -128,6 +128,7 @@ mime.workspace = true mime2ext = "0.1.54" oauth2.workspace = true oauth2-reqwest.workspace = true +once_cell.workspace = true percent-encoding = "2.3.2" pin-project-lite.workspace = true rand = { workspace = true, optional = true } diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index 8776b0d5fe4..f2acdd65895 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -212,14 +212,15 @@ pub struct PinnedEventsCache { impl PinnedEventsCache { /// Creates a new [`PinnedEventsCache`] for the given room. pub(in super::super) fn new( - room: Room, + weak_room: &WeakRoom, linked_chunk_update_sender: Sender, store: EventCacheStoreLock, - ) -> Self { - let sender = Sender::new(32); - + ) -> Result { + let room = weak_room.get().ok_or(EventCacheError::ClientDropped)?; let room_id = room.room_id().to_owned(); + let sender = Sender::new(32); + let chunk = EventLinkedChunk::new(); let state = @@ -236,7 +237,7 @@ impl PinnedEventsCache { .abort_on_drop(), ); - Self { state, _task: task } + Ok(Self { state, _task: task }) } /// Subscribe to live events from this room's pinned events cache. diff --git a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs index 9eb26092123..d6450e74946 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs @@ -162,10 +162,9 @@ impl RoomEventCache { pub async fn subscribe_to_pinned_events( &self, ) -> Result<(Vec, Receiver)> { - let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?; let state = self.inner.state.read().await?; - state.subscribe_to_pinned_events(room).await + state.subscribe_to_pinned_events(&self.inner.weak_room).await } /// Create or get an event-focused timeline cache for this room. diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index a1d1a953dd4..1a1b8183f47 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -15,7 +15,7 @@ use std::{ collections::HashMap, sync::{ - Arc, OnceLock, + Arc, atomic::{AtomicUsize, Ordering}, }, }; @@ -36,6 +36,7 @@ use matrix_sdk_base::{ sync::Timeline, }; use matrix_sdk_common::executor::spawn; +use once_cell::sync::OnceCell; use ruma::{ EventId, OwnedEventId, OwnedRoomId, OwnedUserId, events::{ @@ -72,7 +73,7 @@ use super::{ EventsOrigin, RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate, RoomEventCacheUpdateSender, sort_positions_descending, }; -use crate::{Room, room::WeakRoom}; +use crate::room::WeakRoom; /// Key for the event-focused caches. #[derive(Hash, PartialEq, Eq)] @@ -111,7 +112,7 @@ pub struct RoomEventCacheState { event_focused_caches: HashMap, /// Cache for pinned events in this room, initialized on-demand. - pinned_events_cache: OnceLock, + pinned_events_cache: OnceCell, pagination_status: SharedObservable, @@ -260,7 +261,7 @@ impl LockedRoomEventCacheState { room_version_rules, waited_for_initial_prev_token: false, subscriber_count: Default::default(), - pinned_events_cache: OnceLock::new(), + pinned_events_cache: OnceCell::new(), automatic_pagination, })) } @@ -373,15 +374,15 @@ impl<'a> RoomEventCacheStateLockReadGuard<'a> { /// This requires the room's event cache to be initialized. pub async fn subscribe_to_pinned_events( &self, - room: Room, + weak_room: &WeakRoom, ) -> Result<(Vec, Receiver), EventCacheError> { - let pinned_events_cache = self.state.pinned_events_cache.get_or_init(|| { + let pinned_events_cache = self.state.pinned_events_cache.get_or_try_init(|| { PinnedEventsCache::new( - room, + weak_room, self.state.linked_chunk_update_sender.clone(), self.state.store.clone(), ) - }); + })?; pinned_events_cache.subscribe().await } From 83e2bcd76313ac97f30f5e2f6cc3ac11774097bd Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 12 May 2026 16:45:51 +0200 Subject: [PATCH 05/22] refactor(sdk): Rename `PinnedEventsCacheStateLock` to `LockedPinnedEventsCacheState`. This patch renames `PinnedEventsCacheStateLock` to `LockedPinnedEventsCacheState` to match other namings in `RoomEventCache` and `ThreadEventCache` for the sake of consistency. --- .../src/event_cache/caches/pinned_events/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index f2acdd65895..83301bfbf41 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -89,7 +89,7 @@ impl fmt::Debug for PinnedEventsCacheState { /// /// This contains all the inner mutable states that ought to be updated at /// the same time. -pub type PinnedEventsCacheStateLock = lock::StateLock; +pub type LockedPinnedEventsCacheState = lock::StateLock; pub type PinnedEventsCacheStateLockWriteGuard<'a> = lock::StateLockWriteGuard<'a, PinnedEventsCacheState>; @@ -202,7 +202,7 @@ impl PinnedEventsCacheState { /// This is cheap to clone, because it's a shallow data type. #[derive(Clone)] pub struct PinnedEventsCache { - state: Arc, + state: Arc, /// The task handling the refreshing of pinned events for this specific /// room. @@ -225,7 +225,7 @@ impl PinnedEventsCache { let state = PinnedEventsCacheState { room_id, chunk, sender, linked_chunk_update_sender, store }; - let state = Arc::new(PinnedEventsCacheStateLock::new_inner(state)); + let state = Arc::new(LockedPinnedEventsCacheState::new_inner(state)); let task = Arc::new( room.client() @@ -357,7 +357,7 @@ impl PinnedEventsCache { } #[instrument(fields(%room_id = room.room_id()), skip(room, state))] - async fn pinned_event_listener_task(room: Room, state: Arc) { + async fn pinned_event_listener_task(room: Room, state: Arc) { debug!("pinned events listener task started"); let reload_from_network = async |room: Room| { From a6d4e875128045d9560eb80414b4a2fc0a0b9a77 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 12 May 2026 16:50:19 +0200 Subject: [PATCH 06/22] doc(sdk): Fix an intra-link. --- crates/matrix-sdk/src/event_cache/caches/thread/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs index 5151085a019..06f7ed559da 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs @@ -164,7 +164,7 @@ impl ThreadEventCache { &self.inner.state } - /// Get a reference to the [`RoomEventCacheUpdateSender`]. + /// Get a reference to the [`ThreadEventCacheUpdateSender`]. pub(in super::super) fn update_sender(&self) -> &ThreadEventCacheUpdateSender { &self.inner.update_sender } From 54037fbfc6eb965563adeeb9bcec353295c51ef7 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 12 May 2026 16:57:18 +0200 Subject: [PATCH 07/22] refactor(sdk): Introduce `PinnedEventsCacheUpdateSender`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch introduces the new type `PinnedEventsCacheUpdateSender` _à la_ `ThreadEventCacheUpdateSender` or `RoomEventCacheUpdateSender`. It abstracts the channels to send updates about. Yes, for the moment, it has a single channel, but it's better to provide the same abstractions for all caches. --- .../event_cache/caches/pinned_events/mod.rs | 20 +++++++--- .../caches/pinned_events/updates.rs | 40 +++++++++++++++++++ 2 files changed, 54 insertions(+), 6 deletions(-) create mode 100644 crates/matrix-sdk/src/event_cache/caches/pinned_events/updates.rs diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index 83301bfbf41..b3f4077d257 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod updates; + use std::{collections::BTreeSet, fmt, sync::Arc}; use futures_util::{StreamExt as _, stream}; @@ -32,6 +34,7 @@ use ruma::{ use tokio::sync::broadcast::{Receiver, Sender}; use tracing::{debug, instrument, trace, warn}; +pub(super) use self::updates::PinnedEventsCacheUpdateSender; #[cfg(feature = "e2e-encryption")] use super::super::redecryptor::ResolvedUtd; use super::{ @@ -49,7 +52,7 @@ pub(in super::super) struct PinnedEventsCacheState { room_id: OwnedRoomId, /// A sender for live events updates in this room's pinned events list. - sender: Sender, + update_sender: PinnedEventsCacheUpdateSender, /// The linked chunk representing this room's pinned events. /// @@ -185,7 +188,7 @@ impl<'a> PinnedEventsCacheStateLockWriteGuard<'a> { fn notify_subscribers(&mut self, origin: EventsOrigin) { let diffs = self.state.chunk.updates_as_vector_diffs(); if !diffs.is_empty() { - let _ = self.state.sender.send(TimelineVectorDiffs { diffs, origin }); + self.state.update_sender.send(TimelineVectorDiffs { diffs, origin }); } } } @@ -219,12 +222,17 @@ impl PinnedEventsCache { let room = weak_room.get().ok_or(EventCacheError::ClientDropped)?; let room_id = room.room_id().to_owned(); - let sender = Sender::new(32); + let update_sender = PinnedEventsCacheUpdateSender::new(); let chunk = EventLinkedChunk::new(); - let state = - PinnedEventsCacheState { room_id, chunk, sender, linked_chunk_update_sender, store }; + let state = PinnedEventsCacheState { + room_id, + chunk, + update_sender, + linked_chunk_update_sender, + store, + }; let state = Arc::new(LockedPinnedEventsCacheState::new_inner(state)); let task = Arc::new( @@ -245,7 +253,7 @@ impl PinnedEventsCache { let guard = self.state.read().await?; let events = guard.state.chunk.events().map(|(_position, item)| item.clone()).collect(); - let recv = guard.state.sender.subscribe(); + let recv = guard.state.update_sender.new_pinned_events_receiver(); Ok((events, recv)) } diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/updates.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/updates.rs new file mode 100644 index 00000000000..8feb576a14c --- /dev/null +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/updates.rs @@ -0,0 +1,40 @@ +// Copyright 2026 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use tokio::sync::broadcast::{Receiver, Sender}; + +use super::super::TimelineVectorDiffs; + +/// A small type to send updates in all channels. +#[derive(Clone)] +pub struct PinnedEventsCacheUpdateSender { + thread_sender: Sender, +} + +impl PinnedEventsCacheUpdateSender { + /// Create a new [`PinnedEventsCacheUpdateSender`]. + pub fn new() -> Self { + Self { thread_sender: Sender::new(32) } + } + + /// Send a [`TimelineVectorDiffs`]. + pub fn send(&self, thread_update: TimelineVectorDiffs) { + let _ = self.thread_sender.send(thread_update); + } + + /// Create a new [`Receiver`] of [`TimelineVectorDiffs`]. + pub(super) fn new_pinned_events_receiver(&self) -> Receiver { + self.thread_sender.subscribe() + } +} From 1c4c67bf1c816115a924cce54039394d0763b9c4 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 12 May 2026 17:00:26 +0200 Subject: [PATCH 08/22] doc(sdk): Fix document in thread cache. --- crates/matrix-sdk/src/event_cache/caches/thread/mod.rs | 4 ++-- crates/matrix-sdk/src/event_cache/caches/thread/state.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs index 06f7ed559da..f2837458e6e 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs @@ -54,7 +54,7 @@ pub struct ThreadEventCache { inner: Arc, } -/// The (non-cloneable) details of the `RoomEventCache`. +/// The (non-cloneable) details of the `ThreadEventCache`. struct ThreadEventCacheInner { /// The room ID. room_id: OwnedRoomId, @@ -71,7 +71,7 @@ struct ThreadEventCacheInner { /// A notifier that we received a new pagination token. pagination_batch_token_notifier: Notify, - /// Update sender for this room. + /// Update sender for this thread. update_sender: ThreadEventCacheUpdateSender, } diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs index a77ca7d671c..bdb7e7fc9ec 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs @@ -75,8 +75,8 @@ pub struct ThreadEventCacheState { /// A clone of [`super::ThreadEventCacheInner::update_sender`]. /// - /// This is used only by the [`ThreadEventCacheStateLock::read`] and - /// [`ThreadEventCacheStateLock::write`] when the state must be reset. + /// This is used only by the [`LockedThreadEventCacheState::read`] and + /// [`LockedThreadEventCacheState::write`] when the state must be reset. update_sender: ThreadEventCacheUpdateSender, /// A sender for the globally observable linked chunk updates that happened From a9b2462d48d5668ec6eba8c03f6c1e91238f02f3 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 12 May 2026 17:06:04 +0200 Subject: [PATCH 09/22] refactor(sdk): Introduce `PinnedEventsCacheInner`. This patch introduces the `PinnedEventsCacheInner` type that is used inside `PinnedEventsCache` to make it cheap to clone. It was already the case before with all the fields beind `Arc<_>` but we are about to move data in their correct place, and thus it will add more `Arc<_>`, which is not good. Let's adopt the same patterns as the other caches too for the sake of consistency. --- .../event_cache/caches/pinned_events/mod.rs | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index b3f4077d257..145ba94ec61 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -200,16 +200,24 @@ impl PinnedEventsCacheState { } } -/// All the information related to a room's pinned events cache. +/// All the information related to room's pinned events.. /// -/// This is cheap to clone, because it's a shallow data type. +/// Cloning is shallow, and thus is cheap to do. #[derive(Clone)] pub struct PinnedEventsCache { + inner: Arc, +} + +/// The (non-cloneable) details of the `PinnedEventsCache`. +struct PinnedEventsCacheInner { + /// State of this `PinnedEventsCache`. + /// + /// It is behind an `Arc` because it is shared with the task. state: Arc, /// The task handling the refreshing of pinned events for this specific /// room. - _task: Arc, + _task: BackgroundTaskHandle, } impl PinnedEventsCache { @@ -235,22 +243,21 @@ impl PinnedEventsCache { }; let state = Arc::new(LockedPinnedEventsCacheState::new_inner(state)); - let task = Arc::new( - room.client() - .task_monitor() - .spawn_infinite_task( - "pinned_event_listener_task", - Self::pinned_event_listener_task(room, state.clone()), - ) - .abort_on_drop(), - ); - - Ok(Self { state, _task: task }) + let task = room + .client() + .task_monitor() + .spawn_infinite_task( + "pinned_event_listener_task", + Self::pinned_event_listener_task(room, state.clone()), + ) + .abort_on_drop(); + + Ok(Self { inner: Arc::new(PinnedEventsCacheInner { state, _task: task }) }) } /// Subscribe to live events from this room's pinned events cache. pub async fn subscribe(&self) -> Result<(Vec, Receiver)> { - let guard = self.state.read().await?; + let guard = self.inner.state.read().await?; let events = guard.state.chunk.events().map(|(_position, item)| item.clone()).collect(); let recv = guard.state.update_sender.new_pinned_events_receiver(); @@ -263,7 +270,7 @@ impl PinnedEventsCache { /// about the update. #[cfg(feature = "e2e-encryption")] pub(in crate::event_cache) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> { - let mut guard = self.state.write().await?; + let mut guard = self.inner.state.write().await?; if guard.state.chunk.replace_utds(events) { guard.propagate_changes().await?; @@ -319,7 +326,7 @@ impl PinnedEventsCache { room_redaction_rules: &RedactionRules, ) -> Result<()> { trace!("checking live events for relations to pinned events"); - let mut guard = self.state.write().await?; + let mut guard = self.inner.state.write().await?; let pinned_event_ids: BTreeSet = guard.state.current_event_ids().into_iter().collect(); @@ -559,7 +566,7 @@ impl PinnedEventsCache { // TODO(@hywan): Temporary fix. All the states must be in a single struct behind // the cross-process lock instead of being dispatched in each cache. pub(super) async fn reload(&self) -> Result<()> { - self.state.write().await?.reload().await + self.inner.state.write().await?.reload().await } } From c77fee4d5f311c46930005a53b413ff2881df6f8 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 12 May 2026 17:14:30 +0200 Subject: [PATCH 10/22] feat(sdk): Add `PinnedEventsCacheInner::update_sender`. This patch adds `PinnedEventsCacheInner::update_sender`, making `PinnedEventsCacheState::update_sender` a clone of the former. This is going to be useful in the next commit for handling pinned-events in `ResetCaches`. --- .../event_cache/caches/pinned_events/mod.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index 145ba94ec61..a732bce7ed0 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -51,9 +51,6 @@ pub(in super::super) struct PinnedEventsCacheState { /// The ID of the room owning this list of pinned events. room_id: OwnedRoomId, - /// A sender for live events updates in this room's pinned events list. - update_sender: PinnedEventsCacheUpdateSender, - /// The linked chunk representing this room's pinned events. /// /// This linked chunk also contains related events. The events are sorted in @@ -65,6 +62,12 @@ pub(in super::super) struct PinnedEventsCacheState { /// Reference to the underlying backing store. store: EventCacheStoreLock, + /// A clone of [`PinnedEventsCacheInner::update_sender`]. + /// + /// This is used only by the [`LockedPinnedEventsCacheState::read`] and + /// [`LockedPinnedEventsCacheState::write`] when the state must be reset. + update_sender: PinnedEventsCacheUpdateSender, + /// A sender for the globally observable linked chunk updates that happened /// during a sync or a back-pagination. /// @@ -187,8 +190,9 @@ impl<'a> PinnedEventsCacheStateLockWriteGuard<'a> { /// Notify subscribers of timeline updates. fn notify_subscribers(&mut self, origin: EventsOrigin) { let diffs = self.state.chunk.updates_as_vector_diffs(); + if !diffs.is_empty() { - self.state.update_sender.send(TimelineVectorDiffs { diffs, origin }); + self.update_sender.send(TimelineVectorDiffs { diffs, origin }); } } } @@ -215,6 +219,9 @@ struct PinnedEventsCacheInner { /// It is behind an `Arc` because it is shared with the task. state: Arc, + /// Update sender for this pinned events cache. + update_sender: PinnedEventsCacheUpdateSender, + /// The task handling the refreshing of pinned events for this specific /// room. _task: BackgroundTaskHandle, @@ -237,7 +244,7 @@ impl PinnedEventsCache { let state = PinnedEventsCacheState { room_id, chunk, - update_sender, + update_sender: update_sender.clone(), linked_chunk_update_sender, store, }; @@ -252,7 +259,7 @@ impl PinnedEventsCache { ) .abort_on_drop(); - Ok(Self { inner: Arc::new(PinnedEventsCacheInner { state, _task: task }) }) + Ok(Self { inner: Arc::new(PinnedEventsCacheInner { state, update_sender, _task: task }) }) } /// Subscribe to live events from this room's pinned events cache. From d333a48cddad39ba434ee45eaa6f51fe579a6e72 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 12 May 2026 17:41:34 +0200 Subject: [PATCH 11/22] feat(sdk): Add `EventCache::pinned_events`. This patch adds `EventCache::pinned_events` to get the `PinnedEventsCache`. This patch adds `Caches::pinned_events`. `ResetCaches` also handle the pinned-events cache. To make it works, this patch adds `PinnedEventsCache::state`, `PinnedEventsCache::update_sender` and `PinnedEventsCacheStateLockWriteGuard::reset`. This one is new as the feature wasn't implemented before! --- .../matrix-sdk/src/event_cache/caches/mod.rs | 67 +++++++++++++++++-- .../event_cache/caches/pinned_events/mod.rs | 34 ++++++++++ crates/matrix-sdk/src/event_cache/mod.rs | 14 ++++ 3 files changed, 110 insertions(+), 5 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 94d8f17ff91..2dbdf34e01a 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -22,6 +22,7 @@ use matrix_sdk_base::{ linked_chunk::Position, sync::{JoinedRoomUpdate, LeftRoomUpdate}, }; +use once_cell::sync::OnceCell; use ruma::{OwnedEventId, OwnedRoomId, RoomId, room_version_rules::RoomVersionRules}; use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, broadcast::Sender, mpsc}; @@ -43,6 +44,7 @@ pub mod thread; pub(super) struct Caches { pub room: room::RoomEventCache, pub threads: Arc>>, + pub pinned_events: OnceCell, internals: CachesInternals, } @@ -124,6 +126,7 @@ impl Caches { Ok(Self { room: room_event_cache, threads: Arc::new(RwLock::new(HashMap::new())), + pinned_events: OnceCell::new(), internals: CachesInternals { store, linked_chunk_update_sender, room_version_rules }, }) } @@ -183,9 +186,22 @@ impl Caches { ) } + /// Get or create a [`PinnedEventsCache`]. + /// + /// [`PinnedEventsCache`]: pinned_events::PinnedEventsCache + pub fn pinned_events(&self) -> Result<&pinned_events::PinnedEventsCache> { + self.pinned_events.get_or_try_init(|| { + pinned_events::PinnedEventsCache::new( + self.room.weak_room(), + self.internals.linked_chunk_update_sender.clone(), + self.internals.store.clone(), + ) + }) + } + /// Update all the event caches with a [`JoinedRoomUpdate`]. pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { - let Self { room, threads, internals } = &self; + let Self { room, threads, pinned_events, internals } = &self; // Room. { @@ -224,12 +240,19 @@ impl Caches { } } + // Pinned-events. + { + if let Some(pinned_events) = pinned_events.get() { + todo!() + } + } + Ok(()) } /// Update all the event caches with a [`LeftRoomUpdate`]. pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { - let Self { room, threads, internals } = &self; + let Self { room, threads, pinned_events, internals } = &self; // Room. { @@ -268,6 +291,13 @@ impl Caches { } } + // Pinned-events. + { + if let Some(pinned_events) = pinned_events.get() { + todo!() + } + } + Ok(()) } @@ -305,13 +335,19 @@ pub(super) struct ResetCaches<'c> { thread::OwnedThreadEventCacheStateLockWriteGuard, thread::ThreadEventCacheUpdateSender, )>, + pinned_events_lock: Option<( + pinned_events::PinnedEventsCacheStateLockWriteGuard<'c>, + pinned_events::PinnedEventsCacheUpdateSender, + )>, } impl<'c> ResetCaches<'c> { /// Create a new [`ResetCaches`]. /// /// It can fail if acquiring an exclusive lock fails. - async fn new(Caches { room, threads, internals: _ }: &'c mut Caches) -> Result { + async fn new( + Caches { room, threads, pinned_events, internals: _ }: &'c mut Caches, + ) -> Result { // Acquire an exclusive access to the state of the room. let room_lock = (room.state().write().await?, room.update_sender().clone()); @@ -325,7 +361,14 @@ impl<'c> ResetCaches<'c> { .push((thread.state().write_owned().await?, thread.update_sender().clone())); } - Ok(Self { room_lock, threads_lock, thread_locks }) + // Acquire an exclusive access to the pinned-events if any. + let pinned_events_lock = if let Some(pinned_events) = pinned_events.get_mut() { + Some((pinned_events.state().write().await?, pinned_events.update_sender().clone())) + } else { + None + }; + + Ok(Self { room_lock, threads_lock, thread_locks, pinned_events_lock }) } /// Reset all the event caches, and broadcast the [`TimelineVectorDiffs`]. @@ -335,8 +378,9 @@ impl<'c> ResetCaches<'c> { /// /// It can fail if resetting an event cache fails. pub async fn reset_all(self) -> Result<()> { - let Self { room_lock, threads_lock, thread_locks } = self; + let Self { room_lock, threads_lock, thread_locks, pinned_events_lock } = self; + // Room. { let (mut room_state, room_update_sender) = room_lock; @@ -350,6 +394,7 @@ impl<'c> ResetCaches<'c> { ); } + // Threads. { for thread_lock in thread_locks { let (mut thread_state, thread_update_sender) = thread_lock; @@ -370,6 +415,18 @@ impl<'c> ResetCaches<'c> { drop(threads_lock); } + // Pinned-events. + { + if let Some((mut pinned_events_state, pinned_events_update_sender)) = pinned_events_lock + { + let updates_as_vector_diffs = pinned_events_state.reset().await?; + pinned_events_update_sender.send(TimelineVectorDiffs { + diffs: updates_as_vector_diffs, + origin: EventsOrigin::Cache, + }); + } + } + Ok(()) } } diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index a732bce7ed0..9b3b772f56f 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -16,6 +16,7 @@ mod updates; use std::{collections::BTreeSet, fmt, sync::Arc}; +use eyeball_im::VectorDiff; use futures_util::{StreamExt as _, stream}; use matrix_sdk_base::{ event_cache::{Event, store::EventCacheStoreLock}, @@ -109,6 +110,29 @@ impl<'a> lock::Reload for PinnedEventsCacheStateLockWriteGuard<'a> { } impl<'a> PinnedEventsCacheStateLockWriteGuard<'a> { + /// Reset this data structure as if it were brand new. + /// + /// However, pinned-events are not only stored here. They are also coming at + /// a state-event containing all the pinned-events ID list in the `Room`. + /// This is a best-effort here: we are resetting the events stored in this + /// cache only. + /// + /// Return a single diff update that is a clear of all events; as a + /// result, the caller may override any pending diff updates + /// with the result of this function. + pub async fn reset(&mut self) -> Result>> { + self.state.chunk.reset(); + self.propagate_changes().await?; + + let diff_updates = self.state.chunk.updates_as_vector_diffs(); + + // Ensure the contract defined in the doc comment is true: + debug_assert_eq!(diff_updates.len(), 1); + debug_assert!(matches!(diff_updates[0], VectorDiff::Clear)); + + Ok(diff_updates) + } + /// Reload all the pinned events from storage, replacing the current linked /// chunk. async fn reload_from_storage(&mut self) -> Result<()> { @@ -262,6 +286,16 @@ impl PinnedEventsCache { Ok(Self { inner: Arc::new(PinnedEventsCacheInner { state, update_sender, _task: task }) }) } + /// Return a reference to the state. + pub(super) fn state(&self) -> &LockedPinnedEventsCacheState { + &self.inner.state + } + + /// Get a reference to the [`RoomEventCacheUpdateSender`]. + pub(in super::super) fn update_sender(&self) -> &PinnedEventsCacheUpdateSender { + &self.inner.update_sender + } + /// Subscribe to live events from this room's pinned events cache. pub async fn subscribe(&self) -> Result<(Vec, Receiver)> { let guard = self.inner.state.read().await?; diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index c94c96ae3c4..d1a5ac27297 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -75,6 +75,7 @@ pub use self::{ TimelineVectorDiffs, event_focused::{EventFocusThreadMode, EventFocusedCache}, pagination::{BackPaginationOutcome, PaginationStatus}, + pinned_events::PinnedEventsCache, room::{ RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheSubscriber, RoomEventCacheUpdate, pagination::RoomPagination, @@ -388,6 +389,19 @@ impl EventCache { Ok((caches_for_room.thread(thread_id.to_owned()).await?.deref().clone(), drop_handles)) } + /// Return a pinned-events-specific view over the [`EventCache`]. + pub async fn pinned_events( + &self, + room_id: &RoomId, + ) -> Result<(PinnedEventsCache, Arc)> { + let Some(drop_handles) = self.inner.drop_handles.get().cloned() else { + return Err(EventCacheError::NotSubscribedYet); + }; + + let caches_for_room = self.inner.all_caches_for_room(room_id).await?; + + Ok((caches_for_room.pinned_events()?.clone(), drop_handles)) + } /// Cleanly clear all the rooms' event caches. /// /// This will notify any live observers that the room has been cleared. From d370992ad064d7687c01dd1e416dd6137bf53abc Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 13 May 2026 11:03:47 +0200 Subject: [PATCH 12/22] feat(sdk): Add `aggregator_timeline_for_pinned_events`. This patch adds the pipeline for `PinnedEventsCache` via `aggregator_timeline_for_pinned_events`. --- .../src/event_cache/caches/aggregator.rs | 54 +++++++++++++++++++ .../matrix-sdk/src/event_cache/caches/mod.rs | 22 +++++--- .../event_cache/caches/pinned_events/mod.rs | 2 +- 3 files changed, 69 insertions(+), 9 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/aggregator.rs b/crates/matrix-sdk/src/event_cache/caches/aggregator.rs index 59c93e75205..238668d08fd 100644 --- a/crates/matrix-sdk/src/event_cache/caches/aggregator.rs +++ b/crates/matrix-sdk/src/event_cache/caches/aggregator.rs @@ -129,3 +129,57 @@ pub async fn aggregate_timeline_for_threads( Ok(new_events_by_thread) } + +pub fn aggregate_timeline_for_pinned_events( + timeline: &Timeline, + pinned_event_ids: &[OwnedEventId], + redaction_rules: &RedactionRules, +) -> Timeline { + let mut new_timeline = Timeline { + limited: timeline.limited, + prev_batch: timeline.prev_batch.clone(), + events: Vec::new(), + }; + + // No events are pinned? The `Timeline` must be empty. + if pinned_event_ids.is_empty() { + return new_timeline; + } + + // Look for events that relate to pinned events. We already know the + // pinned-events, we don't need to look for them. We are only interested by + // related events. + for event in &timeline.events { + match extract_relation(event.raw()) { + // Ohh, this event relates to another event! + Some((relation_type, related_event_id)) => match relation_type { + // `event` relates to a thread: not what we want. + RelationType::Thread => {} + + // `event` represents an annotation (e.g. reactions), a replacement (an edit), a + // reference or something custom. Let's see if the `related_event_id` is a + // pinned-event. + RelationType::Annotation + | RelationType::Replacement + | RelationType::Reference + | _ => { + if pinned_event_ids.contains(&related_event_id) { + new_timeline.events.push(event.clone()); + } + } + }, + + // No explicit relation, but it can be a redaction of a pinned-event! + None => { + if let Some(redaction_target) = + extract_redaction_target(event.raw(), redaction_rules) + && pinned_event_ids.contains(&redaction_target) + { + new_timeline.events.push(event.clone()); + } + } + } + } + + new_timeline +} diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 2dbdf34e01a..10cbf4062e3 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -241,10 +241,13 @@ impl Caches { } // Pinned-events. - { - if let Some(pinned_events) = pinned_events.get() { - todo!() - } + if let Some(pinned_events) = pinned_events.get() { + let timeline_for_pinned_events = aggregator::aggregate_timeline_for_pinned_events( + &updates.timeline, + &pinned_events.state().read().await?.current_event_ids(), + &internals.room_version_rules.redaction, + ); + todo!() } Ok(()) @@ -292,10 +295,13 @@ impl Caches { } // Pinned-events. - { - if let Some(pinned_events) = pinned_events.get() { - todo!() - } + if let Some(pinned_events) = pinned_events.get() { + let timeline_for_pinned_events = aggregator::aggregate_timeline_for_pinned_events( + &updates.timeline, + &pinned_events.state().read().await?.current_event_ids(), + &internals.room_version_rules.redaction, + ); + todo!() } Ok(()) diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index 9b3b772f56f..d831a8d2434 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -223,7 +223,7 @@ impl<'a> PinnedEventsCacheStateLockWriteGuard<'a> { impl PinnedEventsCacheState { /// Return a list of the current event IDs in this linked chunk. - fn current_event_ids(&self) -> Vec { + pub(super) fn current_event_ids(&self) -> Vec { self.chunk.events().filter_map(|(_position, event)| event.event_id()).collect() } } From 709e18493e319acb525d482b57cf060d05c024cf Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 May 2026 15:24:16 +0200 Subject: [PATCH 13/22] refactor(sdk): Handle the `Timeline` for pinned-events in `Caches` instead of `RoomEventCache`. This patch moves `maybe_add_live_related_events` to `handle_timeline`. `handle_timeline` is called by `handle_(joined|left)_room_update`, which are themselves called by `Caches`. This patch also removes other methods used by `maybe_add_live_related_events` but are no longer useful since we have the aggregator, namely `extract_relation_target` and `extract_redaction_target`. Finally, this patch removes the call to `maybe_add_live_related_events` in `RoomEventCacheStateLockWriteGuard::post_process_new_events`, which removes the need to acquire a write lock over the room' state here. --- .../matrix-sdk/src/event_cache/caches/mod.rs | 12 +- .../event_cache/caches/pinned_events/mod.rs | 123 +++++------------- .../src/event_cache/caches/room/state.rs | 10 -- 3 files changed, 43 insertions(+), 102 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 10cbf4062e3..9100d71be04 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -242,12 +242,14 @@ impl Caches { // Pinned-events. if let Some(pinned_events) = pinned_events.get() { - let timeline_for_pinned_events = aggregator::aggregate_timeline_for_pinned_events( + let mut updates = updates.clone(); + updates.timeline = aggregator::aggregate_timeline_for_pinned_events( &updates.timeline, &pinned_events.state().read().await?.current_event_ids(), &internals.room_version_rules.redaction, ); - todo!() + + pinned_events.handle_joined_room_update(updates).await?; } Ok(()) @@ -296,12 +298,14 @@ impl Caches { // Pinned-events. if let Some(pinned_events) = pinned_events.get() { - let timeline_for_pinned_events = aggregator::aggregate_timeline_for_pinned_events( + let mut updates = updates.clone(); + updates.timeline = aggregator::aggregate_timeline_for_pinned_events( &updates.timeline, &pinned_events.state().read().await?.current_event_ids(), &internals.room_version_rules.redaction, ); - todo!() + + pinned_events.handle_left_room_update(updates).await?; } Ok(()) diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index d831a8d2434..4dbb75d5248 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -21,17 +21,10 @@ use futures_util::{StreamExt as _, stream}; use matrix_sdk_base::{ event_cache::{Event, store::EventCacheStoreLock}, linked_chunk::{LinkedChunkId, OwnedLinkedChunkId}, - serde_helpers::extract_relation, + sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, task_monitor::BackgroundTaskHandle, }; -use ruma::{ - MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, - events::{ - AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType, relation::RelationType, - }, - room_version_rules::RedactionRules, - serde::Raw, -}; +use ruma::{MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, events::relation::RelationType}; use tokio::sync::broadcast::{Receiver, Sender}; use tracing::{debug, instrument, trace, warn}; @@ -133,6 +126,16 @@ impl<'a> PinnedEventsCacheStateLockWriteGuard<'a> { Ok(diff_updates) } + async fn handle_sync(&mut self, timeline: Timeline) -> Result<()> { + // We've found new relations; append them to the linked chunk. + self.state.chunk.push_live_events(None, &timeline.events); + + self.propagate_changes().await?; + self.notify_subscribers(EventsOrigin::Sync); + + Ok(()) + } + /// Reload all the pinned events from storage, replacing the current linked /// chunk. async fn reload_from_storage(&mut self) -> Result<()> { @@ -238,6 +241,9 @@ pub struct PinnedEventsCache { /// The (non-cloneable) details of the `PinnedEventsCache`. struct PinnedEventsCacheInner { + /// The ID of the room owning this list of pinned events. + room_id: OwnedRoomId, + /// State of this `PinnedEventsCache`. /// /// It is behind an `Arc` because it is shared with the task. @@ -266,7 +272,7 @@ impl PinnedEventsCache { let chunk = EventLinkedChunk::new(); let state = PinnedEventsCacheState { - room_id, + room_id: room_id.clone(), chunk, update_sender: update_sender.clone(), linked_chunk_update_sender, @@ -283,7 +289,9 @@ impl PinnedEventsCache { ) .abort_on_drop(); - Ok(Self { inner: Arc::new(PinnedEventsCacheInner { state, update_sender, _task: task }) }) + Ok(Self { + inner: Arc::new(PinnedEventsCacheInner { room_id, state, update_sender, _task: task }), + }) } /// Return a reference to the state. @@ -321,93 +329,32 @@ impl PinnedEventsCache { Ok(()) } - /// Given a raw event, try to extract the target event ID of a relation as - /// defined with `m.relates_to`. - fn extract_relation_target(raw: &Raw) -> Option { - let (rel_type, event_id) = extract_relation(raw)?; + /// Handle a [`JoinedRoomUpdate`]. + #[instrument(skip_all, fields(room_id = %self.inner.room_id))] + pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { + self.handle_timeline(updates.timeline).await?; - // Don't include thread responses in the pinned event chunk. - match rel_type { - RelationType::Thread => None, - _ => Some(event_id), - } + Ok(()) } - /// Given a raw event, try to extract the target event ID of a live - /// redaction. - fn extract_redaction_target( - raw: &Raw, - room_redaction_rules: &RedactionRules, - ) -> Option { - // Try to find a redaction, but do not deserialize the entire event if we aren't - // certain it's a `m.room.redaction`. - if raw.get_field::("type").ok()?? - != MessageLikeEventType::RoomRedaction - { - return None; - } - - let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(redaction)) = - raw.deserialize().ok()? - else { - return None; - }; + /// Handle a [`LeftRoomUpdate`]. + #[instrument(skip_all, fields(room_id = %self.inner.room_id))] + pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { + self.handle_timeline(updates.timeline).await?; - redaction.redacts(room_redaction_rules).map(ToOwned::to_owned).or_else(|| { - warn!("missing target event id from the redaction event"); - None - }) + Ok(()) } - /// Check if any of the given events relate to an event in the pinned events - /// linked chunk, and append it, in this case. - pub(in super::super) async fn maybe_add_live_related_events( - &mut self, - events: &[Event], - room_redaction_rules: &RedactionRules, - ) -> Result<()> { - trace!("checking live events for relations to pinned events"); - let mut guard = self.inner.state.write().await?; - - let pinned_event_ids: BTreeSet = - guard.state.current_event_ids().into_iter().collect(); - - if pinned_event_ids.is_empty() { + /// Handle a [`Timeline`], i.e. new events received by a sync for this + /// thread. + async fn handle_timeline(&self, timeline: Timeline) -> Result<()> { + if timeline.events.is_empty() { return Ok(()); } - let mut new_relations = Vec::new(); - - // For all events that relate to an event in the pinned events chunk, push this - // event to the linked chunk, and propagate changes to observers. - for ev in events { - // Try to find a regular relation in ev. - if let Some(relation_target) = Self::extract_relation_target(ev.raw()) - && pinned_event_ids.contains(&relation_target) - { - new_relations.push(ev.clone()); - continue; - } - - // Try to find a redaction in ev. - if let Some(redaction_target) = - Self::extract_redaction_target(ev.raw(), room_redaction_rules) - && pinned_event_ids.contains(&redaction_target) - { - new_relations.push(ev.clone()); - continue; - } - } - - if !new_relations.is_empty() { - trace!("found {} new related events to pinned events", new_relations.len()); - - // We've found new relations; append them to the linked chunk. - guard.state.chunk.push_live_events(None, &new_relations); + trace!("adding new {} events", timeline.events.len()); - guard.propagate_changes().await?; - guard.notify_subscribers(EventsOrigin::Sync); - } + self.inner.state.write().await?.handle_sync(timeline).await?; Ok(()) } diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index 1a1b8183f47..0affae2061d 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -738,16 +738,6 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { // Update the store before doing the post-processing. self.propagate_changes().await?; - // Need an explicit re-borrow to avoid a deref vs deref-mut borrowck conflict - // below. - let state = &mut *self.state; - - if let Some(pinned_events_cache) = state.pinned_events_cache.get_mut() { - pinned_events_cache - .maybe_add_live_related_events(&events, &state.room_version_rules.redaction) - .await?; - } - for event in events { self.maybe_apply_new_redaction(&event).await?; From a571e745278cb5e43f83f552df2e7b83bf5a432b Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 May 2026 16:00:54 +0200 Subject: [PATCH 14/22] feat(sdk): Deduplicate pinned events. This patch runs the `filter_duplicate_events` function inside `handle_sync` to deduplicate events related to pinned events. --- .../matrix-sdk/src/event_cache/caches/mod.rs | 1 + .../event_cache/caches/pinned_events/mod.rs | 116 ++++++++++++++++-- .../src/event_cache/caches/room/state.rs | 1 + 3 files changed, 111 insertions(+), 7 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 9100d71be04..6b974378704 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -193,6 +193,7 @@ impl Caches { self.pinned_events.get_or_try_init(|| { pinned_events::PinnedEventsCache::new( self.room.weak_room(), + self.room.own_user_id().clone(), self.internals.linked_chunk_update_sender.clone(), self.internals.store.clone(), ) diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index 4dbb75d5248..4f59091ffbd 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -19,12 +19,15 @@ use std::{collections::BTreeSet, fmt, sync::Arc}; use eyeball_im::VectorDiff; use futures_util::{StreamExt as _, stream}; use matrix_sdk_base::{ - event_cache::{Event, store::EventCacheStoreLock}, - linked_chunk::{LinkedChunkId, OwnedLinkedChunkId}, + event_cache::{Event, Gap, store::EventCacheStoreLock}, + linked_chunk::{LinkedChunkId, OwnedLinkedChunkId, Position, Update}, sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, task_monitor::BackgroundTaskHandle, }; -use ruma::{MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, events::relation::RelationType}; +use ruma::{ + MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, + events::relation::RelationType, +}; use tokio::sync::broadcast::{Receiver, Sender}; use tracing::{debug, instrument, trace, warn}; @@ -39,12 +42,24 @@ use super::{ lock::Reload as _, room::RoomEventCacheLinkedChunkUpdate, }; -use crate::{Room, client::WeakClient, config::RequestConfig, room::WeakRoom}; +use crate::{ + Room, + client::WeakClient, + config::RequestConfig, + event_cache::{ + caches::event_linked_chunk::sort_positions_descending, + deduplicator::{DeduplicationOutcome, filter_duplicate_events}, + }, + room::WeakRoom, +}; pub(in super::super) struct PinnedEventsCacheState { /// The ID of the room owning this list of pinned events. room_id: OwnedRoomId, + /// The user's own user id. + own_user_id: OwnedUserId, + /// The linked chunk representing this room's pinned events. /// /// This linked chunk also contains related events. The events are sorted in @@ -127,8 +142,34 @@ impl<'a> PinnedEventsCacheStateLockWriteGuard<'a> { } async fn handle_sync(&mut self, timeline: Timeline) -> Result<()> { + let DeduplicationOutcome { + all_events: events, + in_memory_duplicated_event_ids, + in_store_duplicated_event_ids, + non_empty_all_duplicates: all_duplicates, + } = filter_duplicate_events( + &self.state.own_user_id, + &self.store, + LinkedChunkId::PinnedEvents(&self.state.room_id), + &self.state.chunk, + timeline.events, + ) + .await?; + + if all_duplicates { + // If all events are duplicates, we don't need to do anything; ignore + // the new events. + return Ok(()); + } + + // Remove the old duplicated events. + // + // We don't have to worry about the removals can change the position of the + // existing events, because we are pushing all _new_ `events` at the back. + self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids).await?; + // We've found new relations; append them to the linked chunk. - self.state.chunk.push_live_events(None, &timeline.events); + self.state.chunk.push_live_events(None, &events); self.propagate_changes().await?; self.notify_subscribers(EventsOrigin::Sync); @@ -136,6 +177,59 @@ impl<'a> PinnedEventsCacheStateLockWriteGuard<'a> { Ok(()) } + /// Remove events by their position, in `EventLinkedChunk`. + /// + /// This method is purposely isolated because it must ensure that + /// positions are sorted appropriately or it can be disastrous. + #[instrument(skip_all)] + pub async fn remove_events( + &mut self, + in_memory_events: Vec<(OwnedEventId, Position)>, + in_store_events: Vec<(OwnedEventId, Position)>, + ) -> Result<()> { + // In-store events. + if !in_store_events.is_empty() { + let mut positions = in_store_events + .into_iter() + .map(|(_event_id, position)| position) + .collect::>(); + + sort_positions_descending(&mut positions); + + let updates = + positions.into_iter().map(|pos| Update::RemoveItem { at: pos }).collect::>(); + + self.apply_store_only_updates(updates).await?; + } + + // In-memory events. + if in_memory_events.is_empty() { + // Nothing else to do, return early. + return Ok(()); + } + + // `remove_events_by_position` is responsible of sorting positions. + self.state + .chunk + .remove_events_by_position( + in_memory_events.into_iter().map(|(_event_id, position)| position).collect(), + ) + .expect("failed to remove an event"); + + self.propagate_changes().await + } + + /// Apply some updates that are effective only on the store itself. + /// + /// This method should be used only for updates that happen *outside* + /// the in-memory linked chunk. Such updates must be applied + /// onto the ordering tracker as well as to the persistent + /// storage. + async fn apply_store_only_updates(&mut self, updates: Vec>) -> Result<()> { + self.state.chunk.order_tracker.map_updates(&updates); + self.send_updates_to_store(updates).await + } + /// Reload all the pinned events from storage, replacing the current linked /// chunk. async fn reload_from_storage(&mut self) -> Result<()> { @@ -202,9 +296,15 @@ impl<'a> PinnedEventsCacheStateLockWriteGuard<'a> { /// Propagate the changes in this linked chunk to observers, and save the /// changes on disk. - async fn propagate_changes(&mut self) -> Result<()> { + pub async fn propagate_changes(&mut self) -> Result<()> { let updates = self.state.chunk.store_updates().take(); - let linked_chunk_id = OwnedLinkedChunkId::PinnedEvents(self.state.room_id.clone()); + + self.send_updates_to_store(updates).await + } + + async fn send_updates_to_store(&mut self, updates: Vec>) -> Result<()> { + let linked_chunk_id = OwnedLinkedChunkId::PinnedEvents(self.room_id.clone()); + send_updates_to_store( &self.store, linked_chunk_id, @@ -261,6 +361,7 @@ impl PinnedEventsCache { /// Creates a new [`PinnedEventsCache`] for the given room. pub(in super::super) fn new( weak_room: &WeakRoom, + own_user_id: OwnedUserId, linked_chunk_update_sender: Sender, store: EventCacheStoreLock, ) -> Result { @@ -273,6 +374,7 @@ impl PinnedEventsCache { let state = PinnedEventsCacheState { room_id: room_id.clone(), + own_user_id, chunk, update_sender: update_sender.clone(), linked_chunk_update_sender, diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index 0affae2061d..f01ece020b8 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -379,6 +379,7 @@ impl<'a> RoomEventCacheStateLockReadGuard<'a> { let pinned_events_cache = self.state.pinned_events_cache.get_or_try_init(|| { PinnedEventsCache::new( weak_room, + self.own_user_id.clone(), self.state.linked_chunk_update_sender.clone(), self.state.store.clone(), ) From b5ea61b19191b3d815ddda66427568a7d6c92dc0 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 May 2026 16:16:49 +0200 Subject: [PATCH 15/22] feat(sdk): `PinnedEventsCache` handles redaction. This patch applies event redactions over pinned events. --- .../matrix-sdk/src/event_cache/caches/mod.rs | 1 + .../event_cache/caches/pinned_events/mod.rs | 122 +++++++++++++++++- .../src/event_cache/caches/room/state.rs | 1 + 3 files changed, 121 insertions(+), 3 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 6b974378704..0ea9c0ab1e7 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -194,6 +194,7 @@ impl Caches { pinned_events::PinnedEventsCache::new( self.room.weak_room(), self.room.own_user_id().clone(), + self.internals.room_version_rules.clone(), self.internals.linked_chunk_update_sender.clone(), self.internals.store.clone(), ) diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index 4f59091ffbd..299e3e6b9d8 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -19,14 +19,18 @@ use std::{collections::BTreeSet, fmt, sync::Arc}; use eyeball_im::VectorDiff; use futures_util::{StreamExt as _, stream}; use matrix_sdk_base::{ + apply_redaction, event_cache::{Event, Gap, store::EventCacheStoreLock}, linked_chunk::{LinkedChunkId, OwnedLinkedChunkId, Position, Update}, + serde_helpers::extract_redaction_target, sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, task_monitor::BackgroundTaskHandle, }; +use matrix_sdk_common::executor::spawn; use ruma::{ - MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, - events::relation::RelationType, + EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, + events::{relation::RelationType, room::redaction::SyncRoomRedactionEvent}, + room_version_rules::RoomVersionRules, }; use tokio::sync::broadcast::{Receiver, Sender}; use tracing::{debug, instrument, trace, warn}; @@ -47,8 +51,9 @@ use crate::{ client::WeakClient, config::RequestConfig, event_cache::{ - caches::event_linked_chunk::sort_positions_descending, + caches::{EventLocation, event_linked_chunk::sort_positions_descending}, deduplicator::{DeduplicationOutcome, filter_duplicate_events}, + persistence::find_event, }, room::WeakRoom, }; @@ -60,6 +65,9 @@ pub(in super::super) struct PinnedEventsCacheState { /// The user's own user id. own_user_id: OwnedUserId, + /// The rules for the version of this room. + room_version_rules: RoomVersionRules, + /// The linked chunk representing this room's pinned events. /// /// This linked chunk also contains related events. The events are sorted in @@ -174,6 +182,12 @@ impl<'a> PinnedEventsCacheStateLockWriteGuard<'a> { self.propagate_changes().await?; self.notify_subscribers(EventsOrigin::Sync); + // Do stuff for each event. + for event in &events { + // Handle redaction. + self.maybe_apply_new_redaction(event).await?; + } + Ok(()) } @@ -230,6 +244,106 @@ impl<'a> PinnedEventsCacheStateLockWriteGuard<'a> { self.send_updates_to_store(updates).await } + /// If the given event is a redaction, try to retrieve the + /// to-be-redacted event in the chunk, and replace it by the + /// redacted form. + #[instrument(skip_all)] + async fn maybe_apply_new_redaction(&mut self, event: &Event) -> Result<()> { + let Some(event_id) = + extract_redaction_target(event.raw(), &self.room_version_rules.redaction) + else { + return Ok(()); + }; + + // Replace the redacted event by a redacted form, if we knew about it. + let Some((location, mut target_event)) = self.find_event(&event_id).await? else { + trace!("redacted event is missing from the linked chunk"); + return Ok(()); + }; + + let target_event_raw = target_event.raw(); + + // Don't redact already redacted events. + if let Ok(deserialized) = target_event_raw.deserialize() + && deserialized.is_redacted() + { + return Ok(()); + } + + if let Some(redacted_event) = apply_redaction( + target_event_raw, + event.raw().cast_ref_unchecked::(), + &self.room_version_rules.redaction, + ) { + // It's safe to cast `redacted_event` here: + // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent` + // when calling .raw(), so it's still one under the hood. + // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case. + target_event.replace_raw(redacted_event.cast_unchecked()); + + self.replace_event_at(location, target_event.clone()).await?; + } + + Ok(()) + } + + /// See documentation of [`find_event`]. + pub(super) async fn find_event( + &self, + event_id: &EventId, + ) -> Result> { + find_event(event_id, &self.room_id, &self.chunk, &self.store).await + } + + /// Replaces a single event, be it saved in memory or in the store. + /// + /// If it was saved in memory, this will emit a notification to + /// observers that a single item has been replaced. Otherwise, + /// such a notification is not emitted, because observers are + /// unlikely to observe the store updates directly. + pub async fn replace_event_at( + &mut self, + location: EventLocation, + new_event: Event, + ) -> Result<()> { + match location { + EventLocation::Memory(position) => { + self.state + .chunk + .replace_event_at(position, new_event) + .expect("should have been a valid position of an item"); + // We just changed the in-memory representation; synchronize this with + // the store. + self.propagate_changes().await?; + } + EventLocation::Store => { + self.save_events([new_event]).await?; + } + } + + Ok(()) + } + + /// Save events into the database, without notifying observers. + pub async fn save_events(&mut self, events: impl IntoIterator) -> Result<()> { + let store = self.store.clone(); + let room_id = self.state.room_id.clone(); + let events = events.into_iter().collect::>(); + + // Spawn a task so the save is uninterrupted by task cancellation. + spawn(async move { + for event in events { + store.save_event(&room_id, event).await?; + } + + Result::Ok(()) + }) + .await + .expect("joining failed")?; + + Ok(()) + } + /// Reload all the pinned events from storage, replacing the current linked /// chunk. async fn reload_from_storage(&mut self) -> Result<()> { @@ -362,6 +476,7 @@ impl PinnedEventsCache { pub(in super::super) fn new( weak_room: &WeakRoom, own_user_id: OwnedUserId, + room_version_rules: RoomVersionRules, linked_chunk_update_sender: Sender, store: EventCacheStoreLock, ) -> Result { @@ -375,6 +490,7 @@ impl PinnedEventsCache { let state = PinnedEventsCacheState { room_id: room_id.clone(), own_user_id, + room_version_rules, chunk, update_sender: update_sender.clone(), linked_chunk_update_sender, diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index f01ece020b8..a611f1457fc 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -380,6 +380,7 @@ impl<'a> RoomEventCacheStateLockReadGuard<'a> { PinnedEventsCache::new( weak_room, self.own_user_id.clone(), + self.room_version_rules.clone(), self.state.linked_chunk_update_sender.clone(), self.state.store.clone(), ) From 849713abf6e40689a17d1d8d62e8ee442a46f930 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 May 2026 17:20:40 +0200 Subject: [PATCH 16/22] chore(sdk): Simplify imports. --- .../event_cache/caches/pinned_events/mod.rs | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index 299e3e6b9d8..07c5dab51ed 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -39,24 +39,18 @@ pub(super) use self::updates::PinnedEventsCacheUpdateSender; #[cfg(feature = "e2e-encryption")] use super::super::redecryptor::ResolvedUtd; use super::{ - super::{EventCacheError, EventsOrigin, Result, persistence::send_updates_to_store}, - TimelineVectorDiffs, - event_linked_chunk::EventLinkedChunk, + super::{ + EventCacheError, EventsOrigin, Result, + deduplicator::{DeduplicationOutcome, filter_duplicate_events}, + persistence::{find_event, send_updates_to_store}, + }, + EventLocation, TimelineVectorDiffs, + event_linked_chunk::{EventLinkedChunk, sort_positions_descending}, lock, lock::Reload as _, room::RoomEventCacheLinkedChunkUpdate, }; -use crate::{ - Room, - client::WeakClient, - config::RequestConfig, - event_cache::{ - caches::{EventLocation, event_linked_chunk::sort_positions_descending}, - deduplicator::{DeduplicationOutcome, filter_duplicate_events}, - persistence::find_event, - }, - room::WeakRoom, -}; +use crate::{Room, client::WeakClient, config::RequestConfig, room::WeakRoom}; pub(in super::super) struct PinnedEventsCacheState { /// The ID of the room owning this list of pinned events. From 009621ad9241e8e44a62f244311743d5342bb5c1 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 May 2026 17:45:11 +0200 Subject: [PATCH 17/22] feat(ui): `TimelineFocusKind::PinnedEvents` now uses `PinnedEventsCache`. This patch updates `TimelineFocusKind::PinnedEvents { event_cache }` to use a `PinnedEventsCache` instead of a `RoomEventCache`. --- .../matrix-sdk-ui/src/timeline/controller/mod.rs | 15 +++++++-------- crates/matrix-sdk-ui/src/timeline/tasks.rs | 7 +++---- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index e1c0c5eab04..bb1ae9b3f68 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -27,8 +27,8 @@ use imbl::Vector; use matrix_sdk::{ deserialized_responses::TimelineEvent, event_cache::{ - DecryptionRetryRequest, EventCache, EventFocusedCache, PaginationStatus, RoomEventCache, - ThreadEventCache, TimelineVectorDiffs, + DecryptionRetryRequest, EventCache, EventFocusedCache, PaginationStatus, PinnedEventsCache, + RoomEventCache, ThreadEventCache, TimelineVectorDiffs, }, send_queue::{ LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle, @@ -148,7 +148,7 @@ pub(in crate::timeline) enum TimelineFocusKind { PinnedEvents { /// The cache holding all the events for this focus. - event_cache: RoomEventCache, + event_cache: PinnedEventsCache, }, } @@ -395,9 +395,9 @@ impl TimelineController

{ root_event_id: root_event_id.clone(), }, - TimelineFocus::PinnedEvents => { - TimelineFocusKind::PinnedEvents { event_cache: event_cache.room(room_id).await?.0 } - } + TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents { + event_cache: event_cache.pinned_events(room_id).await?.0, + }, }; let focus = Arc::new(focus); @@ -1447,8 +1447,7 @@ impl TimelineController { } TimelineFocusKind::PinnedEvents { event_cache } => { - let (initial_events, pinned_events_recv) = - event_cache.subscribe_to_pinned_events().await?; + let (initial_events, pinned_events_recv) = event_cache.subscribe().await?; let has_events = !initial_events.is_empty(); diff --git a/crates/matrix-sdk-ui/src/timeline/tasks.rs b/crates/matrix-sdk-ui/src/timeline/tasks.rs index 5f411933b18..490170800b4 100644 --- a/crates/matrix-sdk-ui/src/timeline/tasks.rs +++ b/crates/matrix-sdk-ui/src/timeline/tasks.rs @@ -18,7 +18,7 @@ use std::collections::BTreeSet; use matrix_sdk::{ event_cache::{ - EventFocusThreadMode, EventFocusedCache, EventsOrigin, RoomEventCache, + EventFocusThreadMode, EventFocusedCache, EventsOrigin, PinnedEventsCache, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate, ThreadEventCache, TimelineVectorDiffs, }, send_queue::RoomSendQueueUpdate, @@ -38,7 +38,7 @@ use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEvent ) )] pub(in crate::timeline) async fn pinned_events_task( - room_event_cache: RoomEventCache, + pinned_events_cache: PinnedEventsCache, timeline_controller: TimelineController, mut pinned_events_recv: Receiver, ) { @@ -54,8 +54,7 @@ pub(in crate::timeline) async fn pinned_events_task( // The updates might have lagged, but the room event cache might have // events, so retrieve them and add them back again to the timeline, // after clearing it. - let (initial_events, _) = match room_event_cache.subscribe_to_pinned_events().await - { + let (initial_events, _) = match pinned_events_cache.subscribe().await { Ok(initial_events) => initial_events, Err(err) => { error!( From 95a027fb369b90431d47c4f1561b763cd0fb5512 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 May 2026 17:46:50 +0200 Subject: [PATCH 18/22] chore(sdk): Remove `subscribe_to_pinned_events`. This patch removes `subscribe_to_pinned_events`, it is now useless. --- .../src/event_cache/caches/room/mod.rs | 19 +----------- .../src/event_cache/caches/room/state.rs | 30 +------------------ 2 files changed, 2 insertions(+), 47 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs index d6450e74946..d5a1e39153d 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs @@ -34,7 +34,7 @@ use ruma::{ events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType}, serde::Raw, }; -use tokio::sync::{Notify, broadcast::Receiver, mpsc}; +use tokio::sync::{Notify, mpsc}; use tracing::{instrument, trace, warn}; pub(super) use self::state::{ @@ -150,23 +150,6 @@ impl RoomEventCache { Ok((events, subscriber)) } - /// Subscribe to the pinned event cache for this room. - /// - /// This is a persisted view over the pinned events of a room. - /// - /// The pinned events will be initially reloaded from storage, and/or loaded - /// from a network request to fetch the latest pinned events and their - /// relations, to update it as needed. The list of pinned events will - /// also be kept up-to-date as new events are pinned, and new - /// related events show up from other sources. - pub async fn subscribe_to_pinned_events( - &self, - ) -> Result<(Vec, Receiver)> { - let state = self.inner.state.read().await?; - - state.subscribe_to_pinned_events(&self.inner.weak_room).await - } - /// Create or get an event-focused timeline cache for this room. /// /// This creates a timeline centered around a specific event (e.g., for diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index a611f1457fc..857cebb5ce4 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -48,7 +48,7 @@ use ruma::{ room_version_rules::RoomVersionRules, serde::Raw, }; -use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::broadcast::Sender; use tracing::{debug, error, instrument, trace, warn}; use super::{ @@ -361,34 +361,6 @@ impl<'a> RoomEventCacheStateLockReadGuard<'a> { EventCacheStoreLockGuard::is_dirty(&self.store) } - /// Subscribe to the lazily initialized pinned event cache for this - /// room. - /// - /// This is a persisted view over the pinned events of a room. The - /// pinned events will be initially loaded from a network - /// request to fetch the latest pinned events will be performed, - /// to update it as needed. The list of pinned events will also - /// be kept up-to-date as new events are pinned, and new related - /// events show up from sync or backpagination. - /// - /// This requires the room's event cache to be initialized. - pub async fn subscribe_to_pinned_events( - &self, - weak_room: &WeakRoom, - ) -> Result<(Vec, Receiver), EventCacheError> { - let pinned_events_cache = self.state.pinned_events_cache.get_or_try_init(|| { - PinnedEventsCache::new( - weak_room, - self.own_user_id.clone(), - self.room_version_rules.clone(), - self.state.linked_chunk_update_sender.clone(), - self.state.store.clone(), - ) - })?; - - pinned_events_cache.subscribe().await - } - /// Get an event-focused cache for this event and thread mode, if it /// exists. /// From f452ec3300ed005d1b082fb120fd16cc67beaf86 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 May 2026 18:47:16 +0200 Subject: [PATCH 19/22] refactor(sdk): R2D2 replaces UTD on `PinnedEventsCache` without involving `RoomEventCache`. This patch updates R2D2 to fetch the `PinnedEventsCache` from `EventCache` without using `RoomEventCache`. --- .../matrix-sdk/src/event_cache/caches/mod.rs | 10 +++++++ .../event_cache/caches/pinned_events/mod.rs | 2 +- crates/matrix-sdk/src/event_cache/mod.rs | 13 ++++++++++ .../matrix-sdk/src/event_cache/redecryptor.rs | 26 ++++++++++--------- 4 files changed, 38 insertions(+), 13 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 0ea9c0ab1e7..a9026d2251f 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -201,6 +201,16 @@ impl Caches { }) } + /// Get a [`PinnedEventsCache`] if it has been initialised. + /// + /// [`PinnedEventsCache`]: pinned_events::PinnedEventsCache + #[cfg(feature = "e2e-encryption")] + pub(super) fn pinned_events_without_initialisation( + &self, + ) -> Option<&pinned_events::PinnedEventsCache> { + self.pinned_events.get() + } + /// Update all the event caches with a [`JoinedRoomUpdate`]. pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { let Self { room, threads, pinned_events, internals } = &self; diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index 07c5dab51ed..aab70876cf3 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -530,7 +530,7 @@ impl PinnedEventsCache { /// list of decrypted events, and replace them, while alerting observers /// about the update. #[cfg(feature = "e2e-encryption")] - pub(in crate::event_cache) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> { + pub(in super::super) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> { let mut guard = self.inner.state.write().await?; if guard.state.chunk.replace_utds(events) { diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index d1a5ac27297..14878b4487b 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -402,6 +402,19 @@ impl EventCache { Ok((caches_for_room.pinned_events()?.clone(), drop_handles)) } + + /// Return a pinned-events-specific view over the [`EventCache`] if it has + /// been initialised. + #[cfg(feature = "e2e-encryption")] + async fn pinned_events_without_initialisation( + &self, + room_id: &RoomId, + ) -> Result> { + let caches_for_room = self.inner.all_caches_for_room(room_id).await?; + + Ok(caches_for_room.pinned_events_without_initialisation().cloned()) + } + /// Cleanly clear all the rooms' event caches. /// /// This will notify any live observers that the room has been cleared. diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 650eba9720b..56b1903e158 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -370,14 +370,14 @@ impl EventCache { // Phase 1: under the room state write lock, collect cache handles and // perform all room-linked-chunk mutations. We deliberately do NOT call - // replace_utds() on event-focused/pinned caches here to avoid an ABBA deadlock: - // pagination holds an event-focused cache lock and then tries to acquire the - // room state lock (via `save_events`), while this method would hold the - // room state lock and try to acquire event-focused cache locks. - let (pinned_cache, ef_caches) = { + // replace_utds() on event-focused/pinned-evenst caches here to avoid an ABBA + // deadlock: pagination holds an event-focused cache lock and then tries + // to acquire the room state lock (via `save_events`), while this method + // would hold the room state lock and try to acquire event-focused cache + // locks. + let ef_caches = { let mut state = room_cache.state().write().await?; - let pinned_cache = state.pinned_events_cache().cloned(); let ef_caches: Vec<_> = state.event_focused_caches().cloned().collect(); // Consider the room linked chunk. @@ -416,15 +416,17 @@ impl EventCache { Some(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() }), ); - (pinned_cache, ef_caches) + ef_caches }; // Room state write lock is dropped here. - // Phase 2: replace UTDs in pinned and event-focused caches WITHOUT - // holding the room state lock. These caches have their own internal - // locks and don't need the room state lock. - if let Some(pinned_cache) = pinned_cache { - pinned_cache.replace_utds(&events).await?; + // Phase 2: replace UTDs in pinned-events and event-focused caches + // WITHOUT holding the room state lock. These caches have their own + // internal locks and don't need the room state lock. + if let Ok(Some(pinned_events_cache)) = + self.pinned_events_without_initialisation(room_id).await + { + pinned_events_cache.replace_utds(&events).await?; } // TODO: This ain't great for performance; there shouldn't be that many From f4befd3e278a577a14bf6a9e3541cbef65bd4e08 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 May 2026 18:28:54 +0200 Subject: [PATCH 20/22] chore(sdk): Remove `PinnedEventsCache` entirely from `RoomEventCache`. This patch removes deadcode about `PinnedEventsCache` in `RoomEventCache`. --- .../src/event_cache/caches/room/state.rs | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index 857cebb5ce4..f2a2f3facc7 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -36,7 +36,6 @@ use matrix_sdk_base::{ sync::Timeline, }; use matrix_sdk_common::executor::spawn; -use once_cell::sync::OnceCell; use ruma::{ EventId, OwnedEventId, OwnedRoomId, OwnedUserId, events::{ @@ -67,7 +66,6 @@ use super::{ event_linked_chunk::EventLinkedChunk, lock, pagination::SharedPaginationStatus, - pinned_events::PinnedEventsCache, read_receipts::compute_unread_counts, }, EventsOrigin, RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, @@ -111,9 +109,6 @@ pub struct RoomEventCacheState { /// permalink). event_focused_caches: HashMap, - /// Cache for pinned events in this room, initialized on-demand. - pinned_events_cache: OnceCell, - pagination_status: SharedObservable, /// A clone of [`super::RoomEventCacheInner::update_sender`]. @@ -261,7 +256,6 @@ impl LockedRoomEventCacheState { room_version_rules, waited_for_initial_prev_token: false, subscriber_count: Default::default(), - pinned_events_cache: OnceCell::new(), automatic_pagination, })) } @@ -282,11 +276,6 @@ impl<'a> lock::Reload for RoomEventCacheStateLockWriteGuard<'a> { async fn reload(&mut self) -> Result<(), EventCacheError> { self.shrink_to_last_chunk().await?; - // Reload the pinned-events. - if let Some(pinned_events_cache) = self.pinned_events_cache.get_mut() { - pinned_events_cache.reload().await?; - } - let diffs = self.state.room_linked_chunk.updates_as_vector_diffs(); // Notify observers about the update. @@ -380,13 +369,6 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { &mut self.state.room_linked_chunk } - /// Get a reference to the [`pinned_events_cache`] if it has been - /// initialized. - #[cfg(any(feature = "e2e-encryption", test))] - pub fn pinned_events_cache(&self) -> Option<&PinnedEventsCache> { - self.state.pinned_events_cache.get() - } - /// Get a reference to all the live [`event_focused_caches`]. #[cfg(feature = "e2e-encryption")] pub fn event_focused_caches(&self) -> impl Iterator { From e82cc0f60e9e5fd7605d6ff97e758c14073229b9 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 May 2026 18:13:36 +0200 Subject: [PATCH 21/22] test(sdk,ui): Update tests to the new API. --- .../integration/timeline/pinned_event.rs | 15 ++--- .../tests/integration/room/pinned_events.rs | 62 +++++++++++-------- 2 files changed, 43 insertions(+), 34 deletions(-) diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs b/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs index 53b4b933ae8..cf138ef7969 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs @@ -771,10 +771,10 @@ async fn test_redacted_events_are_reflected_in_sync() { let room_id = room_id!("!test:localhost"); let f = EventFactory::new().room(room_id).sender(*BOB); - let event_id = event_id!("$1"); + let pinned_event_id = event_id!("$1"); let pinned_event = f .text_msg("in the end") - .event_id(event_id!("$1")) + .event_id(pinned_event_id) .server_ts(MilliSecondsSinceUnixEpoch::now()) .into_raw_sync(); @@ -784,7 +784,7 @@ async fn test_redacted_events_are_reflected_in_sync() { // Mock /relations for pinned timeline event. server .mock_room_relations() - .match_target_event(event_id.to_owned()) + .match_target_event(pinned_event_id.to_owned()) .match_limit(256) .ok(RoomRelationsResponseTemplate::default().events(Vec::>::new())) .mock_once() @@ -794,7 +794,7 @@ async fn test_redacted_events_are_reflected_in_sync() { // Load initial timeline items: a text message and a `m.room.pinned_events` with // event $1 let room = PinnedEventsSync::new(room_id) - .with_pinned_event_ids(vec!["$1"]) + .with_pinned_event_ids(vec![pinned_event_id.as_str()]) .mock_and_sync(&client, &server) .await .expect("Sync failed"); @@ -818,12 +818,13 @@ async fn test_redacted_events_are_reflected_in_sync() { assert_eq!(items.len(), 1 + 1); // event item + a date divider assert!(items[0].is_date_divider()); - assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "in the end"); + assert_eq!(items[1].as_event().unwrap().event_id(), Some(pinned_event_id)); assert_pending!(timeline_stream); + let redaction_event_id = event_id!("$2"); let redaction_event = f - .redaction(event_id!("$1")) - .event_id(event_id!("$2")) + .redaction(pinned_event_id) + .event_id(redaction_event_id) .server_ts(MilliSecondsSinceUnixEpoch::now()) .into_raw_sync(); diff --git a/crates/matrix-sdk/tests/integration/room/pinned_events.rs b/crates/matrix-sdk/tests/integration/room/pinned_events.rs index 9f9d2ce9815..b1d8f12c311 100644 --- a/crates/matrix-sdk/tests/integration/room/pinned_events.rs +++ b/crates/matrix-sdk/tests/integration/room/pinned_events.rs @@ -171,8 +171,10 @@ async fn test_pinned_events_are_loaded_from_network_then_are_reloaded_from_stora .build() .await; + let event_cache = client.event_cache(); + // Subscribe the event cache to sync updates. - client.event_cache().subscribe().unwrap(); + event_cache.subscribe().unwrap(); // Sync the room with the pinned event ID in the room state. // @@ -181,19 +183,20 @@ async fn test_pinned_events_are_loaded_from_network_then_are_reloaded_from_stora // events. let pinned_events_state = f.room_pinned_events(vec![pinned_event_id.to_owned()]); - let room = server + let _room = server .sync_room( &client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![pinned_events_state.into()]), ) .await; - // Get the room event cache and subscribe to pinned events. - let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + // Get the pinned events cache and subscribe to it. + let (pinned_events_cache, _drop_handles) = + event_cache.pinned_events(room_id).await.unwrap(); - // Subscribe to pinned events - this triggers PinnedEventsCache::new() which - // spawns a task that calls reload_from_storage() first. - let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); + // Getting the pinned events cache triggers `PinnedEventsCache::new()` which + // spawns a task that calls `reload_from_storage()` first. + let (events, mut subscriber) = pinned_events_cache.subscribe().await.unwrap(); let mut events = events.into(); // Wait for the background task to reload the events. @@ -229,17 +232,19 @@ async fn test_pinned_events_are_loaded_from_network_then_are_reloaded_from_stora .build() .await; + let event_cache = client.event_cache(); + // Subscribe the event cache to sync updates. - client.event_cache().subscribe().unwrap(); + event_cache.subscribe().unwrap(); - let room = client.get_room(room_id).unwrap(); + let _room = client.get_room(room_id).unwrap(); - // Get the room event cache and subscribe to pinned events. - let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + // Get the pinned events cache and subscribe to it. + let (pinned_events_cache, _drop_handles) = event_cache.pinned_events(room_id).await.unwrap(); - // Subscribe to pinned events - this triggers PinnedEventsCache::new() which - // spawns a task that calls reload_from_storage() first. - let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); + // Getting the pinned events cache triggers `PinnedEventsCache::new()` which + // spawns a task that calls `reload_from_storage()` first. + let (events, mut subscriber) = pinned_events_cache.subscribe().await.unwrap(); let mut events = events.into(); // Wait for the background task to reload the events from storage. @@ -277,8 +282,10 @@ async fn test_pinned_events_are_reloaded_from_storage_from_many_chunks() { let server = MatrixMockServer::new().await; let client = server.client_builder().build().await; + let event_cache = client.event_cache(); + // Create a non empty Event Cache store containing two chunks. - client.event_cache().subscribe().unwrap(); + event_cache.subscribe().unwrap(); client .event_cache_store() .lock() @@ -308,14 +315,14 @@ async fn test_pinned_events_are_reloaded_from_storage_from_many_chunks() { .await .unwrap(); - let room = server.sync_room(&client, JoinedRoomBuilder::new(room_id)).await; + let _room = server.sync_room(&client, JoinedRoomBuilder::new(room_id)).await; - // Get the room event cache and subscribe to pinned events. - let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + // Get the pinned events cache and subscribe to it. + let (pinned_events_cache, _drop_handles) = event_cache.pinned_events(room_id).await.unwrap(); - // Subscribe to pinned events - this triggers PinnedEventsCache::new() which - // spawns a task that calls reload_from_storage() first. - let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); + // Getting the pinned events cache triggers `PinnedEventsCache::new()` which + // spawns a task that calls `reload_from_storage()` first. + let (events, mut subscriber) = pinned_events_cache.subscribe().await.unwrap(); let mut events = events.into(); // Wait for the background task to reload the events. @@ -368,9 +375,10 @@ async fn test_pinned_events_dont_include_thread_responses() { .await; let client = server.client_builder().build().await; + let event_cache = client.event_cache(); // Subscribe the event cache to sync updates. - client.event_cache().subscribe().unwrap(); + event_cache.subscribe().unwrap(); // Sync the room with the pinned event ID in the room state. // @@ -379,7 +387,7 @@ async fn test_pinned_events_dont_include_thread_responses() { // events. let pinned_events_state = f.room_pinned_events(vec![pinned_event_id.to_owned()]); - let room = server + let _room = server .sync_room( &client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![pinned_events_state.into()]), @@ -387,11 +395,11 @@ async fn test_pinned_events_dont_include_thread_responses() { .await; // Get the room event cache and subscribe to pinned events. - let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + let (pinned_events_cache, _drop_handles) = event_cache.pinned_events(room_id).await.unwrap(); - // Subscribe to pinned events - this triggers PinnedEventsCache::new() which - // spawns a task that calls reload_from_storage() first. - let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); + // Getting the pinned events cache triggers `PinnedEventsCache::new()` which + // spawns a task that calls `reload_from_storage()` first. + let (events, mut subscriber) = pinned_events_cache.subscribe().await.unwrap(); let mut events = events.into(); // Wait for the background task to reload the events. From 42fef1644944b53f4f97dcb59e02147a33bdf3fb Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 19 May 2026 12:02:53 +0200 Subject: [PATCH 22/22] chore: Remove an unused `reload` method. --- .../src/event_cache/caches/pinned_events/mod.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs index aab70876cf3..3f56228309e 100644 --- a/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/pinned_events/mod.rs @@ -47,7 +47,6 @@ use super::{ EventLocation, TimelineVectorDiffs, event_linked_chunk::{EventLinkedChunk, sort_positions_descending}, lock, - lock::Reload as _, room::RoomEventCacheLinkedChunkUpdate, }; use crate::{Room, client::WeakClient, config::RequestConfig, room::WeakRoom}; @@ -760,14 +759,6 @@ impl PinnedEventsCache { Ok(Some(loaded_events)) } - - /// Force to reload the pinned events. - // - // TODO(@hywan): Temporary fix. All the states must be in a single struct behind - // the cross-process lock instead of being dispatched in each cache. - pub(super) async fn reload(&self) -> Result<()> { - self.inner.state.write().await?.reload().await - } } impl fmt::Debug for PinnedEventsCache {