Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e82d683
chore(sdk): Implement `Debug` for `PinnedEventCache`.
Hywan May 12, 2026
3bc164f
doc(sdk): Remove a useless `TODO`.
Hywan May 12, 2026
147d35e
refactor(sdk): Rename `PinnedEventCache` to `PinnedEventsCache`.
Hywan May 12, 2026
c1aad24
chore(sdk): `PinnedEventsCache::new` takes a `WeakRoom` instead of a …
Hywan May 12, 2026
83e2bcd
refactor(sdk): Rename `PinnedEventsCacheStateLock` to `LockedPinnedEv…
Hywan May 12, 2026
a6d4e87
doc(sdk): Fix an intra-link.
Hywan May 12, 2026
54037fb
refactor(sdk): Introduce `PinnedEventsCacheUpdateSender`.
Hywan May 12, 2026
1c4c67b
doc(sdk): Fix document in thread cache.
Hywan May 12, 2026
a9b2462
refactor(sdk): Introduce `PinnedEventsCacheInner`.
Hywan May 12, 2026
c77fee4
feat(sdk): Add `PinnedEventsCacheInner::update_sender`.
Hywan May 12, 2026
d333a48
feat(sdk): Add `EventCache::pinned_events`.
Hywan May 12, 2026
d370992
feat(sdk): Add `aggregator_timeline_for_pinned_events`.
Hywan May 13, 2026
709e184
refactor(sdk): Handle the `Timeline` for pinned-events in `Caches` in…
Hywan May 18, 2026
a571e74
feat(sdk): Deduplicate pinned events.
Hywan May 18, 2026
b5ea61b
feat(sdk): `PinnedEventsCache` handles redaction.
Hywan May 18, 2026
849713a
chore(sdk): Simplify imports.
Hywan May 18, 2026
009621a
feat(ui): `TimelineFocusKind::PinnedEvents` now uses `PinnedEventsCac…
Hywan May 18, 2026
95a027f
chore(sdk): Remove `subscribe_to_pinned_events`.
Hywan May 18, 2026
f452ec3
refactor(sdk): R2D2 replaces UTD on `PinnedEventsCache` without invol…
Hywan May 18, 2026
f4befd3
chore(sdk): Remove `PinnedEventsCache` entirely from `RoomEventCache`.
Hywan May 18, 2026
e82cc0f
test(sdk,ui): Update tests to the new API.
Hywan May 18, 2026
42fef16
chore: Remove an unused `reload` method.
Hywan May 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ js-sys = { version = "0.3.82", default-features = false, features = ["std"] }
mime = { version = "0.3.17", default-features = false }
oauth2 = { version = "5.0.0", default-features = false, features = ["timing-resistant-secret-traits"] }
oauth2-reqwest = { version = "0.1.0-alpha.3", default-features = false }
once_cell = { version = "1.21", default-features = false }
pbkdf2 = { version = "0.12.2", default-features = false }
pin-project-lite = { version = "0.2.16", default-features = false }
proc-macro2 = { version = "1.0.106", default-features = false }
Expand Down
17 changes: 8 additions & 9 deletions crates/matrix-sdk-ui/src/timeline/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use imbl::Vector;
use matrix_sdk::{
deserialized_responses::TimelineEvent,
event_cache::{
DecryptionRetryRequest, EventCache, EventFocusedCache, PaginationStatus, RoomEventCache,
ThreadEventCache, TimelineVectorDiffs,
DecryptionRetryRequest, EventCache, EventFocusedCache, PaginationStatus, PinnedEventsCache,
RoomEventCache, ThreadEventCache, TimelineVectorDiffs,
},
send_queue::{
LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
Expand Down Expand Up @@ -148,7 +148,7 @@ pub(in crate::timeline) enum TimelineFocusKind {

PinnedEvents {
/// The cache holding all the events for this focus.
event_cache: RoomEventCache,
event_cache: PinnedEventsCache,
},
}

Expand Down Expand Up @@ -395,9 +395,9 @@ impl<P: RoomDataProvider> TimelineController<P> {
root_event_id: root_event_id.clone(),
},

TimelineFocus::PinnedEvents => {
TimelineFocusKind::PinnedEvents { event_cache: event_cache.room(room_id).await?.0 }
}
TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents {
event_cache: event_cache.pinned_events(room_id).await?.0,
},
};

let focus = Arc::new(focus);
Expand Down Expand Up @@ -1447,8 +1447,7 @@ impl TimelineController {
}

TimelineFocusKind::PinnedEvents { event_cache } => {
let (initial_events, pinned_events_recv) =
event_cache.subscribe_to_pinned_events().await?;
let (initial_events, pinned_events_recv) = event_cache.subscribe().await?;

let has_events = !initial_events.is_empty();

Expand All @@ -1463,7 +1462,7 @@ impl TimelineController {
.client()
.task_monitor()
.spawn_infinite_task(
"timeline::pinned_event_cache_updates",
"timeline::pinned_events_cache_updates",
pinned_events_task(event_cache.clone(), self.clone(), pinned_events_recv),
)
.abort_on_drop();
Expand Down
7 changes: 3 additions & 4 deletions crates/matrix-sdk-ui/src/timeline/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::collections::BTreeSet;

use matrix_sdk::{
event_cache::{
EventFocusThreadMode, EventFocusedCache, EventsOrigin, RoomEventCache,
EventFocusThreadMode, EventFocusedCache, EventsOrigin, PinnedEventsCache, RoomEventCache,
RoomEventCacheSubscriber, RoomEventCacheUpdate, ThreadEventCache, TimelineVectorDiffs,
},
send_queue::RoomSendQueueUpdate,
Expand All @@ -38,7 +38,7 @@ use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEvent
)
)]
pub(in crate::timeline) async fn pinned_events_task(
room_event_cache: RoomEventCache,
pinned_events_cache: PinnedEventsCache,
timeline_controller: TimelineController,
mut pinned_events_recv: Receiver<TimelineVectorDiffs>,
) {
Expand All @@ -54,8 +54,7 @@ pub(in crate::timeline) async fn pinned_events_task(
// The updates might have lagged, but the room event cache might have
// events, so retrieve them and add them back again to the timeline,
// after clearing it.
let (initial_events, _) = match room_event_cache.subscribe_to_pinned_events().await
{
let (initial_events, _) = match pinned_events_cache.subscribe().await {
Ok(initial_events) => initial_events,
Err(err) => {
error!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,10 +771,10 @@ async fn test_redacted_events_are_reflected_in_sync() {
let room_id = room_id!("!test:localhost");

let f = EventFactory::new().room(room_id).sender(*BOB);
let event_id = event_id!("$1");
let pinned_event_id = event_id!("$1");
let pinned_event = f
.text_msg("in the end")
.event_id(event_id!("$1"))
.event_id(pinned_event_id)
.server_ts(MilliSecondsSinceUnixEpoch::now())
.into_raw_sync();

Expand All @@ -784,7 +784,7 @@ async fn test_redacted_events_are_reflected_in_sync() {
// Mock /relations for pinned timeline event.
server
.mock_room_relations()
.match_target_event(event_id.to_owned())
.match_target_event(pinned_event_id.to_owned())
.match_limit(256)
.ok(RoomRelationsResponseTemplate::default().events(Vec::<Raw<AnyTimelineEvent>>::new()))
.mock_once()
Expand All @@ -794,7 +794,7 @@ async fn test_redacted_events_are_reflected_in_sync() {
// Load initial timeline items: a text message and a `m.room.pinned_events` with
// event $1
let room = PinnedEventsSync::new(room_id)
.with_pinned_event_ids(vec!["$1"])
.with_pinned_event_ids(vec![pinned_event_id.as_str()])
.mock_and_sync(&client, &server)
.await
.expect("Sync failed");
Expand All @@ -818,12 +818,13 @@ async fn test_redacted_events_are_reflected_in_sync() {

assert_eq!(items.len(), 1 + 1); // event item + a date divider
assert!(items[0].is_date_divider());
assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "in the end");
assert_eq!(items[1].as_event().unwrap().event_id(), Some(pinned_event_id));
assert_pending!(timeline_stream);

let redaction_event_id = event_id!("$2");
let redaction_event = f
.redaction(event_id!("$1"))
.event_id(event_id!("$2"))
.redaction(pinned_event_id)
.event_id(redaction_event_id)
.server_ts(MilliSecondsSinceUnixEpoch::now())
.into_raw_sync();

Expand Down
1 change: 1 addition & 0 deletions crates/matrix-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ mime.workspace = true
mime2ext = "0.1.54"
oauth2.workspace = true
oauth2-reqwest.workspace = true
once_cell.workspace = true
percent-encoding = "2.3.2"
pin-project-lite.workspace = true
rand = { workspace = true, optional = true }
Expand Down
54 changes: 54 additions & 0 deletions crates/matrix-sdk/src/event_cache/caches/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,57 @@ pub async fn aggregate_timeline_for_threads(

Ok(new_events_by_thread)
}

pub fn aggregate_timeline_for_pinned_events(
timeline: &Timeline,
pinned_event_ids: &[OwnedEventId],
redaction_rules: &RedactionRules,
) -> Timeline {
let mut new_timeline = Timeline {
limited: timeline.limited,
prev_batch: timeline.prev_batch.clone(),
events: Vec::new(),
};

// No events are pinned? The `Timeline` must be empty.
if pinned_event_ids.is_empty() {
return new_timeline;
}

// Look for events that relate to pinned events. We already know the
// pinned-events, we don't need to look for them. We are only interested by
// related events.
for event in &timeline.events {
match extract_relation(event.raw()) {
// Ohh, this event relates to another event!
Some((relation_type, related_event_id)) => match relation_type {
// `event` relates to a thread: not what we want.
RelationType::Thread => {}

// `event` represents an annotation (e.g. reactions), a replacement (an edit), a
// reference or something custom. Let's see if the `related_event_id` is a
// pinned-event.
RelationType::Annotation
| RelationType::Replacement
Comment thread
poljar marked this conversation as resolved.
| RelationType::Reference
| _ => {
if pinned_event_ids.contains(&related_event_id) {
new_timeline.events.push(event.clone());
}
}
},

// No explicit relation, but it can be a redaction of a pinned-event!
None => {
if let Some(redaction_target) =
extract_redaction_target(event.raw(), redaction_rules)
&& pinned_event_ids.contains(&redaction_target)
{
new_timeline.events.push(event.clone());
}
}
}
}

new_timeline
}
89 changes: 84 additions & 5 deletions crates/matrix-sdk/src/event_cache/caches/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use matrix_sdk_base::{
linked_chunk::Position,
sync::{JoinedRoomUpdate, LeftRoomUpdate},
};
use once_cell::sync::OnceCell;
use ruma::{OwnedEventId, OwnedRoomId, RoomId, room_version_rules::RoomVersionRules};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, broadcast::Sender, mpsc};

Expand All @@ -43,6 +44,7 @@ pub mod thread;
pub(super) struct Caches {
pub room: room::RoomEventCache,
pub threads: Arc<RwLock<HashMap<OwnedEventId, thread::ThreadEventCache>>>,
pub pinned_events: OnceCell<pinned_events::PinnedEventsCache>,
internals: CachesInternals,
}

Expand Down Expand Up @@ -124,6 +126,7 @@ impl Caches {
Ok(Self {
room: room_event_cache,
threads: Arc::new(RwLock::new(HashMap::new())),
pinned_events: OnceCell::new(),
internals: CachesInternals { store, linked_chunk_update_sender, room_version_rules },
})
}
Expand Down Expand Up @@ -183,9 +186,34 @@ impl Caches {
)
}

/// Get or create a [`PinnedEventsCache`].
///
/// [`PinnedEventsCache`]: pinned_events::PinnedEventsCache
pub fn pinned_events(&self) -> Result<&pinned_events::PinnedEventsCache> {
self.pinned_events.get_or_try_init(|| {
pinned_events::PinnedEventsCache::new(
self.room.weak_room(),
self.room.own_user_id().clone(),
self.internals.room_version_rules.clone(),
self.internals.linked_chunk_update_sender.clone(),
self.internals.store.clone(),
)
})
}

/// Get a [`PinnedEventsCache`] if it has been initialised.
///
/// [`PinnedEventsCache`]: pinned_events::PinnedEventsCache
#[cfg(feature = "e2e-encryption")]
pub(super) fn pinned_events_without_initialisation(
&self,
) -> Option<&pinned_events::PinnedEventsCache> {
self.pinned_events.get()
}

/// Update all the event caches with a [`JoinedRoomUpdate`].
pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
let Self { room, threads, internals } = &self;
let Self { room, threads, pinned_events, internals } = &self;

// Room.
{
Expand Down Expand Up @@ -224,12 +252,24 @@ impl Caches {
}
}

// Pinned-events.
if let Some(pinned_events) = pinned_events.get() {
let mut updates = updates.clone();
updates.timeline = aggregator::aggregate_timeline_for_pinned_events(
&updates.timeline,
&pinned_events.state().read().await?.current_event_ids(),
&internals.room_version_rules.redaction,
);

pinned_events.handle_joined_room_update(updates).await?;
}

Ok(())
}

/// Update all the event caches with a [`LeftRoomUpdate`].
pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
let Self { room, threads, internals } = &self;
let Self { room, threads, pinned_events, internals } = &self;

// Room.
{
Expand Down Expand Up @@ -268,6 +308,18 @@ impl Caches {
}
}

// Pinned-events.
if let Some(pinned_events) = pinned_events.get() {
let mut updates = updates.clone();
updates.timeline = aggregator::aggregate_timeline_for_pinned_events(
&updates.timeline,
&pinned_events.state().read().await?.current_event_ids(),
&internals.room_version_rules.redaction,
);

pinned_events.handle_left_room_update(updates).await?;
}

Ok(())
}

Expand Down Expand Up @@ -305,13 +357,19 @@ pub(super) struct ResetCaches<'c> {
thread::OwnedThreadEventCacheStateLockWriteGuard,
thread::ThreadEventCacheUpdateSender,
)>,
pinned_events_lock: Option<(
pinned_events::PinnedEventsCacheStateLockWriteGuard<'c>,
pinned_events::PinnedEventsCacheUpdateSender,
)>,
}

impl<'c> ResetCaches<'c> {
/// Create a new [`ResetCaches`].
///
/// It can fail if acquiring an exclusive lock fails.
async fn new(Caches { room, threads, internals: _ }: &'c mut Caches) -> Result<Self> {
async fn new(
Caches { room, threads, pinned_events, internals: _ }: &'c mut Caches,
) -> Result<Self> {
// Acquire an exclusive access to the state of the room.
let room_lock = (room.state().write().await?, room.update_sender().clone());

Expand All @@ -325,7 +383,14 @@ impl<'c> ResetCaches<'c> {
.push((thread.state().write_owned().await?, thread.update_sender().clone()));
}

Ok(Self { room_lock, threads_lock, thread_locks })
// Acquire an exclusive access to the pinned-events if any.
let pinned_events_lock = if let Some(pinned_events) = pinned_events.get_mut() {
Some((pinned_events.state().write().await?, pinned_events.update_sender().clone()))
} else {
None
};

Ok(Self { room_lock, threads_lock, thread_locks, pinned_events_lock })
}

/// Reset all the event caches, and broadcast the [`TimelineVectorDiffs`].
Expand All @@ -335,8 +400,9 @@ impl<'c> ResetCaches<'c> {
///
/// It can fail if resetting an event cache fails.
pub async fn reset_all(self) -> Result<()> {
let Self { room_lock, threads_lock, thread_locks } = self;
let Self { room_lock, threads_lock, thread_locks, pinned_events_lock } = self;

// Room.
{
let (mut room_state, room_update_sender) = room_lock;

Expand All @@ -350,6 +416,7 @@ impl<'c> ResetCaches<'c> {
);
}

// Threads.
{
for thread_lock in thread_locks {
let (mut thread_state, thread_update_sender) = thread_lock;
Expand All @@ -370,6 +437,18 @@ impl<'c> ResetCaches<'c> {
drop(threads_lock);
}

// Pinned-events.
{
if let Some((mut pinned_events_state, pinned_events_update_sender)) = pinned_events_lock
{
let updates_as_vector_diffs = pinned_events_state.reset().await?;
pinned_events_update_sender.send(TimelineVectorDiffs {
diffs: updates_as_vector_diffs,
origin: EventsOrigin::Cache,
});
}
}

Ok(())
}
}
Expand Down
Loading
Loading