Skip to content

Commit 74e174d

Browse files
committed
refactor: [#1444] udp server event listener start in app start
1 parent 2fa4e15 commit 74e174d

13 files changed

Lines changed: 149 additions & 81 deletions

File tree

packages/rest-tracker-api-core/src/statistics/services.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,9 @@ mod tests {
158158
// UDP core stats (not used in this test)
159159

160160
// UDP server stats
161-
let (_udp_server_stats_event_sender, udp_server_stats_repository) =
161+
let udp_server_stats_keeper =
162162
torrust_udp_tracker_server::statistics::setup::factory(config.core.tracker_usage_statistics);
163-
let udp_server_stats_repository = Arc::new(udp_server_stats_repository);
163+
let udp_server_stats_repository = udp_server_stats_keeper.repository();
164164

165165
let tracker_metrics = get_metrics(
166166
in_memory_torrent_repository.clone(),

packages/udp-tracker-server/src/container.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use torrust_tracker_configuration::Core;
55
use crate::{event, statistics};
66

77
pub struct UdpTrackerServerContainer {
8+
pub udp_server_stats_keeper: Arc<statistics::keeper::Keeper>,
89
pub udp_server_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
910
pub udp_server_stats_repository: Arc<statistics::repository::Repository>,
1011
}
@@ -15,26 +16,28 @@ impl UdpTrackerServerContainer {
1516
let udp_tracker_server_services = UdpTrackerServerServices::initialize(core_config);
1617

1718
Arc::new(Self {
19+
udp_server_stats_keeper: udp_tracker_server_services.udp_server_stats_keeper.clone(),
1820
udp_server_stats_event_sender: udp_tracker_server_services.udp_server_stats_event_sender.clone(),
1921
udp_server_stats_repository: udp_tracker_server_services.udp_server_stats_repository.clone(),
2022
})
2123
}
2224
}
2325

2426
pub struct UdpTrackerServerServices {
27+
pub udp_server_stats_keeper: Arc<statistics::keeper::Keeper>,
2528
pub udp_server_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
2629
pub udp_server_stats_repository: Arc<statistics::repository::Repository>,
2730
}
2831

2932
impl UdpTrackerServerServices {
3033
#[must_use]
3134
pub fn initialize(core_config: &Arc<Core>) -> Arc<Self> {
32-
let (udp_server_stats_event_sender, udp_server_stats_repository) =
33-
statistics::setup::factory(core_config.tracker_usage_statistics);
34-
let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender);
35-
let udp_server_stats_repository = Arc::new(udp_server_stats_repository);
35+
let udp_server_stats_keeper = statistics::setup::factory(core_config.tracker_usage_statistics);
36+
let udp_server_stats_event_sender = udp_server_stats_keeper.sender();
37+
let udp_server_stats_repository = udp_server_stats_keeper.repository();
3638

3739
Arc::new(Self {
40+
udp_server_stats_keeper: udp_server_stats_keeper.clone(),
3841
udp_server_stats_event_sender: udp_server_stats_event_sender.clone(),
3942
udp_server_stats_repository: udp_server_stats_repository.clone(),
4043
})

packages/udp-tracker-server/src/environment.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ where
2424
pub registar: Registar,
2525
pub server: Server<S>,
2626
pub udp_core_event_listener_job: Option<JoinHandle<()>>,
27+
pub udp_server_event_listener_job: Option<JoinHandle<()>>,
2728
}
2829

2930
impl<S> Environment<S>
@@ -58,6 +59,7 @@ impl Environment<Stopped> {
5859
registar: Registar::default(),
5960
server,
6061
udp_core_event_listener_job: None,
62+
udp_server_event_listener_job: None,
6163
}
6264
}
6365

@@ -72,6 +74,14 @@ impl Environment<Stopped> {
7274
// Start the UDP tracker core event listener
7375
let udp_core_event_listener_job = Some(self.container.udp_tracker_core_container.stats_keeper.run_event_listener());
7476

77+
// Start the UDP tracker server event listener
78+
let udp_server_event_listener_job = Some(
79+
self.container
80+
.udp_tracker_server_container
81+
.udp_server_stats_keeper
82+
.run_event_listener(),
83+
);
84+
7585
// Start the UDP tracker server
7686
let server = self
7787
.server
@@ -89,6 +99,7 @@ impl Environment<Stopped> {
8999
registar: self.registar.clone(),
90100
server,
91101
udp_core_event_listener_job,
102+
udp_server_event_listener_job,
92103
}
93104
}
94105
}
@@ -110,14 +121,21 @@ impl Environment<Running> {
110121
/// Will panic if it cannot stop the service within the timeout.
111122
#[allow(dead_code)]
112123
pub async fn stop(self) -> Environment<Stopped> {
113-
// Stop the event listener
124+
// Stop the UDP tracker core event listener
114125
if let Some(udp_core_event_listener_job) = self.udp_core_event_listener_job {
115126
// todo: send a message to the event listener to stop and wait for
116127
// it to finish
117128
udp_core_event_listener_job.abort();
118129
}
119130

120-
// Stop the server
131+
// Stop the UDP tracker server event listener
132+
if let Some(udp_server_event_listener_job) = self.udp_server_event_listener_job {
133+
// todo: send a message to the event listener to stop and wait for
134+
// it to finish
135+
udp_server_event_listener_job.abort();
136+
}
137+
138+
// Stop the UDP tracker server
121139
let server = tokio::time::timeout(DEFAULT_TIMEOUT, self.server.stop())
122140
.await
123141
.expect("Failed to stop the UDP tracker server within the timeout")
@@ -128,6 +146,7 @@ impl Environment<Running> {
128146
registar: Registar::default(),
129147
server,
130148
udp_core_event_listener_job: None,
149+
udp_server_event_listener_job: None,
131150
}
132151
}
133152

packages/udp-tracker-server/src/event/sender.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub trait Sender: Sync + Send {
1616
}
1717

1818
/// An event sender implementation using a broadcast channel.
19+
#[derive(Clone)]
1920
pub struct Broadcaster {
2021
pub(crate) sender: broadcast::Sender<Event>,
2122
}

packages/udp-tracker-server/src/handlers/announce.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -374,8 +374,8 @@ mod tests {
374374
core_tracker_services: Arc<CoreTrackerServices>,
375375
core_udp_tracker_services: Arc<CoreUdpTrackerServices>,
376376
) -> Response {
377-
let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false);
378-
let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender);
377+
let keeper = crate::statistics::setup::factory(false);
378+
let udp_server_stats_event_sender = keeper.sender();
379379

380380
let client_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080);
381381
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
@@ -706,11 +706,11 @@ mod tests {
706706
announce_handler: Arc<AnnounceHandler>,
707707
whitelist_authorization: Arc<whitelist::authorization::WhitelistAuthorization>,
708708
) -> Response {
709-
let keeper = bittorrent_udp_tracker_core::statistics::setup::factory(false);
710-
let udp_core_stats_event_sender = keeper.sender();
709+
let core_keeper = bittorrent_udp_tracker_core::statistics::setup::factory(false);
710+
let udp_core_stats_event_sender = core_keeper.sender();
711711

712-
let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false);
713-
let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender);
712+
let server_keeper = crate::statistics::setup::factory(false);
713+
let udp_server_stats_event_sender = server_keeper.sender();
714714

715715
let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1);
716716
let client_ip_v6 = client_ip_v4.to_ipv6_compatible();

packages/udp-tracker-server/src/handlers/connect.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,11 @@ mod tests {
8181
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
8282
let server_service_binding = ServiceBinding::new(Protocol::UDP, server_socket_addr).unwrap();
8383

84-
let keeper = bittorrent_udp_tracker_core::statistics::setup::factory(false);
85-
let udp_core_stats_event_sender = keeper.sender();
84+
let core_keeper = bittorrent_udp_tracker_core::statistics::setup::factory(false);
85+
let udp_core_stats_event_sender = core_keeper.sender();
8686

87-
let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false);
88-
let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender);
87+
let server_keeper = crate::statistics::setup::factory(false);
88+
let udp_server_stats_event_sender = server_keeper.sender();
8989

9090
let request = ConnectRequest {
9191
transaction_id: TransactionId(0i32.into()),
@@ -117,11 +117,11 @@ mod tests {
117117
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
118118
let server_service_binding = ServiceBinding::new(Protocol::UDP, server_socket_addr).unwrap();
119119

120-
let keeper = bittorrent_udp_tracker_core::statistics::setup::factory(false);
121-
let udp_core_stats_event_sender = keeper.sender();
120+
let core_keeper = bittorrent_udp_tracker_core::statistics::setup::factory(false);
121+
let udp_core_stats_event_sender = core_keeper.sender();
122122

123-
let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false);
124-
let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender);
123+
let server_keeper = crate::statistics::setup::factory(false);
124+
let udp_server_stats_event_sender = server_keeper.sender();
125125

126126
let request = ConnectRequest {
127127
transaction_id: TransactionId(0i32.into()),
@@ -153,11 +153,11 @@ mod tests {
153153
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
154154
let server_service_binding = ServiceBinding::new(Protocol::UDP, server_socket_addr).unwrap();
155155

156-
let keeper = bittorrent_udp_tracker_core::statistics::setup::factory(false);
157-
let udp_core_stats_event_sender = keeper.sender();
156+
let core_keeper = bittorrent_udp_tracker_core::statistics::setup::factory(false);
157+
let udp_core_stats_event_sender = core_keeper.sender();
158158

159-
let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false);
160-
let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender);
159+
let server_keeper = crate::statistics::setup::factory(false);
160+
let udp_server_stats_event_sender = server_keeper.sender();
161161

162162
let request = ConnectRequest {
163163
transaction_id: TransactionId(0i32.into()),

packages/udp-tracker-server/src/handlers/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -284,11 +284,11 @@ pub(crate) mod tests {
284284
));
285285
let scrape_handler = Arc::new(ScrapeHandler::new(&whitelist_authorization, &in_memory_torrent_repository));
286286

287-
let keeper = bittorrent_udp_tracker_core::statistics::setup::factory(false);
288-
let udp_core_stats_event_sender = keeper.sender();
287+
let core_keeper = bittorrent_udp_tracker_core::statistics::setup::factory(false);
288+
let udp_core_stats_event_sender = core_keeper.sender();
289289

290-
let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false);
291-
let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender);
290+
let server_keeper = crate::statistics::setup::factory(false);
291+
let udp_server_stats_event_sender = server_keeper.sender();
292292

293293
let announce_service = Arc::new(AnnounceService::new(
294294
announce_handler.clone(),

packages/udp-tracker-server/src/handlers/scrape.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,8 @@ mod tests {
178178
core_tracker_services: Arc<CoreTrackerServices>,
179179
core_udp_tracker_services: Arc<CoreUdpTrackerServices>,
180180
) -> Response {
181-
let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false);
182-
let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender);
181+
let keeper = crate::statistics::setup::factory(false);
182+
let udp_server_stats_event_sender = keeper.sender();
183183

184184
let client_socket_addr = sample_ipv4_remote_addr();
185185
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);

packages/udp-tracker-server/src/statistics/event/listener.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use bittorrent_udp_tracker_core::UDP_TRACKER_LOG_TARGET;
24
use tokio::sync::broadcast;
35
use torrust_tracker_clock::clock::Time;
@@ -7,7 +9,7 @@ use crate::event::Event;
79
use crate::statistics::repository::Repository;
810
use crate::CurrentClock;
911

10-
pub async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Repository) {
12+
pub async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Arc<Repository>) {
1113
loop {
1214
match receiver.recv().await {
1315
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,84 @@
1+
use std::sync::Arc;
2+
13
use bittorrent_udp_tracker_core::UDP_TRACKER_LOG_TARGET;
2-
use tokio::sync::broadcast::Receiver;
4+
use tokio::task::JoinHandle;
35

46
use super::event::listener::dispatch_events;
57
use super::repository::Repository;
6-
use crate::event::Event;
8+
use crate::event::sender::{self, Broadcaster};
79

810
/// The service responsible for keeping tracker metrics (listening to statistics events and handle them).
911
///
1012
/// It actively listen to new statistics events. When it receives a new event
1113
/// it accordingly increases the counters.
1214
pub struct Keeper {
13-
pub repository: Repository,
15+
pub enable_sender: bool,
16+
pub broadcaster: Broadcaster,
17+
pub repository: Arc<Repository>,
1418
}
1519

1620
impl Default for Keeper {
1721
fn default() -> Self {
18-
Self::new()
22+
let enable_sender = true;
23+
let broadcaster = Broadcaster::default();
24+
let repository = Arc::new(Repository::new());
25+
26+
Self::new(enable_sender, broadcaster, repository)
1927
}
2028
}
2129

2230
impl Keeper {
31+
/// Creates a new instance of [`Keeper`].
2332
#[must_use]
24-
pub fn new() -> Self {
33+
pub fn new(enable_sender: bool, broadcaster: Broadcaster, repository: Arc<Repository>) -> Self {
2534
Self {
26-
repository: Repository::new(),
35+
enable_sender,
36+
broadcaster,
37+
repository,
38+
}
39+
}
40+
41+
#[must_use]
42+
pub fn sender(&self) -> Arc<Option<Box<dyn sender::Sender>>> {
43+
if self.enable_sender {
44+
Arc::new(Some(Box::new(self.broadcaster.clone())))
45+
} else {
46+
Arc::new(None)
2747
}
2848
}
2949

30-
pub fn run_event_listener(&mut self, receiver: Receiver<Event>) {
50+
#[must_use]
51+
pub fn repository(&self) -> Arc<Repository> {
52+
self.repository.clone()
53+
}
54+
55+
#[must_use]
56+
pub fn run_event_listener(&self) -> JoinHandle<()> {
3157
let stats_repository = self.repository.clone();
58+
let receiver = self.broadcaster.subscribe();
3259

33-
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting UDP tracker server event listener");
60+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting HTTP tracker core event listener");
3461

3562
tokio::spawn(async move {
3663
dispatch_events(receiver, stats_repository).await;
3764

38-
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "UDP tracker core server listener finished");
39-
});
65+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "HTTP tracker core event listener finished");
66+
})
4067
}
4168
}
4269

4370
#[cfg(test)]
4471
mod tests {
72+
4573
use crate::statistics::keeper::Keeper;
4674
use crate::statistics::metrics::Metrics;
4775

4876
#[tokio::test]
4977
async fn should_contain_the_tracker_statistics() {
50-
let stats_tracker = Keeper::new();
78+
let stats_tracker = Keeper::default();
5179

5280
let stats = stats_tracker.repository.get_stats().await;
5381

54-
assert_eq!(stats.udp4_requests, Metrics::default().udp4_requests);
82+
assert_eq!(stats.udp4_announces_handled, Metrics::default().udp4_announces_handled);
5583
}
5684
}

0 commit comments

Comments
 (0)