@@ -13,6 +13,7 @@ use crate::statistics::{
1313
1414pub async fn handle_event ( event : Event , stats_repository : & Arc < Repository > , now : DurationSinceUnixEpoch ) {
1515 match event {
16+ // Torrent events
1617 Event :: TorrentAdded { info_hash, .. } => {
1718 tracing:: debug!( info_hash = ?info_hash, "Torrent added" , ) ;
1819
@@ -27,6 +28,8 @@ pub async fn handle_event(event: Event, stats_repository: &Arc<Repository>, now:
2728 . decrement_gauge ( & metric_name ! ( TORRENT_REPOSITORY_TORRENTS_TOTAL ) , & LabelSet :: default ( ) , now)
2829 . await ;
2930 }
31+
32+ // Peer events
3033 Event :: PeerAdded { info_hash, peer } => {
3134 tracing:: debug!( info_hash = ?info_hash, peer = ?peer, "Peer added" , ) ;
3235
@@ -96,3 +99,266 @@ fn label_set_for_peer(peer: &Peer) -> LabelSet {
9699 ( label_name ! ( "peer_role" ) , LabelValue :: new ( "leecher" ) ) . into ( )
97100 }
98101}
102+
103+ #[ cfg( test) ]
104+ mod tests {
105+ use std:: sync:: Arc ;
106+
107+ use torrust_tracker_metrics:: label:: LabelSet ;
108+ use torrust_tracker_metrics:: metric:: MetricName ;
109+
110+ use crate :: statistics:: repository:: Repository ;
111+
112+ async fn expect_metric_to_be (
113+ stats_repository : & Arc < Repository > ,
114+ metric_name : & MetricName ,
115+ label_set : & LabelSet ,
116+ expected_value : f64 ,
117+ ) {
118+ let value = get_metric ( stats_repository, metric_name, label_set) . await ;
119+ assert_eq ! ( value. to_string( ) , expected_value. to_string( ) ) ;
120+ }
121+
122+ async fn get_metric ( stats_repository : & Arc < Repository > , metric_name : & MetricName , label_set : & LabelSet ) -> f64 {
123+ stats_repository
124+ . get_metrics ( )
125+ . await
126+ . metric_collection
127+ . get_gauge_value ( metric_name, label_set)
128+ . unwrap_or_else ( || panic ! ( "Failed to get metric value for metric name '{metric_name}' and label set '{label_set}'" ) )
129+ . value ( )
130+ }
131+
132+ mod for_torrent_metrics {
133+
134+ use std:: sync:: Arc ;
135+
136+ use torrust_tracker_clock:: clock:: stopped:: Stopped ;
137+ use torrust_tracker_clock:: clock:: { self , Time } ;
138+ use torrust_tracker_metrics:: label:: LabelSet ;
139+ use torrust_tracker_metrics:: metric_name;
140+
141+ use crate :: event:: Event ;
142+ use crate :: statistics:: event:: handler:: handle_event;
143+ use crate :: statistics:: event:: handler:: tests:: expect_metric_to_be;
144+ use crate :: statistics:: repository:: Repository ;
145+ use crate :: statistics:: TORRENT_REPOSITORY_TORRENTS_TOTAL ;
146+ use crate :: tests:: { sample_info_hash, sample_peer} ;
147+ use crate :: CurrentClock ;
148+
149+ #[ tokio:: test]
150+ async fn it_should_increment_the_number_of_torrents_when_a_torrent_added_event_is_received ( ) {
151+ clock:: Stopped :: local_set_to_unix_epoch ( ) ;
152+
153+ let stats_repository = Arc :: new ( Repository :: new ( ) ) ;
154+
155+ handle_event (
156+ Event :: TorrentAdded {
157+ info_hash : sample_info_hash ( ) ,
158+ announcement : sample_peer ( ) ,
159+ } ,
160+ & stats_repository,
161+ CurrentClock :: now ( ) ,
162+ )
163+ . await ;
164+
165+ expect_metric_to_be (
166+ & stats_repository,
167+ & metric_name ! ( TORRENT_REPOSITORY_TORRENTS_TOTAL ) ,
168+ & LabelSet :: default ( ) ,
169+ 1.0 ,
170+ )
171+ . await ;
172+ }
173+
174+ #[ tokio:: test]
175+ async fn it_should_decrement_the_number_of_torrents_when_a_torrent_removed_event_is_received ( ) {
176+ clock:: Stopped :: local_set_to_unix_epoch ( ) ;
177+
178+ let stats_repository = Arc :: new ( Repository :: new ( ) ) ;
179+ let metric_name = metric_name ! ( TORRENT_REPOSITORY_TORRENTS_TOTAL ) ;
180+ let label_set = LabelSet :: default ( ) ;
181+
182+ // Increment the gauge first to simulate a torrent being added.
183+ stats_repository
184+ . increment_gauge ( & metric_name, & label_set, CurrentClock :: now ( ) )
185+ . await
186+ . unwrap ( ) ;
187+
188+ handle_event (
189+ Event :: TorrentRemoved {
190+ info_hash : sample_info_hash ( ) ,
191+ } ,
192+ & stats_repository,
193+ CurrentClock :: now ( ) ,
194+ )
195+ . await ;
196+
197+ expect_metric_to_be ( & stats_repository, & metric_name, & label_set, 0.0 ) . await ;
198+ }
199+ }
200+
201+ mod for_peer_metrics {
202+
203+ mod peer_connections_total {
204+
205+ use std:: sync:: Arc ;
206+
207+ use aquatic_udp_protocol:: NumberOfBytes ;
208+ use rstest:: rstest;
209+ use torrust_tracker_clock:: clock:: stopped:: Stopped ;
210+ use torrust_tracker_clock:: clock:: { self , Time } ;
211+ use torrust_tracker_metrics:: label:: LabelValue ;
212+ use torrust_tracker_metrics:: { label_name, metric_name} ;
213+ use torrust_tracker_primitives:: peer:: { Peer , PeerRole } ;
214+
215+ use crate :: event:: Event ;
216+ use crate :: statistics:: event:: handler:: handle_event;
217+ use crate :: statistics:: event:: handler:: tests:: expect_metric_to_be;
218+ use crate :: statistics:: repository:: Repository ;
219+ use crate :: statistics:: TORRENT_REPOSITORY_PEER_CONNECTIONS_TOTAL ;
220+ use crate :: tests:: { leecher, sample_info_hash, seeder} ;
221+ use crate :: CurrentClock ;
222+
223+ fn make_peer ( role : PeerRole ) -> Peer {
224+ match role {
225+ PeerRole :: Seeder => seeder ( ) ,
226+ PeerRole :: Leecher => leecher ( ) ,
227+ }
228+ }
229+
230+ // It returns a peer with the opposite role of the given peer.
231+ fn make_opposite_role_peer ( peer : & Peer ) -> Peer {
232+ let mut opposite_role_peer = * peer;
233+
234+ match peer. role ( ) {
235+ PeerRole :: Seeder => {
236+ opposite_role_peer. left = NumberOfBytes :: new ( 1 ) ;
237+ }
238+ PeerRole :: Leecher => {
239+ opposite_role_peer. left = NumberOfBytes :: new ( 0 ) ;
240+ }
241+ }
242+
243+ opposite_role_peer
244+ }
245+
246+ #[ rstest]
247+ #[ case( "seeder" ) ]
248+ #[ case( "leecher" ) ]
249+ #[ tokio:: test]
250+ async fn it_should_increment_the_number_of_peer_connections_when_a_peer_added_event_is_received (
251+ #[ case] role : PeerRole ,
252+ ) {
253+ clock:: Stopped :: local_set_to_unix_epoch ( ) ;
254+
255+ let peer = make_peer ( role) ;
256+
257+ let stats_repository = Arc :: new ( Repository :: new ( ) ) ;
258+ let metric_name = metric_name ! ( TORRENT_REPOSITORY_PEER_CONNECTIONS_TOTAL ) ;
259+ let label_set = ( label_name ! ( "peer_role" ) , LabelValue :: new ( & role. to_string ( ) ) ) . into ( ) ;
260+
261+ handle_event (
262+ Event :: PeerAdded {
263+ info_hash : sample_info_hash ( ) ,
264+ peer,
265+ } ,
266+ & stats_repository,
267+ CurrentClock :: now ( ) ,
268+ )
269+ . await ;
270+
271+ expect_metric_to_be ( & stats_repository, & metric_name, & label_set, 1.0 ) . await ;
272+ }
273+
274+ #[ rstest]
275+ #[ case( "seeder" ) ]
276+ #[ case( "leecher" ) ]
277+ #[ tokio:: test]
278+ async fn it_should_decrement_the_number_of_peer_connections_when_a_peer_removed_event_is_received (
279+ #[ case] role : PeerRole ,
280+ ) {
281+ clock:: Stopped :: local_set_to_unix_epoch ( ) ;
282+
283+ let peer = make_peer ( role) ;
284+
285+ let stats_repository = Arc :: new ( Repository :: new ( ) ) ;
286+
287+ let metric_name = metric_name ! ( TORRENT_REPOSITORY_PEER_CONNECTIONS_TOTAL ) ;
288+ let label_set = ( label_name ! ( "peer_role" ) , LabelValue :: new ( & role. to_string ( ) ) ) . into ( ) ;
289+
290+ // Increment the gauge first to simulate a peer being added.
291+ stats_repository
292+ . increment_gauge ( & metric_name, & label_set, CurrentClock :: now ( ) )
293+ . await
294+ . unwrap ( ) ;
295+
296+ handle_event (
297+ Event :: PeerRemoved {
298+ info_hash : sample_info_hash ( ) ,
299+ peer,
300+ } ,
301+ & stats_repository,
302+ CurrentClock :: now ( ) ,
303+ )
304+ . await ;
305+
306+ expect_metric_to_be ( & stats_repository, & metric_name, & label_set, 0.0 ) . await ;
307+ }
308+
309+ #[ rstest]
310+ #[ case( "seeder" ) ]
311+ #[ case( "leecher" ) ]
312+ #[ tokio:: test]
313+ async fn it_should_adjust_the_number_of_seeders_and_leechers_when_a_peer_updated_event_is_received_and_the_peer_changed_its_role (
314+ #[ case] old_role : PeerRole ,
315+ ) {
316+ use crate :: statistics:: event:: handler:: tests:: get_metric;
317+
318+ clock:: Stopped :: local_set_to_unix_epoch ( ) ;
319+
320+ let stats_repository = Arc :: new ( Repository :: new ( ) ) ;
321+
322+ let old_peer = make_peer ( old_role) ;
323+ let new_peer = make_opposite_role_peer ( & old_peer) ;
324+
325+ let metric_name = metric_name ! ( TORRENT_REPOSITORY_PEER_CONNECTIONS_TOTAL ) ;
326+ let old_role_label_set = ( label_name ! ( "peer_role" ) , LabelValue :: new ( & old_peer. role ( ) . to_string ( ) ) ) . into ( ) ;
327+ let new_role_label_set = ( label_name ! ( "peer_role" ) , LabelValue :: new ( & new_peer. role ( ) . to_string ( ) ) ) . into ( ) ;
328+
329+ // Increment the gauge first by simulating a peer was added.
330+ handle_event (
331+ Event :: PeerAdded {
332+ info_hash : sample_info_hash ( ) ,
333+ peer : old_peer,
334+ } ,
335+ & stats_repository,
336+ CurrentClock :: now ( ) ,
337+ )
338+ . await ;
339+
340+ let old_role_total = get_metric ( & stats_repository, & metric_name, & old_role_label_set) . await ;
341+ let new_role_total = 0.0 ;
342+
343+ // The peer's role has changed, so we need to increment the new
344+ // role and decrement the old one.
345+ handle_event (
346+ Event :: PeerUpdated {
347+ info_hash : sample_info_hash ( ) ,
348+ old_peer,
349+ new_peer,
350+ } ,
351+ & stats_repository,
352+ CurrentClock :: now ( ) ,
353+ )
354+ . await ;
355+
356+ // The peer's role has changed, so the new role has incremented.
357+ expect_metric_to_be ( & stats_repository, & metric_name, & new_role_label_set, new_role_total + 1.0 ) . await ;
358+
359+ // And the old role has decremented.
360+ expect_metric_to_be ( & stats_repository, & metric_name, & old_role_label_set, old_role_total - 1.0 ) . await ;
361+ }
362+ }
363+ }
364+ }
0 commit comments