diff --git a/packages/metrics/src/counter.rs b/packages/metrics/src/counter.rs index 3a816c75b..ac6d21836 100644 --- a/packages/metrics/src/counter.rs +++ b/packages/metrics/src/counter.rs @@ -20,6 +20,10 @@ impl Counter { pub fn increment(&mut self, value: u64) { self.0 += value; } + + pub fn absolute(&mut self, value: u64) { + self.0 = value; + } } impl From for Counter { @@ -73,6 +77,13 @@ mod tests { assert_eq!(counter.value(), 3); } + #[test] + fn it_could_set_to_an_absolute_value() { + let mut counter = Counter::new(0); + counter.absolute(1); + assert_eq!(counter.value(), 1); + } + #[test] fn it_serializes_to_prometheus() { let counter = Counter::new(42); diff --git a/packages/metrics/src/metric/mod.rs b/packages/metrics/src/metric/mod.rs index 05779f09f..2118637b8 100644 --- a/packages/metrics/src/metric/mod.rs +++ b/packages/metrics/src/metric/mod.rs @@ -55,6 +55,10 @@ impl Metric { pub fn increment(&mut self, label_set: &LabelSet, time: DurationSinceUnixEpoch) { self.sample_collection.increment(label_set, time); } + + pub fn absolute(&mut self, label_set: &LabelSet, value: u64, time: DurationSinceUnixEpoch) { + self.sample_collection.absolute(label_set, value, time); + } } impl Metric { diff --git a/packages/metrics/src/metric_collection.rs b/packages/metrics/src/metric_collection.rs index 83b08f178..824397000 100644 --- a/packages/metrics/src/metric_collection.rs +++ b/packages/metrics/src/metric_collection.rs @@ -72,6 +72,8 @@ impl MetricCollection { self.counters.get_value(name, label_set) } + /// Increases the counter for the given metric name and labels. + /// /// # Errors /// /// Return an error if a metrics of a different type with the same name @@ -93,6 +95,30 @@ impl MetricCollection { Ok(()) } + /// Sets the counter for the given metric name and labels. + /// + /// # Errors + /// + /// Return an error if a metrics of a different type with the same name + /// already exists. + pub fn set_counter( + &mut self, + name: &MetricName, + label_set: &LabelSet, + value: u64, + time: DurationSinceUnixEpoch, + ) -> Result<(), Error> { + if self.gauges.metrics.contains_key(name) { + return Err(Error::MetricNameCollisionAdding { + metric_name: name.clone(), + }); + } + + self.counters.absolute(name, label_set, value, time); + + Ok(()) + } + pub fn ensure_counter_exists(&mut self, name: &MetricName) { self.counters.ensure_metric_exists(name); } @@ -361,7 +387,7 @@ impl MetricKindCollection { /// /// # Panics /// - /// Panics if the metric does not exist and it could not be created. + /// Panics if the metric does not exist. pub fn increment(&mut self, name: &MetricName, label_set: &LabelSet, time: DurationSinceUnixEpoch) { self.ensure_metric_exists(name); @@ -370,6 +396,21 @@ impl MetricKindCollection { metric.increment(label_set, time); } + /// Sets the counter to an absolute value for the given metric name and labels. + /// + /// If the metric name does not exist, it will be created. + /// + /// # Panics + /// + /// Panics if the metric does not exist. + pub fn absolute(&mut self, name: &MetricName, label_set: &LabelSet, value: u64, time: DurationSinceUnixEpoch) { + self.ensure_metric_exists(name); + + let metric = self.metrics.get_mut(name).expect("Counter metric should exist"); + + metric.absolute(label_set, value, time); + } + #[must_use] pub fn get_value(&self, name: &MetricName, label_set: &LabelSet) -> Option { self.metrics diff --git a/packages/metrics/src/sample.rs b/packages/metrics/src/sample.rs index 4621c9906..ad4dff00e 100644 --- a/packages/metrics/src/sample.rs +++ b/packages/metrics/src/sample.rs @@ -122,6 +122,11 @@ impl Measurement { self.value.increment(1); self.set_recorded_at(time); } + + pub fn absolute(&mut self, value: u64, time: DurationSinceUnixEpoch) { + self.value.absolute(value); + self.set_recorded_at(time); + } } impl Measurement { diff --git a/packages/metrics/src/sample_collection.rs b/packages/metrics/src/sample_collection.rs index ea6b4d4af..e815f26ec 100644 --- a/packages/metrics/src/sample_collection.rs +++ b/packages/metrics/src/sample_collection.rs @@ -79,6 +79,15 @@ impl SampleCollection { sample.increment(time); } + + pub fn absolute(&mut self, label_set: &LabelSet, value: u64, time: DurationSinceUnixEpoch) { + let sample = self + .samples + .entry(label_set.clone()) + .or_insert_with(|| Measurement::new(Counter::default(), time)); + + sample.absolute(value, time); + } } impl SampleCollection { diff --git a/packages/tracker-core/migrations/mysql/20240730183000_torrust_tracker_create_all_tables.sql b/packages/tracker-core/migrations/mysql/20240730183000_torrust_tracker_create_all_tables.sql index 407ae4dd1..ab160bd75 100644 --- a/packages/tracker-core/migrations/mysql/20240730183000_torrust_tracker_create_all_tables.sql +++ b/packages/tracker-core/migrations/mysql/20240730183000_torrust_tracker_create_all_tables.sql @@ -4,6 +4,7 @@ CREATE TABLE info_hash VARCHAR(40) NOT NULL UNIQUE ); +# todo: rename to `torrent_metrics` CREATE TABLE IF NOT EXISTS torrents ( id integer PRIMARY KEY AUTO_INCREMENT, diff --git a/packages/tracker-core/migrations/mysql/20250527093000_torrust_tracker_new_torrent_aggregate_metrics_table.sql b/packages/tracker-core/migrations/mysql/20250527093000_torrust_tracker_new_torrent_aggregate_metrics_table.sql new file mode 100644 index 000000000..36f940cc3 --- /dev/null +++ b/packages/tracker-core/migrations/mysql/20250527093000_torrust_tracker_new_torrent_aggregate_metrics_table.sql @@ -0,0 +1,6 @@ +CREATE TABLE + IF NOT EXISTS torrent_aggregate_metrics ( + id integer PRIMARY KEY AUTO_INCREMENT, + metric_name VARCHAR(50) NOT NULL UNIQUE, + value INTEGER DEFAULT 0 NOT NULL + ); \ No newline at end of file diff --git a/packages/tracker-core/migrations/sqlite/20240730183000_torrust_tracker_create_all_tables.sql b/packages/tracker-core/migrations/sqlite/20240730183000_torrust_tracker_create_all_tables.sql index bd451bf8b..c5bcad926 100644 --- a/packages/tracker-core/migrations/sqlite/20240730183000_torrust_tracker_create_all_tables.sql +++ b/packages/tracker-core/migrations/sqlite/20240730183000_torrust_tracker_create_all_tables.sql @@ -4,6 +4,7 @@ CREATE TABLE info_hash TEXT NOT NULL UNIQUE ); +# todo: rename to `torrent_metrics` CREATE TABLE IF NOT EXISTS torrents ( id INTEGER PRIMARY KEY AUTOINCREMENT, diff --git a/packages/tracker-core/migrations/sqlite/20250527093000_torrust_tracker_new_torrent_aggregate_metrics_table.sql b/packages/tracker-core/migrations/sqlite/20250527093000_torrust_tracker_new_torrent_aggregate_metrics_table.sql new file mode 100644 index 000000000..34166903c --- /dev/null +++ b/packages/tracker-core/migrations/sqlite/20250527093000_torrust_tracker_new_torrent_aggregate_metrics_table.sql @@ -0,0 +1,6 @@ +CREATE TABLE + IF NOT EXISTS torrent_aggregate_metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + metric_name TEXT NOT NULL UNIQUE, + value INTEGER DEFAULT 0 NOT NULL + ); \ No newline at end of file diff --git a/packages/tracker-core/src/databases/driver/mod.rs b/packages/tracker-core/src/databases/driver/mod.rs index 2cedab2d7..e8f0ecbfb 100644 --- a/packages/tracker-core/src/databases/driver/mod.rs +++ b/packages/tracker-core/src/databases/driver/mod.rs @@ -6,6 +6,9 @@ use sqlite::Sqlite; use super::error::Error; use super::Database; +/// Metric name in DB for the total number of downloads across all torrents. +const TORRENTS_DOWNLOADS_TOTAL: &str = "torrents_downloads_total"; + /// The database management system used by the tracker. /// /// Refer to: @@ -97,9 +100,14 @@ pub(crate) mod tests { // Persistent torrents (stats) + // Torrent metrics handling_torrent_persistence::it_should_save_and_load_persistent_torrents(driver); handling_torrent_persistence::it_should_load_all_persistent_torrents(driver); handling_torrent_persistence::it_should_increase_the_number_of_downloads_for_a_given_torrent(driver); + // Aggregate metrics for all torrents + handling_torrent_persistence::it_should_save_and_load_the_global_number_of_downloads(driver); + handling_torrent_persistence::it_should_load_the_global_number_of_downloads(driver); + handling_torrent_persistence::it_should_increase_the_global_number_of_downloads(driver); // Authentication keys (for private trackers) @@ -154,6 +162,8 @@ pub(crate) mod tests { use crate::databases::Database; use crate::test_helpers::tests::sample_info_hash; + // Metrics per torrent + pub fn it_should_save_and_load_persistent_torrents(driver: &Arc>) { let infohash = sample_info_hash(); @@ -192,6 +202,40 @@ pub(crate) mod tests { assert_eq!(number_of_downloads, 2); } + + // Aggregate metrics for all torrents + + pub fn it_should_save_and_load_the_global_number_of_downloads(driver: &Arc>) { + let number_of_downloads = 1; + + driver.save_global_number_of_downloads(number_of_downloads).unwrap(); + + let number_of_downloads = driver.load_global_number_of_downloads().unwrap().unwrap(); + + assert_eq!(number_of_downloads, 1); + } + + pub fn it_should_load_the_global_number_of_downloads(driver: &Arc>) { + let number_of_downloads = 1; + + driver.save_global_number_of_downloads(number_of_downloads).unwrap(); + + let number_of_downloads = driver.load_global_number_of_downloads().unwrap().unwrap(); + + assert_eq!(number_of_downloads, 1); + } + + pub fn it_should_increase_the_global_number_of_downloads(driver: &Arc>) { + let number_of_downloads = 1; + + driver.save_global_number_of_downloads(number_of_downloads).unwrap(); + + driver.increase_global_number_of_downloads().unwrap(); + + let number_of_downloads = driver.load_global_number_of_downloads().unwrap().unwrap(); + + assert_eq!(number_of_downloads, 2); + } } mod handling_authentication_keys { diff --git a/packages/tracker-core/src/databases/driver/mysql.rs b/packages/tracker-core/src/databases/driver/mysql.rs index d07f061c2..bfbc47ebd 100644 --- a/packages/tracker-core/src/databases/driver/mysql.rs +++ b/packages/tracker-core/src/databases/driver/mysql.rs @@ -15,7 +15,7 @@ use r2d2_mysql::mysql::{params, Opts, OptsBuilder}; use r2d2_mysql::MySqlConnectionManager; use torrust_tracker_primitives::{PersistentTorrent, PersistentTorrents}; -use super::{Database, Driver, Error}; +use super::{Database, Driver, Error, TORRENTS_DOWNLOADS_TOTAL}; use crate::authentication::key::AUTH_KEY_LENGTH; use crate::authentication::{self, Key}; @@ -46,6 +46,27 @@ impl Mysql { Ok(Self { pool }) } + + fn load_torrent_aggregate_metric(&self, metric_name: &str) -> Result, Error> { + let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?; + + let query = conn.exec_first::( + "SELECT value FROM torrent_aggregate_metrics WHERE metric_name = :metric_name", + params! { "metric_name" => metric_name }, + ); + + let persistent_torrent = query?; + + Ok(persistent_torrent) + } + + fn save_torrent_aggregate_metric(&self, metric_name: &str, completed: PersistentTorrent) -> Result<(), Error> { + const COMMAND : &str = "INSERT INTO torrent_aggregate_metrics (metric_name, value) VALUES (:metric_name, :completed) ON DUPLICATE KEY UPDATE value = VALUES(value)"; + + let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?; + + Ok(conn.exec_drop(COMMAND, params! { metric_name, completed })?) + } } impl Database for Mysql { @@ -66,6 +87,14 @@ impl Database for Mysql { );" .to_string(); + let create_torrent_aggregate_metrics_table = " + CREATE TABLE IF NOT EXISTS torrent_aggregate_metrics ( + id integer PRIMARY KEY AUTO_INCREMENT, + metric_name VARCHAR(50) NOT NULL UNIQUE, + value INTEGER DEFAULT 0 NOT NULL + );" + .to_string(); + let create_keys_table = format!( " CREATE TABLE IF NOT EXISTS `keys` ( @@ -82,6 +111,8 @@ impl Database for Mysql { conn.query_drop(&create_torrents_table) .expect("Could not create torrents table."); + conn.query_drop(&create_torrent_aggregate_metrics_table) + .expect("Could not create create_torrent_aggregate_metrics_table table."); conn.query_drop(&create_keys_table).expect("Could not create keys table."); conn.query_drop(&create_whitelist_table) .expect("Could not create whitelist table."); @@ -168,6 +199,30 @@ impl Database for Mysql { Ok(()) } + /// Refer to [`databases::Database::load_global_number_of_downloads`](crate::core::databases::Database::load_global_number_of_downloads). + fn load_global_number_of_downloads(&self) -> Result, Error> { + self.load_torrent_aggregate_metric(TORRENTS_DOWNLOADS_TOTAL) + } + + /// Refer to [`databases::Database::save_global_number_of_downloads`](crate::core::databases::Database::save_global_number_of_downloads). + fn save_global_number_of_downloads(&self, downloaded: PersistentTorrent) -> Result<(), Error> { + self.save_torrent_aggregate_metric(TORRENTS_DOWNLOADS_TOTAL, downloaded) + } + + /// Refer to [`databases::Database::increase_global_number_of_downloads`](crate::core::databases::Database::increase_global_number_of_downloads). + fn increase_global_number_of_downloads(&self) -> Result<(), Error> { + let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?; + + let metric_name = TORRENTS_DOWNLOADS_TOTAL; + + conn.exec_drop( + "UPDATE torrent_aggregate_metrics SET value = value + 1 WHERE metric_name = :metric_name", + params! { metric_name }, + )?; + + Ok(()) + } + /// Refer to [`databases::Database::load_keys`](crate::core::databases::Database::load_keys). fn load_keys(&self) -> Result, Error> { let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?; diff --git a/packages/tracker-core/src/databases/driver/sqlite.rs b/packages/tracker-core/src/databases/driver/sqlite.rs index d36f24f8b..91e969233 100644 --- a/packages/tracker-core/src/databases/driver/sqlite.rs +++ b/packages/tracker-core/src/databases/driver/sqlite.rs @@ -15,7 +15,7 @@ use r2d2_sqlite::rusqlite::types::Null; use r2d2_sqlite::SqliteConnectionManager; use torrust_tracker_primitives::{DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; -use super::{Database, Driver, Error}; +use super::{Database, Driver, Error, TORRENTS_DOWNLOADS_TOTAL}; use crate::authentication::{self, Key}; const DRIVER: Driver = Driver::Sqlite3; @@ -49,6 +49,39 @@ impl Sqlite { Ok(Self { pool }) } + + fn load_torrent_aggregate_metric(&self, metric_name: &str) -> Result, Error> { + let conn = self.pool.get().map_err(|e| (e, DRIVER))?; + + let mut stmt = conn.prepare("SELECT value FROM torrent_aggregate_metrics WHERE metric_name = ?")?; + + let mut rows = stmt.query([metric_name])?; + + let persistent_torrent = rows.next()?; + + Ok(persistent_torrent.map(|f| { + let value: i64 = f.get(0).unwrap(); + u32::try_from(value).unwrap() + })) + } + + fn save_torrent_aggregate_metric(&self, metric_name: &str, completed: PersistentTorrent) -> Result<(), Error> { + let conn = self.pool.get().map_err(|e| (e, DRIVER))?; + + let insert = conn.execute( + "INSERT INTO torrent_aggregate_metrics (metric_name, value) VALUES (?1, ?2) ON CONFLICT(metric_name) DO UPDATE SET value = ?2", + [metric_name.to_string(), completed.to_string()], + )?; + + if insert == 0 { + Err(Error::InsertFailed { + location: Location::caller(), + driver: DRIVER, + }) + } else { + Ok(()) + } + } } impl Database for Sqlite { @@ -69,6 +102,14 @@ impl Database for Sqlite { );" .to_string(); + let create_torrent_aggregate_metrics_table = " + CREATE TABLE IF NOT EXISTS torrent_aggregate_metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + metric_name TEXT NOT NULL UNIQUE, + value INTEGER DEFAULT 0 NOT NULL + );" + .to_string(); + let create_keys_table = " CREATE TABLE IF NOT EXISTS keys ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -82,6 +123,7 @@ impl Database for Sqlite { conn.execute(&create_whitelist_table, [])?; conn.execute(&create_keys_table, [])?; conn.execute(&create_torrents_table, [])?; + conn.execute(&create_torrent_aggregate_metrics_table, [])?; Ok(()) } @@ -172,6 +214,30 @@ impl Database for Sqlite { Ok(()) } + /// Refer to [`databases::Database::load_global_number_of_downloads`](crate::core::databases::Database::load_global_number_of_downloads). + fn load_global_number_of_downloads(&self) -> Result, Error> { + self.load_torrent_aggregate_metric(TORRENTS_DOWNLOADS_TOTAL) + } + + /// Refer to [`databases::Database::save_global_number_of_downloads`](crate::core::databases::Database::save_global_number_of_downloads). + fn save_global_number_of_downloads(&self, downloaded: PersistentTorrent) -> Result<(), Error> { + self.save_torrent_aggregate_metric(TORRENTS_DOWNLOADS_TOTAL, downloaded) + } + + /// Refer to [`databases::Database::increase_global_number_of_downloads`](crate::core::databases::Database::increase_global_number_of_downloads). + fn increase_global_number_of_downloads(&self) -> Result<(), Error> { + let conn = self.pool.get().map_err(|e| (e, DRIVER))?; + + let metric_name = TORRENTS_DOWNLOADS_TOTAL; + + let _ = conn.execute( + "UPDATE torrent_aggregate_metrics SET value = value + 1 WHERE metric_name = ?", + [metric_name], + )?; + + Ok(()) + } + /// Refer to [`databases::Database::load_keys`](crate::core::databases::Database::load_keys). fn load_keys(&self) -> Result, Error> { let conn = self.pool.get().map_err(|e| (e, DRIVER))?; diff --git a/packages/tracker-core/src/databases/mod.rs b/packages/tracker-core/src/databases/mod.rs index 2703ab8bf..a9d6b2a22 100644 --- a/packages/tracker-core/src/databases/mod.rs +++ b/packages/tracker-core/src/databases/mod.rs @@ -131,16 +131,48 @@ pub trait Database: Sync + Send { /// It does not create a new entry if the torrent is not found and it does /// not return an error. /// + /// # Context: Torrent Metrics + /// + /// # Arguments + /// + /// * `info_hash` - A reference to the torrent's info hash. + /// + /// # Errors + /// + /// Returns an [`Error`] if the query failed. + fn increase_number_of_downloads(&self, info_hash: &InfoHash) -> Result<(), Error>; + + /// Loads the total number of downloads for all torrents from the database. + /// + /// # Context: Torrent Metrics + /// + /// # Errors + /// + /// Returns an [`Error`] if the total downloads cannot be loaded. + fn load_global_number_of_downloads(&self) -> Result, Error>; + + /// Saves the total number of downloads for all torrents into the database. + /// + /// # Context: Torrent Metrics + /// /// # Arguments /// /// * `info_hash` - A reference to the torrent's info hash. + /// * `downloaded` - The number of times the torrent has been downloaded. + /// + /// # Errors + /// + /// Returns an [`Error`] if the total downloads cannot be saved. + fn save_global_number_of_downloads(&self, downloaded: PersistentTorrent) -> Result<(), Error>; + + /// Increases the total number of downloads for all torrents. /// /// # Context: Torrent Metrics /// /// # Errors /// /// Returns an [`Error`] if the query failed. - fn increase_number_of_downloads(&self, info_hash: &InfoHash) -> Result<(), Error>; + fn increase_global_number_of_downloads(&self) -> Result<(), Error>; // Whitelist diff --git a/packages/tracker-core/src/statistics/event/handler.rs b/packages/tracker-core/src/statistics/event/handler.rs index ac6d0639e..4002053e2 100644 --- a/packages/tracker-core/src/statistics/event/handler.rs +++ b/packages/tracker-core/src/statistics/event/handler.rs @@ -13,6 +13,7 @@ pub async fn handle_event( event: Event, stats_repository: &Arc, db_torrent_repository: &Arc, + persistent_torrent_completed_stat: bool, now: DurationSinceUnixEpoch, ) { match event { @@ -41,17 +42,7 @@ pub async fn handle_event( Event::PeerDownloadCompleted { info_hash, peer } => { tracing::debug!(info_hash = ?info_hash, peer = ?peer, "Peer download completed", ); - // Increment the number of downloads for the torrent - match db_torrent_repository.increase_number_of_downloads(&info_hash) { - Ok(()) => { - tracing::debug!(info_hash = ?info_hash, "Number of downloads increased"); - } - Err(err) => { - tracing::error!(info_hash = ?info_hash, error = ?err, "Failed to increase number of downloads"); - } - } - - // Increment the number of downloads for all the torrents + // Increment the number of downloads for all the torrents in memory let _unused = stats_repository .increment_counter( &metric_name!(TRACKER_CORE_PERSISTENT_TORRENTS_DOWNLOADS_TOTAL), @@ -60,9 +51,27 @@ pub async fn handle_event( ) .await; - // todo: - // - Persist the metric into the database. - // - Load the metric from the database. + if persistent_torrent_completed_stat { + // Increment the number of downloads for the torrent in the database + match db_torrent_repository.increase_number_of_downloads(&info_hash) { + Ok(()) => { + tracing::debug!(info_hash = ?info_hash, "Number of torrent downloads increased"); + } + Err(err) => { + tracing::error!(info_hash = ?info_hash, error = ?err, "Failed to increase number of downloads for the torrent"); + } + } + + // Increment the global number of downloads (for all torrents) in the database + match db_torrent_repository.increase_global_number_of_downloads() { + Ok(()) => { + tracing::debug!("Global number of downloads increased"); + } + Err(err) => { + tracing::error!(error = ?err, "Failed to increase global number of downloads"); + } + } + } } } } diff --git a/packages/tracker-core/src/statistics/event/listener.rs b/packages/tracker-core/src/statistics/event/listener.rs index f85b2b7a0..cf6d35d6e 100644 --- a/packages/tracker-core/src/statistics/event/listener.rs +++ b/packages/tracker-core/src/statistics/event/listener.rs @@ -15,6 +15,7 @@ pub fn run_event_listener( receiver: Receiver, repository: &Arc, db_torrent_repository: &Arc, + persistent_torrent_completed_stat: bool, ) -> JoinHandle<()> { let stats_repository = repository.clone(); let db_torrent_repository: Arc = db_torrent_repository.clone(); @@ -22,7 +23,13 @@ pub fn run_event_listener( tracing::info!(target: TRACKER_CORE_LOG_TARGET, "Starting torrent repository event listener"); tokio::spawn(async move { - dispatch_events(receiver, stats_repository, db_torrent_repository).await; + dispatch_events( + receiver, + stats_repository, + db_torrent_repository, + persistent_torrent_completed_stat, + ) + .await; tracing::info!(target: TRACKER_CORE_LOG_TARGET, "Torrent repository listener finished"); }) @@ -32,6 +39,7 @@ async fn dispatch_events( mut receiver: Receiver, stats_repository: Arc, db_torrent_repository: Arc, + persistent_torrent_completed_stat: bool, ) { let shutdown_signal = tokio::signal::ctrl_c(); @@ -48,7 +56,12 @@ async fn dispatch_events( result = receiver.recv() => { match result { - Ok(event) => handle_event(event, &stats_repository, &db_torrent_repository, CurrentClock::now()).await, + Ok(event) => handle_event( + event, + &stats_repository, + &db_torrent_repository, + persistent_torrent_completed_stat, + CurrentClock::now()).await, Err(e) => { match e { RecvError::Closed => { diff --git a/packages/tracker-core/src/statistics/metrics.rs b/packages/tracker-core/src/statistics/metrics.rs index f8ab3f9d9..02cc51499 100644 --- a/packages/tracker-core/src/statistics/metrics.rs +++ b/packages/tracker-core/src/statistics/metrics.rs @@ -24,6 +24,19 @@ impl Metrics { self.metric_collection.increase_counter(metric_name, labels, now) } + /// # Errors + /// + /// Returns an error if the metric does not exist and it cannot be created. + pub fn set_counter( + &mut self, + metric_name: &MetricName, + labels: &LabelSet, + value: u64, + now: DurationSinceUnixEpoch, + ) -> Result<(), Error> { + self.metric_collection.set_counter(metric_name, labels, value, now) + } + /// # Errors /// /// Returns an error if the metric does not exist and it cannot be created. diff --git a/packages/tracker-core/src/statistics/mod.rs b/packages/tracker-core/src/statistics/mod.rs index 1cd9aac6b..89d6b79d5 100644 --- a/packages/tracker-core/src/statistics/mod.rs +++ b/packages/tracker-core/src/statistics/mod.rs @@ -1,5 +1,6 @@ pub mod event; pub mod metrics; +pub mod persisted_metrics; pub mod repository; use metrics::Metrics; diff --git a/packages/tracker-core/src/statistics/persisted_metrics.rs b/packages/tracker-core/src/statistics/persisted_metrics.rs new file mode 100644 index 000000000..4d53236a5 --- /dev/null +++ b/packages/tracker-core/src/statistics/persisted_metrics.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; + +use thiserror::Error; +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::{metric_collection, metric_name}; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use super::repository::Repository; +use super::TRACKER_CORE_PERSISTENT_TORRENTS_DOWNLOADS_TOTAL; +use crate::databases; +use crate::torrent::repository::persisted::DatabasePersistentTorrentRepository; + +/// Loads persisted metrics from the database and sets them in the stats repository. +/// +/// # Errors +/// +/// This function will return an error if the database query fails or if the +/// metric collection fails to set the initial metric values. +pub async fn load_persisted_metrics( + stats_repository: &Arc, + db_torrent_repository: &Arc, + now: DurationSinceUnixEpoch, +) -> Result<(), Error> { + if let Some(downloads) = db_torrent_repository.load_global_number_of_downloads()? { + stats_repository + .set_counter( + &metric_name!(TRACKER_CORE_PERSISTENT_TORRENTS_DOWNLOADS_TOTAL), + &LabelSet::default(), + u64::from(downloads), + now, + ) + .await?; + } + + Ok(()) +} + +#[derive(Error, Debug, Clone)] +pub enum Error { + #[error("Database error: {err}")] + DatabaseError { err: databases::error::Error }, + + #[error("Metrics error: {err}")] + MetricsError { err: metric_collection::Error }, +} + +impl From for Error { + fn from(err: databases::error::Error) -> Self { + Self::DatabaseError { err } + } +} + +impl From for Error { + fn from(err: metric_collection::Error) -> Self { + Self::MetricsError { err } + } +} diff --git a/packages/tracker-core/src/statistics/repository.rs b/packages/tracker-core/src/statistics/repository.rs index fe1292d00..dd0ebebe7 100644 --- a/packages/tracker-core/src/statistics/repository.rs +++ b/packages/tracker-core/src/statistics/repository.rs @@ -57,6 +57,31 @@ impl Repository { result } + /// # Errors + /// + /// This function will return an error if the metric collection fails to + /// increment the counter. + pub async fn set_counter( + &self, + metric_name: &MetricName, + labels: &LabelSet, + value: u64, + now: DurationSinceUnixEpoch, + ) -> Result<(), Error> { + let mut stats_lock = self.stats.write().await; + + let result = stats_lock.set_counter(metric_name, labels, value, now); + + drop(stats_lock); + + match result { + Ok(()) => {} + Err(ref err) => tracing::error!("Failed to set the counter: {}", err), + } + + result + } + /// # Errors /// /// This function will return an error if the metric collection fails to diff --git a/packages/tracker-core/src/torrent/repository/persisted.rs b/packages/tracker-core/src/torrent/repository/persisted.rs index dec571baf..1818065fd 100644 --- a/packages/tracker-core/src/torrent/repository/persisted.rs +++ b/packages/tracker-core/src/torrent/repository/persisted.rs @@ -47,6 +47,8 @@ impl DatabasePersistentTorrentRepository { } } + // Single Torrent Metrics + /// Increases the number of downloads for a given torrent. /// /// If the torrent is not found, it creates a new entry. @@ -107,6 +109,33 @@ impl DatabasePersistentTorrentRepository { pub(crate) fn save(&self, info_hash: &InfoHash, downloaded: u32) -> Result<(), Error> { self.database.save_persistent_torrent(info_hash, downloaded) } + + // Aggregate Metrics + + /// Increases the global number of downloads for all torrent. + /// + /// If the metric is not found, it creates it. + /// + /// # Errors + /// + /// Returns an [`Error`] if the database operation fails. + pub(crate) fn increase_global_number_of_downloads(&self) -> Result<(), Error> { + let torrent = self.database.load_global_number_of_downloads()?; + + match torrent { + Some(_number_of_downloads) => self.database.increase_global_number_of_downloads(), + None => self.database.save_global_number_of_downloads(1), + } + } + + /// Loads the global number of downloads for all torrents from the database. + /// + /// # Errors + /// + /// Returns an [`Error`] if the underlying database query fails. + pub(crate) fn load_global_number_of_downloads(&self) -> Result, Error> { + self.database.load_global_number_of_downloads() + } } #[cfg(test)] diff --git a/packages/tracker-core/tests/common/test_env.rs b/packages/tracker-core/tests/common/test_env.rs index 0be8bd4c6..11a4d400a 100644 --- a/packages/tracker-core/tests/common/test_env.rs +++ b/packages/tracker-core/tests/common/test_env.rs @@ -5,11 +5,15 @@ use aquatic_udp_protocol::AnnounceEvent; use bittorrent_primitives::info_hash::InfoHash; use bittorrent_tracker_core::announce_handler::PeersWanted; use bittorrent_tracker_core::container::TrackerCoreContainer; +use bittorrent_tracker_core::statistics::persisted_metrics::load_persisted_metrics; use tokio::task::yield_now; use torrust_tracker_configuration::Core; +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::metric::MetricName; use torrust_tracker_primitives::core::{AnnounceData, ScrapeData}; use torrust_tracker_primitives::peer::Peer; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::DurationSinceUnixEpoch; use torrust_tracker_torrent_repository::container::TorrentRepositoryContainer; pub struct TestEnv { @@ -45,6 +49,22 @@ impl TestEnv { } pub async fn start(&self) { + let now = DurationSinceUnixEpoch::from_secs(0); + self.load_persisted_metrics(now).await; + self.run_jobs().await; + } + + async fn load_persisted_metrics(&self, now: DurationSinceUnixEpoch) { + load_persisted_metrics( + &self.tracker_core_container.stats_repository, + &self.tracker_core_container.db_torrent_repository, + now, + ) + .await + .unwrap(); + } + + async fn run_jobs(&self) { let mut jobs = vec![]; let job = torrust_tracker_torrent_repository::statistics::event::listener::run_event_listener( @@ -58,6 +78,10 @@ impl TestEnv { self.torrent_repository_container.event_bus.receiver(), &self.tracker_core_container.stats_repository, &self.tracker_core_container.db_torrent_repository, + self.tracker_core_container + .core_config + .tracker_policy + .persistent_torrent_completed_stat, ); jobs.push(job); @@ -135,4 +159,15 @@ impl TestEnv { pub async fn remove_swarm(&self, info_hash: &InfoHash) { self.torrent_repository_container.swarms.remove(info_hash).await.unwrap(); } + + pub async fn get_counter_value(&self, metric_name: &str) -> u64 { + self.tracker_core_container + .stats_repository + .get_metrics() + .await + .metric_collection + .get_counter_value(&MetricName::new(metric_name), &LabelSet::default()) + .unwrap() + .value() + } } diff --git a/packages/tracker-core/tests/integration.rs b/packages/tracker-core/tests/integration.rs index d24acf67b..b170aaebd 100644 --- a/packages/tracker-core/tests/integration.rs +++ b/packages/tracker-core/tests/integration.rs @@ -58,7 +58,7 @@ async fn it_should_handle_the_scrape_request() { } #[tokio::test] -async fn it_should_persist_the_number_of_completed_peers_for_all_torrents_into_the_database() { +async fn it_should_persist_the_number_of_completed_peers_for_each_torrent_into_the_database() { let mut core_config = ephemeral_configuration(); core_config.tracker_policy.persistent_torrent_completed_stat = true; @@ -86,3 +86,28 @@ async fn it_should_persist_the_number_of_completed_peers_for_all_torrents_into_t assert!(test_env.get_swarm_metadata(&info_hash).await.unwrap().downloads() == 1); } + +#[tokio::test] +async fn it_should_persist_the_global_number_of_completed_peers_into_the_database() { + let mut core_config = ephemeral_configuration(); + + core_config.tracker_policy.persistent_torrent_completed_stat = true; + + let mut test_env = TestEnv::started(core_config.clone()).await; + + test_env + .increase_number_of_downloads(sample_peer(), &remote_client_ip(), &sample_info_hash()) + .await; + + // We run a new instance of the test environment to simulate a restart. + // The new instance uses the same underlying database. + + let new_test_env = TestEnv::started(core_config).await; + + assert_eq!( + new_test_env + .get_counter_value("tracker_core_persistent_torrents_downloads_total") + .await, + 1 + ); +} diff --git a/src/app.rs b/src/app.rs index 5037ad761..571e034f5 100644 --- a/src/app.rs +++ b/src/app.rs @@ -23,6 +23,7 @@ //! - Tracker REST API: the tracker API can be enabled/disabled. use std::sync::Arc; +use torrust_tracker_clock::clock::Time; use torrust_tracker_configuration::{Configuration, HttpTracker, UdpTracker}; use tracing::instrument; @@ -32,6 +33,7 @@ use crate::bootstrap::jobs::{ }; use crate::bootstrap::{self}; use crate::container::AppContainer; +use crate::CurrentClock; pub async fn run() -> (Arc, JobManager) { let (config, app_container) = bootstrap::app::setup(); @@ -63,6 +65,8 @@ pub async fn start(config: &Configuration, app_container: &Arc) -> async fn load_data_from_database(config: &Configuration, app_container: &Arc) { load_peer_keys(config, app_container).await; load_whitelisted_torrents(config, app_container).await; + load_torrent_metrics(config, app_container).await; + // todo: disabled because of performance issues. // The tracker demo has a lot of torrents and loading them all at once is not // efficient. We also load them on demand but the total number of downloads @@ -134,6 +138,19 @@ fn load_torrents_from_database(config: &Configuration, app_container: &Arc) { + if config.core.tracker_policy.persistent_torrent_completed_stat { + bittorrent_tracker_core::statistics::persisted_metrics::load_persisted_metrics( + &app_container.tracker_core_container.stats_repository, + &app_container.tracker_core_container.db_torrent_repository, + CurrentClock::now(), + ) + .await + .expect("Could not load persisted metrics from database."); + } +} + fn start_torrent_repository_event_listener( config: &Configuration, app_container: &Arc, diff --git a/src/bootstrap/jobs/tracker_core.rs b/src/bootstrap/jobs/tracker_core.rs index 37c53b9e4..161e69aad 100644 --- a/src/bootstrap/jobs/tracker_core.rs +++ b/src/bootstrap/jobs/tracker_core.rs @@ -11,6 +11,11 @@ pub fn start_event_listener(config: &Configuration, app_container: &Arc