From 260f7ffbe557d84ae400f152c4fc3c9980eb4b27 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 20 May 2025 12:07:45 +0100 Subject: [PATCH 1/2] feat: [#1523] add new metric: number of inactive peers The metric is added to the `torrent-repository` package. The metric in Prometheus format: ``` torrent_repository_peers_inactive_total{} 0 ``` It was not included as a new label in the number of peers because it can't be calculated from current events. New inactivity events could have been added but the solution was much more complex than this and having two metrics counting peers is not so bad. The discarded alternative was addinga new label por satte (`active`, `inactive`). --- Cargo.lock | 1 + packages/torrent-repository/Cargo.toml | 1 + .../torrent-repository/src/statistics/mod.rs | 8 +++ .../src/statistics/peers_inactivity_update.rs | 72 +++++++++++++++++++ .../src/statistics/repository.rs | 25 +++++++ packages/torrent-repository/src/swarm.rs | 24 +++++++ packages/torrent-repository/src/swarms.rs | 28 ++++++++ packages/tracker-core/src/torrent/manager.rs | 10 ++- src/app.rs | 19 ++++- src/bootstrap/jobs/mod.rs | 1 + src/bootstrap/jobs/peers_inactivity_update.rs | 27 +++++++ 11 files changed, 211 insertions(+), 5 deletions(-) create mode 100644 packages/torrent-repository/src/statistics/peers_inactivity_update.rs create mode 100644 src/bootstrap/jobs/peers_inactivity_update.rs diff --git a/Cargo.lock b/Cargo.lock index ab898e327..6e4ab415f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4832,6 +4832,7 @@ dependencies = [ "aquatic_udp_protocol", "async-std", "bittorrent-primitives", + "chrono", "criterion", "crossbeam-skiplist", "futures", diff --git a/packages/torrent-repository/Cargo.toml b/packages/torrent-repository/Cargo.toml index 98ae5817d..510a59e9d 100644 --- a/packages/torrent-repository/Cargo.toml +++ b/packages/torrent-repository/Cargo.toml @@ -18,6 +18,7 @@ version.workspace = true [dependencies] aquatic_udp_protocol = "0" bittorrent-primitives = "0.1.0" +chrono = { version = "0", default-features = false, features = ["clock"] } crossbeam-skiplist = "0" futures = "0" serde = { version = "1.0.219", features = ["derive"] } diff --git a/packages/torrent-repository/src/statistics/mod.rs b/packages/torrent-repository/src/statistics/mod.rs index 7d3ad85ce..0f8a839ca 100644 --- a/packages/torrent-repository/src/statistics/mod.rs +++ b/packages/torrent-repository/src/statistics/mod.rs @@ -1,5 +1,6 @@ pub mod event; pub mod metrics; +pub mod peers_inactivity_update; pub mod repository; use metrics::Metrics; @@ -23,6 +24,7 @@ const TORRENT_REPOSITORY_PEERS_UPDATED_TOTAL: &str = "torrent_repository_peers_u const TORRENT_REPOSITORY_PEER_CONNECTIONS_TOTAL: &str = "torrent_repository_peer_connections_total"; const TORRENT_REPOSITORY_UNIQUE_PEERS_TOTAL: &str = "torrent_repository_unique_peers_total"; // todo: not implemented yet +const TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL: &str = "torrent_repository_peers_inactive_total"; #[must_use] pub fn describe_metrics() -> Metrics { @@ -88,5 +90,11 @@ pub fn describe_metrics() -> Metrics { Some(&MetricDescription::new("The total number of unique peers.")), ); + metrics.metric_collection.describe_gauge( + &metric_name!(TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL), + Some(Unit::Count), + Some(&MetricDescription::new("The total number of inactive peers.")), + ); + metrics } diff --git a/packages/torrent-repository/src/statistics/peers_inactivity_update.rs b/packages/torrent-repository/src/statistics/peers_inactivity_update.rs new file mode 100644 index 000000000..e388173a1 --- /dev/null +++ b/packages/torrent-repository/src/statistics/peers_inactivity_update.rs @@ -0,0 +1,72 @@ +//! Job that runs a task on intervals to update peers' inactivity metrics. +use std::sync::Arc; + +use chrono::Utc; +use tokio::task::JoinHandle; +use torrust_tracker_clock::clock::Time; +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::metric_name; +use torrust_tracker_primitives::DurationSinceUnixEpoch; +use tracing::instrument; + +use super::repository::Repository; +use crate::statistics::TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL; +use crate::{CurrentClock, Swarms}; + +#[must_use] +#[instrument(skip(swarms, stats_repository))] +pub fn start_job( + swarms: &Arc, + stats_repository: &Arc, + inactivity_cutoff: DurationSinceUnixEpoch, +) -> JoinHandle<()> { + let weak_swarms = std::sync::Arc::downgrade(swarms); + let weak_stats_repository = std::sync::Arc::downgrade(stats_repository); + + let interval_in_secs = 15; // todo: make this configurable + + tokio::spawn(async move { + let interval = std::time::Duration::from_secs(interval_in_secs); + let mut interval = tokio::time::interval(interval); + interval.tick().await; + + loop { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::info!("Stopping peers inactivity metrics update job ..."); + break; + } + _ = interval.tick() => { + if let (Some(swarms), Some(stats_repository)) = (weak_swarms.upgrade(), weak_stats_repository.upgrade()) { + let start_time = Utc::now().time(); + + tracing::debug!("Updating peers inactivity metrics (executed every {} secs) ...", interval_in_secs); + + let inactive_peers_total = swarms.count_inactive_peers(inactivity_cutoff).await; + + tracing::info!(inactive_peers_total = inactive_peers_total); + + #[allow(clippy::cast_precision_loss)] + let inactive_peers_total = inactive_peers_total as f64; + + let _unused = stats_repository + .set_gauge( + &metric_name!(TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL), + &LabelSet::default(), + inactive_peers_total, + CurrentClock::now(), + ) + .await; + + tracing::debug!( + "Peers inactivity metrics updated in {} ms", + (Utc::now().time() - start_time).num_milliseconds() + ); + } else { + break; + } + } + } + } + }) +} diff --git a/packages/torrent-repository/src/statistics/repository.rs b/packages/torrent-repository/src/statistics/repository.rs index 1e376faf7..fe1292d00 100644 --- a/packages/torrent-repository/src/statistics/repository.rs +++ b/packages/torrent-repository/src/statistics/repository.rs @@ -57,6 +57,31 @@ impl Repository { result } + /// # Errors + /// + /// This function will return an error if the metric collection fails to + /// set the gauge. + pub async fn set_gauge( + &self, + metric_name: &MetricName, + labels: &LabelSet, + value: f64, + now: DurationSinceUnixEpoch, + ) -> Result<(), Error> { + let mut stats_lock = self.stats.write().await; + + let result = stats_lock.set_gauge(metric_name, labels, value, now); + + drop(stats_lock); + + match result { + Ok(()) => {} + Err(ref err) => tracing::error!("Failed to set the gauge: {}", err), + } + + result + } + /// # Errors /// /// This function will return an error if the metric collection fails to diff --git a/packages/torrent-repository/src/swarm.rs b/packages/torrent-repository/src/swarm.rs index f25304979..d7a1ede87 100644 --- a/packages/torrent-repository/src/swarm.rs +++ b/packages/torrent-repository/src/swarm.rs @@ -118,6 +118,14 @@ impl Swarm { (seeders, leechers) } + #[must_use] + pub fn count_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> usize { + self.peers + .iter() + .filter(|(_, peer)| peer::ReadInfo::get_updated(&**peer) <= current_cutoff) + .count() + } + #[must_use] pub fn len(&self) -> usize { self.peers.len() @@ -435,6 +443,22 @@ mod tests { assert_eq!(swarm.peers_excluding(&peer2.peer_addr, None), [Arc::new(peer1)]); } + #[tokio::test] + async fn it_should_count_inactive_peers() { + let mut swarm = Swarm::new(&sample_info_hash(), 0, None); + let mut downloads_increased = false; + let one_second = DurationSinceUnixEpoch::new(1, 0); + + // Insert the peer + let last_update_time = DurationSinceUnixEpoch::new(1_669_397_478_934, 0); + let peer = PeerBuilder::default().last_updated_on(last_update_time).build(); + swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + + let inactive_peers_total = swarm.count_inactive_peers(last_update_time + one_second); + + assert_eq!(inactive_peers_total, 1); + } + #[tokio::test] async fn it_should_remove_inactive_peers() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); diff --git a/packages/torrent-repository/src/swarms.rs b/packages/torrent-repository/src/swarms.rs index ac2490853..811bf6a50 100644 --- a/packages/torrent-repository/src/swarms.rs +++ b/packages/torrent-repository/src/swarms.rs @@ -248,6 +248,18 @@ impl Swarms { } } + /// Counts the number of inactive peers across all torrents. + pub async fn count_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> usize { + let mut inactive_peers_total = 0; + + for swarm_handle in &self.swarms { + let swarm = swarm_handle.value().lock().await; + inactive_peers_total += swarm.count_inactive_peers(current_cutoff); + } + + inactive_peers_total + } + /// Removes inactive peers from all torrent entries. /// /// A peer is considered inactive if its last update timestamp is older than @@ -705,6 +717,22 @@ mod tests { assert!(swarms.get(&info_hash).is_none()); } + #[tokio::test] + async fn it_should_count_inactive_peers() { + let swarms = Arc::new(Swarms::default()); + + let info_hash = sample_info_hash(); + let mut peer = sample_peer(); + peer.updated = DurationSinceUnixEpoch::new(0, 0); + + swarms.handle_announcement(&info_hash, &peer, None).await.unwrap(); + + // Cut off time is 1 second after the peer was updated + let inactive_peers_total = swarms.count_inactive_peers(peer.updated.add(Duration::from_secs(1))).await; + + assert_eq!(inactive_peers_total, 1); + } + #[tokio::test] async fn it_should_remove_peers_that_have_not_been_updated_after_a_cutoff_time() { let swarms = Arc::new(Swarms::default()); diff --git a/packages/tracker-core/src/torrent/manager.rs b/packages/tracker-core/src/torrent/manager.rs index bc193bd4f..bf73f7e8b 100644 --- a/packages/tracker-core/src/torrent/manager.rs +++ b/packages/tracker-core/src/torrent/manager.rs @@ -4,6 +4,7 @@ use std::time::Duration; use torrust_tracker_clock::clock::Time; use torrust_tracker_configuration::Core; +use torrust_tracker_primitives::DurationSinceUnixEpoch; use super::repository::in_memory::InMemoryTorrentRepository; use super::repository::persisted::DatabasePersistentTorrentRepository; @@ -103,10 +104,13 @@ impl TorrentsManager { } async fn remove_inactive_peers(&self) { - let current_cutoff = CurrentClock::now_sub(&Duration::from_secs(u64::from(self.config.tracker_policy.max_peer_timeout))) - .unwrap_or_default(); + self.in_memory_torrent_repository + .remove_inactive_peers(self.current_cutoff()) + .await; + } - self.in_memory_torrent_repository.remove_inactive_peers(current_cutoff).await; + fn current_cutoff(&self) -> DurationSinceUnixEpoch { + CurrentClock::now_sub(&Duration::from_secs(u64::from(self.config.tracker_policy.max_peer_timeout))).unwrap_or_default() } async fn remove_peerless_torrents(&self) { diff --git a/src/app.rs b/src/app.rs index ca8b7a5c3..1c2d9387e 100644 --- a/src/app.rs +++ b/src/app.rs @@ -27,7 +27,9 @@ use torrust_tracker_configuration::{Configuration, HttpTracker, UdpTracker}; use tracing::instrument; use crate::bootstrap::jobs::manager::JobManager; -use crate::bootstrap::jobs::{self, health_check_api, http_tracker, torrent_cleanup, tracker_apis, udp_tracker}; +use crate::bootstrap::jobs::{ + self, health_check_api, http_tracker, peers_inactivity_update, torrent_cleanup, tracker_apis, udp_tracker, +}; use crate::bootstrap::{self}; use crate::container::AppContainer; @@ -79,8 +81,11 @@ async fn start_jobs(config: &Configuration, app_container: &Arc) - start_the_udp_instances(config, app_container, &mut job_manager).await; start_the_http_instances(config, app_container, &mut job_manager).await; - start_the_http_api(config, app_container, &mut job_manager).await; + start_torrent_cleanup(config, app_container, &mut job_manager); + start_peers_inactivity_update(config, app_container, &mut job_manager); + + start_the_http_api(config, app_container, &mut job_manager).await; start_health_check_api(config, app_container, &mut job_manager).await; job_manager @@ -260,6 +265,16 @@ fn start_torrent_cleanup(config: &Configuration, app_container: &Arc, job_manager: &mut JobManager) { + if config.core.tracker_usage_statistics { + let handle = peers_inactivity_update::start_job(config, app_container); + + job_manager.push("peers_inactivity_update", handle); + } else { + tracing::info!("Peers inactivity update job is disabled."); + } +} + async fn start_health_check_api(config: &Configuration, app_container: &Arc, job_manager: &mut JobManager) { let handle = health_check_api::start_job(&config.health_check_api, app_container.registar.entries()).await; diff --git a/src/bootstrap/jobs/mod.rs b/src/bootstrap/jobs/mod.rs index b311c6da6..f593ce808 100644 --- a/src/bootstrap/jobs/mod.rs +++ b/src/bootstrap/jobs/mod.rs @@ -10,6 +10,7 @@ pub mod health_check_api; pub mod http_tracker; pub mod http_tracker_core; pub mod manager; +pub mod peers_inactivity_update; pub mod torrent_cleanup; pub mod torrent_repository; pub mod tracker_apis; diff --git a/src/bootstrap/jobs/peers_inactivity_update.rs b/src/bootstrap/jobs/peers_inactivity_update.rs new file mode 100644 index 000000000..e7939720c --- /dev/null +++ b/src/bootstrap/jobs/peers_inactivity_update.rs @@ -0,0 +1,27 @@ +//! Job that runs a task on intervals to update peers' inactivity metrics. +use std::sync::Arc; +use std::time::Duration; + +use tokio::task::JoinHandle; +use torrust_tracker_clock::clock::Time; +use torrust_tracker_configuration::Configuration; + +use crate::container::AppContainer; +use crate::CurrentClock; + +#[must_use] +pub fn start_job(config: &Configuration, app_container: &Arc) -> JoinHandle<()> { + torrust_tracker_torrent_repository::statistics::peers_inactivity_update::start_job( + &app_container.torrent_repository_container.swarms.clone(), + &app_container.torrent_repository_container.stats_repository.clone(), + peer_inactivity_cutoff_timestamp(config.core.tracker_policy.max_peer_timeout), + ) +} + +/// Returns the timestamp of the cutoff for inactive peers. +/// +/// Peers that has not been updated for more than `max_peer_timeout` seconds are +/// considered inactive. +fn peer_inactivity_cutoff_timestamp(max_peer_timeout: u32) -> Duration { + CurrentClock::now_sub(&Duration::from_secs(u64::from(max_peer_timeout))).unwrap_or_default() +} From 677deacdc419526122eff62973f2685ac976a5eb Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 21 May 2025 12:34:12 +0100 Subject: [PATCH 2/2] feat: [#1523] add new metric: number of inactive torrents --- .../statistics/activity_metrics_updater.rs | 104 ++++++++++++++++++ .../torrent-repository/src/statistics/mod.rs | 9 +- .../src/statistics/peers_inactivity_update.rs | 72 ------------ packages/torrent-repository/src/swarm.rs | 35 ++++++ packages/torrent-repository/src/swarms.rs | 51 +++++++++ src/app.rs | 4 +- ..._update.rs => activity_metrics_updater.rs} | 4 +- src/bootstrap/jobs/mod.rs | 2 +- 8 files changed, 203 insertions(+), 78 deletions(-) create mode 100644 packages/torrent-repository/src/statistics/activity_metrics_updater.rs delete mode 100644 packages/torrent-repository/src/statistics/peers_inactivity_update.rs rename src/bootstrap/jobs/{peers_inactivity_update.rs => activity_metrics_updater.rs} (84%) diff --git a/packages/torrent-repository/src/statistics/activity_metrics_updater.rs b/packages/torrent-repository/src/statistics/activity_metrics_updater.rs new file mode 100644 index 000000000..2dfa5fb4e --- /dev/null +++ b/packages/torrent-repository/src/statistics/activity_metrics_updater.rs @@ -0,0 +1,104 @@ +//! Job that runs a task on intervals to update peers' activity metrics. +use std::sync::Arc; + +use chrono::Utc; +use tokio::task::JoinHandle; +use torrust_tracker_clock::clock::Time; +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::metric_name; +use torrust_tracker_primitives::DurationSinceUnixEpoch; +use tracing::instrument; + +use super::repository::Repository; +use crate::statistics::{TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL, TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL}; +use crate::{CurrentClock, Swarms}; + +#[must_use] +#[instrument(skip(swarms, stats_repository))] +pub fn start_job( + swarms: &Arc, + stats_repository: &Arc, + inactivity_cutoff: DurationSinceUnixEpoch, +) -> JoinHandle<()> { + let weak_swarms = std::sync::Arc::downgrade(swarms); + let weak_stats_repository = std::sync::Arc::downgrade(stats_repository); + + let interval_in_secs = 15; // todo: make this configurable + + tokio::spawn(async move { + let interval = std::time::Duration::from_secs(interval_in_secs); + let mut interval = tokio::time::interval(interval); + interval.tick().await; + + loop { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::info!("Stopping peers activity metrics update job (ctrl-c signal received) ..."); + break; + } + _ = interval.tick() => { + if let (Some(swarms), Some(stats_repository)) = (weak_swarms.upgrade(), weak_stats_repository.upgrade()) { + update_activity_metrics(interval_in_secs, &swarms, &stats_repository, inactivity_cutoff).await; + } else { + tracing::info!("Stopping peers activity metrics update job (can't upgrade weak pointers) ..."); + break; + } + } + } + } + }) +} + +async fn update_activity_metrics( + interval_in_secs: u64, + swarms: &Arc, + stats_repository: &Arc, + inactivity_cutoff: DurationSinceUnixEpoch, +) { + let start_time = Utc::now().time(); + + tracing::debug!( + "Updating peers and torrents activity metrics (executed every {} secs) ...", + interval_in_secs + ); + + let activity_metadata = swarms.get_activity_metadata(inactivity_cutoff).await; + + activity_metadata.log(); + + update_inactive_peers_total(stats_repository, activity_metadata.inactive_peers_total).await; + update_inactive_torrents_total(stats_repository, activity_metadata.inactive_torrents_total).await; + + tracing::debug!( + "Peers and torrents activity metrics updated in {} ms", + (Utc::now().time() - start_time).num_milliseconds() + ); +} + +async fn update_inactive_peers_total(stats_repository: &Arc, inactive_peers_total: usize) { + #[allow(clippy::cast_precision_loss)] + let inactive_peers_total = inactive_peers_total as f64; + + let _unused = stats_repository + .set_gauge( + &metric_name!(TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL), + &LabelSet::default(), + inactive_peers_total, + CurrentClock::now(), + ) + .await; +} + +async fn update_inactive_torrents_total(stats_repository: &Arc, inactive_torrents_total: usize) { + #[allow(clippy::cast_precision_loss)] + let inactive_torrents_total = inactive_torrents_total as f64; + + let _unused = stats_repository + .set_gauge( + &metric_name!(TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL), + &LabelSet::default(), + inactive_torrents_total, + CurrentClock::now(), + ) + .await; +} diff --git a/packages/torrent-repository/src/statistics/mod.rs b/packages/torrent-repository/src/statistics/mod.rs index 0f8a839ca..cfc252e34 100644 --- a/packages/torrent-repository/src/statistics/mod.rs +++ b/packages/torrent-repository/src/statistics/mod.rs @@ -1,6 +1,6 @@ +pub mod activity_metrics_updater; pub mod event; pub mod metrics; -pub mod peers_inactivity_update; pub mod repository; use metrics::Metrics; @@ -15,6 +15,7 @@ const TORRENT_REPOSITORY_TORRENTS_REMOVED_TOTAL: &str = "torrent_repository_torr const TORRENT_REPOSITORY_TORRENTS_TOTAL: &str = "torrent_repository_torrents_total"; const TORRENT_REPOSITORY_TORRENTS_DOWNLOADS_TOTAL: &str = "torrent_repository_torrents_downloads_total"; +const TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL: &str = "torrent_repository_torrents_inactive_total"; // Peers metrics @@ -56,6 +57,12 @@ pub fn describe_metrics() -> Metrics { Some(&MetricDescription::new("The total number of torrent downloads.")), ); + metrics.metric_collection.describe_gauge( + &metric_name!(TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL), + Some(Unit::Count), + Some(&MetricDescription::new("The total number of inactive torrents.")), + ); + // Peers metrics metrics.metric_collection.describe_counter( diff --git a/packages/torrent-repository/src/statistics/peers_inactivity_update.rs b/packages/torrent-repository/src/statistics/peers_inactivity_update.rs deleted file mode 100644 index e388173a1..000000000 --- a/packages/torrent-repository/src/statistics/peers_inactivity_update.rs +++ /dev/null @@ -1,72 +0,0 @@ -//! Job that runs a task on intervals to update peers' inactivity metrics. -use std::sync::Arc; - -use chrono::Utc; -use tokio::task::JoinHandle; -use torrust_tracker_clock::clock::Time; -use torrust_tracker_metrics::label::LabelSet; -use torrust_tracker_metrics::metric_name; -use torrust_tracker_primitives::DurationSinceUnixEpoch; -use tracing::instrument; - -use super::repository::Repository; -use crate::statistics::TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL; -use crate::{CurrentClock, Swarms}; - -#[must_use] -#[instrument(skip(swarms, stats_repository))] -pub fn start_job( - swarms: &Arc, - stats_repository: &Arc, - inactivity_cutoff: DurationSinceUnixEpoch, -) -> JoinHandle<()> { - let weak_swarms = std::sync::Arc::downgrade(swarms); - let weak_stats_repository = std::sync::Arc::downgrade(stats_repository); - - let interval_in_secs = 15; // todo: make this configurable - - tokio::spawn(async move { - let interval = std::time::Duration::from_secs(interval_in_secs); - let mut interval = tokio::time::interval(interval); - interval.tick().await; - - loop { - tokio::select! { - _ = tokio::signal::ctrl_c() => { - tracing::info!("Stopping peers inactivity metrics update job ..."); - break; - } - _ = interval.tick() => { - if let (Some(swarms), Some(stats_repository)) = (weak_swarms.upgrade(), weak_stats_repository.upgrade()) { - let start_time = Utc::now().time(); - - tracing::debug!("Updating peers inactivity metrics (executed every {} secs) ...", interval_in_secs); - - let inactive_peers_total = swarms.count_inactive_peers(inactivity_cutoff).await; - - tracing::info!(inactive_peers_total = inactive_peers_total); - - #[allow(clippy::cast_precision_loss)] - let inactive_peers_total = inactive_peers_total as f64; - - let _unused = stats_repository - .set_gauge( - &metric_name!(TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL), - &LabelSet::default(), - inactive_peers_total, - CurrentClock::now(), - ) - .await; - - tracing::debug!( - "Peers inactivity metrics updated in {} ms", - (Utc::now().time() - start_time).num_milliseconds() - ); - } else { - break; - } - } - } - } - }) -} diff --git a/packages/torrent-repository/src/swarm.rs b/packages/torrent-repository/src/swarm.rs index d7a1ede87..b9076289b 100644 --- a/packages/torrent-repository/src/swarm.rs +++ b/packages/torrent-repository/src/swarm.rs @@ -126,6 +126,17 @@ impl Swarm { .count() } + #[must_use] + pub fn get_activity_metadata(&self, current_cutoff: DurationSinceUnixEpoch) -> ActivityMetadata { + let inactive_peers_total = self.count_inactive_peers(current_cutoff); + + let active_peers_total = self.len() - inactive_peers_total; + + let is_active = active_peers_total > 0; + + ActivityMetadata::new(is_active, active_peers_total, inactive_peers_total) + } + #[must_use] pub fn len(&self) -> usize { self.peers.len() @@ -296,6 +307,30 @@ impl Swarm { } } +#[derive(Clone)] +pub struct ActivityMetadata { + /// Indicates if the swarm is active. It's inactive if there are no active + /// peers. + pub is_active: bool, + + /// The number of active peers in the swarm. + pub active_peers_total: usize, + + /// The number of inactive peers in the swarm. + pub inactive_peers_total: usize, +} + +impl ActivityMetadata { + #[must_use] + pub fn new(is_active: bool, active_peers_total: usize, inactive_peers_total: usize) -> Self { + Self { + is_active, + active_peers_total, + inactive_peers_total, + } + } +} + #[cfg(test)] mod tests { diff --git a/packages/torrent-repository/src/swarms.rs b/packages/torrent-repository/src/swarms.rs index 811bf6a50..36f83070d 100644 --- a/packages/torrent-repository/src/swarms.rs +++ b/packages/torrent-repository/src/swarms.rs @@ -248,6 +248,32 @@ impl Swarms { } } + pub async fn get_activity_metadata(&self, current_cutoff: DurationSinceUnixEpoch) -> AggregateActivityMetadata { + let mut active_peers_total = 0; + let mut inactive_peers_total = 0; + let mut active_torrents_total = 0; + + for swarm_handle in &self.swarms { + let swarm = swarm_handle.value().lock().await; + + let activity_metadata = swarm.get_activity_metadata(current_cutoff); + + if activity_metadata.is_active { + active_torrents_total += 1; + } + + active_peers_total += activity_metadata.active_peers_total; + inactive_peers_total += activity_metadata.inactive_peers_total; + } + + AggregateActivityMetadata { + active_peers_total, + inactive_peers_total, + active_torrents_total, + inactive_torrents_total: self.len() - active_torrents_total, + } + } + /// Counts the number of inactive peers across all torrents. pub async fn count_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> usize { let mut inactive_peers_total = 0; @@ -446,6 +472,31 @@ impl Swarms { #[derive(thiserror::Error, Debug, Clone)] pub enum Error {} +#[derive(Clone, Debug, Default)] +pub struct AggregateActivityMetadata { + /// The number of active peers in all swarms. + pub active_peers_total: usize, + + /// The number of inactive peers in all swarms. + pub inactive_peers_total: usize, + + /// The number of active torrents. + pub active_torrents_total: usize, + + /// The number of inactive torrents. + pub inactive_torrents_total: usize, +} + +impl AggregateActivityMetadata { + pub fn log(&self) { + tracing::info!( + active_peers_total = self.active_peers_total, + inactive_peers_total = self.inactive_peers_total, + active_torrents_total = self.active_torrents_total, + inactive_torrents_total = self.inactive_torrents_total + ); + } +} #[cfg(test)] mod tests { diff --git a/src/app.rs b/src/app.rs index 1c2d9387e..5180e4583 100644 --- a/src/app.rs +++ b/src/app.rs @@ -28,7 +28,7 @@ use tracing::instrument; use crate::bootstrap::jobs::manager::JobManager; use crate::bootstrap::jobs::{ - self, health_check_api, http_tracker, peers_inactivity_update, torrent_cleanup, tracker_apis, udp_tracker, + self, activity_metrics_updater, health_check_api, http_tracker, torrent_cleanup, tracker_apis, udp_tracker, }; use crate::bootstrap::{self}; use crate::container::AppContainer; @@ -267,7 +267,7 @@ fn start_torrent_cleanup(config: &Configuration, app_container: &Arc, job_manager: &mut JobManager) { if config.core.tracker_usage_statistics { - let handle = peers_inactivity_update::start_job(config, app_container); + let handle = activity_metrics_updater::start_job(config, app_container); job_manager.push("peers_inactivity_update", handle); } else { diff --git a/src/bootstrap/jobs/peers_inactivity_update.rs b/src/bootstrap/jobs/activity_metrics_updater.rs similarity index 84% rename from src/bootstrap/jobs/peers_inactivity_update.rs rename to src/bootstrap/jobs/activity_metrics_updater.rs index e7939720c..7411c05cf 100644 --- a/src/bootstrap/jobs/peers_inactivity_update.rs +++ b/src/bootstrap/jobs/activity_metrics_updater.rs @@ -1,4 +1,4 @@ -//! Job that runs a task on intervals to update peers' inactivity metrics. +//! Job that runs a task on intervals to update peers' activity metrics. use std::sync::Arc; use std::time::Duration; @@ -11,7 +11,7 @@ use crate::CurrentClock; #[must_use] pub fn start_job(config: &Configuration, app_container: &Arc) -> JoinHandle<()> { - torrust_tracker_torrent_repository::statistics::peers_inactivity_update::start_job( + torrust_tracker_torrent_repository::statistics::activity_metrics_updater::start_job( &app_container.torrent_repository_container.swarms.clone(), &app_container.torrent_repository_container.stats_repository.clone(), peer_inactivity_cutoff_timestamp(config.core.tracker_policy.max_peer_timeout), diff --git a/src/bootstrap/jobs/mod.rs b/src/bootstrap/jobs/mod.rs index f593ce808..c8d7a8598 100644 --- a/src/bootstrap/jobs/mod.rs +++ b/src/bootstrap/jobs/mod.rs @@ -6,11 +6,11 @@ //! 2. Launch all the application services as concurrent jobs. //! //! This modules contains all the functions needed to start those jobs. +pub mod activity_metrics_updater; pub mod health_check_api; pub mod http_tracker; pub mod http_tracker_core; pub mod manager; -pub mod peers_inactivity_update; pub mod torrent_cleanup; pub mod torrent_repository; pub mod tracker_apis;