Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 9 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ rand = { version = "0.10.1", default-features = false, features = ["std", "std_r
regex = { version = "1.12.2", default-features = false }
reqwest = { version = "0.13.1", default-features = false }
rmp-serde = { version = "1.3.0", default-features = false }
ruma = { version = "0.15.1", features = [
ruma = { version = "0.15.1", git = "https://github.com/bradtgmurray/ruma", branch = "msc4471-event-streams", features = [
"client-api-c",
"compat-unset-avatar",
"compat-upload-signatures",
Expand Down
3 changes: 2 additions & 1 deletion crates/matrix-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ features = ["docsrs"]
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]

[features]
default = ["e2e-encryption", "automatic-room-key-forwarding", "sqlite"]
default = ["e2e-encryption", "automatic-room-key-forwarding", "sqlite", "experimental-event-streams"]
testing = [
"matrix-sdk-sqlite?/testing",
"matrix-sdk-indexeddb?/testing",
Expand Down Expand Up @@ -81,6 +81,7 @@ federation-api = ["ruma/federation-api-c"]
uniffi = ["dep:uniffi", "matrix-sdk-base/uniffi", "dep:matrix-sdk-ffi-macros"]

experimental-widgets = ["dep:uuid", "experimental-send-custom-to-device"]
experimental-event-streams = ["e2e-encryption", "ruma/unstable-msc4471"]

docsrs = ["e2e-encryption", "sqlite", "indexeddb", "sso-login", "qrcode", "federation-api"]

Expand Down
2 changes: 2 additions & 0 deletions crates/matrix-sdk/changelog.d/6607.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add experimental support for MSC4471 event streams, allowing clients to send
transient updates to room messages and subscribe to updates from other devices.
15 changes: 15 additions & 0 deletions crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(feature = "experimental-event-streams")]
use std::sync::OnceLock;
use std::{
collections::{BTreeMap, BTreeSet, btree_map},
fmt::{self, Debug},
Expand Down Expand Up @@ -87,6 +89,8 @@ use self::{
caches::{Cache, CachedValue, ClientCaches},
futures::SendRequest,
};
#[cfg(feature = "experimental-event-streams")]
use crate::event_streams::EventStreams;
use crate::{
Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
Room, SessionTokens, TransmissionProgress,
Expand Down Expand Up @@ -394,6 +398,9 @@ pub(crate) struct ClientInner {
#[cfg(feature = "e2e-encryption")]
pub(crate) duplicate_key_upload_error_sender:
broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,

#[cfg(feature = "experimental-event-streams")]
event_streams: OnceLock<EventStreams>,
}

impl ClientInner {
Expand Down Expand Up @@ -464,6 +471,8 @@ impl ClientInner {
task_monitor: TaskMonitor::new(),
#[cfg(feature = "e2e-encryption")]
duplicate_key_upload_error_sender: broadcast::channel(1).0,
#[cfg(feature = "experimental-event-streams")]
event_streams: OnceLock::new(),
};

#[allow(clippy::let_and_return)]
Expand Down Expand Up @@ -514,6 +523,12 @@ impl Client {
ClientBuilder::new()
}

/// Get the client-owned event stream manager.
#[cfg(feature = "experimental-event-streams")]
pub fn event_streams(&self) -> EventStreams {
self.inner.event_streams.get_or_init(|| EventStreams::new(self.clone())).clone()
}

pub(crate) fn base_client(&self) -> &BaseClient {
&self.inner.base_client
}
Expand Down
158 changes: 158 additions & 0 deletions crates/matrix-sdk/src/event_streams/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
//! Core support for MSC4471 event streams.
//!
//! Event streams let a client publish short-lived, device-scoped updates for a
//! room message without committing each intermediate update to room history.
//! This module keeps that state in memory and exposes publisher and subscriber
//! handles around the Ruma MSC4471 event content types.

mod publisher;
mod subscription;

use std::collections::BTreeMap;

use ruma::{
DeviceId, OwnedEventId, OwnedRoomId, TransactionId, UserId,
api::client::to_device::send_event_to_device::v3::Request as ToDeviceRequest,
events::{AnyToDeviceEventContent, ToDeviceEventContent},
serde::Raw,
to_device::DeviceIdOrAllDevices,
};
use thiserror::Error;
use tokio::sync::broadcast;

pub use self::{
publisher::{EventStreamPublisher, EventStreamPublisherOptions, EventStreamPublishers},
subscription::{
EventStreamSubscriberUpdate, EventStreamSubscription, EventStreamSubscriptions,
},
};
use crate::{Client, HttpError, room::edit::EditError};

/// A specialized result for event stream operations.
pub type Result<T, E = EventStreamError> = std::result::Result<T, E>;

/// An error returned by an event stream operation.
#[derive(Debug, Error)]
pub enum EventStreamError {
/// The operation needs the current device ID, but the client is not logged
/// in.
#[error("event streams require a logged-in client")]
AuthenticationRequired,

/// The requested stream is not active in this client's in-memory state.
#[error("unknown event stream")]
UnknownStream,

/// The descriptor event is not an unredacted room message with a stream
/// descriptor.
#[error("stream descriptor event is not an unredacted room message with a stream descriptor")]
InvalidDescriptorEvent,

/// The descriptor event does not have a sender.
#[error("stream descriptor event does not have a sender")]
MissingDescriptorSender,

/// An HTTP request failed.
#[error(transparent)]
Http(#[from] HttpError),

/// A matrix-sdk operation failed.
#[error(transparent)]
Sdk(#[from] crate::Error),

/// Creating a final edit event failed.
#[error(transparent)]
Edit(#[from] EditError),

/// Serializing a to-device payload failed.
#[error(transparent)]
Json(#[from] serde_json::Error),

/// The event stream update receiver lagged.
#[error("event stream update receiver lagged")]
Lagged,
}

impl From<broadcast::error::RecvError> for EventStreamError {
fn from(value: broadcast::error::RecvError) -> Self {
match value {
broadcast::error::RecvError::Closed => Self::UnknownStream,
broadcast::error::RecvError::Lagged(_) => Self::Lagged,
}
}
}

/// Identifies a stream by the room event that advertised it.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct StreamId {
/// The room containing the descriptor event.
pub room_id: OwnedRoomId,

/// The message event that advertised the stream.
pub event_id: OwnedEventId,
}

impl StreamId {
/// Create a stream identifier from its descriptor event.
pub fn new(room_id: OwnedRoomId, event_id: OwnedEventId) -> Self {
Self { room_id, event_id }
}
}

/// Client-owned namespace for publishing and subscribing to event streams.
#[derive(Clone, Debug)]
pub struct EventStreams {
publishers: EventStreamPublishers,
subscriptions: EventStreamSubscriptions,
}

impl EventStreams {
pub(crate) fn new(client: Client) -> Self {
let publishers = EventStreamPublishers::new(client.clone());
let subscriptions = EventStreamSubscriptions::new(client);

Self { publishers, subscriptions }
}

/// Access publisher-side event stream operations.
pub fn publishers(&self) -> EventStreamPublishers {
self.publishers.clone()
}

/// Access subscriber-side event stream operations.
pub fn subscriptions(&self) -> EventStreamSubscriptions {
self.subscriptions.clone()
}
}

/// Send one typed to-device event to one specific device.
async fn send_to_device<C>(
client: &Client,
user_id: &UserId,
device_id: &DeviceId,
content: C,
) -> Result<()>
where
C: ToDeviceEventContent,
{
let event_type = content.event_type();
let messages = BTreeMap::from([(
user_id.to_owned(),
BTreeMap::from([(
DeviceIdOrAllDevices::DeviceId(device_id.to_owned()),
raw_content(&content)?,
)]),
)]);

let request = ToDeviceRequest::new_raw(event_type, TransactionId::new(), messages);
client.send(request).await?;

Ok(())
}

fn raw_content<C>(content: &C) -> Result<Raw<AnyToDeviceEventContent>>
where
C: ToDeviceEventContent,
{
Ok(Raw::new(content)?.cast_unchecked())
}
Loading
Loading