Skip to content

Commit 8dc42b9

Browse files
fix unnecessary Arc Mutex on stream
1 parent fd61dfd commit 8dc42b9

4 files changed

Lines changed: 40 additions & 44 deletions

File tree

crates/network-scanner/examples/scan.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,25 +38,21 @@ fn main() -> anyhow::Result<()> {
3838
let rt = tokio::runtime::Runtime::new()?;
3939
rt.block_on(async move {
4040
let scanner = NetworkScanner::new(params).unwrap();
41-
let stream = scanner.start()?;
42-
let stream_clone = stream.clone();
41+
let mut stream = scanner.start()?;
4342
let now = std::time::Instant::now();
4443
tokio::task::spawn(async move {
4544
if tokio::signal::ctrl_c().await.is_ok() {
4645
tracing::info!("Ctrl-C received, stopping network scan");
47-
stream.stop();
4846
}
4947
});
50-
while let Ok(Some(res)) = timeout(Duration::from_secs(120), stream_clone.recv())
51-
.await
52-
.with_context(|| {
53-
tracing::error!("Failed to receive from stream");
54-
"Failed to receive from stream"
55-
})
56-
{
48+
while let Ok(Some(res)) = timeout(Duration::from_secs(120), stream.recv()).await.with_context(|| {
49+
tracing::error!("Failed to receive from stream");
50+
"Failed to receive from stream"
51+
}) {
5752
tracing::warn!("Result: {:?}", res);
5853
}
59-
stream_clone.stop();
54+
55+
stream.stop();
6056
tracing::warn!("Network Scan finished. elapsed: {:?}", now.elapsed());
6157
anyhow::Result::<()>::Ok(())
6258
})?;

crates/network-scanner/src/scanner.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use std::fmt::Display;
1010
use std::net::IpAddr;
1111
use std::sync::Arc;
1212
use std::time::Duration;
13-
use tokio::sync::Mutex;
1413
use typed_builder::TypedBuilder;
1514

1615
/// Represents a network scanner for discovering devices and their services over a network.
@@ -28,8 +27,8 @@ pub struct NetworkScanner {
2827
}
2928

3029
impl NetworkScanner {
31-
pub fn start(&self) -> anyhow::Result<Arc<NetworkScannerStream>> {
32-
let mut task_executor = TaskExecutionRunner::new(self.clone())?;
30+
pub fn start(&self) -> anyhow::Result<NetworkScannerStream> {
31+
let (mut task_executor, result_receiver) = TaskExecutionRunner::new(self.clone())?;
3332

3433
start_port_scan(&mut task_executor);
3534

@@ -46,27 +45,27 @@ impl NetworkScanner {
4645
}
4746

4847
let TaskExecutionRunner {
49-
context:
50-
TaskExecutionContext {
51-
result_receiver: port_receiver,
52-
mdns_daemon,
53-
..
54-
},
48+
context: TaskExecutionContext { mdns_daemon, .. },
5549
task_manager,
5650
} = task_executor;
5751

58-
let scanner_stream = Arc::new(NetworkScannerStream {
59-
result_receiver: port_receiver,
52+
let task_manager_clone = task_manager.clone();
53+
let mdns_daemon_clone = mdns_daemon.clone();
54+
55+
let scanner_stream = NetworkScannerStream {
56+
result_receiver,
6057
task_manager,
6158
mdns_daemon,
62-
});
59+
};
6360

64-
let scanner_stream_clone = Arc::clone(&scanner_stream);
6561
let max_wait_time = Duration::from_millis(self.configs.max_wait_time);
6662

6763
tokio::spawn(async move {
6864
tokio::time::sleep(max_wait_time).await;
69-
scanner_stream_clone.stop();
65+
task_manager_clone.stop();
66+
if let Some(daemon) = &mdns_daemon_clone {
67+
daemon.stop();
68+
}
7069
});
7170

7271
return Ok(scanner_stream);
@@ -394,24 +393,24 @@ pub enum ScanEntry {
394393
}
395394

396395
pub struct NetworkScannerStream {
397-
result_receiver: Arc<Mutex<ScanEntryReceiver>>,
396+
result_receiver: ScanEntryReceiver,
398397
task_manager: TaskManager,
399398
mdns_daemon: Option<MdnsDaemon>,
400399
}
401400

402401
impl NetworkScannerStream {
403-
pub async fn recv(self: &Arc<Self>) -> Option<ScanEntry> {
402+
pub async fn recv(self: &mut Self) -> Option<ScanEntry> {
404403
// The caller sometimes require Send, hence the Arc is necessary for socket_addr_receiver.
405-
self.result_receiver.lock().await.recv().await
404+
self.result_receiver.recv().await
406405
}
407406

408-
pub async fn recv_timeout(self: &Arc<Self>, duration: Duration) -> anyhow::Result<Option<ScanEntry>> {
409-
tokio::time::timeout(duration, self.result_receiver.lock().await.recv())
407+
pub async fn recv_timeout(self: &mut Self, duration: Duration) -> anyhow::Result<Option<ScanEntry>> {
408+
tokio::time::timeout(duration, self.result_receiver.recv())
410409
.await
411410
.context("recv_timeout timed out")
412411
}
413412

414-
pub fn stop(self: Arc<Self>) {
413+
pub fn stop(self: &Self) {
415414
self.task_manager.stop();
416415
if let Some(daemon) = &self.mdns_daemon {
417416
daemon.stop();

crates/network-scanner/src/task_utils.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ pub(crate) struct TaskExecutionContext {
5959
pub(crate) ip_receiver: Arc<Mutex<IpReceiver>>,
6060

6161
pub(crate) result_sender: ScanEntrySender,
62-
pub(crate) result_receiver: Arc<Mutex<ScanEntryReceiver>>,
6362

6463
pub(crate) ip_cache: Arc<parking_lot::RwLock<HashMap<IpAddr, Option<String>>>>,
6564

@@ -79,12 +78,11 @@ pub(crate) struct TaskExecutionRunner {
7978
}
8079

8180
impl TaskExecutionContext {
82-
pub(crate) fn new(network_scanner: NetworkScanner) -> anyhow::Result<Self> {
81+
pub(crate) fn new(network_scanner: NetworkScanner) -> anyhow::Result<(Self, ScanEntryReceiver)> {
8382
let (ip_sender, ip_receiver) = tokio::sync::mpsc::channel(5);
8483
let ip_receiver = Arc::new(Mutex::new(ip_receiver));
8584

86-
let (port_sender, port_receiver) = tokio::sync::mpsc::channel(100);
87-
let port_receiver = Arc::new(Mutex::new(port_receiver));
85+
let (result_sender, result_receiver) = tokio::sync::mpsc::channel(100);
8886

8987
let NetworkScanner {
9088
mdns_daemon,
@@ -98,16 +96,15 @@ impl TaskExecutionContext {
9896
let res = Self {
9997
ip_sender,
10098
ip_receiver,
101-
result_sender: port_sender,
102-
result_receiver: port_receiver,
99+
result_sender,
103100
ip_cache: Arc::new(parking_lot::RwLock::new(HashMap::new())),
104101
runtime,
105102
mdns_daemon,
106103
configs: ContextConfig::new(configs, &toggles, broadcast_subnet),
107104
toggles,
108105
};
109106

110-
Ok(res)
107+
Ok((res, result_receiver))
111108
}
112109
}
113110

@@ -122,11 +119,15 @@ impl TaskExecutionRunner {
122119
.spawn_no_sub_task(task(context, self.task_manager.clone()));
123120
}
124121

125-
pub(crate) fn new(scanner: NetworkScanner) -> anyhow::Result<Self> {
126-
Ok(Self {
127-
context: TaskExecutionContext::new(scanner)?,
128-
task_manager: TaskManager::new(),
129-
})
122+
pub(crate) fn new(scanner: NetworkScanner) -> anyhow::Result<(Self, ScanEntryReceiver)> {
123+
let (context, receiver) = TaskExecutionContext::new(scanner)?;
124+
Ok((
125+
Self {
126+
context,
127+
task_manager: TaskManager::new(),
128+
},
129+
receiver,
130+
))
130131
}
131132
}
132133

devolutions-gateway/src/api/net.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub async fn handle_network_scan(
4949
})?;
5050

5151
let res = ws.on_upgrade(move |mut websocket| async move {
52-
let stream = match scanner.start() {
52+
let mut stream = match scanner.start() {
5353
Ok(stream) => stream,
5454
Err(e) => {
5555
error!(error = format!("{e:#}"), "Failed to start network scan");

0 commit comments

Comments
 (0)