Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/metrics/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ impl Counter {
pub fn increment(&mut self, value: u64) {
self.0 += value;
}

pub fn absolute(&mut self, value: u64) {
self.0 = value;
}
}

impl From<u64> for Counter {
Expand Down Expand Up @@ -73,6 +77,13 @@ mod tests {
assert_eq!(counter.value(), 3);
}

#[test]
fn it_could_set_to_an_absolute_value() {
let mut counter = Counter::new(0);
counter.absolute(1);
assert_eq!(counter.value(), 1);
}

#[test]
fn it_serializes_to_prometheus() {
let counter = Counter::new(42);
Expand Down
4 changes: 4 additions & 0 deletions packages/metrics/src/metric/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ impl Metric<Counter> {
pub fn increment(&mut self, label_set: &LabelSet, time: DurationSinceUnixEpoch) {
self.sample_collection.increment(label_set, time);
}

pub fn absolute(&mut self, label_set: &LabelSet, value: u64, time: DurationSinceUnixEpoch) {
self.sample_collection.absolute(label_set, value, time);
}
}

impl Metric<Gauge> {
Expand Down
43 changes: 42 additions & 1 deletion packages/metrics/src/metric_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ impl MetricCollection {
self.counters.get_value(name, label_set)
}

/// Increases the counter for the given metric name and labels.
///
/// # Errors
///
/// Return an error if a metrics of a different type with the same name
Expand All @@ -93,6 +95,30 @@ impl MetricCollection {
Ok(())
}

/// Sets the counter for the given metric name and labels.
///
/// # Errors
///
/// Return an error if a metrics of a different type with the same name
/// already exists.
pub fn set_counter(
&mut self,
name: &MetricName,
label_set: &LabelSet,
value: u64,
time: DurationSinceUnixEpoch,
) -> Result<(), Error> {
if self.gauges.metrics.contains_key(name) {
return Err(Error::MetricNameCollisionAdding {
metric_name: name.clone(),
});
}

self.counters.absolute(name, label_set, value, time);

Ok(())
}

pub fn ensure_counter_exists(&mut self, name: &MetricName) {
self.counters.ensure_metric_exists(name);
}
Expand Down Expand Up @@ -361,7 +387,7 @@ impl MetricKindCollection<Counter> {
///
/// # Panics
///
/// Panics if the metric does not exist and it could not be created.
/// Panics if the metric does not exist.
pub fn increment(&mut self, name: &MetricName, label_set: &LabelSet, time: DurationSinceUnixEpoch) {
self.ensure_metric_exists(name);

Expand All @@ -370,6 +396,21 @@ impl MetricKindCollection<Counter> {
metric.increment(label_set, time);
}

/// Sets the counter to an absolute value for the given metric name and labels.
///
/// If the metric name does not exist, it will be created.
///
/// # Panics
///
/// Panics if the metric does not exist.
pub fn absolute(&mut self, name: &MetricName, label_set: &LabelSet, value: u64, time: DurationSinceUnixEpoch) {
self.ensure_metric_exists(name);

let metric = self.metrics.get_mut(name).expect("Counter metric should exist");

metric.absolute(label_set, value, time);
}

#[must_use]
pub fn get_value(&self, name: &MetricName, label_set: &LabelSet) -> Option<Counter> {
self.metrics
Expand Down
5 changes: 5 additions & 0 deletions packages/metrics/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ impl Measurement<Counter> {
self.value.increment(1);
self.set_recorded_at(time);
}

pub fn absolute(&mut self, value: u64, time: DurationSinceUnixEpoch) {
self.value.absolute(value);
self.set_recorded_at(time);
}
}

impl Measurement<Gauge> {
Expand Down
9 changes: 9 additions & 0 deletions packages/metrics/src/sample_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ impl SampleCollection<Counter> {

sample.increment(time);
}

pub fn absolute(&mut self, label_set: &LabelSet, value: u64, time: DurationSinceUnixEpoch) {
let sample = self
.samples
.entry(label_set.clone())
.or_insert_with(|| Measurement::new(Counter::default(), time));

sample.absolute(value, time);
}
}

impl SampleCollection<Gauge> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ CREATE TABLE
info_hash VARCHAR(40) NOT NULL UNIQUE
);

# todo: rename to `torrent_metrics`
CREATE TABLE
IF NOT EXISTS torrents (
id integer PRIMARY KEY AUTO_INCREMENT,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE
IF NOT EXISTS torrent_aggregate_metrics (
id integer PRIMARY KEY AUTO_INCREMENT,
metric_name VARCHAR(50) NOT NULL UNIQUE,
value INTEGER DEFAULT 0 NOT NULL
);
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ CREATE TABLE
info_hash TEXT NOT NULL UNIQUE
);

# todo: rename to `torrent_metrics`
CREATE TABLE
IF NOT EXISTS torrents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE
IF NOT EXISTS torrent_aggregate_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
metric_name TEXT NOT NULL UNIQUE,
value INTEGER DEFAULT 0 NOT NULL
);
44 changes: 44 additions & 0 deletions packages/tracker-core/src/databases/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use sqlite::Sqlite;
use super::error::Error;
use super::Database;

/// Metric name in DB for the total number of downloads across all torrents.
const TORRENTS_DOWNLOADS_TOTAL: &str = "torrents_downloads_total";

/// The database management system used by the tracker.
///
/// Refer to:
Expand Down Expand Up @@ -97,9 +100,14 @@ pub(crate) mod tests {

// Persistent torrents (stats)

// Torrent metrics
handling_torrent_persistence::it_should_save_and_load_persistent_torrents(driver);
handling_torrent_persistence::it_should_load_all_persistent_torrents(driver);
handling_torrent_persistence::it_should_increase_the_number_of_downloads_for_a_given_torrent(driver);
// Aggregate metrics for all torrents
handling_torrent_persistence::it_should_save_and_load_the_global_number_of_downloads(driver);
handling_torrent_persistence::it_should_load_the_global_number_of_downloads(driver);
handling_torrent_persistence::it_should_increase_the_global_number_of_downloads(driver);

// Authentication keys (for private trackers)

Expand Down Expand Up @@ -154,6 +162,8 @@ pub(crate) mod tests {
use crate::databases::Database;
use crate::test_helpers::tests::sample_info_hash;

// Metrics per torrent

pub fn it_should_save_and_load_persistent_torrents(driver: &Arc<Box<dyn Database>>) {
let infohash = sample_info_hash();

Expand Down Expand Up @@ -192,6 +202,40 @@ pub(crate) mod tests {

assert_eq!(number_of_downloads, 2);
}

// Aggregate metrics for all torrents

pub fn it_should_save_and_load_the_global_number_of_downloads(driver: &Arc<Box<dyn Database>>) {
let number_of_downloads = 1;

driver.save_global_number_of_downloads(number_of_downloads).unwrap();

let number_of_downloads = driver.load_global_number_of_downloads().unwrap().unwrap();

assert_eq!(number_of_downloads, 1);
}

pub fn it_should_load_the_global_number_of_downloads(driver: &Arc<Box<dyn Database>>) {
let number_of_downloads = 1;

driver.save_global_number_of_downloads(number_of_downloads).unwrap();

let number_of_downloads = driver.load_global_number_of_downloads().unwrap().unwrap();

assert_eq!(number_of_downloads, 1);
}

pub fn it_should_increase_the_global_number_of_downloads(driver: &Arc<Box<dyn Database>>) {
let number_of_downloads = 1;

driver.save_global_number_of_downloads(number_of_downloads).unwrap();

driver.increase_global_number_of_downloads().unwrap();

let number_of_downloads = driver.load_global_number_of_downloads().unwrap().unwrap();

assert_eq!(number_of_downloads, 2);
}
}

mod handling_authentication_keys {
Expand Down
57 changes: 56 additions & 1 deletion packages/tracker-core/src/databases/driver/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use r2d2_mysql::mysql::{params, Opts, OptsBuilder};
use r2d2_mysql::MySqlConnectionManager;
use torrust_tracker_primitives::{PersistentTorrent, PersistentTorrents};

use super::{Database, Driver, Error};
use super::{Database, Driver, Error, TORRENTS_DOWNLOADS_TOTAL};
use crate::authentication::key::AUTH_KEY_LENGTH;
use crate::authentication::{self, Key};

Expand Down Expand Up @@ -46,6 +46,27 @@ impl Mysql {

Ok(Self { pool })
}

fn load_torrent_aggregate_metric(&self, metric_name: &str) -> Result<Option<PersistentTorrent>, Error> {
let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?;

let query = conn.exec_first::<u32, _, _>(
"SELECT value FROM torrent_aggregate_metrics WHERE metric_name = :metric_name",
params! { "metric_name" => metric_name },
);

let persistent_torrent = query?;

Ok(persistent_torrent)
}

fn save_torrent_aggregate_metric(&self, metric_name: &str, completed: PersistentTorrent) -> Result<(), Error> {
const COMMAND : &str = "INSERT INTO torrent_aggregate_metrics (metric_name, value) VALUES (:metric_name, :completed) ON DUPLICATE KEY UPDATE value = VALUES(value)";

let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?;

Ok(conn.exec_drop(COMMAND, params! { metric_name, completed })?)
}
}

impl Database for Mysql {
Expand All @@ -66,6 +87,14 @@ impl Database for Mysql {
);"
.to_string();

let create_torrent_aggregate_metrics_table = "
CREATE TABLE IF NOT EXISTS torrent_aggregate_metrics (
id integer PRIMARY KEY AUTO_INCREMENT,
metric_name VARCHAR(50) NOT NULL UNIQUE,
value INTEGER DEFAULT 0 NOT NULL
);"
.to_string();

let create_keys_table = format!(
"
CREATE TABLE IF NOT EXISTS `keys` (
Expand All @@ -82,6 +111,8 @@ impl Database for Mysql {

conn.query_drop(&create_torrents_table)
.expect("Could not create torrents table.");
conn.query_drop(&create_torrent_aggregate_metrics_table)
.expect("Could not create create_torrent_aggregate_metrics_table table.");
conn.query_drop(&create_keys_table).expect("Could not create keys table.");
conn.query_drop(&create_whitelist_table)
.expect("Could not create whitelist table.");
Expand Down Expand Up @@ -168,6 +199,30 @@ impl Database for Mysql {
Ok(())
}

/// Refer to [`databases::Database::load_global_number_of_downloads`](crate::core::databases::Database::load_global_number_of_downloads).
fn load_global_number_of_downloads(&self) -> Result<Option<PersistentTorrent>, Error> {
self.load_torrent_aggregate_metric(TORRENTS_DOWNLOADS_TOTAL)
}

/// Refer to [`databases::Database::save_global_number_of_downloads`](crate::core::databases::Database::save_global_number_of_downloads).
fn save_global_number_of_downloads(&self, downloaded: PersistentTorrent) -> Result<(), Error> {
self.save_torrent_aggregate_metric(TORRENTS_DOWNLOADS_TOTAL, downloaded)
}

/// Refer to [`databases::Database::increase_global_number_of_downloads`](crate::core::databases::Database::increase_global_number_of_downloads).
fn increase_global_number_of_downloads(&self) -> Result<(), Error> {
let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?;

let metric_name = TORRENTS_DOWNLOADS_TOTAL;

conn.exec_drop(
"UPDATE torrent_aggregate_metrics SET value = value + 1 WHERE metric_name = :metric_name",
params! { metric_name },
)?;

Ok(())
}

/// Refer to [`databases::Database::load_keys`](crate::core::databases::Database::load_keys).
fn load_keys(&self) -> Result<Vec<authentication::PeerKey>, Error> {
let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?;
Expand Down
Loading