From 09cffe9e47dcc36b6679b4ed68112ed3b71ce12b Mon Sep 17 00:00:00 2001 From: SimonThormeyer Date: Wed, 20 May 2026 10:42:04 +0200 Subject: [PATCH 1/2] refactor: `conversation_mut()` closure returns a `BoxFuture` --- .../conversation/conversation_guard/mod.rs | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/crypto/src/mls/conversation/conversation_guard/mod.rs b/crypto/src/mls/conversation/conversation_guard/mod.rs index cbfe491e27..4ada811263 100644 --- a/crypto/src/mls/conversation/conversation_guard/mod.rs +++ b/crypto/src/mls/conversation/conversation_guard/mod.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{future::Future, pin::Pin, sync::Arc}; use async_lock::{RwLock, RwLockReadGuard}; use core_crypto_keystore::{CryptoKeystoreMls as _, Database}; @@ -16,6 +16,13 @@ mod encrypt; mod history_sharing; mod merge; +/// The return type of the operation closure passed to [`ConversationGuard::conversation_mut`]. +/// On non-WASM targets the future must also be [`Send`]. +#[cfg(not(target_os = "unknown"))] +pub(crate) type BoxFuture<'a, T> = Pin> + Send + 'a>>; +#[cfg(target_os = "unknown")] +pub(crate) type BoxFuture<'a, T> = Pin> + 'a>>; + /// A Conversation Guard wraps an [`Arc>`]. /// /// The conversation is ultimately owned by the conversation cache, but we take an `Arc` @@ -61,7 +68,7 @@ impl ConversationGuard { /// an error. pub(crate) async fn conversation_mut( &mut self, - operation: impl AsyncFnOnce(&mut MlsConversation) -> Result, + operation: impl FnOnce(&mut MlsConversation) -> BoxFuture<'_, T>, ) -> Result { // we can't get the database if the transaction context has been invalidated, // and we want to have that error first before evaluating anything in the operation. @@ -102,9 +109,14 @@ impl ConversationGuard { .map_err(RecursiveError::transaction("getting mls groups"))?; let id = self - .conversation_mut(async |conversation| { - conversation.wipe_associated_entities(&provider).await?; - Ok(conversation.id().to_owned()) + .conversation_mut({ + let provider = provider.clone(); + |conversation| { + Box::pin(async move { + conversation.wipe_associated_entities(&provider).await?; + Ok(conversation.id().to_owned()) + }) + } }) .await?; provider From 9f721049f6e09a0a8c48ecb2904cb7ad6e652ef6 Mon Sep 17 00:00:00 2001 From: SimonThormeyer Date: Wed, 20 May 2026 10:42:33 +0200 Subject: [PATCH 2/2] refactor: create a `BoxFuture` when calling `conversation_mut()` --- .../conversation/conversation_guard/commit.rs | 199 ++++++++-------- .../conversation_guard/decrypt/mod.rs | 223 ++++++++++-------- .../conversation_guard/encrypt.rs | 27 ++- .../conversation_guard/history_sharing.rs | 44 ++-- .../conversation/conversation_guard/merge.rs | 20 +- crypto/src/mls/conversation/merge.rs | 14 +- .../test_utils/test_conversation/proposal.rs | 31 ++- .../conversation/external_commit.rs | 26 +- 8 files changed, 317 insertions(+), 267 deletions(-) diff --git a/crypto/src/mls/conversation/conversation_guard/commit.rs b/crypto/src/mls/conversation/conversation_guard/commit.rs index 8fc8e8391d..17c2e4ed3b 100644 --- a/crypto/src/mls/conversation/conversation_guard/commit.rs +++ b/crypto/src/mls/conversation/conversation_guard/commit.rs @@ -32,9 +32,11 @@ impl ConversationGuard { pub(super) async fn merge_commit(&mut self) -> Result<()> { let provider = self.crypto_provider().await?; let (conversation_id, epoch) = self - .conversation_mut(async |conversation| { - conversation.commit_accepted(&provider).await?; - Ok((conversation.id().to_owned(), conversation.group.epoch().as_u64())) + .conversation_mut(|conversation| { + Box::pin(async move { + conversation.commit_accepted(&provider).await?; + Ok((conversation.id().to_owned(), conversation.group.epoch().as_u64())) + }) }) .await?; self.central_context @@ -69,26 +71,28 @@ impl ConversationGuard { let backend = self.crypto_provider().await?; let credential = self.credential().await?; - self.conversation_mut(async move |conversation| { - let signer = credential.signature_key(); - let (commit, welcome, group_info) = conversation - .group - .add_members(&backend, signer, key_packages.clone()) - .await - .map_err(|err| { - if Self::err_is_duplicate_signature_key(&err) { - let affected_clients = Self::clients_with_duplicate_signature_keys(key_packages.as_ref()); - Error::DuplicateSignature { affected_clients } - } else { - MlsError::wrap("group add members")(err).into() - } - })?; - - Ok(MlsCommitBundle { - commit, - welcome: Some(welcome), - group_info: Self::group_info(group_info)?, - encrypted_message: None, + self.conversation_mut(|conversation| { + Box::pin(async move { + let signer = credential.signature_key(); + let (commit, welcome, group_info) = conversation + .group + .add_members(&backend, signer, key_packages.clone()) + .await + .map_err(|err| { + if Self::err_is_duplicate_signature_key(&err) { + let affected_clients = Self::clients_with_duplicate_signature_keys(key_packages.as_ref()); + Error::DuplicateSignature { affected_clients } + } else { + MlsError::wrap("group add members")(err).into() + } + })?; + + Ok(MlsCommitBundle { + commit, + welcome: Some(welcome), + group_info: Self::group_info(group_info)?, + encrypted_message: None, + }) }) }) .await @@ -133,27 +137,32 @@ impl ConversationGuard { self.ensure_no_pending_commit().await?; let backend = self.crypto_provider().await?; let credential = self.credential().await?; - let signer = credential.signature_key(); + + let members = { + let guard = self.conversation().await; + guard + .group + .members() + .filter_map(|kp| { + clients + .iter() + .any(|client_id| client_id.borrow() == kp.credential.identity()) + .then_some(kp.index) + }) + .collect::>() + }; let (commit, welcome, group_info) = self - .conversation_mut(async |conversation| { - let members = conversation - .group - .members() - .filter_map(|kp| { - clients - .iter() - .any(move |client_id| client_id.borrow() == kp.credential.identity()) - .then_some(kp.index) - }) - .collect::>(); - - conversation - .group - .remove_members(&backend, signer, &members) - .await - .map_err(MlsError::wrap("group remove members")) - .map_err(Into::into) + .conversation_mut(|conversation| { + Box::pin(async move { + let signer = credential.signature_key(); + conversation + .group + .remove_members(&backend, signer, &members) + .await + .map_err(MlsError::wrap("group remove members")) + .map_err(Into::into) + }) }) .await?; @@ -194,36 +203,38 @@ impl ConversationGuard { let backend = self.crypto_provider().await?; let credential = credential.clone(); - self.conversation_mut(async move |conversation| { - // If the credential remains the same and we still want to update, we explicitly need to pass `None` to - // openmls, if we just passed an unchanged leaf node, no update commit would be created. - // Also, we can avoid cloning in the case we don't need to create a new leaf node. - let updated_leaf_node = { - let leaf_node = conversation.group.own_leaf().ok_or(LeafError::InternalMlsError)?; - if leaf_node.credential() == &credential.mls_credential { - None - } else { - let mut leaf_node = leaf_node.clone(); - leaf_node.set_credential_with_key(credential.to_mls_credential_with_key()); - Some(leaf_node) - } - }; - - let (commit, welcome, group_info) = conversation - .group - .explicit_self_update(&backend, &credential.signature_key_pair, updated_leaf_node) - .await - .map_err(MlsError::wrap("group self update"))?; - - // We should always have ratchet tree extension turned on hence GroupInfo should always be present - let group_info = group_info.ok_or(LeafError::MissingGroupInfo)?; - let group_info = MlsGroupInfoBundle::try_new_full_plaintext(group_info)?; - - Ok(MlsCommitBundle { - welcome, - commit, - group_info, - encrypted_message: None, + self.conversation_mut(|conversation| { + Box::pin(async move { + // If the credential remains the same and we still want to update, we explicitly need to pass `None` to + // openmls, if we just passed an unchanged leaf node, no update commit would be created. + // Also, we can avoid cloning in the case we don't need to create a new leaf node. + let updated_leaf_node = { + let leaf_node = conversation.group.own_leaf().ok_or(LeafError::InternalMlsError)?; + if leaf_node.credential() == &credential.mls_credential { + None + } else { + let mut leaf_node = leaf_node.clone(); + leaf_node.set_credential_with_key(credential.to_mls_credential_with_key()); + Some(leaf_node) + } + }; + + let (commit, welcome, group_info) = conversation + .group + .explicit_self_update(&backend, &credential.signature_key_pair, updated_leaf_node) + .await + .map_err(MlsError::wrap("group self update"))?; + + // We should always have ratchet tree extension turned on hence GroupInfo should always be present + let group_info = group_info.ok_or(LeafError::MissingGroupInfo)?; + let group_info = MlsGroupInfoBundle::try_new_full_plaintext(group_info)?; + + Ok(MlsCommitBundle { + welcome, + commit, + group_info, + encrypted_message: None, + }) }) }) .await @@ -240,21 +251,23 @@ impl ConversationGuard { } pub(crate) async fn commit_pending_proposals_inner(&mut self) -> Result> { - let session = &self.session().await?; - let provider = &self.crypto_provider().await?; + let session = self.session().await?; + let provider = self.crypto_provider().await?; if self.conversation().await.group().pending_proposals().next().is_none() { return Ok(None); } let (commit, welcome, openmls_group_info) = self - .conversation_mut(async |inner| { - let signer = &inner.find_current_credential(session).await?.signature_key_pair; - inner - .group - .commit_to_pending_proposals(provider, signer) - .await - .map_err(MlsError::wrap("group commit to pending proposals")) - .map_err(Into::into) + .conversation_mut(|inner| { + Box::pin(async move { + let signer = &inner.find_current_credential(&session).await?.signature_key_pair; + inner + .group + .commit_to_pending_proposals(&provider, signer) + .await + .map_err(MlsError::wrap("group commit to pending proposals")) + .map_err(Into::into) + }) }) .await?; let group_info = MlsGroupInfoBundle::try_new_full_plaintext( @@ -273,21 +286,23 @@ impl ConversationGuard { &mut self, proposals: Vec, ) -> Result> { - let session = &self.session().await?; - let provider = &self.crypto_provider().await?; + let session = self.session().await?; + let provider = self.crypto_provider().await?; if proposals.is_empty() { return Ok(None); } let (commit, welcome, openmls_group_info) = self - .conversation_mut(async |inner| { - let signer = &inner.find_current_credential(session).await?.signature_key_pair; - inner - .group - .commit_to_inline_proposals(provider, signer, proposals) - .await - .map_err(MlsError::wrap("group commit to pending proposals")) - .map_err(Into::into) + .conversation_mut(|inner| { + Box::pin(async move { + let signer = &inner.find_current_credential(&session).await?.signature_key_pair; + inner + .group + .commit_to_inline_proposals(&provider, signer, proposals) + .await + .map_err(MlsError::wrap("group commit to pending proposals")) + .map_err(Into::into) + }) }) .await?; let group_info = MlsGroupInfoBundle::try_new_full_plaintext( diff --git a/crypto/src/mls/conversation/conversation_guard/decrypt/mod.rs b/crypto/src/mls/conversation/conversation_guard/decrypt/mod.rs index 5466015593..710acb64b6 100644 --- a/crypto/src/mls/conversation/conversation_guard/decrypt/mod.rs +++ b/crypto/src/mls/conversation/conversation_guard/decrypt/mod.rs @@ -145,7 +145,7 @@ impl ConversationGuard { message: MlsMessageIn, recursion_policy: RecursionPolicy, ) -> Result { - let provider = &self.crypto_provider().await?; + let provider = self.crypto_provider().await?; let parsed_message = self.parse_message(message.clone()).await?; let message_result = self.process_message(parsed_message).await; @@ -158,9 +158,11 @@ impl ConversationGuard { })) = message_result { let mut decrypted_message = self - .conversation_mut(async |conversation| { - let ct = conversation.extract_confirmation_tag_from_own_commit(&message)?; - conversation.handle_own_commit(provider, ct).await + .conversation_mut(|conversation| { + Box::pin(async move { + let ct = conversation.extract_confirmation_tag_from_own_commit(&message)?; + conversation.handle_own_commit(&provider, ct).await + }) }) .await?; debug_assert!( @@ -219,15 +221,17 @@ impl ConversationGuard { } } ProcessedMessageContent::ProposalMessage(proposal) => { - self.conversation_mut(async move |conversation| { - info!( - group_id = conversation.id, - sender = Obfuscated::from(proposal.sender()), - proposals = Obfuscated::from(&proposal.proposal); - "Received proposal" - ); - conversation.group.store_pending_proposal(*proposal); - Ok(()) + self.conversation_mut(|conversation| { + Box::pin(async move { + info!( + group_id = conversation.id, + sender = Obfuscated::from(proposal.sender()), + proposals = Obfuscated::from(&proposal.proposal); + "Received proposal" + ); + conversation.group.store_pending_proposal(*proposal); + Ok(()) + }) }) .await?; @@ -276,37 +280,47 @@ impl ConversationGuard { ProcessedMessageContent::StagedCommitMessage(staged_commit) => { self.validate_commit(&staged_commit).await?; - let (is_active, delay, removed_members, added_members, group_id) = self - .conversation_mut(async |conversation| { - let removed_indices = staged_commit - .remove_proposals() - .map(|p| p.remove_proposal().removed()) - .collect::>(); - - let added_credentials = staged_commit - .add_proposals() - .map(|p| p.add_proposal().key_package.leaf_node().credential().to_owned()) - .collect::>(); - - let removed_members = Self::members_at_indices(removed_indices, conversation.group()); - - conversation - .group - .merge_staged_commit(provider, *staged_commit.clone()) - .await - .map_err(MlsError::wrap("merge staged commit"))?; - - let added_members = conversation - .group - .members() - .filter_map(|member| added_credentials.contains(&member.credential).then(|| member.clone())) - .collect::>(); - - let is_active = conversation.group.is_active(); - let delay = conversation.compute_next_commit_delay(); - let group_id = conversation.id.clone(); + let epoch = staged_commit.staged_context().epoch().as_u64(); + let queued_proposals: Vec = staged_commit + .queued_proposals() + .map(|p| format!("{:?}", Obfuscated::from(p))) + .collect(); - Ok((is_active, delay, removed_members, added_members, group_id)) + let (is_active, delay, removed_members, added_members, group_id) = self + .conversation_mut(|conversation| { + Box::pin(async move { + let removed_indices = staged_commit + .remove_proposals() + .map(|p| p.remove_proposal().removed()) + .collect::>(); + + let added_credentials = staged_commit + .add_proposals() + .map(|p| p.add_proposal().key_package.leaf_node().credential().to_owned()) + .collect::>(); + + let removed_members = Self::members_at_indices(removed_indices, conversation.group()); + + conversation + .group + .merge_staged_commit(&provider, *staged_commit.clone()) + .await + .map_err(MlsError::wrap("merge staged commit"))?; + + let added_members = conversation + .group + .members() + .filter_map(|member| { + added_credentials.contains(&member.credential).then(|| member.clone()) + }) + .collect::>(); + + let is_active = conversation.group.is_active(); + let delay = conversation.compute_next_commit_delay(); + let group_id = conversation.id.clone(); + + Ok((is_active, delay, removed_members, added_members, group_id)) + }) }) .await?; @@ -316,14 +330,13 @@ impl ConversationGuard { buffered_messages = self.restore_and_clear_pending_messages().await?; } - let epoch = staged_commit.staged_context().epoch().as_u64(); info!( added = Obfuscated::from(&added_members), removed = Obfuscated::from(&removed_members), group_id, epoch, - proposals:? = staged_commit.queued_proposals().map(Obfuscated::from).collect::>(); - "Epoch advanced" + proposals:? = queued_proposals; + "Epoch Advanced" ); self.central_context .queue_epoch_changed(group_id, epoch) @@ -341,14 +354,16 @@ impl ConversationGuard { } ProcessedMessageContent::ExternalJoinProposalMessage(proposal) => { let delay = self - .conversation_mut(async move |conversation| { - info!( - group_id = conversation.id, - sender = Obfuscated::from(proposal.sender()); - "Received external join proposal" - ); - conversation.group.store_pending_proposal(*proposal); - Ok(conversation.compute_next_commit_delay()) + .conversation_mut(|conversation| { + Box::pin(async move { + info!( + group_id = conversation.id, + sender = Obfuscated::from(proposal.sender()); + "Received external join proposal" + ); + conversation.group.store_pending_proposal(*proposal); + Ok(conversation.compute_next_commit_delay()) + }) }) .await?; @@ -410,53 +425,55 @@ impl ConversationGuard { ) -> Result { let msg_epoch = protocol_message.epoch().as_u64(); let backend = self.crypto_provider().await?; - self.conversation_mut(async move |conversation| { - let group_epoch = conversation.group.epoch().as_u64(); - let processed_msg = conversation - .group - .process_message(&backend, protocol_message) - .await - .map_err(|e| match e { - ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt( - MessageDecryptionError::GenerationOutOfBound, - )) => Error::DuplicateMessage, - ProcessMessageError::ValidationError(ValidationError::WrongEpoch) => { - if is_duplicate { - Error::DuplicateMessage - } else if msg_epoch == group_epoch + 1 { - // limit to next epoch otherwise if we were buffering a commit for epoch + 2 - // we would fail when trying to decrypt it in [MlsCentral::commit_accepted] - - // We need to buffer the message until the group has advanced to the right - // epoch. We can't do that here--we don't have the appropriate data in scope - // --but we can at least produce the proper error and return that, so our - // caller can handle it. Our caller needs to know about the epoch number, so - // we pass it back inside the error. - Error::BufferedFutureMessage { - message_epoch: msg_epoch, - } - } else if msg_epoch < group_epoch { - match content_type { - ContentType::Application => Error::StaleMessage, - ContentType::Commit => Error::StaleCommit, - ContentType::Proposal => Error::StaleProposal, + self.conversation_mut(|conversation| { + Box::pin(async move { + let group_epoch = conversation.group.epoch().as_u64(); + let processed_msg = conversation + .group + .process_message(&backend, protocol_message) + .await + .map_err(|e| match e { + ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt( + MessageDecryptionError::GenerationOutOfBound, + )) => Error::DuplicateMessage, + ProcessMessageError::ValidationError(ValidationError::WrongEpoch) => { + if is_duplicate { + Error::DuplicateMessage + } else if msg_epoch == group_epoch + 1 { + // limit to next epoch otherwise if we were buffering a commit for epoch + 2 + // we would fail when trying to decrypt it in [MlsCentral::commit_accepted] + + // We need to buffer the message until the group has advanced to the right + // epoch. We can't do that here--we don't have the appropriate data in scope + // --but we can at least produce the proper error and return that, so our + // caller can handle it. Our caller needs to know about the epoch number, so + // we pass it back inside the error. + Error::BufferedFutureMessage { + message_epoch: msg_epoch, + } + } else if msg_epoch < group_epoch { + match content_type { + ContentType::Application => Error::StaleMessage, + ContentType::Commit => Error::StaleCommit, + ContentType::Proposal => Error::StaleProposal, + } + } else { + Error::UnbufferedFarFutureMessage } - } else { - Error::UnbufferedFarFutureMessage } - } - ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt( - MessageDecryptionError::AeadError, - )) => Error::DecryptionError, - ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt( - MessageDecryptionError::SecretTreeError(SecretTreeError::TooDistantInThePast), - )) => Error::MessageEpochTooOld, - _ => MlsError::wrap("processing message")(e).into(), - })?; - if is_duplicate { - return Err(Error::DuplicateMessage); - } - Ok(processed_msg) + ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt( + MessageDecryptionError::AeadError, + )) => Error::DecryptionError, + ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt( + MessageDecryptionError::SecretTreeError(SecretTreeError::TooDistantInThePast), + )) => Error::MessageEpochTooOld, + _ => MlsError::wrap("processing message")(e).into(), + })?; + if is_duplicate { + return Err(Error::DuplicateMessage); + } + Ok(processed_msg) + }) }) .await } @@ -820,9 +837,11 @@ mod tests { conversation .guard() .await - .conversation_mut(async |conv| { - conv.group.clear_pending_proposals(); - Ok(()) + .conversation_mut(|conv| { + Box::pin(async move { + conv.group.clear_pending_proposals(); + Ok(()) + }) }) .await .unwrap(); diff --git a/crypto/src/mls/conversation/conversation_guard/encrypt.rs b/crypto/src/mls/conversation/conversation_guard/encrypt.rs index 8923ffaa77..ca13da736e 100644 --- a/crypto/src/mls/conversation/conversation_guard/encrypt.rs +++ b/crypto/src/mls/conversation/conversation_guard/encrypt.rs @@ -19,23 +19,26 @@ impl ConversationGuard { /// If the conversation can't be found, an error will be returned. Other errors are originating /// from OpenMls and the KeyStore pub async fn encrypt_message(&mut self, message: impl AsRef<[u8]>) -> Result> { + let message = message.as_ref().to_vec(); let backend = self.crypto_provider().await?; let credential = self.credential().await?; - self.conversation_mut(async move |conversation| { - let signer = credential.signature_key(); - let encrypted = conversation - .group - .create_message(&backend, signer, message.as_ref()) - .map_err(MlsError::wrap("creating message"))?; + self.conversation_mut(|conversation| { + Box::pin(async move { + let signer = credential.signature_key(); + let encrypted = conversation + .group + .create_message(&backend, signer, &message) + .map_err(MlsError::wrap("creating message"))?; - // make sure all application messages are encrypted - debug_assert!(matches!(encrypted.body, MlsMessageOutBody::PrivateMessage(_))); + // make sure all application messages are encrypted + debug_assert!(matches!(encrypted.body, MlsMessageOutBody::PrivateMessage(_))); - encrypted - .to_bytes() - .map_err(MlsError::wrap("constructing byte vector of encrypted message")) - .map_err(Into::into) + encrypted + .to_bytes() + .map_err(MlsError::wrap("constructing byte vector of encrypted message")) + .map_err(Into::into) + }) }) .await } diff --git a/crypto/src/mls/conversation/conversation_guard/history_sharing.rs b/crypto/src/mls/conversation/conversation_guard/history_sharing.rs index dfbdebaec1..70987a359e 100644 --- a/crypto/src/mls/conversation/conversation_guard/history_sharing.rs +++ b/crypto/src/mls/conversation/conversation_guard/history_sharing.rs @@ -155,26 +155,30 @@ impl ConversationGuard { let key_package = history_secret.key_package.clone().into(); let remove_and_add = self - .conversation_mut(async move |conversation| { - // Propose to remove the old history client - for history_client in existing_history_clients { - conversation.propose_remove_member(&session, history_client).await?; - } - - // Propose to add a new history client - conversation.propose_add_member(&session, key_package).await?; - - // We're getting the proposals we just created from the pending proposals queue, as the previously - // called `propose_remove()` and `propose_add()` pushed them to that queue as a side effect. - let remove_and_add = conversation - .group() - .pending_proposals() - .filter(|&p| matches!(p.sender(), Sender::Member(i) if i == &conversation.group().own_leaf_index())) - .map(|proposal| proposal.proposal()) - .cloned() - .collect::>(); - - Ok(remove_and_add) + .conversation_mut(|conversation| { + Box::pin(async move { + // Propose to remove the old history client + for history_client in existing_history_clients { + conversation.propose_remove_member(&session, history_client).await?; + } + + // Propose to add a new history client + conversation.propose_add_member(&session, key_package).await?; + + // We're getting the proposals we just created from the pending proposals queue, as the previously + // called `propose_remove()` and `propose_add()` pushed them to that queue as a side effect. + let remove_and_add = conversation + .group() + .pending_proposals() + .filter( + |&p| matches!(p.sender(), Sender::Member(i) if i == &conversation.group().own_leaf_index()), + ) + .map(|proposal| proposal.proposal()) + .cloned() + .collect::>(); + + Ok(remove_and_add) + }) }) .await?; diff --git a/crypto/src/mls/conversation/conversation_guard/merge.rs b/crypto/src/mls/conversation/conversation_guard/merge.rs index 7310c837bd..9bb61711b3 100644 --- a/crypto/src/mls/conversation/conversation_guard/merge.rs +++ b/crypto/src/mls/conversation/conversation_guard/merge.rs @@ -28,15 +28,17 @@ impl ConversationGuard { /// When there is no pending commit pub(crate) async fn clear_pending_commit(&mut self) -> Result<()> { let database = self.database().await?; - self.conversation_mut(async |conversation| { - if conversation.group.pending_commit().is_some() { - conversation.group.clear_pending_commit(); - conversation.persist_group_when_changed(&database, true).await?; - log::info!(group_id = conversation.id(); "Cleared pending commit."); - Ok(()) - } else { - Err(Error::PendingCommitNotFound) - } + self.conversation_mut(|conversation| { + Box::pin(async move { + if conversation.group.pending_commit().is_some() { + conversation.group.clear_pending_commit(); + conversation.persist_group_when_changed(&database, true).await?; + log::info!(group_id = conversation.id(); "Cleared pending commit."); + Ok(()) + } else { + Err(Error::PendingCommitNotFound) + } + }) }) .await } diff --git a/crypto/src/mls/conversation/merge.rs b/crypto/src/mls/conversation/merge.rs index f3636a40bc..2427bba848 100644 --- a/crypto/src/mls/conversation/merge.rs +++ b/crypto/src/mls/conversation/merge.rs @@ -57,13 +57,12 @@ mod tests { assert!(conversation.has_pending_commit().await); + let provider = alice.session().await.crypto_provider.clone(); conversation .guard() .await - .conversation_mut(async |conversation| { - conversation - .commit_accepted(&alice.session().await.crypto_provider) - .await + .conversation_mut(|conversation| { + Box::pin(async move { conversation.commit_accepted(&provider).await }) }) .await .unwrap(); @@ -91,13 +90,12 @@ mod tests { assert!(conversation.has_pending_proposals().await); assert!(conversation.has_pending_commit().await); + let provider = alice.session().await.crypto_provider.clone(); conversation .guard() .await - .conversation_mut(async |conversation| { - conversation - .commit_accepted(&alice.session().await.crypto_provider) - .await + .conversation_mut(|conversation| { + Box::pin(async move { conversation.commit_accepted(&provider).await }) }) .await .unwrap(); diff --git a/crypto/src/test_utils/test_conversation/proposal.rs b/crypto/src/test_utils/test_conversation/proposal.rs index 610bd88da4..3b12bab609 100644 --- a/crypto/src/test_utils/test_conversation/proposal.rs +++ b/crypto/src/test_utils/test_conversation/proposal.rs @@ -17,9 +17,11 @@ impl<'a> TestConversation<'a> { let proposer = self.actor(); let key_package = new_member.new_keypackage(self.case).await; let mut guard = self.guard().await; - let session = &proposer.session().await; + let session = proposer.session().await; let proposal = guard - .conversation_mut(async |conversation| conversation.propose_add_member(session, key_package.into()).await) + .conversation_mut(|conversation| { + Box::pin(async move { conversation.propose_add_member(&session, key_package.into()).await }) + }) .await .unwrap(); let proposer_index = self.member_index(proposer).await; @@ -42,18 +44,23 @@ impl<'a> TestConversation<'a> { pub async fn remove_proposal(self, member: &'a SessionContext) -> OperationGuard<'a, Proposal> { let proposer = self.actor(); let mut guard = self.guard().await; - let session = &proposer.session().await; + let session = proposer.session().await; let member_id = member.session().await.id(); let proposal = guard - .conversation_mut(async |conversation| { - let member_index = conversation - .group - .members() - .find(|member| member.credential.identity() == member_id.as_slice()) - .map(|member| member.index) - .unwrap(); - let proposal = conversation.propose_remove_member(session, member_index).await.unwrap(); - Ok(proposal) + .conversation_mut(|conversation| { + Box::pin(async move { + let member_index = conversation + .group + .members() + .find(|member| member.credential.identity() == member_id.as_slice()) + .map(|member| member.index) + .unwrap(); + let proposal = conversation + .propose_remove_member(&session, member_index) + .await + .unwrap(); + Ok(proposal) + }) }) .await .unwrap(); diff --git a/crypto/src/transaction_context/conversation/external_commit.rs b/crypto/src/transaction_context/conversation/external_commit.rs index 406e8f53cc..84c97bb933 100644 --- a/crypto/src/transaction_context/conversation/external_commit.rs +++ b/crypto/src/transaction_context/conversation/external_commit.rs @@ -390,18 +390,20 @@ mod tests { let credential = alice.find_any_credential(ciphersuite.into(), credential_type).await; let mls_provider = alice.transaction.mls_provider().await.unwrap(); guard - .conversation_mut(async move |conversation| { - let gi = conversation - .group - .export_group_info( - &mls_provider, - &credential.signature_key_pair, - // joining by external commit assumes we include a ratchet tree, but this `false` - // says to leave it out - false, - ) - .unwrap(); - Ok(gi.group_info().unwrap()) + .conversation_mut(|conversation| { + Box::pin(async move { + let gi = conversation + .group + .export_group_info( + &mls_provider, + &credential.signature_key_pair, + // joining by external commit assumes we include a ratchet tree, but this `false` + // says to leave it out + false, + ) + .unwrap(); + Ok(gi.group_info().unwrap()) + }) }) .await .unwrap()