From 52b9660eeb6172fc4a03285751d9fe201eaca7a4 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 2 Jun 2025 13:49:09 +0100 Subject: [PATCH 01/11] feat: [#1456] wrapper over aquatic RequestParseError to make it sendable The error will be included in the UdpError event ans sent via tokio channel. --- packages/udp-tracker-server/src/error.rs | 34 +++++++++++++++++-- .../udp-tracker-server/src/handlers/error.rs | 20 +++-------- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/packages/udp-tracker-server/src/error.rs b/packages/udp-tracker-server/src/error.rs index 93caf6853..6a63a4c9a 100644 --- a/packages/udp-tracker-server/src/error.rs +++ b/packages/udp-tracker-server/src/error.rs @@ -1,7 +1,7 @@ //! Error types for the UDP server. use std::panic::Location; -use aquatic_udp_protocol::{ConnectionId, RequestParseError}; +use aquatic_udp_protocol::{ConnectionId, RequestParseError, TransactionId}; use bittorrent_udp_tracker_core::services::announce::UdpAnnounceError; use bittorrent_udp_tracker_core::services::scrape::UdpScrapeError; use derive_more::derive::Display; @@ -17,7 +17,7 @@ pub struct ConnectionCookie(pub ConnectionId); pub enum Error { /// Error returned when the request is invalid. #[error("error when phrasing request: {request_parse_error:?}")] - RequestParseError { request_parse_error: RequestParseError }, + RequestParseError { request_parse_error: SendableRequestParseError }, /// Error returned when the domain tracker returns an announce error. #[error("tracker announce error: {source}")] @@ -47,7 +47,9 @@ pub enum Error { impl From for Error { fn from(request_parse_error: RequestParseError) -> Self { - Self::RequestParseError { request_parse_error } + Self::RequestParseError { + request_parse_error: request_parse_error.into(), + } } } @@ -66,3 +68,29 @@ impl From for Error { } } } + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct SendableRequestParseError { + pub message: String, + pub opt_connection_id: Option, + pub opt_transaction_id: Option, +} + +impl From for SendableRequestParseError { + fn from(request_parse_error: RequestParseError) -> Self { + let (message, opt_connection_id, opt_transaction_id) = match request_parse_error { + RequestParseError::Sendable { + connection_id, + transaction_id, + err, + } => ((*err).to_string(), Some(connection_id), Some(transaction_id)), + RequestParseError::Unsendable { err } => (err.to_string(), None, None), + }; + + Self { + message, + opt_connection_id, + opt_transaction_id, + } + } +} diff --git a/packages/udp-tracker-server/src/handlers/error.rs b/packages/udp-tracker-server/src/handlers/error.rs index 6259e26ca..7b477d84f 100644 --- a/packages/udp-tracker-server/src/handlers/error.rs +++ b/packages/udp-tracker-server/src/handlers/error.rs @@ -2,8 +2,7 @@ use std::net::SocketAddr; use std::ops::Range; -use aquatic_udp_protocol::{ErrorResponse, RequestParseError, Response, TransactionId}; -use bittorrent_udp_tracker_core::connection_cookie::{check, gen_remote_fingerprint}; +use aquatic_udp_protocol::{ErrorResponse, Response, TransactionId}; use bittorrent_udp_tracker_core::{self, UDP_TRACKER_LOG_TARGET}; use torrust_tracker_primitives::service_binding::ServiceBinding; use tracing::{instrument, Level}; @@ -40,25 +39,14 @@ pub async fn handle_error( } let e = if let Error::RequestParseError { request_parse_error } = e { - match request_parse_error { - RequestParseError::Sendable { - connection_id, - transaction_id, - err, - } => { - if let Err(e) = check(connection_id, gen_remote_fingerprint(&client_socket_addr), cookie_valid_range) { - (e.to_string(), Some(*transaction_id)) - } else { - ((*err).to_string(), Some(*transaction_id)) - } - } - RequestParseError::Unsendable { err } => (err.to_string(), transaction_id), - } + (request_parse_error.message.clone(), transaction_id) } else { (e.to_string(), transaction_id) }; if e.1.is_some() { + // code-review: why we trigger an event only if transaction_id is present? + if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender .send(Event::UdpError { From 8f3c22aaa3bbdb643545af72c48e27499f3a283c Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 2 Jun 2025 16:29:27 +0100 Subject: [PATCH 02/11] feat: [#1456] expose error kind in the UdpError event Not exposing the original complex error type becuase: - It's too complex. - It forces all errors to be "Sent", "PartialEq". - It would expose a lot of internals. --- packages/tracker-core/src/error.rs | 2 +- .../udp-tracker-core/src/connection_cookie.rs | 2 +- packages/udp-tracker-server/src/error.rs | 13 +++++- packages/udp-tracker-server/src/event.rs | 45 ++++++++++++++++++- .../udp-tracker-server/src/handlers/error.rs | 11 ++--- .../src/statistics/event/handler.rs | 6 ++- 6 files changed, 68 insertions(+), 11 deletions(-) diff --git a/packages/tracker-core/src/error.rs b/packages/tracker-core/src/error.rs index 4a35e9a0b..866aa64c5 100644 --- a/packages/tracker-core/src/error.rs +++ b/packages/tracker-core/src/error.rs @@ -84,7 +84,7 @@ pub enum ScrapeError { /// /// This error is returned when an operation involves a torrent that is not /// present in the whitelist. -#[derive(thiserror::Error, Debug, Clone)] +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] pub enum WhitelistError { /// Indicates that the torrent identified by `info_hash` is not whitelisted. #[error("The torrent: {info_hash}, is not whitelisted, {location}")] diff --git a/packages/udp-tracker-core/src/connection_cookie.rs b/packages/udp-tracker-core/src/connection_cookie.rs index 31c116400..ce255705f 100644 --- a/packages/udp-tracker-core/src/connection_cookie.rs +++ b/packages/udp-tracker-core/src/connection_cookie.rs @@ -86,7 +86,7 @@ use zerocopy::AsBytes; use crate::crypto::keys::CipherArrayBlowfish; /// Error returned when there was an error with the connection cookie. -#[derive(Error, Debug, Clone)] +#[derive(Error, Debug, Clone, PartialEq)] pub enum ConnectionCookieError { #[error("cookie value is not normal: {not_normal_value}")] ValueNotNormal { not_normal_value: f64 }, diff --git a/packages/udp-tracker-server/src/error.rs b/packages/udp-tracker-server/src/error.rs index 6a63a4c9a..d45b96569 100644 --- a/packages/udp-tracker-server/src/error.rs +++ b/packages/udp-tracker-server/src/error.rs @@ -1,4 +1,5 @@ //! Error types for the UDP server. +use std::fmt::Display; use std::panic::Location; use aquatic_udp_protocol::{ConnectionId, RequestParseError, TransactionId}; @@ -13,7 +14,7 @@ use torrust_tracker_located_error::LocatedError; pub struct ConnectionCookie(pub ConnectionId); /// Error returned by the UDP server. -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] pub enum Error { /// Error returned when the request is invalid. #[error("error when phrasing request: {request_parse_error:?}")] @@ -76,6 +77,16 @@ pub struct SendableRequestParseError { pub opt_transaction_id: Option, } +impl Display for SendableRequestParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "SendableRequestParseError: message: {}, connection_id: {:?}, transaction_id: {:?}", + self.message, self.opt_connection_id, self.opt_transaction_id + ) + } +} + impl From for SendableRequestParseError { fn from(request_parse_error: RequestParseError) -> Self { let (message, opt_connection_id, opt_transaction_id) = match request_parse_error { diff --git a/packages/udp-tracker-server/src/event.rs b/packages/udp-tracker-server/src/event.rs index 8aabd7ffb..4d3646563 100644 --- a/packages/udp-tracker-server/src/event.rs +++ b/packages/udp-tracker-server/src/event.rs @@ -2,12 +2,17 @@ use std::fmt; use std::net::SocketAddr; use std::time::Duration; +use bittorrent_tracker_core::error::{AnnounceError, ScrapeError}; +use bittorrent_udp_tracker_core::services::announce::UdpAnnounceError; +use bittorrent_udp_tracker_core::services::scrape::UdpScrapeError; use torrust_tracker_metrics::label::{LabelSet, LabelValue}; use torrust_tracker_metrics::label_name; use torrust_tracker_primitives::service_binding::ServiceBinding; +use crate::error::Error; + /// A UDP server event. -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum Event { UdpRequestReceived { context: ConnectionContext, @@ -30,6 +35,7 @@ pub enum Event { UdpError { context: ConnectionContext, kind: Option, + error: ErrorKind, }, } @@ -109,6 +115,43 @@ impl From for LabelSet { } } +#[derive(Debug, Clone, PartialEq)] +pub enum ErrorKind { + RequestParse(String), + ConnectionCookie(String), + Whitelist(String), + Database(String), + InternalServer(String), + BadRequest(String), + TrackerAuthentication(String), +} + +impl From for ErrorKind { + fn from(error: Error) -> Self { + match error { + Error::RequestParseError { request_parse_error } => Self::RequestParse(request_parse_error.to_string()), + Error::UdpAnnounceError { source } => match source { + UdpAnnounceError::ConnectionCookieError { source } => Self::ConnectionCookie(source.to_string()), + UdpAnnounceError::TrackerCoreAnnounceError { source } => match source { + AnnounceError::Whitelist(whitelist_error) => Self::Whitelist(whitelist_error.to_string()), + AnnounceError::Database(error) => Self::Database(error.to_string()), + }, + UdpAnnounceError::TrackerCoreWhitelistError { source } => Self::Whitelist(source.to_string()), + }, + Error::UdpScrapeError { source } => match source { + UdpScrapeError::ConnectionCookieError { source } => Self::ConnectionCookie(source.to_string()), + UdpScrapeError::TrackerCoreScrapeError { source } => match source { + ScrapeError::Whitelist(whitelist_error) => Self::Whitelist(whitelist_error.to_string()), + }, + UdpScrapeError::TrackerCoreWhitelistError { source } => Self::Whitelist(source.to_string()), + }, + Error::InternalServer { location: _, message } => Self::InternalServer(message.to_string()), + Error::BadRequest { source } => Self::BadRequest(source.to_string()), + Error::TrackerAuthenticationRequired { location } => Self::TrackerAuthentication(location.to_string()), + } + } +} + pub mod sender { use std::sync::Arc; diff --git a/packages/udp-tracker-server/src/handlers/error.rs b/packages/udp-tracker-server/src/handlers/error.rs index 7b477d84f..54163aca5 100644 --- a/packages/udp-tracker-server/src/handlers/error.rs +++ b/packages/udp-tracker-server/src/handlers/error.rs @@ -21,7 +21,7 @@ pub async fn handle_error( request_id: Uuid, opt_udp_server_stats_event_sender: &crate::event::sender::Sender, cookie_valid_range: Range, - e: &Error, + error: &Error, transaction_id: Option, ) -> Response { tracing::trace!("handle error"); @@ -31,17 +31,17 @@ pub async fn handle_error( match transaction_id { Some(transaction_id) => { let transaction_id = transaction_id.0.to_string(); - tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %e, %client_socket_addr, %server_socket_addr, %request_id, %transaction_id, "response error"); + tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %error, %client_socket_addr, %server_socket_addr, %request_id, %transaction_id, "response error"); } None => { - tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %e, %client_socket_addr, %server_socket_addr, %request_id, "response error"); + tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %error, %client_socket_addr, %server_socket_addr, %request_id, "response error"); } } - let e = if let Error::RequestParseError { request_parse_error } = e { + let e = if let Error::RequestParseError { request_parse_error } = error { (request_parse_error.message.clone(), transaction_id) } else { - (e.to_string(), transaction_id) + (error.to_string(), transaction_id) }; if e.1.is_some() { @@ -52,6 +52,7 @@ pub async fn handle_error( .send(Event::UdpError { context: ConnectionContext::new(client_socket_addr, server_service_binding), kind: req_kind, + error: error.clone().into(), }) .await; } diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs index 1e1502339..b231d8336 100644 --- a/packages/udp-tracker-server/src/statistics/event/handler.rs +++ b/packages/udp-tracker-server/src/statistics/event/handler.rs @@ -232,7 +232,7 @@ pub async fn handle_event(event: Event, stats_repository: &Repository, now: Dura Err(err) => tracing::error!("Failed to increase the counter: {}", err), }; } - Event::UdpError { context, kind } => { + Event::UdpError { context, kind, error: _ } => { // Global fixed metrics match context.client_socket_addr().ip() { std::net::IpAddr::V4(_) => { @@ -271,7 +271,7 @@ mod tests { use torrust_tracker_clock::clock::Time; use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; - use crate::event::{ConnectionContext, Event, UdpRequestKind}; + use crate::event::{ConnectionContext, ErrorKind, Event, UdpRequestKind}; use crate::statistics::event::handler::handle_event; use crate::statistics::repository::Repository; use crate::CurrentClock; @@ -518,6 +518,7 @@ mod tests { .unwrap(), ), kind: None, + error: ErrorKind::RequestParse("Invalid request format".to_string()), }, &stats_repository, CurrentClock::now(), @@ -650,6 +651,7 @@ mod tests { .unwrap(), ), kind: None, + error: ErrorKind::RequestParse("Invalid request format".to_string()), }, &stats_repository, CurrentClock::now(), From d7902f1d670bf4411303fa3934e0a4ce595a20ef Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 2 Jun 2025 16:36:50 +0100 Subject: [PATCH 03/11] refactor: [#1456] remove unused enum variant in udp server error --- Cargo.lock | 1 - packages/udp-tracker-server/Cargo.toml | 1 - packages/udp-tracker-server/src/error.rs | 7 ------- packages/udp-tracker-server/src/event.rs | 1 - packages/udp-tracker-server/src/handlers/mod.rs | 1 + 5 files changed, 1 insertion(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 35040f516..feb749d3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4915,7 +4915,6 @@ dependencies = [ "torrust-tracker-clock", "torrust-tracker-configuration", "torrust-tracker-events", - "torrust-tracker-located-error", "torrust-tracker-metrics", "torrust-tracker-primitives", "torrust-tracker-swarm-coordination-registry", diff --git a/packages/udp-tracker-server/Cargo.toml b/packages/udp-tracker-server/Cargo.toml index 72fa520ba..c0bc94ce3 100644 --- a/packages/udp-tracker-server/Cargo.toml +++ b/packages/udp-tracker-server/Cargo.toml @@ -30,7 +30,6 @@ torrust-server-lib = { version = "3.0.0-develop", path = "../server-lib" } torrust-tracker-clock = { version = "3.0.0-develop", path = "../clock" } torrust-tracker-configuration = { version = "3.0.0-develop", path = "../configuration" } torrust-tracker-events = { version = "3.0.0-develop", path = "../events" } -torrust-tracker-located-error = { version = "3.0.0-develop", path = "../located-error" } torrust-tracker-metrics = { version = "3.0.0-develop", path = "../metrics" } torrust-tracker-primitives = { version = "3.0.0-develop", path = "../primitives" } torrust-tracker-swarm-coordination-registry = { version = "3.0.0-develop", path = "../swarm-coordination-registry" } diff --git a/packages/udp-tracker-server/src/error.rs b/packages/udp-tracker-server/src/error.rs index d45b96569..aecf960b8 100644 --- a/packages/udp-tracker-server/src/error.rs +++ b/packages/udp-tracker-server/src/error.rs @@ -7,7 +7,6 @@ use bittorrent_udp_tracker_core::services::announce::UdpAnnounceError; use bittorrent_udp_tracker_core::services::scrape::UdpScrapeError; use derive_more::derive::Display; use thiserror::Error; -use torrust_tracker_located_error::LocatedError; #[derive(Display, Debug)] #[display(":?")] @@ -35,12 +34,6 @@ pub enum Error { message: String, }, - /// Error returned when the request is invalid. - #[error("bad request: {source}")] - BadRequest { - source: LocatedError<'static, dyn std::error::Error + Send + Sync>, - }, - /// Error returned when tracker requires authentication. #[error("domain tracker requires authentication but is not supported in current UDP implementation. Location: {location}")] TrackerAuthenticationRequired { location: &'static Location<'static> }, diff --git a/packages/udp-tracker-server/src/event.rs b/packages/udp-tracker-server/src/event.rs index 4d3646563..e320ceb8a 100644 --- a/packages/udp-tracker-server/src/event.rs +++ b/packages/udp-tracker-server/src/event.rs @@ -146,7 +146,6 @@ impl From for ErrorKind { UdpScrapeError::TrackerCoreWhitelistError { source } => Self::Whitelist(source.to_string()), }, Error::InternalServer { location: _, message } => Self::InternalServer(message.to_string()), - Error::BadRequest { source } => Self::BadRequest(source.to_string()), Error::TrackerAuthenticationRequired { location } => Self::TrackerAuthentication(location.to_string()), } } diff --git a/packages/udp-tracker-server/src/handlers/mod.rs b/packages/udp-tracker-server/src/handlers/mod.rs index df550ab72..6785bd293 100644 --- a/packages/udp-tracker-server/src/handlers/mod.rs +++ b/packages/udp-tracker-server/src/handlers/mod.rs @@ -109,6 +109,7 @@ pub(crate) async fn handle_packet( } }, Err(e) => { + // The request payload could not be parsed, so we handle it as an error. let response = handle_error( None, udp_request.from, From 0108c26b6db35d11522589cb20ce62904a97c059 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 2 Jun 2025 16:54:40 +0100 Subject: [PATCH 04/11] fix: test. Error message changed --- packages/udp-tracker-server/src/error.rs | 2 +- packages/udp-tracker-server/tests/server/contract.rs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/udp-tracker-server/src/error.rs b/packages/udp-tracker-server/src/error.rs index aecf960b8..697cc5cab 100644 --- a/packages/udp-tracker-server/src/error.rs +++ b/packages/udp-tracker-server/src/error.rs @@ -16,7 +16,7 @@ pub struct ConnectionCookie(pub ConnectionId); #[derive(Error, Debug, Clone)] pub enum Error { /// Error returned when the request is invalid. - #[error("error when phrasing request: {request_parse_error:?}")] + #[error("error parsing request: {request_parse_error:?}")] RequestParseError { request_parse_error: SendableRequestParseError }, /// Error returned when the domain tracker returns an announce error. diff --git a/packages/udp-tracker-server/tests/server/contract.rs b/packages/udp-tracker-server/tests/server/contract.rs index 860fd1f0b..04ad0f39d 100644 --- a/packages/udp-tracker-server/tests/server/contract.rs +++ b/packages/udp-tracker-server/tests/server/contract.rs @@ -59,7 +59,9 @@ async fn should_return_a_bad_request_response_when_the_client_sends_an_empty_req let response = Response::parse_bytes(&response, true).unwrap(); - assert_eq!(get_error_response_message(&response).unwrap(), "Protocol identifier missing"); + assert!(get_error_response_message(&response) + .unwrap() + .contains("Protocol identifier missing")); env.stop().await; } From f485501f8e7705fe886932d5889b79c8eafb9057 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 2 Jun 2025 16:55:24 +0100 Subject: [PATCH 05/11] refactor: [#1456 clean code --- .../udp-tracker-server/src/handlers/error.rs | 16 +++++----------- packages/udp-tracker-server/src/handlers/mod.rs | 9 ++++++++- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/packages/udp-tracker-server/src/handlers/error.rs b/packages/udp-tracker-server/src/handlers/error.rs index 54163aca5..4ebe24075 100644 --- a/packages/udp-tracker-server/src/handlers/error.rs +++ b/packages/udp-tracker-server/src/handlers/error.rs @@ -22,13 +22,13 @@ pub async fn handle_error( opt_udp_server_stats_event_sender: &crate::event::sender::Sender, cookie_valid_range: Range, error: &Error, - transaction_id: Option, + opt_transaction_id: Option, ) -> Response { tracing::trace!("handle error"); let server_socket_addr = server_service_binding.bind_address(); - match transaction_id { + match opt_transaction_id { Some(transaction_id) => { let transaction_id = transaction_id.0.to_string(); tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %error, %client_socket_addr, %server_socket_addr, %request_id, %transaction_id, "response error"); @@ -38,13 +38,7 @@ pub async fn handle_error( } } - let e = if let Error::RequestParseError { request_parse_error } = error { - (request_parse_error.message.clone(), transaction_id) - } else { - (error.to_string(), transaction_id) - }; - - if e.1.is_some() { + if opt_transaction_id.is_some() { // code-review: why we trigger an event only if transaction_id is present? if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { @@ -59,7 +53,7 @@ pub async fn handle_error( } Response::from(ErrorResponse { - transaction_id: e.1.unwrap_or(TransactionId(I32::new(0))), - message: e.0.into(), + transaction_id: opt_transaction_id.unwrap_or(TransactionId(I32::new(0))), + message: error.to_string().into(), }) } diff --git a/packages/udp-tracker-server/src/handlers/mod.rs b/packages/udp-tracker-server/src/handlers/mod.rs index 6785bd293..69c62a638 100644 --- a/packages/udp-tracker-server/src/handlers/mod.rs +++ b/packages/udp-tracker-server/src/handlers/mod.rs @@ -110,6 +110,13 @@ pub(crate) async fn handle_packet( }, Err(e) => { // The request payload could not be parsed, so we handle it as an error. + + let opt_transaction_id = if let Error::RequestParseError { request_parse_error } = e.clone() { + request_parse_error.opt_transaction_id + } else { + None + }; + let response = handle_error( None, udp_request.from, @@ -118,7 +125,7 @@ pub(crate) async fn handle_packet( &udp_tracker_server_container.stats_event_sender, cookie_time_values.valid_range.clone(), &e, - None, + opt_transaction_id, ) .await; From 525ab738d485a15175a8924520d88f66515f927a Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 2 Jun 2025 17:04:25 +0100 Subject: [PATCH 06/11] refactor: [#1456] extract methods --- .../udp-tracker-server/src/handlers/error.rs | 41 ++++++++++++++++--- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/packages/udp-tracker-server/src/handlers/error.rs b/packages/udp-tracker-server/src/handlers/error.rs index 4ebe24075..af530efd6 100644 --- a/packages/udp-tracker-server/src/handlers/error.rs +++ b/packages/udp-tracker-server/src/handlers/error.rs @@ -28,6 +28,32 @@ pub async fn handle_error( let server_socket_addr = server_service_binding.bind_address(); + log_error(error, client_socket_addr, server_socket_addr, opt_transaction_id, request_id); + + trigger_udp_error_event( + error.clone(), + client_socket_addr, + server_service_binding, + opt_transaction_id, + opt_udp_server_stats_event_sender, + req_kind, + ) + .await; + + Response::from(ErrorResponse { + transaction_id: opt_transaction_id.unwrap_or(TransactionId(I32::new(0))), + message: error.to_string().into(), + }) +} + +fn log_error( + error: &Error, + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, + opt_transaction_id: Option, + + request_id: Uuid, +) { match opt_transaction_id { Some(transaction_id) => { let transaction_id = transaction_id.0.to_string(); @@ -37,7 +63,17 @@ pub async fn handle_error( tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %error, %client_socket_addr, %server_socket_addr, %request_id, "response error"); } } +} + +async fn trigger_udp_error_event( + error: Error, + client_socket_addr: SocketAddr, + server_service_binding: ServiceBinding, + opt_transaction_id: Option, + opt_udp_server_stats_event_sender: &crate::event::sender::Sender, + req_kind: Option, +) { if opt_transaction_id.is_some() { // code-review: why we trigger an event only if transaction_id is present? @@ -51,9 +87,4 @@ pub async fn handle_error( .await; } } - - Response::from(ErrorResponse { - transaction_id: opt_transaction_id.unwrap_or(TransactionId(I32::new(0))), - message: error.to_string().into(), - }) } From ad1b19a366573dd24f35c3d6250758ee082ba9f6 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 2 Jun 2025 17:09:45 +0100 Subject: [PATCH 07/11] feat: trigger UDP error event when there is no transaction ID too --- .../udp-tracker-server/src/handlers/error.rs | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/packages/udp-tracker-server/src/handlers/error.rs b/packages/udp-tracker-server/src/handlers/error.rs index af530efd6..7fb4141b2 100644 --- a/packages/udp-tracker-server/src/handlers/error.rs +++ b/packages/udp-tracker-server/src/handlers/error.rs @@ -31,10 +31,9 @@ pub async fn handle_error( log_error(error, client_socket_addr, server_socket_addr, opt_transaction_id, request_id); trigger_udp_error_event( - error.clone(), + error, client_socket_addr, server_service_binding, - opt_transaction_id, opt_udp_server_stats_event_sender, req_kind, ) @@ -51,7 +50,6 @@ fn log_error( client_socket_addr: SocketAddr, server_socket_addr: SocketAddr, opt_transaction_id: Option, - request_id: Uuid, ) { match opt_transaction_id { @@ -66,25 +64,19 @@ fn log_error( } async fn trigger_udp_error_event( - error: Error, + error: &Error, client_socket_addr: SocketAddr, server_service_binding: ServiceBinding, - opt_transaction_id: Option, - opt_udp_server_stats_event_sender: &crate::event::sender::Sender, req_kind: Option, ) { - if opt_transaction_id.is_some() { - // code-review: why we trigger an event only if transaction_id is present? - - if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { - udp_server_stats_event_sender - .send(Event::UdpError { - context: ConnectionContext::new(client_socket_addr, server_service_binding), - kind: req_kind, - error: error.clone().into(), - }) - .await; - } + if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { + udp_server_stats_event_sender + .send(Event::UdpError { + context: ConnectionContext::new(client_socket_addr, server_service_binding), + kind: req_kind, + error: error.clone().into(), + }) + .await; } } From 21bea5b4bf30f3c220b443fed839521df50f453c Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 2 Jun 2025 17:35:17 +0100 Subject: [PATCH 08/11] refactor: [#1456] increase ban counters asyncronously --- .../udp-tracker-server/src/environment.rs | 1 + .../udp-tracker-server/src/handlers/mod.rs | 10 ---- .../src/statistics/event/handler.rs | 54 +++++++++++++++++-- .../src/statistics/event/listener.rs | 17 ++++-- src/bootstrap/jobs/udp_tracker_server.rs | 1 + 5 files changed, 65 insertions(+), 18 deletions(-) diff --git a/packages/udp-tracker-server/src/environment.rs b/packages/udp-tracker-server/src/environment.rs index 3f479a02d..268259f1b 100644 --- a/packages/udp-tracker-server/src/environment.rs +++ b/packages/udp-tracker-server/src/environment.rs @@ -82,6 +82,7 @@ impl Environment { let udp_server_event_listener_job = Some(crate::statistics::event::listener::run_event_listener( self.container.udp_tracker_server_container.event_bus.receiver(), &self.container.udp_tracker_server_container.stats_repository, + &self.container.udp_tracker_core_container.ban_service, )); // Start the UDP tracker server diff --git a/packages/udp-tracker-server/src/handlers/mod.rs b/packages/udp-tracker-server/src/handlers/mod.rs index 69c62a638..0bd455701 100644 --- a/packages/udp-tracker-server/src/handlers/mod.rs +++ b/packages/udp-tracker-server/src/handlers/mod.rs @@ -13,7 +13,6 @@ use announce::handle_announce; use aquatic_udp_protocol::{Request, Response, TransactionId}; use bittorrent_tracker_core::MAX_SCRAPE_TORRENTS; use bittorrent_udp_tracker_core::container::UdpTrackerCoreContainer; -use bittorrent_udp_tracker_core::services::announce::UdpAnnounceError; use connect::handle_connect; use error::handle_error; use scrape::handle_scrape; @@ -84,15 +83,6 @@ pub(crate) async fn handle_packet( { Ok((response, req_kid)) => return (response, Some(req_kid)), Err((error, transaction_id, req_kind)) => { - if let Error::UdpAnnounceError { - source: UdpAnnounceError::ConnectionCookieError { .. }, - } = error - { - // code-review: should we include `RequestParseError` and `BadRequest`? - let mut ban_service = udp_tracker_core_container.ban_service.write().await; - ban_service.increase_counter(&udp_request.from.ip()); - } - let response = handle_error( Some(req_kind.clone()), udp_request.from, diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs index b231d8336..394850844 100644 --- a/packages/udp-tracker-server/src/statistics/event/handler.rs +++ b/packages/udp-tracker-server/src/statistics/event/handler.rs @@ -1,8 +1,12 @@ +use std::sync::Arc; + +use bittorrent_udp_tracker_core::services::banning::BanService; +use tokio::sync::RwLock; use torrust_tracker_metrics::label::{LabelSet, LabelValue}; use torrust_tracker_metrics::{label_name, metric_name}; use torrust_tracker_primitives::DurationSinceUnixEpoch; -use crate::event::{Event, UdpRequestKind, UdpResponseKind}; +use crate::event::{ErrorKind, Event, UdpRequestKind, UdpResponseKind}; use crate::statistics::repository::Repository; use crate::statistics::{ UDP_TRACKER_SERVER_ERRORS_TOTAL, UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS, @@ -16,7 +20,12 @@ use crate::statistics::{ /// This function panics if the client IP version does not match the expected /// version. #[allow(clippy::too_many_lines)] -pub async fn handle_event(event: Event, stats_repository: &Repository, now: DurationSinceUnixEpoch) { +pub async fn handle_event( + event: Event, + stats_repository: &Repository, + ban_service: &Arc>, + now: DurationSinceUnixEpoch, +) { match event { Event::UdpRequestAborted { context } => { // Global fixed metrics @@ -232,7 +241,14 @@ pub async fn handle_event(event: Event, stats_repository: &Repository, now: Dura Err(err) => tracing::error!("Failed to increase the counter: {}", err), }; } - Event::UdpError { context, kind, error: _ } => { + Event::UdpError { context, kind, error } => { + // Increase the number of errors + // code-review: should we ban IP due to other errors too? + if let ErrorKind::ConnectionCookie(_msg) = error { + let mut ban_service = ban_service.write().await; + ban_service.increase_counter(&context.client_socket_addr().ip()); + } + // Global fixed metrics match context.client_socket_addr().ip() { std::net::IpAddr::V4(_) => { @@ -267,7 +283,9 @@ pub async fn handle_event(event: Event, stats_repository: &Repository, now: Dura #[cfg(test)] mod tests { use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use std::sync::Arc; + use bittorrent_udp_tracker_core::services::banning::BanService; use torrust_tracker_clock::clock::Time; use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; @@ -279,6 +297,7 @@ mod tests { #[tokio::test] async fn should_increase_the_number_of_aborted_requests_when_it_receives_a_udp_request_aborted_event() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpRequestAborted { @@ -292,6 +311,7 @@ mod tests { ), }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -304,6 +324,7 @@ mod tests { #[tokio::test] async fn should_increase_the_number_of_banned_requests_when_it_receives_a_udp_request_banned_event() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpRequestBanned { @@ -317,6 +338,7 @@ mod tests { ), }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -329,6 +351,7 @@ mod tests { #[tokio::test] async fn should_increase_the_number_of_incoming_requests_when_it_receives_a_udp4_incoming_request_event() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpRequestReceived { @@ -342,6 +365,7 @@ mod tests { ), }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -354,6 +378,7 @@ mod tests { #[tokio::test] async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpRequestAborted { @@ -367,6 +392,7 @@ mod tests { ), }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -376,6 +402,7 @@ mod tests { #[tokio::test] async fn should_increase_the_udp_ban_counter_when_it_receives_a_udp_banned_event() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpRequestBanned { @@ -389,6 +416,7 @@ mod tests { ), }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -399,6 +427,7 @@ mod tests { #[tokio::test] async fn should_increase_the_udp4_connect_requests_counter_when_it_receives_a_udp4_request_event_of_connect_kind() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpRequestAccepted { @@ -413,6 +442,7 @@ mod tests { kind: crate::event::UdpRequestKind::Connect, }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -425,6 +455,7 @@ mod tests { #[tokio::test] async fn should_increase_the_udp4_announce_requests_counter_when_it_receives_a_udp4_request_event_of_announce_kind() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpRequestAccepted { @@ -439,6 +470,7 @@ mod tests { kind: crate::event::UdpRequestKind::Announce, }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -451,6 +483,7 @@ mod tests { #[tokio::test] async fn should_increase_the_udp4_scrape_requests_counter_when_it_receives_a_udp4_request_event_of_scrape_kind() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpRequestAccepted { @@ -465,6 +498,7 @@ mod tests { kind: crate::event::UdpRequestKind::Scrape, }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -477,6 +511,7 @@ mod tests { #[tokio::test] async fn should_increase_the_udp4_responses_counter_when_it_receives_a_udp4_response_event() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpResponseSent { @@ -494,6 +529,7 @@ mod tests { req_processing_time: std::time::Duration::from_secs(1), }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -506,6 +542,7 @@ mod tests { #[tokio::test] async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpError { @@ -521,6 +558,7 @@ mod tests { error: ErrorKind::RequestParse("Invalid request format".to_string()), }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -533,6 +571,7 @@ mod tests { #[tokio::test] async fn should_increase_the_udp6_connect_requests_counter_when_it_receives_a_udp6_request_event_of_connect_kind() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpRequestAccepted { @@ -547,6 +586,7 @@ mod tests { kind: crate::event::UdpRequestKind::Connect, }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -559,6 +599,7 @@ mod tests { #[tokio::test] async fn should_increase_the_udp6_announce_requests_counter_when_it_receives_a_udp6_request_event_of_announce_kind() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpRequestAccepted { @@ -573,6 +614,7 @@ mod tests { kind: crate::event::UdpRequestKind::Announce, }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -585,6 +627,7 @@ mod tests { #[tokio::test] async fn should_increase_the_udp6_scrape_requests_counter_when_it_receives_a_udp6_request_event_of_scrape_kind() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpRequestAccepted { @@ -599,6 +642,7 @@ mod tests { kind: crate::event::UdpRequestKind::Scrape, }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -611,6 +655,7 @@ mod tests { #[tokio::test] async fn should_increase_the_udp6_response_counter_when_it_receives_a_udp6_response_event() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpResponseSent { @@ -628,6 +673,7 @@ mod tests { req_processing_time: std::time::Duration::from_secs(1), }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; @@ -639,6 +685,7 @@ mod tests { #[tokio::test] async fn should_increase_the_udp6_errors_counter_when_it_receives_a_udp6_error_event() { let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); handle_event( Event::UdpError { @@ -654,6 +701,7 @@ mod tests { error: ErrorKind::RequestParse("Invalid request format".to_string()), }, &stats_repository, + &ban_service, CurrentClock::now(), ) .await; diff --git a/packages/udp-tracker-server/src/statistics/event/listener.rs b/packages/udp-tracker-server/src/statistics/event/listener.rs index d805cc87f..e6c9a85ce 100644 --- a/packages/udp-tracker-server/src/statistics/event/listener.rs +++ b/packages/udp-tracker-server/src/statistics/event/listener.rs @@ -1,6 +1,8 @@ use std::sync::Arc; +use bittorrent_udp_tracker_core::services::banning::BanService; use bittorrent_udp_tracker_core::UDP_TRACKER_LOG_TARGET; +use tokio::sync::RwLock; use tokio::task::JoinHandle; use torrust_tracker_clock::clock::Time; use torrust_tracker_events::receiver::RecvError; @@ -11,19 +13,24 @@ use crate::statistics::repository::Repository; use crate::CurrentClock; #[must_use] -pub fn run_event_listener(receiver: Receiver, repository: &Arc) -> JoinHandle<()> { - let stats_repository = repository.clone(); +pub fn run_event_listener( + receiver: Receiver, + repository: &Arc, + ban_service: &Arc>, +) -> JoinHandle<()> { + let repository_clone = repository.clone(); + let ban_service_clone = ban_service.clone(); tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting UDP tracker server event listener"); tokio::spawn(async move { - dispatch_events(receiver, stats_repository).await; + dispatch_events(receiver, repository_clone, ban_service_clone).await; tracing::info!(target: UDP_TRACKER_LOG_TARGET, "UDP tracker server event listener finished"); }) } -async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc) { +async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc, ban_service: Arc>) { let shutdown_signal = tokio::signal::ctrl_c(); tokio::pin!(shutdown_signal); @@ -38,7 +45,7 @@ async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc { match result { - Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await, + Ok(event) => handle_event(event, &stats_repository, &ban_service, CurrentClock::now()).await, Err(e) => { match e { RecvError::Closed => { diff --git a/src/bootstrap/jobs/udp_tracker_server.rs b/src/bootstrap/jobs/udp_tracker_server.rs index 42ac2d03e..8a4c2a273 100644 --- a/src/bootstrap/jobs/udp_tracker_server.rs +++ b/src/bootstrap/jobs/udp_tracker_server.rs @@ -10,6 +10,7 @@ pub fn start_event_listener(config: &Configuration, app_container: &Arc Date: Mon, 2 Jun 2025 17:49:41 +0100 Subject: [PATCH 09/11] refactor: rename UDP tracker server error variants --- packages/udp-tracker-server/src/error.rs | 16 ++++++++-------- packages/udp-tracker-server/src/event.rs | 10 +++++----- packages/udp-tracker-server/src/handlers/mod.rs | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/packages/udp-tracker-server/src/error.rs b/packages/udp-tracker-server/src/error.rs index 697cc5cab..d260ebfd4 100644 --- a/packages/udp-tracker-server/src/error.rs +++ b/packages/udp-tracker-server/src/error.rs @@ -17,31 +17,31 @@ pub struct ConnectionCookie(pub ConnectionId); pub enum Error { /// Error returned when the request is invalid. #[error("error parsing request: {request_parse_error:?}")] - RequestParseError { request_parse_error: SendableRequestParseError }, + InvalidRequest { request_parse_error: SendableRequestParseError }, /// Error returned when the domain tracker returns an announce error. #[error("tracker announce error: {source}")] - UdpAnnounceError { source: UdpAnnounceError }, + AnnounceFailed { source: UdpAnnounceError }, /// Error returned when the domain tracker returns an scrape error. #[error("tracker scrape error: {source}")] - UdpScrapeError { source: UdpScrapeError }, + ScrapeFailed { source: UdpScrapeError }, /// Error returned from a third-party library (`aquatic_udp_protocol`). #[error("internal server error: {message}, {location}")] - InternalServer { + Internal { location: &'static Location<'static>, message: String, }, /// Error returned when tracker requires authentication. #[error("domain tracker requires authentication but is not supported in current UDP implementation. Location: {location}")] - TrackerAuthenticationRequired { location: &'static Location<'static> }, + AuthRequired { location: &'static Location<'static> }, } impl From for Error { fn from(request_parse_error: RequestParseError) -> Self { - Self::RequestParseError { + Self::InvalidRequest { request_parse_error: request_parse_error.into(), } } @@ -49,7 +49,7 @@ impl From for Error { impl From for Error { fn from(udp_announce_error: UdpAnnounceError) -> Self { - Self::UdpAnnounceError { + Self::AnnounceFailed { source: udp_announce_error, } } @@ -57,7 +57,7 @@ impl From for Error { impl From for Error { fn from(udp_scrape_error: UdpScrapeError) -> Self { - Self::UdpScrapeError { + Self::ScrapeFailed { source: udp_scrape_error, } } diff --git a/packages/udp-tracker-server/src/event.rs b/packages/udp-tracker-server/src/event.rs index e320ceb8a..4fa29940e 100644 --- a/packages/udp-tracker-server/src/event.rs +++ b/packages/udp-tracker-server/src/event.rs @@ -129,8 +129,8 @@ pub enum ErrorKind { impl From for ErrorKind { fn from(error: Error) -> Self { match error { - Error::RequestParseError { request_parse_error } => Self::RequestParse(request_parse_error.to_string()), - Error::UdpAnnounceError { source } => match source { + Error::InvalidRequest { request_parse_error } => Self::RequestParse(request_parse_error.to_string()), + Error::AnnounceFailed { source } => match source { UdpAnnounceError::ConnectionCookieError { source } => Self::ConnectionCookie(source.to_string()), UdpAnnounceError::TrackerCoreAnnounceError { source } => match source { AnnounceError::Whitelist(whitelist_error) => Self::Whitelist(whitelist_error.to_string()), @@ -138,15 +138,15 @@ impl From for ErrorKind { }, UdpAnnounceError::TrackerCoreWhitelistError { source } => Self::Whitelist(source.to_string()), }, - Error::UdpScrapeError { source } => match source { + Error::ScrapeFailed { source } => match source { UdpScrapeError::ConnectionCookieError { source } => Self::ConnectionCookie(source.to_string()), UdpScrapeError::TrackerCoreScrapeError { source } => match source { ScrapeError::Whitelist(whitelist_error) => Self::Whitelist(whitelist_error.to_string()), }, UdpScrapeError::TrackerCoreWhitelistError { source } => Self::Whitelist(source.to_string()), }, - Error::InternalServer { location: _, message } => Self::InternalServer(message.to_string()), - Error::TrackerAuthenticationRequired { location } => Self::TrackerAuthentication(location.to_string()), + Error::Internal { location: _, message } => Self::InternalServer(message.to_string()), + Error::AuthRequired { location } => Self::TrackerAuthentication(location.to_string()), } } } diff --git a/packages/udp-tracker-server/src/handlers/mod.rs b/packages/udp-tracker-server/src/handlers/mod.rs index 0bd455701..c1125b97f 100644 --- a/packages/udp-tracker-server/src/handlers/mod.rs +++ b/packages/udp-tracker-server/src/handlers/mod.rs @@ -101,7 +101,7 @@ pub(crate) async fn handle_packet( Err(e) => { // The request payload could not be parsed, so we handle it as an error. - let opt_transaction_id = if let Error::RequestParseError { request_parse_error } = e.clone() { + let opt_transaction_id = if let Error::InvalidRequest { request_parse_error } = e.clone() { request_parse_error.opt_transaction_id } else { None From 89ac87cbc1c26fd93e6a019faeb10161f9f6e058 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 2 Jun 2025 18:03:25 +0100 Subject: [PATCH 10/11] refactor: [#1551] extract methods in udp event handler" --- .../src/statistics/event/handler.rs | 482 +++++++++--------- 1 file changed, 254 insertions(+), 228 deletions(-) diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs index 394850844..a1e9007e9 100644 --- a/packages/udp-tracker-server/src/statistics/event/handler.rs +++ b/packages/udp-tracker-server/src/statistics/event/handler.rs @@ -6,7 +6,7 @@ use torrust_tracker_metrics::label::{LabelSet, LabelValue}; use torrust_tracker_metrics::{label_name, metric_name}; use torrust_tracker_primitives::DurationSinceUnixEpoch; -use crate::event::{ErrorKind, Event, UdpRequestKind, UdpResponseKind}; +use crate::event::{ConnectionContext, ErrorKind, Event, UdpRequestKind, UdpResponseKind}; use crate::statistics::repository::Repository; use crate::statistics::{ UDP_TRACKER_SERVER_ERRORS_TOTAL, UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS, @@ -15,10 +15,6 @@ use crate::statistics::{ UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL, }; -/// # Panics -/// -/// This function panics if the client IP version does not match the expected -/// version. #[allow(clippy::too_many_lines)] pub async fn handle_event( event: Event, @@ -28,256 +24,286 @@ pub async fn handle_event( ) { match event { Event::UdpRequestAborted { context } => { - // Global fixed metrics - stats_repository.increase_udp_requests_aborted().await; - - // Extendable metrics - match stats_repository - .increase_counter( - &metric_name!(UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL), - &LabelSet::from(context), - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; + handle_udp_request_aborted_event(context, stats_repository, now).await; } Event::UdpRequestBanned { context } => { - // Global fixed metrics - stats_repository.increase_udp_requests_banned().await; - - // Extendable metrics - match stats_repository - .increase_counter( - &metric_name!(UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL), - &LabelSet::from(context), - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; + handle_udp_request_banned_event(context, stats_repository, now).await; } Event::UdpRequestReceived { context } => { - // Global fixed metrics - match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_requests().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_requests().await; - } - } - - // Extendable metrics - match stats_repository - .increase_counter( - &metric_name!(UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL), - &LabelSet::from(context), - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; + handle_udp_request_received_event(context, stats_repository, now).await; } Event::UdpRequestAccepted { context, kind } => { - // Global fixed metrics - match kind { - UdpRequestKind::Connect => match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_connections().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_connections().await; - } - }, - UdpRequestKind::Announce => match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_announces().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_announces().await; - } - }, - UdpRequestKind::Scrape => match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_scrapes().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_scrapes().await; - } - }, - } - - // Extendable metrics - - let mut label_set = LabelSet::from(context); - - label_set.upsert(label_name!("request_kind"), LabelValue::new(&kind.to_string())); - - match stats_repository - .increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &label_set, now) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; + handle_udp_request_accepted_event(context, kind, stats_repository, now).await; } Event::UdpResponseSent { context, kind, req_processing_time, } => { - // Global fixed metrics - match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_responses().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_responses().await; - } - } + handle_udp_response_sent_event(context, kind, req_processing_time, stats_repository, now).await; + } + Event::UdpError { context, kind, error } => { + handle_udp_error_event(context, kind, error, stats_repository, ban_service, now).await; + } + } - let (result_label_value, kind_label_value) = match kind { - UdpResponseKind::Ok { req_kind } => match req_kind { - UdpRequestKind::Connect => { - let new_avg = stats_repository - .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) - .await; - - // Extendable metrics - - let mut label_set = LabelSet::from(context.clone()); - label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); - - match stats_repository - .set_gauge( - &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), - &label_set, - new_avg, - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to set gauge: {}", err), - } - - (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Connect.to_string())) - } - UdpRequestKind::Announce => { - let new_avg = stats_repository - .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) - .await; - - // Extendable metrics - - let mut label_set = LabelSet::from(context.clone()); - label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); - - match stats_repository - .set_gauge( - &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), - &label_set, - new_avg, - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to set gauge: {}", err), - } - - (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Announce.to_string())) - } - UdpRequestKind::Scrape => { - let new_avg = stats_repository - .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) - .await; - - // Extendable metrics - - let mut label_set = LabelSet::from(context.clone()); - label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); - - match stats_repository - .set_gauge( - &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), - &label_set, - new_avg, - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to set gauge: {}", err), - } - - (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Scrape.to_string())) - } - }, - UdpResponseKind::Error { opt_req_kind: _ } => (LabelValue::new("error"), LabelValue::ignore()), - }; + tracing::debug!("stats: {:?}", stats_repository.get_stats().await); +} - // Extendable metrics +async fn handle_udp_request_aborted_event( + context: ConnectionContext, + stats_repository: &Repository, + now: DurationSinceUnixEpoch, +) { + // Global fixed metrics + stats_repository.increase_udp_requests_aborted().await; + + // Extendable metrics + match stats_repository + .increase_counter( + &metric_name!(UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL), + &LabelSet::from(context), + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} - let mut label_set = LabelSet::from(context); +async fn handle_udp_request_banned_event(context: ConnectionContext, stats_repository: &Repository, now: DurationSinceUnixEpoch) { + // Global fixed metrics + stats_repository.increase_udp_requests_banned().await; - if result_label_value == LabelValue::new("ok") { - label_set.upsert(label_name!("request_kind"), kind_label_value); - } - label_set.upsert(label_name!("result"), result_label_value); - - match stats_repository - .increase_counter(&metric_name!(UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL), &label_set, now) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; + // Extendable metrics + match stats_repository + .increase_counter( + &metric_name!(UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL), + &LabelSet::from(context), + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +async fn handle_udp_request_received_event( + context: ConnectionContext, + stats_repository: &Repository, + now: DurationSinceUnixEpoch, +) { + // Global fixed metrics + match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_requests().await; } - Event::UdpError { context, kind, error } => { - // Increase the number of errors - // code-review: should we ban IP due to other errors too? - if let ErrorKind::ConnectionCookie(_msg) = error { - let mut ban_service = ban_service.write().await; - ban_service.increase_counter(&context.client_socket_addr().ip()); - } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_requests().await; + } + } - // Global fixed metrics - match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_errors().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_errors().await; - } + // Extendable metrics + match stats_repository + .increase_counter( + &metric_name!(UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL), + &LabelSet::from(context), + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +async fn handle_udp_request_accepted_event( + context: ConnectionContext, + kind: UdpRequestKind, + stats_repository: &Repository, + now: DurationSinceUnixEpoch, +) { + // Global fixed metrics + match kind { + UdpRequestKind::Connect => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_connections().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_connections().await; + } + }, + UdpRequestKind::Announce => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_announces().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_announces().await; + } + }, + UdpRequestKind::Scrape => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_scrapes().await; } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_scrapes().await; + } + }, + } - // Extendable metrics + // Extendable metrics + let mut label_set = LabelSet::from(context); + label_set.upsert(label_name!("request_kind"), LabelValue::new(&kind.to_string())); + match stats_repository + .increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &label_set, now) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} - let mut label_set = LabelSet::from(context); +/// # Panics +/// +/// This function panics if the client IP version does not match the expected +/// version. +async fn handle_udp_response_sent_event( + context: ConnectionContext, + kind: UdpResponseKind, + req_processing_time: std::time::Duration, + stats_repository: &Repository, + now: DurationSinceUnixEpoch, +) { + // Global fixed metrics + match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_responses().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_responses().await; + } + } - if let Some(kind) = kind { - label_set.upsert(label_name!("request_kind"), kind.to_string().into()); + let (result_label_value, kind_label_value) = match kind { + UdpResponseKind::Ok { req_kind } => match req_kind { + UdpRequestKind::Connect => { + let new_avg = stats_repository + .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) + .await; + let mut label_set = LabelSet::from(context.clone()); + label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); + match stats_repository + .set_gauge( + &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), + &label_set, + new_avg, + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to set gauge: {}", err), + } + (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Connect.to_string())) } + UdpRequestKind::Announce => { + let new_avg = stats_repository + .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) + .await; + let mut label_set = LabelSet::from(context.clone()); + label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); + match stats_repository + .set_gauge( + &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), + &label_set, + new_avg, + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to set gauge: {}", err), + } + (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Announce.to_string())) + } + UdpRequestKind::Scrape => { + let new_avg = stats_repository + .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) + .await; + let mut label_set = LabelSet::from(context.clone()); + label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); + match stats_repository + .set_gauge( + &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), + &label_set, + new_avg, + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to set gauge: {}", err), + } + (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Scrape.to_string())) + } + }, + UdpResponseKind::Error { opt_req_kind: _ } => (LabelValue::new("error"), LabelValue::ignore()), + }; + + // Extendable metrics + let mut label_set = LabelSet::from(context); + if result_label_value == LabelValue::new("ok") { + label_set.upsert(label_name!("request_kind"), kind_label_value); + } + label_set.upsert(label_name!("result"), result_label_value); + match stats_repository + .increase_counter(&metric_name!(UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL), &label_set, now) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +async fn handle_udp_error_event( + context: ConnectionContext, + kind: Option, + error: ErrorKind, + stats_repository: &Repository, + ban_service: &Arc>, + now: DurationSinceUnixEpoch, +) { + // Increase the number of errors + // code-review: should we ban IP due to other errors too? + if let ErrorKind::ConnectionCookie(_msg) = error { + let mut ban_service = ban_service.write().await; + ban_service.increase_counter(&context.client_socket_addr().ip()); + } - match stats_repository - .increase_counter(&metric_name!(UDP_TRACKER_SERVER_ERRORS_TOTAL), &label_set, now) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; + // Global fixed metrics + match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_errors().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_errors().await; } } - tracing::debug!("stats: {:?}", stats_repository.get_stats().await); + // Extendable metrics + let mut label_set = LabelSet::from(context); + if let Some(kind) = kind { + label_set.upsert(label_name!("request_kind"), kind.to_string().into()); + } + match stats_repository + .increase_counter(&metric_name!(UDP_TRACKER_SERVER_ERRORS_TOTAL), &label_set, now) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; } #[cfg(test)] From a8f3a973c661815b7721d87cc24b828915d0deec Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 2 Jun 2025 18:47:46 +0100 Subject: [PATCH 11/11] refactor: [#1551] extract event handler for each udp event --- .../src/statistics/event/handler.rs | 739 ------------------ .../src/statistics/event/handler/error.rs | 95 +++ .../src/statistics/event/handler/mod.rs | 49 ++ .../event/handler/request_aborted.rs | 92 +++ .../event/handler/request_accepted.rs | 236 ++++++ .../event/handler/request_banned.rs | 92 +++ .../event/handler/request_received.rs | 74 ++ .../statistics/event/handler/response_sent.rs | 182 +++++ 8 files changed, 820 insertions(+), 739 deletions(-) delete mode 100644 packages/udp-tracker-server/src/statistics/event/handler.rs create mode 100644 packages/udp-tracker-server/src/statistics/event/handler/error.rs create mode 100644 packages/udp-tracker-server/src/statistics/event/handler/mod.rs create mode 100644 packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs create mode 100644 packages/udp-tracker-server/src/statistics/event/handler/request_accepted.rs create mode 100644 packages/udp-tracker-server/src/statistics/event/handler/request_banned.rs create mode 100644 packages/udp-tracker-server/src/statistics/event/handler/request_received.rs create mode 100644 packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs deleted file mode 100644 index a1e9007e9..000000000 --- a/packages/udp-tracker-server/src/statistics/event/handler.rs +++ /dev/null @@ -1,739 +0,0 @@ -use std::sync::Arc; - -use bittorrent_udp_tracker_core::services::banning::BanService; -use tokio::sync::RwLock; -use torrust_tracker_metrics::label::{LabelSet, LabelValue}; -use torrust_tracker_metrics::{label_name, metric_name}; -use torrust_tracker_primitives::DurationSinceUnixEpoch; - -use crate::event::{ConnectionContext, ErrorKind, Event, UdpRequestKind, UdpResponseKind}; -use crate::statistics::repository::Repository; -use crate::statistics::{ - UDP_TRACKER_SERVER_ERRORS_TOTAL, UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS, - UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL, - UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL, - UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL, -}; - -#[allow(clippy::too_many_lines)] -pub async fn handle_event( - event: Event, - stats_repository: &Repository, - ban_service: &Arc>, - now: DurationSinceUnixEpoch, -) { - match event { - Event::UdpRequestAborted { context } => { - handle_udp_request_aborted_event(context, stats_repository, now).await; - } - Event::UdpRequestBanned { context } => { - handle_udp_request_banned_event(context, stats_repository, now).await; - } - Event::UdpRequestReceived { context } => { - handle_udp_request_received_event(context, stats_repository, now).await; - } - Event::UdpRequestAccepted { context, kind } => { - handle_udp_request_accepted_event(context, kind, stats_repository, now).await; - } - Event::UdpResponseSent { - context, - kind, - req_processing_time, - } => { - handle_udp_response_sent_event(context, kind, req_processing_time, stats_repository, now).await; - } - Event::UdpError { context, kind, error } => { - handle_udp_error_event(context, kind, error, stats_repository, ban_service, now).await; - } - } - - tracing::debug!("stats: {:?}", stats_repository.get_stats().await); -} - -async fn handle_udp_request_aborted_event( - context: ConnectionContext, - stats_repository: &Repository, - now: DurationSinceUnixEpoch, -) { - // Global fixed metrics - stats_repository.increase_udp_requests_aborted().await; - - // Extendable metrics - match stats_repository - .increase_counter( - &metric_name!(UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL), - &LabelSet::from(context), - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; -} - -async fn handle_udp_request_banned_event(context: ConnectionContext, stats_repository: &Repository, now: DurationSinceUnixEpoch) { - // Global fixed metrics - stats_repository.increase_udp_requests_banned().await; - - // Extendable metrics - match stats_repository - .increase_counter( - &metric_name!(UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL), - &LabelSet::from(context), - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; -} - -async fn handle_udp_request_received_event( - context: ConnectionContext, - stats_repository: &Repository, - now: DurationSinceUnixEpoch, -) { - // Global fixed metrics - match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_requests().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_requests().await; - } - } - - // Extendable metrics - match stats_repository - .increase_counter( - &metric_name!(UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL), - &LabelSet::from(context), - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; -} - -async fn handle_udp_request_accepted_event( - context: ConnectionContext, - kind: UdpRequestKind, - stats_repository: &Repository, - now: DurationSinceUnixEpoch, -) { - // Global fixed metrics - match kind { - UdpRequestKind::Connect => match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_connections().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_connections().await; - } - }, - UdpRequestKind::Announce => match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_announces().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_announces().await; - } - }, - UdpRequestKind::Scrape => match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_scrapes().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_scrapes().await; - } - }, - } - - // Extendable metrics - let mut label_set = LabelSet::from(context); - label_set.upsert(label_name!("request_kind"), LabelValue::new(&kind.to_string())); - match stats_repository - .increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &label_set, now) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; -} - -/// # Panics -/// -/// This function panics if the client IP version does not match the expected -/// version. -async fn handle_udp_response_sent_event( - context: ConnectionContext, - kind: UdpResponseKind, - req_processing_time: std::time::Duration, - stats_repository: &Repository, - now: DurationSinceUnixEpoch, -) { - // Global fixed metrics - match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_responses().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_responses().await; - } - } - - let (result_label_value, kind_label_value) = match kind { - UdpResponseKind::Ok { req_kind } => match req_kind { - UdpRequestKind::Connect => { - let new_avg = stats_repository - .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) - .await; - let mut label_set = LabelSet::from(context.clone()); - label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); - match stats_repository - .set_gauge( - &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), - &label_set, - new_avg, - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to set gauge: {}", err), - } - (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Connect.to_string())) - } - UdpRequestKind::Announce => { - let new_avg = stats_repository - .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) - .await; - let mut label_set = LabelSet::from(context.clone()); - label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); - match stats_repository - .set_gauge( - &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), - &label_set, - new_avg, - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to set gauge: {}", err), - } - (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Announce.to_string())) - } - UdpRequestKind::Scrape => { - let new_avg = stats_repository - .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) - .await; - let mut label_set = LabelSet::from(context.clone()); - label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); - match stats_repository - .set_gauge( - &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), - &label_set, - new_avg, - now, - ) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to set gauge: {}", err), - } - (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Scrape.to_string())) - } - }, - UdpResponseKind::Error { opt_req_kind: _ } => (LabelValue::new("error"), LabelValue::ignore()), - }; - - // Extendable metrics - let mut label_set = LabelSet::from(context); - if result_label_value == LabelValue::new("ok") { - label_set.upsert(label_name!("request_kind"), kind_label_value); - } - label_set.upsert(label_name!("result"), result_label_value); - match stats_repository - .increase_counter(&metric_name!(UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL), &label_set, now) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; -} - -async fn handle_udp_error_event( - context: ConnectionContext, - kind: Option, - error: ErrorKind, - stats_repository: &Repository, - ban_service: &Arc>, - now: DurationSinceUnixEpoch, -) { - // Increase the number of errors - // code-review: should we ban IP due to other errors too? - if let ErrorKind::ConnectionCookie(_msg) = error { - let mut ban_service = ban_service.write().await; - ban_service.increase_counter(&context.client_socket_addr().ip()); - } - - // Global fixed metrics - match context.client_socket_addr().ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_errors().await; - } - std::net::IpAddr::V6(_) => { - stats_repository.increase_udp6_errors().await; - } - } - - // Extendable metrics - let mut label_set = LabelSet::from(context); - if let Some(kind) = kind { - label_set.upsert(label_name!("request_kind"), kind.to_string().into()); - } - match stats_repository - .increase_counter(&metric_name!(UDP_TRACKER_SERVER_ERRORS_TOTAL), &label_set, now) - .await - { - Ok(()) => {} - Err(err) => tracing::error!("Failed to increase the counter: {}", err), - }; -} - -#[cfg(test)] -mod tests { - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; - use std::sync::Arc; - - use bittorrent_udp_tracker_core::services::banning::BanService; - use torrust_tracker_clock::clock::Time; - use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; - - use crate::event::{ConnectionContext, ErrorKind, Event, UdpRequestKind}; - use crate::statistics::event::handler::handle_event; - use crate::statistics::repository::Repository; - use crate::CurrentClock; - - #[tokio::test] - async fn should_increase_the_number_of_aborted_requests_when_it_receives_a_udp_request_aborted_event() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpRequestAborted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp_requests_aborted, 1); - } - - #[tokio::test] - async fn should_increase_the_number_of_banned_requests_when_it_receives_a_udp_request_banned_event() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpRequestBanned { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp_requests_banned, 1); - } - - #[tokio::test] - async fn should_increase_the_number_of_incoming_requests_when_it_receives_a_udp4_incoming_request_event() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpRequestReceived { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_requests, 1); - } - - #[tokio::test] - async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpRequestAborted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - let stats = stats_repository.get_stats().await; - assert_eq!(stats.udp_requests_aborted, 1); - } - #[tokio::test] - async fn should_increase_the_udp_ban_counter_when_it_receives_a_udp_banned_event() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpRequestBanned { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - let stats = stats_repository.get_stats().await; - assert_eq!(stats.udp_requests_banned, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_connect_requests_counter_when_it_receives_a_udp4_request_event_of_connect_kind() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpRequestAccepted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpRequestKind::Connect, - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_connections_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_announce_requests_counter_when_it_receives_a_udp4_request_event_of_announce_kind() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpRequestAccepted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpRequestKind::Announce, - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_announces_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_scrape_requests_counter_when_it_receives_a_udp4_request_event_of_scrape_kind() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpRequestAccepted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpRequestKind::Scrape, - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_scrapes_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_responses_counter_when_it_receives_a_udp4_response_event() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpResponseSent { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpResponseKind::Ok { - req_kind: UdpRequestKind::Announce, - }, - req_processing_time: std::time::Duration::from_secs(1), - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_responses, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpError { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: None, - error: ErrorKind::RequestParse("Invalid request format".to_string()), - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_errors_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp6_connect_requests_counter_when_it_receives_a_udp6_request_event_of_connect_kind() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpRequestAccepted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpRequestKind::Connect, - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_connections_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp6_announce_requests_counter_when_it_receives_a_udp6_request_event_of_announce_kind() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpRequestAccepted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpRequestKind::Announce, - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_announces_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp6_scrape_requests_counter_when_it_receives_a_udp6_request_event_of_scrape_kind() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpRequestAccepted { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpRequestKind::Scrape, - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_scrapes_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp6_response_counter_when_it_receives_a_udp6_response_event() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpResponseSent { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: crate::event::UdpResponseKind::Ok { - req_kind: UdpRequestKind::Announce, - }, - req_processing_time: std::time::Duration::from_secs(1), - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_responses, 1); - } - #[tokio::test] - async fn should_increase_the_udp6_errors_counter_when_it_receives_a_udp6_error_event() { - let stats_repository = Repository::new(); - let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); - - handle_event( - Event::UdpError { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), - ServiceBinding::new( - Protocol::UDP, - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), - ) - .unwrap(), - ), - kind: None, - error: ErrorKind::RequestParse("Invalid request format".to_string()), - }, - &stats_repository, - &ban_service, - CurrentClock::now(), - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_errors_handled, 1); - } -} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/error.rs b/packages/udp-tracker-server/src/statistics/event/handler/error.rs new file mode 100644 index 000000000..e1023a56b --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/error.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; + +use bittorrent_udp_tracker_core::services::banning::BanService; +use tokio::sync::RwLock; +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::{label_name, metric_name}; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::{ConnectionContext, ErrorKind, UdpRequestKind}; +use crate::statistics::repository::Repository; +use crate::statistics::UDP_TRACKER_SERVER_ERRORS_TOTAL; + +pub async fn handle_event( + context: ConnectionContext, + kind: Option, + error: ErrorKind, + stats_repository: &Repository, + ban_service: &Arc>, + now: DurationSinceUnixEpoch, +) { + // Increase the number of errors + // code-review: should we ban IP due to other errors too? + if let ErrorKind::ConnectionCookie(_msg) = error { + let mut ban_service = ban_service.write().await; + ban_service.increase_counter(&context.client_socket_addr().ip()); + } + + // Global fixed metrics + match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_errors().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_errors().await; + } + } + + // Extendable metrics + let mut label_set = LabelSet::from(context); + if let Some(kind) = kind { + label_set.upsert(label_name!("request_kind"), kind.to_string().into()); + } + match stats_repository + .increase_counter(&metric_name!(UDP_TRACKER_SERVER_ERRORS_TOTAL), &label_set, now) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::Arc; + + use bittorrent_udp_tracker_core::services::banning::BanService; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; + + use crate::event::{ConnectionContext, Event}; + use crate::statistics::event::handler::error::ErrorKind; + use crate::statistics::event::handler::handle_event; + use crate::statistics::repository::Repository; + use crate::CurrentClock; + + #[tokio::test] + async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpError { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: None, + error: ErrorKind::RequestParse("Invalid request format".to_string()), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_errors_handled, 1); + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/mod.rs b/packages/udp-tracker-server/src/statistics/event/handler/mod.rs new file mode 100644 index 000000000..c8ac864a3 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/mod.rs @@ -0,0 +1,49 @@ +mod error; +mod request_aborted; +mod request_accepted; +mod request_banned; +mod request_received; +mod response_sent; + +use std::sync::Arc; + +use bittorrent_udp_tracker_core::services::banning::BanService; +use tokio::sync::RwLock; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::Event; +use crate::statistics::repository::Repository; + +pub async fn handle_event( + event: Event, + stats_repository: &Repository, + ban_service: &Arc>, + now: DurationSinceUnixEpoch, +) { + match event { + Event::UdpRequestAborted { context } => { + request_aborted::handle_event(context, stats_repository, now).await; + } + Event::UdpRequestBanned { context } => { + request_banned::handle_event(context, stats_repository, now).await; + } + Event::UdpRequestReceived { context } => { + request_received::handle_event(context, stats_repository, now).await; + } + Event::UdpRequestAccepted { context, kind } => { + request_accepted::handle_event(context, kind, stats_repository, now).await; + } + Event::UdpResponseSent { + context, + kind, + req_processing_time, + } => { + response_sent::handle_event(context, kind, req_processing_time, stats_repository, now).await; + } + Event::UdpError { context, kind, error } => { + error::handle_event(context, kind, error, stats_repository, ban_service, now).await; + } + } + + tracing::debug!("stats: {:?}", stats_repository.get_stats().await); +} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs b/packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs new file mode 100644 index 000000000..270ec2a45 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs @@ -0,0 +1,92 @@ +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::metric_name; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::ConnectionContext; +use crate::statistics::repository::Repository; +use crate::statistics::UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL; + +pub async fn handle_event(context: ConnectionContext, stats_repository: &Repository, now: DurationSinceUnixEpoch) { + // Global fixed metrics + stats_repository.increase_udp_requests_aborted().await; + + // Extendable metrics + match stats_repository + .increase_counter( + &metric_name!(UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL), + &LabelSet::from(context), + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::Arc; + + use bittorrent_udp_tracker_core::services::banning::BanService; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; + + use crate::event::{ConnectionContext, Event}; + use crate::statistics::event::handler::handle_event; + use crate::statistics::repository::Repository; + use crate::CurrentClock; + + #[tokio::test] + async fn should_increase_the_number_of_aborted_requests_when_it_receives_a_udp_request_aborted_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAborted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp_requests_aborted, 1); + } + + #[tokio::test] + async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAborted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + let stats = stats_repository.get_stats().await; + assert_eq!(stats.udp_requests_aborted, 1); + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/request_accepted.rs b/packages/udp-tracker-server/src/statistics/event/handler/request_accepted.rs new file mode 100644 index 000000000..25c1311e5 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/request_accepted.rs @@ -0,0 +1,236 @@ +use torrust_tracker_metrics::label::{LabelSet, LabelValue}; +use torrust_tracker_metrics::{label_name, metric_name}; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::{ConnectionContext, UdpRequestKind}; +use crate::statistics::repository::Repository; +use crate::statistics::UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL; + +pub async fn handle_event( + context: ConnectionContext, + kind: UdpRequestKind, + stats_repository: &Repository, + now: DurationSinceUnixEpoch, +) { + // Global fixed metrics + match kind { + UdpRequestKind::Connect => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_connections().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_connections().await; + } + }, + UdpRequestKind::Announce => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_announces().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_announces().await; + } + }, + UdpRequestKind::Scrape => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_scrapes().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_scrapes().await; + } + }, + } + + // Extendable metrics + let mut label_set = LabelSet::from(context); + label_set.upsert(label_name!("request_kind"), LabelValue::new(&kind.to_string())); + match stats_repository + .increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &label_set, now) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use std::sync::Arc; + + use bittorrent_udp_tracker_core::services::banning::BanService; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; + + use crate::event::{ConnectionContext, Event}; + use crate::statistics::event::handler::handle_event; + use crate::statistics::repository::Repository; + use crate::CurrentClock; + + #[tokio::test] + async fn should_increase_the_udp4_connect_requests_counter_when_it_receives_a_udp4_request_event_of_connect_kind() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAccepted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpRequestKind::Connect, + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_announce_requests_counter_when_it_receives_a_udp4_request_event_of_announce_kind() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAccepted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpRequestKind::Announce, + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_announces_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_scrape_requests_counter_when_it_receives_a_udp4_request_event_of_scrape_kind() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAccepted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpRequestKind::Scrape, + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_scrapes_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_connect_requests_counter_when_it_receives_a_udp6_request_event_of_connect_kind() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAccepted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpRequestKind::Connect, + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_announce_requests_counter_when_it_receives_a_udp6_request_event_of_announce_kind() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAccepted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpRequestKind::Announce, + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_announces_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_scrape_requests_counter_when_it_receives_a_udp6_request_event_of_scrape_kind() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestAccepted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpRequestKind::Scrape, + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_scrapes_handled, 1); + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/request_banned.rs b/packages/udp-tracker-server/src/statistics/event/handler/request_banned.rs new file mode 100644 index 000000000..74641574a --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/request_banned.rs @@ -0,0 +1,92 @@ +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::metric_name; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::ConnectionContext; +use crate::statistics::repository::Repository; +use crate::statistics::UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL; + +pub async fn handle_event(context: ConnectionContext, stats_repository: &Repository, now: DurationSinceUnixEpoch) { + // Global fixed metrics + stats_repository.increase_udp_requests_banned().await; + + // Extendable metrics + match stats_repository + .increase_counter( + &metric_name!(UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL), + &LabelSet::from(context), + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::Arc; + + use bittorrent_udp_tracker_core::services::banning::BanService; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; + + use crate::event::{ConnectionContext, Event}; + use crate::statistics::event::handler::handle_event; + use crate::statistics::repository::Repository; + use crate::CurrentClock; + + #[tokio::test] + async fn should_increase_the_number_of_banned_requests_when_it_receives_a_udp_request_banned_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestBanned { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp_requests_banned, 1); + } + + #[tokio::test] + async fn should_increase_the_udp_ban_counter_when_it_receives_a_udp_banned_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestBanned { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + let stats = stats_repository.get_stats().await; + assert_eq!(stats.udp_requests_banned, 1); + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/request_received.rs b/packages/udp-tracker-server/src/statistics/event/handler/request_received.rs new file mode 100644 index 000000000..8333258c2 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/request_received.rs @@ -0,0 +1,74 @@ +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::metric_name; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::ConnectionContext; +use crate::statistics::repository::Repository; +use crate::statistics::UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL; + +pub async fn handle_event(context: ConnectionContext, stats_repository: &Repository, now: DurationSinceUnixEpoch) { + // Global fixed metrics + match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_requests().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_requests().await; + } + } + + // Extendable metrics + match stats_repository + .increase_counter( + &metric_name!(UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL), + &LabelSet::from(context), + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::Arc; + + use bittorrent_udp_tracker_core::services::banning::BanService; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; + + use crate::event::{ConnectionContext, Event}; + use crate::statistics::event::handler::handle_event; + use crate::statistics::repository::Repository; + use crate::CurrentClock; + + #[tokio::test] + async fn should_increase_the_number_of_incoming_requests_when_it_receives_a_udp4_incoming_request_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpRequestReceived { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_requests, 1); + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs b/packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs new file mode 100644 index 000000000..a69184e08 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs @@ -0,0 +1,182 @@ +use torrust_tracker_metrics::label::{LabelSet, LabelValue}; +use torrust_tracker_metrics::{label_name, metric_name}; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use crate::event::{ConnectionContext, UdpRequestKind, UdpResponseKind}; +use crate::statistics::repository::Repository; +use crate::statistics::{UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS, UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL}; + +pub async fn handle_event( + context: ConnectionContext, + kind: UdpResponseKind, + req_processing_time: std::time::Duration, + stats_repository: &Repository, + now: DurationSinceUnixEpoch, +) { + // Global fixed metrics + match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_responses().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_responses().await; + } + } + + let (result_label_value, kind_label_value) = match kind { + UdpResponseKind::Ok { req_kind } => match req_kind { + UdpRequestKind::Connect => { + let new_avg = stats_repository + .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) + .await; + let mut label_set = LabelSet::from(context.clone()); + label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); + match stats_repository + .set_gauge( + &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), + &label_set, + new_avg, + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to set gauge: {}", err), + } + (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Connect.to_string())) + } + UdpRequestKind::Announce => { + let new_avg = stats_repository + .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) + .await; + let mut label_set = LabelSet::from(context.clone()); + label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); + match stats_repository + .set_gauge( + &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), + &label_set, + new_avg, + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to set gauge: {}", err), + } + (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Announce.to_string())) + } + UdpRequestKind::Scrape => { + let new_avg = stats_repository + .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) + .await; + let mut label_set = LabelSet::from(context.clone()); + label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string())); + match stats_repository + .set_gauge( + &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), + &label_set, + new_avg, + now, + ) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to set gauge: {}", err), + } + (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Scrape.to_string())) + } + }, + UdpResponseKind::Error { opt_req_kind: _ } => (LabelValue::new("error"), LabelValue::ignore()), + }; + + // Extendable metrics + let mut label_set = LabelSet::from(context); + if result_label_value == LabelValue::new("ok") { + label_set.upsert(label_name!("request_kind"), kind_label_value); + } + label_set.upsert(label_name!("result"), result_label_value); + match stats_repository + .increase_counter(&metric_name!(UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL), &label_set, now) + .await + { + Ok(()) => {} + Err(err) => tracing::error!("Failed to increase the counter: {}", err), + }; +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use std::sync::Arc; + + use bittorrent_udp_tracker_core::services::banning::BanService; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding}; + + use crate::event::{ConnectionContext, Event, UdpRequestKind}; + use crate::statistics::event::handler::handle_event; + use crate::statistics::repository::Repository; + use crate::CurrentClock; + + #[tokio::test] + async fn should_increase_the_udp4_responses_counter_when_it_receives_a_udp4_response_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpResponseSent { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpResponseKind::Ok { + req_kind: UdpRequestKind::Announce, + }, + req_processing_time: std::time::Duration::from_secs(1), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_responses, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_response_counter_when_it_receives_a_udp6_response_event() { + let stats_repository = Repository::new(); + let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1))); + + handle_event( + Event::UdpResponseSent { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + ServiceBinding::new( + Protocol::UDP, + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ) + .unwrap(), + ), + kind: crate::event::UdpResponseKind::Ok { + req_kind: UdpRequestKind::Announce, + }, + req_processing_time: std::time::Duration::from_secs(1), + }, + &stats_repository, + &ban_service, + CurrentClock::now(), + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_responses, 1); + } +}