Skip to content

Commit 47af758

Browse files
committed
feat: optimize message routing by checking local sessions before federation
1 parent 020c891 commit 47af758

2 files changed

Lines changed: 19 additions & 5 deletions

File tree

src/relay/api.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -383,11 +383,23 @@ impl<M: Metrics, D: TwinDB> Session<M, D> {
383383
anyhow::bail!("message with missing destination");
384384
}
385385

386-
// todo(sameh):
386+
// it's safe to update the local cache since we already authenticated
387+
// the twin hence we trust their information.
388+
update_cache_relays(envelope, &self.twins).await?;
389+
387390
// check if the dst twin id is already connected locally
388391
// if so, we don't have to check federation and directly
389392
// switch the message
393+
if self.switch.is_local(&dst).await {
394+
log::debug!("found local session for '{}', forwarding message", dst);
395+
if let Err(err) = self.switch.send(&dst, &msg).await {
396+
log::error!("failed to route message to peer '{}': {}", dst, err);
397+
}
398+
return Ok(());
399+
}
390400

401+
// if destination is not local, we need to check if we need to federate
402+
// or not.
391403
// instead of sending the message directly to the switch
392404
// we check federation information attached to the envelope
393405
// if federation is not empty and does not match the domain
@@ -399,10 +411,6 @@ impl<M: Metrics, D: TwinDB> Session<M, D> {
399411
.await?
400412
.ok_or_else(|| anyhow::Error::msg("unknown twin destination"))?;
401413

402-
// it's safe to update the local cache since we already authenticated
403-
// the twin hence we trust their information.
404-
update_cache_relays(envelope, &self.twins).await?;
405-
406414
if !twin
407415
.relay
408416
.ok_or_else(|| anyhow::Error::msg("relay info is not set for this twin"))?
@@ -412,6 +420,7 @@ impl<M: Metrics, D: TwinDB> Session<M, D> {
412420
// push message to the (relay.federation) queue
413421
return Ok(self.federator.send(&msg).await?);
414422
}
423+
415424
// we don't return an error because when we return an error
416425
// we will send this error back to the sender user. Hence
417426
// calling the switch.send again

src/relay/switch/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,11 @@ where
275275
) -> Result<MessageID> {
276276
send(id.as_ref(), &self.pool, msg.as_ref()).await
277277
}
278+
279+
/// checks if a session is connected locally
280+
pub async fn is_local(&self, id: &SessionID) -> bool {
281+
self.sessions.lock().await.contains_key(id)
282+
}
278283
}
279284

280285
#[derive(Clone)]

0 commit comments

Comments
 (0)