diff --git a/Cargo.lock b/Cargo.lock index 35040f516..feb749d3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4915,7 +4915,6 @@ dependencies = [ "torrust-tracker-clock", "torrust-tracker-configuration", "torrust-tracker-events", - "torrust-tracker-located-error", "torrust-tracker-metrics", "torrust-tracker-primitives", "torrust-tracker-swarm-coordination-registry", diff --git a/packages/tracker-core/src/error.rs b/packages/tracker-core/src/error.rs index 4a35e9a0b..866aa64c5 100644 --- a/packages/tracker-core/src/error.rs +++ b/packages/tracker-core/src/error.rs @@ -84,7 +84,7 @@ pub enum ScrapeError { /// /// This error is returned when an operation involves a torrent that is not /// present in the whitelist. -#[derive(thiserror::Error, Debug, Clone)] +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] pub enum WhitelistError { /// Indicates that the torrent identified by `info_hash` is not whitelisted. #[error("The torrent: {info_hash}, is not whitelisted, {location}")] diff --git a/packages/udp-tracker-core/src/connection_cookie.rs b/packages/udp-tracker-core/src/connection_cookie.rs index 31c116400..ce255705f 100644 --- a/packages/udp-tracker-core/src/connection_cookie.rs +++ b/packages/udp-tracker-core/src/connection_cookie.rs @@ -86,7 +86,7 @@ use zerocopy::AsBytes; use crate::crypto::keys::CipherArrayBlowfish; /// Error returned when there was an error with the connection cookie. -#[derive(Error, Debug, Clone)] +#[derive(Error, Debug, Clone, PartialEq)] pub enum ConnectionCookieError { #[error("cookie value is not normal: {not_normal_value}")] ValueNotNormal { not_normal_value: f64 }, diff --git a/packages/udp-tracker-server/Cargo.toml b/packages/udp-tracker-server/Cargo.toml index 72fa520ba..c0bc94ce3 100644 --- a/packages/udp-tracker-server/Cargo.toml +++ b/packages/udp-tracker-server/Cargo.toml @@ -30,7 +30,6 @@ torrust-server-lib = { version = "3.0.0-develop", path = "../server-lib" } torrust-tracker-clock = { version = "3.0.0-develop", path = "../clock" } torrust-tracker-configuration = { version = "3.0.0-develop", path = "../configuration" } torrust-tracker-events = { version = "3.0.0-develop", path = "../events" } -torrust-tracker-located-error = { version = "3.0.0-develop", path = "../located-error" } torrust-tracker-metrics = { version = "3.0.0-develop", path = "../metrics" } torrust-tracker-primitives = { version = "3.0.0-develop", path = "../primitives" } torrust-tracker-swarm-coordination-registry = { version = "3.0.0-develop", path = "../swarm-coordination-registry" } diff --git a/packages/udp-tracker-server/src/environment.rs b/packages/udp-tracker-server/src/environment.rs index 3f479a02d..268259f1b 100644 --- a/packages/udp-tracker-server/src/environment.rs +++ b/packages/udp-tracker-server/src/environment.rs @@ -82,6 +82,7 @@ impl Environment { let udp_server_event_listener_job = Some(crate::statistics::event::listener::run_event_listener( self.container.udp_tracker_server_container.event_bus.receiver(), &self.container.udp_tracker_server_container.stats_repository, + &self.container.udp_tracker_core_container.ban_service, )); // Start the UDP tracker server diff --git a/packages/udp-tracker-server/src/error.rs b/packages/udp-tracker-server/src/error.rs index 93caf6853..d260ebfd4 100644 --- a/packages/udp-tracker-server/src/error.rs +++ b/packages/udp-tracker-server/src/error.rs @@ -1,59 +1,55 @@ //! Error types for the UDP server. +use std::fmt::Display; use std::panic::Location; -use aquatic_udp_protocol::{ConnectionId, RequestParseError}; +use aquatic_udp_protocol::{ConnectionId, RequestParseError, TransactionId}; use bittorrent_udp_tracker_core::services::announce::UdpAnnounceError; use bittorrent_udp_tracker_core::services::scrape::UdpScrapeError; use derive_more::derive::Display; use thiserror::Error; -use torrust_tracker_located_error::LocatedError; #[derive(Display, Debug)] #[display(":?")] pub struct ConnectionCookie(pub ConnectionId); /// Error returned by the UDP server. -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] pub enum Error { /// Error returned when the request is invalid. - #[error("error when phrasing request: {request_parse_error:?}")] - RequestParseError { request_parse_error: RequestParseError }, + #[error("error parsing request: {request_parse_error:?}")] + InvalidRequest { request_parse_error: SendableRequestParseError }, /// Error returned when the domain tracker returns an announce error. #[error("tracker announce error: {source}")] - UdpAnnounceError { source: UdpAnnounceError }, + AnnounceFailed { source: UdpAnnounceError }, /// Error returned when the domain tracker returns an scrape error. #[error("tracker scrape error: {source}")] - UdpScrapeError { source: UdpScrapeError }, + ScrapeFailed { source: UdpScrapeError }, /// Error returned from a third-party library (`aquatic_udp_protocol`). #[error("internal server error: {message}, {location}")] - InternalServer { + Internal { location: &'static Location<'static>, message: String, }, - /// Error returned when the request is invalid. - #[error("bad request: {source}")] - BadRequest { - source: LocatedError<'static, dyn std::error::Error + Send + Sync>, - }, - /// Error returned when tracker requires authentication. #[error("domain tracker requires authentication but is not supported in current UDP implementation. Location: {location}")] - TrackerAuthenticationRequired { location: &'static Location<'static> }, + AuthRequired { location: &'static Location<'static> }, } impl From for Error { fn from(request_parse_error: RequestParseError) -> Self { - Self::RequestParseError { request_parse_error } + Self::InvalidRequest { + request_parse_error: request_parse_error.into(), + } } } impl From for Error { fn from(udp_announce_error: UdpAnnounceError) -> Self { - Self::UdpAnnounceError { + Self::AnnounceFailed { source: udp_announce_error, } } @@ -61,8 +57,44 @@ impl From for Error { impl From for Error { fn from(udp_scrape_error: UdpScrapeError) -> Self { - Self::UdpScrapeError { + Self::ScrapeFailed { source: udp_scrape_error, } } } + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct SendableRequestParseError { + pub message: String, + pub opt_connection_id: Option, + pub opt_transaction_id: Option, +} + +impl Display for SendableRequestParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "SendableRequestParseError: message: {}, connection_id: {:?}, transaction_id: {:?}", + self.message, self.opt_connection_id, self.opt_transaction_id + ) + } +} + +impl From for SendableRequestParseError { + fn from(request_parse_error: RequestParseError) -> Self { + let (message, opt_connection_id, opt_transaction_id) = match request_parse_error { + RequestParseError::Sendable { + connection_id, + transaction_id, + err, + } => ((*err).to_string(), Some(connection_id), Some(transaction_id)), + RequestParseError::Unsendable { err } => (err.to_string(), None, None), + }; + + Self { + message, + opt_connection_id, + opt_transaction_id, + } + } +} diff --git a/packages/udp-tracker-server/src/event.rs b/packages/udp-tracker-server/src/event.rs index 8aabd7ffb..4fa29940e 100644 --- a/packages/udp-tracker-server/src/event.rs +++ b/packages/udp-tracker-server/src/event.rs @@ -2,12 +2,17 @@ use std::fmt; use std::net::SocketAddr; use std::time::Duration; +use bittorrent_tracker_core::error::{AnnounceError, ScrapeError}; +use bittorrent_udp_tracker_core::services::announce::UdpAnnounceError; +use bittorrent_udp_tracker_core::services::scrape::UdpScrapeError; use torrust_tracker_metrics::label::{LabelSet, LabelValue}; use torrust_tracker_metrics::label_name; use torrust_tracker_primitives::service_binding::ServiceBinding; +use crate::error::Error; + /// A UDP server event. -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum Event { UdpRequestReceived { context: ConnectionContext, @@ -30,6 +35,7 @@ pub enum Event { UdpError { context: ConnectionContext, kind: Option, + error: ErrorKind, }, } @@ -109,6 +115,42 @@ impl From for LabelSet { } } +#[derive(Debug, Clone, PartialEq)] +pub enum ErrorKind { + RequestParse(String), + ConnectionCookie(String), + Whitelist(String), + Database(String), + InternalServer(String), + BadRequest(String), + TrackerAuthentication(String), +} + +impl From for ErrorKind { + fn from(error: Error) -> Self { + match error { + Error::InvalidRequest { request_parse_error } => Self::RequestParse(request_parse_error.to_string()), + Error::AnnounceFailed { source } => match source { + UdpAnnounceError::ConnectionCookieError { source } => Self::ConnectionCookie(source.to_string()), + UdpAnnounceError::TrackerCoreAnnounceError { source } => match source { + AnnounceError::Whitelist(whitelist_error) => Self::Whitelist(whitelist_error.to_string()), + AnnounceError::Database(error) => Self::Database(error.to_string()), + }, + UdpAnnounceError::TrackerCoreWhitelistError { source } => Self::Whitelist(source.to_string()), + }, + Error::ScrapeFailed { source } => match source { + UdpScrapeError::ConnectionCookieError { source } => Self::ConnectionCookie(source.to_string()), + UdpScrapeError::TrackerCoreScrapeError { source } => match source { + ScrapeError::Whitelist(whitelist_error) => Self::Whitelist(whitelist_error.to_string()), + }, + UdpScrapeError::TrackerCoreWhitelistError { source } => Self::Whitelist(source.to_string()), + }, + Error::Internal { location: _, message } => Self::InternalServer(message.to_string()), + Error::AuthRequired { location } => Self::TrackerAuthentication(location.to_string()), + } + } +} + pub mod sender { use std::sync::Arc; diff --git a/packages/udp-tracker-server/src/handlers/error.rs b/packages/udp-tracker-server/src/handlers/error.rs index 6259e26ca..7fb4141b2 100644 --- a/packages/udp-tracker-server/src/handlers/error.rs +++ b/packages/udp-tracker-server/src/handlers/error.rs @@ -2,8 +2,7 @@ use std::net::SocketAddr; use std::ops::Range; -use aquatic_udp_protocol::{ErrorResponse, RequestParseError, Response, TransactionId}; -use bittorrent_udp_tracker_core::connection_cookie::{check, gen_remote_fingerprint}; +use aquatic_udp_protocol::{ErrorResponse, Response, TransactionId}; use bittorrent_udp_tracker_core::{self, UDP_TRACKER_LOG_TARGET}; use torrust_tracker_primitives::service_binding::ServiceBinding; use tracing::{instrument, Level}; @@ -22,55 +21,62 @@ pub async fn handle_error( request_id: Uuid, opt_udp_server_stats_event_sender: &crate::event::sender::Sender, cookie_valid_range: Range, - e: &Error, - transaction_id: Option, + error: &Error, + opt_transaction_id: Option, ) -> Response { tracing::trace!("handle error"); let server_socket_addr = server_service_binding.bind_address(); - match transaction_id { + log_error(error, client_socket_addr, server_socket_addr, opt_transaction_id, request_id); + + trigger_udp_error_event( + error, + client_socket_addr, + server_service_binding, + opt_udp_server_stats_event_sender, + req_kind, + ) + .await; + + Response::from(ErrorResponse { + transaction_id: opt_transaction_id.unwrap_or(TransactionId(I32::new(0))), + message: error.to_string().into(), + }) +} + +fn log_error( + error: &Error, + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, + opt_transaction_id: Option, + request_id: Uuid, +) { + match opt_transaction_id { Some(transaction_id) => { let transaction_id = transaction_id.0.to_string(); - tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %e, %client_socket_addr, %server_socket_addr, %request_id, %transaction_id, "response error"); + tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %error, %client_socket_addr, %server_socket_addr, %request_id, %transaction_id, "response error"); } None => { - tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %e, %client_socket_addr, %server_socket_addr, %request_id, "response error"); + tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %error, %client_socket_addr, %server_socket_addr, %request_id, "response error"); } } +} - let e = if let Error::RequestParseError { request_parse_error } = e { - match request_parse_error { - RequestParseError::Sendable { - connection_id, - transaction_id, - err, - } => { - if let Err(e) = check(connection_id, gen_remote_fingerprint(&client_socket_addr), cookie_valid_range) { - (e.to_string(), Some(*transaction_id)) - } else { - ((*err).to_string(), Some(*transaction_id)) - } - } - RequestParseError::Unsendable { err } => (err.to_string(), transaction_id), - } - } else { - (e.to_string(), transaction_id) - }; - - if e.1.is_some() { - if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { - udp_server_stats_event_sender - .send(Event::UdpError { - context: ConnectionContext::new(client_socket_addr, server_service_binding), - kind: req_kind, - }) - .await; - } +async fn trigger_udp_error_event( + error: &Error, + client_socket_addr: SocketAddr, + server_service_binding: ServiceBinding, + opt_udp_server_stats_event_sender: &crate::event::sender::Sender, + req_kind: Option, +) { + if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { + udp_server_stats_event_sender + .send(Event::UdpError { + context: ConnectionContext::new(client_socket_addr, server_service_binding), + kind: req_kind, + error: error.clone().into(), + }) + .await; } - - Response::from(ErrorResponse { - transaction_id: e.1.unwrap_or(TransactionId(I32::new(0))), - message: e.0.into(), - }) } diff --git a/packages/udp-tracker-server/src/handlers/mod.rs b/packages/udp-tracker-server/src/handlers/mod.rs index df550ab72..c1125b97f 100644 --- a/packages/udp-tracker-server/src/handlers/mod.rs +++ b/packages/udp-tracker-server/src/handlers/mod.rs @@ -13,7 +13,6 @@ use announce::handle_announce; use aquatic_udp_protocol::{Request, Response, TransactionId}; use bittorrent_tracker_core::MAX_SCRAPE_TORRENTS; use bittorrent_udp_tracker_core::container::UdpTrackerCoreContainer; -use bittorrent_udp_tracker_core::services::announce::UdpAnnounceError; use connect::handle_connect; use error::handle_error; use scrape::handle_scrape; @@ -84,15 +83,6 @@ pub(crate) async fn handle_packet( { Ok((response, req_kid)) => return (response, Some(req_kid)), Err((error, transaction_id, req_kind)) => { - if let Error::UdpAnnounceError { - source: UdpAnnounceError::ConnectionCookieError { .. }, - } = error - { - // code-review: should we include `RequestParseError` and `BadRequest`? - let mut ban_service = udp_tracker_core_container.ban_service.write().await; - ban_service.increase_counter(&udp_request.from.ip()); - } - let response = handle_error( Some(req_kind.clone()), udp_request.from, @@ -109,6 +99,14 @@ pub(crate) async fn handle_packet( } }, Err(e) => { + // The request payload could not be parsed, so we handle it as an error. + + let opt_transaction_id = if let Error::InvalidRequest { request_parse_error } = e.clone() { + request_parse_error.opt_transaction_id + } else { + None + }; + let response = handle_error( None, udp_request.from, @@ -117,7 +115,7 @@ pub(crate) async fn handle_packet( &udp_tracker_server_container.stats_event_sender, cookie_time_values.valid_range.clone(), &e, - None, + opt_transaction_id, ) .await; diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs deleted file mode 100644 index 1e1502339..000000000 --- a/packages/udp-tracker-server/src/statistics/event/handler.rs +++ /dev/null @@ -1,663 +0,0 @@ -use torrust_tracker_metrics::label::{LabelSet, LabelValue}; -use torrust_tracker_metrics::{label_name, metric_name}; -use torrust_tracker_primitives::DurationSinceUnixEpoch; - -use crate::event::{Event, UdpRequestKind, UdpResponseKind}; -use crate::statistics::repository::Repository; -use crate::statistics::{ - UDP_TRACKER_SERVER_ERRORS_TOTAL, UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS, - UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL, - UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL, - UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL, -}; - -/// # Panics -/// -/// This function panics if the client IP version does not match the expected -/// version. -#[allow(clippy::too_many_lines)] -pub async fn handle_event(event: Event, stats_repository: &Repository, now: DurationSinceUnixEpoch) { - match event { - Event::UdpRequestAborted { context } => { - // Global fixed metrics - stats_repository.increase_udp_requests_aborted().await; - - // Extendable metrics - match stats_repository - .increase_counter( - &metric_name!(UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL), - &LabelSet::from(context), - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; - } - Event::UdpRequestBanned { context } => { - // Global fixed metrics - stats_repository.increase_udp_requests_banned().await; - - // Extendable metrics - match stats_repository - .increase_counter( - &metric_name!(UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL), - &LabelSet::from(context), - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; - } - Event::UdpRequestReceived { context } => { - // Global fixed metrics - match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_requests().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_requests().await; - } - } - - // Extendable metrics - match stats_repository - .increase_counter( - &metric_name!(UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL), - &LabelSet::from(context), - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; - } - Event::UdpRequestAccepted { context, kind } => { - // Global fixed metrics - match kind { - UdpRequestKind::Connect => match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_connections().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_connections().await; - } - }, - UdpRequestKind::Announce => match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_announces().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_announces().await; - } - }, - UdpRequestKind::Scrape => match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_scrapes().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_scrapes().await; - } - }, - } - - // Extendable metrics - - let mut label_set = LabelSet::from(context); - - label_set.upsert(label_name!("request_kind"), LabelValue::new(&kind.to_string())); - - match stats_repository - .increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &label_set, now) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; - } - Event::UdpResponseSent { - context, - kind, - req_processing_time, - } => { - // Global fixed metrics - match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_responses().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_responses().await; - } - } - - let (result_label_value, kind_label_value) = match kind { - UdpResponseKind::Ok { req_kind } => match req_kind { - UdpRequestKind::Connect => { - let new_avg = stats_repository - .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) - .await; - - // Extendable metrics - - let mut label_set = LabelSet::from(context.clone()); - label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); - - match stats_repository - .set_gauge( - &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), - &label_set, - new_avg, - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to set gauge: {}", err), - } - - (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Connect.to_string())) - } - UdpRequestKind::Announce => { - let new_avg = stats_repository - .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) - .await; - - // Extendable metrics - - let mut label_set = LabelSet::from(context.clone()); - label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); - - match stats_repository - .set_gauge( - &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), - &label_set, - new_avg, - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to set gauge: {}", err), - } - - (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Announce.to_string())) - } - UdpRequestKind::Scrape => { - let new_avg = stats_repository - .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) - .await; - - // Extendable metrics - - let mut label_set = LabelSet::from(context.clone()); - label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); - - match stats_repository - .set_gauge( - &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), - &label_set, - new_avg, - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to set gauge: {}", err), - } - - (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Scrape.to_string())) - } - }, - UdpResponseKind::Error { opt_req_kind: _ } => (LabelValue::new("error"), LabelValue::ignore()), - }; - - // Extendable metrics - - let mut label_set = LabelSet::from(context); - - if result_label_value == LabelValue::new("ok") { - label_set.upsert(label_name!("request_kind"), kind_label_value); - } - label_set.upsert(label_name!("result"), result_label_value); - - match stats_repository - .increase_counter(&metric_name!(UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL), &label_set, now) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; - } - Event::UdpError { context, kind } => { - // Global fixed metrics - match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_errors().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_errors().await; - } - } - - // Extendable metrics - - let mut label_set = LabelSet::from(context); - - if let Some(kind) = kind { - label_set.upsert(label_name!("request_kind"), kind.to_string().into()); - } - - match stats_repository - .increase_counter(&metric_name!(UDP_TRACKER_SERVER_ERRORS_TOTAL), &label_set, now) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; - } - } - - tracing::debug!("stats: {:?}", stats_repository.get_stats().await); -} - -#[cfg(test)] -mod tests { - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; - - use torrust_tracker_clock::clock::Time; - use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; - - use crate::event::{ConnectionContext, Event, UdpRequestKind}; - use crate::statistics::event::handler::handle_event; - use crate::statistics::repository::Repository; - use crate::CurrentClock; - - #[tokio::test] - async fn should_increase_the_number_of_aborted_requests_when_it_receives_a_udp_request_aborted_event() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpRequestAborted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp_requests_aborted, 1); - } - - #[tokio::test] - async fn should_increase_the_number_of_banned_requests_when_it_receives_a_udp_request_banned_event() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpRequestBanned { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp_requests_banned, 1); - } - - #[tokio::test] - async fn should_increase_the_number_of_incoming_requests_when_it_receives_a_udp4_incoming_request_event() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpRequestReceived { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_requests, 1); - } - - #[tokio::test] - async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpRequestAborted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - let stats = stats_repository.get_stats().await; - assert_eq!(stats.udp_requests_aborted, 1); - } - #[tokio::test] - async fn should_increase_the_udp_ban_counter_when_it_receives_a_udp_banned_event() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpRequestBanned { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - let stats = stats_repository.get_stats().await; - assert_eq!(stats.udp_requests_banned, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_connect_requests_counter_when_it_receives_a_udp4_request_event_of_connect_kind() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpRequestAccepted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpRequestKind::Connect, - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_connections_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_announce_requests_counter_when_it_receives_a_udp4_request_event_of_announce_kind() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpRequestAccepted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpRequestKind::Announce, - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_announces_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_scrape_requests_counter_when_it_receives_a_udp4_request_event_of_scrape_kind() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpRequestAccepted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpRequestKind::Scrape, - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_scrapes_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_responses_counter_when_it_receives_a_udp4_response_event() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpResponseSent { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpResponseKind::Ok { - req_kind: UdpRequestKind::Announce, - }, - req_processing_time: std::time::Duration::from_secs(1), - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_responses, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpError { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: None, - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_errors_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp6_connect_requests_counter_when_it_receives_a_udp6_request_event_of_connect_kind() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpRequestAccepted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpRequestKind::Connect, - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_connections_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp6_announce_requests_counter_when_it_receives_a_udp6_request_event_of_announce_kind() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpRequestAccepted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpRequestKind::Announce, - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_announces_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp6_scrape_requests_counter_when_it_receives_a_udp6_request_event_of_scrape_kind() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpRequestAccepted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpRequestKind::Scrape, - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_scrapes_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp6_response_counter_when_it_receives_a_udp6_response_event() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpResponseSent { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpResponseKind::Ok { - req_kind: UdpRequestKind::Announce, - }, - req_processing_time: std::time::Duration::from_secs(1), - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_responses, 1); - } - #[tokio::test] - async fn should_increase_the_udp6_errors_counter_when_it_receives_a_udp6_error_event() { - let stats_repository = Repository::new(); - - handle_event( - Event::UdpError { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: None, - }, - &stats_repository, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_errors_handled, 1); - } -} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/error.rs b/packages/udp-tracker-server/src/statistics/event/handler/error.rs new file mode 100644 index 000000000..e1023a56b --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/error.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; + +use bittorrent_udp_tracker_core::services::banning::BanService; +use tokio::sync::RwLock; +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::{label_name, metric_name}; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::{ConnectionContext, ErrorKind, UdpRequestKind}; +use crate::statistics::repository::Repository; +use crate::statistics::UDP_TRACKER_SERVER_ERRORS_TOTAL; + +pub async fn handle_event( + context: ConnectionContext, + kind: Option, + error: ErrorKind, + stats_repository: &Repository, + ban_service: &Arc>, + now: DurationSinceUnixEpoch, +) { + // Increase the number of errors + // code-review: should we ban IP due to other errors too? + if let ErrorKind::ConnectionCookie(_msg) = error { + let mut ban_service = ban_service.write().await; + ban_service.increase_counter(&context.client_socket_addr().ip()); + } + + // Global fixed metrics + match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_errors().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_errors().await; + } + } + + // Extendable metrics + let mut label_set = LabelSet::from(context); + if let Some(kind) = kind { + label_set.upsert(label_name!("request_kind"), kind.to_string().into()); + } + match stats_repository + .increase_counter(&metric_name!(UDP_TRACKER_SERVER_ERRORS_TOTAL), &label_set, now) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::Arc; + + use bittorrent_udp_tracker_core::services::banning::BanService; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; + + use crate::event::{ConnectionContext, Event}; + use crate::statistics::event::handler::error::ErrorKind; + use crate::statistics::event::handler::handle_event; + use crate::statistics::repository::Repository; + use crate::CurrentClock; + + #[tokio::test] + async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpError { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: None, + error: ErrorKind::RequestParse("Invalid request format".to_string()), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_errors_handled, 1); + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/mod.rs b/packages/udp-tracker-server/src/statistics/event/handler/mod.rs new file mode 100644 index 000000000..c8ac864a3 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/mod.rs @@ -0,0 +1,49 @@ +mod error; +mod request_aborted; +mod request_accepted; +mod request_banned; +mod request_received; +mod response_sent; + +use std::sync::Arc; + +use bittorrent_udp_tracker_core::services::banning::BanService; +use tokio::sync::RwLock; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::Event; +use crate::statistics::repository::Repository; + +pub async fn handle_event( + event: Event, + stats_repository: &Repository, + ban_service: &Arc>, + now: DurationSinceUnixEpoch, +) { + match event { + Event::UdpRequestAborted { context } => { + request_aborted::handle_event(context, stats_repository, now).await; + } + Event::UdpRequestBanned { context } => { + request_banned::handle_event(context, stats_repository, now).await; + } + Event::UdpRequestReceived { context } => { + request_received::handle_event(context, stats_repository, now).await; + } + Event::UdpRequestAccepted { context, kind } => { + request_accepted::handle_event(context, kind, stats_repository, now).await; + } + Event::UdpResponseSent { + context, + kind, + req_processing_time, + } => { + response_sent::handle_event(context, kind, req_processing_time, stats_repository, now).await; + } + Event::UdpError { context, kind, error } => { + error::handle_event(context, kind, error, stats_repository, ban_service, now).await; + } + } + + tracing::debug!("stats: {:?}", stats_repository.get_stats().await); +} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs b/packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs new file mode 100644 index 000000000..270ec2a45 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs @@ -0,0 +1,92 @@ +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::metric_name; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::ConnectionContext; +use crate::statistics::repository::Repository; +use crate::statistics::UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL; + +pub async fn handle_event(context: ConnectionContext, stats_repository: &Repository, now: DurationSinceUnixEpoch) { + // Global fixed metrics + stats_repository.increase_udp_requests_aborted().await; + + // Extendable metrics + match stats_repository + .increase_counter( + &metric_name!(UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL), + &LabelSet::from(context), + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::Arc; + + use bittorrent_udp_tracker_core::services::banning::BanService; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; + + use crate::event::{ConnectionContext, Event}; + use crate::statistics::event::handler::handle_event; + use crate::statistics::repository::Repository; + use crate::CurrentClock; + + #[tokio::test] + async fn should_increase_the_number_of_aborted_requests_when_it_receives_a_udp_request_aborted_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAborted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp_requests_aborted, 1); + } + + #[tokio::test] + async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAborted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + let stats = stats_repository.get_stats().await; + assert_eq!(stats.udp_requests_aborted, 1); + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/request_accepted.rs b/packages/udp-tracker-server/src/statistics/event/handler/request_accepted.rs new file mode 100644 index 000000000..25c1311e5 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/request_accepted.rs @@ -0,0 +1,236 @@ +use torrust_tracker_metrics::label::{LabelSet, LabelValue}; +use torrust_tracker_metrics::{label_name, metric_name}; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::{ConnectionContext, UdpRequestKind}; +use crate::statistics::repository::Repository; +use crate::statistics::UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL; + +pub async fn handle_event( + context: ConnectionContext, + kind: UdpRequestKind, + stats_repository: &Repository, + now: DurationSinceUnixEpoch, +) { + // Global fixed metrics + match kind { + UdpRequestKind::Connect => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_connections().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_connections().await; + } + }, + UdpRequestKind::Announce => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_announces().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_announces().await; + } + }, + UdpRequestKind::Scrape => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_scrapes().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_scrapes().await; + } + }, + } + + // Extendable metrics + let mut label_set = LabelSet::from(context); + label_set.upsert(label_name!("request_kind"), LabelValue::new(&kind.to_string())); + match stats_repository + .increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &label_set, now) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use std::sync::Arc; + + use bittorrent_udp_tracker_core::services::banning::BanService; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; + + use crate::event::{ConnectionContext, Event}; + use crate::statistics::event::handler::handle_event; + use crate::statistics::repository::Repository; + use crate::CurrentClock; + + #[tokio::test] + async fn should_increase_the_udp4_connect_requests_counter_when_it_receives_a_udp4_request_event_of_connect_kind() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAccepted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpRequestKind::Connect, + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_announce_requests_counter_when_it_receives_a_udp4_request_event_of_announce_kind() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAccepted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpRequestKind::Announce, + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_announces_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_scrape_requests_counter_when_it_receives_a_udp4_request_event_of_scrape_kind() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAccepted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpRequestKind::Scrape, + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_scrapes_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_connect_requests_counter_when_it_receives_a_udp6_request_event_of_connect_kind() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAccepted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpRequestKind::Connect, + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_announce_requests_counter_when_it_receives_a_udp6_request_event_of_announce_kind() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAccepted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpRequestKind::Announce, + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_announces_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_scrape_requests_counter_when_it_receives_a_udp6_request_event_of_scrape_kind() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAccepted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpRequestKind::Scrape, + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_scrapes_handled, 1); + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/request_banned.rs b/packages/udp-tracker-server/src/statistics/event/handler/request_banned.rs new file mode 100644 index 000000000..74641574a --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/request_banned.rs @@ -0,0 +1,92 @@ +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::metric_name; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::ConnectionContext; +use crate::statistics::repository::Repository; +use crate::statistics::UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL; + +pub async fn handle_event(context: ConnectionContext, stats_repository: &Repository, now: DurationSinceUnixEpoch) { + // Global fixed metrics + stats_repository.increase_udp_requests_banned().await; + + // Extendable metrics + match stats_repository + .increase_counter( + &metric_name!(UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL), + &LabelSet::from(context), + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::Arc; + + use bittorrent_udp_tracker_core::services::banning::BanService; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; + + use crate::event::{ConnectionContext, Event}; + use crate::statistics::event::handler::handle_event; + use crate::statistics::repository::Repository; + use crate::CurrentClock; + + #[tokio::test] + async fn should_increase_the_number_of_banned_requests_when_it_receives_a_udp_request_banned_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestBanned { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp_requests_banned, 1); + } + + #[tokio::test] + async fn should_increase_the_udp_ban_counter_when_it_receives_a_udp_banned_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestBanned { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + let stats = stats_repository.get_stats().await; + assert_eq!(stats.udp_requests_banned, 1); + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/request_received.rs b/packages/udp-tracker-server/src/statistics/event/handler/request_received.rs new file mode 100644 index 000000000..8333258c2 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/request_received.rs @@ -0,0 +1,74 @@ +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::metric_name; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::ConnectionContext; +use crate::statistics::repository::Repository; +use crate::statistics::UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL; + +pub async fn handle_event(context: ConnectionContext, stats_repository: &Repository, now: DurationSinceUnixEpoch) { + // Global fixed metrics + match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_requests().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_requests().await; + } + } + + // Extendable metrics + match stats_repository + .increase_counter( + &metric_name!(UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL), + &LabelSet::from(context), + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::Arc; + + use bittorrent_udp_tracker_core::services::banning::BanService; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; + + use crate::event::{ConnectionContext, Event}; + use crate::statistics::event::handler::handle_event; + use crate::statistics::repository::Repository; + use crate::CurrentClock; + + #[tokio::test] + async fn should_increase_the_number_of_incoming_requests_when_it_receives_a_udp4_incoming_request_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestReceived { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_requests, 1); + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs b/packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs new file mode 100644 index 000000000..a69184e08 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs @@ -0,0 +1,182 @@ +use torrust_tracker_metrics::label::{LabelSet, LabelValue}; +use torrust_tracker_metrics::{label_name, metric_name}; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::{ConnectionContext, UdpRequestKind, UdpResponseKind}; +use crate::statistics::repository::Repository; +use crate::statistics::{UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS, UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL}; + +pub async fn handle_event( + context: ConnectionContext, + kind: UdpResponseKind, + req_processing_time: std::time::Duration, + stats_repository: &Repository, + now: DurationSinceUnixEpoch, +) { + // Global fixed metrics + match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_responses().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_responses().await; + } + } + + let (result_label_value, kind_label_value) = match kind { + UdpResponseKind::Ok { req_kind } => match req_kind { + UdpRequestKind::Connect => { + let new_avg = stats_repository + .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) + .await; + let mut label_set = LabelSet::from(context.clone()); + label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); + match stats_repository + .set_gauge( + &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), + &label_set, + new_avg, + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to set gauge: {}", err), + } + (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Connect.to_string())) + } + UdpRequestKind::Announce => { + let new_avg = stats_repository + .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) + .await; + let mut label_set = LabelSet::from(context.clone()); + label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); + match stats_repository + .set_gauge( + &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), + &label_set, + new_avg, + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to set gauge: {}", err), + } + (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Announce.to_string())) + } + UdpRequestKind::Scrape => { + let new_avg = stats_repository + .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) + .await; + let mut label_set = LabelSet::from(context.clone()); + label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); + match stats_repository + .set_gauge( + &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), + &label_set, + new_avg, + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to set gauge: {}", err), + } + (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Scrape.to_string())) + } + }, + UdpResponseKind::Error { opt_req_kind: _ } => (LabelValue::new("error"), LabelValue::ignore()), + }; + + // Extendable metrics + let mut label_set = LabelSet::from(context); + if result_label_value == LabelValue::new("ok") { + label_set.upsert(label_name!("request_kind"), kind_label_value); + } + label_set.upsert(label_name!("result"), result_label_value); + match stats_repository + .increase_counter(&metric_name!(UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL), &label_set, now) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use std::sync::Arc; + + use bittorrent_udp_tracker_core::services::banning::BanService; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; + + use crate::event::{ConnectionContext, Event, UdpRequestKind}; + use crate::statistics::event::handler::handle_event; + use crate::statistics::repository::Repository; + use crate::CurrentClock; + + #[tokio::test] + async fn should_increase_the_udp4_responses_counter_when_it_receives_a_udp4_response_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpResponseSent { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpResponseKind::Ok { + req_kind: UdpRequestKind::Announce, + }, + req_processing_time: std::time::Duration::from_secs(1), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_responses, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_response_counter_when_it_receives_a_udp6_response_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpResponseSent { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpResponseKind::Ok { + req_kind: UdpRequestKind::Announce, + }, + req_processing_time: std::time::Duration::from_secs(1), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_responses, 1); + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/listener.rs b/packages/udp-tracker-server/src/statistics/event/listener.rs index d805cc87f..e6c9a85ce 100644 --- a/packages/udp-tracker-server/src/statistics/event/listener.rs +++ b/packages/udp-tracker-server/src/statistics/event/listener.rs @@ -1,6 +1,8 @@ use std::sync::Arc; +use bittorrent_udp_tracker_core::services::banning::BanService; use bittorrent_udp_tracker_core::UDP_TRACKER_LOG_TARGET; +use tokio::sync::RwLock; use tokio::task::JoinHandle; use torrust_tracker_clock::clock::Time; use torrust_tracker_events::receiver::RecvError; @@ -11,19 +13,24 @@ use crate::statistics::repository::Repository; use crate::CurrentClock; #[must_use] -pub fn run_event_listener(receiver: Receiver, repository: &Arc) -> JoinHandle<()> { - let stats_repository = repository.clone(); +pub fn run_event_listener( + receiver: Receiver, + repository: &Arc, + ban_service: &Arc>, +) -> JoinHandle<()> { + let repository_clone = repository.clone(); + let ban_service_clone = ban_service.clone(); tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting UDP tracker server event listener"); tokio::spawn(async move { - dispatch_events(receiver, stats_repository).await; + dispatch_events(receiver, repository_clone, ban_service_clone).await; tracing::info!(target: UDP_TRACKER_LOG_TARGET, "UDP tracker server event listener finished"); }) } -async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc) { +async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc, ban_service: Arc>) { let shutdown_signal = tokio::signal::ctrl_c(); tokio::pin!(shutdown_signal); @@ -38,7 +45,7 @@ async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc { match result { - Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await, + Ok(event) => handle_event(event, &stats_repository, &ban_service, CurrentClock::now()).await, Err(e) => { match e { RecvError::Closed => { diff --git a/packages/udp-tracker-server/tests/server/contract.rs b/packages/udp-tracker-server/tests/server/contract.rs index 860fd1f0b..04ad0f39d 100644 --- a/packages/udp-tracker-server/tests/server/contract.rs +++ b/packages/udp-tracker-server/tests/server/contract.rs @@ -59,7 +59,9 @@ async fn should_return_a_bad_request_response_when_the_client_sends_an_empty_req let response = Response::parse_bytes(&response, true).unwrap(); - assert_eq!(get_error_response_message(&response).unwrap(), "Protocol identifier missing"); + assert!(get_error_response_message(&response) + .unwrap() + .contains("Protocol identifier missing")); env.stop().await; } diff --git a/src/bootstrap/jobs/udp_tracker_server.rs b/src/bootstrap/jobs/udp_tracker_server.rs index 42ac2d03e..8a4c2a273 100644 --- a/src/bootstrap/jobs/udp_tracker_server.rs +++ b/src/bootstrap/jobs/udp_tracker_server.rs @@ -10,6 +10,7 @@ pub fn start_event_listener(config: &Configuration, app_container: &Arc